Multithreading là khái niệm nói về việc xử lý các tác vụ của chương trình không diễn ra trong Thread chính của chương trình mà được nhiều Thread khác nhau xử lý. Khái niệm này sẽ giúp cho việc xử lý của chương trình được nhanh hơn. Chúng ta có nhiều cách để hiện thực multithreading trong Java nhưng trong bài viết này, mình chỉ giới thiệu với các bạn về Callable và Future trong Java với khả năng trả về kết quả sau khi xử lý và có thể throw Exception nếu trong quá trình xử lý tác vụ có lỗi xảy ra.
OK, bắt đầu thôi các bạn…
Đầu tiên, mình sẽ tạo mới một project để làm ví dụ:
Callable là một interface sử dụng Java Generic để định nghĩa đối tượng sẽ trả về sau khi xử lý xong tác vụ. Ở đây, mình sẽ lấy ví dụ về tính tổng của hai số, nhưng thay vì xử lý việc tính tổng này trong thread chính của chương trình, mình sẽ sử dụng Callable trong một thread khác để xử lý.
Đối tượng Callable của mình sẽ có nội dung như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com.huongdanjava.callablefuture; import java.util.concurrent.Callable; public class Calculator implements Callable<Integer> { private int a; private int b; public Calculator(int a, int b) { this.a = a; this.b = b; } public Integer call() throws Exception { return a + b; } } |
Để thực thi tác vụ của Callable, chúng ta phải submit nó vào một thread pool sử dụng phương thức submit() của Executor Framework.
Bây giờ, mình sẽ tạo main class cho ví dụ trên với nội dung như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package com.huongdanjava.callablefuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Example { public static void main(String[] args) { // Create thread pool using Executor Framework ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { // Create new Calculator object Calculator c = new Calculator(i, i + 1); executor.submit(c); } executor.shutdown(); } } |
Với đoạn code ở trên, chúng ta chỉ dừng lại ở mức là submit Callable object vào thread pool để xử lý tác vụ. Làm thế nào để lấy kết quả sau khi xử lý xong? Nếu các bạn để ý, các bạn sẽ thấy đối tượng trả về của phương thức submit() là một đối tượng Future. Sử dụng đối tượng này, chúng ta có thể quản lý được status của Callable task và lấy được đối tượng trả về của Callable object.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
package com.huongdanjava.callablefuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Example { public static void main(String[] args) { // Create thread pool using Executor Framework ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { // Create new Calculator object Calculator c = new Calculator(i, i + 1); Future f = executor.submit(c); } executor.shutdown(); } } |
Để lấy được kết quả trả về với đối tượng Future, chúng ta sẽ sử dụng phương thức get() của nó để đợi đến khi tác vụ của Callable hoàn thành và trả về kết quả.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package com.huongdanjava.callablefuture; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Example { public static void main(String[] args) throws ExecutionException, InterruptedException { // Create thread pool using Executor Framework ExecutorService executor = Executors.newFixedThreadPool(10); List<Future<Integer>> list = new ArrayList<Future<Integer>>(); for (int i = 0; i < 10; i++) { // Create new Calculator object Calculator c = new Calculator(i, i + 1); list.add(executor.submit(c)); } for (Future f : list) { System.out.println(f.get()); } executor.shutdown(); } } |
Kết quả:
Phương thức get() là synchronous, do đó nó sẽ blocking chương trình của chúng ta mỗi khi đợi kết quả của Callable. Để hạn chế rủi ro nó blocking chương trình quá lâu, chúng ta có thể sử dụng phương thức get() này với một thời gian timeout như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
package com.huongdanjava.callablefuture; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class Example { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { // Create thread pool using Executor Framework ExecutorService executor = Executors.newFixedThreadPool(10); List<Future<Integer>> list = new ArrayList<Future<Integer>>(); for (int i = 0; i < 10; i++) { // Create new Calculator object Calculator c = new Calculator(i, i + 1); list.add(executor.submit(c)); } for (Future f : list) { System.out.println(f.get(3, TimeUnit.SECONDS)); } executor.shutdown(); } } |
Ngoài ra, chúng ta còn có thể sử dụng các phương thức isDone(), isCancelled() của đối tượng Future để tìm hiểu về status hiện tại của đối tượng Callable mà nó đang monitor.
Tuấn
Bạn ơi chỉ giúp mình chút nhé, mình đang gặp một vấn đề thế này.
Mình submit 10 tasks vào pool có 4 threads, mỗi tasks này truyền vào 1 file xml để parser và return về 1 object.
Khi run thì object từ future đầu tiên cho kết quả OK, lần 2 thì bị null và bắn Exeption. Mình đang nghi
ngờ là tasks 2 sau khi parser xong lại bị khởi tạo lại object. Không biết có phải như vậy không, nhờ bạn chỉ giúp mình nhé!!
Tuấn
Mình hiểu vấn đề rồi, điều mình nghi ngờ không xảy ra hàm get của future block cho tới khi return kết quả. Vấn đề của mình nằm ở chỗ khác trong code của mình :))
Khanh Nguyen
🙂
quan
a ơi giờ em muốn thêm khoảng 15 object spring boot…= callable…thì xử lý = map ạ
Khanh Nguyen
Chưa hiểu nhu cầu của em lắm?
Chiến
Làm sao để biết được tất cả các task đã hoàn thành thế anh ?
Khanh Nguyen
Sử dụng phương thức isDone() của đối tượng Future như anh đã nói ở trên đó em!