任務

Core.Task類型
Task(func)

建立一個 Task(即協程)來執行給定的函數 func(必須可以不帶參數呼叫)。當此函數傳回時,任務結束。當 schedule 時,任務將在建構時從父代的「世界年齡」中執行。

警告

預設情況下,任務的 sticky 位元會設定為 true t.sticky。這模擬了 @async 的歷史預設值。Sticky 任務只能在它們首次排程的工作執行緒上執行。若要取得 Threads.@spawn 的行為,請手動將 sticky 位元設定為 false

範例

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

在此範例中,b 是可執行 Task,但尚未開始。

原始碼
Base.@task巨集
@task

將一個表達式封裝在 Task 中而不執行它,並傳回 Task。這只會建立一個任務,而不會執行它。

範例

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
原始碼
Base.@async巨集
@async

將一個表達式封裝在 Task 中,並將它加入到本地機器排程器的佇列中。

值可以透過 $ 插值到 @async 中,它會將值直接複製到建構的底層封閉中。這允許您插入變數的,將非同步程式碼與變數值在當前任務中的變更隔離。

警告

強烈建議優先使用 Threads.@spawn 而非 @async即使不需要並行處理,尤其是在公開發布的函式庫中。這是因為在 Julia 的當前實作中,使用 @async 會停用在工作執行緒間遷移父代任務。因此,在函式庫函數中看似無害地使用 @async 可能會對使用者應用程式中非常不同的部分的效能產生重大影響。

Julia 1.4

自 Julia 1.4 起,可透過 $ 內插值。

原始碼
Base.asyncmap函數
asyncmap(f, c...; ntasks=0, batch_size=nothing)

使用多個並行工作來對應集合(或多個等長集合)上的 f。對於多個集合參數,f 會逐一元件套用。

ntasks 指定要並行執行的工作數。根據集合長度,如果未指定 ntasks,將使用最多 100 個工作進行並行對應。

ntasks 也可以指定為零元件函數。在此情況下,會在處理每個元件前檢查要並行執行的工作數,如果 ntasks_func 的值大於目前工作數,就會啟動一個新工作。

如果指定 batch_size,集合會以批次模式處理。f 必須是一個函數,必須接受元件組成的 Vector,並傳回結果向量。輸入向量長度會小於或等於 batch_size

以下範例會透過傳回執行對應函數的工作的 objectid,來強調在不同工作中的執行狀況。

首先,如果未定義 ntasks,每個元件會在不同的工作中處理。

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

如果 ntasks=2,所有元件會在 2 個工作中處理。

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

如果定義 batch_size,則對應函數需要變更為接受元件組成的陣列,並傳回結果陣列。已修改的對應函數中使用 map 來達成此目的。

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
原始碼
Base.asyncmap!函數
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

asyncmap 相同,但會將輸出儲存在 results 中,而不是傳回集合。

警告

當任何已變異參數與任何其他參數共用記憶體時,行為可能會出乎意料。

原始碼
Base.istaskdone函數
istaskdone(t::Task) -> Bool

判斷任務是否已結束。

範例

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
原始碼
Base.istaskstarted函數
istaskstarted(t::Task) -> Bool

判斷任務是否已開始執行。

範例

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
原始碼
Base.istaskfailed函數
istaskfailed(t::Task) -> Bool

判斷任務是否因例外狀況而結束。

範例

julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

此函數至少需要 Julia 1.3。

原始碼
Base.task_local_storage方法
task_local_storage(body, key, value)

使用已修改的任務局部儲存空間呼叫函數 body,其中 value 指定給 key;之後會還原 key 的前一個值(如果有的話)。這對於模擬動態作用域很有用。

原始碼

排程

Base.yield函數
yield()

切換至排程器以允許另一個排程工作執行。呼叫此函式的任務仍然可執行,且如果沒有其他可執行任務,將立即重新啟動。

原始碼
yield(t::Task, arg = nothing)

schedule(t, arg); yield() 的快速、不公平排程版本,它會在呼叫排程器之前立即讓出給 t

原始碼
Base.yieldto函式
yieldto(t::Task, arg = nothing)

切換至指定的任務。第一次切換至任務時,任務的函式會在沒有參數的情況下被呼叫。在後續切換中,arg 會從任務最後一次呼叫 yieldto 的結果中傳回。這是一個低階呼叫,只會切換任務,不會考慮狀態或排程。不建議使用。

原始碼
Base.sleep函式
sleep(seconds)

封鎖目前的任務指定秒數。最短睡眠時間為 1 毫秒或輸入 0.001

原始碼
Base.schedule函式
schedule(t::Task, [val]; error=false)

Task 加入排程器的佇列。這會導致任務在系統閒置時持續執行,除非任務執行封鎖操作,例如 wait

如果提供第二個參數 val,它會在任務再次執行時傳遞給任務(透過 yieldto 的傳回值)。如果 errortrue,值會在喚醒的任務中作為例外狀況引發。

警告

在已經啟動的任意 Task 上使用 schedule 是不正確的。有關更多資訊,請參閱 API 參考

範例

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
原始碼

同步

Base.errormonitor函式
errormonitor(t::Task)

如果任務 t 失敗,請將錯誤記錄列印至 stderr

範例

julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
原始碼
Base.@sync巨集
@sync

等待所有語法上封閉的使用 @async@spawn@spawnat@distributed 完成。封閉的非同步作業所引發的所有例外狀況都會收集並作為 CompositeException 引發。

範例

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
原始碼
Base.wait函數

針對 Threads.Condition 的特別注意事項

呼叫者必須持有擁有 Threads.Conditionlock,才能呼叫此方法。呼叫工作會被封鎖,直到其他工作喚醒它,通常是透過在同一個 Threads.Condition 物件上呼叫 notify。封鎖時會原子性地釋放鎖定(即使是遞迴鎖定),並會在傳回前重新取得。

原始碼
wait(r::Future)

等待值對指定的 Future 可用。

原始碼
wait(r::RemoteChannel, args...)

等待值對指定的 RemoteChannel 可用。

原始碼
wait([x])

根據引數的類型,封鎖目前的任務,直到發生某些事件

  • Channel:等待值附加到通道。
  • Condition:等待條件上的 notify,並傳回傳遞給 notifyval 參數。等待條件時,還可以傳遞 first=true,這會讓等待者在 notify 上喚醒時排在第一順位,而不是一般的先進先出行為。
  • Process:等待處理程序或處理程序鏈結束。處理程序的 exitcode 欄位可用於判斷成功或失敗。
  • Task:等待 Task 完成。如果任務因例外狀況而失敗,會引發 TaskFailedException(封裝失敗的任務)。
  • RawFD:等待檔案描述符的變更(請參閱 FileWatching 套件)。

如果沒有傳遞引數,任務會封鎖一段未定義的時間。任務只能透過明確呼叫 scheduleyieldto 來重新啟動。

通常 wait 會在 while 迴圈中呼叫,以確保在繼續進行之前已滿足等待的條件。

原始碼
wait(c::Channel)

封鎖直到 Channel isready 為止。

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # task is blocked because channel is not ready
false

julia> put!(c, 1);

julia> istaskdone(task)  # task is now unblocked
true
原始碼
Base.timedwait函式
timedwait(testcb, timeout::Real; pollint::Real=0.1)

等待直到 testcb() 傳回 true 或經過 timeout 秒,以較早者為準。每隔 pollint 秒輪詢一次測試函式。pollint 的最小值為 0.001 秒,也就是 1 毫秒。

傳回 :ok:timed_out

原始碼
Base.Condition類型
Condition()

建立任務可以等待的邊緣觸發事件來源。在 Condition 上呼叫 wait 的任務會暫停並排隊。當稍後在 Condition 上呼叫 notify 時,任務會被喚醒。邊緣觸發表示只有在呼叫 notify 時正在等待的任務才會被喚醒。對於層級觸發的通知,您必須保留額外的狀態,以追蹤是否已發生通知。ChannelThreads.Event 類型會執行此動作,可用於層級觸發事件。

此物件並非執行緒安全的。請參閱 Threads.Condition 以取得執行緒安全版本。

原始碼
Base.Threads.Condition類型
Threads.Condition([lock])

Base.Condition 的執行緒安全版本。

若要對 Threads.Condition 呼叫 waitnotify,您必須先對其呼叫 lock。當呼叫 wait 時,鎖會在封鎖期間原子釋放,並會在 wait 傳回之前重新取得。因此,Threads.Condition c 的慣用語法如下所示

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

此功能至少需要 Julia 1.2。

原始碼
Base.Event類型
Event([autoreset=false])

建立層級觸發事件來源。呼叫 waitEvent 上的任務會暫停並排隊,直到在 Event 上呼叫 notify。在呼叫 notify 之後,Event 會保持在訊號狀態,任務在等待時將不再封鎖,直到呼叫 reset 為止。

如果 autoreset 為 true,則每個對 notify 的呼叫最多會從 wait 釋放一個任務。

這會在 notify/wait 上提供取得和釋放記憶體排序。

Julia 1.1

此功能至少需要 Julia 1.1。

Julia 1.8

autoreset 功能和記憶體排序保證至少需要 Julia 1.8。

原始碼
Base.notify函式
notify(condition, val=nothing; all=true, error=false)

喚醒正在等待條件的任務,傳遞 val 給它們。如果 alltrue (預設值),則所有正在等待的任務都會被喚醒,否則只會喚醒一個。如果 errortrue,則傳遞的值會在喚醒的任務中引發為例外狀況。

傳回喚醒的工作數。如果沒有工作在等待 `condition`,則傳回 0。

原始碼
Base.reset方法
reset(::Event)

Event 重設回未設定狀態。然後,任何未來對 `wait` 的呼叫都會封鎖,直到再次呼叫 notify

原始碼
Base.Semaphore類型
Semaphore(sem_size)

建立一個計數信號量,允許在任何時間使用最多 `sem_size` 個取得。每個取得都必須與一個釋放配對。

這在取得/釋放呼叫上提供取得和釋放記憶體排序。

原始碼
Base.acquire函式
acquire(s::Semaphore)

等待 `sem_size` 個許可之一可用,封鎖直到可以取得一個許可。

原始碼
acquire(f, s::Semaphore)

從信號量 `s` 取得後執行 `f`,並在完成或發生錯誤時 `release`。

例如,一個 do-block 形式,確保在同一時間只會執行 2 個 `foo` 呼叫

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

此方法需要至少 Julia 1.8。

原始碼
Base.release函式
release(s::Semaphore)

將一個許可傳回池中,可能允許另一個工作取得它並繼續執行。

原始碼
Base.lock函式
lock(lock)

lock 可用時取得它。如果 lock 已被不同的工作/執行緒鎖定,請等待它可用。

每個 lock 都必須搭配一個 unlock

原始碼
lock(f::Function, lock)

取得 lock,執行持有的 lockf,並在 f 回傳時釋放 lock。如果 lock 已被不同的工作/執行緒鎖定,請等待它可用。

當此函式回傳時,lock 已被釋放,因此呼叫者不應嘗試 unlock 它。

Julia 1.7

使用 Channel 作為第二個參數需要 Julia 1.7 或更新版本。

原始碼
Base.unlock函式
unlock(lock)

釋放 lock 的擁有權。

如果這是之前已取得的遞迴鎖,請遞減內部計數器並立即回傳。

原始碼
Base.trylock函式
trylock(lock) -> Success (Boolean)

如果鎖可用,請取得鎖,並在成功時回傳 true。如果鎖已由不同的工作/執行緒鎖定,請回傳 false

每個成功的 trylock 都必須搭配一個 unlock

函式 trylock 搭配 islocked 可用於撰寫測試和測試和設定或指數後退演算法如果它受 typeof(lock) 支援(請閱讀其文件)。

原始碼
Base.islocked函式
islocked(lock) -> Status (Boolean)

檢查 lock 是否由任何工作/執行緒持有。此函式本身不應使用於同步。但是,islocked 搭配 trylock 可用於撰寫測試和測試和設定或指數後退演算法如果它受 typeof(lock) 支援(請閱讀其文件)。

延伸說明

例如,如果 lock 實作符合下列文件所述的特性,則可實作指數退避演算法如下。

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

實作

建議鎖定實作定義 islocked 並具有下列特性,並在文件字串中註明。

  • islocked(lock) 沒有資料競爭。
  • 如果 islocked(lock) 傳回 false,則在沒有其他工作干擾的情況下,立即呼叫 trylock(lock) 一定會成功(傳回 true)。
原始碼
Base.ReentrantLock類型
ReentrantLock()

建立一個可讓 Task 同步的再進入鎖定。同一個工作可以根據需要多次取得鎖定(這就是名稱中「再進入」的部分意義)。每個 lock 都必須搭配一個 unlock

呼叫「lock」也會禁止在該執行緒上執行完成處理常式,直到對應的「unlock」為止。自然應該支援以下所示的標準鎖定模式,但要注意不要顛倒 try/lock 順序或完全遺漏 try 區塊(例如,嘗試在仍然持有鎖定的情況下傳回)

這會在 lock/unlock 呼叫上提供取得/釋放記憶體排序。

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

如果 !islocked(lck::ReentrantLock) 成立,則 trylock(lck) 會成功,除非有其他工作同時嘗試持有鎖定。

原始碼

通道

Base.Channel類型
Channel{T=Any}(size::Int=0)

建構一個 Channel,其內部緩衝區最多可以容納 sizeT 類型的物件。在通道已滿的情況下呼叫 put! 會封鎖,直到物件被 take! 移出。

Channel(0) 建立一個沒有緩衝的通道。put! 會封鎖,直到呼叫相符的 take!。反之亦然。

其他建構函式

  • Channel():預設建構函式,等同於 Channel{Any}(0)
  • Channel(Inf):等同於 Channel{Any}(typemax(Int))
  • Channel(sz):等同於 Channel{Any}(sz)
Julia 1.3

預設建構函式 Channel() 和預設 size=0 已於 Julia 1.3 中新增。

原始碼
Base.Channel方法
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

func 建立一個新任務,將其繫結到類型為 T、大小為 size 的新通道,並在單一呼叫中排程任務。任務終止時,通道會自動關閉。

func 必須接受繫結的通道作為其唯一引數。

如果你需要建立的任務的參考,請透過關鍵字引數 taskref 傳遞一個 Ref{Task} 物件。

如果 spawn=true,為 func 建立的 Task 可以在平行執行緒上排程,等同於透過 Threads.@spawn 建立任務。

如果 spawn=true 且未設定 threadpool 引數,它會預設為 :default

如果設定了 threadpool 引數(為 :default:interactive),這表示 spawn=true,且新的任務會產生到指定的執行緒池。

傳回一個 Channel

範例

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

參照建立的任務

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

spawn= 參數已於 Julia 1.3 中新增。此建構函式已於 Julia 1.3 中新增。在 Julia 的早期版本中,Channel 使用關鍵字引數來設定 sizeT,但這些建構函式已不建議使用。

Julia 1.9

threadpool= 引數已於 Julia 1.9 中新增。

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
原始碼
Base.put!方法
put!(c::Channel, v)

將項目 v 附加到通道 c。如果通道已滿,則會封鎖。

對於未緩衝的通道,會封鎖直到不同任務執行 take!

Julia 1.1

呼叫 put! 時,v 會使用 convert 轉換為通道類型。

原始碼
Base.take!方法
take!(c::Channel)

依序從 Channel 中移除並傳回一個值。會封鎖直到資料可用。對於未緩衝的通道,會封鎖直到不同任務執行 put!

範例

緩衝通道

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

未緩衝通道

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
原始碼
Base.isready方法
isready(c::Channel)

判斷 Channel 中是否儲存有值。會立即傳回,不會封鎖。

對於未緩衝的通道,如果任務正在等待 put!,則傳回 true

範例

緩衝通道

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

未緩衝通道

julia> c = Channel();

julia> isready(c)  # no tasks waiting to put!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # schedule a put! task

julia> isready(c)
true
原始碼
Base.fetch方法
fetch(c::Channel)

等待並傳回 (不移除) Channel 中第一個可用的項目。注意:fetch 不支援未緩衝 (大小為 0) 的 Channel

範例

緩衝通道

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # item is not removed
3-element Vector{Any}:
 1
 2
 3
原始碼
Base.close方法
close(c::Channel[, excp::Exception])

關閉通道。例外狀況 (可選擇由 excp 提供),會由下列項目擲出:

  • put! 在封閉的通道上。
  • take!fetch 在一個空的、封閉的通道上。
原始碼
Base.bind方法
bind(chnl::Channel, task::Task)

chnl 的生命週期與一個任務關聯起來。當任務終止時,Channel chnl 會自動關閉。任務中任何未捕獲的例外都會傳播到 chnl 上的所有等待者。

chnl 物件可以獨立於任務終止而明確關閉。終止任務對已經關閉的 Channel 物件沒有影響。

當一個通道與多個任務綁定時,第一個終止的任務將關閉通道。當多個通道綁定到同一個任務時,任務的終止將關閉所有綁定的通道。

範例

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
原始碼

使用 schedulewait 的低階同步

最容易正確使用 schedule 是在尚未啟動(排程)的 Task 上。不過,可以將 schedulewait 當作建構同步介面的極低階建構區塊來使用。呼叫 schedule(task) 的關鍵前置條件是呼叫者必須「擁有」task;也就是說,呼叫者必須知道給定 task 中對 wait 的呼叫會在呼叫 schedule(task) 的程式碼所知道的位置發生。確保此類前置條件的一種策略是使用原子,如下列範例所示

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent 讓一個工作可以 wait 另一個工作的 notify。由於 wait 只可以從單一工作使用一次,因此它是一種受限的通訊介面(請注意 ev.task 的非原子指派)

在此範例中,notify(ev::OneWayEvent) 只有在 將狀態從 OWE_WAITING 修改為 OWE_NOTIFYING 時才能呼叫 schedule(ev.task)。這讓我們知道執行 wait(ev::OneWayEvent) 的工作現在位於 ok 分支中,而且不會有其他工作嘗試 schedule(ev.task),因為它們的 @atomicreplace(ev.state, state => OWE_NOTIFYING) 會失敗。