TL;DR: You can’t re-use subscribers, because they have a built-in subscription.
In Rx, you have `Observers`, `Subscriptions`, and `Subscribers`:
- `Observer` is just an interface with the `onNext`, `onCompleted` and `onError` methods.
- `Subscription` is an interface with an `unsubscribe` method; `subscribe` itself belongs to the observable, not to the subscription.
- `Subscriber` is a concrete class that implements both `Observer` and `Subscription`.
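The relationship between the three types can be sketched in Java roughly as follows. This is a simplified model for illustration, not the source of any Rx library: `isUnsubscribed`, the delegating constructor, and the `recordingObserver` and `demo` helpers are assumptions made for the example, and `Subscription` is given only `unsubscribe` plus that assumed state query.

```java
import java.util.ArrayList;
import java.util.List;

class RxTypesSketch {
    /** Receives the events of a stream. */
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Handle used to stop receiving events. */
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed(); // assumption: state query added for the sketch
    }

    /** An Observer that is also its own Subscription. */
    static class Subscriber<T> implements Observer<T>, Subscription {
        private final Observer<T> delegate;   // the observer logic
        private boolean unsubscribed = false; // the single, built-in subscription state

        Subscriber(Observer<T> delegate) { this.delegate = delegate; }

        @Override public void onNext(T value) { if (!unsubscribed) delegate.onNext(value); }
        @Override public void onCompleted() { if (!unsubscribed) delegate.onCompleted(); }
        @Override public void onError(Throwable error) { if (!unsubscribed) delegate.onError(error); }
        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Hypothetical helper: an Observer that records what it sees. */
    static Observer<String> recordingObserver(List<String> log) {
        return new Observer<String>() {
            @Override public void onNext(String value) { log.add("onNext: " + value); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable error) { log.add("onError: " + error.getMessage()); }
        };
    }

    /** Feeds a Subscriber two events with an unsubscribe in between. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Subscriber<String> subscriber = new Subscriber<>(recordingObserver(log));
        subscriber.onNext("a");
        subscriber.unsubscribe();
        subscriber.onNext("b"); // dropped: the built-in subscription has ended
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onNext: a]
    }
}
```

The point of the sketch is that the subscription state lives inside the `Subscriber` object itself, whereas a plain `Observer` carries no state of that kind.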
When you want to do something with a stream of events, you’ll generally need both an `Observer` and a `Subscription`.
In a typical `subscribe` call, you pass in an `Observer` and get back a `Subscription`:
    subscription = observable.subscribe(
        { printLn("onNext") },
        { printLn("onCompleted") },
        { e -> printLn("onError: " + e) }
    );
If you decide your observer logic needs to be reused elsewhere, you can extract the observer part into a separate variable:
    observer = new Observer(
        { printLn("onNext") },
        { printLn("onCompleted") },
        { e -> printLn("onError: " + e) }
    );

    subscription1 = subject1.subscribe(observer);
    subscription2 = subject2.subscribe(observer);
    subscription3 = subject3.subscribe(observer);

    subject1.onNext(null);   // prints "onNext"
    subject2.onNext(null);   // prints "onNext"
    subject3.onNext(null);   // prints "onNext"
This creates three separate subscriptions with common observer logic, which is probably what you intended.
However, if you extract your observer logic into a `Subscriber` instance, you effectively get a single subscription, and subsequent attempts to subscribe fail silently:
    subscriber = new Subscriber(
        { printLn("onNext") },
        { printLn("onCompleted") },
        { e -> printLn("onError: " + e) }
    );

    subscription1 = subject1.subscribe(subscriber);
    subscription2 = subject2.subscribe(subscriber);
    subscription3 = subject3.subscribe(subscriber);

    subject1.onNext(null);   // prints "onNext"
    subject2.onNext(null);   // ... (nothing is printed)
    subject3.onNext(null);   // ... (nothing is printed)
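Why a built-in subscription gets in the way of reuse can be illustrated with a small, self-contained Java model. It is only a sketch: `Subject`, `Subscriber` and the two `subscribe` overloads are hypothetical stand-ins rather than the API of any Rx library, `Observer` is trimmed to `onNext`, and the model assumes that a subscriber passed to `subscribe` is used as-is. It does not reproduce the exact silent failure shown above, which depends on the Rx implementation. It only shows that one `Subscriber` cannot stand in for several independent subscriptions.

```java
import java.util.ArrayList;
import java.util.List;

class SharedSubscriptionSketch {
    /** Trimmed to onNext for brevity. */
    interface Observer<T> { void onNext(T value); }

    interface Subscription { void unsubscribe(); }

    /** Observer + Subscription with one built-in "active" flag. */
    static class Subscriber<T> implements Observer<T>, Subscription {
        private final Observer<T> delegate;
        private boolean active = true;

        Subscriber(Observer<T> delegate) { this.delegate = delegate; }

        @Override public void onNext(T value) { if (active) delegate.onNext(value); }
        @Override public void unsubscribe() { active = false; }
    }

    /** Hypothetical minimal subject. */
    static class Subject<T> {
        private final List<Subscriber<T>> subscribers = new ArrayList<>();

        /** Plain observer: wrapped in a fresh Subscriber, so each call gets its own state. */
        Subscription subscribe(Observer<T> observer) {
            Subscriber<T> fresh = new Subscriber<>(observer);
            subscribers.add(fresh);
            return fresh;
        }

        /** Subscriber: assumed to be used as-is, so the returned Subscription is the subscriber. */
        Subscription subscribe(Subscriber<T> subscriber) {
            subscribers.add(subscriber);
            return subscriber;
        }

        void onNext(T value) {
            for (Subscriber<T> s : new ArrayList<>(subscribers)) s.onNext(value);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        // Reusing an Observer: two independent subscriptions.
        Observer<String> observer = v -> log.add("observer got " + v);
        Subject<String> a = new Subject<>();
        Subject<String> b = new Subject<>();
        Subscription subA = a.subscribe(observer);
        b.subscribe(observer);
        subA.unsubscribe();
        a.onNext("a1"); // dropped: subA was unsubscribed
        b.onNext("b1"); // delivered: the subscription on b is unaffected

        // Reusing a Subscriber: both "subscriptions" are the same object.
        Subscriber<String> subscriber = new Subscriber<>(v -> log.add("subscriber got " + v));
        Subject<String> c = new Subject<>();
        Subject<String> d = new Subject<>();
        Subscription subC = c.subscribe(subscriber);
        Subscription subD = d.subscribe(subscriber);
        subC.unsubscribe();
        c.onNext("c1"); // dropped
        d.onNext("d1"); // also dropped: unsubscribing subC ended subD as well
        log.add("same object: " + (subC == subD));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [observer got b1, same object: true]
    }
}
```

If the logic has to be shared, keeping it in an `Observer` as in the earlier example and leaving it to each `subscribe` call to create its own subscription avoids the shared state.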