分散式運算

分散式平行處理工具。

Distributed.addprocs函式
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers

透過指定的叢集管理員啟動工作程序。

例如,Beowulf 叢集透過套件 ClusterManagers.jl 中實作的客製化叢集管理員獲得支援。

新啟動的工作程序等待主控端建立連線的秒數,可以在工作程序的環境中透過變數 JULIA_WORKER_TIMEOUT 指定。僅在使用 TCP/IP 作為傳輸時相關。

若要在不封鎖 REPL 或包含函式的情況下啟動工作程序(如果以程式方式啟動工作程序),請在自己的工作中執行 addprocs

範例

# On busy clusters, call `addprocs` asynchronously
t = @async addprocs(...)
# Utilize workers as and when they come online
if nprocs() > 1   # Ensure at least one new worker is available
   ....   # perform distributed execution
end
# Retrieve newly launched worker IDs, or any error messages
if istaskdone(t)   # Check if `addprocs` has completed to ensure `fetch` doesn't block
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end
原始碼
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers

透過 SSH 在遠端機器上新增工作程序。設定使用關鍵字參數(如下所示)進行。特別是,exename 關鍵字可用於指定遠端機器上 julia 二進檔的路徑。

machines 是「機器規格」的向量,以 [user@]host[:port] [bind_addr[:port]] 形式的字串給出。user 預設為目前使用者,port 預設為標準 SSH 連接埠。如果指定 [bind_addr[:port]],其他工作程序會在指定的 bind_addrport 上連線到此工作程序。

可以在 machines 向量中使用組或 (machine_spec, count) 形式在遠端主機上啟動多個程序,其中 count 是要在指定主機上啟動的工作程序數量。將 :auto 傳遞為工作程序數量會啟動與遠端主機上 CPU 執行緒數量一樣多的工作程序。

範例:

addprocs([
    "remote1",               # one worker on 'remote1' logging in with the current username
    "user@remote2",          # one worker on 'remote2' logging in with the 'user' username
    "user@remote3:2222",     # specifying SSH port to '2222' for 'remote3'
    ("user@remote4", 4),     # launch 4 workers on 'remote4'
    ("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
])

關鍵字參數:

  • tunnel:如果為 true,則會使用 SSH 隧道路由從主程序連線到工作程序。預設為 false

  • multiplex:如果為 true,則會使用 SSH 多工處理進行 SSH 隧道路由。預設為 false

  • ssh:用於啟動工作程序的 SSH 執行檔的名稱或路徑。預設為 "ssh"

  • sshflags:指定其他 ssh 選項,例如 sshflags=`-i /home/foo/bar.pem`

  • max_parallel:指定在主機上並行連線的最大工作程序數量。預設為 10。

  • shell:指定 ssh 在工作程序上連線的 shell 類型。

    • shell=:posix:相容於 POSIX 的 Unix/Linux shell(sh、ksh、bash、dash、zsh 等)。預設值。

    • shell=:csh:Unix C shell (csh、tcsh)。

    • shell=:wincmd:Microsoft Windows cmd.exe

  • dir:指定工作人員的工作目錄。預設為主機的目前目錄(由 pwd() 找到)。

  • enable_threaded_blas:如果為 true,則 BLAS 將在新增處理程序的多個執行緒上執行。預設為 false

  • exenamejulia 可執行檔的名稱。預設為 "$(Sys.BINDIR)/julia""$(Sys.BINDIR)/julia-debug"(視情況而定)。建議在所有遠端機器上使用相同的 Julia 版本,否則序列化和程式碼分發可能會失敗。

  • exeflags:傳遞給工作人員處理程序的其他旗標。

  • topology:指定工作人員如何彼此連線。在未連線的工作人員之間傳送訊息會導致錯誤。

    • topology=:all_to_all:所有處理程序都彼此連線。預設。

    • topology=:master_worker:只有驅動程式處理程序,即 pid 1 會連線到工作人員。工作人員不會彼此連線。

    • topology=:custom:叢集管理員的 launch 方法透過 WorkerConfig 中的欄位 identconnect_idents 指定連線拓撲。具有叢集管理員身分 ident 的工作人員將連線到 connect_idents 中指定的所有工作人員。

  • lazy:僅適用於 topology=:all_to_all。如果為 true,工作人員對工作人員的連線會延遲設定,即會在工作人員之間的遠端呼叫第一次執行時設定。預設為 true。

  • env:提供字串配對陣列,例如 env=["JULIA_DEPOT_PATH"=>"/depot"],以要求在遠端機器上設定環境變數。預設情況下,只有環境變數 JULIA_WORKER_TIMEOUT 會自動從本機傳遞到遠端環境。

  • cmdline_cookie:透過 --worker 命令列選項傳遞驗證 cookie。透過 ssh stdio 傳遞 cookie 的(更安全的)預設行為可能會讓使用較舊(ConPTY 之前)Julia 或 Windows 版本的 Windows 工作器當機,此時 cmdline_cookie=true 提供了替代方案。

Julia 1.6

關鍵字引數 sshshellenvcmdline_cookie 已新增至 Julia 1.6。

環境變數

如果主處理程序無法在 60.0 秒內與新啟動的工作器建立連線,工作器會將其視為致命情況並終止。此逾時時間可透過環境變數 JULIA_WORKER_TIMEOUT 控制。主處理程序上的 JULIA_WORKER_TIMEOUT 值指定新啟動的工作器等待建立連線的秒數。

原始碼
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers

使用內建的 LocalManager 在本機主機上啟動 np 個工作器。

本機工作器會繼承主處理程序的目前套件環境(即,目前專案、LOAD_PATHDEPOT_PATH)。

關鍵字參數:

  • restrict::Bool:如果為 true(預設),繫結會限制為 127.0.0.1
  • direxenameexeflagsenvtopologylazyenable_threaded_blas:與 SSHManager 的效果相同,請參閱 addprocs(machines::AbstractVector) 的文件。
Julia 1.9

繼承套件環境和 env 關鍵字引數已新增至 Julia 1.9。

原始碼
Distributed.nprocs函式
nprocs()

取得可用處理程序的數量。

範例

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3
原始碼
Distributed.nworkers函式
nworkers()

取得可用的工作程序數量。這比 nprocs() 少一個。如果 nprocs() == 1,則等於 nprocs()

範例

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2
原始碼
Distributed.procs方法
procs()

傳回所有程序識別碼的清單,包括 pid 1(workers() 不包含此識別碼)。

範例

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
原始碼
Distributed.procs方法
procs(pid::Integer)

傳回同一實體節點上所有程序識別碼的清單。特別是所有繫結到與 pid 相同 IP 位址的工作程序都會傳回。

原始碼
Distributed.workers函數
workers()

傳回所有工作程序識別碼的清單。

範例

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3
原始碼
Distributed.rmprocs函數
rmprocs(pids...; waitfor=typemax(Int))

移除指定的作業。請注意,只有程序 1 可以新增或移除作業。

參數 waitfor 指定等待作業關閉的時間長度

  • 如果未指定,rmprocs 會等到所有要求的 pids 都移除為止。
  • 如果無法在要求的 waitfor 秒數內終止所有作業,就會引發 ErrorException
  • 如果 waitfor 值為 0,則呼叫會立即傳回,而作業會排程在不同的工作中移除。排程的 Task 物件會傳回。使用者應在呼叫任何其他平行呼叫之前,對工作呼叫 wait

範例

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
原始碼
Distributed.interrupt函數
interrupt(pids::Integer...)

中斷指定工作人員上目前執行的任務。這等同於在本地機器上按 Ctrl-C。如果沒有給定任何參數,則會中斷所有工作人員。

原始碼
interrupt(pids::AbstractVector=workers())

中斷指定工作人員上目前執行的任務。這等同於在本地機器上按 Ctrl-C。如果沒有給定任何參數,則會中斷所有工作人員。

原始碼
Distributed.myid函數
myid()

取得目前程序的 ID。

範例

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
原始碼
Distributed.pmap函數
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

使用可用工作人員和任務將 f 套用至每個元素來轉換集合 c

對於多個集合參數,逐元素套用 f

請注意,f 必須提供給所有工作人員程序;有關詳細資訊,請參閱 程式碼可用性和載入套件

如果未指定工作人員池,則會使用所有可用工作人員,亦即預設工作人員池。

預設情況下,pmap 會將運算分佈至所有指定的工作人員。若要僅使用本地程序並分佈至任務,請指定 distributed=false。這等同於使用 asyncmap。例如,pmap(f, c; distributed=false) 等同於 asyncmap(f,c; ntasks=()->nworkers())

pmap 也可以透過 batch_size 參數使用程序和任務的組合。對於大於 1 的批次大小,集合會以多個批次處理,每個批次的長度小於或等於 batch_size。批次會作為單一請求傳送至空閒工作人員,其中本地 asyncmap 會使用多個並行任務處理批次中的元素。

任何錯誤都會停止 pmap 處理集合的其餘部分。若要覆寫此行為,您可以透過參數 on_error 指定錯誤處理函數,該函數會接收一個單一參數,即例外情況。此函數可以透過重新擲回錯誤來停止處理,或傳回任何值以繼續,然後將該值與結果一起內嵌傳回給呼叫者。

考慮以下兩個範例。第一個會內嵌傳回例外情況物件,第二個會在任何例外情況中傳回 0

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

也可以透過重試失敗的運算來處理錯誤。關鍵字參數 retry_delaysretry_check 會傳遞到 retry,分別作為關鍵字參數 delayscheck。如果指定批次處理,且整個批次失敗,則會重試批次中的所有項目。

請注意,如果同時指定 on_errorretry_delays,則會在重試之前呼叫 on_error 掛鉤。如果 on_error 沒有擲回(或重新擲回)例外情況,則不會重試元素。

範例:發生錯誤時,在元素上重試 f 最多 3 次,且重試之間沒有任何延遲。

pmap(f, c; retry_delays = zeros(3))

範例:僅在例外情況不是 InexactError 類型時重試 f,並以指數方式增加延遲,最多 3 次。在所有 InexactError 發生時,傳回 NaN

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
原始碼
Distributed.RemoteException類型
RemoteException(captured)

遠端運算的例外情況會被擷取並在本地重新擲回。RemoteException 會封裝工作者的 pid 和擷取的例外情況。CapturedException 會擷取遠端例外情況以及例外情況引發時的呼叫堆疊的可序列化表單。

原始碼
Distributed.Future類型
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Future 是單一計算的佔位符,其終止狀態和時間未知。對於多個潛在計算,請參閱 RemoteChannel。請參閱 remoteref_id 以識別 AbstractRemoteRef

原始碼
Distributed.RemoteChannel類型
RemoteChannel(pid::Integer=myid())

在程序 pid 上建立對 Channel{Any}(1) 的引用。預設 pid 是目前的程序。

RemoteChannel(f::Function, pid::Integer=myid())

建立特定大小和類型的遠端通道引用。f 是在 pid 上執行時必須傳回 AbstractChannel 實作的函數。

例如,RemoteChannel(()->Channel{Int}(10), pid) 會傳回 pid 上類型為 Int、大小為 10 的通道引用。

預設 pid 是目前的程序。

原始碼
Base.fetch方法
fetch(x::Future)

等待並取得 Future 的值。取得的值會快取在本地。對同一個引用的後續 fetch 呼叫會傳回快取的值。如果遠端值是例外,會引發 RemoteException,其中包含遠端例外和回溯追蹤。

原始碼
Distributed.remotecall方法
remotecall(f, id::Integer, args...; kwargs...) -> Future

在指定的程序上,非同步地對指定參數呼叫函數 f。傳回 Future。如有任何關鍵字參數,會傳遞給 f

原始碼
Distributed.remotecall_wait方法
remotecall_wait(f, id::Integer, args...; kwargs...)

在工作者 ID id 指定的 Worker 上,以一個訊息執行較快的 wait(remotecall(...))。如有任何關鍵字參數,會傳遞給 f

另請參閱 waitremotecall

原始碼
Distributed.remotecall_fetch方法
remotecall_fetch(f, id::Integer, args...; kwargs...)

在一個訊息中執行 fetch(remotecall(...))。如有任何關鍵字參數,會傳遞給 f。任何遠端例外狀況都會擷取到 RemoteException 中並擲出。

另請參閱 fetchremotecall

範例

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
原始碼
Distributed.remote_do方法
remote_do(f, id::Integer, args...; kwargs...) -> nothing

非同步地在工作者 id 上執行 f。與 remotecall 不同,它不會儲存運算結果,也沒有辦法等待其完成。

成功的呼叫表示已接受請求在遠端節點上執行。

雖然對同一個工作者的連續 remotecall 會按照呼叫順序序列化,但遠端工作者上的執行順序是不確定的。例如,remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) 會依序序列化對 f1f2f3 的呼叫。但是,無法保證 f1 會在工作者 2 上先於 f3 執行。

f 拋出的任何例外都會印在遠端工作者的 stderr 上。

如果有任何關鍵字引數,會傳遞給 f

原始碼
Base.put!方法
put!(rr::RemoteChannel, args...)

將一組值儲存在 RemoteChannel 中。如果通道已滿,則會區塊處理直到有可用空間。傳回第一個引數。

原始碼
Base.put!方法
put!(rr::Future, v)

將值儲存到 Future rrFuture 是寫入一次的遠端參考。已設定的 Future 上的 put! 會擲回 Exception。所有非同步遠端呼叫會傳回 Future,並在完成時將值設定為呼叫的傳回值。

原始碼
Base.isready方法
isready(rr::RemoteChannel, args...)

判斷 RemoteChannel 是否已儲存值。請注意,此函式可能會導致競爭條件,因為當您收到其結果時,它可能不再為真。不過,它可以在 Future 上安全使用,因為它們只會被指定一次。

原始碼
Base.isready方法
isready(rr::Future)

判斷 Future 是否已儲存值。

如果引數 Future 由其他節點擁有,此呼叫將封鎖以等待答案。建議在其他工作中等待 rr,或使用本機 Channel 作為代理

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # will not block
原始碼
Distributed.AbstractWorkerPool類型
AbstractWorkerPool

工作池的超類型,例如 WorkerPoolCachingPoolAbstractWorkerPool 應實作

  • push! - 將新的工作者新增到整體池中(可用 + 忙碌)
  • put! - 將工作者放回可用池中
  • take! - 從可用池中取得工作者(用於遠端函式執行)
  • length - 整體池中可用的工作者數量
  • isready - 如果池上的 take! 會封鎖,則傳回 false,否則傳回 true

上述內容的預設實作(在 AbstractWorkerPool 上)需要欄位

  • channel::Channel{Int}
  • workers::Set{Int}

其中 channel 包含空閒工作者 pid,而 workers 是與此池相關聯的所有工作者的集合。

原始碼
Distributed.WorkerPool類型
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

根據工作者 ID 的向量或範圍建立 WorkerPool

範例

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
原始碼
Distributed.CachingPool類型
CachingPool(workers::Vector{Int})

AbstractWorkerPool 的實作。 remoteremotecall_fetchpmap(以及其他遠端執行函式的遠端呼叫)受益於在工作者節點上快取序列化/反序列化的函式,特別是封閉函式(可能擷取大量資料)。

遠端快取會在傳回的 CachingPool 物件的生命週期中維護。若要提早清除快取,請使用 clear!(pool)

對於全域變數,只有繫結會擷取在封閉函式中,而不是資料。可以使用 let 區塊來擷取全域資料。

範例

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

上述內容只會將 foo 傳輸一次給每個工作者。

原始碼
Distributed.default_worker_pool函式
default_worker_pool()

AbstractWorkerPool 包含閒置的 workers - 由 remote(f)pmap(預設)使用。除非透過 default_worker_pool!(pool) 明確設定,否則預設 worker pool 會初始化為 WorkerPool

範例

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
原始碼
Distributed.remotecall方法
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool 變體的 remotecall(f, pid, ....)。等待並從 pool 取得一個空閒 worker,並對其執行 remotecall

範例

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

在此範例中,任務在 pid 2 上執行,從 pid 1 呼叫。

原始碼
Distributed.remotecall_wait方法
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool 變體的 remotecall_wait(f, pid, ....)。等待並從 pool 取得一個空閒 worker,並對其執行 remotecall_wait

範例

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
原始碼
Distributed.remotecall_fetch方法
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPool 變數的 remotecall_fetch(f, pid, ....)。等待並從 pool 取得一個空閒工作人員,並對其執行 remotecall_fetch

範例

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
原始碼
Distributed.remote_do方法
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

WorkerPool 變數的 remote_do(f, pid, ....)。等待並從 pool 取得一個空閒工作人員,並對其執行 remote_do

原始碼
Distributed.@spawnat巨集
@spawnat p expr

建立一個封閉在運算式周圍的封閉,並在程序 p 上非同步執行封閉。傳回一個 Future 至結果。如果 p 是引號引用的字面符號 :any,則系統將自動選擇要使用的處理器。

範例

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

:any 參數在 Julia 1.3 中提供。

原始碼
Distributed.@fetch巨集
@fetch expr

等同於 fetch(@spawnat :any expr)。請參閱 fetch@spawnat

範例

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
原始碼
Distributed.@distributed巨集
@distributed

一個分散式記憶體,平行 for 迴圈,形式如下:

@distributed [reducer] for var = range
    body
end

指定的範圍會在所有工作人員間分割並執行。如果指定了一個選擇性的還原函數,@distributed 會在每個工作人員上執行局部還原,並在呼叫程序上執行最後的還原。

請注意,若沒有 reducer 函數,@distributed 會非同步執行,也就是說它會在所有可用的工作者上產生獨立的任務,並立即回傳而不等待完成。若要等待完成,請在呼叫前面加上 @sync,例如

@sync @distributed for var = range
    body
end
原始碼
Distributed.@everywhere巨集
@everywhere [procs()] expr

在所有 procs 上的 Main 下執行一個表達式。任何程序上的錯誤都會收集到 CompositeException 中並拋出。例如

@everywhere bar = 1

將在所有目前的程序上定義 Main.bar。稍後新增的任何程序(例如使用 addprocs())都不會有定義的表達式。

@spawnat 不同,@everywhere 不會擷取任何區域變數。區域變數可以使用內插來廣播

foo = 1
@everywhere bar = $foo

選用參數 procs 允許指定所有程序的子集來執行表達式。

類似於呼叫 remotecall_eval(Main, procs, expr),但有兩個額外的功能

- `using` and `import` statements run on the calling process first, to ensure
  packages are precompiled.
- The current source file path used by `include` is propagated to other processes.
原始碼
Distributed.clear!方法
clear!(syms, pids=workers(); mod=Main)

透過將模組中的全域繫結初始化為 nothing 來清除它們。syms 應該是 Symbol 型別或 Symbol 的集合。pidsmod 識別程序和要重新初始化全域變數的模組。只有在 mod 下找到定義的名稱才會被清除。

如果要求清除全域常數,則會引發例外狀況。

原始碼
Distributed.remoteref_id函數
remoteref_id(r::AbstractRemoteRef) -> RRID

FutureRemoteChannel 由欄位識別

  • where - 參照實際存在參考所指的底層物件/儲存體的節點。

  • whence - 參照建立遠端參考的節點。請注意,這與實際存在參考所指的底層物件的節點不同。例如,從主程序呼叫 RemoteChannel(2) 會產生 where 值為 2 和 whence 值為 1。

  • id 在由 whence 指定的執行緒建立的所有參考中是唯一的。

whenceid 合併後,會在所有執行緒中唯一識別一個參考。

remoteref_id 是低階 API,會傳回包裝遠端參考的 whenceid 值的 RRID 物件。

原始碼
Distributed.worker_id_from_socket函數
worker_id_from_socket(s) -> pid

低階 API,會在給定 IO 連線或 Worker 的情況下,傳回已連線的執行緒的 pid。這在為類型撰寫自訂 serialize 方法時很有用,此方法會根據接收程序 ID 最佳化寫入的資料。

原始碼

叢集管理員介面

此介面提供一種機制,用於在不同的叢集環境中啟動和管理 Julia 工作人員。Base 中有兩種管理員:LocalManager,用於在同一主機上啟動其他工作人員,以及 SSHManager,用於透過 ssh 在遠端主機上啟動。TCP/IP 通訊端用於連線和傳輸程序之間的訊息。叢集管理員有可能提供不同的傳輸方式。

Distributed.ClusterManager類型
ClusterManager

叢集管理員的超類型,它將工作人員程序作為叢集進行控制。叢集管理員實作工作人員如何新增、移除和進行通訊。SSHManagerLocalManager 是此類型的子類型。

原始碼
Distributed.WorkerConfig類型
WorkerConfig

ClusterManager 使用的類型,用於控制新增到其叢集的工作人員。某些欄位由所有叢集管理員使用,以存取主機

  • io – 用於存取工作人員的連線(IONothing 的子類型)
  • host – 主機位址(StringNothing
  • port – 主機上用於連線到工作人員的埠(IntNothing

某些欄位由叢集管理員使用,以將工作人員新增到已初始化的主機

  • count – 要在主機上啟動的工作人員數量
  • exename – 主機上 Julia 可執行檔的路徑,預設為 "$(Sys.BINDIR)/julia""$(Sys.BINDIR)/julia-debug"
  • exeflags – 遠端啟動 Julia 時使用的旗標

userdata 欄位由外部管理員用於儲存每個工作者的資訊。

有些欄位由 SSHManager 和類似的管理員使用

  • tunneltrue(使用通道)、false(不使用通道)或 nothing(使用管理員的預設值)
  • multiplextrue(對通道使用 SSH 多重化)或 false
  • forward – 用於 ssh 的 -L 選項的轉送選項
  • bind_addr – 要繫結到的遠端主機上的位址
  • sshflags – 用於建立 SSH 連線的旗標
  • max_parallel – 主機上可以並行連線到的工作者最大數量

有些欄位由 LocalManagerSSHManager 都使用

  • connect_at – 決定這是一個工作者對工作者的設定呼叫,還是驅動程式對工作者的設定呼叫
  • process – 將會連線的程序(通常管理員會在 addprocs 期間指派這個程序)
  • ospid – 依據主機作業系統的程序 ID,用於中斷工作者程序
  • environ – 由 Local/SSH 管理員用於儲存暫時資訊的私有字典
  • ident – 由 ClusterManager 識別的工作者
  • connect_idents – 如果使用自訂拓撲,工作者必須連線到的工作者 ID 清單
  • enable_threaded_blastruefalsenothing,表示是否在工作者上使用執行緒化的 BLAS
原始碼
Distributed.launch函數
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

由叢集管理員實作。對於此函數啟動的每一個 Julia 工作者,它應該將一個 WorkerConfig 項目附加到 launched 並通知 launch_ntfy。一旦所有由 manager 要求的工作者啟動,函數必須退出。params 是所有關鍵字參數的字典,addprocs 會使用這些參數進行呼叫。

原始碼
Distributed.manage函數
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

由叢集管理員實作。它會在工作者的生命週期中,於主處理程序上使用適當的 op 值進行呼叫

  • 在工作者加入或移除 Julia 工作者池時,使用 :register/:deregister
  • 在呼叫 interrupt(workers) 時,使用 :interruptClusterManager 應該使用中斷訊號通知適當的工作者。
  • 在清理目的時,使用 :finalize
原始碼
Base.kill方法
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

由叢集管理員實作。它會在主處理程序上由 rmprocs 進行呼叫。它應該導致 pid 指定的遠端工作者退出。kill(manager::ClusterManager.....) 會在 pid 上執行遠端 exit()

原始碼
Sockets.connect方法
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

由使用自訂傳輸的叢集管理員實作。它應該建立與具有 pid ID 的工作者的邏輯連線,由 config 指定,並傳回一對 IO 物件。來自 pid 到目前處理程序的訊息將從 instrm 讀取,而傳送至 pid 的訊息將寫入 outstrm。自訂傳輸實作必須確保訊息完整且依序傳遞和接收。connect(manager::ClusterManager.....) 會在工作者之間設定 TCP/IP socket 連線。

原始碼
Distributed.init_worker函數
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

由實作自訂傳輸的叢集管理員呼叫。它將新啟動的程序初始化為工作器。命令列參數 --worker[=<cookie>] 會將程序初始化為工作器,並使用 TCP/IP socket 作為傳輸。cookiecluster_cookie

原始碼
Distributed.start_worker函數
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker 是內部函數,是透過 TCP/IP 連線的工作器程序的預設進入點。它將程序設定為 Julia 叢集工作器。

host:port 資訊會寫入串流 out(預設為 stdout)。

此函數會在需要時從 stdin 讀取 cookie,並在一個空閒埠(或如果已指定,則在 --bind-to 命令列選項中的埠)上監聽,並排程工作以處理即將到來的 TCP 連線和要求。它也會(選擇性地)關閉 stdin,並將 stderr 重新導向至 stdout。

它不會傳回。

原始碼
Distributed.process_messages函數
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

由使用自訂傳輸的叢集管理員呼叫。當自訂傳輸實作收到來自遠端工作器的第一個訊息時,應該呼叫它。自訂傳輸必須管理與遠端工作器的邏輯連線,並提供兩個 IO 物件,一個用於接收訊息,另一個用於傳送訊息至遠端工作器。如果 incomingtrue,表示遠端對等端發起連線。發起連線的對等端會傳送叢集 cookie 和其 Julia 版本號碼,以執行驗證交握。

另請參閱 cluster_cookie

原始碼
Distributed.default_addprocs_params函數
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

由叢集管理員實作。呼叫 addprocs(mgr) 時傳遞的預設關鍵字參數。可透過呼叫 default_addprocs_params() 取得最低限度的選項。

原始碼