在講到具體內容之前,不能不先講下Coroutine的一些背景知識,來先具體了解下什麼是Coroutine。
1. 背景知識
現在的操作系統都是支持多任務的,多任務可通過多進程或多線程的方式去實現,進程和線程的對比就不在這裡說了,在多任務的調度上操作系統采取搶占式和協作式兩種方式,搶占式是指操作系統給每個任務一定的執行時間片,在到達這個時間片後如任務仍然未釋放對CPU的占用,那麼操作系統將強制釋放,這是目前多數操作系統采取的方式;協作式是指操作系統按照任務的順序來分配CPU,每個任務執行過程中除非其主動釋放,否則將一直占據CPU,這種方式非常值得注意的是一旦有任務占據CPU不放,會導致其他任務”餓死”的現象,因此操作系統確實不太適合采用這種方式。
說完操作系統多任務的調度方式後,來看看通常程序是如何實現支持高並發的,一種就是典型的基於操作系統提供的多進程或多線程機制,每個任務占據一個進程或一個線程,當任務中有IO等待等動作時,則將進程或線程放入待調度隊列中,這種方式是目前大多數程序采取的方式,這種方式的壞處在於如想支持高的並發量,就不得不創建很多的進程或線程,而進程和線程都是要消耗不少系統資源的,另外一方面,進程或線程創建太多後,操作系統需要花費很多的時間在進程或線程的切換上,切換動作需要做狀態保持和恢復,這也會消耗掉很多的系統資源;另外一種方式則是每個任務不完全占據一個進程或線程,當任務執行過程中需要進行IO等待等動作時,任務則將其所占據的進程或線程釋放,以便其他任務使用這個進程或線程,這種方式的好處在於可以減少所需要的原生的進程或線程數,並且由於操作系統不需要做進程或線程的切換,而是自行來實現任務的切換,其成本會較操作系統切換低,這種方式也就是本文的重點,Coroutine方式,又稱協程方式,這種方式在目前的大多數語言中都有支持。
各種語言在實現Coroutine方式的支持時,多數都采用了Actor Model來實現,Actor Model簡單來說就是每個任務就是一個Actor,Actor之間通過消息傳遞的方式來進行交互,而不采用共享的方式,Actor可以看做是一個輕量級的進程或線程,通常在一台4G內存的機器上,創建幾十萬個Actor是毫無問題的,Actor支持Continuations,即對於如下代碼:
Actor
act方法
進行一些處理
創建並執行另外一個Actor
通過消息box阻塞獲取另一個Actor執行的結果
繼續基於這個結果進行一些處理
在支持Continuations的情況下,可以做到消息box阻塞時並不是進程或線程級的阻塞,而只是Actor本身的阻塞,並且在阻塞時可將所占據的進程或線程釋放給其他Actor使用,Actor Model實現最典型的就是erLang了。
對於Java應用而言,傳統方式下為了支持高並發,由於一個線程只能用於處理一個請求,即使是線程中其實有很多IO中斷、鎖等待也同樣如此,因此通常的做法是通過啟動很多的線程來支撐高並發,但當線程過多時,就造成了CPU需要消耗不少的時間在線程的切換上,從而出現瓶頸,按照上面對Coroutine的描述,Coroutine的方式理論上而言能夠大幅度的提升Java應用所能支撐的並發量。
2. 在Java中使用Coroutine
Java尚不能從語言層次上支持Coroutine,也許Java 7能夠支持,目前已經有了一個測試性質的版本[1],在Sun JDK 7尚未正式發布的情況下如希望在Java中使用Coroutine,Scala或Kilim是可以做的選擇,來分別看下。
Scala是現在很火的語言之一,Twitter消息中間件基於Scala編寫更是讓Scala名聲鵲起,除了在語法方面所做出的改進外,其中一個最突出的特色就是Scala Actor,Scala Actor是Scala用於實現Coroutine的方式,先來具體看看Scala在Coroutine支持實現的關鍵概念。
l Actor
Scala Actor可以看做是一個輕量級的Java Thread,其使用方式和Java Thread基本也一致,繼承Actor,實現act方法,啟動時也是調用start方法,但和Java Thread不同的是,Scala Actor可等待外部發送過來的消息,並進行相應的處理。
l Actor的消息發送機制
發送消息到Actor的方式有異步、Future兩種方式,異步即指發送後立即返回,繼續後續流程,使用異步發送的方法為:actor ! MessageObject,其中消息對象可以為任何類型,並且Scala還支持一種稱為case Object的對象,便於在收到消息時做pattern matching。
Future方式是指阻塞線程等待消息處理的結果,使用Future方式發送的方法為:actor !! MessageObject,在等待結果方面,Scala支持不限時等待,限時等待以及等待多個Future或個別Future完成,使用方法如下:
val ft=actor !! MessageObject // Future方式發送消息
val result=ft() // 不限時等待
val results=awaitAll(500,ft1,ft2,ft3) // 限時等待多個Future返回值
val results=awaitEither(ft1,ft2) // 等待個別future完成
接收消息方通過reply方法返回Future方式所等待的結果。
l Actor的消息接收機制
當代碼處於Actor的act方法或Actor環境(例如為Actor的act方法調用過來的代碼)中時,可通過以下兩種方式來接收外部發送給Actor的消息:一為receive方式,二為react方式,代碼例子如下:
receive{
case MessageObject(args) => doHandle(args)
}
react{
case MessageObject(args) => doHandle(args)
}
receive和react的差別在於receive需要阻塞當前Java線程,react則僅為阻塞當前Actor,但並不會阻塞Java線程,因此react模式更適合於充分發揮coroutine帶來的原生線程數減少的好處,但react模式有個缺點是react不支持返回。
receive和react都有限時接收的方式,方法為:receiveWithin(timeout)、reactWithin(timeout),超時的消息通過case TIMEOUT的方式來接收。
下面來看基於Scala Actor實現並發處理請求的一個簡單例子。
class Processor extends Actor{
def act(){
loop{
react{
case command:String => doHandle(command)
}
}
}
def doHandle(command:String){
// 業務邏輯處理
}
}