多執行緒
瀏覽這篇 部落格文章,以了解 Julia 多執行緒功能的簡報。
使用多執行緒啟動 Julia
預設情況下,Julia 會以單一執行緒啟動。可以使用指令 Threads.nthreads()
驗證
julia> Threads.nthreads()
1
執行緒數量可透過使用 -t
/--threads
命令列參數或
環境變數控制。如果同時指定兩者,則 JULIA_NUM_THREADS
-t
/--threads
優先。
執行緒數量可以整數 (--threads=4
) 或 auto
(--threads=auto
) 指定,其中 auto
會嘗試推斷要使用的執行緒預設數量 (有關更多詳細資訊,請參閱 命令列選項)。
-t
/--threads
命令列參數至少需要 Julia 1.5。在較舊版本中,您必須改用環境變數。
將 auto
用作環境變數 JULIA_NUM_THREADS
的值至少需要 Julia 1.7。在較舊版本中,此值會被忽略。
讓我們以 4 個執行緒啟動 Julia
$ julia --threads 4
讓我們驗證是否有 4 個執行緒可供我們使用。
julia> Threads.nthreads()
4
但我們目前在主執行緒中。若要檢查,我們使用函數 Threads.threadid
julia> Threads.threadid()
1
如果您偏好使用環境變數,可以在 Bash (Linux/macOS) 中設定如下
export JULIA_NUM_THREADS=4
Linux/macOS 上的 C shell、Windows 上的 CMD
set JULIA_NUM_THREADS=4
Windows 上的 Powershell
$env:JULIA_NUM_THREADS=4
請注意,這必須在啟動 Julia 之前 執行。
使用 -t
/--threads
指定的執行緒數量會傳播到使用 -p
/--procs
或 --machine-file
命令列選項衍生的工作程序。例如,julia -p2 -t2
會衍生 1 個主程序和 2 個工作程序,且這三個程序都啟用了 2 個執行緒。若要更精細地控制工作執行緒,請使用 addprocs
,並將 -t
/--threads
傳遞為 exeflags
。
多個 GC 執行緒
垃圾收集器 (GC) 可以使用多個執行緒。使用數量為運算工作執行緒數量的一半,或由 --gcthreads
命令列參數或使用 JULIA_NUM_GC_THREADS
環境變數設定。
--gcthreads
命令列參數至少需要 Julia 1.10。
執行緒池
當程式執行緒忙於執行許多任務時,任務可能會遇到延遲,這可能會對程式的回應性和互動性產生負面影響。為了解決這個問題,您可以在 Threads.@spawn
時指定一個任務為互動式
using Base.Threads
@spawn :interactive f()
互動式任務應避免執行高延遲作業,如果它們是長時間任務,則應頻繁產生。
Julia 可以啟動一個或多個執行緒,專門用於執行互動式任務
$ julia --threads 3,1
環境變數 JULIA_NUM_THREADS
也可以類似地使用
export JULIA_NUM_THREADS=3,1
這會在 :default
執行緒池中啟動 3 個執行緒,在 :interactive
執行緒池中啟動 1 個執行緒
julia> using Base.Threads
julia> nthreadpools()
2
julia> threadpool() # the main thread is in the interactive thread pool
:interactive
julia> nthreads(:default)
3
julia> nthreads(:interactive)
1
julia> nthreads()
3
nthreads
的零參數版本會傳回預設池中的執行緒數量。
根據 Julia 是否以互動式執行緒啟動,主執行緒會在預設或互動式執行緒池中。
兩個數字都可以替換為單字 auto
,這會讓 Julia 選擇合理的預設值。
通訊與同步
儘管 Julia 的執行緒可以透過共享記憶體通訊,但撰寫正確且無資料競爭的多執行緒程式碼出了名的困難。Julia 的 Channel
是執行緒安全的,可用於安全地進行通訊。
資料競爭自由
確保您的程式無資料競爭的責任完全在您身上,如果您沒有遵守此要求,則無法保證任何在此處承諾的事項。觀察到的結果可能會非常不直觀。
確保此問題的最佳方式是在存取可從多個執行緒觀察到的任何資料時取得鎖定。例如,在大多數情況下,您應該使用以下程式碼模式
julia> lock(lk) do
use(a)
end
julia> begin
lock(lk)
try
use(a)
finally
unlock(lk)
end
end
其中 lk
是鎖定(例如 ReentrantLock()
),而 a
是資料。
此外,在存在資料競爭的情況下,Julia 在記憶體方面並不安全。如果另一個執行緒可能會寫入 任何 資料,請務必小心讀取!相反地,在變更其他執行緒存取的資料(例如指派給全域或閉包變數)時,請務必使用上述鎖定模式。
Thread 1:
global b = false
global a = rand()
global b = true
Thread 2:
while !b; end
bad_read1(a) # it is NOT safe to access `a` here!
Thread 3:
while !@isdefined(a); end
bad_read2(a) # it is NOT safe to access `a` here
@threads
巨集
我們使用原生執行緒來處理一個簡單的範例。讓我們建立一個零陣列
julia> a = zeros(10)
10-element Vector{Float64}:
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
讓我們使用 4 個執行緒同時對此陣列進行運算。我們會讓每個執行緒將其執行緒 ID 寫入每個位置。
Julia 使用 Threads.@threads
巨集支援平行迴圈。此巨集附加在 for
迴圈前面,以指示 Julia 迴圈是多執行緒區域
julia> Threads.@threads for i = 1:10
a[i] = Threads.threadid()
end
迭代空間在執行緒之間分割,之後每個執行緒會將其執行緒 ID 寫入其指定的位址
julia> a
10-element Vector{Float64}:
1.0
1.0
1.0
2.0
2.0
2.0
3.0
3.0
4.0
4.0
請注意,Threads.@threads
沒有像 @distributed
的選用式縮減參數。
使用 @threads
避免資料競爭
以一個天真的總和範例來說
julia> function sum_single(a)
s = 0
for i in a
s += i
end
s
end
sum_single (generic function with 1 method)
julia> sum_single(1:1_000_000)
500000500000
只要加入 @threads
就會暴露資料競爭,因為多個執行緒會同時讀取和寫入 s
。
julia> function sum_multi_bad(a)
s = 0
Threads.@threads for i in a
s += i
end
s
end
sum_multi_bad (generic function with 1 method)
julia> sum_multi_bad(1:1_000_000)
70140554652
請注意,結果並非預期的 500000500000
,而且每次評估很可能會改變。
若要修正此問題,可以針對特定工作使用緩衝區,將總和區分成無競爭的區塊。在此,sum_single
會重複使用,並有其自己的內部緩衝區 s
,而向量 a
會分割成 nthreads()
個區塊,以透過 nthreads()
個 @spawn
執行的工作進行平行處理。
julia> function sum_multi_good(a)
chunks = Iterators.partition(a, length(a) ÷ Threads.nthreads())
tasks = map(chunks) do chunk
Threads.@spawn sum_single(chunk)
end
chunk_sums = fetch.(tasks)
return sum_single(chunk_sums)
end
sum_multi_good (generic function with 1 method)
julia> sum_multi_good(1:1_000_000)
500000500000
緩衝區不應根據 threadid()
進行管理,例如 buffers = zeros(Threads.nthreads())
,因為並行的工作可能會讓出,表示多個並行的工作可能會在特定執行緒上使用同一個緩衝區,進而導致資料競爭風險。此外,當有多個執行緒可用時,工作可能會在讓出點變更執行緒,這稱為工作遷移。
另一種選擇是在工作/執行緒間共用的變數上使用原子運算,這可能會根據運算特性而有更好的效能。
原子運算
Julia 支援以原子方式存取和修改值,也就是以執行緒安全的方式來避免競爭條件。值(必須為原始類型)可以包裝成 Threads.Atomic
以表示必須以這種方式存取。以下是範例
julia> i = Threads.Atomic{Int}(0);
julia> ids = zeros(4);
julia> old_is = zeros(4);
julia> Threads.@threads for id in 1:4
old_is[id] = Threads.atomic_add!(i, id)
ids[id] = id
end
julia> old_is
4-element Vector{Float64}:
0.0
1.0
7.0
3.0
julia> i[]
10
julia> ids
4-element Vector{Float64}:
1.0
2.0
3.0
4.0
如果我們嘗試在沒有原子標籤的情況下進行加法,我們可能會因為競爭條件而得到錯誤的答案。以下是一個範例,說明如果我們沒有避免競爭會發生什麼情況
julia> using Base.Threads
julia> Threads.nthreads()
4
julia> acc = Ref(0)
Base.RefValue{Int64}(0)
julia> @threads for i in 1:1000
acc[] += 1
end
julia> acc[]
926
julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)
julia> @threads for i in 1:1000
atomic_add!(acc, 1)
end
julia> acc[]
1000
每個欄位的原子
我們也可以使用 @atomic
、@atomicswap
和 @atomicreplace
巨集在更精細的層級上使用原子。
記憶體模型和其他設計細節的具體說明寫在 Julia 原子宣言 中,稍後會正式發布。
結構宣告中的任何欄位都可以用 @atomic
修飾,然後任何寫入也必須標記為 @atomic
,並且必須使用定義的原子順序之一(:monotonic
、:acquire
、:release
、:acquire_release
或 :sequentially_consistent
)。任何原子欄位的讀取也可以註解原子順序約束,或者如果未指定,將使用單調(放鬆)順序執行。
每個欄位的原子需要至少 Julia 1.7。
副作用和可變函式引數
當使用多執行緒時,我們必須小心使用非 純函式,因為我們可能會得到錯誤的答案。例如,根據慣例,名稱以 !
結尾的函式會修改其引數,因此不是純函式。
@threadcall
外部函式庫,例如透過 ccall
呼叫的函式庫,對 Julia 的基於工作任務的 I/O 機制構成問題。如果 C 函式庫執行封鎖操作,則會阻止 Julia 排程器執行任何其他工作任務,直到呼叫傳回為止。(例外情況是呼叫自訂 C 程式碼,而該程式碼會呼叫回 Julia,然後可能會讓出,或呼叫 jl_yield()
的 C 程式碼,這是 yield
的 C 等效項。)
@threadcall
巨集提供一種方法,可以在這種情況下避免執行停滯。它會排程 C 函式在獨立執行緒中執行。會使用預設大小為 4 的執行緒池來執行此操作。執行緒池的大小會透過環境變數 UV_THREADPOOL_SIZE
來控制。在等待可用的執行緒時,以及在執行緒可用時執行函式期間,要求的工作任務(在 Julia 主要事件迴圈上)會讓出給其他工作任務。請注意,@threadcall
必須等到執行完成後才會傳回。因此,從使用者的角度來看,它就像其他 Julia API 一樣是一個封鎖呼叫。
非常重要的是,呼叫的函式不得呼叫回 Julia,因為它會導致分段錯誤。
@threadcall
可能會在 Julia 的未來版本中移除或變更。
注意事項
目前,如果使用者程式碼沒有資料競爭,則 Julia 執行時期和標準函式庫中的大多數操作都可以以執行緒安全的方式使用。然而,在某些領域中,穩定執行緒支援的工作仍在進行中。多執行緒程式設計有許多固有的困難,如果使用執行緒的程式出現異常或不良的行為(例如崩潰或神秘的結果),通常應首先懷疑執行緒互動。
在 Julia 中使用執行緒時,有幾個特定的限制和警告事項需要注意
- 如果多個執行緒同時使用基本收集類型,且至少有一個執行緒修改收集(常見範例包括陣列上的
push!
,或將項目插入Dict
),則需要手動鎖定。 @spawn
使用的排程是非決定性的,不應依賴它。- 受計算限制、不配置記憶體的任務可能會阻止在配置記憶體的其他執行緒中執行垃圾回收。在這些情況下,可能需要手動呼叫
GC.safepoint()
以允許執行 GC。此限制將在未來移除。 - 避免並行執行頂層運算,例如類型、方法和模組定義的
include
或eval
。 - 請注意,如果啟用執行緒,由函式庫註冊的完成函式可能會中斷。這可能需要在生態系統中進行一些過渡性工作,才能在有信心地廣泛採用執行緒。有關更多詳細資訊,請參閱下一節。
任務遷移
任務開始在特定執行緒上執行後,如果任務讓步,它可能會移至不同的執行緒。
此類任務可能已使用 @spawn
或 @threads
啟動,儘管 @threads
的 :static
排程選項會凍結執行緒 ID。
這表示在大部分情況下,不應將 threadid()
視為任務中的常數,因此不應使用它來索引緩衝區或有狀態物件的向量。
任務遷移是在 Julia 1.7 中引入的。在此之前,任務始終保留在啟動它們的相同執行緒上。
安全使用 Finalizers
由於 finalizers 會中斷任何程式碼,因此它們在與任何全域狀態互動時必須非常小心。不幸的是,使用 finalizers 的主要原因是更新全域狀態(純函數通常作為 finalizer 沒有意義)。這讓我們陷入了一個難題。有幾種方法可以解決這個問題
當單執行緒時,程式碼可以呼叫內部
jl_gc_enable_finalizers
C 函數,以防止在關鍵區域內排程 finalizers。在內部,這用於某些函數(例如我們的 C 鎖)中,以防止在執行某些操作(增量套件載入、codegen 等)時發生遞迴。鎖和此標誌的組合可用於使 finalizers 安全。第二種策略,由 Base 在幾個地方採用,是明確延遲 finalizer,直到它可以非遞迴地取得其鎖為止。以下範例說明如何將此策略套用於
Distributed.finalize_ref
function finalize_ref(r::AbstractRemoteRef) if r.where > 0 # Check if the finalizer is already run if islocked(client_refs) || !trylock(client_refs) # delay finalizer for later if we aren't free to acquire the lock finalizer(finalize_ref, r) return nothing end try # `lock` should always be followed by `try` if r.where > 0 # Must check again here # Do actual cleanup here r.where = 0 end finally unlock(client_refs) end end nothing end
相關的第三個策略是使用免於讓渡的佇列。目前我們沒有在 Base 中實作免於鎖定的佇列,但
Base.IntrusiveLinkedListSynchronized{T}
適用。這通常是使用事件迴圈程式碼的良好策略。例如,Gtk.jl
使用此策略來管理生命週期引用計數。在此方法中,我們不會在finalizer
內執行任何明確的工作,而是將其新增至佇列,以便在較安全的時機執行。事實上,Julia 的工作排程器已使用此方法,因此將 finalizer 定義為x -> @spawn do_cleanup(x)
是此方法的一個範例。但請注意,這無法控制do_cleanup
在哪個執行緒上執行,因此do_cleanup
仍需要取得鎖定。如果您實作自己的佇列,則不必如此,因為您可以明確地只從您的執行緒中清空該佇列。