IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)

Vous êtes nouveau sur Developpez.com ? Créez votre compte ou connectez-vous afin de pouvoir participer !

Vous devez avoir un compte Developpez.com et être connecté pour pouvoir participer aux discussions.

Vous n'avez pas encore de compte Developpez.com ? Créez-en un en quelques instants, c'est entièrement gratuit !

Si vous disposez déjà d'un compte et qu'il est bien activé, connectez-vous à l'aide du formulaire ci-dessous.

Identifiez-vous
Identifiant
Mot de passe
Mot de passe oublié ?
Créer un compte

L'inscription est gratuite et ne vous prendra que quelques instants !

Je m'inscris !

Julia : introduction au calcul parallèle sur PC multicoeur
Un billet de blog de Daniel Hagnoul

Le , par danielhagnoul

0PARTAGES


  • Le 2020-11-19, j'utilise Julia_1.5.3 sur VS_Code_1.51.1 avec un W10 Pro, i9-10900F, 10 coeurs.
  • Préalable

  • Vous n'êtes pas un novice en programmation.
  • Mes billets précédents sur Julia sont supposés connus et assimilés.


  • Vous aurez besoin des paquets : DistributedArrays, SharedArrays
  • Aujourd'hui, voici le statut (voir mes billets précédents) de mes paquets :

    • Status `C:\Users\User\.julia\environments\v1.5\Project.toml`


    BenchmarkTools v0.5.0
    CSV v0.8.0
    Cairo v1.0.5
    Compose v0.8.2
    DataFrames v0.22.0
    DistributedArrays v0.6.5
    Fontconfig v0.4.0
    Genie v1.8.0
    Geodesy v0.5.0
    GraphIO v0.5.0
    GraphPlot v0.4.3
    Gtk v1.1.5
    IJulia v1.23.0
    LightGraphs v1.3.3
    LightGraphsExtras v0.3.0
    Memoize v0.4.3
    MetaGraphs v0.6.6
    Plots v1.9.1
    Query v1.0.0
    SimpleWeightedGraphs v1.1.1
    Weave v0.10.6
    WinRPM v1.0.0
    SharedArrays


    Nous allons apprendre à réaliser du calcul parallèle dans Julia. Les exemples proviennent du site web de Julia et d'autres sources sur le web.
    Première étape, connaître le nombre de coeurs de votre processeur. Dans Paremètres -> Système -> Informations système se trouve le nom exact de votre processeur. Faites une recherche sur ce nom pour trouver le nombre de coeurs.
    Par exemple, pour un i7-9750h, c'est 6 coeurs. Pour le calcul parallèle, le nombre de coeurs disponibles est votre nombre de coeurs - 1. Pour l'i7-9750h vous avez donc 5 coeurs disponibles qui seront numérotés dans Julia de 2 à 6.

    using Distributed

    addprocs(5)

    #=
    On vient d'ajouter 5 coeurs.
    Attention à réduire ce nombre si votre processeur
    à moins de coeurs disponibles.


    Pour Julia un coeur est un worker (travailleur)
    =#


    println("nprocs() = $(nprocs())\n")
    println("nworkers() = $(nworkers())\n")
    println("workers() = $(workers())\n")




    #=
    Générer deux matrices formées par des nombres aléatoires.
    L'une des matrices est composée de nombres réels,
    tandis que l'autre contient des entiers entre 1 et 8.
    =#


    f1 = remotecall(rand, 2, 2, 2)
    # la fonction rand, id du worker, les arguments de la fonction, ici rand(2, 2)


    f2 = remotecall(rand, 3, 1:8, 3, 4)
    # la fonction rand, id du worker, les arguments de la fonction ici rand(1:8, 3, 4)


    println("f1 = $(f1)\n")


    #=
    f1 et f2 sont des 'Future', la méthode fetch(f1) donnera le résultat du calcul.


    N'écrivez pas : r1 = fetch(remotecall(rand, 2, 2, 2))
    mais : r1 = remotecall_fetch(rand, 2, 2, 2)
    c'est plus performant.
    =#


    println("fetch(f1) = $(fetch(f1))\n")
    println("fetch(f2) = $(fetch(f2))\n")


    #=
    Il existe d'autres moyens d'envoyer du travail aux différends workers.
    Dans le code suivant, nous créons une matrice aléatoire puis nous
    ajoutons 1 à tous ses éléments.
    =#


    # @spawnat id du worker expression


    f3 = @spawnat 4 rand(2, 2)
    f4 = @spawnat 5 1 .+ fetch(f3)


    println("fetch(f4) = $(fetch(f4))\n")


    # @spawn laisse Julia sélectionner le worker.


    f5 = @spawn rand(2, 2)
    f6 = @spawn 1 .+ fetch(f5)


    println("fetch(f6) = $(fetch(f6))\n")


    #=
    Nous pouvons utiliser nos propres fonctions pour
    faire des calculs en parallèle.


    Notre fonction doit être déclarée après l'ajout des workers et
    commencer par la macro @everywhere, sinon les workers ignore
    son existence.


    Nous créons une fonction qui retourne la racine carrée du nombre
    n multipler par un nombre aléatoire.


    Julia peut utiliser de nombreux symboles voir
    https://docs.julialang.org/en/v1/manual/unicode-input/

    Pour obtenir √ on écrit \sqrt suivit d'une frappe sur la touche
    de tabulation.
    =#


    @everywhere function sqrt_rand(n)
    return √(n) * rand()
    end


    f7 = @spawnat 6 [sqrt_rand(n) for n=1:10]


    println("fetch(f7) = $(fetch(f7))\n")


    #=
    nprocs() = 6


    nworkers() = 5


    workers() = [2, 3, 4, 5, 6]


    f1 = Future(2, 1, 7, nothing)
    =#


    # Obtenir des informations sur les workers disponibles
    for i in workers()
    id, pid, host = fetch(@spawnat i (myid(), getpid(), gethostname()))
    println("id = $id ; pid = $pid sur $host")
    end


    #=
    id = 2 ; pid = 9228 sur DESKTOP-PRO64S4
    id = 3 ; pid = 13884 sur DESKTOP-PRO64S4
    id = 4 ; pid = 3344 sur DESKTOP-PRO64S4
    id = 5 ; pid = 10944 sur DESKTOP-PRO64S4
    id = 6 ; pid = 14764 sur DESKTOP-PRO64S4
    =#


    Le but du calcul parallèle est d'améliorer les performances, et à cette fin, il est important de prêter attention à toutes les étapes impliquées. En particulier, tout ce qui concerne la quantité d'informations transmises entre les processus et la portée des variables est crucial.


    using Distributed


    addprocs(5)


    A = rand(10, 10);
    # Nous construisons la matrice A localement.


    @time begin
    f1 = @spawn A^2;
    # Pour effectuer l'opération désirée, nous devons déplacer les données dans un autre worker.


    println("fetch(f1) = $(fetch(f1))\n")
    end # 1.411503 seconds (1.34 M allocations: 68.992 MiB, 0.67% gc time)


    @time begin
    f2 = @spawn rand(10, 10)^2;
    # Ici on construit la matrice et on exécute l'opération dans le même worker.


    println("fetch(f2) = $(fetch(f2))\n")
    end # 1.079813 seconds (438 allocations: 61.073 KiB)


    #=
    L'utilisation d'une approche ou d'une autre dépendra des
    nécessités. Si le processus principal (le n° 1) a besoin
    de la matrice, la première approche est la meilleure.
    =#


    @everywhere using InteractiveUtils, LinearAlgebra


    @everywhere function add_un(A)
    return 1 .+ A
    end


    A = rand(2, 2)


    println(varinfo(), "\n")
    #=
    | name | size | summary |
    |:------ | ---------:|:-------------------- |
    | A | 72 bytes | 2×2 Array{Float64,2} |
    | Base | | Module |
    | Core | | Module |
    | Main | | Module |
    | add_un | 0 bytes | typeof(add_un) |
    | f1 | 880 bytes | Future |
    | f2 | 880 bytes | Future |


    Nous voyons que A est créé localement
    =#


    @spawnat 2 println(varinfo(), "\n")
    #=
    From worker 2: |


    Nous voyons qu'il n'y a toujours rien dans le worker 2
    =#


    f3 = @spawnat 2 add_un(A)


    println("fetch(f3) = $(fetch(f3))\n")
    # fetch(f3) = [1.0480350106870235 1.7025735086470284; 1.0338404628511426 1.9987156034573772]


    @spawnat 2 println(varinfo(), "\n")
    #=
    From worker 2: | name | size | summary |
    From worker 2: |:----------- | -----------:|:---------------------- |
    From worker 2: | A | 840 bytes | 10×10 Array{Float64,2} |
    From worker 2: | Base | | Module |
    From worker 2: | Core | | Module |
    From worker 2: | Distributed | 899.158 KiB | Module |
    From worker 2: | Main | | Module |
    From worker 2: | add_un | 0 bytes | typeof(add_un) |
    From worker 2:
    From worker 2:
    From worker 2: | name | size | summary |
    From worker 2: |:----------- | -----------:|:-------------------- |
    From worker 2: | A | 72 bytes | 2×2 Array{Float64,2} |
    From worker 2: | Base | | Module |
    From worker 2: | Core | | Module |
    From worker 2: | Distributed | 902.546 KiB | Module |
    From worker 2: | Main | | Module |
    From worker 2: | add_un | 0 bytes | typeof(add_un) |
    From worker 2:
    From worker 2:


    Nous voyons que A consomme beaucoup de place dans le worker 2.
    =#


    X = rand(2, 2) # X car A a été modifié par add_un(A)


    # Nous allons travailler dans une portée locale avec let end.
    let B = X
    f4 = @spawnat 3 add_un(B) # nour travaillons sur le worker 3


    println("fetch(f4) = $(fetch(f4))\n")
    # fetch(f4) = [1.755934161883382 1.2047407578469025; 1.0398048146365817 1.5077782104581243]

    @spawnat 3 println(varinfo(), "\n")
    #=
    From worker 3: | name
    =#
    end


    Nous allons maintenant apprendre deux constructions courantes utilisées dans le calcul parallèle: @distributed et pmap().

    @distributed : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.@distributed
    pmap : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.pmap


    Nous serons intéressés par l'utilisation de @distributed lorsque nous aurons un très grand nombre de tâches faciles et indépendantes à faire.


  • Nous avons besoin d'un grand nombre d'opérations à faire, car @distributed nécessite plus de mouvements de données que les boucles for ordinaires, donc le nombre d'opérations doit être suffisamment grand pour en valoir la peine.
  • Nous avons besoin d'opérations faciles, car l'idée est que le nombre d'opérations, et non les opérations elles-mêmes, est le facteur qui prend du temps. Quand c'est l'inverse (peu d'opérations, mais vraiment chronophage), nous utiliserons pmap().
  • L'indépendance est nécessaire, car les itérations ne se produisent pas dans un ordre spécifié.


    using Distributed


    addprocs(5)


    #=
    @distributed for var = range
    body
    end


    La plage spécifiée est partitionnée et exécutée localement sur
    tous les nœuds de calcul. Dans le cas où une fonction de
    réduction facultative est spécifiée, @distributed effectue
    des réductions locales sur chaque travailleur avec une
    réduction finale sur le processus appelant.


    Notez que sans fonction de réduction, @distributed s'exécute
    de manière asynchrone, c'est-à-dire qu'il génère des tâches
    indépendantes sur tous les nœuds de calcul disponibles et
    revient immédiatement sans attendre la fin. Pour attendre
    la fin, préfixez l'appel avec @sync, comme:


    @sync @distributed for var = range
    body
    end
    =#


    @time begin
    n = 200_000_000


    # Approximation de π par la méthode de Monte Carlo (aire du cercle).
    piAprox = @distributed (+) for i = 1:n
    Int(rand()^2 + rand()^2 <= 1)
    end


    piAprox = piAprox * 4.0 / n


    println("π - piAprox = $(π - piAprox)\n")
    # π - piAprox = 4.345358979307434e-5
    end # 0.261304 seconds (106.56 k allocations: 5.455 MiB)


    @everywhere using LinearAlgebra


    #=
    pmap(f, [::AbstractWorkerPool], c...; distributed=true,
    batch_size=1, on_error=nothing, retry_delays=[],
    retry_check=nothing) -> collection


    On transforme la collection c en appliquant f à chaque
    élément à l'aide des workers disponibles.
    =#


    @time begin
    # Nous créons un array de matrices.
    M = Matrix{Float64}[rand(100,100) for i = 1:200]


    R = pmap(svdvals, M)
    # svdvals : https://docs.julialang.org/en/v1/stdlib/LinearAlgebra/#LinearAlgebra.svdvals


    println("length(R) = $(length(R))\n")
    # length(R) = 100


    println("R = $(R)\n")
    # R = 49.68770632801252
    end # 0.421247 seconds (1.26 M allocations: 82.347 MiB, 2.03% gc time)



    Dans les codes précédents, nous avons vu comment lancer différentes tâches simultanément et comment les gérer de manière asynchrone.
    Nous n'avons eu aucun problème lié au partage d'informations, car toutes les tâches ont été exécutées dans le worker 1, bien que des fonctions aient été évaluées dans différent workers.

    Channel permet d'implémenter facilement la parallélisation lorsque nous avons besoin de lire des données, de les traiter et de les écrire dans différents workers.
    Un Channel peut être considéré comme une file d'attente dans un supermarché : vous approvisionnez les éléments par l'arrière, mais vous les prenez par l'avant.

    using Distributed


    addprocs(5)


    #=
    RemoteChannel : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.RemoteChannel


    Channel : https://docs.julialang.org/en/v1/base/parallel/#Base.Channel


    Le premier paramètre d'un Channel est le type et le second
    le nombre maximum d'éléments disponibles pour ce Channel
    =#


    const jobs = RemoteChannel(() -> Channel{Int}(32));
    const results = RemoteChannel(() -> Channel{Tuple}(32));


    @everywhere function do_work(jobs, results)
    while true
    # take! : https://docs.julialang.org/en/v1/base/parallel/#Base.take!-Tuple{Channel}


    job_id = take!(jobs)
    exec_time = rand(1:3)
    sleep(exec_time)
    put!(results, (job_id, exec_time, myid()))
    end
    end


    function make_jobs(nb)
    for i in 1:nb
    put!(jobs, i)
    end
    end


    let nb_jobs = 12


    make_jobs(nb_jobs)


    @time begin
    for p in workers()
    # remote_do : https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.remote_do-Tuple{Any,AbstractWorkerPool,Vararg{Any,N}%20where%20N}


    remote_do(do_work, p, jobs, results)
    end

    while nb_jobs > 0
    job_id, exec_time, where = take!(results)


    println("Le job $job_id a été terminé en $(round(exec_time, digits=2)) secondes sur le worker $where")


    nb_jobs -= 1
    end
    end
    end


    #=
    Le job 4 a été terminé en 1.0 secondes sur le worker 3
    Le job 1 a été terminé en 2.0 secondes sur le worker 2
    Le job 2 a été terminé en 2.0 secondes sur le worker 5
    Le job 3 a été terminé en 3.0 secondes sur le worker 4
    Le job 5 a été terminé en 3.0 secondes sur le worker 6
    Le job 7 a été terminé en 1.0 secondes sur le worker 2
    Le job 10 a été terminé en 1.0 secondes sur le worker 6
    Le job 8 a été terminé en 2.0 secondes sur le worker 5
    Le job 11 a été terminé en 1.0 secondes sur le worker 2
    Le job 6 a été terminé en 3.0 secondes sur le worker 3
    Le job 9 a été terminé en 3.0 secondes sur le worker 4
    Le job 12 a été terminé en 2.0 secondes sur le worker 6
    6.380669 seconds (806.07 k allocations: 40.362 MiB, 0.07% gc time)
    =#


    Dans le code précédent, le lecteur aura certainement compris que si un Channel est utile, il ne convient pas à toutes les situations.
    Nous aurons souvent besoin de travailler avec un Array, mais en parallèle (par exemple: pour un algorithme de multiplication matricielle).

    SharedArray satisfera notre besoin. C'est juste un tableau, mais accessible dans n'importe quel worker.
    SharedArrays : https://docs.julialang.org/en/v1/stdlib/SharedArrays/

    using Distributed

    addprocs(5)


    #=
    SharedArray{T, N}(dims::NTuple, init = false, pids = Int[])


    T représente le type
    N est la dimension du tableau (par exemple: Float64, 2 pour une matrice).
    dims recueille, entre parenthèses, le nombre d'éléments dans chaque dimension
    (par exemple: (3,2) pour une matrice 3 × 2).
    init : ce paramètre ne fonctionne dans aucun de mes tests, les exemples que
    j'ai trouvés sur le web ne fonctionnent pas.
    pids est un vecteur donnant les workers qui peuvent accéder au tableau partagé.
    Si rien n'est spécifié dans pids, tous les workers peuvent accéder au tableau
    partagé.
    =#


    @everywhere using SharedArrays


    N = 10_000


    # calcul du produit scalaire
    x = rand(0:5, N)
    c = rand(0:1, N)


    output = SharedArray{Int, 1}(N);


    r = @distributed (+) for i = 1:N
    output = x * c;
    end


    println("r = $r\n")


    # liste des nombres premiers
    @everywhere function isprime(n)
    for i = 2:Int(floor(sqrt(n)))
    if n % i == 0;
    return false
    end
    end
    return true
    end


    P = SharedArray{Int, 1}(N);


    # rappel : sans opérateur @distributed doit être précédé d'un @sync
    @time @sync @distributed for i = 1:N
    if isprime(i)
    P = i
    end
    end # 0.081662 seconds (224.33 k allocations: 11.448 MiB)


    A = Array{Int64, 1}()


    for i=1:N
    if P != 0
    push!(A, i)
    end
    end


    println("A = $A\n")


    Une manière d'obtenir le parallélisme consiste à répartir un Array entre plusieurs workers.
    Chaque worker peut lire et écrire dans la partie de l'Array qu'il possède et a un accès en lecture seule aux parties qu'il ne possède pas.

    Les tableaux distribués Julia sont implémentés par le type DArray.
    DistributedArrays : https://juliaparallel.github.io/DistributedArrays.jl/stable/

    using Distributed

    addprocs(5)

    @everywhere using DistributedArrays, LinearAlgebra, InteractiveUtils


    @everywhere function aa(n)
    la = zeros(n, n)
    la[diagind(la, 0)] .= 2.0
    la[diagind(la, -1)] .= -1.0
    la[diagind(la, 1)] .= -1.0
    return la
    end


    @everywhere function b1(n)
    la = zeros(n, n)
    la[1, n] = -1.0
    return la
    end


    @everywhere function b2(n)
    la = zeros(n, n)
    la[n, 1] = -1.0
    return la
    end


    # Appeler les fonctions sur les workers pour créer des parties locales
    n = 4
    d11 = @spawnat 2 aa(n)
    d12 = @spawnat 3 b1(n)
    d21 = @spawnat 4 b2(n)
    d22 = @spawnat 5 aa(n)


    # Créer une matrice distribuée sur une grille de workers 2 * 2
    A = reshape([d11 d21 d12 d22], (2, 2))


    DA = DArray(A) # DistributedArrays


    println(typeof(DA)) # DArray{Float64,2,Array{Float64,2}}
    println(length(DA)) # 64
    println(DA) # 0.0
    println(DA)


    #=
    [2.0 -1.0 0.0 0.0 0.0 0.0 0.0 -1.0;
    -1.0 2.0 -1.0 0.0 0.0 0.0 0.0 0.0;
    0.0 -1.0 2.0 -1.0 0.0 0.0 0.0 0.0;
    0.0 0.0 -1.0 2.0 0.0 0.0 0.0 0.0;
    0.0 0.0 0.0 0.0 2.0 -1.0 0.0 0.0;
    0.0 0.0 0.0 0.0 -1.0 2.0 -1.0 0.0;
    0.0 0.0 0.0 0.0 0.0 -1.0 2.0 -1.0;
    -1.0 0.0 0.0 0.0 0.0 0.0 -1.0 2.0]
    =#


    println(varinfo())


    #=
    | name | size | summary |
    |:---- | ---------:|:-------------------------------------- |
    | A | 200 bytes | 2×2 Array{Future,2} |
    | Base | | Module |
    | Core | | Module |
    | DA | 544 bytes | 8×8 DArray{Float64,2,Array{Float64,2}} |
    | Main | | Module |
    | aa | 0 bytes | typeof(aa) |
    | b1 | 0 bytes | typeof(b1) |
    | b2 | 0 bytes | typeof(b2) |
    | d11 | 32 bytes | Future |
    | d12 | 32 bytes | Future |
    | d21 | 32 bytes | Future |
    | d22 | 32 bytes | Future |
    | n | 8 bytes | Int64 |
    =#


    DB = DA * DA


    println(DB)


    #=
    [6.0 -4.0 1.0 0.0 0.0 0.0 1.0 -4.0;
    -4.0 6.0 -4.0 1.0 0.0 0.0 0.0 1.0;
    1.0 -4.0 6.0 -4.0 0.0 0.0 0.0 0.0;
    0.0 1.0 -4.0 5.0 0.0 0.0 0.0 0.0;
    0.0 0.0 0.0 0.0 5.0 -4.0 1.0 0.0;
    0.0 0.0 0.0 0.0 -4.0 6.0 -4.0 1.0;
    1.0 0.0 0.0 0.0 1.0 -4.0 6.0 -4.0;
    -4.0 1.0 0.0 0.0 0.0 1.0 -4.0 6.0]
    =#


    # Vérifiez les valeurs sur le worker 3
    f = @spawnat 3 DB.localpart


    println(fetch(f))


    #=
    [0.0 0.0 1.0 -4.0;
    0.0 0.0 0.0 1.0;
    0.0 0.0 0.0 0.0;
    0.0 0.0 0.0 0.0]
    =#


    Licence Creative Commons Attribution 2.0 Belgique
  • Vous avez lu gratuitement 2 651 articles depuis plus d'un an.
    Soutenez le club developpez.com en souscrivant un abonnement pour que nous puissions continuer à vous proposer des publications.

    Une erreur dans cette actualité ? Signalez-nous-la !