非同步程式設計
當程式需要與外界互動時,例如透過網際網路與另一台機器通訊,程式中的操作可能需要以無法預測的順序發生。假設你的程式需要下載一個檔案。我們希望在啟動下載操作後,執行其他操作,同時等待下載完成,然後在下載的檔案可用時,繼續執行需要該檔案的程式碼。這種情況屬於非同步程式設計的範疇,有時也稱為並行程式設計(因為在概念上,多件事會同時發生)。
為了處理這些場景,Julia 提供 Task
(也以其他幾個名稱所知,例如對稱協程、輕量級執行緒、合作式多工處理或一次性延續)。當一段運算工作(實際上是執行特定函數)被指定為 Task
時,就可以透過切換到另一個 Task
來中斷它。原始的 Task
之後可以繼續執行,屆時它會從中斷處繼續執行。一開始,這可能看起來類似函數呼叫。不過有兩個關鍵差異。首先,切換工作不會使用任何空間,因此可以執行任意次數的工作切換,而不會消耗呼叫堆疊。其次,與函數呼叫不同,工作之間的切換可以按任何順序進行,函數呼叫中被呼叫的函數必須在控制權傳回呼叫函數之前完成執行。
基本 Task
操作
您可以將 Task
視為要執行的運算工作單元的控制代碼。它有一個建立-開始-執行-完成的生命週期。透過對 0 個引數函數呼叫 Task
建構函數來建立工作,或使用 @task
巨集
julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0
@task x
等於 Task(()->x)
。
此工作會等待五秒鐘,然後印出 完成
。不過,它尚未開始執行。我們可以在準備好時呼叫 schedule
來執行它
julia> schedule(t);
如果你在 REPL 中嘗試這個,你會看到 schedule
立即回傳。這是因為它只是將 t
加入到一個內部任務佇列中執行。然後,REPL 會列印下一個提示並等待更多輸入。等待鍵盤輸入提供了其他任務執行的機會,因此在那個時候 t
將會啟動。t
呼叫 sleep
,它會設定一個計時器並停止執行。如果已排程其他任務,它們可以在此時執行。五秒後,計時器會觸發並重新啟動 t
,你會看到列印出 done
。然後 t
就完成了。
wait
函式會封鎖呼叫任務,直到其他任務完成。因此,例如如果你輸入
julia> schedule(t); wait(t)
而不是只呼叫 schedule
,你會在出現下一個輸入提示之前看到五秒的暫停。這是因為 REPL 在繼續之前正在等待 t
完成。
通常會想要建立一個任務並立即排程它,因此提供巨集 @async
以達到此目的 –- @async x
等於 schedule(@task x)
。
與通道通訊
在某些問題中,各種所需工作並非自然地透過函式呼叫關聯;在需要完成的工作中沒有明顯的「呼叫方」或「被呼叫方」。一個範例是生產者-消費者問題,其中一個複雜程序正在產生值,而另一個複雜程序正在使用它們。消費者無法僅呼叫生產者函式來取得值,因為生產者可能還有更多值要產生,因此可能尚未準備好回傳。透過任務,生產者和消費者都可以根據需要執行,並視需要傳遞值。
Julia 提供一個 Channel
機制來解決這個問題。一個 Channel
是個可等待的先進先出佇列,它可以有多個任務讀取和寫入。
讓我們定義一個產生者任務,它透過 put!
呼叫來產生值。要使用這些值,我們需要排程產生者在一個新任務中執行。一個特殊的 Channel
建構函式,它接受一個 1 個參數的函式作為參數,可以用來執行一個繫結到頻道的任務。然後我們可以重複從頻道物件 take!
值
julia> function producer(c::Channel)
put!(c, "start")
for n=1:4
put!(c, 2n)
end
put!(c, "stop")
end;
julia> chnl = Channel(producer);
julia> take!(chnl)
"start"
julia> take!(chnl)
2
julia> take!(chnl)
4
julia> take!(chnl)
6
julia> take!(chnl)
8
julia> take!(chnl)
"stop"
思考這個行為的一種方式是,producer
能夠多次傳回。在呼叫 put!
之間,產生者的執行會暫停,而使用者有控制權。
傳回的 Channel
可以用作一個 iterable 物件在一個 for
迴圈中,在這種情況下,迴圈變數會帶入所有產生的值。當頻道關閉時,迴圈會終止。
julia> for x in Channel(producer)
println(x)
end
start
2
4
6
8
stop
請注意,我們不必在產生者中明確關閉頻道。這是因為將一個 Channel
繫結到一個 Task
的動作,會將頻道的開啟生命週期與繫結任務的生命週期關聯在一起。當任務終止時,頻道物件會自動關閉。多個頻道可以繫結到一個任務,反之亦然。
雖然 Task
建構函式預期一個 0 個參數的函式,但建立一個任務繫結頻道的 Channel
方法預期一個函式,它接受一個型別為 Channel
的單一參數。一個常見的模式是產生者被參數化,在這種情況下,需要一個部分函式應用來建立一個 0 或 1 個參數的 匿名函式。
對於 Task
物件,這可以直接完成,或透過使用便利巨集來完成
function mytask(myarg)
...
end
taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)
若要編排更進階的工作分配模式,bind
和 schedule
可以與 Task
和 Channel
建構函式搭配使用,以明確連結一組通道和一組生產者/消費者工作。
更多關於通道
通道可以視覺化為一個管線,也就是說,它有一個寫入端和一個讀取端
不同工作中的多個寫入器可以透過
put!
呼叫同時寫入同一個通道。不同工作中的多個讀取器可以透過
take!
呼叫同時讀取資料。舉例來說
# Given Channels c1 and c2, c1 = Channel(32) c2 = Channel(32) # and a function `foo` which reads items from c1, processes the item read # and writes a result to c2, function foo() while true data = take!(c1) [...] # process data put!(c2, result) # write out result end end # we can schedule `n` instances of `foo` to be active concurrently. for _ in 1:n errormonitor(@async foo()) end
通道是透過
Channel{T}(sz)
建構函式建立的。通道只會保留T
類型的物件。如果沒有指定類型,通道可以保留任何類型的物件。sz
指的是通道中任何時間可以保留的最大元素數量。例如,Channel(32)
建立一個可以保留最多 32 個任何類型物件的通道。Channel{MyType}(64)
可以隨時保留最多 64 個MyType
物件。一個
Channel
最初處於開啟狀態。這表示它可以透過take!
和put!
呼叫自由地讀取和寫入。close
會關閉Channel
。在關閉的Channel
上,put!
會失敗。例如julia> c = Channel(2); julia> put!(c, 1) # `put!` on an open channel succeeds 1 julia> close(c); julia> put!(c, 2) # `put!` on a closed channel throws an exception. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
take!
和fetch
(會擷取但不會移除值)在關閉的頻道上會成功傳回任何現有的值,直到它為空。繼續上述範例julia> fetch(c) # Any number of `fetch` calls succeed. 1 julia> fetch(c) 1 julia> take!(c) # The first `take!` removes the value. 1 julia> take!(c) # No more data available on a closed channel. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
考慮一個使用頻道進行任務間通訊的簡單範例。我們啟動 4 個任務來處理來自單一 jobs
頻道的資料。由 ID (job_id
) 識別的工作會寫入頻道。此模擬中的每個任務都會讀取 job_id
,等待一段隨機時間,然後將 job_id
和模擬時間的元組寫回結果頻道。最後,所有 results
都會列印出來。
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
# typically performed externally.
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
errormonitor(@async do_work())
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
與其使用 errormonitor(t)
,更健全的解決方案可能是使用 bind(results, t)
,因為它不僅會記錄任何意外的失敗,還會強制關閉相關資源,並將例外狀況傳播到各處。
更多任務操作
任務操作建立在稱為 yieldto
的低階原語上。yieldto(task, value)
會暫停目前的任務,切換到指定的 task
,並導致該任務最後的 yieldto
呼叫傳回指定的 value
。請注意,yieldto
是使用任務式控制流程所需的唯一操作;我們永遠不會呼叫和傳回,而是只切換到不同的任務。這就是此功能也稱為「對稱協程」的原因;每個任務的切換都使用相同的機制。
yieldto
很強大,但大多數任務的使用並不會直接呼叫它。想想為什麼會這樣。如果您從目前的任務切換出去,您可能希望在某個時間點切換回來,但要知道何時切換回來,以及知道哪個任務有責任切換回來,可能需要相當程度的協調。例如,put!
和 take!
是封鎖操作,在通道的內容中使用時,會維護狀態以記住消費者是誰。不需要手動追蹤使用中的任務,這使得 put!
比低階的 yieldto
更容易使用。
除了 yieldto
之外,還需要一些其他基本函數才能有效使用任務。
current_task
取得目前正在執行的任務的參考。istaskdone
查詢任務是否已結束。istaskstarted
查詢任務是否已執行。task_local_storage
處理特定於目前任務的鍵值儲存。
任務與事件
大部分的任務切換發生在等待事件的結果,例如 I/O 請求,並由 Julia Base 中包含的排程器執行。排程器維護一個可執行任務佇列,並執行一個事件迴圈,根據外部事件(例如訊息到達)重新啟動任務。
等待事件的基本函數為 wait
。幾個物件實作 wait
;例如,給定一個 Process
物件,wait
會等待它結束。wait
通常是隱含的;例如,wait
會發生在呼叫 read
以等待資料可用的情況下。
在所有這些情況下,wait
最終會在 Condition
物件上執行,負責佇列和重新啟動任務。當任務在 Condition
上呼叫 wait
時,任務會標記為不可執行,新增到條件的佇列,並切換到排程器。然後,排程器會選擇另一個任務執行,或封鎖等待外部事件。如果一切順利,最終事件處理常式會在條件上呼叫 notify
,這會導致等待該條件的任務再次變為可執行。
透過呼叫 Task
明確建立的任務最初不會被排程器知道。這允許您使用 yieldto
手動管理任務(如果您願意)。但是,當此類任務等待事件時,當事件發生時,它仍會自動重新啟動,正如您所預期的。