ForkJoin Pool
在java7的時候,推出了一種很不一樣的thread pool,他實作了working stealing scheduling。這是什麼概念呢? 試想有一個大的專案,如果一個人無法完成的話,那我們很直覺的會把大的專案,切成很多中型的task,再把他分成幾個小的task,讓整個團隊可以協力完成。這些小的task完成之後,中型的task再把它整合起來,最終完成整合後就完成了大的專案。
咦,這不就是recursion或是divide & conquer嗎? 沒錯,很多的演算法都是類似這種divide and conquer的形式。這種算法的特性是,要等到所有的subtask都完成之後,才可以算出中間的結果,再來可以算出最後完整的結果。而ForkJoin Pool就是為了解決這種問題平行化的solution。
Work Stealing
相較於一般的Thread Pool,大家共用一個queue,ForkJoin Pool是每個thread有一個自己的queue,而且是雙向的queue(deque)。當一個task進來,他會把一部分的工作fork(切)出來先放到queue的最後面,另外一部分開始做。當然可能做進去後,發現task還是太大,就會繼續切更小,並再放到queue的最後方。如此一邊切一邊往下執行,直到task夠小可以直接運算為止。
當一個小工作完成之後,他會從最後端的task拿出來(其實這邊比較像stack的行為),繼續往下做。當然recursion的程式除了divide以外,還有merge results的動作,這邊稱之為join。join會取得每個subtask的結果,最終成為目前task的結果回傳回去。
那其他thread呢? 當其他thread身上的queue是空的時候,他會去別的thread的queue最前頭steal一個task到自己的queue當中。之後的行為就跟前面一模一樣。這有沒有很像一個團隊做專案的時候的行為? 所以如果我們把thread pool類比為取票機,forkjoin pool就很像做專案一樣。
How to use
前面講的是概念,現在來講實際怎麼實作。我拿費式數列當作例子,雖然他recursion不是他的最佳解法,但是他的定義很recursive。為了解釋方便,我們先不管它的效能。來看傳統recursion版本
public int fib(int n) {
if (n <= 1)
return n;
return fib(n-1) + fib(n-2);
}
再來是ForkJoin版本
public class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) { this.n = n; }
public Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
Recursion task要繼承於RecursiveTask或是RecursiveAction。前者是有回傳值,後者沒有。繼承需要實作compute()
這個method,裡面實作divide and conquer的邏輯。在當中我們可以直接呼叫subtask的compute()
,也可以呼叫subtask的fork()
,代表的是把subtask丟到queue。等到需要他的結果時,再呼叫join()
,它會把subtask結果回傳回來,再把所有的result去整合成目前task的result。
實際執行我們需要有一個ForkJoinPool。我們可以直接用大家共用的common forkjoin pool,也就是ForkJoinPool.commonPool()
。下面是執行這個RecursiveTask的範例
public static void main(String[] args) throws ExecutionException, InterruptedException {
Fibonacci fibonacci = new Fibonacci(10);
ForkJoinPool.commonPool().execute(fibonacci);
System.out.println(fibonacci.get());
}
跟傳統Thread Pool比較
其實這是兩種不同的執行策略,分別是producer consumer跟recursion。前者適合每個task是獨立的,他可以把大事小事都平均分攤在每個thread去執行;後者是透過divide and conquer演算法,用work stealing的方式去執行。所以主要還是要看你的task是哪一種類型居多。
而ForkJoinPool有一個很大的好處是減少thread因為blocking造成context switching。不管是fork, compute, join都幾乎不會blocking(只有join少數情況會要等待結果)。這可以讓thread一直保持running的狀態,一直到時間到了被context switch,而不是自己卡住了造成的context switch。
但ForkJoinPool對於不可分割的task,並且處理時間差異很大的情境比較不適合,畢竟每個thread都有一個queue。就很像在大賣場排隊結帳,只要運氣不好排到一個前面卡比較久的task就要等比較久。但是別排又沒有閒到可以把你steal走,那就沒有辦法做到先到先處理的特性了。
Java8 Parallel Stream
Java8的Stream API的parallel stream事實上也是用ForkJoinPool,他透過Spliterator來定義怎麼去split(或稱fork)你的input data。若執行結果需要collect,就會join回最後的result。