Flow Control

Thread之間除了共用資料以外,流程同步也是常用到的一個技巧。如果把人當做一個thread,彼此之間協同合作就是flow control。是不是常常會有某A做完了一個動作之後,某B甚至某C才可以繼續做後續的動作呢? 在multi-thread中也常常會有這樣的需求。

Wait and notify

在java中有關流程控制有一個最基本的primitive,那就是Object#wait()Object#notify()。當一個thread對一個物件呼叫wait()之後會完全卡住,要一直到另外一個thread觸發 notify()才可以繼續往下執行。幾乎所有有關流程控制的邏輯最底層都是透過wait跟notify來實現。

我打算用一個簡化版的Producer Consumer Pattern來解釋flow control。通常producer負責產生message,而consuemer負責接收並處理message。中間會有一個queue,當produce時queue是滿的,或是consume時queue是空的,那caller thread就會被blocked,直到狀態解除為止。但是為了方便解釋起見,這邊拿掉了queue,而只單純的讓producerd丟一個message給consumer而已。下面是一個範例程式:

public class FlowControl {

    // Lock for synchronization
    private Object lock = new Object();
    // Message to pass
    private String message = null;

    public void produce(String message) {
        System.out.println("produce message: " + message);
        synchronized (lock) {
            this.message = message;
            lock.notify();
        }
    }

    public void consume() throws InterruptedException {
        System.out.println("wait for message");
        synchronized (lock) {
            lock.wait();
            System.out.println("consume message: " + message);
        }
    }


    public static void main(String[] args) throws InterruptedException {
        final FlowControl flowControl = new FlowControl();

        // Create consumer thread
        new Thread(()->{
            try {
                flowControl.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // Sleep for 1 sec
        Thread.sleep(1000);

        // Create producer thread
        new Thread(()->{
            flowControl.produce("helloworld");
        }).start();
    }
}

我們分別定義了produce()consume()兩個methods。在consume中,我們會呼叫lock.wait(),注意wait這個method一定要讓被呼叫的那個object被包在sychronized block之中,不然直接會有compile error。相對的,在produce中我們會呼叫lock.notify(),同樣的也是要包在synchronized中。

在這個例子裡面,我們也可以用this來取代lock物件。也就是用sychronized(this), this.wait(), this.notify()取代。但這邊會用一個獨立的lock有個優點就是有比較好的隔離效果,以避免外部取得FlowContorl此class的instance的人,有機會影響內部結果。

最後看到main method,我們起了兩個threads,先是consumer thread,他會等著consume一個message;再來睡了一秒鐘後,我們產生了producer thread,他會produce一個helloworld message。最後跑出的結果就會像這樣

wait for message
produce message: helloworld
consume message: helloworld

另外有一個Object#notifyAll() method其實跟Object#notify()類似前者一次叫醒所有waiting threads,後者只會叫醒一個waiting thread。

Thread Join

另外一種更簡單的做法就是thread join。Join這個動詞在流程同步的時候常常會使用,它代表的是等待別人工作的完成;而相對的詞是Fork,代表的是把工作發包給別的thread開始做。

下面的程式碼則是一個簡單的範例,我們產生一個worker thread來執行我們的task,在我們的main thread中去join這個worker

// Create workder thread
Thread worker = new Thread(() -> {
    try {
        System.out.println("worker start");
        Thread.sleep(1000);
        System.out.println("worker complete");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

worker.start();

System.out.println("master wait");
worker.join();
System.out.println("master complete");

此程式碼的output會像下面這樣,可以看到master要等worker完成後,才會繼續往下執行。

master wait
worker start
worker complete
master complete

Other Utilities

礙於篇幅關係(還是因為懶),其他還有一些工具可以幫你做flow control

  • CyclicBarrier: Barrier算是一個流程同步的時常用的術語,代表的是所有的threads都需要到達某著stage的時候,才繼續往下走。這概念就很像出遊的時候,大家會在一個地點集合,全員到齊了再一起出發的概念。
  • CountDownLatch: 其實就如其名,是一個倒數的概念。每呼叫一次countdown()則倒數的counter就會減一,當一個thread去呼叫await()時,會一直卡在那邊,直到counter歸零為止,才會繼續往下走。
  • Future and Completable Future: 這個是Flow Control更高層次的封裝,因為內容比較多,我留在後面的章節再做討論。

results matching ""

    No results matching ""