Note to self: RxJava SyncOnSubscribe

I have been banging my head on this one for a while, and now that I finally get it I thought I might write this down.

When using RxJava it is generally not a great idea to use Observable.create() with a hand-written OnSubscribe, because it doesn’t handle backpressure for you. But sometimes I find myself in need of a custom Observable which I can’t create using fromCallable() or other methods. For example, I might have a process that emits multiple events (something that fromCallable() can’t handle). Or I might have something that produces the event in an asynchronous fashion, such as when wrapping an existing API that takes a callback.

So the trick is to create() an observable that does support backpressure in a proper way. It turns out that there’s a helper class called SyncOnSubscribe that does this. But how do you use it? After looking at it a couple of times over the past months, it finally clicked (with a little help from Paul, cheers for that!).

Using SyncOnSubscribe

SyncOnSubscribe is what you should pass to Observable.create() instead of your own OnSubscribe<T>. So far, so good. Now how do we implement it? The javadoc is quite daunting, at least to me.

There are a bunch of static methods to create a SyncOnSubscribe instance with various overloads: createStateful(), createSingleState(), createStateless()…what?
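For a rough idea of what these helpers look like in use, here is a minimal sketch of createStateful() producing an endless counter. It assumes RxJava 1.x on the classpath and Java 8 lambdas; the class name CounterSketch and its methods are made up for illustration. The roles of the two lambdas (creating the state, producing the next item) are exactly what the following sections explain.

```java
import java.util.List;

import rx.Observable;
import rx.observables.SyncOnSubscribe;

// Hypothetical example class, not part of RxJava.
class CounterSketch {

    static Observable<Integer> naturals() {
        return Observable.create(SyncOnSubscribe.<Integer, Integer>createStateful(
                () -> 0,                      // initial state, created once per subscriber
                (state, observer) -> {
                    observer.onNext(state);   // emit exactly one item per call
                    return state + 1;         // the returned value is the state for the next call
                }));
    }

    static List<Integer> firstFive() {
        // take(5) unsubscribes after five items, so the endless source stops.
        return naturals().take(5).toList().toBlocking().single();
    }
}
```

The explicit type arguments on createStateful() are there because the compiler cannot always infer the state type from the lambdas alone.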

Let’s ignore all of the static helper methods and create a subclass that implements the required methods.

Implementing SyncOnSubscribe

First there’s generateState(). There’s that state thing again. The javadoc says:

Executed once when subscribed to by a subscriber … to produce a state value.

The important thing here is the executed once part. This method allows you to initialize or start whatever you need for the Observable to start producing items. Secondly, you can return anything here which will serve as the state. This comes into play in the next() method.

The next(state, observer) method is called every time an item is requested from the Observable. When this method is called, your SyncOnSubscribe should produce exactly one item (potentially blocking) and call observer.onNext() once, or call observer.onCompleted() or observer.onError() to indicate completion or error. The next() method receives the state that was created in generateState() (if any) and returns the new state value. So although this variable is called state, it can be any object that your logic requires to produce the next value when next() is called. For example, state could be an InputStream you are reading items from.

onUnsubscribe(state) is a method you can optionally implement to clean up when the Observable is unsubscribed, and also receives the state again. This is where you could close the InputStream in the previous example.
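To make the three methods concrete, here is a minimal sketch of a subclass that emits the lines of a text one by one. It assumes RxJava 1.x on the classpath; the class name LineSource and the helper firstLines() are invented for this example, and a BufferedReader over a String stands in for the InputStream mentioned above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;

import rx.Observable;
import rx.Observer;
import rx.observables.SyncOnSubscribe;

// Hypothetical example class: the state is the reader we pull lines from.
class LineSource extends SyncOnSubscribe<BufferedReader, String> {

    private final String text;

    LineSource(String text) {
        this.text = text;
    }

    @Override
    protected BufferedReader generateState() {
        // Executed once per subscriber: open the resource here.
        return new BufferedReader(new StringReader(text));
    }

    @Override
    protected BufferedReader next(BufferedReader reader, Observer<? super String> observer) {
        try {
            String line = reader.readLine();
            if (line == null) {
                observer.onCompleted();   // nothing left to read
            } else {
                observer.onNext(line);    // exactly one item per call
            }
        } catch (IOException e) {
            observer.onError(e);
        }
        return reader;                    // same state object for the next call
    }

    @Override
    protected void onUnsubscribe(BufferedReader reader) {
        try {
            reader.close();               // clean up the state
        } catch (IOException ignored) {
            // nothing sensible to do at this point
        }
    }

    static List<String> firstLines(String text, int n) {
        return Observable.create(new LineSource(text))
                .take(n)
                .toList()
                .toBlocking()
                .single();
    }
}
```

Because next() only runs when the subscriber has requested an item, an operator such as take(n) downstream never causes more lines to be read than are actually needed.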

What about AsyncOnSubscribe?

There’s also AsyncOnSubscribe which is basically the same thing, but has a slightly different method signature for next():

next(S state, long requested, Observer<Observable<? extends T>> observer)

This variant takes a count of requested items. Long.MAX_VALUE means there is no upper bound, so you may emit all available items. The observer expects an Observable that produces the requested number of items, which is what makes it async. Note that creating an observable like that can still be tricky, so this does not completely solve the case where you are wrapping an async API that works with callbacks. Fortunately the new fromAsync() operator in the 1.1.7 release helps with that and that’s probably what you’d generally use.
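As a rough sketch of that next() variant, the class below hands out a small finite range in chunks of at most the requested size. It assumes RxJava 1.x; the class name AsyncRangeSketch, the LIMIT constant and the collect() helper are made up for illustration, and Observable.range() stands in for whatever asynchronous source you would really return.

```java
import java.util.List;

import rx.Observable;
import rx.Observer;
import rx.observables.AsyncOnSubscribe;

// Hypothetical example class: the state is the next value to emit.
class AsyncRangeSketch extends AsyncOnSubscribe<Integer, Integer> {

    private static final int LIMIT = 5;   // assumption for the example: emit 0..4 in total

    @Override
    protected Integer generateState() {
        return 0;
    }

    @Override
    protected Integer next(Integer nextValue, long requested,
                           Observer<Observable<? extends Integer>> observer) {
        int remaining = LIMIT - nextValue;
        // Never hand out more than was requested; Long.MAX_VALUE means "no upper bound".
        int count = (int) Math.min(requested, (long) remaining);
        observer.onNext(Observable.range(nextValue, count));
        if (count == remaining) {
            observer.onCompleted();       // no further chunks will follow
        }
        return nextValue + count;
    }

    static List<Integer> collect() {
        return Observable.create(new AsyncRangeSketch())
                .toList()
                .toBlocking()
                .single();
    }
}
```

Note that the item handed to observer.onNext() is itself an Observable, so the actual values can arrive later, on whatever thread that inner Observable uses.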

TL;DR

As I’m writing this down now, I realise that all of the above isn’t that complicated once you figure it out, but it took quite some time for me to wrap my head around it anyway, so let’s recap:

  • generateState lets you initialise whatever you call your state, and doubles as a notification that your Observable is being subscribed to.
  • SyncOnSubscribe generates one item in its next() function.
  • AsyncOnSubscribe calls observer.onNext() with an Observable that you create and that emits the requested items.
  • onUnsubscribe is called when the Observable is unsubscribed, and allows for clean up.

For Observables wrapping a callback use fromAsync().
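A minimal sketch of what that can look like, assuming the fromAsync() and AsyncEmitter API as shipped in RxJava 1.1.7 (as far as I know, later 1.x releases renamed this entry point, so adjust accordingly). The EventSource and Listener types are a made-up callback API defined only for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import rx.AsyncEmitter;
import rx.Observable;

class FromAsyncSketch {

    /** Hypothetical callback-style API, defined here only for illustration. */
    interface Listener {
        void onEvent(String event);
    }

    static class EventSource {
        private final List<Listener> listeners = new CopyOnWriteArrayList<>();

        void addListener(Listener l) { listeners.add(l); }
        void removeListener(Listener l) { listeners.remove(l); }
        int listenerCount() { return listeners.size(); }

        void fire(String event) {
            for (Listener l : listeners) {
                l.onEvent(event);
            }
        }
    }

    static Observable<String> events(EventSource source) {
        return Observable.<String>fromAsync(emitter -> {
            Listener listener = emitter::onNext;   // forward each callback to the emitter
            source.addListener(listener);
            // Runs when the subscriber unsubscribes: deregister the callback.
            emitter.setCancellation(() -> source.removeListener(listener));
        }, AsyncEmitter.BackpressureMode.BUFFER);  // buffer events the subscriber has not requested yet
    }
}
```

The BackpressureMode argument is the design choice to think about: BUFFER keeps everything, while the other modes trade completeness for bounded memory.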


