- Le 2020-11-19, j'utilise Julia_1.5.3 sur VS_Code_1.51.1 avec un W10 Pro, i9-10900F, 10 coeurs.
- Préalable
- Vous n'êtes pas un novice en programmation.
- Mes billets précédents sur Julia sont supposés connus et assimilés.
BenchmarkTools v0.5.0
CSV v0.8.0
Cairo v1.0.5
Compose v0.8.2
DataFrames v0.22.0
DistributedArrays v0.6.5
Fontconfig v0.4.0
Genie v1.8.0
Geodesy v0.5.0
GraphIO v0.5.0
GraphPlot v0.4.3
Gtk v1.1.5
IJulia v1.23.0
LightGraphs v1.3.3
LightGraphsExtras v0.3.0
Memoize v0.4.3
MetaGraphs v0.6.6
Plots v1.9.1
Query v1.0.0
SimpleWeightedGraphs v1.1.1
Weave v0.10.6
WinRPM v1.0.0
SharedArrays
Nous allons apprendre à réaliser du calcul parallèle dans Julia. Les exemples proviennent du site web de Julia et d'autres sources sur le web.
Première étape, connaître le nombre de coeurs de votre processeur. Dans Paremètres -> Système -> Informations système se trouve le nom exact de votre processeur. Faites une recherche sur ce nom pour trouver le nombre de coeurs.
Par exemple, pour un i7-9750h, c'est 6 coeurs. Pour le calcul parallèle, le nombre de coeurs disponibles est votre nombre de coeurs - 1. Pour l'i7-9750h vous avez donc 5 coeurs disponibles qui seront numérotés dans Julia de 2 à 6.
using Distributed
addprocs(5)
#=
On vient d'ajouter 5 coeurs.
Attention à réduire ce nombre si votre processeur
à moins de coeurs disponibles.
Pour Julia un coeur est un worker (travailleur)
=#
println("nprocs() = $(nprocs())\n")
println("nworkers() = $(nworkers())\n")
println("workers() = $(workers())\n")
#=
Générer deux matrices formées par des nombres aléatoires.
L'une des matrices est composée de nombres réels,
tandis que l'autre contient des entiers entre 1 et 8.
=#
f1 = remotecall(rand, 2, 2, 2)
# la fonction rand, id du worker, les arguments de la fonction, ici rand(2, 2)
f2 = remotecall(rand, 3, 1:8, 3, 4)
# la fonction rand, id du worker, les arguments de la fonction ici rand(1:8, 3, 4)
println("f1 = $(f1)\n")
#=
f1 et f2 sont des 'Future', la méthode fetch(f1) donnera le résultat du calcul.
N'écrivez pas : r1 = fetch(remotecall(rand, 2, 2, 2))
mais : r1 = remotecall_fetch(rand, 2, 2, 2)
c'est plus performant.
=#
println("fetch(f1) = $(fetch(f1))\n")
println("fetch(f2) = $(fetch(f2))\n")
#=
Il existe d'autres moyens d'envoyer du travail aux différends workers.
Dans le code suivant, nous créons une matrice aléatoire puis nous
ajoutons 1 à tous ses éléments.
=#
# @spawnat id du worker expression
f3 = @spawnat 4 rand(2, 2)
f4 = @spawnat 5 1 .+ fetch(f3)
println("fetch(f4) = $(fetch(f4))\n")
# @spawn laisse Julia sélectionner le worker.
f5 = @spawn rand(2, 2)
f6 = @spawn 1 .+ fetch(f5)
println("fetch(f6) = $(fetch(f6))\n")
#=
Nous pouvons utiliser nos propres fonctions pour
faire des calculs en parallèle.
Notre fonction doit être déclarée après l'ajout des workers et
commencer par la macro @everywhere, sinon les workers ignore
son existence.
Nous créons une fonction qui retourne la racine carrée du nombre
n multipler par un nombre aléatoire.
Julia peut utiliser de nombreux symboles voir
https://docs.julialang.org/en/v1/manual/unicode-input/
Pour obtenir √ on écrit \sqrt suivit d'une frappe sur la touche
de tabulation.
=#
@everywhere function sqrt_rand(n)
return √(n) * rand()
end
f7 = @spawnat 6 [sqrt_rand(n) for n=1:10]
println("fetch(f7) = $(fetch(f7))\n")
#=
nprocs() = 6
nworkers() = 5
workers() = [2, 3, 4, 5, 6]
f1 = Future(2, 1, 7, nothing)
=#
# Obtenir des informations sur les workers disponibles
for i in workers()
id, pid, host = fetch(@spawnat i (myid(), getpid(), gethostname()))
println("id = $id ; pid = $pid sur $host")
end
#=
id = 2 ; pid = 9228 sur DESKTOP-PRO64S4
id = 3 ; pid = 13884 sur DESKTOP-PRO64S4
id = 4 ; pid = 3344 sur DESKTOP-PRO64S4
id = 5 ; pid = 10944 sur DESKTOP-PRO64S4
id = 6 ; pid = 14764 sur DESKTOP-PRO64S4
=#
Le but du calcul parallèle est d'améliorer les performances, et à cette fin, il est important de prêter attention à toutes les étapes impliquées. En particulier, tout ce qui concerne la quantité d'informations transmises entre les processus et la portée des variables est crucial.
using Distributed
addprocs(5)
A = rand(10, 10);
# Nous construisons la matrice A localement.
@time begin
f1 = @spawn A^2;
# Pour effectuer l'opération désirée, nous devons déplacer les données dans un autre worker.
println("fetch(f1) = $(fetch(f1))\n")
end # 1.411503 seconds (1.34 M allocations: 68.992 MiB, 0.67% gc time)
@time begin
f2 = @spawn rand(10, 10)^2;
# Ici on construit la matrice et on exécute l'opération dans le même worker.
println("fetch(f2) = $(fetch(f2))\n")
end # 1.079813 seconds (438 allocations: 61.073 KiB)
#=
L'utilisation d'une approche ou d'une autre dépendra des
nécessités. Si le processus principal (le n° 1) a besoin
de la matrice, la première approche est la meilleure.
=#
@everywhere using InteractiveUtils, LinearAlgebra
@everywhere function add_un(A)
return 1 .+ A
end
A = rand(2, 2)
println(varinfo(), "\n")
#=
| name | size | summary |
|:------ | ---------:|:-------------------- |
| A | 72 bytes | 2×2 Array{Float64,2} |
| Base | | Module |
| Core | | Module |
| Main | | Module |
| add_un | 0 bytes | typeof(add_un) |
| f1 | 880 bytes | Future |
| f2 | 880 bytes | Future |
Nous voyons que A est créé localement
=#
@spawnat 2 println(varinfo(), "\n")
#=
From worker 2: |
Nous voyons qu'il n'y a toujours rien dans le worker 2
=#
f3 = @spawnat 2 add_un(A)
println("fetch(f3) = $(fetch(f3))\n")
# fetch(f3) = [1.0480350106870235 1.7025735086470284; 1.0338404628511426 1.9987156034573772]
@spawnat 2 println(varinfo(), "\n")
#=
From worker 2: | name | size | summary |
From worker 2: |:----------- | -----------:|:---------------------- |
From worker 2: | A | 840 bytes | 10×10 Array{Float64,2} |
From worker 2: | Base | | Module |
From worker 2: | Core | | Module |
From worker 2: | Distributed | 899.158 KiB | Module |
From worker 2: | Main | | Module |
From worker 2: | add_un | 0 bytes | typeof(add_un) |
From worker 2:
From worker 2:
From worker 2: | name | size | summary |
From worker 2: |:----------- | -----------:|:-------------------- |
From worker 2: | A | 72 bytes | 2×2 Array{Float64,2} |
From worker 2: | Base | | Module |
From worker 2: | Core | | Module |
From worker 2: | Distributed | 902.546 KiB | Module |
From worker 2: | Main | | Module |
From worker 2: | add_un | 0 bytes | typeof(add_un) |
From worker 2:
From worker 2:
Nous voyons que A consomme beaucoup de place dans le worker 2.
=#
X = rand(2, 2) # X car A a été modifié par add_un(A)
# Nous allons travailler dans une portée locale avec let end.
let B = X
f4 = @spawnat 3 add_un(B) # nour travaillons sur le worker 3
println("fetch(f4) = $(fetch(f4))\n")
# fetch(f4) = [1.755934161883382 1.2047407578469025; 1.0398048146365817 1.5077782104581243]
@spawnat 3 println(varinfo(), "\n")
#=
From worker 3: | name
=#
end
Nous allons maintenant apprendre deux constructions courantes utilisées dans le calcul parallèle: @distributed et pmap().
@distributed : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.@distributed
pmap : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.pmap
Nous serons intéressés par l'utilisation de @distributed lorsque nous aurons un très grand nombre de tâches faciles et indépendantes à faire.
using Distributed
addprocs(5)
#=
@distributed for var = range
body
end
La plage spécifiée est partitionnée et exécutée localement sur
tous les nœuds de calcul. Dans le cas où une fonction de
réduction facultative est spécifiée, @distributed effectue
des réductions locales sur chaque travailleur avec une
réduction finale sur le processus appelant.
Notez que sans fonction de réduction, @distributed s'exécute
de manière asynchrone, c'est-à-dire qu'il génère des tâches
indépendantes sur tous les nœuds de calcul disponibles et
revient immédiatement sans attendre la fin. Pour attendre
la fin, préfixez l'appel avec @sync, comme:
@sync @distributed for var = range
body
end
=#
@time begin
n = 200_000_000
# Approximation de π par la méthode de Monte Carlo (aire du cercle).
piAprox = @distributed (+) for i = 1:n
Int(rand()^2 + rand()^2 <= 1)
end
piAprox = piAprox * 4.0 / n
println("π - piAprox = $(π - piAprox)\n")
# π - piAprox = 4.345358979307434e-5
end # 0.261304 seconds (106.56 k allocations: 5.455 MiB)
@everywhere using LinearAlgebra
#=
pmap(f, [::AbstractWorkerPool], c...; distributed=true,
batch_size=1, on_error=nothing, retry_delays=[],
retry_check=nothing) -> collection
On transforme la collection c en appliquant f à chaque
élément à l'aide des workers disponibles.
=#
@time begin
# Nous créons un array de matrices.
M = Matrix{Float64}[rand(100,100) for i = 1:200]
R = pmap(svdvals, M)
# svdvals : https://docs.julialang.org/en/v1/stdlib/LinearAlgebra/#LinearAlgebra.svdvals
println("length(R) = $(length(R))\n")
# length(R) = 100
println("R = $(R)\n")
# R = 49.68770632801252
end # 0.421247 seconds (1.26 M allocations: 82.347 MiB, 2.03% gc time)
Dans les codes précédents, nous avons vu comment lancer différentes tâches simultanément et comment les gérer de manière asynchrone.
Nous n'avons eu aucun problème lié au partage d'informations, car toutes les tâches ont été exécutées dans le worker 1, bien que des fonctions aient été évaluées dans différent workers.
Channel permet d'implémenter facilement la parallélisation lorsque nous avons besoin de lire des données, de les traiter et de les écrire dans différents workers.
Un Channel peut être considéré comme une file d'attente dans un supermarché : vous approvisionnez les éléments par l'arrière, mais vous les prenez par l'avant.
using Distributed
addprocs(5)
#=
RemoteChannel : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.RemoteChannel
Channel : https://docs.julialang.org/en/v1/base/parallel/#Base.Channel
Le premier paramètre d'un Channel est le type et le second
le nombre maximum d'éléments disponibles pour ce Channel
=#
const jobs = RemoteChannel(() -> Channel{Int}(32));
const results = RemoteChannel(() -> Channel{Tuple}(32));
@everywhere function do_work(jobs, results)
while true
# take! : https://docs.julialang.org/en/v1/base/parallel/#Base.take!-Tuple{Channel}
job_id = take!(jobs)
exec_time = rand(1:3)
sleep(exec_time)
put!(results, (job_id, exec_time, myid()))
end
end
function make_jobs(nb)
for i in 1:nb
put!(jobs, i)
end
end
let nb_jobs = 12
make_jobs(nb_jobs)
@time begin
for p in workers()
# remote_do : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.remote_do-Tuple{Any,AbstractWorkerPool,Vararg{Any,N}%20where%20N}
remote_do(do_work, p, jobs, results)
end
while nb_jobs > 0
job_id, exec_time, where = take!(results)
println("Le job $job_id a été terminé en $(round(exec_time, digits=2)) secondes sur le worker $where")
nb_jobs -= 1
end
end
end
#=
Le job 4 a été terminé en 1.0 secondes sur le worker 3
Le job 1 a été terminé en 2.0 secondes sur le worker 2
Le job 2 a été terminé en 2.0 secondes sur le worker 5
Le job 3 a été terminé en 3.0 secondes sur le worker 4
Le job 5 a été terminé en 3.0 secondes sur le worker 6
Le job 7 a été terminé en 1.0 secondes sur le worker 2
Le job 10 a été terminé en 1.0 secondes sur le worker 6
Le job 8 a été terminé en 2.0 secondes sur le worker 5
Le job 11 a été terminé en 1.0 secondes sur le worker 2
Le job 6 a été terminé en 3.0 secondes sur le worker 3
Le job 9 a été terminé en 3.0 secondes sur le worker 4
Le job 12 a été terminé en 2.0 secondes sur le worker 6
6.380669 seconds (806.07 k allocations: 40.362 MiB, 0.07% gc time)
=#
Dans le code précédent, le lecteur aura certainement compris que si un Channel est utile, il ne convient pas à toutes les situations.
Nous aurons souvent besoin de travailler avec un Array, mais en parallèle (par exemple: pour un algorithme de multiplication matricielle).
SharedArray satisfera notre besoin. C'est juste un tableau, mais accessible dans n'importe quel worker.
SharedArrays : https://docs.julialang.org/en/v1/stdlib/SharedArrays/
using Distributed
addprocs(5)
#=
SharedArray{T, N}(dims::NTuple, init = false, pids = Int[])
T représente le type
N est la dimension du tableau (par exemple: Float64, 2 pour une matrice).
dims recueille, entre parenthèses, le nombre d'éléments dans chaque dimension
(par exemple: (3,2) pour une matrice 3 × 2).
init : ce paramètre ne fonctionne dans aucun de mes tests, les exemples que
j'ai trouvés sur le web ne fonctionnent pas.
pids est un vecteur donnant les workers qui peuvent accéder au tableau partagé.
Si rien n'est spécifié dans pids, tous les workers peuvent accéder au tableau
partagé.
=#
@everywhere using SharedArrays
N = 10_000
# calcul du produit scalaire
x = rand(0:5, N)
c = rand(0:1, N)
output = SharedArray{Int, 1}(N);
r = @distributed (+) for i = 1:N
output = x * c;
end
println("r = $r\n")
# liste des nombres premiers
@everywhere function isprime(n)
for i = 2:Int(floor(sqrt(n)))
if n % i == 0;
return false
end
end
return true
end
P = SharedArray{Int, 1}(N);
# rappel : sans opérateur @distributed doit être précédé d'un @sync
@time @sync @distributed for i = 1:N
if isprime(i)
P = i
end
end # 0.081662 seconds (224.33 k allocations: 11.448 MiB)
A = Array{Int64, 1}()
for i=1:N
if P != 0
push!(A, i)
end
end
println("A = $A\n")
Une manière d'obtenir le parallélisme consiste à répartir un Array entre plusieurs workers.
Chaque worker peut lire et écrire dans la partie de l'Array qu'il possède et a un accès en lecture seule aux parties qu'il ne possède pas.
Les tableaux distribués Julia sont implémentés par le type DArray.
DistributedArrays : https://juliaparallel.github.io/DistributedArrays.jl/stable/
using Distributed
addprocs(5)
@everywhere using DistributedArrays, LinearAlgebra, InteractiveUtils
@everywhere function aa(n)
la = zeros(n, n)
la[diagind(la, 0)] .= 2.0
la[diagind(la, -1)] .= -1.0
la[diagind(la, 1)] .= -1.0
return la
end
@everywhere function b1(n)
la = zeros(n, n)
la[1, n] = -1.0
return la
end
@everywhere function b2(n)
la = zeros(n, n)
la[n, 1] = -1.0
return la
end
# Appeler les fonctions sur les workers pour créer des parties locales
n = 4
d11 = @spawnat 2 aa(n)
d12 = @spawnat 3 b1(n)
d21 = @spawnat 4 b2(n)
d22 = @spawnat 5 aa(n)
# Créer une matrice distribuée sur une grille de workers 2 * 2
A = reshape([d11 d21 d12 d22], (2, 2))
DA = DArray(A) # DistributedArrays
println(typeof(DA)) # DArray{Float64,2,Array{Float64,2}}
println(length(DA)) # 64
println(DA) # 0.0
println(DA)
#=
[2.0 -1.0 0.0 0.0 0.0 0.0 0.0 -1.0;
-1.0 2.0 -1.0 0.0 0.0 0.0 0.0 0.0;
0.0 -1.0 2.0 -1.0 0.0 0.0 0.0 0.0;
0.0 0.0 -1.0 2.0 0.0 0.0 0.0 0.0;
0.0 0.0 0.0 0.0 2.0 -1.0 0.0 0.0;
0.0 0.0 0.0 0.0 -1.0 2.0 -1.0 0.0;
0.0 0.0 0.0 0.0 0.0 -1.0 2.0 -1.0;
-1.0 0.0 0.0 0.0 0.0 0.0 -1.0 2.0]
=#
println(varinfo())
#=
| name | size | summary |
|:---- | ---------:|:-------------------------------------- |
| A | 200 bytes | 2×2 Array{Future,2} |
| Base | | Module |
| Core | | Module |
| DA | 544 bytes | 8×8 DArray{Float64,2,Array{Float64,2}} |
| Main | | Module |
| aa | 0 bytes | typeof(aa) |
| b1 | 0 bytes | typeof(b1) |
| b2 | 0 bytes | typeof(b2) |
| d11 | 32 bytes | Future |
| d12 | 32 bytes | Future |
| d21 | 32 bytes | Future |
| d22 | 32 bytes | Future |
| n | 8 bytes | Int64 |
=#
DB = DA * DA
println(DB)
#=
[6.0 -4.0 1.0 0.0 0.0 0.0 1.0 -4.0;
-4.0 6.0 -4.0 1.0 0.0 0.0 0.0 1.0;
1.0 -4.0 6.0 -4.0 0.0 0.0 0.0 0.0;
0.0 1.0 -4.0 5.0 0.0 0.0 0.0 0.0;
0.0 0.0 0.0 0.0 5.0 -4.0 1.0 0.0;
0.0 0.0 0.0 0.0 -4.0 6.0 -4.0 1.0;
1.0 0.0 0.0 0.0 1.0 -4.0 6.0 -4.0;
-4.0 1.0 0.0 0.0 0.0 1.0 -4.0 6.0]
=#
# Vérifiez les valeurs sur le worker 3
f = @spawnat 3 DB.localpart
println(fetch(f))
#=
[0.0 0.0 1.0 -4.0;
0.0 0.0 0.0 1.0;
0.0 0.0 0.0 0.0;
0.0 0.0 0.0 0.0]
=#
Licence Creative Commons Attribution 2.0 Belgique
Vous avez lu gratuitement 2 651 articles depuis plus d'un an.
Soutenez le club developpez.com en souscrivant un abonnement pour que nous puissions continuer à vous proposer des publications.
Soutenez le club developpez.com en souscrivant un abonnement pour que nous puissions continuer à vous proposer des publications.