多執行緒

Base.Threads.@threads巨集
Threads.@threads [schedule] for ... end

一個巨集用於平行執行 for 迴圈。迭代空間會分配給粗粒度的任務。這個政策可以用 schedule 參數指定。迴圈的執行會等待所有迭代的評估。

另請參閱:@spawnDistributed 中的 pmap

延伸說明

語意

除非排程選項指定了更強的保證,否則由 @threads 巨集執行的迴圈具有下列語意。

@threads 巨集會以未指定的順序執行迴圈主體,並有可能同時執行。它不會指定任務和工作執行緒的確切分配。分配可能會因每次執行而異。迴圈主體程式碼(包括從中暫時呼叫的任何程式碼)不得對任務的迭代分配或執行它們的工作執行緒做出任何假設。每個迭代的迴圈主體必須能夠獨立於其他迭代進行前進,並且沒有資料競爭。因此,跨迭代的無效同步可能會導致死結,而未同步的記憶體存取可能會導致未定義的行為。

例如,上述條件表示

  • 在迭代中取得的鎖定必須在同一個迭代中釋放。
  • 使用像 Channel 之類的封鎖原語在迭代之間進行通訊是不正確的。
  • 僅寫入未跨迭代共用的位置(除非使用鎖定或原子操作)。
  • 除非使用 :static 排程,否則 threadid() 的值甚至可能在單一迭代中變更。請參閱 任務遷移

排程器

在沒有排程器引數的情況下,確切的排程是未指定的,並且會因 Julia 版本而異。目前,在未指定排程器時會使用 :dynamic

Julia 1.5

schedule 引數在 Julia 1.5 中可用。

:dynamic(預設)

:dynamic 排程器會動態執行迭代至可用的工作執行緒。目前的實作假設每個迭代的工作負載是一致的。但是,此假設可能會在未來移除。

此排程選項僅為底層執行機制的提示。不過,可以預期一些屬性。:dynamic 排程器所使用的 Task 數量會受到可用工作執行緒數量的常數倍數限制 (Threads.threadpoolsize())。每個工作處理連續的迭代空間區域。因此,如果 length(xs) 遠大於工作執行緒數量,且 f(x) 的執行時間相對小於產生和同步工作所需的成本(通常小於 10 微秒),則 @threads :dynamic for x in xs; f(x); end 通常比 @sync for x in xs; @spawn f(x); end 更有效率。

Julia 1.8

:dynamic 選項可用於 schedule 參數,且為 Julia 1.8 的預設值。

:static

:static 排程器會為每個執行緒建立一個工作,並在它們之間平均分配迭代,將每個工作特別指定給每個執行緒。特別是,threadid() 的值保證在一個迭代中為常數。如果從另一個 @threads 迴圈內部或從 1 以外的執行緒使用,指定 :static 會產生錯誤。

注意

:static 排程存在於支援在 Julia 1.3 之前編寫的程式碼轉換。在新的函式庫函數中,不建議使用 :static 排程,因為使用此選項的函數無法從任意工作執行緒呼叫。

範例

為了說明不同的排程策略,請考慮下列包含非讓步計時迴圈的函數 busywait,該迴圈會執行給定的秒數。

julia> function busywait(seconds)
            tstart = time_ns()
            while (time_ns() - tstart) / 1e9 < seconds
            end
        end

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)

:dynamic 範例花費 2 秒,因為其中一個未佔用的執行緒能夠執行兩個 1 秒的迭代來完成 for 迴圈。

來源
Base.Threads.foreach函數
Threads.foreach(f, channel::Channel;
                schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                ntasks=Threads.threadpoolsize())

類似於 foreach(f, channel),但會在 Threads.@spawn 產生的 ntasks 個工作中分割 channel 的迭代和對 f 的呼叫。此函數會等到所有內部產生的工作完成後才會傳回。

如果 schedule isa FairScheduleThreads.foreach 會嘗試以一種方式產生工作,讓 Julia 的排程器能夠更自由地在執行緒之間負載平衡工作項目。此方法通常具有較高的每個項目開銷,但在與其他多執行緒工作負載並行時,可能會比 StaticSchedule 表現得更好。

如果 schedule isa StaticScheduleThreads.foreach 會以一種方式產生工作,讓每個項目的開銷比 FairSchedule 低,但較不適合負載平衡。因此,此方法可能更適合細緻、一致的工作負載,但在與其他多執行緒工作負載並行時,可能會比 FairSchedule 表現得更差。

範例

julia> n = 20

julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)

julia> d = Channel{Int}(n) do ch
           f = i -> put!(ch, i^2)
           Threads.foreach(f, c)
       end

julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
Julia 1.6

此函數需要 Julia 1.6 或更新版本。

來源
Base.Threads.@spawn巨集
Threads.@spawn [:default|:interactive] expr

建立一個 工作排程 它在指定的執行緒池中任何可用的執行緒上執行(未指定時為 :default)。一旦有一個執行緒可用,工作就會分配給該執行緒。若要等到工作完成,請對此巨集的結果呼叫 wait,或呼叫 fetch 來等待並取得其傳回值。

值可透過 $ 內插到 @spawn,這會將值直接複製到建構中的底層閉包中。這允許您插入變數的,將非同步程式碼與目前工作中變數值的變更隔離。

注意

如果工作讓步,工作執行的執行緒可能會變更,因此不應將 threadid() 視為工作中的常數。請參閱 工作移轉,以及更廣泛的 多執行緒 手冊,以進一步了解重要的注意事項。另請參閱 執行緒池 章節。

Julia 1.3

此巨集自 Julia 1.3 起可用。

Julia 1.4

自 Julia 1.4 起,可透過 $ 內插值。

Julia 1.9

自 Julia 1.9 起,可指定執行緒池。

範例

julia> t() = println("Hello from ", Threads.threadid());

julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
來源
Base.Threads.threadid函數
Threads.threadid() -> Int

取得目前執行緒的 ID 號碼。主執行緒的 ID 為 1

範例

julia> Threads.threadid()
1

julia> Threads.@threads for i in 1:4
          println(Threads.threadid())
       end
4
2
5
4
注意

如果工作讓步,工作執行的執行緒可能會變更,這稱為 工作移轉。因此,在大部分情況下,使用 threadid() 來索引緩衝區或有狀態物件的向量是不安全的。

來源
Base.Threads.maxthreadid函數
Threads.maxthreadid() -> Int

取得 Julia 程序可用的執行緒數目 (跨所有執行緒池) 的下限,具有原子取得語意。結果將永遠大於或等於 threadid(),以及在呼叫 maxthreadid 之前您能觀察到的任何工作的 threadid(task)

來源
Base.Threads.nthreads函數
Threads.nthreads(:default | :interactive) -> Int

取得指定執行緒池內目前執行緒的數量。:interactive 中的執行緒有編號 1:nthreads(:interactive),而 :default 中的執行緒有編號 nthreads(:interactive) .+ (1:nthreads(:default))

另請參閱 LinearAlgebra 標準函式庫中的 BLAS.get_num_threadsBLAS.set_num_threads,以及 Distributed 標準函式庫中的 nprocs()Threads.maxthreadid()

來源
Base.Threads.threadpool函數
Threads.threadpool(tid = threadid()) -> Symbol

傳回指定的執行緒的執行緒池;可能是 :default:interactive:foreign

來源
Base.Threads.threadpoolsize函數
Threads.threadpoolsize(pool::Symbol = :default) -> Int

取得預設執行緒池(或指定的執行緒池)可用的執行緒數量。

另請參閱:LinearAlgebra 標準函式庫中的 BLAS.get_num_threadsBLAS.set_num_threads,以及 Distributed 標準函式庫中的 nprocs()

來源
Base.Threads.ngcthreads函數
Threads.ngcthreads() -> Int

傳回目前設定的 GC 執行緒數量。這包含標記執行緒和同時掃描執行緒。

來源

另請參閱 多執行緒

原子操作

Base.@atomic巨集
@atomic var
@atomic order ex

varex 標記為以原子方式執行,如果 ex 是受支援的表達式。如果未指定 order,則預設為 :sequentially_consistent。

@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend

以原子方式執行在右側表示的儲存操作,並傳回新的值。

使用 =,此操作會轉換為 setproperty!(a.b, :x, new) 呼叫。使用任何運算子,此操作也會轉換為 modifyproperty!(a.b, :x, +, addend)[2] 呼叫。

@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2

以原子方式執行在右側表示的二元操作。將結果儲存到第一個引數中的欄位,並傳回值 (old, new)

此操作會轉換為 modifyproperty!(a.b, :x, func, arg2) 呼叫。

請參閱手冊中的 每個欄位的原子 章節,以取得更多詳細資料。

範例

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomic a.x # fetch field x of a, with sequential consistency
1

julia> @atomic :sequentially_consistent a.x = 2 # set field x of a, with sequential consistency
2

julia> @atomic a.x += 1 # increment field x of a, with sequential consistency
3

julia> @atomic a.x + 1 # increment field x of a, with sequential consistency
3 => 4

julia> @atomic a.x # fetch field x of a, with sequential consistency
4

julia> @atomic max(a.x, 10) # change field x of a to the max value, with sequential consistency
4 => 10

julia> @atomic a.x max 5 # again change field x of a to the max value, with sequential consistency
10 => 10
Julia 1.7

此功能至少需要 Julia 1.7。

來源
Base.@atomicswap巨集
@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new

new 儲存到 a.b.x,並傳回 a.b.x 的舊值。

此操作會轉換為 swapproperty!(a.b, :x, new) 呼叫。

請參閱手冊中的 每個欄位的原子 章節,以取得更多詳細資料。

範例

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicswap a.x = 2+2 # replace field x of a with 4, with sequential consistency
1

julia> @atomic a.x # fetch field x of a, with sequential consistency
4
Julia 1.7

此功能至少需要 Julia 1.7。

來源
Base.@atomicreplace巨集
@atomicreplace a.b.x expected => desired
@atomicreplace :sequentially_consistent a.b.x expected => desired
@atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired

以原子方式執行由這對表示的條件式替換,傳回值 (old, success::Bool)。其中 success 指出是否已完成替換。

此操作會轉換為 replaceproperty!(a.b, :x, expected, desired) 呼叫。

請參閱手冊中的 每個欄位的原子 章節,以取得更多詳細資料。

範例

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 1, success = true)

julia> @atomic a.x # fetch field x of a, with sequential consistency
2

julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 2, success = false)

julia> xchg = 2 => 0; # replace field x of a with 0 if it was 2, with sequential consistency

julia> @atomicreplace a.x xchg
(old = 2, success = true)

julia> @atomic a.x # fetch field x of a, with sequential consistency
0
Julia 1.7

此功能至少需要 Julia 1.7。

來源
注意

下列 API 相當原始,可能會透過類似 unsafe_* 的包裝器公開。

Core.Intrinsics.atomic_pointerref(pointer::Ptr{T}, order::Symbol) --> T
Core.Intrinsics.atomic_pointerset(pointer::Ptr{T}, new::T, order::Symbol) --> pointer
Core.Intrinsics.atomic_pointerswap(pointer::Ptr{T}, new::T, order::Symbol) --> old
Core.Intrinsics.atomic_pointermodify(pointer::Ptr{T}, function::(old::T,arg::S)->T, arg::S, order::Symbol) --> old
Core.Intrinsics.atomic_pointerreplace(pointer::Ptr{T}, expected::Any, new::T, success_order::Symbol, failure_order::Symbol) --> (old, cmp)
警告

下列 API 已棄用,儘管它們的支援可能會持續數個版本。

Base.Threads.Atomic類型
Threads.Atomic{T}

持有對 T 類型的物件的參考,確保只會以原子方式存取它,亦即以執行緒安全的方式。

只有某些「簡單」類型可以使用原子方式,即原始布林、整數和浮點類型。這些類型為 BoolInt8...Int128UInt8...UInt128Float16...Float64

可以從非原子值建立新的原子物件;如果未指定,原子物件會初始化為零。

可以使用 [] 符號存取原子物件

範例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> x[] = 1
1

julia> x[]
1

原子操作使用 atomic_ 前綴,例如 atomic_add!atomic_xchg! 等。

來源
Base.Threads.atomic_cas!函數
Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T

以原子方式比較並設定 x

以原子方式將 x 中的值與 cmp 進行比較。如果相等,將 newval 寫入 x。否則,讓 x 保持不變。傳回 x 中的舊值。透過將傳回值與 cmp 進行比較(透過 ===),可以知道 x 是否已修改,且現在持有新值 newval

如需更多詳細資訊,請參閱 LLVM 的 cmpxchg 指令。

此函數可用於實作交易語意。在交易之前,記錄 x 中的值。在交易之後,只有當 x 在這段時間內未被修改時,才會儲存新值。

範例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 4, 2);

julia> x
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 3, 2);

julia> x
Base.Threads.Atomic{Int64}(2)
來源
Base.Threads.atomic_xchg!函數
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T

原子交換 x 中的值

以原子方式將 x 中的值與 newval 交換。傳回值。

如需進一步詳細資料,請參閱 LLVM 的 atomicrmw xchg 指令。

範例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_xchg!(x, 2)
3

julia> x[]
2
來源
Base.Threads.atomic_add!函數
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

以原子方式將 val 加到 x

以原子方式執行 x[] += val。傳回值。未定義 Atomic{Bool}

如需進一步詳細資料,請參閱 LLVM 的 atomicrmw add 指令。

範例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

julia> x[]
5
來源
Base.Threads.atomic_sub!函數
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

以原子方式從 x 減去 val

以原子方式執行 x[] -= val。傳回值。未定義 Atomic{Bool}

如需進一步詳細資料,請參閱 LLVM 的 atomicrmw sub 指令。

範例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_sub!(x, 2)
3

julia> x[]
1
來源
Base.Threads.atomic_and!函數
Threads.atomic_and!(x::Atomic{T}, val::T) where T

以原子方式將 xval 進行位元與運算

以原子方式執行 x[] &= val。傳回值。

如需進一步詳細資料,請參閱 LLVM 的 atomicrmw and 指令。

範例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_and!(x, 2)
3

julia> x[]
2
來源
Base.Threads.atomic_nand!函數
Threads.atomic_nand!(x::Atomic{T}, val::T) where T

以原子方式將 xval 進行位元非與運算 (not-and)

以原子方式執行 x[] = ~(x[] & val)。傳回值。

有關詳細資訊,請參閱 LLVM 的 atomicrmw nand 指令。

範例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_nand!(x, 2)
3

julia> x[]
-3
來源
Base.Threads.atomic_or!函數
Threads.atomic_or!(x::Atomic{T}, val::T) where T

以原子方式對 x 進行位元或運算 val

以原子方式執行 x[] |= val。傳回值。

有關詳細資訊,請參閱 LLVM 的 atomicrmw or 指令。

範例

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_or!(x, 7)
5

julia> x[]
7
來源
Base.Threads.atomic_xor!函數
Threads.atomic_xor!(x::Atomic{T}, val::T) where T

以原子方式對 x 進行位元異或 (互斥或) 運算 val

以原子方式執行 x[] $= val。傳回值。

有關詳細資訊,請參閱 LLVM 的 atomicrmw xor 指令。

範例

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_xor!(x, 7)
5

julia> x[]
2
來源
Base.Threads.atomic_max!函數
Threads.atomic_max!(x::Atomic{T}, val::T) where T

以原子方式將 xval 的最大值儲存在 x

以原子方式執行 x[] = max(x[], val)。傳回值。

有關詳細資訊,請參閱 LLVM 的 atomicrmw max 指令。

範例

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_max!(x, 7)
5

julia> x[]
7
來源
Base.Threads.atomic_min!函數
Threads.atomic_min!(x::Atomic{T}, val::T) where T

以原子方式將 xval 的最小值儲存在 x

以原子方式執行 x[] = min(x[], val)。傳回值。

有關詳細資訊,請參閱 LLVM 的 atomicrmw min 指令。

範例

julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)

julia> Threads.atomic_min!(x, 5)
7

julia> x[]
5
來源
Base.Threads.atomic_fence函數
Threads.atomic_fence()

插入順序一致性記憶體圍欄

插入具有順序一致性排序語意的記憶體圍欄。有些演算法需要這個,例如取得/釋放排序不足夠時。

這很可能是一個非常昂貴的操作。由於 Julia 中的其他所有原子操作都已經具備了獲取/釋放語義,因此在大多數情況下不應需要明確的圍欄。

有關更多詳細資訊,請參閱 LLVM 的fence指令。

來源

ccall 使用 libuv 執行緒池 (實驗性)

Base.@threadcall巨集
@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)

@threadcall 巨集的呼叫方式與 ccall 相同,但會在不同的執行緒中執行工作。當您想要呼叫封鎖式 C 函式,而不會導致目前的 julia 執行緒遭到封鎖時,這會很有用。並行性受到 libuv 執行緒池大小的限制,預設為 4 個執行緒,但可以透過設定 UV_THREADPOOL_SIZE 環境變數並重新啟動 julia 程序來增加。請注意,呼叫的函式不應回呼到 Julia。

請注意,呼叫的函式不應回呼到 Julia。

來源

低階同步原語

這些建構模組用於建立常規同步物件。

Base.Threads.SpinLock類型
SpinLock()

建立一個不可重入、測試和測試並設定自旋鎖。遞迴使用將導致死結。這種鎖只應在執行時間短且不會阻塞的程式碼(例如執行 I/O)周圍使用。一般而言,應改用 ReentrantLock

每個 lock 都必須與一個 unlock 相匹配。如果 !islocked(lck::SpinLock) 成立,則 trylock(lck) 會成功,除非有其他任務「同時」嘗試持有鎖。

測試和測試並設定自旋鎖在約 30 個競爭執行緒時最快。如果競爭比這更多,則應考慮不同的同步方法。

來源