多執行緒
Base.Threads.@threads
— 巨集Threads.@threads [schedule] for ... end
一個巨集用於平行執行 for
迴圈。迭代空間會分配給粗粒度的任務。這個政策可以用 schedule
參數指定。迴圈的執行會等待所有迭代的評估。
另請參閱:@spawn
和 Distributed
中的 pmap
。
延伸說明
語意
除非排程選項指定了更強的保證,否則由 @threads
巨集執行的迴圈具有下列語意。
@threads
巨集會以未指定的順序執行迴圈主體,並有可能同時執行。它不會指定任務和工作執行緒的確切分配。分配可能會因每次執行而異。迴圈主體程式碼(包括從中暫時呼叫的任何程式碼)不得對任務的迭代分配或執行它們的工作執行緒做出任何假設。每個迭代的迴圈主體必須能夠獨立於其他迭代進行前進,並且沒有資料競爭。因此,跨迭代的無效同步可能會導致死結,而未同步的記憶體存取可能會導致未定義的行為。
例如,上述條件表示
- 在迭代中取得的鎖定必須在同一個迭代中釋放。
- 使用像
Channel
之類的封鎖原語在迭代之間進行通訊是不正確的。 - 僅寫入未跨迭代共用的位置(除非使用鎖定或原子操作)。
- 除非使用
:static
排程,否則threadid()
的值甚至可能在單一迭代中變更。請參閱任務遷移
。
排程器
在沒有排程器引數的情況下,確切的排程是未指定的,並且會因 Julia 版本而異。目前,在未指定排程器時會使用 :dynamic
。
schedule
引數在 Julia 1.5 中可用。
:dynamic
(預設)
:dynamic
排程器會動態執行迭代至可用的工作執行緒。目前的實作假設每個迭代的工作負載是一致的。但是,此假設可能會在未來移除。
此排程選項僅為底層執行機制的提示。不過,可以預期一些屬性。:dynamic
排程器所使用的 Task
數量會受到可用工作執行緒數量的常數倍數限制 (Threads.threadpoolsize()
)。每個工作處理連續的迭代空間區域。因此,如果 length(xs)
遠大於工作執行緒數量,且 f(x)
的執行時間相對小於產生和同步工作所需的成本(通常小於 10 微秒),則 @threads :dynamic for x in xs; f(x); end
通常比 @sync for x in xs; @spawn f(x); end
更有效率。
:dynamic
選項可用於 schedule
參數,且為 Julia 1.8 的預設值。
:static
:static
排程器會為每個執行緒建立一個工作,並在它們之間平均分配迭代,將每個工作特別指定給每個執行緒。特別是,threadid()
的值保證在一個迭代中為常數。如果從另一個 @threads
迴圈內部或從 1 以外的執行緒使用,指定 :static
會產生錯誤。
:static
排程存在於支援在 Julia 1.3 之前編寫的程式碼轉換。在新的函式庫函數中,不建議使用 :static
排程,因為使用此選項的函數無法從任意工作執行緒呼叫。
範例
為了說明不同的排程策略,請考慮下列包含非讓步計時迴圈的函數 busywait
,該迴圈會執行給定的秒數。
julia> function busywait(seconds)
tstart = time_ns()
while (time_ns() - tstart) / 1e9 < seconds
end
end
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :static for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
:dynamic
範例花費 2 秒,因為其中一個未佔用的執行緒能夠執行兩個 1 秒的迭代來完成 for 迴圈。
Base.Threads.foreach
— 函數Threads.foreach(f, channel::Channel;
schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
ntasks=Threads.threadpoolsize())
類似於 foreach(f, channel)
,但會在 Threads.@spawn
產生的 ntasks
個工作中分割 channel
的迭代和對 f
的呼叫。此函數會等到所有內部產生的工作完成後才會傳回。
如果 schedule isa FairSchedule
,Threads.foreach
會嘗試以一種方式產生工作,讓 Julia 的排程器能夠更自由地在執行緒之間負載平衡工作項目。此方法通常具有較高的每個項目開銷,但在與其他多執行緒工作負載並行時,可能會比 StaticSchedule
表現得更好。
如果 schedule isa StaticSchedule
,Threads.foreach
會以一種方式產生工作,讓每個項目的開銷比 FairSchedule
低,但較不適合負載平衡。因此,此方法可能更適合細緻、一致的工作負載,但在與其他多執行緒工作負載並行時,可能會比 FairSchedule
表現得更差。
範例
julia> n = 20
julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)
julia> d = Channel{Int}(n) do ch
f = i -> put!(ch, i^2)
Threads.foreach(f, c)
end
julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
此函數需要 Julia 1.6 或更新版本。
Base.Threads.@spawn
— 巨集Threads.@spawn [:default|:interactive] expr
建立一個 工作
並 排程
它在指定的執行緒池中任何可用的執行緒上執行(未指定時為 :default
)。一旦有一個執行緒可用,工作就會分配給該執行緒。若要等到工作完成,請對此巨集的結果呼叫 wait
,或呼叫 fetch
來等待並取得其傳回值。
值可透過 $
內插到 @spawn
,這會將值直接複製到建構中的底層閉包中。這允許您插入變數的值,將非同步程式碼與目前工作中變數值的變更隔離。
此巨集自 Julia 1.3 起可用。
自 Julia 1.4 起,可透過 $
內插值。
自 Julia 1.9 起,可指定執行緒池。
範例
julia> t() = println("Hello from ", Threads.threadid());
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
Base.Threads.threadid
— 函數Threads.threadid() -> Int
取得目前執行緒的 ID 號碼。主執行緒的 ID 為 1
。
範例
julia> Threads.threadid()
1
julia> Threads.@threads for i in 1:4
println(Threads.threadid())
end
4
2
5
4
如果工作讓步,工作執行的執行緒可能會變更,這稱為 工作移轉
。因此,在大部分情況下,使用 threadid()
來索引緩衝區或有狀態物件的向量是不安全的。
Base.Threads.maxthreadid
— 函數Threads.maxthreadid() -> Int
取得 Julia 程序可用的執行緒數目 (跨所有執行緒池) 的下限,具有原子取得語意。結果將永遠大於或等於 threadid()
,以及在呼叫 maxthreadid
之前您能觀察到的任何工作的 threadid(task)
。
Base.Threads.nthreads
— 函數Threads.nthreads(:default | :interactive) -> Int
取得指定執行緒池內目前執行緒的數量。:interactive
中的執行緒有編號 1:nthreads(:interactive)
,而 :default
中的執行緒有編號 nthreads(:interactive) .+ (1:nthreads(:default))
。
另請參閱 LinearAlgebra
標準函式庫中的 BLAS.get_num_threads
和 BLAS.set_num_threads
,以及 Distributed
標準函式庫中的 nprocs()
和 Threads.maxthreadid()
。
Base.Threads.threadpool
— 函數Threads.threadpool(tid = threadid()) -> Symbol
傳回指定的執行緒的執行緒池;可能是 :default
、:interactive
或 :foreign
。
Base.Threads.nthreadpools
— 函數Threads.nthreadpools() -> Int
傳回目前設定的執行緒池數量。
Base.Threads.threadpoolsize
— 函數Threads.threadpoolsize(pool::Symbol = :default) -> Int
取得預設執行緒池(或指定的執行緒池)可用的執行緒數量。
另請參閱:LinearAlgebra
標準函式庫中的 BLAS.get_num_threads
和 BLAS.set_num_threads
,以及 Distributed
標準函式庫中的 nprocs()
。
Base.Threads.ngcthreads
— 函數Threads.ngcthreads() -> Int
傳回目前設定的 GC 執行緒數量。這包含標記執行緒和同時掃描執行緒。
另請參閱 多執行緒。
原子操作
atomic
— 關鍵字不安全的指標操作與在 C11 和 C++23 中分別使用 _Atomic
和 std::atomic
類型宣告的指標載入和儲存相容。如果沒有支援以原子方式載入 Julia 類型 T
,可能會擲回錯誤。
另請參閱:unsafe_load
、unsafe_modify!
、unsafe_replace!
、unsafe_store!
、unsafe_swap!
Base.@atomic
— 巨集@atomic var
@atomic order ex
將 var
或 ex
標記為以原子方式執行,如果 ex
是受支援的表達式。如果未指定 order
,則預設為 :sequentially_consistent。
@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend
以原子方式執行在右側表示的儲存操作,並傳回新的值。
使用 =
,此操作會轉換為 setproperty!(a.b, :x, new)
呼叫。使用任何運算子,此操作也會轉換為 modifyproperty!(a.b, :x, +, addend)[2]
呼叫。
@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2
以原子方式執行在右側表示的二元操作。將結果儲存到第一個引數中的欄位,並傳回值 (old, new)
。
此操作會轉換為 modifyproperty!(a.b, :x, func, arg2)
呼叫。
請參閱手冊中的 每個欄位的原子 章節,以取得更多詳細資料。
範例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomic a.x # fetch field x of a, with sequential consistency
1
julia> @atomic :sequentially_consistent a.x = 2 # set field x of a, with sequential consistency
2
julia> @atomic a.x += 1 # increment field x of a, with sequential consistency
3
julia> @atomic a.x + 1 # increment field x of a, with sequential consistency
3 => 4
julia> @atomic a.x # fetch field x of a, with sequential consistency
4
julia> @atomic max(a.x, 10) # change field x of a to the max value, with sequential consistency
4 => 10
julia> @atomic a.x max 5 # again change field x of a to the max value, with sequential consistency
10 => 10
此功能至少需要 Julia 1.7。
Base.@atomicswap
— 巨集@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new
將 new
儲存到 a.b.x
,並傳回 a.b.x
的舊值。
此操作會轉換為 swapproperty!(a.b, :x, new)
呼叫。
請參閱手冊中的 每個欄位的原子 章節,以取得更多詳細資料。
範例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicswap a.x = 2+2 # replace field x of a with 4, with sequential consistency
1
julia> @atomic a.x # fetch field x of a, with sequential consistency
4
此功能至少需要 Julia 1.7。
Base.@atomicreplace
— 巨集@atomicreplace a.b.x expected => desired
@atomicreplace :sequentially_consistent a.b.x expected => desired
@atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired
以原子方式執行由這對表示的條件式替換,傳回值 (old, success::Bool)
。其中 success
指出是否已完成替換。
此操作會轉換為 replaceproperty!(a.b, :x, expected, desired)
呼叫。
請參閱手冊中的 每個欄位的原子 章節,以取得更多詳細資料。
範例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 1, success = true)
julia> @atomic a.x # fetch field x of a, with sequential consistency
2
julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 2, success = false)
julia> xchg = 2 => 0; # replace field x of a with 0 if it was 2, with sequential consistency
julia> @atomicreplace a.x xchg
(old = 2, success = true)
julia> @atomic a.x # fetch field x of a, with sequential consistency
0
此功能至少需要 Julia 1.7。
下列 API 相當原始,可能會透過類似 unsafe_*
的包裝器公開。
Core.Intrinsics.atomic_pointerref(pointer::Ptr{T}, order::Symbol) --> T
Core.Intrinsics.atomic_pointerset(pointer::Ptr{T}, new::T, order::Symbol) --> pointer
Core.Intrinsics.atomic_pointerswap(pointer::Ptr{T}, new::T, order::Symbol) --> old
Core.Intrinsics.atomic_pointermodify(pointer::Ptr{T}, function::(old::T,arg::S)->T, arg::S, order::Symbol) --> old
Core.Intrinsics.atomic_pointerreplace(pointer::Ptr{T}, expected::Any, new::T, success_order::Symbol, failure_order::Symbol) --> (old, cmp)
下列 API 已棄用,儘管它們的支援可能會持續數個版本。
Base.Threads.Atomic
— 類型Threads.Atomic{T}
持有對 T
類型的物件的參考,確保只會以原子方式存取它,亦即以執行緒安全的方式。
只有某些「簡單」類型可以使用原子方式,即原始布林、整數和浮點類型。這些類型為 Bool
、Int8
...Int128
、UInt8
...UInt128
和 Float16
...Float64
。
可以從非原子值建立新的原子物件;如果未指定,原子物件會初始化為零。
可以使用 []
符號存取原子物件
範例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> x[] = 1
1
julia> x[]
1
原子操作使用 atomic_
前綴,例如 atomic_add!
、atomic_xchg!
等。
Base.Threads.atomic_cas!
— 函數Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T
以原子方式比較並設定 x
以原子方式將 x
中的值與 cmp
進行比較。如果相等,將 newval
寫入 x
。否則,讓 x
保持不變。傳回 x
中的舊值。透過將傳回值與 cmp
進行比較(透過 ===
),可以知道 x
是否已修改,且現在持有新值 newval
。
如需更多詳細資訊,請參閱 LLVM 的 cmpxchg
指令。
此函數可用於實作交易語意。在交易之前,記錄 x
中的值。在交易之後,只有當 x
在這段時間內未被修改時,才會儲存新值。
範例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 4, 2);
julia> x
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 3, 2);
julia> x
Base.Threads.Atomic{Int64}(2)
Base.Threads.atomic_xchg!
— 函數Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T
原子交換 x
中的值
以原子方式將 x
中的值與 newval
交換。傳回舊值。
如需進一步詳細資料,請參閱 LLVM 的 atomicrmw xchg
指令。
範例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_xchg!(x, 2)
3
julia> x[]
2
Base.Threads.atomic_add!
— 函數Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
以原子方式將 val
加到 x
以原子方式執行 x[] += val
。傳回舊值。未定義 Atomic{Bool}
。
如需進一步詳細資料,請參閱 LLVM 的 atomicrmw add
指令。
範例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_add!(x, 2)
3
julia> x[]
5
Base.Threads.atomic_sub!
— 函數Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
以原子方式從 x
減去 val
以原子方式執行 x[] -= val
。傳回舊值。未定義 Atomic{Bool}
。
如需進一步詳細資料,請參閱 LLVM 的 atomicrmw sub
指令。
範例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_sub!(x, 2)
3
julia> x[]
1
Base.Threads.atomic_and!
— 函數Threads.atomic_and!(x::Atomic{T}, val::T) where T
以原子方式將 x
與 val
進行位元與運算
以原子方式執行 x[] &= val
。傳回舊值。
如需進一步詳細資料,請參閱 LLVM 的 atomicrmw and
指令。
範例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_and!(x, 2)
3
julia> x[]
2
Base.Threads.atomic_nand!
— 函數Threads.atomic_nand!(x::Atomic{T}, val::T) where T
以原子方式將 x
與 val
進行位元非與運算 (not-and)
以原子方式執行 x[] = ~(x[] & val)
。傳回舊值。
有關詳細資訊,請參閱 LLVM 的 atomicrmw nand
指令。
範例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_nand!(x, 2)
3
julia> x[]
-3
Base.Threads.atomic_or!
— 函數Threads.atomic_or!(x::Atomic{T}, val::T) where T
以原子方式對 x
進行位元或運算 val
以原子方式執行 x[] |= val
。傳回舊值。
有關詳細資訊,請參閱 LLVM 的 atomicrmw or
指令。
範例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_or!(x, 7)
5
julia> x[]
7
Base.Threads.atomic_xor!
— 函數Threads.atomic_xor!(x::Atomic{T}, val::T) where T
以原子方式對 x
進行位元異或 (互斥或) 運算 val
以原子方式執行 x[] $= val
。傳回舊值。
有關詳細資訊,請參閱 LLVM 的 atomicrmw xor
指令。
範例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_xor!(x, 7)
5
julia> x[]
2
Base.Threads.atomic_max!
— 函數Threads.atomic_max!(x::Atomic{T}, val::T) where T
以原子方式將 x
和 val
的最大值儲存在 x
中
以原子方式執行 x[] = max(x[], val)
。傳回舊值。
有關詳細資訊,請參閱 LLVM 的 atomicrmw max
指令。
範例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_max!(x, 7)
5
julia> x[]
7
Base.Threads.atomic_min!
— 函數Threads.atomic_min!(x::Atomic{T}, val::T) where T
以原子方式將 x
和 val
的最小值儲存在 x
中
以原子方式執行 x[] = min(x[], val)
。傳回舊值。
有關詳細資訊,請參閱 LLVM 的 atomicrmw min
指令。
範例
julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)
julia> Threads.atomic_min!(x, 5)
7
julia> x[]
5
Base.Threads.atomic_fence
— 函數Threads.atomic_fence()
插入順序一致性記憶體圍欄
插入具有順序一致性排序語意的記憶體圍欄。有些演算法需要這個,例如取得/釋放排序不足夠時。
這很可能是一個非常昂貴的操作。由於 Julia 中的其他所有原子操作都已經具備了獲取/釋放語義,因此在大多數情況下不應需要明確的圍欄。
有關更多詳細資訊,請參閱 LLVM 的fence
指令。
ccall 使用 libuv 執行緒池 (實驗性)
Base.@threadcall
— 巨集@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)
@threadcall
巨集的呼叫方式與 ccall
相同,但會在不同的執行緒中執行工作。當您想要呼叫封鎖式 C 函式,而不會導致目前的 julia
執行緒遭到封鎖時,這會很有用。並行性受到 libuv 執行緒池大小的限制,預設為 4 個執行緒,但可以透過設定 UV_THREADPOOL_SIZE
環境變數並重新啟動 julia
程序來增加。請注意,呼叫的函式不應回呼到 Julia。
請注意,呼叫的函式不應回呼到 Julia。
低階同步原語
這些建構模組用於建立常規同步物件。
Base.Threads.SpinLock
— 類型SpinLock()
建立一個不可重入、測試和測試並設定自旋鎖。遞迴使用將導致死結。這種鎖只應在執行時間短且不會阻塞的程式碼(例如執行 I/O)周圍使用。一般而言,應改用 ReentrantLock
。
每個 lock
都必須與一個 unlock
相匹配。如果 !islocked(lck::SpinLock)
成立,則 trylock(lck)
會成功,除非有其他任務「同時」嘗試持有鎖。
測試和測試並設定自旋鎖在約 30 個競爭執行緒時最快。如果競爭比這更多,則應考慮不同的同步方法。