Java 9 comes with support for a much-requested feature: Reactive Streams. The idea of Reactive Streams isn’t anything new, though. It has existed for a while in external libraries, of which RxJava is probably the most popular. But it’s nice to finally see support for it in an official Java release.
What is Reactive Streams?
Reactive Streams is an attempt to create a standard for asynchronous stream processing with non-blocking backpressure. The purpose of Reactive Streams is to manage the exchange of data streamed asynchronously while also making sure that the receiving side doesn’t have to buffer an unbounded amount of data. The receiver asks for data when it can handle it, and only then receives it.
As the picture shows, you have a Publisher and one or more Subscribers. The Publisher pushes data to the Subscriber, but only as much as the Subscriber has asked for, and the Subscriber requests more once it is done processing the data it received previously. The way it works is that the Subscriber asks for N items. If the Publisher already has N items, the Subscriber receives them right away. Otherwise, it receives them as soon as they become available to the Publisher, which keeps pushing until the Subscriber has gotten all N items. Once the Subscriber has received the N items, it can request N more.
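To make the request-N handshake a bit more concrete, here is a minimal sketch built on the JDK’s own SubmissionPublisher. The names RequestNDemo and BatchSubscriber are made up for this illustration, and the publisher simply emits integers instead of real data. The subscriber asks for a batch, counts down as items arrive, and only asks for the next batch once the current one has been fully delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class RequestNDemo {

    /** Hypothetical subscriber that asks for items in batches of batchSize. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        private final int batchSize;
        private final List<Integer> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int outstanding; // requested but not yet delivered

        BatchSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch(); // nothing is delivered before the first request
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--outstanding == 0) {
                requestBatch(); // the whole batch arrived, ask for N more
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }

        private void requestBatch() {
            outstanding = batchSize;
            subscription.request(batchSize);
        }
    }

    /** Publishes 1..itemCount and returns what the subscriber received. */
    static List<Integer> run(int itemCount, int batchSize) {
        BatchSubscriber subscriber = new BatchSubscriber(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= itemCount; i++) {
                publisher.submit(i);
            }
        } // close() lets buffered items drain and then signals onComplete
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2));
    }
}
```

With a batch size of 2, the Publisher never hands over more than two items beyond what the Subscriber has already processed, no matter how fast the items are submitted.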
Let’s put it to the test
We are going to make use of Reactive Streams to stream tweets from Twitter. In order to get some action, let’s make the Publisher publish tweets that include the word: “Trump”.
I am going to code this with Spring Boot and Twitter4J[1].
As I mentioned, Reactive Streams requires a Publisher and a Subscriber. Let’s start with implementing them.
The Subscriber should implement the Flow.Subscriber interface, which declares four methods:
- onSubscribe(Subscription), which gets called once when the Subscriber is registered and hands it the Subscription it uses to request items.
- onNext(T), which gets called with each new item.
- onError(Throwable), which is a separate channel that you should listen on for errors.
- onComplete(), which gets called when the Publisher has no more items to send.
package org.thecuriousdev.demo.reactivestreams.twitter;

import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import twitter4j.Status;

public class TweetSubscriber implements Subscriber<Status> {

    private final String id;
    private Flow.Subscription subscription;

    public TweetSubscriber(String id) {
        this.id = id;
    }

    @Override
    public void onComplete() {
        System.out.println("Completed!");
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error: " + throwable);
    }

    @Override
    public void onNext(Status status) {
        try {
            // Simulate slow processing so the output stays readable.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        StringBuilder sb = new StringBuilder()
            .append(status.getCreatedAt())
            .append(" [").append(status.getUser().getName()).append("]: ")
            .append(status.getText());
        System.out.println(sb);
        // Signal demand for more tweets.
        subscription.request(2);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("SUB " + id + " -> Subscribed");
        this.subscription = subscription;
        // Initial demand: nothing is delivered until we request it.
        subscription.request(2);
    }
}
For the Subscriber, I added a sleep for 1 second to simulate some work but also to make it more readable. Without the 1 second sleep for each tweet, it is extremely quick and you won’t have time to read anything at all.
For the Publisher we could write our own, but to make it easy for ourselves we can re-use java.util.concurrent.SubmissionPublisher.
package org.thecuriousdev.demo.reactivestreams.twitter;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import twitter4j.Query;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.conf.ConfigurationBuilder;

@EnableScheduling
@Configuration
@Service
public class TweetPublisher {

    @Value("${oauth.consumerKey}")
    private String consumerKey;
    @Value("${oauth.consumerSecret}")
    private String consumerSecret;
    @Value("${oauth.accessToken}")
    private String accessToken;
    @Value("${oauth.accessTokenSecret}")
    private String accessTokenSecret;

    private static final Query TRUMP_QUERY = new Query("trump");
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);

    private Twitter twitter;
    private SubmissionPublisher<Status> sb =
        new SubmissionPublisher<>(EXECUTOR, Flow.defaultBufferSize());
    // IDs of tweets that have already been published (grows without bound).
    private Map<Long, Object> cache = new HashMap<>();

    // Builds the Twitter client from the injected credentials.
    @Autowired
    public void setup() {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);
        TwitterFactory tf = new TwitterFactory(cb.build());
        twitter = tf.getInstance();
    }

    // Searches Twitter every 10 seconds and publishes tweets not seen before.
    @Scheduled(fixedRate = 10000)
    public void scanTweets() throws TwitterException {
        twitter.search(TRUMP_QUERY).getTweets()
            .stream()
            .filter(status -> !cache.containsKey(status.getId()))
            .forEach(status -> {
                cache.put(status.getId(), null);
                sb.submit(status);
            });
    }

    public void subscribe(TweetSubscriber subscriber) {
        sb.subscribe(subscriber);
    }
}
The most important method here for this blog post is subscribe(TweetSubscriber), which takes a subscriber and subscribes it to the SubmissionPublisher.
The SubmissionPublisher that we have created is backed by a fixed thread pool of just one thread, because we simply don’t need more than that for our example. If you do, you can tweak it as you deem appropriate.
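If you want to see where the executor comes into play, the following sketch might help. It is not part of the project: PublisherExecutorDemo and the thread name "tweet-delivery" are invented for the example, and it publishes plain strings. It passes a one-thread pool to the SubmissionPublisher constructor, just like above, and records on which thread each item is delivered to the subscriber.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class PublisherExecutorDemo {

    /** Publishes itemCount strings and returns the names of the delivering threads. */
    static Set<String> deliveryThreads(int itemCount) {
        // Hypothetical thread name, chosen only so deliveries are easy to spot.
        ExecutorService executor =
            Executors.newFixedThreadPool(1, task -> new Thread(task, "tweet-delivery"));
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher =
                 new SubmissionPublisher<>(executor, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // no backpressure in this sketch
                }

                @Override
                public void onNext(String item) {
                    threadNames.add(Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            for (int i = 0; i < itemCount; i++) {
                publisher.submit("item " + i);
            }
        } // close() lets buffered items drain and then signals onComplete

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown(); // otherwise the pool thread keeps the JVM alive
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(deliveryThreads(3));
    }
}
```

The thread calling submit only hands the item over. The subscriber’s callbacks run on the pool you passed in, so a bigger pool mostly pays off when you have several subscribers.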
I created a scheduled task (using Spring’s @Scheduled annotation) that searches Twitter for the TRUMP_QUERY every 10 seconds. I also created a Map in which I store the IDs of the tweets, so that we do not publish the same tweet more than once. I know that there are billions of ways to make this better, and if this program runs for too long it will crash with an OutOfMemoryError because this Map only ever grows. I’ve chosen to do it this way because perfecting that is outside the scope of this blog post. The scope is rather to demonstrate Reactive Streams.
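For what it’s worth, one of those billions of ways could look roughly like the sketch below: a cache that remembers only the most recent IDs and evicts the oldest one when it is full. BoundedIdCache and markIfNew are names I made up here, this is not what the project on GitHub does, and the right size limit would depend on how many tweets a search returns.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical replacement for the ever-growing Map of tweet IDs. */
final class BoundedIdCache {

    private final Set<Long> seen;

    BoundedIdCache(int maxEntries) {
        // An insertion-ordered LinkedHashMap that drops its eldest entry once
        // it holds more than maxEntries, exposed as a Set of IDs.
        this.seen = Collections.newSetFromMap(new LinkedHashMap<Long, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
                return size() > maxEntries;
            }
        });
    }

    /** Returns true if the ID was not in the cache yet, and remembers it. */
    synchronized boolean markIfNew(long id) {
        return seen.add(id);
    }
}
```

In scanTweets() the filter and the put could then collapse into a single filter(status -> cache.markIfNew(status.getId())). The trade-off is that a tweet whose ID has already been evicted would be published again if the search returns it once more.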
After this, and some additional code to create the Subscriber (which is not really interesting in terms of Reactive Streams), we are good to go.
Final words
We have now learned how to use Reactive Streams in a basic way. But just because Reactive Streams is now accessible without any external dependencies doesn’t mean that we should start using it all the time. The traditional streams of Java 8 are probably more suitable for most tasks. But there are definitely some tasks that Reactive Streams is excellent for.
The full code for our Reactive Twitter Streams project is available here on GitHub if you want to clone it, run it, and modify and play with it yourself. I didn’t include my application.properties of course, so you will have to create that under src/main/resources and put the following values there:
oauth.consumerKey=
oauth.consumerSecret=
oauth.accessToken=
oauth.accessTokenSecret=
To get these values, you will have to register as a developer at Twitter and create an application there. It is completely free and also a very quick and painless process. There are loads of tutorials out there on how to do that.
Sources
- http://twitter4j.org/en/index.html