Concorrenza in RxJava 2

Un'app multithreading ha due o più parti che possono essere eseguite in parallelo. Ciò consente all'app di utilizzare meglio i core all'interno della CPU del dispositivo. Ciò consente di svolgere le attività più velocemente e offre all'utente un'esperienza più fluida e reattiva. 

La codifica per la concorrenza in Java può essere dolorosa, ma grazie a RxJava, ora è molto più facile da fare. Con RxJava, devi solo dichiarare il thread su cui vuoi che l'attività sia eseguita (dichiaratamente) invece di creare e gestire i thread (imperativamente). 

RxJava fare uso di schedulatori insieme con il subscribeOn () e observeOn () operatori di concorrenza per raggiungere questo obiettivo. In questo tutorial, imparerai a conoscere schedulatori, il subscribeOn () operatore, il observeOn () operatore, e anche come sfruttare il flatMap () operatore per raggiungere la concorrenza. Ma prima, iniziamo con schedulatori nel RxJava.

Prerequisiti 

Per seguire questo tutorial, dovresti avere familiarità con:

  • RxJava 2 su Android
  • espressioni lambda

Dai un'occhiata ai nostri altri post per imparare le basi delle espressioni RxJava e lambda.

Scheduler in RxJava 2

schedulatori in RxJava vengono utilizzati per eseguire un'unità di lavoro su un thread. UN Scheduler fornisce un'astrazione al meccanismo di threading Android e Java. Quando vuoi eseguire un'attività e fai uso di a Scheduler per eseguire tale compito, il Scheduler passa al suo pool di thread (una raccolta di thread pronti per l'uso) e quindi esegue l'attività in un thread disponibile. 

È anche possibile specificare che un'attività debba essere eseguita in un thread specifico. (Ci sono due operatori, subscribeOn () e observeOn (), che può essere usato per specificare su quale thread dal Scheduler pool di thread l'attività dovrebbe essere eseguita.)

Come sapete, in Android, i processi a esecuzione prolungata o le attività ad alta intensità di CPU non devono essere eseguiti sul thread principale. Se un abbonamento di un Osservatore ad Osservabile è condotto sul thread principale, ogni operatore associato verrà eseguito anche sul thread principale. Nel caso di un'attività a esecuzione prolungata (ad es. Esecuzione di una richiesta di rete) o di un lavoro intensivo della CPU (ad es. Trasformazione dell'immagine), questo bloccherà l'interfaccia utente fino al termine dell'attività, portando alla terribile finestra di dialogo ANR (Applicazione non rispondente). e l'app si arresta in modo anomalo. Questi operatori possono invece essere passati a un altro thread con observeOn () operatore. 

Nella prossima sezione, esploreremo i diversi tipi di schedulatori e i loro usi.

Tipi di programmatori

Ecco alcuni dei tipi di schedulatori disponibile in RxJava e RxAndroid per indicare il tipo di thread per eseguire attività su. 

  • Schedulers.immediate (): restituisce a Scheduler che esegue il lavoro istantaneamente nel thread corrente. Tieni presente che ciò bloccherà il thread corrente, quindi dovrebbe essere usato con cautela. 
  • Schedulers.trampoline (): pianifica le attività nel thread corrente. Queste attività non vengono eseguite immediatamente, ma vengono eseguite dopo che il thread ha terminato le attività correnti. Questo è diverso da Schedulers.immediate () perché invece di eseguire immediatamente un'attività, attende il completamento delle attività correnti. 
  • Schedulers.newThread (): avvia una nuova discussione e restituisce a Scheduler per eseguire l'attività nel nuovo thread per ciascuno Osservatore. Si dovrebbe fare attenzione a usare questo perché il nuovo thread non viene riutilizzato in seguito ma viene invece distrutto. 
  • Schedulers.computation (): questo ci dà un Scheduler che è inteso per il lavoro intensivo di calcolo come la trasformazione dell'immagine, calcoli complessi, ecc. Questa operazione utilizza completamente i core della CPU. Questo Scheduler utilizza una dimensione del pool di thread fissa che dipende dai core della CPU per un utilizzo ottimale. È necessario fare attenzione a non creare più thread rispetto ai core della CPU disponibili poiché ciò può ridurre le prestazioni. 
  • Schedulers.io (): crea e restituisce a Scheduler designato per lavori con I / O come esecuzione di chiamate di rete asincrone o lettura e scrittura nel database. Queste attività non sono ad uso intensivo della CPU o altro Schedulers.computation ().
  • Schedulers.single (): crea e restituisce a Scheduler ed esegue diversi compiti in sequenza in un singolo thread. 
  • Schedulers.from (esecutore esecutore): questo creerà a Scheduler che eseguirà un compito o un'unità di lavoro sul dato Esecutore
  • AndroidSchedulers.mainThread (): questo creerà a Scheduler che esegue l'attività sul thread principale dell'applicazione Android. Questo tipo di schedulatore è fornito da RxAndroid biblioteca. 

Il subscribeOn () Operatore

Usando il subscribeOn () operatore di concorrenza, si specifica che il Scheduler dovrebbe eseguire l'operazione nel Osservabile a monte. Quindi sposterà i valori su Gli osservatori usando lo stesso thread Ora vediamo un esempio pratico:

importare android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import io.reactivex.Observable; import io.reactivex.ObservableOnSubscribe; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; public class MainActivity estende AppCompatActivity String statico finale privato [] STATI = "Lagos", "Abuja", "Abia", "Edo", "Enugu", "Niger", "Anambra"; Monouso privato mDisposable = null; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Osservabile observable = Observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .doOnComplete (() -> Log.d ("MainActivity", "Complete")); mDisposable = observable.subscribe (s -> Log.d ("MainActivity", "received" + s + "on thread" + Thread.currentThread (). getName ()););  ObservableOnSubscribe privato dataSource () return (emitter -> for (Stato stringa: STATI) emitter.onNext (stato); Log.d ("MainActivity", "emitting" + state + "on thread" + Thread.currentThread (). getName ()); Thread.sleep (600); emitter.onComplete (););  @Override protected void onDestroy () if (mDisposable! = Null &&! MDisposable.isDisposed ()) mDisposable.dispose ();  super.onDestroy (); 

Nel codice sopra, abbiamo una statica Lista di array che contiene alcuni stati in Nigeria. Abbiamo anche un campo che è di tipo Monouso. Otteniamo il Monouso istanza chiamando Observable.subscribe (), e lo useremo più tardi quando chiameremo il dispose () metodo per rilasciare le risorse che sono state utilizzate. Questo aiuta a prevenire perdite di memoria. Nostro fonte di dati() il metodo (che può restituire dati da un'origine database remota o locale) restituirà ObservableOnSubscribe: questo è necessario per noi per creare il nostro Osservabile successivamente usando il metodo Observable.create ()

Dentro il fonte di dati() metodo, passiamo attraverso l'array, emettendo ogni elemento al Gli osservatori a chiamata emitter.onNext (). Dopo che ogni valore è stato emesso, dormiamo il thread in modo da simulare il lavoro intenso che viene eseguito. Infine, chiamiamo il onComplete () metodo per segnalare al Gli osservatori che abbiamo finito con i valori e che non dovrebbero più aspettarsi. 

Ora, il nostro fonte di dati() il metodo non dovrebbe essere eseguito sul thread principale dell'interfaccia utente. Ma come è specificato? Nell'esempio sopra, abbiamo fornito Schedulers.newThread () come argomento a subscribeOn (). Ciò significa che il fonte di dati() l'operazione verrà eseguita in una nuova discussione. Nota anche che nell'esempio sopra, ne abbiamo uno solo Osservatore. Se avessimo più Gli osservatori, ognuno di loro avrebbe avuto il suo filo. 

In modo che possiamo vedere questo funziona, il nostro Osservatore stampa i valori che ottiene nella sua onNext () metodo dal Osservabile

Quando eseguiamo questa operazione e visualizziamo il nostro logcat su Android Studio, puoi vedere che le emissioni di fonte di dati() metodo per il Osservatore è successo sullo stesso thread-RxNewThreadScheduler-1-in cui la Osservatore li ha ricevuti. 

Se non si specifica il .subscribeOn () metodo dopo il Observable.create () metodo, verrà eseguito sul thread corrente, che nel nostro caso è il thread principale, bloccando in tal modo l'interfaccia utente dell'app. 

Ci sono alcuni dettagli importanti di cui dovresti essere a conoscenza riguardo al subscribeOn () operatore. Dovresti averne solo uno subscribeOn () nel Osservabile catena; aggiungerne un altro in qualsiasi punto della catena non avrà alcun effetto. Il posto consigliato per mettere questo operatore è il più vicino possibile alla fonte per motivi di chiarezza. In altre parole, posizionalo prima nella catena di operatori. 

Observable.create (dataSource ()) .subscribeOn (Schedulers.computation ()) // ha effetto .subscribeOn (Schedulers.io ()) // non ha alcun effetto .doOnNext (s -> saveToCache (s); // eseguito su Schedulers.computation ())

Il observeOn () Operatore

Come abbiamo visto, il subscribeOn () l'operatore di concorrenza istruirà il Osservabile quale Scheduler da usare per spingere le emissioni in avanti lungo il Osservabile catena al Gli osservatori

Il lavoro del observeOn () l'operatore di concorrenza, d'altra parte, è quello di passare le emissioni successive su un altro thread o Scheduler. Usiamo questo operatore per controllare su quali thread i consumatori a valle riceveranno le emissioni. Vediamo un esempio pratico. 

importare android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import android.widget.TextView; import io.reactivex.Observable; import io.reactivex.ObservableOnSubscribe; import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; public class ObserveOnActivity estende AppCompatActivity private Disposable mDisposable = null; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); TextView textView = (TextView) findViewById (R.id.tv_main); Osservabile observable = Observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .observeOn (AndroidSchedulers.mainThread ()) .doOnComplete (() -> Log.d ("ObserveOnActivity", "Complete")); mDisposable = observable.subscribe (s -> Log.d ("ObserveOnActivity", "received" + s + "sul thread" + Thread.currentThread (). getName ()); textView.setText (s););  ObservableOnSubscribe privato dataSource () return (emitter -> Thread.sleep (800); emitter.onNext ("Value"); Log.d ("ObserveOnActivity", "dataSource () sul thread" + Thread.currentThread (). getName ( )); emitter.onComplete (););  // ... 

Nel codice sopra, abbiamo usato il observeOn () operatore e poi ha passato il AndroidSchedulers.mainThread () ad esso. Quello che abbiamo fatto è passare il thread da Schedulers.newThread () al thread principale di Android. Questo è necessario perché vogliamo aggiornare il TextView widget, e può farlo solo dal thread dell'interfaccia utente principale. Si noti che se non si passa al thread principale quando si tenta di aggiornare il TextView widget, l'app si blocca e lancia un CalledFromWrongThreadException

non mi piace il subscribeOn () operatore, il observeOn () l'operatore può essere applicato più volte nella catena dell'operatore, modificando in tal modo Scheduler più di una volta. 

Osservabile observable = Observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .observeOn (Schedulers.io ()) .doOnNext (s -> saveToCache (s); Log.d ("ObserveOnActivity", "doOnNext () sul thread "+ Thread.currentThread (). getName ());) .observeOn (AndroidSchedulers.mainThread ()) .doOnComplete (() -> Log.d (" ObserveOnActivity "," Complete "));

Questo codice ha due observeOn () operatori. Il primo usa il Schedulers.io (), il che significa che il saveToCache () il metodo verrà eseguito su Schedulers.io () filo. Dopodiché, passa a AndroidSchedulers.mainThread () dove Gli osservatori riceverà le emissioni dall'upstream. 

Concorrenza con l'operatore flatMap ()

Il flatMap () l'operatore è un altro operatore molto potente e importante che può essere utilizzato per raggiungere la concorrenza. La definizione secondo la documentazione ufficiale è la seguente:

Trasforma gli oggetti emessi da un Osservabile in Osservabili, quindi appiattisci le emissioni da quelle in un singolo Osservabile.


Diamo un'occhiata a un esempio pratico che utilizza questo operatore: 

 // ... @Override protected void onCreate (Bundle savedInstanceState) // ... final String [] states = "Lagos", "Abuja", "Imo", "Enugu"; Osservabile statesObservable = Observable.fromArray (states); statesObservable.flatMap (s -> Observable.create (getPopulation (s))) .subscribe (pair -> Log.d ("MainActivity", pair.first + "population is" + pair.second));  ObservableOnSubscribe privato getPopulation (String state) return (emitter -> Random r = new Random (); Log.d ("MainActivity", "getPopulation () per" + state + "chiamato" + Thread.currentThread (). getName ( )); emitter.onNext (nuova coppia (stato, r.nextInt (300000 - 10000) + 10000)); emitter.onComplete ();); 

Questo stamperà quanto segue sul logcat di Android Studio:

getPopulation () per Lagos chiamato sulla popolazione principale di Lagos è 80362 getPopulation () per Abuja chiamato sulla popolazione principale di Abuja è 132559 getPopulation () per Imo chiamato sulla popolazione Imo principale è 34106 getPopulation () per Enugu chiamato sulla popolazione Enugu principale è 220301

Dal risultato sopra, puoi vedere che i risultati che abbiamo ottenuto erano nello stesso ordine dell'array. Anche il getPopulation () il metodo per ogni stato è stato elaborato sullo stesso thread, il thread principale. Ciò rende il risultato di output lento poiché sono stati elaborati in sequenza sul thread principale. 

Ora, per consentirci di raggiungere la concorrenza con questo operatore, vogliamo getPopulation () metodo per ogni stato (emissioni dal statesObservable) da elaborare su fili diversi Fare questo porterà a un'elaborazione più veloce. Useremo il flatMap () operatore per fare questo perché crea un nuovo Osservabile per ogni emissione. Applichiamo quindi il subscribeOn () operatore di concorrenza a ciascuno, passando a Scheduler ad esso. 

 statesObservable.flatMap (s -> Observable.create (getPopulation (s)) .subscribeOn (Schedulers.io ())) .subscribe (pair -> Log.d ("MainActivity", pair.first + "population is" + coppia .secondo));

Come ogni emissione produce un Osservabile, il flatMap () il lavoro dell'operatore è di unirli insieme e quindi inviarli come un singolo flusso. 

getPopulation () per Lagos chiamato su RxCachedThreadScheduler-1 La popolazione di Lagos è 143965 getPopulation () per Abuja chiamato su RxCachedThreadScheduler-2 getPopulation () per Enugu chiamato su RxCachedThreadScheduler-4 La popolazione di Abuja è 158363 La popolazione di Enugu è 271420 getPopulation () per Imo chiamata su RxCachedThreadScheduler -3 La popolazione di Imo è 81564

Nel risultato sopra, possiamo osservare che ogni stato è getPopulation () il metodo è stato elaborato su thread diversi. Questo rende la lavorazione molto più veloce, ma osserva anche che le emissioni dal flatMap () operatore che sono stati ricevuti dal Osservatore non sono nello stesso ordine delle emissioni originali a monte. 

Conclusione

In questo tutorial, hai imparato a gestire la concorrenza usando RxJava 2: che cos'è, il diverso schedulatori disponibile e come utilizzare il subscribeOn () e observeOn () operatori di concorrenza. Ti ho anche mostrato come usare il flatMap () operatore per raggiungere la concorrenza. 

Nel frattempo, dai uno sguardo ad alcuni dei nostri altri corsi e tutorial sul linguaggio Java e sullo sviluppo di app per Android!