Structured Concurrency trong Java là một cải tiến của Java từ Java 19, trong việc hiện thực, bảo trì các đoạn code liên quan đến việc thực thi các tác vụ gồm nhiều tác vụ con được xử lý multi threading. Việc cải tiến này giúp code chúng ta dễ đọc hơn, control được việc thực thi các tác vụ con sẽ diễn ra như thế nào, nếu có lỗi xảy ra trong quá trình thực thi các tác vụ con thì chúng ta sẽ xử lý ra sao. Cụ thể như thế nào? Chúng ta sẽ cùng nhau tìm hiểu trong bài viết này các bạn nhé!
Để làm ví dụ, mình sẽ tạo mới một Maven project như sau:
Giả sử ứng dụng ví dụ của mình có một tác vụ là lấy thông tin sinh viên từ nhiều nguồn khác nhau, có thể là từ database hoặc là từ một web service. Vì các nguồn này là độc lập nên chúng ta có thể sử dụng multi threading của Java để submit các tác vụ con tương ứng cho từng nguồn đó các bạn!
Mình có class Student chứa thông tin sinh viên đơn giản như sau:
1 2 3 4 5 |
package com.huongdanjava.java; public record Student(String name) { } |
Class StudentDatabaseService implement Callable interface để lấy thông tin sinh viên từ database có nội dung đơn giản như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
package com.huongdanjava.java; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; public class StudentDatabaseService implements Callable<List<Student>> { @Override public List<Student> call() throws Exception { return Arrays.asList(new Student("Khanh"), new Student("Quan")); } } |
Còn class StudentWebService cũng implement interface Callable, cũng có nội dung đơn giản như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
package com.huongdanjava.java; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; public class StudentWebService implements Callable<List<Student>> { @Override public List<Student> call() throws Exception { return Arrays.asList(new Student("Thanh"), new Student("Binh")); } } |
Để thực thi các tác vụ con này sử dụng concurrency, mình sẽ tạo mới một thread pool sử dụng Executor Service framework và submit các tác vụ con này để lấy kết quả như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
package com.huongdanjava.java; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Application { public static void main(String[] args) throws Exception { try (ExecutorService executorService = Executors.newFixedThreadPool(10)) { Future<List<Student>> studentsFromDatabaseFuture = executorService.submit( new StudentDatabaseService()); Future<List<Student>> studentsFromWebServiceFuture = executorService.submit( new StudentWebService()); List<Student> studentsFromDatabase = studentsFromDatabaseFuture.get(); List<Student> studentsFromWebService = studentsFromWebServiceFuture.get(); List<Student> students = new ArrayList<>(); students.addAll(studentsFromDatabase); students.addAll(studentsFromWebService); students.forEach(s -> System.out.println(s.name())); } } } |
Kết quả khi chạy ứng dụng sẽ như sau:
Này là trường hợp happy, các tác vụ con diễn ra bình thường không có lỗi gì.
Trong trường hợp 1 tác vụ con bị lỗi do một nguyên nhân nào đó, ví dụ như:
1 2 3 4 5 6 7 8 9 10 11 12 |
package com.huongdanjava.java; import java.util.List; import java.util.concurrent.Callable; public class StudentDatabaseService implements Callable<List<Student>> { @Override public List<Student> call() throws Exception { throw new Exception("Something went wrong"); } } |
tác vụ còn lại thì cần thời gian để hoàn thành:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
package com.huongdanjava.java; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; public class StudentWebService implements Callable<List<Student>> { @Override public List<Student> call() throws Exception { Thread.sleep(10000); return Arrays.asList(new Student("Thanh"), new Student("Binh")); } } |
Khi chạy lại ví dụ, các bạn sẽ thấy mặc dù một tác vụ con bị lỗi nhưng chương trình của chúng ta cũng sẽ đợi cho đến khi tác vụ còn lại chạy xong 10s rồi mới throw ra lỗi:
Làm thế nào để chúng ta có thể cancel tác vụ con còn lại để chạy lại tất cả các tác vụ này thay vì chờ tác vụ con còn lại hoàn thành với kết quả không đúng cái chúng ta mong muốn?
Làm thế nào để ví dụ như, các bạn chỉ cần kết quả của một trong tác vụ con mà thôi. Không cần phải chạy hết các tác vụ con, làm sao chúng ta cancel các tác vụ con khác khi một trong các tác vụ con đã hoàn thành?
Để giải quyết những câu hỏi trên, các bạn có thể sử dụng Structured Concurrency của Java các bạn nhé!
Các bạn có thể sử dụng class StructuredTaskScope của Java để hiện thực Structured Concurrency này. Class StructuredTaskScope này sẽ giúp chúng ta định nghĩa rõ scope của từng tác vụ con, dễ dàng cancel các tác vụ con khác khi có exception xảy ra ở một tác vụ con. Chúng ta cũng dễ dàng cancel các tác vụ con khác nếu kết quả của chúng không còn cần thiết nữa.
Với ví dụ trên, mình có thể viết lại code sử dụng class StructuredTaskScope như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package com.huongdanjava.java; import java.util.ArrayList; import java.util.List; import java.util.concurrent.StructuredTaskScope; import java.util.concurrent.StructuredTaskScope.Subtask; public class Application { public static void main(String[] args) throws Exception { try (var scope = new StructuredTaskScope<>()) { Subtask<List<Student>> studentsFromDatabaseSubtask = scope.fork( new StudentDatabaseService()); Subtask<List<Student>> studentsFromWebServiceSubtask = scope.fork( new StudentWebService()); scope.join(); List<Student> studentsFromDatabase = studentsFromDatabaseSubtask.get(); List<Student> studentsFromWebService = studentsFromWebServiceSubtask.get(); List<Student> students = new ArrayList<>(); students.addAll(studentsFromDatabase); students.addAll(studentsFromWebService); students.forEach(s -> System.out.println(s.name())); } } } |
Chúng ta sẽ thay thế class ExecutorService bằng class StructuredTaskScope. Mỗi tác vụ con sẽ là một Subtask, chúng ta sử dụng phương thức fork() của class StructuredTaskScope để submit tác vụ.
Ngoài ra thì các bạn cũng cần gọi phương thức join() của class StructuredTaskScope để chương trình đợi các tác vụ được hoàn thành. Kết quả của từng tác vụ có thể lấy ra sử dụng phương thức get() giống như khi chúng ta sử dụng interface Future vậy các bạn!
Trong trường hợp happy, với:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
package com.huongdanjava.java; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; public class StudentDatabaseService implements Callable<List<Student>> { @Override public List<Student> call() throws Exception { return Arrays.asList(new Student("Khanh"), new Student("Quan")); } } |
và:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
package com.huongdanjava.java; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; public class StudentWebService implements Callable<List<Student>> { @Override public List<Student> call() throws Exception { return Arrays.asList(new Student("Thanh"), new Student("Binh")); } } |
các bạn sẽ thấy kết quả giống như khi chúng ta sử dụng class ExecutorService với interface Future vậy đó các bạn:
Class StructuredTaskScope định nghĩa các Policies mặc định cho phép chúng ta có thể định nghĩa behavior mà chúng ta muốn khi có một lỗi xảy ra trong một tác vụ con nào đó, hoặc nếu có một tác vụ con hoàn thành thì chúng ta không cần chạy tác vụ con khác nữa.
Có 2 Policies mặc định mà class StructuredTaskScope đã định nghĩa là:
- ShutdownOnFailure
- ShutdownOnSuccess
Policy ShutdownOnSuccess sẽ cancel các tác vụ con khác nếu một trong các tác vụ con đã hoàn thành. Các bạn có thể khởi tạo đối tượng của class StructuredTaskScope với policy ShutdownOnSuccess như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
package com.huongdanjava.java; import java.util.ArrayList; import java.util.List; import java.util.concurrent.StructuredTaskScope; public class Application { public static void main(String[] args) throws Exception { try (var scope = new StructuredTaskScope.ShutdownOnSuccess<List<Student>>()) { scope.fork(new StudentDatabaseService()); scope.fork(new StudentWebService()); scope.join(); List<Student> students = new ArrayList<>(); students.addAll(scope.result()); students.forEach(s -> System.out.println(s.name())); } } } |
Trong trường hợp này, chúng ta không cần phải gọi phương thức get() để lấy kết quả cho từng tác vụ con nữa. Chúng ta chỉ cần sử dụng phương thức result() của class StructuredTaskScope là được nha các bạn!
Kết quả lúc này các bạn sẽ thấy như sau:
Sau khi hoàn thành tác vụ con StudentWebService, kết quả đã được trả về đó các bạn!
Policy ShutdownOnFailure sẽ cancel các tác vụ con khác nếu có exception xảy ra trong một tác vụ con nào đó. Trong trường hợp này, các bạn sẽ viết code như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
package com.huongdanjava.java; import java.util.ArrayList; import java.util.List; import java.util.concurrent.StructuredTaskScope; import java.util.concurrent.StructuredTaskScope.Subtask; public class Application { public static void main(String[] args) throws Exception { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Subtask<List<Student>> studentsFromDatabaseSubtask = scope.fork( new StudentDatabaseService()); Subtask<List<Student>> studentsFromWebServiceSubtask = scope.fork( new StudentWebService()); scope.join(); scope.throwIfFailed(); List<Student> studentsFromDatabase = studentsFromDatabaseSubtask.get(); List<Student> studentsFromWebService = studentsFromWebServiceSubtask.get(); List<Student> students = new ArrayList<>(); students.addAll(studentsFromDatabase); students.addAll(studentsFromWebService); students.forEach(s -> System.out.println(s.name())); } } } |
Chúng ta sẽ khởi tạo đối tượng của class StructuredTaskScope với policy ShutdownOnFailure. Phương thức throwIfFailed() của class ShutdownOnFailure sẽ lấy exception xảy ra trong tác vụ con để throw ra các bạn nhé!
Chạy lại ví dụ với:
1 2 3 4 5 6 7 8 9 10 11 12 |
package com.huongdanjava.java; import java.util.List; import java.util.concurrent.Callable; public class StudentDatabaseService implements Callable<List<Student>> { @Override public List<Student> call() throws Exception { throw new Exception("Something went wrong"); } } |
và:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
package com.huongdanjava.java; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; public class StudentWebService implements Callable<List<Student>> { @Override public List<Student> call() throws Exception { Thread.sleep(10000); return Arrays.asList(new Student("Thanh"), new Student("Binh")); } } |
các bạn sẽ thấy chương trình của chúng ta throw ra lỗi ngay sau khi chạy chứ không cần phải đợi 10s cho tác vụ StudentWebService hoàn thành nữa.
Như vậy là trong bài viết này, mình đã giới thiệu với các bạn về Structured Concurrency trong Java, những vấn đề mà Structured Concurrency đã giải quyết.