Un'app multithreading ha due o più parti che possono essere eseguite in parallelo. Ciò consente all'app di utilizzare meglio i core all'interno della CPU del dispositivo. Ciò consente di svolgere le attività più velocemente e offre all'utente un'esperienza più fluida e reattiva.
La codifica per la concorrenza in Java può essere dolorosa, ma grazie a RxJava
, ora è molto più facile da fare. Con RxJava
, devi solo dichiarare il thread su cui vuoi che l'attività sia eseguita (dichiaratamente) invece di creare e gestire i thread (imperativamente).
RxJava
fare uso di schedulatori
insieme con il subscribeOn ()
e observeOn ()
operatori di concorrenza per raggiungere questo obiettivo. In questo tutorial, imparerai a conoscere schedulatori
, il subscribeOn ()
operatore, il observeOn ()
operatore, e anche come sfruttare il flatMap ()
operatore per raggiungere la concorrenza. Ma prima, iniziamo con schedulatori
nel RxJava
.
Per seguire questo tutorial, dovresti avere familiarità con:
Dai un'occhiata ai nostri altri post per imparare le basi delle espressioni RxJava e lambda.
schedulatori
in RxJava vengono utilizzati per eseguire un'unità di lavoro su un thread. UN Scheduler
fornisce un'astrazione al meccanismo di threading Android e Java. Quando vuoi eseguire un'attività e fai uso di a Scheduler
per eseguire tale compito, il Scheduler
passa al suo pool di thread (una raccolta di thread pronti per l'uso) e quindi esegue l'attività in un thread disponibile.
È anche possibile specificare che un'attività debba essere eseguita in un thread specifico. (Ci sono due operatori, subscribeOn ()
e observeOn ()
, che può essere usato per specificare su quale thread dal Scheduler
pool di thread l'attività dovrebbe essere eseguita.)
Come sapete, in Android, i processi a esecuzione prolungata o le attività ad alta intensità di CPU non devono essere eseguiti sul thread principale. Se un abbonamento di un Osservatore
ad Osservabile
è condotto sul thread principale, ogni operatore associato verrà eseguito anche sul thread principale. Nel caso di un'attività a esecuzione prolungata (ad es. Esecuzione di una richiesta di rete) o di un lavoro intensivo della CPU (ad es. Trasformazione dell'immagine), questo bloccherà l'interfaccia utente fino al termine dell'attività, portando alla terribile finestra di dialogo ANR (Applicazione non rispondente). e l'app si arresta in modo anomalo. Questi operatori possono invece essere passati a un altro thread con observeOn ()
operatore.
Nella prossima sezione, esploreremo i diversi tipi di schedulatori
e i loro usi.
Ecco alcuni dei tipi di schedulatori
disponibile in RxJava
e RxAndroid
per indicare il tipo di thread per eseguire attività su.
Schedulers.immediate ()
: restituisce a Scheduler
che esegue il lavoro istantaneamente nel thread corrente. Tieni presente che ciò bloccherà il thread corrente, quindi dovrebbe essere usato con cautela. Schedulers.trampoline ()
: pianifica le attività nel thread corrente. Queste attività non vengono eseguite immediatamente, ma vengono eseguite dopo che il thread ha terminato le attività correnti. Questo è diverso da Schedulers.immediate ()
perché invece di eseguire immediatamente un'attività, attende il completamento delle attività correnti. Schedulers.newThread ()
: avvia una nuova discussione e restituisce a Scheduler
per eseguire l'attività nel nuovo thread per ciascuno Osservatore
. Si dovrebbe fare attenzione a usare questo perché il nuovo thread non viene riutilizzato in seguito ma viene invece distrutto. Schedulers.computation ()
: questo ci dà un Scheduler
che è inteso per il lavoro intensivo di calcolo come la trasformazione dell'immagine, calcoli complessi, ecc. Questa operazione utilizza completamente i core della CPU. Questo Scheduler
utilizza una dimensione del pool di thread fissa che dipende dai core della CPU per un utilizzo ottimale. È necessario fare attenzione a non creare più thread rispetto ai core della CPU disponibili poiché ciò può ridurre le prestazioni. Schedulers.io ()
: crea e restituisce a Scheduler
designato per lavori con I / O come esecuzione di chiamate di rete asincrone o lettura e scrittura nel database. Queste attività non sono ad uso intensivo della CPU o altro Schedulers.computation ()
.Schedulers.single ()
: crea e restituisce a Scheduler
ed esegue diversi compiti in sequenza in un singolo thread. Schedulers.from (esecutore esecutore)
: questo creerà a Scheduler
che eseguirà un compito o un'unità di lavoro sul dato Esecutore
. AndroidSchedulers.mainThread ()
: questo creerà a Scheduler
che esegue l'attività sul thread principale dell'applicazione Android. Questo tipo di schedulatore è fornito da RxAndroid
biblioteca. subscribeOn ()
OperatoreUsando il subscribeOn ()
operatore di concorrenza, si specifica che il Scheduler
dovrebbe eseguire l'operazione nel Osservabile
a monte. Quindi sposterà i valori su Gli osservatori
usando lo stesso thread Ora vediamo un esempio pratico:
importare android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import io.reactivex.Observable; import io.reactivex.ObservableOnSubscribe; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; public class MainActivity estende AppCompatActivity String statico finale privato [] STATI = "Lagos", "Abuja", "Abia", "Edo", "Enugu", "Niger", "Anambra"; Monouso privato mDisposable = null; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Osservabileobservable = Observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .doOnComplete (() -> Log.d ("MainActivity", "Complete")); mDisposable = observable.subscribe (s -> Log.d ("MainActivity", "received" + s + "on thread" + Thread.currentThread (). getName ());); ObservableOnSubscribe privato dataSource () return (emitter -> for (Stato stringa: STATI) emitter.onNext (stato); Log.d ("MainActivity", "emitting" + state + "on thread" + Thread.currentThread (). getName ()); Thread.sleep (600); emitter.onComplete ();); @Override protected void onDestroy () if (mDisposable! = Null &&! MDisposable.isDisposed ()) mDisposable.dispose (); super.onDestroy ();
Nel codice sopra, abbiamo una statica Lista di array
che contiene alcuni stati in Nigeria. Abbiamo anche un campo che è di tipo Monouso
. Otteniamo il Monouso
istanza chiamando Observable.subscribe ()
, e lo useremo più tardi quando chiameremo il dispose ()
metodo per rilasciare le risorse che sono state utilizzate. Questo aiuta a prevenire perdite di memoria. Nostro fonte di dati()
il metodo (che può restituire dati da un'origine database remota o locale) restituirà ObservableOnSubscribe
: questo è necessario per noi per creare il nostro Osservabile
successivamente usando il metodo Observable.create ()
.
Dentro il fonte di dati()
metodo, passiamo attraverso l'array, emettendo ogni elemento al Gli osservatori
a chiamata emitter.onNext ()
. Dopo che ogni valore è stato emesso, dormiamo il thread in modo da simulare il lavoro intenso che viene eseguito. Infine, chiamiamo il onComplete ()
metodo per segnalare al Gli osservatori
che abbiamo finito con i valori e che non dovrebbero più aspettarsi.
Ora, il nostro fonte di dati()
il metodo non dovrebbe essere eseguito sul thread principale dell'interfaccia utente. Ma come è specificato? Nell'esempio sopra, abbiamo fornito Schedulers.newThread ()
come argomento a subscribeOn ()
. Ciò significa che il fonte di dati()
l'operazione verrà eseguita in una nuova discussione. Nota anche che nell'esempio sopra, ne abbiamo uno solo Osservatore
. Se avessimo più Gli osservatori
, ognuno di loro avrebbe avuto il suo filo.
In modo che possiamo vedere questo funziona, il nostro Osservatore
stampa i valori che ottiene nella sua onNext ()
metodo dal Osservabile
.
Quando eseguiamo questa operazione e visualizziamo il nostro logcat su Android Studio, puoi vedere che le emissioni di fonte di dati()
metodo per il Osservatore
è successo sullo stesso thread-RxNewThreadScheduler-1
-in cui la Osservatore
li ha ricevuti.
Se non si specifica il .subscribeOn ()
metodo dopo il Observable.create ()
metodo, verrà eseguito sul thread corrente, che nel nostro caso è il thread principale, bloccando in tal modo l'interfaccia utente dell'app.
Ci sono alcuni dettagli importanti di cui dovresti essere a conoscenza riguardo al subscribeOn ()
operatore. Dovresti averne solo uno subscribeOn ()
nel Osservabile
catena; aggiungerne un altro in qualsiasi punto della catena non avrà alcun effetto. Il posto consigliato per mettere questo operatore è il più vicino possibile alla fonte per motivi di chiarezza. In altre parole, posizionalo prima nella catena di operatori.
Observable.create (dataSource ()) .subscribeOn (Schedulers.computation ()) // ha effetto .subscribeOn (Schedulers.io ()) // non ha alcun effetto .doOnNext (s -> saveToCache (s); // eseguito su Schedulers.computation ())
observeOn ()
OperatoreCome abbiamo visto, il subscribeOn ()
l'operatore di concorrenza istruirà il Osservabile
quale Scheduler
da usare per spingere le emissioni in avanti lungo il Osservabile
catena al Gli osservatori
.
Il lavoro del observeOn ()
l'operatore di concorrenza, d'altra parte, è quello di passare le emissioni successive su un altro thread o Scheduler
. Usiamo questo operatore per controllare su quali thread i consumatori a valle riceveranno le emissioni. Vediamo un esempio pratico.
importare android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import android.widget.TextView; import io.reactivex.Observable; import io.reactivex.ObservableOnSubscribe; import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; public class ObserveOnActivity estende AppCompatActivity private Disposable mDisposable = null; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); TextView textView = (TextView) findViewById (R.id.tv_main); Osservabileobservable = Observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .observeOn (AndroidSchedulers.mainThread ()) .doOnComplete (() -> Log.d ("ObserveOnActivity", "Complete")); mDisposable = observable.subscribe (s -> Log.d ("ObserveOnActivity", "received" + s + "sul thread" + Thread.currentThread (). getName ()); textView.setText (s);); ObservableOnSubscribe privato dataSource () return (emitter -> Thread.sleep (800); emitter.onNext ("Value"); Log.d ("ObserveOnActivity", "dataSource () sul thread" + Thread.currentThread (). getName ( )); emitter.onComplete ();); // ...
Nel codice sopra, abbiamo usato il observeOn ()
operatore e poi ha passato il AndroidSchedulers.mainThread ()
ad esso. Quello che abbiamo fatto è passare il thread da Schedulers.newThread ()
al thread principale di Android. Questo è necessario perché vogliamo aggiornare il TextView
widget, e può farlo solo dal thread dell'interfaccia utente principale. Si noti che se non si passa al thread principale quando si tenta di aggiornare il TextView
widget, l'app si blocca e lancia un CalledFromWrongThreadException
.
non mi piace il subscribeOn ()
operatore, il observeOn ()
l'operatore può essere applicato più volte nella catena dell'operatore, modificando in tal modo Scheduler
più di una volta.
Osservabileobservable = Observable.create (dataSource ()) .subscribeOn (Schedulers.newThread ()) .observeOn (Schedulers.io ()) .doOnNext (s -> saveToCache (s); Log.d ("ObserveOnActivity", "doOnNext () sul thread "+ Thread.currentThread (). getName ());) .observeOn (AndroidSchedulers.mainThread ()) .doOnComplete (() -> Log.d (" ObserveOnActivity "," Complete "));
Questo codice ha due observeOn ()
operatori. Il primo usa il Schedulers.io ()
, il che significa che il saveToCache ()
il metodo verrà eseguito su Schedulers.io ()
filo. Dopodiché, passa a AndroidSchedulers.mainThread ()
dove Gli osservatori
riceverà le emissioni dall'upstream.
Il flatMap ()
l'operatore è un altro operatore molto potente e importante che può essere utilizzato per raggiungere la concorrenza. La definizione secondo la documentazione ufficiale è la seguente:
Trasforma gli oggetti emessi da un Osservabile in Osservabili, quindi appiattisci le emissioni da quelle in un singolo Osservabile.
Diamo un'occhiata a un esempio pratico che utilizza questo operatore:
// ... @Override protected void onCreate (Bundle savedInstanceState) // ... final String [] states = "Lagos", "Abuja", "Imo", "Enugu"; OsservabilestatesObservable = Observable.fromArray (states); statesObservable.flatMap (s -> Observable.create (getPopulation (s))) .subscribe (pair -> Log.d ("MainActivity", pair.first + "population is" + pair.second)); ObservableOnSubscribe privato getPopulation (String state) return (emitter -> Random r = new Random (); Log.d ("MainActivity", "getPopulation () per" + state + "chiamato" + Thread.currentThread (). getName ( )); emitter.onNext (nuova coppia (stato, r.nextInt (300000 - 10000) + 10000)); emitter.onComplete (););
Questo stamperà quanto segue sul logcat di Android Studio:
getPopulation () per Lagos chiamato sulla popolazione principale di Lagos è 80362 getPopulation () per Abuja chiamato sulla popolazione principale di Abuja è 132559 getPopulation () per Imo chiamato sulla popolazione Imo principale è 34106 getPopulation () per Enugu chiamato sulla popolazione Enugu principale è 220301
Dal risultato sopra, puoi vedere che i risultati che abbiamo ottenuto erano nello stesso ordine dell'array. Anche il getPopulation ()
il metodo per ogni stato è stato elaborato sullo stesso thread, il thread principale. Ciò rende il risultato di output lento poiché sono stati elaborati in sequenza sul thread principale.
Ora, per consentirci di raggiungere la concorrenza con questo operatore, vogliamo getPopulation ()
metodo per ogni stato (emissioni dal statesObservable
) da elaborare su fili diversi Fare questo porterà a un'elaborazione più veloce. Useremo il flatMap ()
operatore per fare questo perché crea un nuovo Osservabile
per ogni emissione. Applichiamo quindi il subscribeOn ()
operatore di concorrenza a ciascuno, passando a Scheduler
ad esso.
statesObservable.flatMap (s -> Observable.create (getPopulation (s)) .subscribeOn (Schedulers.io ())) .subscribe (pair -> Log.d ("MainActivity", pair.first + "population is" + coppia .secondo));
Come ogni emissione produce un Osservabile
, il flatMap ()
il lavoro dell'operatore è di unirli insieme e quindi inviarli come un singolo flusso.
getPopulation () per Lagos chiamato su RxCachedThreadScheduler-1 La popolazione di Lagos è 143965 getPopulation () per Abuja chiamato su RxCachedThreadScheduler-2 getPopulation () per Enugu chiamato su RxCachedThreadScheduler-4 La popolazione di Abuja è 158363 La popolazione di Enugu è 271420 getPopulation () per Imo chiamata su RxCachedThreadScheduler -3 La popolazione di Imo è 81564
Nel risultato sopra, possiamo osservare che ogni stato è getPopulation ()
il metodo è stato elaborato su thread diversi. Questo rende la lavorazione molto più veloce, ma osserva anche che le emissioni dal flatMap ()
operatore che sono stati ricevuti dal Osservatore
non sono nello stesso ordine delle emissioni originali a monte.
In questo tutorial, hai imparato a gestire la concorrenza usando RxJava 2: che cos'è, il diverso schedulatori
disponibile e come utilizzare il subscribeOn ()
e observeOn ()
operatori di concorrenza. Ti ho anche mostrato come usare il flatMap ()
operatore per raggiungere la concorrenza.
Nel frattempo, dai uno sguardo ad alcuni dei nostri altri corsi e tutorial sul linguaggio Java e sullo sviluppo di app per Android!