Message Passing
Message passing簡單來講就是一個thread想把資料丟給另外一個thread去做處理。其實在前面flow control的章節在講producer/consumer pattern時,其實就已經點出了一個概念。但是可以看得出我們的實作過於簡略,producer跟consumer共用了同一個message變數,當producer/consumer多了,勢必會造成前面所說的race condition。那有沒有辦法用更high-level的方式來傳遞資訊呢?
工廠生產的時候,有一個名詞是生產線(pipeline),每一站都把前一個站的產出變成下一站的來源,而最後一站的產出就是這個工廠的產品。其實這個概念可以想像每一站就是一個thread,而這些站跟站之間的半成品就可以稱它為message。而站跟站之間,會透過一個輸送帶當作管道傳遞,而這個管道我們稱為pipe或是queue。
Blocking Queue
在Java中有一個非常好用並且現成的東西幫我們做Consumer/Producer Pattern,那就是BlockingQueue,我們可以透過它輕易做到之前所說的queue滿了讓producer block,以及queue空了讓conusmer block的結果。我把前面一個章節有關producer consumer的code改用BlockingQueue去作實現。
public class ProducerConsumer {
private LinkedBlockingQueue queue = new LinkedBlockingQueue<>(5);
public void produce(String message) throws InterruptedException {
System.out.println("produce message: " + message);
queue.put(message);
}
public void consume() throws InterruptedException {
System.out.println("consume message: " + queue.take());
}
public static void main(String[] args) throws InterruptedException {
final ProducerConsumer producerConsumer = new ProducerConsumer();
// Create consumer thread
new Thread(()->{
Random random = new Random();
try {
while (true) {
producerConsumer.consume();
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// Create producer thread
new Thread(()->{
Random random = new Random();
try {
int counter = 0;
while (true) {
producerConsumer.produce("message" + counter++);
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
可以看得到我們在produce中呼叫了BlockingQueue#put()
,它會放一個message進queue中,如果滿了會block;同樣的在consume的地方呼叫BlockingQueue#take()
,它會從queue中取一筆資料出來,如果queue是空的則會block。為了刻意讓queue比較容易滿,我把queue的constructor帶5
進去,這代表的是它的容量。我們可以改變main
中的sleep time,來模擬供過於求跟供不應求的狀況。
Pipe
有使用Unix/Linux的讀者應該很熟悉pipe也就是|
這個shell operation,這是Unix當初在設計的時候非常重要的philosophy:
- Write programs that do one thing and do it well.
- Write programs to work together.
- Write programs to handle text streams, because that is a universal interface.
下面就是一個常見的案例
tail accesslog | awk '{print $1}' | sort | uniq -c | sort -nr
我們把前面process的stdout當作下一個process的stdin。透過這行指令,我們馬上建立一個pipeline,最後把結果印出到terminal之上,有沒有很像我剛剛說的工廠的情境?
在Java中thread之間也可以這麼做(雖然這麼做不常見 ^^),利用的是PipedOutputStream跟PipedInputStream這兩個class,兩者可組合成一個Pipe,其中outputstream的產出就會是inputstream的輸入。下面是個範例。
public class Pipe {
private PipedInputStream in;
private PipedOutputStream out;
public Pipe() throws IOException {
in = new PipedInputStream(4096);
out = new PipedOutputStream(in);
}
public String readAll() throws IOException {
StringBuilder stringBuilder = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
stringBuilder.append(line);
}
}
return stringBuilder.toString();
}
public void writeAll(String string) {
try (PrintStream printStream = new PrintStream(out)){
printStream.print(string);
}
}
public static void main(String[] args) throws IOException {
Pipe pipe = new Pipe();
// produce a message
new Thread(() -> {
pipe.writeAll("hello pipe!!");
}).start();
// consume the message
new Thread(() -> {
try {
System.out.println(pipe.readAll());
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
Other Utility
- Pipe: nio的版本。其實跟
PipedInputStream
跟PipedOutputStream
類似。 - Future and Completable Future: 其實兩個都同時扮演了flow control跟message passing。一樣是留在後面的章節再做討論。
Recap
到這裡我們討論了resource sharing, flow control, message passing。這可以說是multi-threading中最重要的三個議題。但是mult-thread程式不好撰寫,即使是知道了上面這些課題,還是很容易出錯。因此有必要把更high-level,更一般化的問題包裝成更好用的介面。後面章節我們會把thread pool跟asynchronous invokation這兩個常用的情境做更進一步介紹。