- Le 2020-11-19, j'utilise Julia_1.5.3 sur VS_Code_1.51.1 avec un W10 Pro, i9-10900F, 10 coeurs.
- Préalable
- Vous n'êtes pas un novice en programmation.
- Mes billets précédents sur Julia sont supposés connus et assimilés.
- Vous aurez besoin des paquets : DistributedArrays, SharedArrays
- Aujourd'hui, voici le statut (voir mes billets précédents) de mes paquets :
- Status `C:\Users\User\.julia\environments\v1.5\Project.toml`
- Status `C:\Users\User\.julia\environments\v1.5\Project.toml`
[6e4b80f9] BenchmarkTools v0.5.0
[336ed68f] CSV v0.8.0
[159f3aea] Cairo v1.0.5
[a81c6b42] Compose v0.8.2
[a93c6f00] DataFrames v0.22.0
[aaf54ef3] DistributedArrays v0.6.5
[186bb1d3] Fontconfig v0.4.0
[c43c736e] Genie v1.8.0
[0ef565a4] Geodesy v0.5.0
[aa1b3936] GraphIO v0.5.0
[a2cc645c] GraphPlot v0.4.3
[4c0ca9eb] Gtk v1.1.5
[7073ff75] IJulia v1.23.0
[093fc24a] LightGraphs v1.3.3
[89bd72ed] LightGraphsExtras v0.3.0
[c03570c3] Memoize v0.4.3
[626554b9] MetaGraphs v0.6.6
[91a5bcdd] Plots v1.9.1
[1a8c2f83] Query v1.0.0
[47aef6b3] SimpleWeightedGraphs v1.1.1
[44d3d7a6] Weave v0.10.6
[c17dfb99] WinRPM v1.0.0
[1a1011a3] SharedArrays
Nous allons apprendre à réaliser du calcul parallèle dans Julia. Les exemples proviennent du site web de Julia et d'autres sources sur le web.
Première étape, connaître le nombre de coeurs de votre processeur. Dans Paremètres -> Système -> Informations système se trouve le nom exact de votre processeur. Faites une recherche sur ce nom pour trouver le nombre de coeurs.
Par exemple, pour un i7-9750h, c'est 6 coeurs. Pour le calcul parallèle, le nombre de coeurs disponibles est votre nombre de coeurs - 1. Pour l'i7-9750h vous avez donc 5 coeurs disponibles qui seront numérotés dans Julia de 2 à 6.
Code Julia : | Sélectionner tout |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | using Distributed addprocs(5) #= On vient d'ajouter 5 coeurs. Attention à réduire ce nombre si votre processeur à moins de coeurs disponibles. Pour Julia un coeur est un worker (travailleur) =# println("nprocs() = $(nprocs())\n") println("nworkers() = $(nworkers())\n") println("workers() = $(workers())\n") #= Générer deux matrices formées par des nombres aléatoires. L'une des matrices est composée de nombres réels, tandis que l'autre contient des entiers entre 1 et 8. =# f1 = remotecall(rand, 2, 2, 2) # la fonction rand, id du worker, les arguments de la fonction, ici rand(2, 2) f2 = remotecall(rand, 3, 1:8, 3, 4) # la fonction rand, id du worker, les arguments de la fonction ici rand(1:8, 3, 4) println("f1 = $(f1)\n") #= f1 et f2 sont des 'Future', la méthode fetch(f1) donnera le résultat du calcul. N'écrivez pas : r1 = fetch(remotecall(rand, 2, 2, 2)) mais : r1 = remotecall_fetch(rand, 2, 2, 2) c'est plus performant. =# println("fetch(f1) = $(fetch(f1))\n") println("fetch(f2) = $(fetch(f2))\n") #= Il existe d'autres moyens d'envoyer du travail aux différends workers. Dans le code suivant, nous créons une matrice aléatoire puis nous ajoutons 1 à tous ses éléments. =# # @spawnat id du worker expression f3 = @spawnat 4 rand(2, 2) f4 = @spawnat 5 1 .+ fetch(f3) println("fetch(f4) = $(fetch(f4))\n") # @spawn laisse Julia sélectionner le worker. f5 = @spawn rand(2, 2) f6 = @spawn 1 .+ fetch(f5) println("fetch(f6) = $(fetch(f6))\n") #= Nous pouvons utiliser nos propres fonctions pour faire des calculs en parallèle. Notre fonction doit être déclarée après l'ajout des workers et commencer par la macro @everywhere, sinon les workers ignore son existence. Nous créons une fonction qui retourne la racine carrée du nombre n multipler par un nombre aléatoire. Julia peut utiliser de nombreux symboles voir https://docs.julialang.org/en/v1/manual/unicode-input/ Pour obtenir √ on écrit \sqrt suivit d'une frappe sur la touche de tabulation. =# @everywhere function sqrt_rand(n) return √(n) * rand() end f7 = @spawnat 6 [sqrt_rand(n) for n=1:10] println("fetch(f7) = $(fetch(f7))\n") #= nprocs() = 6 nworkers() = 5 workers() = [2, 3, 4, 5, 6] f1 = Future(2, 1, 7, nothing) =# # Obtenir des informations sur les workers disponibles for i in workers() id, pid, host = fetch(@spawnat i (myid(), getpid(), gethostname())) println("id = $id ; pid = $pid sur $host") end #= id = 2 ; pid = 9228 sur DESKTOP-PRO64S4 id = 3 ; pid = 13884 sur DESKTOP-PRO64S4 id = 4 ; pid = 3344 sur DESKTOP-PRO64S4 id = 5 ; pid = 10944 sur DESKTOP-PRO64S4 id = 6 ; pid = 14764 sur DESKTOP-PRO64S4 =# |
Le but du calcul parallèle est d'améliorer les performances, et à cette fin, il est important de prêter attention à toutes les étapes impliquées. En particulier, tout ce qui concerne la quantité d'informations transmises entre les processus et la portée des variables est crucial.
Code Julia : | Sélectionner tout |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 | using Distributed addprocs(5) A = rand(10, 10); # Nous construisons la matrice A localement. @time begin f1 = @spawn A^2; # Pour effectuer l'opération désirée, nous devons déplacer les données dans un autre worker. println("fetch(f1) = $(fetch(f1))\n") end # 1.411503 seconds (1.34 M allocations: 68.992 MiB, 0.67% gc time) @time begin f2 = @spawn rand(10, 10)^2; # Ici on construit la matrice et on exécute l'opération dans le même worker. println("fetch(f2) = $(fetch(f2))\n") end # 1.079813 seconds (438 allocations: 61.073 KiB) #= L'utilisation d'une approche ou d'une autre dépendra des nécessités. Si le processus principal (le n° 1) a besoin de la matrice, la première approche est la meilleure. =# @everywhere using InteractiveUtils, LinearAlgebra @everywhere function add_un(A) return 1 .+ A end A = rand(2, 2) println(varinfo(), "\n") #= | name | size | summary | |:------ | ---------:|:-------------------- | | A | 72 bytes | 2×2 Array{Float64,2} | | Base | | Module | | Core | | Module | | Main | | Module | | add_un | 0 bytes | typeof(add_un) | | f1 | 880 bytes | Future | | f2 | 880 bytes | Future | Nous voyons que A est créé localement =# @spawnat 2 println(varinfo(), "\n") #= From worker 2: | Nous voyons qu'il n'y a toujours rien dans le worker 2 =# f3 = @spawnat 2 add_un(A) println("fetch(f3) = $(fetch(f3))\n") # fetch(f3) = [1.0480350106870235 1.7025735086470284; 1.0338404628511426 1.9987156034573772] @spawnat 2 println(varinfo(), "\n") #= From worker 2: | name | size | summary | From worker 2: |:----------- | -----------:|:---------------------- | From worker 2: | A | 840 bytes | 10×10 Array{Float64,2} | From worker 2: | Base | | Module | From worker 2: | Core | | Module | From worker 2: | Distributed | 899.158 KiB | Module | From worker 2: | Main | | Module | From worker 2: | add_un | 0 bytes | typeof(add_un) | From worker 2: From worker 2: From worker 2: | name | size | summary | From worker 2: |:----------- | -----------:|:-------------------- | From worker 2: | A | 72 bytes | 2×2 Array{Float64,2} | From worker 2: | Base | | Module | From worker 2: | Core | | Module | From worker 2: | Distributed | 902.546 KiB | Module | From worker 2: | Main | | Module | From worker 2: | add_un | 0 bytes | typeof(add_un) | From worker 2: From worker 2: Nous voyons que A consomme beaucoup de place dans le worker 2. =# X = rand(2, 2) # X car A a été modifié par add_un(A) # Nous allons travailler dans une portée locale avec let end. let B = X f4 = @spawnat 3 add_un(B) # nour travaillons sur le worker 3 println("fetch(f4) = $(fetch(f4))\n") # fetch(f4) = [1.755934161883382 1.2047407578469025; 1.0398048146365817 1.5077782104581243] @spawnat 3 println(varinfo(), "\n") #= From worker 3: | name =# end |
Nous allons maintenant apprendre deux constructions courantes utilisées dans le calcul parallèle: @distributed et pmap().
@distributed : https://docs.julialang.org/en/v1/std...d.@distributed
pmap : https://docs.julialang.org/en/v1/std...stributed.pmap
Nous serons intéressés par l'utilisation de @distributed lorsque nous aurons un très grand nombre de tâches faciles et indépendantes à faire.
- Nous avons besoin d'un grand nombre d'opérations à faire, car @distributed nécessite plus de mouvements de données que les boucles for ordinaires, donc le nombre d'opérations doit être suffisamment grand pour en valoir la peine.
- Nous avons besoin d'opérations faciles, car l'idée est que le nombre d'opérations, et non les opérations elles-mêmes, est le facteur qui prend du temps. Quand c'est l'inverse (peu d'opérations, mais vraiment chronophage), nous utiliserons pmap().
- L'indépendance est nécessaire, car les itérations ne se produisent pas dans un ordre spécifié.
Code Julia : | Sélectionner tout |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | using Distributed addprocs(5) #= @distributed [reducer] for var = range body end La plage spécifiée est partitionnée et exécutée localement sur tous les nuds de calcul. Dans le cas où une fonction de réduction facultative est spécifiée, @distributed effectue des réductions locales sur chaque travailleur avec une réduction finale sur le processus appelant. Notez que sans fonction de réduction, @distributed s'exécute de manière asynchrone, c'est-à-dire qu'il génère des tâches indépendantes sur tous les nuds de calcul disponibles et revient immédiatement sans attendre la fin. Pour attendre la fin, préfixez l'appel avec @sync, comme: @sync @distributed for var = range body end =# @time begin n = 200_000_000 # Approximation de π par la méthode de Monte Carlo (aire du cercle). piAprox = @distributed (+) for i = 1:n Int(rand()^2 + rand()^2 <= 1) end piAprox = piAprox * 4.0 / n println("π - piAprox = $(π - piAprox)\n") # π - piAprox = 4.345358979307434e-5 end # 0.261304 seconds (106.56 k allocations: 5.455 MiB) @everywhere using LinearAlgebra #= pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection On transforme la collection c en appliquant f à chaque élément à l'aide des workers disponibles. =# @time begin # Nous créons un array de matrices. M = Matrix{Float64}[rand(100,100) for i = 1:200] R = pmap(svdvals, M) # svdvals : https://docs.julialang.org/en/v1/stdlib/LinearAlgebra/#LinearAlgebra.svdvals println("length(R[1]) = $(length(R[1]))\n") # length(R[1]) = 100 println("R[1][1] = $(R[1][1])\n") # R[1][1] = 49.68770632801252 end # 0.421247 seconds (1.26 M allocations: 82.347 MiB, 2.03% gc time) |
Dans les codes précédents, nous avons vu comment lancer différentes tâches simultanément et comment les gérer de manière asynchrone.
Nous n'avons eu aucun problème lié au partage d'informations, car toutes les tâches ont été exécutées dans le worker 1, bien que des fonctions aient été évaluées dans différent workers.
Channel permet d'implémenter facilement la parallélisation lorsque nous avons besoin de lire des données, de les traiter et de les écrire dans différents workers.
Un Channel peut être considéré comme une file d'attente dans un supermarché : vous approvisionnez les éléments par l'arrière, mais vous les prenez par l'avant.
Code Julia : | Sélectionner tout |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | using Distributed addprocs(5) #= RemoteChannel : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.RemoteChannel Channel : https://docs.julialang.org/en/v1/base/parallel/#Base.Channel Le premier paramètre d'un Channel est le type et le second le nombre maximum d'éléments disponibles pour ce Channel =# const jobs = RemoteChannel(() -> Channel{Int}(32)); const results = RemoteChannel(() -> Channel{Tuple}(32)); @everywhere function do_work(jobs, results) while true # take! : https://docs.julialang.org/en/v1/base/parallel/#Base.take!-Tuple{Channel} job_id = take!(jobs) exec_time = rand(1:3) sleep(exec_time) put!(results, (job_id, exec_time, myid())) end end function make_jobs(nb) for i in 1:nb put!(jobs, i) end end let nb_jobs = 12 make_jobs(nb_jobs) @time begin for p in workers() # remote_do : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.remote_do-Tuple{Any,AbstractWorkerPool,Vararg{Any,N}%20where%20N} remote_do(do_work, p, jobs, results) end while nb_jobs > 0 job_id, exec_time, where = take!(results) println("Le job $job_id a été terminé en $(round(exec_time, digits=2)) secondes sur le worker $where") nb_jobs -= 1 end end end #= Le job 4 a été terminé en 1.0 secondes sur le worker 3 Le job 1 a été terminé en 2.0 secondes sur le worker 2 Le job 2 a été terminé en 2.0 secondes sur le worker 5 Le job 3 a été terminé en 3.0 secondes sur le worker 4 Le job 5 a été terminé en 3.0 secondes sur le worker 6 Le job 7 a été terminé en 1.0 secondes sur le worker 2 Le job 10 a été terminé en 1.0 secondes sur le worker 6 Le job 8 a été terminé en 2.0 secondes sur le worker 5 Le job 11 a été terminé en 1.0 secondes sur le worker 2 Le job 6 a été terminé en 3.0 secondes sur le worker 3 Le job 9 a été terminé en 3.0 secondes sur le worker 4 Le job 12 a été terminé en 2.0 secondes sur le worker 6 6.380669 seconds (806.07 k allocations: 40.362 MiB, 0.07% gc time) =# |
Dans le code précédent, le lecteur aura certainement compris que si un Channel est utile, il ne convient pas à toutes les situations.
Nous aurons souvent besoin de travailler avec un Array, mais en parallèle (par exemple: pour un algorithme de multiplication matricielle).
SharedArray satisfera notre besoin. C'est juste un tableau, mais accessible dans n'importe quel worker.
SharedArrays : https://docs.julialang.org/en/v1/stdlib/SharedArrays/
Code Julia : | Sélectionner tout |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | using Distributed addprocs(5) #= SharedArray{T, N}(dims::NTuple, init = false, pids = Int[]) T représente le type N est la dimension du tableau (par exemple: Float64, 2 pour une matrice). dims recueille, entre parenthèses, le nombre d'éléments dans chaque dimension (par exemple: (3,2) pour une matrice 3 × 2). init : ce paramètre ne fonctionne dans aucun de mes tests, les exemples que j'ai trouvés sur le web ne fonctionnent pas. pids est un vecteur donnant les workers qui peuvent accéder au tableau partagé. Si rien n'est spécifié dans pids, tous les workers peuvent accéder au tableau partagé. =# @everywhere using SharedArrays N = 10_000 # calcul du produit scalaire x = rand(0:5, N) c = rand(0:1, N) output = SharedArray{Int, 1}(N); r = @distributed (+) for i = 1:N output[i] = x[i] * c[i]; end println("r = $r\n") # liste des nombres premiers @everywhere function isprime(n) for i = 2:Int(floor(sqrt(n))) if n % i == 0; return false end end return true end P = SharedArray{Int, 1}(N); # rappel : sans opérateur @distributed doit être précédé d'un @sync @time @sync @distributed for i = 1:N if isprime(i) P[i] = i end end # 0.081662 seconds (224.33 k allocations: 11.448 MiB) A = Array{Int64, 1}() for i=1:N if P[i] != 0 push!(A, i) end end println("A = $A\n") |
Une manière d'obtenir le parallélisme consiste à répartir un Array entre plusieurs workers.
Chaque worker peut lire et écrire dans la partie de l'Array qu'il possède et a un accès en lecture seule aux parties qu'il ne possède pas.
Les tableaux distribués Julia sont implémentés par le type DArray.
DistributedArrays : https://juliaparallel.github.io/Dist...ays.jl/stable/
Code Julia : | Sélectionner tout |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | using Distributed addprocs(5) @everywhere using DistributedArrays, LinearAlgebra, InteractiveUtils @everywhere function aa(n) la = zeros(n, n) la[diagind(la, 0)] .= 2.0 la[diagind(la, -1)] .= -1.0 la[diagind(la, 1)] .= -1.0 return la end @everywhere function b1(n) la = zeros(n, n) la[1, n] = -1.0 return la end @everywhere function b2(n) la = zeros(n, n) la[n, 1] = -1.0 return la end # Appeler les fonctions sur les workers pour créer des parties locales n = 4 d11 = @spawnat 2 aa(n) d12 = @spawnat 3 b1(n) d21 = @spawnat 4 b2(n) d22 = @spawnat 5 aa(n) # Créer une matrice distribuée sur une grille de workers 2 * 2 A = reshape([d11 d21 d12 d22], (2, 2)) DA = DArray(A) # DistributedArrays println(typeof(DA)) # DArray{Float64,2,Array{Float64,2}} println(length(DA)) # 64 println(DA[32]) # 0.0 println(DA) #= [2.0 -1.0 0.0 0.0 0.0 0.0 0.0 -1.0; -1.0 2.0 -1.0 0.0 0.0 0.0 0.0 0.0; 0.0 -1.0 2.0 -1.0 0.0 0.0 0.0 0.0; 0.0 0.0 -1.0 2.0 0.0 0.0 0.0 0.0; 0.0 0.0 0.0 0.0 2.0 -1.0 0.0 0.0; 0.0 0.0 0.0 0.0 -1.0 2.0 -1.0 0.0; 0.0 0.0 0.0 0.0 0.0 -1.0 2.0 -1.0; -1.0 0.0 0.0 0.0 0.0 0.0 -1.0 2.0] =# println(varinfo()) #= | name | size | summary | |:---- | ---------:|:-------------------------------------- | | A | 200 bytes | 2×2 Array{Future,2} | | Base | | Module | | Core | | Module | | DA | 544 bytes | 8×8 DArray{Float64,2,Array{Float64,2}} | | Main | | Module | | aa | 0 bytes | typeof(aa) | | b1 | 0 bytes | typeof(b1) | | b2 | 0 bytes | typeof(b2) | | d11 | 32 bytes | Future | | d12 | 32 bytes | Future | | d21 | 32 bytes | Future | | d22 | 32 bytes | Future | | n | 8 bytes | Int64 | =# DB = DA * DA println(DB) #= [6.0 -4.0 1.0 0.0 0.0 0.0 1.0 -4.0; -4.0 6.0 -4.0 1.0 0.0 0.0 0.0 1.0; 1.0 -4.0 6.0 -4.0 0.0 0.0 0.0 0.0; 0.0 1.0 -4.0 5.0 0.0 0.0 0.0 0.0; 0.0 0.0 0.0 0.0 5.0 -4.0 1.0 0.0; 0.0 0.0 0.0 0.0 -4.0 6.0 -4.0 1.0; 1.0 0.0 0.0 0.0 1.0 -4.0 6.0 -4.0; -4.0 1.0 0.0 0.0 0.0 1.0 -4.0 6.0] =# # Vérifiez les valeurs sur le worker 3 f = @spawnat 3 DB.localpart println(fetch(f)) #= [0.0 0.0 1.0 -4.0; 0.0 0.0 0.0 1.0; 0.0 0.0 0.0 0.0; 0.0 0.0 0.0 0.0] =# |
Licence Creative Commons Attribution 2.0 Belgique