In questo articolo imparerai le basi della concorrenza in Elixir e vedrai come generare processi, inviare e ricevere messaggi e creare processi di lunga durata. Inoltre, potrai conoscere GenServer, vedere come può essere utilizzato nella tua applicazione e scoprire alcune chicche che ti fornisce.
Come probabilmente sapete, Elixir è un linguaggio funzionale utilizzato per costruire sistemi concorrenti tolleranti all'errore che gestiscono molte richieste simultanee. Utilizza BEAM (macchina virtuale Erlang) processi per eseguire contemporaneamente varie attività, il che significa, ad esempio, che il servizio di una richiesta non ne blocca un altro. I processi sono leggeri e isolati, il che significa che non condividono alcuna memoria e anche se un processo si blocca, altri possono continuare a funzionare.
I processi BEAM sono molto diversi dal Processi OS. Fondamentalmente, BEAM viene eseguito in un processo del sistema operativo e utilizza il proprio scheduler. Ogni schedulatore occupa uno Core della CPU, gira in un thread separato e può gestire migliaia di processi simultaneamente (che a turno eseguono). Puoi leggere un po 'di più su BEAM e multithreading su StackOverflow.
Quindi, come vedi, i processi BEAM (dirò solo "processi" d'ora in poi) sono molto importanti in Elixir. Il linguaggio fornisce alcuni strumenti di basso livello per generare manualmente i processi, mantenere lo stato e gestire le richieste. Tuttavia, poche persone li usano: è più comune affidarsi al Open Telecom Platform (OTP) quadro per farlo.
OTP al giorno d'oggi non ha nulla a che fare con i telefoni: è una struttura generica per costruire complessi sistemi concorrenti. Definisce il modo in cui le applicazioni devono essere strutturate e fornisce un database nonché una serie di strumenti molto utili per creare processi server, recuperare errori, eseguire registrazioni, ecc. In questo articolo, parleremo di comportamento del server chiamato GenServer fornito da OTP.
Puoi pensare a GenServer come a un'astrazione o a un helper che semplifica il lavoro con i processi del server. In primo luogo, vedrete come generare processi usando alcune funzioni di basso livello. Poi passeremo a GenServer e vedremo come semplifica le cose per noi rimuovendo la necessità di scrivere codice noioso (e piuttosto generico) ogni volta. Iniziamo!
Se mi chiedessi come creare un processo in Elixir, risponderei: uova esso! spawn / 1 è una funzione definita all'interno di nocciolo
modulo che restituisce un nuovo processo. Questa funzione accetta un lambda che verrà eseguito nel processo creato. Non appena l'esecuzione è terminata, anche il processo termina:
spawn (fn -> IO.puts ("hi") end) |> IO.inspect # => hi # => #PID<0.72.0>
Ecco uova
ha restituito un nuovo ID di processo. Se aggiungi un ritardo al lambda, la stringa "hi" verrà stampata dopo un po 'di tempo:
spawn (fn ->: timer.sleep (5000) IO.puts ("hi") end) |> IO.inspect # => #PID<0.82.0> # => (dopo 5 secondi) "ciao"
Ora possiamo generare tutti i processi che vogliamo e verranno eseguiti contemporaneamente:
spawn_it = fn (num) -> spawn (fn ->: timer.sleep (5000) IO.puts ("hi # num") end) end Enum.each (1 ... 10, fn (_) -> spawn_it . (: rand.uniform (100)) end) # => (tutti stampati allo stesso tempo, dopo 5 secondi) # => hi 5 # => hi 10 ecc ...
Qui stiamo generando dieci processi e stampando una stringa di test con un numero casuale. : rand
è un modulo fornito da Erlang, quindi il suo nome è un atomo. Quello che è bello è che tutti i messaggi verranno stampati nello stesso momento, dopo cinque secondi. Succede perché tutti e dieci i processi vengono eseguiti contemporaneamente.
Confrontalo con il seguente esempio che esegue lo stesso compito ma senza usare escursione / 1
:
dont_spawn_it = fn (num) ->: timer.sleep (5000) IO.puts ("hi # num") end Enum.each (1 ... 10, fn (_) -> dont_spawn_it. (: rand.uniform ( 100)) end) # => (dopo 5 secondi) hi 70 # => (dopo altri 5 secondi) hi 45 # => etc ...
Mentre questo codice è in esecuzione, puoi andare in cucina e preparare un'altra tazza di caffè, poiché ci vorranno quasi un minuto per completarla. Ogni messaggio viene visualizzato in sequenza, il che ovviamente non è ottimale!
Potresti chiedere: "Quanta memoria consuma un processo?" Beh, dipende, ma inizialmente occupa un paio di kilobyte, che è un numero molto piccolo (anche il mio vecchio portatile ha 8 GB di memoria, per non parlare dei server moderni e cool).
Fin qui tutto bene. Prima di iniziare a lavorare con GenServer, tuttavia, discutiamo ancora di un'altra cosa importante: il passaggio e la ricezione messaggi.
Non sorprende che i processi (che sono isolati, come ricordi) debbano comunicare in qualche modo, specialmente quando si tratta di costruire sistemi più o meno complessi. Per raggiungere questo, possiamo usare i messaggi.
Un messaggio può essere inviato usando una funzione con un nome abbastanza ovvio: send / 2. Accetta una destinazione (porta, ID processo o un nome processo) e il messaggio effettivo. Dopo che il messaggio è stato inviato, appare nel cassetta postale di un processo e può essere elaborato. Come vedi, l'idea generale è molto simile alla nostra attività quotidiana di scambio di e-mail.
Una mailbox è fondamentalmente una coda "first in first out" (FIFO). Dopo che il messaggio è stato elaborato, viene rimosso dalla coda. Per iniziare a ricevere messaggi, devi indovinare cosa! -A ricevere macro. Questa macro contiene una o più clausole e un messaggio viene confrontato con esse. Se viene trovata una corrispondenza, il messaggio viene elaborato. In caso contrario, il messaggio viene reinserito nella casella di posta. Oltre a ciò, puoi impostare un optional dopo
clausola che viene eseguita se un messaggio non è stato ricevuto nel tempo specificato. Puoi leggere di più su inviare / 2
e ricevere
nei documenti ufficiali.
Ok, basta con la teoria, proviamo a lavorare con i messaggi. Prima di tutto, invia qualcosa al processo corrente:
send (self (), "ciao!")
La macro self / 0 restituisce un pid del processo chiamante, che è esattamente ciò di cui abbiamo bisogno. Non omettere le parentesi tonde dopo la funzione, poiché riceverai un avvertimento riguardante la corrispondenza dell'ambiguità.
Ora ricevi il messaggio mentre imposti il dopo
clausola:
ricevi msg -> IO.puts "Yay, un messaggio: # msg" msg dopo 1000 -> IO.puts: stderr, "Voglio i messaggi!" end |> IO.puts # => Yay, un messaggio: ciao! # => ciao!
Nota che la clausola restituisce il risultato della valutazione dell'ultima riga, quindi otteniamo il "Ciao!" stringa.
Ricorda che puoi inserire tutte le clausole necessarie:
send (self (), : ok, "ciao!") receive do : ok, msg -> IO.puts "Yay, un messaggio: # msg" msg : error, msg -> IO .puts: stderr, "Oh no, è successo qualcosa di brutto: # msg" _ -> IO.puts "Non so cosa sia questo messaggio ..." dopo 1000 -> IO.puts: stderr, "I want messages!" end |> IO.puts
Qui abbiamo quattro clausole: una per gestire un messaggio di successo, un'altra per gestire gli errori, quindi una clausola di "fallback" e un timeout.
Se il messaggio non corrisponde a nessuna delle clausole, viene mantenuto nella casella di posta, il che non è sempre auspicabile. Perché? Perché ogni volta che arriva un nuovo messaggio, quelli vecchi vengono elaborati nella prima testata (perché la cassetta postale è una coda FIFO), rallentando il programma. Pertanto una clausola di "fallback" potrebbe tornare utile.
Ora che sai come generare processi, inviare e ricevere messaggi, diamo un'occhiata a un esempio leggermente più complesso che comporta la creazione di un semplice server che risponde a vari messaggi.
Nell'esempio precedente, abbiamo inviato solo un messaggio, lo abbiamo ricevuto ed eseguito un po 'di lavoro. Va bene, ma non è molto funzionale. Di solito succede che abbiamo un server in grado di rispondere a vari messaggi. Per "server" intendo un processo di lunga durata costruito con una funzione ricorrente. Ad esempio, creiamo un server per eseguire alcune equazioni matematiche. Sta per ricevere un messaggio contenente l'operazione richiesta e alcuni argomenti.
Inizia creando il server e la funzione di loop:
defmodule MathServer fa def start do spawn e ascolta / 0 end defp ascolta do do do : sqrt, caller, arg -> IO.puts arg _ -> IO.puts: stderr, "Non implementato." end listen () end end
Quindi generiamo un processo che continua ad ascoltare i messaggi in arrivo. Dopo che il messaggio è stato ricevuto, il ascoltare / 0
la funzione viene nuovamente chiamata, creando così un ciclo infinito. Dentro il ascoltare / 0
funzione, aggiungiamo il supporto per il : sqrt
messaggio, che calcolerà la radice quadrata di un numero. Il arg
conterrà il numero effettivo per eseguire l'operazione contro. Inoltre, stiamo definendo una clausola di fallback.
Ora puoi avviare il server e assegnare il suo id di processo a una variabile:
math_server = MathServer.start IO.inspect math_server # => #PID<0.85.0>
Brillante! Ora aggiungiamo un funzione di implementazione per eseguire effettivamente il calcolo:
defmodule MathServer do # ... def sqrt (server, arg) invia (: some_name, : sqrt, self (), arg) end end
Usa questa funzione ora:
MathServer.sqrt (math_server, 3) # => 3
Per ora, stampa semplicemente l'argomento passato, quindi aggiusta il tuo codice in questo modo per eseguire l'operazione matematica:
defmodule MathServer do # ... defp ascolta do receive do : sqrt, caller, arg -> send (: some_name, : result, do_sqrt (arg)) _ -> IO.puts: stderr, "Non implementato." end listen () end defp do_sqrt (arg) do: math.sqrt (arg) end end
Ora, ancora un altro messaggio viene inviato al server contenente il risultato del calcolo.
Quello che è interessante è che il sqrt / 2
la funzione invia semplicemente un messaggio al server che chiede di eseguire un'operazione senza attendere il risultato. Quindi, in pratica, esegue un chiamata asincrona.
Ovviamente, vorremmo prendere il risultato ad un certo punto nel tempo, quindi codificare un'altra funzione pubblica:
def grab_result do do do : result, result -> result after 5000 -> IO.puts: stderr, "Timeout" end end
Ora utilizzalo:
math_server = MathServer.start MathServer.sqrt (math_server, 3) MathServer.grab_result |> IO.puts # => 1.7320508075688772
Funziona! Naturalmente, è anche possibile creare un pool di server e distribuire le attività tra loro, raggiungendo la concorrenza. È conveniente quando le richieste non sono correlate tra loro.
Va bene, abbiamo coperto una manciata di funzioni che ci permettono di creare processi server di lunga durata e inviare e ricevere messaggi. Questo è grandioso, ma dobbiamo scrivere troppo codice boilerplate che avvia un loop del server (avviare / 0
), risponde ai messaggi (ascoltare / 0
funzione privata) e restituisce un risultato (grab_result / 0
). In situazioni più complesse, potremmo anche aver bisogno di gestire uno stato condiviso o gestire gli errori.
Come ho detto all'inizio dell'articolo, non è necessario reinventare una bicicletta. Invece, possiamo utilizzare il comportamento di GenServer che già fornisce per noi tutti il codice boilerplate e ha un grande supporto per i processi del server (come abbiamo visto nella sezione precedente).
Comportamento in Elixir è un codice che implementa un modello comune. Per utilizzare GenServer, è necessario definire uno speciale modulo di richiamata che soddisfa il contratto come dettato dal comportamento. Nello specifico, dovrebbe implementare alcune funzioni di callback e l'implementazione effettiva spetta a te. Dopo che le callback sono state scritte, il modulo di comportamento potrebbe utilizzarli.
Come dichiarato dai documenti, GenServer richiede sei callback da implementare, sebbene abbiano anche un'implementazione predefinita. Significa che puoi ridefinire solo quelli che richiedono una logica personalizzata.
Per prima cosa: dobbiamo avviare il server prima di fare qualsiasi altra cosa, quindi procedere alla sezione successiva!
Per dimostrare l'utilizzo di GenServer, scriviamo a CalcServer
ciò consentirà agli utenti di applicare varie operazioni a un argomento. Il risultato dell'operazione verrà archiviato in a stato del server, e quindi può essere applicata anche un'altra operazione. Oppure un utente può ottenere un risultato finale dei calcoli.
Innanzitutto, utilizzare la macro di utilizzo per collegare GenServer:
defmodule CalcServer usa GenServer end
Ora avremo bisogno di ridefinire alcune callback.
Il primo è init / 1, che viene richiamato all'avvio di un server. L'argomento passato viene utilizzato per impostare lo stato di un server iniziale. Nel caso più semplice, questa callback dovrebbe restituire il : ok, initial_state
tupla, anche se ci sono altri possibili valori di ritorno come : stop, motivo
, che provoca l'arresto immediato del server.
Penso che possiamo consentire agli utenti di definire lo stato iniziale del nostro server. Tuttavia, dobbiamo verificare che l'argomento passato sia un numero. Quindi usa una clausola di guardia per questo:
defmodule CalcServer usa GenServer def init (initial_value) quando is_number (initial_value) do : ok, initial_value end def init (_) do : stop, "Il valore deve essere un numero intero!" end-end
Ora, avvia semplicemente il server usando la funzione start / 3 e fornisci il tuo CalcServer
come un modulo di callback (il primo argomento). Il secondo argomento sarà lo stato iniziale:
GenServer.start (CalcServer, 5.1) |> IO.inspect # => : ok, #PID<0.85.0>
Se si tenta di passare un non numero come secondo argomento, il server non verrà avviato, che è esattamente ciò di cui abbiamo bisogno.
Grande! Ora che il nostro server è in esecuzione, possiamo iniziare a codificare operazioni matematiche.
Vengono chiamate richieste asincrone calchi nei termini di GenServer. Per eseguire tale richiesta, utilizzare la funzione cast / 2, che accetta un server e la richiesta effettiva. È simile al sqrt / 2
funzione che abbiamo codificato quando si parla di processi del server. Utilizza anche l'approccio "fuoco e dimentica", il che significa che non stiamo aspettando la fine della richiesta.
Per gestire i messaggi asincroni, viene utilizzato un callback call_cast / 2. Accetta una richiesta e uno stato e dovrebbe rispondere con una tupla : noreply, new_state
nel caso più semplice (o : stop, motivo, nuovo_stato
per fermare il loop del server). Ad esempio, gestiamo un asincrono : sqrt
espressi:
def handle_cast (: sqrt, state) do : noreply,: math.sqrt (stato) fine
Ecco come manteniamo lo stato del nostro server. Inizialmente il numero (passato quando il server è stato avviato) era 5.1
. Ora aggiorniamo lo stato e lo impostiamo : Math.sqrt (5.1)
.
Codifica la funzione di interfaccia che utilizza pressofuso / 2
:
def sqrt (pid) do GenServer.cast (pid,: sqrt) end
Per me, questo assomiglia a un mago malvagio che lancia un incantesimo ma non si preoccupa dell'impatto che provoca.
Si noti che abbiamo bisogno di un ID di processo per eseguire il cast. Ricorda che quando un server viene avviato correttamente, una tupla : ok, pid
viene restituito. Pertanto, utilizziamo la corrispondenza del modello per estrarre l'id del processo:
: ok, pid = GenServer.start (CalcServer, 5.1) CalcServer.sqrt (pid)
Bello! Lo stesso approccio può essere usato per implementare, diciamo, moltiplicazione. Il codice sarà un po 'più complesso dato che dovremo passare il secondo argomento, un moltiplicatore:
def moltiplicare (pid, moltiplicatore) fare GenServer.cast (pid, : moltiplicare, moltiplicatore) fine
Il lanciare
la funzione supporta solo due argomenti, quindi ho bisogno di costruire una tupla e passare un argomento aggiuntivo lì.
Ora il callback:
def handle_cast (: moltiplicazione, moltiplicatore, stato) do : noreply, stato * moltiplicatore fine
Possiamo anche scrivere un singolo handle_cast
callback che supporta l'operazione e interrompe il server se l'operazione è sconosciuta:
def handle_cast (operation, state) fa l'operazione del caso do: sqrt -> : noreply,: math.sqrt (stato) : moltiplicazione, moltiplicatore -> : noreply, stato * moltiplicatore _ -> : stop, "Non implementato", stato fine fine
Ora usa la nuova funzione di interfaccia:
CalcServer.multiply (pid, 2)
Ottimo, ma al momento non c'è modo di ottenere un risultato dei calcoli. Pertanto, è il momento di definire ancora un altro callback.
Se le richieste asincrone sono casuali, allora quelle sincrone sono nominate chiamate. Per eseguire tali richieste, utilizzare la funzione chiamata / 3, che accetta un server, una richiesta e un timeout opzionale che equivale a cinque secondi per impostazione predefinita.
Le richieste sincrone vengono utilizzate quando si desidera attendere che la risposta arrivi effettivamente dal server. Il tipico caso d'uso è ottenere alcune informazioni come risultato di calcoli, come nell'esempio di oggi (ricordate il grab_result / 0
funzione da una delle sezioni precedenti).
Per elaborare le richieste sincrone, a handle_call / 3
la callback è utilizzata. Accetta una richiesta, una tupla contenente il pid del server e un termine che identifica la chiamata, nonché lo stato corrente. Nel caso più semplice, dovrebbe rispondere con una tupla : rispondi, rispondi, nuovo_stato
.
Codifica ora questa callback:
def handle_call (: result, _, state) do : reply, state, state end
Come vedi, niente di complesso. Il rispondere
e il nuovo stato è uguale allo stato corrente in quanto non voglio cambiare nulla dopo che il risultato è stato restituito.
Ora l'interfaccia risultato / 1
funzione:
def result (pid) do GenServer.call (pid,: result) end
Questo è! L'utilizzo finale del CalcServer è illustrato di seguito:
: ok, pid = GenServer.start (CalcServer, 5.1) CalcServer.sqrt (pid) CalcServer.multiply (pid, 2) CalcServer.result (pid) |> IO.puts # => 4.516635916254486
Diventa un po 'noioso fornire sempre un ID di processo quando si chiamano le funzioni dell'interfaccia. Fortunatamente, è possibile dare al tuo processo un nome o un nome alias. Questo viene fatto all'avvio del server impostando nome
:
GenServer.start (CalcServer, 5.1, nome: calc) CalcServer.sqrt CalcServer.multiply (2) CalcServer.result |> IO.puts
Si noti che non sto memorizzando pid ora, anche se si potrebbe voler fare la corrispondenza dei pattern per assicurarsi che il server sia stato effettivamente avviato.
Ora le funzioni dell'interfaccia diventano un po 'più semplici:
def sqrt do GenServer.cast (: calc,: sqrt) end def moltiplicare (moltiplicatore) do GenServer.cast (: calc, : moltiplicare, moltiplicare) end def result do GenServer.call (: calc,: result) end
Non dimenticare che non è possibile avviare due server con lo stesso alias.
In alternativa, è possibile introdurre un'altra funzione di interfaccia avviare / 1
all'interno del tuo modulo e sfruttare la macro __MODULE __ / 0, che restituisce il nome del modulo corrente come un atomo:
defmodule CalcServer usa GenServer def start (initial_value) do GenServer.start (CalcServer, initial_value, name: __MODULE__) end def sqrt do GenServer.cast (__ MODULE__,: sqrt) end def moltiplicare (moltiplicatore) do GenServer.cast (__ MODULE__, : multiply, moltiplicatore) end def result do GenServer.call (__ MODULE__,: result) end # ... end CalcServer.start (6.1) CalcServer.sqrt CalcServer.multiply (2) CalcServer.result |> IO.puts
Un altro callback che può essere ridefinito nel modulo è chiamato terminate / 2. Accetta un motivo e lo stato corrente e viene chiamato quando un server sta per uscire. Ciò può accadere quando, ad esempio, si passa un argomento errato a moltiplicare / 1
funzione di interfaccia:
# ... CalcServer.multiply (2)
Il callback potrebbe essere simile a questo:
def terminate (_reason, _state) do IO.puts "The server terminated" end
In questo articolo abbiamo trattato le basi della concorrenza in Elixir e abbiamo discusso di funzioni e macro come uova
, ricevere
, e inviare
. Hai imparato quali sono i processi, come crearli e come inviare e ricevere messaggi. Inoltre, abbiamo visto come creare un semplice processo server di lunga durata che risponde a entrambi i messaggi sincroni e asincroni.
Inoltre, abbiamo discusso del comportamento di GenServer e abbiamo visto come semplifica il codice introducendo vari callback. Abbiamo lavorato con il dentro
, terminare
, handle_call
e handle_cast
callback e creato un semplice server di calcolo. Se qualcosa ti è sembrato poco chiaro, non esitare a pubblicare le tue domande!
C'è di più in GenServer, e ovviamente è impossibile coprire tutto in un articolo. Nel mio prossimo post, spiegherò cosa supervisori sono e come puoi usarli per monitorare i tuoi processi e recuperarli dagli errori. Fino ad allora, felice codifica!