Operatori di programmazione reattiva in RxJava 2

Se la tua app Android ha intenzione di collezionare le recensioni a cinque stelle su Google Play, allora deve essere in grado di eseguire più operazioni.

Come minimo, gli utenti mobili di oggi si aspettano di essere ancora in grado di interagire con la tua app mentre lavora in background. Questo può sembrare semplice, ma Android è a thread singolo per impostazione predefinita, quindi se hai intenzione di soddisfare le aspettative del tuo pubblico, prima o poi dovrai creare alcuni thread aggiuntivi.

Nel precedente articolo di questa serie abbiamo avuto un'introduzione a RxJava, una libreria reattiva per la JVM che può aiutarti a creare applicazioni Android che reagiscono a dati ed eventi man mano che si verificano. Ma puoi anche usare questa libreria per reagire a dati ed eventi contemporaneamente.

In questo post, vi mostrerò come è possibile utilizzare gli operatori di RxJava per rendere finalmente la concorrenza su Android un'esperienza senza problemi. Alla fine di questo articolo, saprete come utilizzare gli operatori RxJava per creare thread aggiuntivi, specificare il lavoro che dovrebbe verificarsi su questi thread e quindi postare i risultati sul più importante thread dell'interfaccia utente principale di Android, tutto con un semplice poche righe di codice.

E dato che nessuna tecnologia è perfetta, ti racconterò anche una delle maggiori potenziali conseguenze dell'aggiunta della libreria RxJava ai tuoi progetti, prima di mostrarti come utilizzare gli operatori per garantire questo problema mai si verifica nei tuoi progetti Android.

Presentazione degli operatori

RxJava ha un'enorme collezione di operatori che hanno principalmente lo scopo di aiutarti a modificare, filtrare, unire e trasformare i dati che vengono emessi dal tuo OsservabileS. Troverai la lista completa degli operatori RxJava nei documenti ufficiali, e anche se nessuno si aspetta che tu memorizzi ogni singolo operatore, vale la pena dedicare un po 'di tempo a leggere questo elenco, così da avere un'idea approssimativa delle diverse trasformazioni di dati che è possibile eseguire.

L'elenco di operatori di RxJava è già abbastanza completo, ma se non riesci a trovare l'operatore perfetto per la trasformazione dei dati che avevi in ​​mente, puoi sempre concatenare più operatori. Applicare un operatore a un Osservabile in genere ne restituisce un altro Osservabile, quindi puoi continuare ad applicare gli operatori fino a ottenere i risultati desiderati.

Ci sono troppi operatori RxJava da coprire in un singolo articolo, e i documenti ufficiali di RxJava fanno già un buon lavoro introducendo tutti gli operatori che è possibile utilizzare per le trasformazioni di dati, quindi mi concentrerò su due operatori che hanno il il maggior potenziale per rendere la tua vita come uno sviluppatore Android più semplice: subscribeOn () e observeOn ()

Multithreading con operatori RxJava

Se la tua app fornirà la migliore esperienza utente possibile, dovrà essere in grado di eseguire attività intensive o di lunga durata ed eseguire più attività contemporaneamente, senza bloccare l'importantissima interfaccia utente principale di Android.

Ad esempio, immagina che la tua app debba recuperare alcune informazioni da due diversi database. Se esegui entrambe queste attività una dopo l'altra sul thread principale di Android, non solo ci vorrà molto tempo, ma l'interfaccia utente non risponderà finché l'app non avrà completato il recupero di ogni singola informazione da entrambi i database . Non esattamente una grande esperienza utente!

Una soluzione molto migliore è quella di creare due thread aggiuntivi in ​​cui è possibile eseguire entrambe queste attività contemporaneamente senza bloccare il thread dell'interfaccia utente principale. Questo approccio significa che il lavoro sarà completato due volte più velocemente, e l'utente sarà in grado di continuare a interagire con l'interfaccia utente della tua app. Potenzialmente, i tuoi utenti potrebbero anche non essere a conoscenza del fatto che la tua app esegue un lavoro intensivo e di lunga durata in background: tutte le informazioni del database appariranno semplicemente nell'interfaccia utente della tua applicazione, come se per magia!

Android fornisce alcuni strumenti che puoi utilizzare per creare ulteriori thread, tra cui Servizios e IntentServices, ma queste soluzioni sono difficili da implementare e possono rapidamente produrre un codice complesso e dettagliato. Inoltre, se non implementi correttamente il multithreading, potresti trovarti con un'applicazione che perde memoria e genera tutti i tipi di errori.

Per rendere il multithreading su Android ancora più mal di testa, il thread principale dell'interfaccia utente di Android è l'unico thread in grado di aggiornare l'interfaccia utente della tua app. Se desideri aggiornare l'interfaccia utente della tua app con il risultato del lavoro svolto qualsiasi altro thread, allora in genere dovrai creare un handler sul thread dell'interfaccia utente principale, quindi utilizzare questo handler per trasferire i dati dal thread in background al thread principale. Ciò significa più codice, più complessità e più opportunità di errori da insinuarsi nel tuo progetto.

Ma RxJava presenta due operatori che possono aiutarti a evitare gran parte di questa complessità e potenziale errore.

Nota che usi questi operatori insieme a schedulatori, che sono essenzialmente componenti che ti permettono di specificare discussioni. Per ora, pensaci scheduler come sinonimo di parola filo.

  • subscribeOn (Scheduler): Per impostazione predefinita, a Osservabile emette i suoi dati sul thread in cui è stata dichiarata la sottoscrizione, cioè dove hai chiamato il .sottoscrivi metodo. In Android, questo è generalmente il thread principale dell'interfaccia utente. Puoi usare il subscribeOn () operatore per definire un diverso Scheduler dove il Osservabile dovrebbe eseguire ed emettere i suoi dati.
  • observeOn (Scheduler): Puoi usare questo operatore per reindirizzare il tuo Osservabilele emissioni ad un diverso Scheduler, cambiando efficacemente il thread in cui il OsservabileVengono inviate le notifiche e, per estensione, il thread in cui vengono utilizzati i suoi dati.

RxJava viene fornito con un numero di programmi di pianificazione che è possibile utilizzare per creare diversi thread, tra cui:

  • Schedulers.io (): Progettato per essere utilizzato per attività relative all'IO. 
  • Schedulers.computation (): Progettato per essere utilizzato per attività computazionali. Per impostazione predefinita, il numero di thread nello scheduler di calcolo è limitato al numero di CPU disponibili sul dispositivo.
  • Schedulers.newThread (): Crea una nuova discussione.

Ora hai una panoramica di tutte le parti in movimento, diamo un'occhiata ad alcuni esempi di come subscribeOn () e observeOn () sono usati e vedono alcuni programmatori in azione.

subscribeOn ()

In Android, in genere utilizzi subscribeOn () e un accompagnamento Scheduler per cambiare il thread in cui viene eseguito un lavoro di lunga durata o intenso, quindi non c'è il rischio di bloccare il thread dell'interfaccia utente principale. Ad esempio, potresti decidere di importare una grande quantità di dati su io () scheduler o eseguire alcuni calcoli sul calcolo() scheduler.

Nel codice seguente, stiamo creando un nuovo thread in cui il Osservabile eseguirà le sue operazioni ed emetterà i valori 1, 2, e 3.

Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer); 

Mentre questo è tutto ciò che serve per creare un thread e iniziare a emettere dati su quel thread, potresti voler confermare che questo osservabile funzioni davvero su un nuovo thread. Un metodo consiste nel stampare il nome del thread attualmente utilizzato dall'applicazione, in Android StudioLogcat Monitor.

Convenientemente, nel post precedente, Iniziare con RxJava, abbiamo creato un'applicazione che invia messaggi a Logcat Monitor in varie fasi durante il ciclo di vita dell'Observable, in modo da poter riutilizzare molto di questo codice.

Apri il progetto che hai creato in quel post e modifica il codice in modo che utilizzi quanto sopra Osservabile come fonte Osservabile. Quindi aggiungere il subscribeOn () Operatore e specifica che i messaggi inviati a Logcat devono includere il nome del thread corrente.

Il tuo progetto finito dovrebbe assomigliare a questo:

import android.support.v7.app.AppCompatActivity; importare android.os.Bundle; import android.util.Log; import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; public class MainActivity estende AppCompatActivity String statico finale pubblico TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer);  Osservatore Observer = new Observer() @Override public void onSubscribe (Disposable d) Log.e (TAG, "onSubscribe" + Thread.currentThread (). GetName ());  @Override public void onNext (Valore intero) Log.e (TAG, "onNext:" + value + Thread.currentThread (). GetName ());  @Override public void onError (Throwable e) Log.e (TAG, "onError:");  @Override public void onComplete () Log.e (TAG, "onComplete: All Done!" + Thread.currentThread (). GetName ()); ; 

Assicurati che il monitor Logcat di Android Studio sia aperto (selezionando Monitor Android scheda, seguito da logcat) e quindi esegui il tuo progetto su un dispositivo Android fisico o su un AVD. Dovresti vedere il seguente output nel Monitor Logcat:

Ecco, puoi vederlo .sottoscrivi viene chiamato sul thread dell'interfaccia utente principale, ma l'osservabile funziona su un thread completamente diverso.

Il subscribeOn () l'operatore avrà lo stesso effetto, indipendentemente da dove lo posizioni nella catena osservabile; tuttavia, non è possibile utilizzare più subscribeOn () operatori nella stessa catena. Se includi più di uno subscribeOn (), allora la tua catena lo farà solo utilizzare il subscribeOn () questo è il più vicino alla fonte osservabile.

observeOn ()

diversamente da subscribeOn (), dove posizioni observeOn () nella tua catena fa importa, poiché questo operatore modifica solo il thread utilizzato dagli oggetti osservabili che appaiono a valle

Ad esempio, se hai inserito il seguente nella tua catena, allora ogni osservabile che appare nella catena da questo punto in poi userà il nuovo thread.

.observeOn (Schedulers.newThread ())

Questa catena continuerà a funzionare sul nuovo thread finché non ne incontra un'altra observeOn () operatore, a quel punto passerà alla thread specificata da quell'operatore. Puoi controllare il thread dove specifici osservabili inviano le loro notifiche inserendo più elementi observeOn () operatori nella tua catena.

Quando sviluppi app Android, generalmente le utilizzi observeOn () per inviare il risultato del lavoro svolto sui thread in background al thread dell'interfaccia utente principale di Android. Il modo più semplice per reindirizzare le emissioni sul thread principale dell'interfaccia utente di Android è utilizzare Schedulatore AndroidSchedulers.mainThread, che è incluso come parte della libreria RxAndroid, piuttosto che nella libreria RxJava. 

La libreria RxAndroid include binding specifici per Android per RxJava 2, rendendola una preziosa risorsa aggiuntiva per gli sviluppatori Android (e qualcosa che vedremo in modo molto più dettagliato nel prossimo post di questa serie).

Per aggiungere RxAndroid al tuo progetto, apri il tuo livello di modulo build.gradle file e aggiungere la versione più recente della libreria alla sezione delle dipendenze. Al momento della scrittura, l'ultima versione di RxAndroid era 2.0.1, quindi aggiungo quanto segue:

dipendenze ... compile 'io.reactivex.rxjava2: rxandroid: 2.0.1'

Dopo aver aggiunto questa libreria al tuo progetto, puoi specificare che i risultati di un osservabile devono essere inviati al thread dell'interfaccia utente principale della tua app, utilizzando una singola riga di codice:

.observeOn (AndroidSchedulers.mainThread ())

Considerando che comunicare con il thread dell'interfaccia utente principale della tua app occupa una pagina intera dei documenti ufficiali di Android, si tratta di un enorme miglioramento che potrebbe potenzialmente farti risparmiare molto tempo durante la creazione di applicazioni Android multithread.

Il principale svantaggio di RxJava

Mentre RxJava ha molto da offrire agli sviluppatori Android, nessuna tecnologia è perfetta, e RxJava ha una trappola che ha il potenziale di far crashare la tua app.

Per impostazione predefinita, RxJava gestisce un flusso di lavoro basato su push: i dati sono prodotti a monte da un Osservabile, e viene quindi spinto a valle verso l'assegnato Osservatore. Il problema principale con un flusso di lavoro basato su push è quanto sia facile per il produttore (in questo caso, il Osservabile) per emettere oggetti troppo rapidamente per il consumatore (Osservatore) processare.

Una chiacchierata Osservabile e un lento Osservatore può portare rapidamente a un arretrato di articoli non consumati, che andrà a divorare risorse di sistema e potrebbe anche risultare in un OutOfMemoryException. Questo problema è noto come contropressione.

Se si sospetta che si verifichi una contropressione nell'app, esistono alcune soluzioni possibili, tra cui l'utilizzo di un operatore per ridurre il numero di articoli prodotti.

Creazione di periodi di campionamento con campione() e throttlefirst ()

Se uno Osservabile sta emettendo un numero elevato di elementi, quindi potrebbe non essere necessario per l'assegnato Osservatore ricevere ogni uno di questi articoli.

Se puoi tranquillamente ignorare qualcuno di un OsservabileLe emissioni, quindi ci sono alcuni operatori che è possibile utilizzare per creare periodi di campionamento e quindi selezionare i valori specifici emessi durante questi periodi:

  • Il campione() L'operatore controlla l'output dell'Osservable agli intervalli specificati dall'utente e quindi prende l'elemento più recente emesso durante quel periodo di campionamento. Ad esempio, se includi .campione (5, SECONDI) nel tuo progetto quindi l'Observer riceverà l'ultimo valore che è stato emesso durante ogni intervallo di cinque secondi. 
  • Il throttleFirst () L'operatore prende il primo valore emesso durante il periodo di campionamento. Ad esempio, se includi .throttlefirst (5, SECONDI) quindi l'Observer riceverà il primo valore emesso durante ogni intervallo di cinque secondi.  

Emissioni in serie con buffer()

Se non si riesce a saltare in modo sicuro le emissioni, allora si può ancora essere in grado di togliere una certa pressione da una lotta Osservatore raggruppando le emissioni in lotti e poi proseguendo in massa. L'elaborazione delle emissioni in batch è in genere più efficiente rispetto all'elaborazione di più emissioni separatamente, pertanto questo approccio dovrebbe migliorare il tasso di consumo.

È possibile creare emissioni in batch utilizzando il buffer() operatore. Qui, stiamo usando buffer() per raggruppare tutti gli articoli emessi per un periodo di tre secondi:

Observable.range (0, 10) .buffer (3, SECONDS) .subscribe (System.out :: println);

In alternativa, puoi usare buffer() creare un lotto costituito da un numero specifico di emissioni. Ad esempio, qui stiamo dicendo buffer() per raggruppare le emissioni in gruppi di quattro:

Observable.range (0, 10) .buffer (4) .subscribe (System.out :: println);

Sostituzione di oggetti osservabili con fluidi

Un metodo alternativo per ridurre il numero di emissioni è sostituire il Osservabile questo ti sta causando problemi con a fluido.

In RxJava 2, il team RxJava ha deciso di dividere lo standard Osservabile in due tipi: il tipo normale che abbiamo visto in tutta questa serie, e fluidoS.

fluidos funziona quasi allo stesso modo di Osservabiles, ma con una grande differenza: fluidos invia solo il numero di elementi richiesti dall'osservatore. Se hai un Osservabile che sta emettendo più oggetti di quanti ne possa consumare l'osservatore assegnato, allora potreste voler considerare di passare a fluido anziché.

Prima di poter iniziare a usare fluidos nei tuoi progetti, devi aggiungere la seguente dichiarazione di importazione:

import io.reactivex.Flowable;

Puoi quindi creare fluidos utilizzando esattamente le stesse tecniche utilizzate per creare OsservabileS. Ad esempio, ognuno dei seguenti frammenti di codice creerà a fluido è in grado di emettere dati:

fluido flowable = Flowable.fromArray (new String [] "south", "north", "west", "east"); ... flowable.subscribe ()
fluido flowable = Flowable.range (0, 20); ... flowable.subscribe ()

A questo punto, ti starai chiedendo: perché dovrei mai usarlo Osservabiles quando posso usare fluidos e non ti devi preoccupare della contropressione? La risposta è che a fluido incorre più di un sovraccarico di un normale Osservabile, quindi, per creare un'app ad alte prestazioni, è necessario attenersi Osservabiles a meno che tu non sospetti che la tua applicazione abbia problemi di contropressione.

single

UN fluido non è l'unica variazione su Osservabile che troverai in RxJava, dato che la libreria include anche singolo classe.

single sono utili quando devi solo emettere un valore. In questi scenari, creando un Osservabile può sembrare eccessivo, ma a singolo è progettato per emettere semplicemente un singolo valore e quindi completare, chiamando:

  • onSuccess (): Il singolo emette il suo unico valore.  
  • onError (): Se la singolo non è in grado di emettere il suo oggetto, quindi passerà questo metodo al risultato Throwable.

UN singolo chiamerà solo uno di questi metodi e quindi terminerà immediatamente.

Diamo un'occhiata a un esempio di a singolo in azione-di nuovo, per risparmiare tempo stiamo riutilizzando il codice:

importare android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import io.reactivex.Single; import io.reactivex.SingleObserver; import io.reactivex.disposables.Disposable; public class MainActivity estende AppCompatActivity String statico finale pubblico TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Single.just ("Hello World") .subscribe (getSingleObserver ());  SingleObserver privato getSingleObserver () return new SingleObserver() @Override public void onSubscribe (Disposable d) Log.e (TAG, "onSubscribe");  @Override public void onSuccess (String value) Log.e (TAG, "onSuccess:" + value);  @Override public void onError (Throwable e) Log.e (TAG, "onError:"); ; 

Esegui il tuo progetto su un dispositivo Android AVD o fisico e vedrai il seguente output nel monitor Logcat di Android Studio:

Se cambi idea e vuoi convertire a singolo in un Osservabile in qualsiasi momento, quindi ancora una volta RxJava ha tutti gli operatori necessari, tra cui:

  • mergeWith (): Unisce più single in un singolo Osservabile
  • concatWith (): Catene gli oggetti emessi da più single insieme, per formare un Osservabile emissione. 
  • toObservable (): Converte a singolo in un Osservabile che emette l'elemento originariamente emesso dal Single e quindi completo.

Sommario

In questo post abbiamo esplorato alcuni operatori RxJava che è possibile utilizzare per creare e gestire più thread, senza la complessità e il potenziale di errore che tradizionalmente accompagnano il multithreading su Android. Abbiamo anche visto come è possibile utilizzare la libreria RxAndroid per comunicare con l'importante thread dell'interfaccia utente principale di Android utilizzando una singola riga di codice e come garantire che la contropressione non diventi un problema nell'applicazione.

Abbiamo toccato la libreria RxAndroid alcune volte in questa serie, ma questa libreria è piena di binding RxJava specifici per Android che possono essere preziosi quando si lavora con RxJava sulla piattaforma Android, quindi nel post finale di questa serie faremo stai guardando la libreria RxAndroid in modo molto più dettagliato.

Fino ad allora, controlla alcuni dei nostri altri post sulla codifica per Android!