分散式運算
分散式平行處理工具。
Distributed.addprocs
— 函式addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
透過指定的叢集管理員啟動工作程序。
例如,Beowulf 叢集透過套件 ClusterManagers.jl
中實作的客製化叢集管理員獲得支援。
新啟動的工作程序等待主控端建立連線的秒數,可以在工作程序的環境中透過變數 JULIA_WORKER_TIMEOUT
指定。僅在使用 TCP/IP 作為傳輸時相關。
若要在不封鎖 REPL 或包含函式的情況下啟動工作程序(如果以程式方式啟動工作程序),請在自己的工作中執行 addprocs
。
範例
# On busy clusters, call `addprocs` asynchronously
t = @async addprocs(...)
# Utilize workers as and when they come online
if nprocs() > 1 # Ensure at least one new worker is available
.... # perform distributed execution
end
# Retrieve newly launched worker IDs, or any error messages
if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers
透過 SSH 在遠端機器上新增工作程序。設定使用關鍵字參數(如下所示)進行。特別是,exename
關鍵字可用於指定遠端機器上 julia
二進檔的路徑。
machines
是「機器規格」的向量,以 [user@]host[:port] [bind_addr[:port]]
形式的字串給出。user
預設為目前使用者,port
預設為標準 SSH 連接埠。如果指定 [bind_addr[:port]]
,其他工作程序會在指定的 bind_addr
和 port
上連線到此工作程序。
可以在 machines
向量中使用組或 (machine_spec, count)
形式在遠端主機上啟動多個程序,其中 count
是要在指定主機上啟動的工作程序數量。將 :auto
傳遞為工作程序數量會啟動與遠端主機上 CPU 執行緒數量一樣多的工作程序。
範例:
addprocs([
"remote1", # one worker on 'remote1' logging in with the current username
"user@remote2", # one worker on 'remote2' logging in with the 'user' username
"user@remote3:2222", # specifying SSH port to '2222' for 'remote3'
("user@remote4", 4), # launch 4 workers on 'remote4'
("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
])
關鍵字參數:
tunnel
:如果為true
,則會使用 SSH 隧道路由從主程序連線到工作程序。預設為false
。multiplex
:如果為true
,則會使用 SSH 多工處理進行 SSH 隧道路由。預設為false
。ssh
:用於啟動工作程序的 SSH 執行檔的名稱或路徑。預設為"ssh"
。sshflags
:指定其他 ssh 選項,例如sshflags=`-i /home/foo/bar.pem`
max_parallel
:指定在主機上並行連線的最大工作程序數量。預設為 10。shell
:指定 ssh 在工作程序上連線的 shell 類型。shell=:posix
:相容於 POSIX 的 Unix/Linux shell(sh、ksh、bash、dash、zsh 等)。預設值。shell=:csh
:Unix C shell (csh、tcsh)。shell=:wincmd
:Microsoft Windowscmd.exe
。
dir
:指定工作人員的工作目錄。預設為主機的目前目錄(由pwd()
找到)。enable_threaded_blas
:如果為true
,則 BLAS 將在新增處理程序的多個執行緒上執行。預設為false
。exename
:julia
可執行檔的名稱。預設為"$(Sys.BINDIR)/julia"
或"$(Sys.BINDIR)/julia-debug"
(視情況而定)。建議在所有遠端機器上使用相同的 Julia 版本,否則序列化和程式碼分發可能會失敗。exeflags
:傳遞給工作人員處理程序的其他旗標。topology
:指定工作人員如何彼此連線。在未連線的工作人員之間傳送訊息會導致錯誤。topology=:all_to_all
:所有處理程序都彼此連線。預設。topology=:master_worker
:只有驅動程式處理程序,即pid
1 會連線到工作人員。工作人員不會彼此連線。topology=:custom
:叢集管理員的launch
方法透過WorkerConfig
中的欄位ident
和connect_idents
指定連線拓撲。具有叢集管理員身分ident
的工作人員將連線到connect_idents
中指定的所有工作人員。
lazy
:僅適用於topology=:all_to_all
。如果為true
,工作人員對工作人員的連線會延遲設定,即會在工作人員之間的遠端呼叫第一次執行時設定。預設為 true。env
:提供字串配對陣列,例如env=["JULIA_DEPOT_PATH"=>"/depot"]
,以要求在遠端機器上設定環境變數。預設情況下,只有環境變數JULIA_WORKER_TIMEOUT
會自動從本機傳遞到遠端環境。cmdline_cookie
:透過--worker
命令列選項傳遞驗證 cookie。透過 ssh stdio 傳遞 cookie 的(更安全的)預設行為可能會讓使用較舊(ConPTY 之前)Julia 或 Windows 版本的 Windows 工作器當機,此時cmdline_cookie=true
提供了替代方案。
關鍵字引數 ssh
、shell
、env
和 cmdline_cookie
已新增至 Julia 1.6。
環境變數
如果主處理程序無法在 60.0 秒內與新啟動的工作器建立連線,工作器會將其視為致命情況並終止。此逾時時間可透過環境變數 JULIA_WORKER_TIMEOUT
控制。主處理程序上的 JULIA_WORKER_TIMEOUT
值指定新啟動的工作器等待建立連線的秒數。
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers
使用內建的 LocalManager
在本機主機上啟動 np
個工作器。
本機工作器會繼承主處理程序的目前套件環境(即,目前專案、LOAD_PATH
和 DEPOT_PATH
)。
關鍵字參數:
restrict::Bool
:如果為true
(預設),繫結會限制為127.0.0.1
。dir
、exename
、exeflags
、env
、topology
、lazy
、enable_threaded_blas
:與SSHManager
的效果相同,請參閱addprocs(machines::AbstractVector)
的文件。
繼承套件環境和 env
關鍵字引數已新增至 Julia 1.9。
Distributed.nprocs
— 函式nprocs()
取得可用處理程序的數量。
範例
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— 函式nworkers()
取得可用的工作程序數量。這比 nprocs()
少一個。如果 nprocs() == 1
,則等於 nprocs()
。
範例
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— 方法procs()
傳回所有程序識別碼的清單,包括 pid 1(workers()
不包含此識別碼)。
範例
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— 方法procs(pid::Integer)
傳回同一實體節點上所有程序識別碼的清單。特別是所有繫結到與 pid
相同 IP 位址的工作程序都會傳回。
Distributed.workers
— 函數workers()
傳回所有工作程序識別碼的清單。
範例
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— 函數rmprocs(pids...; waitfor=typemax(Int))
移除指定的作業。請注意,只有程序 1 可以新增或移除作業。
參數 waitfor
指定等待作業關閉的時間長度
- 如果未指定,
rmprocs
會等到所有要求的pids
都移除為止。 - 如果無法在要求的
waitfor
秒數內終止所有作業,就會引發ErrorException
。 - 如果
waitfor
值為 0,則呼叫會立即傳回,而作業會排程在不同的工作中移除。排程的Task
物件會傳回。使用者應在呼叫任何其他平行呼叫之前,對工作呼叫wait
。
範例
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— 函數interrupt(pids::Integer...)
中斷指定工作人員上目前執行的任務。這等同於在本地機器上按 Ctrl-C。如果沒有給定任何參數,則會中斷所有工作人員。
interrupt(pids::AbstractVector=workers())
中斷指定工作人員上目前執行的任務。這等同於在本地機器上按 Ctrl-C。如果沒有給定任何參數,則會中斷所有工作人員。
Distributed.myid
— 函數myid()
取得目前程序的 ID。
範例
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— 函數pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
使用可用工作人員和任務將 f
套用至每個元素來轉換集合 c
。
對於多個集合參數,逐元素套用 f
。
請注意,f
必須提供給所有工作人員程序;有關詳細資訊,請參閱 程式碼可用性和載入套件。
如果未指定工作人員池,則會使用所有可用工作人員,亦即預設工作人員池。
預設情況下,pmap
會將運算分佈至所有指定的工作人員。若要僅使用本地程序並分佈至任務,請指定 distributed=false
。這等同於使用 asyncmap
。例如,pmap(f, c; distributed=false)
等同於 asyncmap(f,c; ntasks=()->nworkers())
pmap
也可以透過 batch_size
參數使用程序和任務的組合。對於大於 1 的批次大小,集合會以多個批次處理,每個批次的長度小於或等於 batch_size
。批次會作為單一請求傳送至空閒工作人員,其中本地 asyncmap
會使用多個並行任務處理批次中的元素。
任何錯誤都會停止 pmap
處理集合的其餘部分。若要覆寫此行為,您可以透過參數 on_error
指定錯誤處理函數,該函數會接收一個單一參數,即例外情況。此函數可以透過重新擲回錯誤來停止處理,或傳回任何值以繼續,然後將該值與結果一起內嵌傳回給呼叫者。
考慮以下兩個範例。第一個會內嵌傳回例外情況物件,第二個會在任何例外情況中傳回 0
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
也可以透過重試失敗的運算來處理錯誤。關鍵字參數 retry_delays
和 retry_check
會傳遞到 retry
,分別作為關鍵字參數 delays
和 check
。如果指定批次處理,且整個批次失敗,則會重試批次中的所有項目。
請注意,如果同時指定 on_error
和 retry_delays
,則會在重試之前呼叫 on_error
掛鉤。如果 on_error
沒有擲回(或重新擲回)例外情況,則不會重試元素。
範例:發生錯誤時,在元素上重試 f
最多 3 次,且重試之間沒有任何延遲。
pmap(f, c; retry_delays = zeros(3))
範例:僅在例外情況不是 InexactError
類型時重試 f
,並以指數方式增加延遲,最多 3 次。在所有 InexactError
發生時,傳回 NaN
。
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— 類型RemoteException(captured)
遠端運算的例外情況會被擷取並在本地重新擲回。RemoteException
會封裝工作者的 pid
和擷取的例外情況。CapturedException
會擷取遠端例外情況以及例外情況引發時的呼叫堆疊的可序列化表單。
Distributed.ProcessExitedException
— 類型ProcessExitedException(worker_id::Int)
在 Julia 程序結束後,進一步嘗試引用已結束的子程序會引發此例外。
Distributed.Future
— 類型Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Future
是單一計算的佔位符,其終止狀態和時間未知。對於多個潛在計算,請參閱 RemoteChannel
。請參閱 remoteref_id
以識別 AbstractRemoteRef
。
Distributed.RemoteChannel
— 類型RemoteChannel(pid::Integer=myid())
在程序 pid
上建立對 Channel{Any}(1)
的引用。預設 pid
是目前的程序。
RemoteChannel(f::Function, pid::Integer=myid())
建立特定大小和類型的遠端通道引用。f
是在 pid
上執行時必須傳回 AbstractChannel
實作的函數。
例如,RemoteChannel(()->Channel{Int}(10), pid)
會傳回 pid
上類型為 Int
、大小為 10 的通道引用。
預設 pid
是目前的程序。
Base.fetch
— 方法fetch(x::Future)
等待並取得 Future
的值。取得的值會快取在本地。對同一個引用的後續 fetch
呼叫會傳回快取的值。如果遠端值是例外,會引發 RemoteException
,其中包含遠端例外和回溯追蹤。
Base.fetch
— 方法fetch(c::RemoteChannel)
等待並從 RemoteChannel
取得一個值。引發的例外狀況與 Future
相同。不會移除擷取的項目。
fetch(x::Any)
傳回 x
。
Distributed.remotecall
— 方法remotecall(f, id::Integer, args...; kwargs...) -> Future
在指定的程序上,非同步地對指定參數呼叫函數 f
。傳回 Future
。如有任何關鍵字參數,會傳遞給 f
。
Distributed.remotecall_wait
— 方法remotecall_wait(f, id::Integer, args...; kwargs...)
在工作者 ID id
指定的 Worker
上,以一個訊息執行較快的 wait(remotecall(...))
。如有任何關鍵字參數,會傳遞給 f
。
另請參閱 wait
和 remotecall
。
Distributed.remotecall_fetch
— 方法remotecall_fetch(f, id::Integer, args...; kwargs...)
在一個訊息中執行 fetch(remotecall(...))
。如有任何關鍵字參數,會傳遞給 f
。任何遠端例外狀況都會擷取到 RemoteException
中並擲出。
另請參閱 fetch
和 remotecall
。
範例
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
Distributed.remote_do
— 方法remote_do(f, id::Integer, args...; kwargs...) -> nothing
非同步地在工作者 id
上執行 f
。與 remotecall
不同,它不會儲存運算結果,也沒有辦法等待其完成。
成功的呼叫表示已接受請求在遠端節點上執行。
雖然對同一個工作者的連續 remotecall
會按照呼叫順序序列化,但遠端工作者上的執行順序是不確定的。例如,remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
會依序序列化對 f1
、f2
和 f3
的呼叫。但是,無法保證 f1
會在工作者 2 上先於 f3
執行。
f
拋出的任何例外都會印在遠端工作者的 stderr
上。
如果有任何關鍵字引數,會傳遞給 f
。
Base.put!
— 方法put!(rr::RemoteChannel, args...)
將一組值儲存在 RemoteChannel
中。如果通道已滿,則會區塊處理直到有可用空間。傳回第一個引數。
Base.put!
— 方法put!(rr::Future, v)
將值儲存到 Future
rr
。Future
是寫入一次的遠端參考。已設定的 Future
上的 put!
會擲回 Exception
。所有非同步遠端呼叫會傳回 Future
,並在完成時將值設定為呼叫的傳回值。
Base.take!
— 方法take!(rr::RemoteChannel, args...)
從 RemoteChannel
rr
取得值,並在過程中移除該值。
Base.isready
— 方法isready(rr::RemoteChannel, args...)
判斷 RemoteChannel
是否已儲存值。請注意,此函式可能會導致競爭條件,因為當您收到其結果時,它可能不再為真。不過,它可以在 Future
上安全使用,因為它們只會被指定一次。
Base.isready
— 方法isready(rr::Future)
判斷 Future
是否已儲存值。
如果引數 Future
由其他節點擁有,此呼叫將封鎖以等待答案。建議在其他工作中等待 rr
,或使用本機 Channel
作為代理
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # will not block
Distributed.AbstractWorkerPool
— 類型AbstractWorkerPool
工作池的超類型,例如 WorkerPool
和 CachingPool
。AbstractWorkerPool
應實作
push!
- 將新的工作者新增到整體池中(可用 + 忙碌)put!
- 將工作者放回可用池中take!
- 從可用池中取得工作者(用於遠端函式執行)length
- 整體池中可用的工作者數量isready
- 如果池上的take!
會封鎖,則傳回 false,否則傳回 true
上述內容的預設實作(在 AbstractWorkerPool
上)需要欄位
channel::Channel{Int}
workers::Set{Int}
其中 channel
包含空閒工作者 pid,而 workers
是與此池相關聯的所有工作者的集合。
Distributed.WorkerPool
— 類型WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
根據工作者 ID 的向量或範圍建立 WorkerPool
。
範例
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— 類型CachingPool(workers::Vector{Int})
AbstractWorkerPool
的實作。 remote
、remotecall_fetch
、pmap
(以及其他遠端執行函式的遠端呼叫)受益於在工作者節點上快取序列化/反序列化的函式,特別是封閉函式(可能擷取大量資料)。
遠端快取會在傳回的 CachingPool
物件的生命週期中維護。若要提早清除快取,請使用 clear!(pool)
。
對於全域變數,只有繫結會擷取在封閉函式中,而不是資料。可以使用 let
區塊來擷取全域資料。
範例
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
上述內容只會將 foo
傳輸一次給每個工作者。
Distributed.default_worker_pool
— 函式default_worker_pool()
AbstractWorkerPool
包含閒置的 workers
- 由 remote(f)
和 pmap
(預設)使用。除非透過 default_worker_pool!(pool)
明確設定,否則預設 worker pool 會初始化為 WorkerPool
。
範例
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— 方法clear!(pool::CachingPool) -> pool
移除所有參與 worker 的快取函式。
Distributed.remote
— 函式remote([p::AbstractWorkerPool], f) -> Function
傳回一個匿名函式,在可用的 worker 上執行函式 f
(如果提供,則從 WorkerPool
p
抽取),使用 remotecall_fetch
。
Distributed.remotecall
— 方法remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
變體的 remotecall(f, pid, ....)
。等待並從 pool
取得一個空閒 worker,並對其執行 remotecall
。
範例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
在此範例中,任務在 pid 2 上執行,從 pid 1 呼叫。
Distributed.remotecall_wait
— 方法remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
變體的 remotecall_wait(f, pid, ....)
。等待並從 pool
取得一個空閒 worker,並對其執行 remotecall_wait
。
範例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— 方法remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool
變數的 remotecall_fetch(f, pid, ....)
。等待並從 pool
取得一個空閒工作人員,並對其執行 remotecall_fetch
。
範例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— 方法remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
WorkerPool
變數的 remote_do(f, pid, ....)
。等待並從 pool
取得一個空閒工作人員,並對其執行 remote_do
。
Distributed.@spawnat
— 巨集@spawnat p expr
建立一個封閉在運算式周圍的封閉,並在程序 p
上非同步執行封閉。傳回一個 Future
至結果。如果 p
是引號引用的字面符號 :any
,則系統將自動選擇要使用的處理器。
範例
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
:any
參數在 Julia 1.3 中提供。
Distributed.@fetch
— 巨集@fetch expr
等同於 fetch(@spawnat :any expr)
。請參閱 fetch
和 @spawnat
。
範例
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— 巨集@fetchfrom
等同於 fetch(@spawnat p expr)
。請參閱 fetch
和 @spawnat
。
範例
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— 巨集@distributed
一個分散式記憶體,平行 for 迴圈,形式如下:
@distributed [reducer] for var = range
body
end
指定的範圍會在所有工作人員間分割並執行。如果指定了一個選擇性的還原函數,@distributed
會在每個工作人員上執行局部還原,並在呼叫程序上執行最後的還原。
請注意,若沒有 reducer 函數,@distributed
會非同步執行,也就是說它會在所有可用的工作者上產生獨立的任務,並立即回傳而不等待完成。若要等待完成,請在呼叫前面加上 @sync
,例如
@sync @distributed for var = range
body
end
Distributed.@everywhere
— 巨集@everywhere [procs()] expr
在所有 procs
上的 Main
下執行一個表達式。任何程序上的錯誤都會收集到 CompositeException
中並拋出。例如
@everywhere bar = 1
將在所有目前的程序上定義 Main.bar
。稍後新增的任何程序(例如使用 addprocs()
)都不會有定義的表達式。
與 @spawnat
不同,@everywhere
不會擷取任何區域變數。區域變數可以使用內插來廣播
foo = 1
@everywhere bar = $foo
選用參數 procs
允許指定所有程序的子集來執行表達式。
類似於呼叫 remotecall_eval(Main, procs, expr)
,但有兩個額外的功能
- `using` and `import` statements run on the calling process first, to ensure
packages are precompiled.
- The current source file path used by `include` is propagated to other processes.
Distributed.clear!
— 方法clear!(syms, pids=workers(); mod=Main)
透過將模組中的全域繫結初始化為 nothing
來清除它們。syms
應該是 Symbol
型別或 Symbol
的集合。pids
和 mod
識別程序和要重新初始化全域變數的模組。只有在 mod
下找到定義的名稱才會被清除。
如果要求清除全域常數,則會引發例外狀況。
Distributed.remoteref_id
— 函數remoteref_id(r::AbstractRemoteRef) -> RRID
Future
和 RemoteChannel
由欄位識別
where
- 參照實際存在參考所指的底層物件/儲存體的節點。whence
- 參照建立遠端參考的節點。請注意,這與實際存在參考所指的底層物件的節點不同。例如,從主程序呼叫RemoteChannel(2)
會產生where
值為 2 和whence
值為 1。id
在由whence
指定的執行緒建立的所有參考中是唯一的。
whence
和 id
合併後,會在所有執行緒中唯一識別一個參考。
remoteref_id
是低階 API,會傳回包裝遠端參考的 whence
和 id
值的 RRID
物件。
Distributed.channel_from_id
— 函數channel_from_id(id) -> c
低階 API,會傳回 remoteref_id
傳回的 id
的後端 AbstractChannel
。呼叫僅在存在後端通道的節點上有效。
Distributed.worker_id_from_socket
— 函數worker_id_from_socket(s) -> pid
低階 API,會在給定 IO
連線或 Worker
的情況下,傳回已連線的執行緒的 pid
。這在為類型撰寫自訂 serialize
方法時很有用,此方法會根據接收程序 ID 最佳化寫入的資料。
Distributed.cluster_cookie
— 方法cluster_cookie() -> cookie
傳回叢集 cookie。
Distributed.cluster_cookie
— 方法cluster_cookie(cookie) -> cookie
設定傳遞的 cookie 為叢集 cookie,然後傳回它。
叢集管理員介面
此介面提供一種機制,用於在不同的叢集環境中啟動和管理 Julia 工作人員。Base 中有兩種管理員:LocalManager
,用於在同一主機上啟動其他工作人員,以及 SSHManager
,用於透過 ssh
在遠端主機上啟動。TCP/IP 通訊端用於連線和傳輸程序之間的訊息。叢集管理員有可能提供不同的傳輸方式。
Distributed.ClusterManager
— 類型ClusterManager
叢集管理員的超類型,它將工作人員程序作為叢集進行控制。叢集管理員實作工作人員如何新增、移除和進行通訊。SSHManager
和 LocalManager
是此類型的子類型。
Distributed.WorkerConfig
— 類型WorkerConfig
ClusterManager
使用的類型,用於控制新增到其叢集的工作人員。某些欄位由所有叢集管理員使用,以存取主機
io
– 用於存取工作人員的連線(IO
或Nothing
的子類型)host
– 主機位址(String
或Nothing
)port
– 主機上用於連線到工作人員的埠(Int
或Nothing
)
某些欄位由叢集管理員使用,以將工作人員新增到已初始化的主機
count
– 要在主機上啟動的工作人員數量exename
– 主機上 Julia 可執行檔的路徑,預設為"$(Sys.BINDIR)/julia"
或"$(Sys.BINDIR)/julia-debug"
exeflags
– 遠端啟動 Julia 時使用的旗標
userdata
欄位由外部管理員用於儲存每個工作者的資訊。
有些欄位由 SSHManager
和類似的管理員使用
tunnel
–true
(使用通道)、false
(不使用通道)或nothing
(使用管理員的預設值)multiplex
–true
(對通道使用 SSH 多重化)或false
forward
– 用於 ssh 的-L
選項的轉送選項bind_addr
– 要繫結到的遠端主機上的位址sshflags
– 用於建立 SSH 連線的旗標max_parallel
– 主機上可以並行連線到的工作者最大數量
有些欄位由 LocalManager
和 SSHManager
都使用
connect_at
– 決定這是一個工作者對工作者的設定呼叫,還是驅動程式對工作者的設定呼叫process
– 將會連線的程序(通常管理員會在addprocs
期間指派這個程序)ospid
– 依據主機作業系統的程序 ID,用於中斷工作者程序environ
– 由 Local/SSH 管理員用於儲存暫時資訊的私有字典ident
– 由ClusterManager
識別的工作者connect_idents
– 如果使用自訂拓撲,工作者必須連線到的工作者 ID 清單enable_threaded_blas
–true
、false
或nothing
,表示是否在工作者上使用執行緒化的 BLAS
Distributed.launch
— 函數launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
由叢集管理員實作。對於此函數啟動的每一個 Julia 工作者,它應該將一個 WorkerConfig
項目附加到 launched
並通知 launch_ntfy
。一旦所有由 manager
要求的工作者啟動,函數必須退出。params
是所有關鍵字參數的字典,addprocs
會使用這些參數進行呼叫。
Distributed.manage
— 函數manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
由叢集管理員實作。它會在工作者的生命週期中,於主處理程序上使用適當的 op
值進行呼叫
- 在工作者加入或移除 Julia 工作者池時,使用
:register
/:deregister
。 - 在呼叫
interrupt(workers)
時,使用:interrupt
。ClusterManager
應該使用中斷訊號通知適當的工作者。 - 在清理目的時,使用
:finalize
。
Base.kill
— 方法kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
由叢集管理員實作。它會在主處理程序上由 rmprocs
進行呼叫。它應該導致 pid
指定的遠端工作者退出。kill(manager::ClusterManager.....)
會在 pid
上執行遠端 exit()
。
Sockets.connect
— 方法connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
由使用自訂傳輸的叢集管理員實作。它應該建立與具有 pid
ID 的工作者的邏輯連線,由 config
指定,並傳回一對 IO
物件。來自 pid
到目前處理程序的訊息將從 instrm
讀取,而傳送至 pid
的訊息將寫入 outstrm
。自訂傳輸實作必須確保訊息完整且依序傳遞和接收。connect(manager::ClusterManager.....)
會在工作者之間設定 TCP/IP socket 連線。
Distributed.init_worker
— 函數init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
由實作自訂傳輸的叢集管理員呼叫。它將新啟動的程序初始化為工作器。命令列參數 --worker[=<cookie>]
會將程序初始化為工作器,並使用 TCP/IP socket 作為傳輸。cookie
是 cluster_cookie
。
Distributed.start_worker
— 函數start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
是內部函數,是透過 TCP/IP 連線的工作器程序的預設進入點。它將程序設定為 Julia 叢集工作器。
host:port 資訊會寫入串流 out
(預設為 stdout)。
此函數會在需要時從 stdin 讀取 cookie,並在一個空閒埠(或如果已指定,則在 --bind-to
命令列選項中的埠)上監聽,並排程工作以處理即將到來的 TCP 連線和要求。它也會(選擇性地)關閉 stdin,並將 stderr 重新導向至 stdout。
它不會傳回。
Distributed.process_messages
— 函數process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
由使用自訂傳輸的叢集管理員呼叫。當自訂傳輸實作收到來自遠端工作器的第一個訊息時,應該呼叫它。自訂傳輸必須管理與遠端工作器的邏輯連線,並提供兩個 IO
物件,一個用於接收訊息,另一個用於傳送訊息至遠端工作器。如果 incoming
為 true
,表示遠端對等端發起連線。發起連線的對等端會傳送叢集 cookie 和其 Julia 版本號碼,以執行驗證交握。
另請參閱 cluster_cookie
。
Distributed.default_addprocs_params
— 函數default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
由叢集管理員實作。呼叫 addprocs(mgr)
時傳遞的預設關鍵字參數。可透過呼叫 default_addprocs_params()
取得最低限度的選項。