多處理和分散式運算
模組 Distributed
提供分散式記憶體平行運算的實作,它是 Julia 附帶標準函式庫的一部分。
大多數現代電腦都擁有超過一個 CPU,而多台電腦可以組合成一個叢集。利用這些多個 CPU 的效能,可以更快地完成許多運算。有兩個主要因素會影響效能:CPU 本身的運算速度,以及它們存取記憶體的速度。在一個叢集中,很明顯地,一個給定的 CPU 將可以最快存取同一台電腦(節點)中的 RAM。或許更令人驚訝的是,由於主記憶體和快取記憶體的速度差異,類似的問題也與典型的多核心筆記型電腦有關。因此,一個良好的多重處理環境應該允許特定 CPU 控制記憶體區塊的「擁有權」。Julia 提供一個基於訊息傳遞的多重處理環境,允許程式同時在不同記憶體網域中的多個處理程序上執行。
Julia 的訊息傳遞實作與其他環境(例如 MPI[1])不同。Julia 中的通訊通常是「單向」的,這表示程式設計師只需要在一個雙處理程序中明確管理一個處理程序。此外,這些操作通常看起來不像「訊息傳送」和「訊息接收」,而比較像是較高層級的操作,例如呼叫使用者函式。
Julia 中的分布式程式設計建立在兩個基本元素上:遠端參考和遠端呼叫。遠端參考是一個物件,可以用於從任何處理程序參考儲存在特定處理程序上的物件。遠端呼叫是由一個處理程序發出的請求,要求在另一個(可能相同的)處理程序上對特定參數呼叫特定函式。
遠端參考有兩種形式:Future
和 RemoteChannel
。
遠端呼叫會傳回一個 Future
給它的結果。遠端呼叫會立即傳回;執行呼叫的程序會繼續進行下一個操作,同時遠端呼叫在其他地方執行。你可以呼叫傳回的 Future
上的 wait
來等待遠端呼叫完成,並可以使用 fetch
取得結果的完整值。
另一方面,RemoteChannel
是可寫入的。例如,多個程序可以透過參考同一個遠端 Channel
來協調他們的處理。
每個程序都有關聯的識別碼。提供互動式 Julia 提示字元的程序的 id
永遠等於 1。預設用於平行運算的程序稱為「工作程序」。當只有一個程序時,程序 1 會被視為工作程序。否則,工作程序會被視為除了程序 1 以外的所有程序。因此,必須新增 2 個或更多程序才能從平行處理方法(例如 pmap
)中獲益。如果你只是想在工作程序執行長時間運算時在主程序中執行其他工作,則新增一個程序是有益的。
讓我們來試試看。從 julia -p n
開始,會在本地端提供 n
個工作程序。通常 n
等於機器上的 CPU 執行緒 (邏輯核心) 數量比較合理。請注意,-p
參數會隱含載入模組 Distributed
。
$ julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
remotecall
的第一個參數是要呼叫的函數。Julia 中的大部分平行程式設計都不會參考特定程序或可用的程序數量,但 remotecall
被視為提供更精細控制的低階介面。 remotecall
的第二個參數是會執行工作的程序的 id
,而其餘參數會傳遞給正在呼叫的函數。
如你所見,在第一行中,我們要求程序 2 建立一個 2x2 的隨機矩陣,而在第二行中,我們要求它加上 1。兩個計算的結果都可以在兩個未來值 r
和 s
中取得。 @spawnat
巨集會在第一個參數指定的程序上評估第二個參數中的表達式。
偶爾你可能想要立即取得遠端計算的值。這通常發生在你從遠端物件讀取資料以取得下一個本地端操作所需的資料時。函數 remotecall_fetch
就是用於此目的。它等於 fetch(remotecall(...))
,但更有效率。
julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085
這會在工作程序 2 上擷取陣列並傳回第一個值。請注意,在這種情況下,fetch
沒有移動任何資料,因為它是在擁有陣列的工作程序上執行的。也可以寫成
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.10824216411304866
請記住,getindex(r,1,1)
等於 r[1,1]
,所以這個呼叫會擷取未來值 r
的第一個元素。
為了讓事情更簡單,符號 :any
可以傳遞給 @spawnat
,它會為你選擇執行操作的位置
julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)
julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
請注意,我們使用 1 .+ fetch(r)
而不是 1 .+ r
。這是因為我們不知道程式碼會在哪裡執行,因此通常需要 fetch
來將 r
移至執行加法的程序。在此情況下,spawnat
聰明到可以在擁有 r
的程序上執行運算,因此 fetch
將會是空操作(不會執行任何工作)。
(值得注意的是,spawnat
不是內建的,而是定義在 Julia 中的 巨集
。您可以定義自己的此類建構。)
要記住的一件重要事情是,一旦擷取,Future
會將其值快取在本地。進一步的 fetch
呼叫不會涉及網路跳躍。一旦所有參考的 Future
都已擷取,遠端儲存的值就會被刪除。
@async
類似於 @spawnat
,但只在本地程序上執行任務。我們使用它為每個程序建立一個「饋送器」任務。每個任務挑選下一個需要計算的索引,然後等待其程序完成,然後重複,直到我們用完索引。請注意,饋送器任務不會開始執行,直到主任務到達 @sync
區塊的結尾,此時它會放棄控制權並等待所有本地任務完成,然後再從函數返回。至於 v0.7 及更高版本,饋送器任務能夠透過 nextidx
共享狀態,因為它們都執行於同一個程序上。即使 Tasks
是以協作方式排程,在某些情況下仍可能需要鎖定,例如在 非同步 I/O 中。這表示情境切換只會在明確定義的點發生:在本例中,是在呼叫 remotecall_fetch
時。這是實作的目前狀態,它可能會隨著未來的 Julia 版本而改變,因為它旨在讓最多 N 個 Tasks
在 M 個 Process
上執行,又稱為 M:N 執行緒。然後將需要一個用於 nextidx
的鎖定取得\釋放模型,因為讓多個程序同時讀寫資源並不安全。
程式碼可用性及載入套件
您的程式碼必須在執行它的任何程序上都可用。例如,在 Julia 提示字元中輸入以下內容
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]
程序 1 知道函數 rand2
,但程序 2 不知道。
最常見的情況是您會從檔案或套件載入程式碼,而且您在控制哪些程序載入程式碼時有相當大的彈性。考慮一個檔案 DummyModule.jl
,其中包含以下程式碼
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
為了在所有程序中參照 MyType
,每個程序都需要載入 DummyModule.jl
。呼叫 include("DummyModule.jl")
只會在單一程序中載入它。若要在每個程序中載入它,請使用 @everywhere
巨集(以 julia -p 2
啟動 Julia)
julia> @everywhere include("DummyModule.jl")
loaded
From worker 3: loaded
From worker 2: loaded
和往常一樣,這不會將 DummyModule
帶入任何程序的範圍中,這需要 using
或 import
。此外,當 DummyModule
被帶入一個程序的範圍中時,它不會在任何其他程序中
julia> using .DummyModule
julia> MyType(7)
MyType(7)
julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: `MyType` not defined
⋮
julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)
但是,仍然有可能將 MyType
傳送給已載入 DummyModule
的程序,即使它不在範圍內
julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)
在啟動時,也可以使用 -L
旗標在多個程序中預先載入檔案,並且可以使用驅動程式腳本來驅動運算
julia -p <n> -L file1.jl -L file2.jl driver.jl
在上述範例中執行驅動程式腳本的 Julia 程序具有等於 1 的 id
,就像提供互動式提示的程序一樣。
最後,如果 DummyModule.jl
不是獨立檔案而是套件,則 using DummyModule
將會在所有程序中載入 DummyModule.jl
,但只會在呼叫 using
的程序中將其帶入範圍內。
啟動和管理工作程序
基本 Julia 安裝內建支援兩種叢集類型
- 使用
-p
選項指定的本機叢集,如上所示。 - 使用
--machine-file
選項跨越多台電腦的叢集。這會使用無密碼的ssh
登入在指定的電腦上啟動 Julia 工作程序(從與目前主機相同的路徑)。每個電腦定義採用[count*][user@]host[:port] [bind_addr[:port]]
形式。user
預設為目前使用者,port
預設為標準 ssh 埠。count
是要在節點上產生的工作程序數量,預設為 1。選用的bind-to bind_addr[:port]
指定其他工作程序用於連線到此工作程序的 IP 位址和埠。
儘管 Julia 通常會努力維持向後相容性,但將程式碼分配到工作程序依賴於 Serialization.serialize
。正如對應文件所指出的,無法保證這會在不同的 Julia 版本中運作,因此建議所有機器上的所有工作程序都使用相同版本。
函數 addprocs
、rmprocs
、workers
等可作為程式化方式,用於新增、移除和查詢叢集中的程序。
julia> using Distributed
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
在呼叫 addprocs
之前,必須在主程序中明確載入模組 Distributed
。它會自動在工作程序中提供。
請注意,工作程序不會執行 ~/.julia/config/startup.jl
啟動指令碼,也不會與任何其他正在執行的程序同步其全域狀態(例如全域變數、新的方法定義和載入的模組)。您可以使用 addprocs(exeflags="--project")
來使用特定環境初始化工作程序,然後使用 @everywhere using <modulename>
或 @everywhere include("file.jl")
。
其他類型的叢集可以透過撰寫您自己的自訂 ClusterManager
來支援,如下方 ClusterManagers 區段所述。
資料移動
在分散式程式中,傳送訊息和移動資料構成了大部分的負擔。減少訊息數量和傳送的資料量對於達成效能和可擴充性至關重要。為此,了解 Julia 的各種分散式程式建構所執行的資料移動非常重要。
fetch
可以視為明確的資料移動操作,因為它直接要求將物件移至本機。 @spawnat
(以及一些相關建構)也會移動資料,但這並不明顯,因此可以稱為隱含的資料移動操作。考慮下列兩種建構和平方隨機矩陣的方法
方法 1
julia> A = rand(1000,1000);
julia> Bref = @spawnat :any A^2;
[...]
julia> fetch(Bref);
方法 2
julia> Bref = @spawnat :any rand(1000,1000)^2;
[...]
julia> fetch(Bref);
差異看似微不足道,但實際上由於 @spawnat
的行為而有相當大的差異。在第一個方法中,隨機矩陣是在本機建構,然後傳送至另一個處理程序進行平方。在第二個方法中,隨機矩陣是在另一個處理程序上建構和平方。因此,第二個方法傳送的資料比第一個方法少很多。
在這個玩具範例中,這兩種方法很容易區分和選擇。然而,在真實程式中,設計資料移動可能需要更多思考,而且可能需要一些測量。例如,如果第一個處理程序需要矩陣 A
,那麼第一個方法可能會更好。或者,如果計算 A
的成本很高,而且只有目前的處理程序有它,那麼將它移至另一個處理程序可能是無法避免的。或者,如果目前的處理程序在 @spawnat
和 fetch(Bref)
之間幾乎沒有事情可做,那麼最好完全消除平行處理。或者想像 rand(1000,1000)
被替換為更昂貴的操作。那麼,只針對這個步驟新增另一個 @spawnat
陳述式可能是合理的。
全域變數
透過 @spawnat
遠端執行的表達式,或使用 remotecall
指定遠端執行的封閉函式,可能會參考全域變數。Main
模組下的全域繫結與其他模組中的全域繫結處理方式略有不同。請考慮以下程式碼片段
A = rand(10,10)
remotecall_fetch(()->sum(A), 2)
在此情況下,遠端處理程序中必須定義 sum
。請注意,A
是在本地工作區中定義的全域變數。工作執行緒 2 在 Main
下沒有稱為 A
的變數。將封閉函式 ()->sum(A)
傳送至工作執行緒 2 的動作會導致在 2 上定義 Main.A
。即使在呼叫 remotecall_fetch
傳回後,Main.A
仍會繼續存在於工作執行緒 2 上。包含嵌入式全域參考(僅在 Main
模組下)的遠端呼叫會以下列方式管理全域變數
如果全域繫結作為遠端呼叫的一部分被參考,則會在目標工作執行緒上建立新的全域繫結。
全域常數也會在遠端節點上宣告為常數。
全域變數僅在遠端呼叫的內容中重新傳送至目標工作執行緒,且僅在其值已變更時才會重新傳送。此外,叢集不會在節點間同步全域繫結。例如
A = rand(10,10) remotecall_fetch(()->sum(A), 2) # worker 2 A = rand(10,10) remotecall_fetch(()->sum(A), 3) # worker 3 A = nothing
執行上述片段會導致工作執行緒 2 上的
Main.A
具有與工作執行緒 3 上的Main.A
不同的值,而節點 1 上的Main.A
值則設定為nothing
。
您可能已經了解,雖然與全域變數相關聯的記憶體可能會在重新指派至主控時被收集,但工作執行緒上不會執行此動作,因為繫結仍然有效。clear!
可用於在不再需要時手動將遠端節點上的特定全域變數重新指派為 nothing
。這會釋放與它們相關聯的任何記憶體,作為常規垃圾收集週期的部分。
因此,程式應該小心在遠端呼叫中參考全域變數。事實上,如果可能,最好完全避免使用它們。如果您必須參考全域變數,請考慮使用 let
區塊來將全域變數本地化。
例如
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––– ––––––––– ––––––––––––––––––––––
A 800 bytes 10×10 Array{Float64,2}
Base Module
Core Module
Main Module
如您所見,全域變數 A
定義在工作程序 2 上,但 B
被擷取為一個區域變數,因此 B
的繫結不存在於工作程序 2 上。
平行映射和迴圈
幸運的是,許多有用的平行運算不需要資料移動。一個常見的範例是蒙地卡羅模擬,其中多個程序可以同時處理獨立的模擬試驗。我們可以使用 @spawnat
在兩個程序上擲硬幣。首先,在 count_heads.jl
中撰寫以下函數
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
函數 count_heads
僅將 n
個隨機位元加總。以下是如何在兩台機器上執行一些試驗,並將結果加總
julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")
julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)
julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)
julia> fetch(a)+fetch(b)
100001564
此範例展示了一種強大且經常使用的平行程式設計模式。許多反覆運算會在多個程序上獨立執行,然後使用一些函數將其結果組合在一起。組合程序稱為「簡約」,因為它通常會降低張量秩:數字向量會簡約成單一數字,或矩陣會簡約成單一行或單一列,等等。在程式碼中,這通常看起來像模式 x = f(x,v[i])
,其中 x
是累加器,f
是簡約函數,而 v[i]
是要簡約的元素。建議讓 f
具有關聯性,這樣執行運算的順序就不重要了。
請注意,我們對 count_heads
使用此模式可以概括。我們使用了兩個明確的 @spawnat
陳述式,這將平行性限制在兩個程序上。若要在任意數量的程序上執行,我們可以使用在分散式記憶體中執行的「平行 for 迴圈」,可以使用 @distributed
在 Julia 中撰寫如下
nheads = @distributed (+) for i = 1:200000000
Int(rand(Bool))
end
此建構實作將反覆運算分配給多個程序的模式,並將它們與指定的簡約 (在此情況下為 (+)
) 結合。每次反覆運算的結果會視為迴圈內最後一個運算式的值。整個平行迴圈運算式本身會評估為最終答案。
請注意,儘管平行 for 迴圈看起來像序列 for 迴圈,但它們的行為截然不同。特別是,反覆運算不會以指定的順序發生,且寫入變數或陣列不會是全域可見的,因為反覆運算會在不同的程序上執行。平行迴圈內使用的任何變數都會被複製並廣播到每個程序。
例如,以下程式碼無法按預期運作
a = zeros(100000)
@distributed for i = 1:100000
a[i] = i
end
此程式碼不會初始化所有 a
,因為每個程序都會有其個別的副本。必須避免使用此類平行 for 迴圈。幸運的是,共用陣列 可用於解決此限制
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
a[i] = i
end
如果變數是唯讀的,則在平行迴圈中使用「外部」變數是完全合理的
a = randn(1000)
@distributed (+) for i = 1:100000
f(a[rand(1:end)])
end
在此,每個反覆運算會將 f
套用至所有程序共用的向量 a
中隨機選取的範例。
如您所見,如果不需要簡約運算子,則可以省略它。在這種情況下,迴圈會非同步執行,亦即它會在所有可用的工作執行緒上產生獨立的任務,並立即傳回一個 Future
陣列,而不會等待完成。呼叫者可以在稍後呼叫 fetch
來等待 Future
完成,或在迴圈結束時加上 @sync
前綴來等待完成,例如 @sync @distributed for
。
在某些情況下不需要還原運算子,我們僅希望將函數套用至某個範圍內的所有整數(或更一般地說,套用至某個集合中的所有元素)。這是另一個有用的運算,稱為平行對應,在 Julia 中實作為 pmap
函數。例如,我們可以並行計算多個大型隨機矩陣的奇異值,如下所示
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svdvals, M);
Julia 的 pmap
函數是針對每個函數呼叫執行大量工作的案例而設計的。相反地,@distributed for
可以處理每個反覆運算都很小的情況,例如僅對兩個數字求和。@distributed for
和 pmap
都僅使用工作程序來執行平行運算。在 @distributed for
的情況下,最後的還原會在呼叫程序中執行。
遠端參照和抽象通道
遠端參照總是參照 AbstractChannel
的實作。
AbstractChannel
的具體實作(例如 Channel
)需要實作 put!
、take!
、fetch
、isready
和 wait
。由 Future
參照的遠端物件儲存在 Channel{Any}(1)
中,亦即一個大小為 1 的 Channel
,能夠儲存 Any
類型的物件。
可重新寫入的 RemoteChannel
可以指向任何類型和大小的通道,或 AbstractChannel
的任何其他實作。
建構函數 RemoteChannel(f::Function, pid)()
讓我們能夠建構參照至儲存多個特定類型值的通道。f
是在 pid
上執行的函數,它必須傳回一個 AbstractChannel
。
例如,RemoteChannel(()->Channel{Int}(10), pid)
,將傳回型別為 Int
且大小為 10 的通道參考。通道存在於工作者 pid
上。
方法 put!
、take!
、fetch
、isready
和 wait
在 RemoteChannel
上會委派給遠端程序上的後端儲存。
RemoteChannel
因此可用於參照使用者實作的 AbstractChannel
物件。一個簡單的範例在 範例儲存庫 中的 dictchannel.jl
中提供,它使用字典作為其遠端儲存。
通道和遠端通道
通道
是程序本地的。工作者 2 無法直接參照工作者 3 上的通道
,反之亦然。遠端通道
可以在工作者之間傳送和接收值。遠端通道
可以視為通道
的控制代碼。- 與
遠端通道
關聯的程序 IDpid
會識別後端儲存的程序,亦即後端通道
存在的程序。 - 任何具有
遠端通道
參考的程序都可以傳送和接收通道中的項目。資料會自動傳送到 (或從)遠端通道
關聯的程序。 - 序列化
Channel
也會序列化頻道中存在的任何資料。因此,反序列化它實際上會建立原始物件的副本。 - 另一方面,序列化
RemoteChannel
只涉及識別Channel
位置和實例的識別碼序列化,該識別碼由控制代碼所指。反序列化的RemoteChannel
物件(在任何工作站上)因此也會指向與原始物件相同的後端儲存。
上面的頻道範例可以修改為處理程序間通訊,如下所示。
我們啟動 4 個工作站來處理單一 jobs
遠端頻道。由 ID(job_id
)識別的工作會寫入頻道。此模擬中的每個遠端執行工作會讀取 job_id
,等待一段隨機時間,然後將 job_id
、花費時間和它自己的 pid
的組態寫回結果頻道。最後,所有 results
都會列印在主程序上。
julia> addprocs(4); # add worker processes
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> @everywhere function do_work(jobs, results) # define work function everywhere
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
global n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
遠端參考和分散式垃圾回收
遠端參考所指的物件只能在叢集中的所有持有參考都刪除時才能釋放。
儲存值的節點會追蹤哪些工作者有對它的參考。每次將 RemoteChannel
或(未擷取的)Future
序列化到工作者時,會通知該參考所指的節點。每次在本地將 RemoteChannel
或(未擷取的)Future
當作垃圾回收時,擁有該值的節點也會再次收到通知。這是在內部叢集感知序列化器中實作的。遠端參考僅在執行中叢集的內容中有效。不支援將參考序列化和反序列化到和從一般 IO
物件。
通知是透過傳送「追蹤」訊息來完成的,當參考序列化到不同的處理序時會傳送「新增參考」訊息,當參考在本地當作垃圾回收時會傳送「刪除參考」訊息。
由於 Future
是寫入一次並在本地快取的,fetch
Future
的動作也會更新擁有該值的節點上的參考追蹤資訊。
擁有該值的節點會在清除所有對它的參考後釋放該值。
使用 Future
時,將已擷取的 Future
序列化到不同的節點也會傳送該值,因為原始的遠端儲存可能已經在這時收集了該值。
請務必注意,物件在本地當作垃圾回收的時間取決於物件的大小和系統中目前的記憶體壓力。
在遠端參考的情況下,本機參考物件的大小相當小,而儲存在遠端節點上的值可能相當大。由於本機物件可能不會立即被收集,因此建議在 RemoteChannel
的本機實例上或未擷取的 Future
上明確呼叫 finalize
。由於在 Future
上呼叫 fetch
也會從遠端儲存移除其參考,因此在已擷取的 Future
上不需要這麼做。明確呼叫 finalize
會導致立即傳送訊息至遠端節點,以便繼續移除其對該值的參考。
一旦完成,參考就會失效,且無法在任何進一步的呼叫中使用。
本機呼叫
資料必須複製到遠端節點才能執行。這適用於遠端呼叫和資料儲存在不同節點上的 RemoteChannel
/ Future
的情況。正如預期,這會在遠端節點上產生序列化物件的副本。但是,當目的地節點是本機節點(即呼叫處理序 ID 與遠端節點 ID 相同)時,會將其執行為本機呼叫。通常(但並非總是)在不同的工作中執行,但不會序列化/取消序列化資料。因此,呼叫會參考傳遞的相同物件實例,不會建立副本。此行為在下方重點說明
julia> using Distributed;
julia> rc = RemoteChannel(()->Channel(3)); # RemoteChannel created on local node
julia> v = [0];
julia> for i in 1:3
v[1] = i # Reusing `v`
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[3], [3], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1
julia> addprocs(1);
julia> rc = RemoteChannel(()->Channel(3), workers()[1]); # RemoteChannel created on remote node
julia> v = [0];
julia> for i in 1:3
v[1] = i
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[1], [2], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3
正如您所見,在具有相同物件 v
的本機擁有 RemoteChannel
上的 put!
在呼叫之間修改,會產生儲存的相同單一物件實例。這與在擁有 rc
的節點是不同節點時建立 v
的副本相反。
請注意,這通常不是問題。只有在物件同時儲存在本地端,並在呼叫後修改時,才需要考量這點。在這種情況下,儲存物件的 deepcopy
可能比較適當。
這也適用於在本地節點上的遠端呼叫,如下例所示
julia> using Distributed; addprocs(1);
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v); # Executed on local node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Executed on remote node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false
正如再次看到的那樣,對本地節點的遠端呼叫就像直接呼叫一樣。呼叫會修改傳遞為參數的本地物件。在遠端呼叫中,它會對參數的副本進行操作。
再次重申,這通常不是問題。如果本地節點也用作運算節點,而且在呼叫後使用參數,則需要考量此行為,並且如果需要,必須將參數的深度副本傳遞給在本地節點上呼叫的呼叫。對遠端節點的呼叫將始終對參數的副本進行操作。
共用陣列
共用陣列使用系統共用記憶體,將同一個陣列對應到許多處理程序。儘管與 DArray
有些相似之處,但 SharedArray
的行為卻大不相同。在 DArray
中,每個處理程序只能本地存取資料的一部分,而且沒有兩個處理程序共用同一個區塊;相反地,在 SharedArray
中,每個「參與」的處理程序都可以存取整個陣列。SharedArray
非常適合在同一台機器上讓兩個或多個處理程序共同存取大量資料時使用。
共用陣列支援透過模組 SharedArrays
提供,必須在所有參與的工作執行緒上明確載入。
SharedArray
編製索引(指派和存取值)就像一般陣列一樣,而且很有效率,因為底層記憶體可供本機程序使用。因此,大部分演算法都能自然地處理 SharedArray
,儘管是在單一程序模式中。如果演算法堅持要 Array
輸入,可以透過呼叫 sdata
從 SharedArray
中擷取底層陣列。對於其他 AbstractArray
類型,sdata
只會傳回物件本身,因此在任何 Array
類型物件上使用 sdata
都是安全的。
共用陣列的建構式為
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
這會建立一個 N
維共用陣列,其位元類型為 T
,大小為 dims
,並跨越 pids
指定的程序。與分散陣列不同,共用陣列只能由 pids
指定的參與工作者(以及建立程序,如果它在同一個主機上)存取。請注意,只有 isbits
的元素才受 SharedArray 支援。
如果指定了簽章為 initfn(S::SharedArray)
的 init
函數,則會在所有參與工作者上呼叫它。您可以指定每個工作者在陣列的不同部分執行 init
函數,從而將初始化平行化。
以下是一個簡短範例
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
SharedArrays.localindices
提供索引的不相交一維範圍,有時可方便地將任務分配給程序。當然,您可以用任何您希望的方式來分配工作
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
由於所有程序都可以存取底層資料,因此您必須小心不要設定衝突。例如
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
會導致未定義的行為。因為每個程序都會以其自己的 pid
填滿整個陣列,因此無論哪個程序最後執行(對於 S
的任何特定元素),都會保留其 pid
。
作為一個更廣泛且複雜的範例,考慮並行執行以下「核心」
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
在這種情況下,如果我們嘗試使用一維索引來分割工作,我們可能會遇到問題:如果 q[i,j,t]
接近分配給一個工作者的區塊的結尾,而 q[i,j,t+1]
接近分配給另一個工作者的區塊的開頭,則 q[i,j,t]
很可能在需要用於計算 q[i,j,t+1]
時尚未準備好。在這種情況下,最好手動分塊陣列。我們沿著第二維度進行分割。定義一個函數,它會傳回分配給此工作者的 (irange, jrange)
索引
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # This worker is not assigned a piece
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
接下來,定義核心
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # display so we can see what's happening
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
我們還定義一個 SharedArray
實作的方便包裝器
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
現在讓我們比較三個不同的版本,一個在單一程序中執行
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
一個使用 @distributed
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
一個分塊委派
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
如果我們建立 SharedArray
並計算這些函數的時間,我們會得到以下結果(使用 julia -p 4
)
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
執行這些函數一次以進行 JIT 編譯,並在第二次執行時 @time
它們
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
advection_shared!
最大的優點是它將工作者之間的流量減到最低,允許每個工作者在分配的片段上進行長時間的計算。
共用陣列和分散式垃圾回收
與遠端參考類似,共享陣列也依賴建立節點上的垃圾回收機制,以釋放所有參與工作者的參考。建立許多短暫共享陣列物件的程式碼會受益於盡快明確完成這些物件。這將導致同時釋放對應共享區段的記憶體和檔案處理序。
叢集管理員
將 Julia 程序啟動、管理和網路化到邏輯叢集是透過叢集管理員完成的。ClusterManager
負責
- 在叢集環境中啟動工作者程序
- 管理每個工作者生命週期中的事件
- 選擇性地提供資料傳輸
Julia 叢集具有下列特徵
- 最初的 Julia 程序,也稱為
master
,是特殊的,且具有 1 的id
。 - 只有
master
程序可以新增或移除工作者程序。 - 所有程序都可以直接彼此通訊。
工作者之間的連線(使用內建 TCP/IP 傳輸)會以下列方式建立
- 在主控程序上呼叫
addprocs
,並提供ClusterManager
物件。 addprocs
呼叫適當的launch
方法,在適當的機器上產生所需數量的工作者程序。- 每個工作者開始在一個空閒埠上監聽,並將其主機和埠資訊寫入
stdout
。 - 叢集管理員擷取每個工作者的
stdout
,並讓主處理程序可以使用。 - 主處理程序會分析此資訊,並設定與每個工作者的 TCP/IP 連線。
- 每個工作者也會收到叢集中其他工作者的通知。
- 每個工作者會連線到所有
id
小於工作者本身id
的工作者。 - 如此便建立一個網狀網路,其中每個工作者都直接與其他每個工作者連線。
雖然預設傳輸層使用純粹的 TCPSocket
,但 Julia 叢集可以提供自己的傳輸。
Julia 提供兩個內建的叢集管理員
LocalManager
,在呼叫addprocs()
或addprocs(np::Integer)
時使用SSHManager
,在呼叫addprocs(hostnames::Array)
並提供主機名稱清單時使用
LocalManager
用於在同一主機上啟動其他工作者,從而利用多核心和多處理器硬體。
因此,一個最小的叢集管理員需要
addprocs(manager::FooManager)
要求 FooManager
實作
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
舉例來說,讓我們看看 LocalManager
(負責在同一主機上啟動工作者的管理員)是如何實作的
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
launch
方法採用下列引數
manager::ClusterManager
:呼叫addprocs
的叢集管理員params::Dict
:傳遞給addprocs
的所有關鍵字參數launched::Array
:附加一個或多個WorkerConfig
物件的陣列c::Condition
:在啟動工作人員時通知的條件變數
launch
方法會在個別的任務中非同步呼叫。此任務的終止表示已啟動所有要求的工作人員。因此,一旦啟動所有要求的工作人員,launch
函式就必須結束。
新啟動的工作人員會以全對全的方式連接到彼此和主控處理程序。指定命令列參數 --worker[=<cookie>]
會讓啟動的處理程序初始化為工作人員,並透過 TCP/IP Socket 設定連線。
叢集中的所有工作人員與主控處理程序共用相同的 cookie。當未指定 cookie,亦即使用 --worker
選項時,工作人員會嘗試從其標準輸入中讀取 cookie。LocalManager
和 SSHManager
都會透過其標準輸入將 cookie 傳遞給新啟動的工作人員。
預設情況下,工作人員會在呼叫 getipaddr()
傳回的位址上監聽一個可用埠。可以透過選用參數 --bind-to bind_addr[:port]
指定要監聽的特定位址。這對多宿主的電腦很有用。
作為非 TCP/IP 傳輸的範例,實作可以選擇使用 MPI,這種情況下不得指定 --worker
。相反地,新啟動的工作人員應在使用任何平行建構之前呼叫 init_worker(cookie)
。
對於每個啟動的工作者,launch
方法必須將 WorkerConfig
物件(已初始化適當欄位)加入 launched
mutable struct WorkerConfig
# Common fields relevant to all cluster managers
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# Used when launching additional workers at a host
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Any
# SSHManager / SSH tunnel connections to workers
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# Used by Local/SSH managers
connect_at::Any
[...]
end
WorkerConfig
中的大部分欄位由內建管理員使用。自訂叢集管理員通常只會指定 io
或 host
/ port
如果指定
io
,則用於讀取主機/埠資訊。Julia 工作者會在啟動時列印其繫結位址和埠。這讓 Julia 工作者可以監聽任何可用的免費埠,而不必手動設定工作者埠。如果未指定
io
,則使用host
和port
進行連線。count
、exename
和exeflags
與從工作者啟動其他工作者有關。例如,叢集管理員可能會在每個節點上啟動一個工作者,並使用該工作者啟動其他工作者。count
具有整數值n
會啟動總計n
個工作者。count
具有值:auto
會啟動與該機器上的 CPU 執行緒數(邏輯核心)一樣多的工作者。exename
是julia
可執行檔的名稱,包括完整路徑。exeflags
應設定為新工作者的必要命令列引數。
當需要 SSH 通道從主處理程序連線到工作者時,會使用
tunnel
、bind_addr
、sshflags
和max_parallel
。提供
userdata
給自訂叢集管理員,以儲存其自己的工作者特定資訊。
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
會在工作者的生命週期中不同時間呼叫,並具有適當的 op
值
- 當工作者從 Julia 工作者池中新增/移除時,會使用
:register
/:deregister
。 - 當呼叫
interrupt(workers)
時,會使用:interrupt
。ClusterManager
應使用中斷訊號通知適當的工作者。 - 搭配
:finalize
以進行清理。
叢集管理員與自訂傳輸
將預設的 TCP/IP 全對全通訊端點連線替換為自訂傳輸層會稍微複雜一些。每個 Julia 程序都有與其連線的工作程序一樣多的通訊工作。例如,考慮一個在全對全網格網路中有 32 個程序的 Julia 叢集
- 因此,每個 Julia 程序有 31 個通訊工作。
- 每個工作會在訊息處理迴圈中處理來自單一遠端工作程序的所有輸入訊息。
- 訊息處理迴圈會等待
IO
物件(例如,預設實作中的TCPSocket
),讀取整個訊息,處理它,然後等待下一個訊息。 - 傳送訊息給程序會直接從任何 Julia 工作(不只是通訊工作)進行,再次透過適當的
IO
物件。
替換預設傳輸需要新的實作設定與遠端工作程序的連線,並提供適當的 IO
物件,訊息處理迴圈可以在其上等待。要實作的管理員特定回呼函式為
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
預設實作(使用 TCP/IP 通訊端點)實作為 connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
。
connect
應傳回一對 IO
物件,一個用於讀取從工作程序 pid
傳送的資料,另一個用於寫入需要傳送給工作程序 pid
的資料。自訂叢集管理員可以使用記憶體中的 BufferStream
作為管道,在自訂(可能不是 IO
)傳輸與 Julia 內建平行基礎架構之間代理資料。
BufferStream
是記憶體中的 IOBuffer
,其行為類似於 IO
,它是一個可以非同步處理的串流。
範例儲存庫中的資料夾 clustermanager/0mq
包含一個範例,說明如何使用 ZeroMQ 將 Julia 工作者連接到星狀拓撲,中間有一個 0MQ 代理程式。注意:Julia 程序在邏輯上仍彼此連接,任何工作者都可以直接傳送訊息給其他工作者,而無需知道 0MQ 被用作傳輸層。
使用自訂傳輸時
- Julia 工作者不能使用
--worker
啟動。使用--worker
啟動會導致新啟動的工作者預設使用 TCP/IP socket 傳輸實作。 - 對於每個與工作者的內部邏輯連線,都必須呼叫
Base.process_messages(rd::IO, wr::IO)()
。這會啟動一個新工作,處理由IO
物件代表的工作者傳送和接收的訊息。 init_worker(cookie, manager::FooManager)
必須在工作者程序初始化時呼叫。- 當呼叫
launch
時,群集管理員可以在WorkerConfig
中設定欄位connect_at::Any
。此欄位的值會傳遞給所有connect
回呼。它通常會傳遞有關如何連線到工作者的資訊。例如,TCP/IP socket 傳輸會使用此欄位來指定連線到工作者的(host, port)
組合。
呼叫 kill(manager, pid, config)
會從群集中移除一個工作者。在主程序中,實作必須關閉對應的 IO
物件,以確保正確清理。預設實作只會在指定的遠端工作者上執行 exit()
呼叫。
範例資料夾 clustermanager/simple
是個範例,說明如何使用 UNIX 域 socket 進行群集設定。
LocalManager 和 SSHManager 的網路需求
Julia 集群設計為執行在基礎架構上已確保安全的環境中,例如本機筆電、部門集群,甚至雲端。本節涵蓋內建的 LocalManager
和 SSHManager
的網路安全需求
主控程序不會在任何埠上監聽。它僅會連線到工作程序。
每個工作程序僅繫結到一個本機介面,並在作業系統指定的臨時埠號上監聽。
LocalManager
由addprocs(N)
使用,預設只繫結到迴圈介面。這表示稍後在遠端主機(或任何具有惡意意圖的人)啟動的工作程序無法連線到集群。addprocs(4)
後接addprocs(["remote_host"])
會失敗。有些使用者可能需要建立一個包含其本機系統和幾個遠端系統的集群。這可透過明確要求LocalManager
透過restrict
關鍵字參數繫結到外部網路介面來完成:addprocs(4; restrict=false)
。SSHManager
由addprocs(list_of_remote_hosts)
使用,透過 SSH 在遠端主機上啟動工作程序。預設上,SSH 僅用於啟動 Julia 工作程序。後續的主控程序-工作程序和工作程序-工作程序連線使用純文字、未加密的 TCP/IP socket。遠端主機必須啟用免密碼登入。額外的 SSH 旗標或憑證可透過關鍵字參數sshflags
指定。addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)
在我們希望也對主控程序-工作程序使用 SSH 連線時很有用。一個典型的場景是執行 Julia REPL(即主控程序)的本機筆電,而集群的其餘部分在雲端,例如 Amazon EC2。在這種情況下,僅需在遠端集群開啟埠 22,並透過公開金鑰基礎架構 (PKI) 驗證 SSH 用戶端。驗證憑證可透過sshflags
提供,例如sshflags=`-i <keyfile>`
。在全對全拓撲(預設)中,所有工作者透過一般 TCP socket 彼此連線。因此,叢集節點上的安全性政策必須確保工作者之間在臨時埠範圍(因作業系統而異)的連線暢通。
透過自訂的
ClusterManager
可以保護和加密所有工作者與工作者之間的流量(透過 SSH)或加密個別訊息。如果你指定
multiplex=true
作為addprocs
的選項,SSH 多工將用於在主控端和工作者之間建立通道。如果你已自行設定 SSH 多工且連線已建立,SSH 多工將用於不管multiplex
選項為何。如果啟用多工,將使用現有連線(ssh 中的-O forward
選項)設定轉送。如果你的伺服器需要密碼驗證,這會很有用;你可以在addprocs
之前登入伺服器,以避免在 Julia 中進行驗證。在會話期間,控制 socket 將位於~/.ssh/julia-%r@%h:%p
,除非使用現有的多工連線。請注意,如果你在一個節點上建立多個程序並啟用多工,頻寬可能會受到限制,因為在這種情況下,程序會共用一個多工 TCP 連線。
叢集 Cookie
叢集中的所有程序共用相同的 Cookie,預設情況下,Cookie 是主控端程序中隨機產生的字串
cluster_cookie()
會傳回 Cookie,而cluster_cookie(cookie)()
會設定 Cookie 並傳回新的 Cookie。- 所有連線在兩端都經過驗證,以確保只有由主控程式啟動的工作人員才能彼此連線。
- Cookie 可以透過啟動時參數
--worker=<cookie>
傳遞給工作人員。如果指定參數--worker
但沒有 Cookie,工作人員會嘗試從其標準輸入 (stdin
) 讀取 Cookie。在擷取 Cookie 後,stdin
會立即關閉。 ClusterManager
可以透過呼叫cluster_cookie()
在主控程式上擷取 Cookie。未採用預設 TCP/IP 傳輸 (因此未指定--worker
) 的叢集管理員必須呼叫init_worker(cookie, manager)
,其 Cookie 與主控程式上的相同。
請注意,需要更高層級安全性的環境可以透過自訂 ClusterManager
來實作這一點。例如,Cookie 可以預先共用,因此不必指定為啟動時參數。
指定網路拓撲 (實驗性質)
傳遞給 addprocs
的關鍵字參數 topology
用於指定工作人員必須如何彼此連線
:all_to_all
,預設值:所有工作人員都彼此連線。:master_worker
:只有驅動程式程序,即pid
1,有連線到工作人員。:custom
:群集管理員之launch
方法透過WorkerConfig
中的ident
和connect_idents
欄位指定連線拓撲。具有群集管理員提供的識別碼ident
的工作人員將連線至connect_idents
中指定的所有工作人員。
關鍵字引數 lazy=true|false
僅影響 topology
選項 :all_to_all
。如果為 true
,群集會從主控端連線至所有工作人員開始。特定工作人員與工作人員之間的連線會在兩個工作人員之間的第一次遠端呼叫中建立。這有助於減少配置給群集內通訊的初始資源。連線會根據平行程式的執行時間需求設定。lazy
的預設值為 true
。
目前,在未連線的工作人員之間傳送訊息會導致錯誤。此行為與功能和介面一樣,應視為實驗性質,且未來版本可能會變更。
值得注意的外部套件
在 Julia 並行化之外,還有許多值得一提的外部套件。例如 MPI.jl 是 MPI
協定的 Julia 封裝器,Dagger.jl 提供類似 Python Dask 的功能,而 DistributedArrays.jl 提供分佈在工作人員中的陣列運算,如 共享陣列 中所述。
必須提到 Julia 的 GPU 程式設計生態系統,其中包括
CUDA.jl 封裝各種 CUDA 函式庫,並支援為 Nvidia GPU 編譯 Julia 核心。
oneAPI.jl 封裝 oneAPI 統一程式設計模型,並支援在受支援的加速器上執行 Julia 核心。目前僅支援 Linux。
AMDGPU.jl 封裝 AMD ROCm 函式庫,並支援為 AMD GPU 編譯 Julia 核心。目前僅支援 Linux。
高階函式庫,例如 KernelAbstractions.jl、Tullio.jl 和 ArrayFire.jl。
在以下範例中,我們將同時使用 DistributedArrays.jl
和 CUDA.jl
,透過先將陣列透過 distribute()
和 CuArray()
進行轉換,將陣列分佈到多個處理程序中。
請記住,在匯入 DistributedArrays.jl
時,要使用 @everywhere
在所有處理程序中匯入它。
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CUDA
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
在以下範例中,我們將同時使用 DistributedArrays.jl
和 CUDA.jl
,將陣列分佈到多個處理程序中,並對它呼叫一般函數。
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # or (M*v) ./ v
end
power_method
重複建立新的向量並將其正規化。我們未在函數宣告中指定任何類型簽章,讓我們看看它是否適用於上述資料類型
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
為了結束對外部套件的簡短介紹,我們可以考慮 MPI.jl
,它是 MPI 協定的 Julia 封裝。由於要考量每個內部函數會花費太長的時間,因此最好只欣賞用於實作協定的方法。
考慮這個玩具腳本,它只會呼叫每個子處理程序,實例化其等級,並且當到達主處理程序時,執行等級總和
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
mpirun -np 4 ./julia example.jl
- 1在此背景下,MPI 指的是 MPI-1 標準。從 MPI-2 開始,MPI 標準委員會引入了一組新的通訊機制,統稱為遠端記憶體存取 (RMA)。將 RMA 加入 MPI 標準的動機是為了促進單向通訊模式。如需有關最新 MPI 標準的更多資訊,請參閱 https://mpi-forum.org/docs。