任務
Core.Task
— 類型Task(func)
建立一個 Task
(即協程)來執行給定的函數 func
(必須可以不帶參數呼叫)。當此函數傳回時,任務結束。當 schedule
時,任務將在建構時從父代的「世界年齡」中執行。
預設情況下,任務的 sticky 位元會設定為 true t.sticky
。這模擬了 @async
的歷史預設值。Sticky 任務只能在它們首次排程的工作執行緒上執行。若要取得 Threads.@spawn
的行為,請手動將 sticky 位元設定為 false
。
範例
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
在此範例中,b
是可執行 Task
,但尚未開始。
Base.@task
— 巨集@task
將一個表達式封裝在 Task
中而不執行它,並傳回 Task
。這只會建立一個任務,而不會執行它。
範例
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— 巨集@async
將一個表達式封裝在 Task
中,並將它加入到本地機器排程器的佇列中。
值可以透過 $
插值到 @async
中,它會將值直接複製到建構的底層封閉中。這允許您插入變數的值,將非同步程式碼與變數值在當前任務中的變更隔離。
強烈建議優先使用 Threads.@spawn
而非 @async
,即使不需要並行處理,尤其是在公開發布的函式庫中。這是因為在 Julia 的當前實作中,使用 @async
會停用在工作執行緒間遷移父代任務。因此,在函式庫函數中看似無害地使用 @async
可能會對使用者應用程式中非常不同的部分的效能產生重大影響。
自 Julia 1.4 起,可透過 $
內插值。
Base.asyncmap
— 函數asyncmap(f, c...; ntasks=0, batch_size=nothing)
使用多個並行工作來對應集合(或多個等長集合)上的 f
。對於多個集合參數,f
會逐一元件套用。
ntasks
指定要並行執行的工作數。根據集合長度,如果未指定 ntasks
,將使用最多 100 個工作進行並行對應。
ntasks
也可以指定為零元件函數。在此情況下,會在處理每個元件前檢查要並行執行的工作數,如果 ntasks_func
的值大於目前工作數,就會啟動一個新工作。
如果指定 batch_size
,集合會以批次模式處理。f
必須是一個函數,必須接受元件組成的 Vector
,並傳回結果向量。輸入向量長度會小於或等於 batch_size
。
以下範例會透過傳回執行對應函數的工作的 objectid
,來強調在不同工作中的執行狀況。
首先,如果未定義 ntasks
,每個元件會在不同的工作中處理。
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
如果 ntasks=2
,所有元件會在 2 個工作中處理。
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
如果定義 batch_size
,則對應函數需要變更為接受元件組成的陣列,並傳回結果陣列。已修改的對應函數中使用 map
來達成此目的。
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— 函數asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
與 asyncmap
相同,但會將輸出儲存在 results
中,而不是傳回集合。
當任何已變異參數與任何其他參數共用記憶體時,行為可能會出乎意料。
Base.current_task
— 函數current_task()
取得目前正在執行的 Task
。
Base.istaskdone
— 函數istaskdone(t::Task) -> Bool
判斷任務是否已結束。
範例
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— 函數istaskstarted(t::Task) -> Bool
判斷任務是否已開始執行。
範例
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— 函數istaskfailed(t::Task) -> Bool
判斷任務是否因例外狀況而結束。
範例
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
此函數至少需要 Julia 1.3。
Base.task_local_storage
— 方法task_local_storage(key)
查詢目前任務的任務局部儲存空間中某個鍵的值。
Base.task_local_storage
— 方法task_local_storage(key, value)
將值指定給目前任務的任務局部儲存空間中某個鍵。
Base.task_local_storage
— 方法task_local_storage(body, key, value)
使用已修改的任務局部儲存空間呼叫函數 body
,其中 value
指定給 key
;之後會還原 key
的前一個值(如果有的話)。這對於模擬動態作用域很有用。
排程
Base.yield
— 函數yield()
切換至排程器以允許另一個排程工作執行。呼叫此函式的任務仍然可執行,且如果沒有其他可執行任務,將立即重新啟動。
yield(t::Task, arg = nothing)
schedule(t, arg); yield()
的快速、不公平排程版本,它會在呼叫排程器之前立即讓出給 t
。
Base.yieldto
— 函式yieldto(t::Task, arg = nothing)
切換至指定的任務。第一次切換至任務時,任務的函式會在沒有參數的情況下被呼叫。在後續切換中,arg
會從任務最後一次呼叫 yieldto
的結果中傳回。這是一個低階呼叫,只會切換任務,不會考慮狀態或排程。不建議使用。
Base.sleep
— 函式sleep(seconds)
封鎖目前的任務指定秒數。最短睡眠時間為 1 毫秒或輸入 0.001
。
Base.schedule
— 函式schedule(t::Task, [val]; error=false)
將 Task
加入排程器的佇列。這會導致任務在系統閒置時持續執行,除非任務執行封鎖操作,例如 wait
。
如果提供第二個參數 val
,它會在任務再次執行時傳遞給任務(透過 yieldto
的傳回值)。如果 error
為 true
,值會在喚醒的任務中作為例外狀況引發。
在已經啟動的任意 Task
上使用 schedule
是不正確的。有關更多資訊,請參閱 API 參考。
範例
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
同步
Base.errormonitor
— 函式errormonitor(t::Task)
如果任務 t
失敗,請將錯誤記錄列印至 stderr
。
範例
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
Base.@sync
— 巨集@sync
等待所有語法上封閉的使用 @async
、@spawn
、@spawnat
和 @distributed
完成。封閉的非同步作業所引發的所有例外狀況都會收集並作為 CompositeException
引發。
範例
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— 函數針對 Threads.Condition
的特別注意事項
呼叫者必須持有擁有 Threads.Condition
的 lock
,才能呼叫此方法。呼叫工作會被封鎖,直到其他工作喚醒它,通常是透過在同一個 Threads.Condition
物件上呼叫 notify
。封鎖時會原子性地釋放鎖定(即使是遞迴鎖定),並會在傳回前重新取得。
wait(r::Future)
等待值對指定的 Future
可用。
wait(r::RemoteChannel, args...)
等待值對指定的 RemoteChannel
可用。
wait([x])
根據引數的類型,封鎖目前的任務,直到發生某些事件
Channel
:等待值附加到通道。Condition
:等待條件上的notify
,並傳回傳遞給notify
的val
參數。等待條件時,還可以傳遞first=true
,這會讓等待者在notify
上喚醒時排在第一順位,而不是一般的先進先出行為。Process
:等待處理程序或處理程序鏈結束。處理程序的exitcode
欄位可用於判斷成功或失敗。Task
:等待Task
完成。如果任務因例外狀況而失敗,會引發TaskFailedException
(封裝失敗的任務)。RawFD
:等待檔案描述符的變更(請參閱FileWatching
套件)。
如果沒有傳遞引數,任務會封鎖一段未定義的時間。任務只能透過明確呼叫 schedule
或 yieldto
來重新啟動。
通常 wait
會在 while
迴圈中呼叫,以確保在繼續進行之前已滿足等待的條件。
wait(c::Channel)
封鎖直到 Channel
isready
為止。
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # task is blocked because channel is not ready
false
julia> put!(c, 1);
julia> istaskdone(task) # task is now unblocked
true
Base.fetch
— 方法fetch(t::Task)
等待 Task
完成,然後傳回其結果值。如果任務因例外狀況而失敗,則會擲回 TaskFailedException
(封裝失敗的任務)。
Base.fetch
— 方法fetch(x::Any)
傳回 x
。
Base.timedwait
— 函式timedwait(testcb, timeout::Real; pollint::Real=0.1)
等待直到 testcb()
傳回 true
或經過 timeout
秒,以較早者為準。每隔 pollint
秒輪詢一次測試函式。pollint
的最小值為 0.001 秒,也就是 1 毫秒。
傳回 :ok
或 :timed_out
。
Base.Condition
— 類型Condition()
建立任務可以等待的邊緣觸發事件來源。在 Condition
上呼叫 wait
的任務會暫停並排隊。當稍後在 Condition
上呼叫 notify
時,任務會被喚醒。邊緣觸發表示只有在呼叫 notify
時正在等待的任務才會被喚醒。對於層級觸發的通知,您必須保留額外的狀態,以追蹤是否已發生通知。Channel
和 Threads.Event
類型會執行此動作,可用於層級觸發事件。
此物件並非執行緒安全的。請參閱 Threads.Condition
以取得執行緒安全版本。
Base.Threads.Condition
— 類型Threads.Condition([lock])
Base.Condition
的執行緒安全版本。
若要對 Threads.Condition
呼叫 wait
或 notify
,您必須先對其呼叫 lock
。當呼叫 wait
時,鎖會在封鎖期間原子釋放,並會在 wait
傳回之前重新取得。因此,Threads.Condition
c
的慣用語法如下所示
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
此功能至少需要 Julia 1.2。
Base.Event
— 類型Event([autoreset=false])
建立層級觸發事件來源。呼叫 wait
在 Event
上的任務會暫停並排隊,直到在 Event
上呼叫 notify
。在呼叫 notify
之後,Event
會保持在訊號狀態,任務在等待時將不再封鎖,直到呼叫 reset
為止。
如果 autoreset
為 true,則每個對 notify
的呼叫最多會從 wait
釋放一個任務。
這會在 notify/wait 上提供取得和釋放記憶體排序。
此功能至少需要 Julia 1.1。
autoreset
功能和記憶體排序保證至少需要 Julia 1.8。
Base.notify
— 函式notify(condition, val=nothing; all=true, error=false)
喚醒正在等待條件的任務,傳遞 val
給它們。如果 all
為 true
(預設值),則所有正在等待的任務都會被喚醒,否則只會喚醒一個。如果 error
為 true
,則傳遞的值會在喚醒的任務中引發為例外狀況。
傳回喚醒的工作數。如果沒有工作在等待 `condition`,則傳回 0。
Base.reset
— 方法Base.Semaphore
— 類型Semaphore(sem_size)
建立一個計數信號量,允許在任何時間使用最多 `sem_size` 個取得。每個取得都必須與一個釋放配對。
這在取得/釋放呼叫上提供取得和釋放記憶體排序。
Base.acquire
— 函式acquire(s::Semaphore)
等待 `sem_size` 個許可之一可用,封鎖直到可以取得一個許可。
acquire(f, s::Semaphore)
從信號量 `s` 取得後執行 `f`,並在完成或發生錯誤時 `release`。
例如,一個 do-block 形式,確保在同一時間只會執行 2 個 `foo` 呼叫
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
此方法需要至少 Julia 1.8。
Base.release
— 函式release(s::Semaphore)
將一個許可傳回池中,可能允許另一個工作取得它並繼續執行。
Base.AbstractLock
— 類型Base.lock
— 函式lock(f::Function, lock)
取得 lock
,執行持有的 lock
的 f
,並在 f
回傳時釋放 lock
。如果 lock
已被不同的工作/執行緒鎖定,請等待它可用。
當此函式回傳時,lock
已被釋放,因此呼叫者不應嘗試 unlock
它。
使用 Channel
作為第二個參數需要 Julia 1.7 或更新版本。
Base.unlock
— 函式unlock(lock)
釋放 lock
的擁有權。
如果這是之前已取得的遞迴鎖,請遞減內部計數器並立即回傳。
Base.trylock
— 函式trylock(lock) -> Success (Boolean)
如果鎖可用,請取得鎖,並在成功時回傳 true
。如果鎖已由不同的工作/執行緒鎖定,請回傳 false
。
每個成功的 trylock
都必須搭配一個 unlock
。
函式 trylock
搭配 islocked
可用於撰寫測試和測試和設定或指數後退演算法如果它受 typeof(lock)
支援(請閱讀其文件)。
Base.islocked
— 函式islocked(lock) -> Status (Boolean)
檢查 lock
是否由任何工作/執行緒持有。此函式本身不應使用於同步。但是,islocked
搭配 trylock
可用於撰寫測試和測試和設定或指數後退演算法如果它受 typeof(lock)
支援(請閱讀其文件)。
延伸說明
例如,如果 lock
實作符合下列文件所述的特性,則可實作指數退避演算法如下。
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
實作
建議鎖定實作定義 islocked
並具有下列特性,並在文件字串中註明。
islocked(lock)
沒有資料競爭。- 如果
islocked(lock)
傳回false
,則在沒有其他工作干擾的情況下,立即呼叫trylock(lock)
一定會成功(傳回true
)。
Base.ReentrantLock
— 類型ReentrantLock()
建立一個可讓 Task
同步的再進入鎖定。同一個工作可以根據需要多次取得鎖定(這就是名稱中「再進入」的部分意義)。每個 lock
都必須搭配一個 unlock
。
呼叫「lock」也會禁止在該執行緒上執行完成處理常式,直到對應的「unlock」為止。自然應該支援以下所示的標準鎖定模式,但要注意不要顛倒 try/lock 順序或完全遺漏 try 區塊(例如,嘗試在仍然持有鎖定的情況下傳回)
這會在 lock/unlock 呼叫上提供取得/釋放記憶體排序。
lock(l)
try
<atomic work>
finally
unlock(l)
end
如果 !islocked(lck::ReentrantLock)
成立,則 trylock(lck)
會成功,除非有其他工作同時嘗試持有鎖定。
通道
Base.AbstractChannel
— 類型AbstractChannel{T}
傳遞 T
類型物件的通道表示。
Base.Channel
— 類型Channel{T=Any}(size::Int=0)
建構一個 Channel
,其內部緩衝區最多可以容納 size
個 T
類型的物件。在通道已滿的情況下呼叫 put!
會封鎖,直到物件被 take!
移出。
Channel(0)
建立一個沒有緩衝的通道。put!
會封鎖,直到呼叫相符的 take!
。反之亦然。
其他建構函式
Channel()
:預設建構函式,等同於Channel{Any}(0)
Channel(Inf)
:等同於Channel{Any}(typemax(Int))
Channel(sz)
:等同於Channel{Any}(sz)
預設建構函式 Channel()
和預設 size=0
已於 Julia 1.3 中新增。
Base.Channel
— 方法Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
從 func
建立一個新任務,將其繫結到類型為 T
、大小為 size
的新通道,並在單一呼叫中排程任務。任務終止時,通道會自動關閉。
func
必須接受繫結的通道作為其唯一引數。
如果你需要建立的任務的參考,請透過關鍵字引數 taskref
傳遞一個 Ref{Task}
物件。
如果 spawn=true
,為 func
建立的 Task
可以在平行執行緒上排程,等同於透過 Threads.@spawn
建立任務。
如果 spawn=true
且未設定 threadpool
引數,它會預設為 :default
。
如果設定了 threadpool
引數(為 :default
或 :interactive
),這表示 spawn=true
,且新的任務會產生到指定的執行緒池。
傳回一個 Channel
。
範例
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
參照建立的任務
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
spawn=
參數已於 Julia 1.3 中新增。此建構函式已於 Julia 1.3 中新增。在 Julia 的早期版本中,Channel 使用關鍵字引數來設定 size
和 T
,但這些建構函式已不建議使用。
threadpool=
引數已於 Julia 1.9 中新增。
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— 方法put!(c::Channel, v)
將項目 v
附加到通道 c
。如果通道已滿,則會封鎖。
對於未緩衝的通道,會封鎖直到不同任務執行 take!
。
呼叫 put!
時,v
會使用 convert
轉換為通道類型。
Base.take!
— 方法take!(c::Channel)
依序從 Channel
中移除並傳回一個值。會封鎖直到資料可用。對於未緩衝的通道,會封鎖直到不同任務執行 put!
。
範例
緩衝通道
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
未緩衝通道
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— 方法isready(c::Channel)
判斷 Channel
中是否儲存有值。會立即傳回,不會封鎖。
對於未緩衝的通道,如果任務正在等待 put!
,則傳回 true
。
範例
緩衝通道
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
未緩衝通道
julia> c = Channel();
julia> isready(c) # no tasks waiting to put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # schedule a put! task
julia> isready(c)
true
Base.fetch
— 方法fetch(c::Channel)
等待並傳回 (不移除) Channel
中第一個可用的項目。注意:fetch
不支援未緩衝 (大小為 0) 的 Channel
。
範例
緩衝通道
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # item is not removed
3-element Vector{Any}:
1
2
3
Base.close
— 方法close(c::Channel[, excp::Exception])
關閉通道。例外狀況 (可選擇由 excp
提供),會由下列項目擲出:
Base.bind
— 方法bind(chnl::Channel, task::Task)
將 chnl
的生命週期與一個任務關聯起來。當任務終止時,Channel
chnl
會自動關閉。任務中任何未捕獲的例外都會傳播到 chnl
上的所有等待者。
chnl
物件可以獨立於任務終止而明確關閉。終止任務對已經關閉的 Channel
物件沒有影響。
當一個通道與多個任務綁定時,第一個終止的任務將關閉通道。當多個通道綁定到同一個任務時,任務的終止將關閉所有綁定的通道。
範例
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
使用 schedule
和 wait
的低階同步
最容易正確使用 schedule
是在尚未啟動(排程)的 Task
上。不過,可以將 schedule
和 wait
當作建構同步介面的極低階建構區塊來使用。呼叫 schedule(task)
的關鍵前置條件是呼叫者必須「擁有」task
;也就是說,呼叫者必須知道給定 task
中對 wait
的呼叫會在呼叫 schedule(task)
的程式碼所知道的位置發生。確保此類前置條件的一種策略是使用原子,如下列範例所示
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
讓一個工作可以 wait
另一個工作的 notify
。由於 wait
只可以從單一工作使用一次,因此它是一種受限的通訊介面(請注意 ev.task
的非原子指派)
在此範例中,notify(ev::OneWayEvent)
只有在 它 將狀態從 OWE_WAITING
修改為 OWE_NOTIFYING
時才能呼叫 schedule(ev.task)
。這讓我們知道執行 wait(ev::OneWayEvent)
的工作現在位於 ok
分支中,而且不會有其他工作嘗試 schedule(ev.task)
,因為它們的 @atomicreplace(ev.state, state => OWE_NOTIFYING)
會失敗。