I have been banging my head on this one for a while, and now that I finally get it I thought I might write this down.
When using RxJava it is generally not a great idea to use Observable.create(), because it doesn’t handle backpressure.
But sometimes I find myself in need of a custom Observable which I can’t create using fromCallable() or other methods. For example, I might have a process that emits multiple events (something that fromCallable() can’t handle). Or I might have something that produces the event in an asynchronous fashion, such as when wrapping an existing API that takes a callback.
So the trick is to create() an observable that does support backpressure in a proper way. It turns out that there’s a helper class called SyncOnSubscribe that does this.
But how do you use it? After looking at it a couple of times over the past months, it finally clicked (with a little help from Paul, cheers for that!).
Using SyncOnSubscribe
SyncOnSubscribe is what you should pass to Observable.create() instead of your own OnSubscribe<T>. So far, so good. Now how do we implement it?
The javadoc is quite daunting, at least to me.
There are a bunch of static methods to create a SyncOnSubscribe instance with various overloads: createStateful(), createSingleState(), createStateless() …what?
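For orientation before setting the helpers aside, this is how I understand the three factories to differ: createStateless() keeps no state at all, createSingleState() creates one state object and hands the same object to every call, and createStateful() lets each call return the state for the following call. A minimal sketch, assuming RxJava 1.x on the classpath and Java 8 lambdas; the class and method names (HelperSketch, countTo, fromIterator, ticks) are made up for illustration.

```java
import java.util.Iterator;
import java.util.List;
import rx.Observable;
import rx.observables.SyncOnSubscribe;

// Hypothetical demo class, not part of RxJava.
public class HelperSketch {

    // createStateful: the next function returns the state for the following call.
    static List<Integer> countTo(int max) {
        Observable.OnSubscribe<Integer> source =
            SyncOnSubscribe.<Integer, Integer>createStateful(
                () -> 1,
                (state, observer) -> {
                    observer.onNext(state);
                    if (state == max) {
                        observer.onCompleted();
                    }
                    return state + 1;
                });
        return Observable.create(source).toList().toBlocking().single();
    }

    // createSingleState: one state object (here an Iterator) reused for every call.
    static List<String> fromIterator(List<String> items) {
        Observable.OnSubscribe<String> source =
            SyncOnSubscribe.<Iterator<String>, String>createSingleState(
                items::iterator,
                (it, observer) -> {
                    if (it.hasNext()) {
                        observer.onNext(it.next());
                    } else {
                        observer.onCompleted();
                    }
                });
        return Observable.create(source).toList().toBlocking().single();
    }

    // createStateless: no state; this source never completes, so take() bounds it.
    static List<String> ticks(int count) {
        Observable.OnSubscribe<String> source =
            SyncOnSubscribe.<String>createStateless(observer -> observer.onNext("tick"));
        return Observable.create(source).take(count).toList().toBlocking().single();
    }
}
```

In each case the function you pass in plays the role of next(), which is covered in more detail further down.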
Let’s ignore all of the static helper methods and create a subclass that implements the required methods.
Implementing SyncOnSubscribe
First there’s generateState(). There’s that state thing again. The javadoc says:
Executed once when subscribed to by a subscriber … to produce a state value.
The important thing here is the executed once part. This method allows you to initialize or start whatever the Observable needs in order to start producing items. Secondly, you can return anything here which will serve as the state. This comes into play in the next() method.
The next(state, observer) method is called every time an item is requested from the Observable. When this method is called, your SyncOnSubscribe should produce exactly one item (potentially blocking) and call observer.onNext() once, or call observer.onCompleted() or observer.onError() to indicate completion or error.
The next() method receives the state that was created in generateState() (if any) and returns the new state value. So although this variable is called state, it can be any object that you require in your logic to produce the next value when next() is called. For example, state could be an InputStream you are reading items from.
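The description of generateState() and next() above can be sketched with a tiny counter, where the state is just the next number to emit. This assumes RxJava 1.x; Counter, DemandSketch, receive and the nextCalls field are hypothetical names that only exist to make the behaviour visible. The subscriber asks for a fixed number of items in onStart() and never asks for more.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.observables.SyncOnSubscribe;

// Hypothetical demo class, not part of RxJava.
public class DemandSketch {

    /** Emits 0, 1, 2, ... without end; the state is the next number to emit. */
    static class Counter extends SyncOnSubscribe<Integer, Integer> {
        int nextCalls = 0; // bookkeeping for the demo only

        @Override
        protected Integer generateState() {
            return 0; // runs once per subscription
        }

        @Override
        protected Integer next(Integer state, Observer<? super Integer> observer) {
            nextCalls++;
            observer.onNext(state); // exactly one item per call
            return state + 1;       // state handed to the following call
        }
    }

    /** Subscribes with a fixed demand and returns whatever arrived. */
    static List<Integer> receive(Counter counter, final int demand) {
        final List<Integer> received = new ArrayList<>();
        Observable.create(counter).subscribe(new Subscriber<Integer>() {
            @Override
            public void onStart() {
                request(demand); // ask for `demand` items and nothing more
            }

            @Override
            public void onNext(Integer value) {
                received.add(value);
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                throw new IllegalStateException(e);
            }
        });
        return received;
    }
}
```

Even though the counter could go on forever, only as many items as were requested should be produced. That is the backpressure support a hand-written OnSubscribe would have to implement itself.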
onUnsubscribe(state) is a method you can optionally implement to clean up when the Observable is unsubscribed; it receives the state as well. This is where you could close the InputStream in the previous example.
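Putting the three methods together, here is a minimal sketch of the stream example. To keep it self-contained I swapped the InputStream for a BufferedReader over an in-memory string, so that part is my assumption, as are the names LineSource, LineSourceSketch, readAll and the closed flag. It assumes RxJava 1.x.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import rx.Observable;
import rx.Observer;
import rx.observables.SyncOnSubscribe;

// Hypothetical demo class, not part of RxJava.
public class LineSourceSketch {

    /** Emits one line per next() call; the state is the open reader. */
    static class LineSource extends SyncOnSubscribe<BufferedReader, String> {
        private final String text;
        volatile boolean closed = false; // lets the demo check that clean up ran

        LineSource(String text) {
            this.text = text;
        }

        @Override
        protected BufferedReader generateState() {
            // Runs once per subscription: open the resource here.
            return new BufferedReader(new StringReader(text));
        }

        @Override
        protected BufferedReader next(BufferedReader reader, Observer<? super String> observer) {
            try {
                String line = reader.readLine(); // could block on a real stream
                if (line == null) {
                    observer.onCompleted();
                } else {
                    observer.onNext(line);
                }
            } catch (IOException e) {
                observer.onError(e);
            }
            return reader; // same state object for the following call
        }

        @Override
        protected void onUnsubscribe(BufferedReader reader) {
            try {
                reader.close();
                closed = true;
            } catch (IOException ignored) {
                // nothing sensible to do in a sketch
            }
        }
    }

    static List<String> readAll(LineSource source) {
        return Observable.create(source).toList().toBlocking().single();
    }
}
```

Because generateState() runs for every subscription, each subscriber gets its own reader, and onUnsubscribe() closes exactly the one that belongs to it.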
What about AsyncOnSubscribe?
There’s also AsyncOnSubscribe which is basically the same thing, but has a slightly different method signature for next():
next(S state, long requested, Observer<Observable<? extends T>> observer)
This variant takes a count of requested items. Note that Long.MAX_VALUE means that anything goes: there is no upper bound, so you can emit all available items. The observer expects an Observable that produces the requested number of items, which is what makes it async.
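As a rough sketch of that signature in use, the class below emits a fixed range of numbers in chunks sized by the demand it is given. It assumes a RxJava 1.x release that ships AsyncOnSubscribe; Chunked, AsyncSketch and collect are hypothetical names. The inner Observable here is a plain synchronous Observable.range() to keep the example short, whereas a real use would hand over something that does its work asynchronously.

```java
import java.util.List;
import rx.Observable;
import rx.Observer;
import rx.observables.AsyncOnSubscribe;

// Hypothetical demo class, not part of RxJava.
public class AsyncSketch {

    /** Emits 0..total-1; the state is the count of numbers handed out so far. */
    static class Chunked extends AsyncOnSubscribe<Integer, Integer> {
        private final int total;

        Chunked(int total) {
            this.total = total;
        }

        @Override
        protected Integer generateState() {
            return 0;
        }

        @Override
        protected Integer next(Integer emitted, long requested,
                               Observer<Observable<? extends Integer>> observer) {
            // requested may be Long.MAX_VALUE, so cap it at what is left.
            int chunk = (int) Math.min(requested, (long) (total - emitted));
            if (chunk > 0) {
                // One Observable per call, producing at most `requested` items.
                observer.onNext(Observable.range(emitted, chunk));
            }
            int nowEmitted = emitted + chunk;
            if (nowEmitted >= total) {
                observer.onCompleted();
            }
            return nowEmitted;
        }
    }

    static List<Integer> collect(int total) {
        return Observable.create(new Chunked(total)).toList().toBlocking().single();
    }
}
```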
Note that creating an observable like that can still be tricky, so it does not completely solve the case where you are wrapping an async API that works with callbacks.
Fortunately the new fromAsync() operator in the 1.1.7 release helps with that, and that’s probably what you’d generally use.
TL;DR
As I’m writing this down now, I realise that all of the above isn’t that complicated once you figure it out, but it took quite some time for me to wrap my head around it anyway, so let’s recap:
generateState allows you to initialise whatever you call your state and serves as a callback that your Observable is being subscribed to.
SyncOnSubscribe generates one item in its next() function.
With AsyncOnSubscribe, next() calls observer.onNext() with an Observable, created by you, that emits the requested number of items.
onUnsubscribe is called when the Observable is unsubscribed, and allows for clean up.
For Observables wrapping a callback use fromAsync().