Consuming java.util.concurrent.BlockingQueue as rx.Observable

The classical producer-consumer pattern is relatively simple in Java thanks to java.util.concurrent.BlockingQueue. To avoid busy waiting and error-prone manual locking we simply take advantage of put() and take(): put() blocks while the queue is full and take() blocks while it is empty. All we need is a bunch of threads sharing a reference to the same queue, some producing and others consuming. And of course the queue has to have a limited capacity, otherwise we will soon run out of memory when producers outperform consumers. Greg Young couldn’t emphasize this rule enough during Devoxx Poland:

Never, ever create an unbounded queue
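
To make the effect of a capacity limit concrete, here is a minimal JDK-only sketch. The class and method names (BoundedQueueDemo, offerToFullQueue, pollFromEmptyQueue) are made up for illustration. It uses the non-blocking offer() and the timed poll() instead of put() and take(), so that the example finishes instead of blocking forever:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article
class BoundedQueueDemo {

  // Fills a queue of the given capacity, then reports whether one more offer() is accepted.
  static boolean offerToFullQueue(int capacity) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
    for (int i = 0; i < capacity; i++) {
      queue.offer("item-" + i);
    }
    // The queue is full now: offer() returns false immediately, put() would block here
    return queue.offer("one too many");
  }

  // Waits up to timeoutMillis for an item from an empty queue; take() would block indefinitely.
  static String pollFromEmptyQueue(long timeoutMillis) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    try {
      return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println("offer() on a full queue accepted: " + offerToFullQueue(2));
    System.out.println("poll() on an empty queue returned: " + pollFromEmptyQueue(50));
  }
}
```

With an unbounded queue the first method could never report a rejection, which is exactly how producers end up filling the heap.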

Producer-consumer using BlockingQueue

Here is the simplest example. First we need a producer that puts objects in a shared queue:

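import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Value
class Producer implements Runnable {

  // @Value generates a constructor that takes the shared queue
  private final BlockingQueue<User> queue;

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        final User user = new User("User " + System.currentTimeMillis());
        log.info("Producing {}", user);
        queue.put(user);  // blocks while the queue is full
        TimeUnit.SECONDS.sleep(1);
      }
    } catch (InterruptedException e) {
      // put() and sleep() clear the interrupt flag when they throw, so restore it
      Thread.currentThread().interrupt();
      log.error("Interrupted", e);
    }
  }
}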

The producer simply publishes an instance of the User class (whatever it is) to a given queue every second. Obviously, in real life placing a User in a queue would be the result of some action within the system, like a user login. Similarly, the consumer takes new items from the queue and processes them:

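import java.util.concurrent.BlockingQueue;

import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Value
class Consumer implements Runnable {

  private final BlockingQueue<User> queue;

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        final User user = queue.take();  // blocks while the queue is empty
        log.info("Consuming: {}", user);
      }
    } catch (InterruptedException e) {
      // take() clears the interrupt flag when it throws, so restore it
      Thread.currentThread().interrupt();
      log.error("Interrupted", e);
    }
  }
}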

Again, in real life processing would mean storing the user in a database or running some fraud detection on it. We use the queue to decouple the producing thread from the consuming thread, e.g. to reduce latency. To run a simple test let’s spin up a few producer and consumer threads:

BlockingQueue<User> queue = new ArrayBlockingQueue<>(1_000);
final List<Runnable> runnables = Arrays.asList(
    new Producer(queue),
    new Producer(queue),
    new Consumer(queue),
    new Consumer(queue),
    new Consumer(queue)
);
 
final List<Thread> threads = runnables
    .stream()
    .map(runnable -> new Thread(runnable, threadName(runnable)))
    .peek(Thread::start)
    .collect(toList());
 
TimeUnit.SECONDS.sleep(5);
threads.forEach(Thread::interrupt);
 
//...
 
private static String threadName(Runnable runnable) {
  return runnable.getClass().getSimpleName() + "-" + System.identityHashCode(runnable);
}

We have 2 producers and 3 consumers, and everything seems to be working. In real life you would probably have some implicit producer threads, like HTTP request handling threads. On the consumer side you would most likely use a thread pool. This pattern works well, but the consuming side in particular is quite low-level.
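
As a rough illustration of the thread pool variant, the following JDK-only sketch runs the consumers in an ExecutorService instead of hand-made threads. PooledConsumers and consumeAll are hypothetical names, and plain strings stand in for User objects:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article
class PooledConsumers {

  // Puts `items` strings on a bounded queue, drains them with `workers` pooled consumers
  // and returns how many items were consumed.
  static int consumeAll(int items, int workers) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    AtomicInteger consumed = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(items);
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int i = 0; i < workers; i++) {
      pool.execute(() -> {
        try {
          while (true) {
            queue.take();                 // blocks while the queue is empty
            consumed.incrementAndGet();
            done.countDown();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // interrupted by shutdownNow(): leave the loop
        }
      });
    }
    try {
      for (int i = 0; i < items; i++) {
        queue.put("User " + i);           // blocks while the queue is full
      }
      done.await(5, TimeUnit.SECONDS);    // wait until every item has been processed
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdownNow();                 // interrupts the workers blocked in take()
    }
    return consumed.get();
  }

  public static void main(String[] args) {
    System.out.println("Consumed: " + consumeAll(25, 3));
  }
}
```

Even with the pool managing the threads, the loop, the blocking take() and the interrupt handling still have to be written by hand.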

Introducing ObservableQueue<T>

The purpose of this article is to introduce an abstraction that behaves like a queue on the producer side but like an Observable from RxJava on the consumer side. In other words, we can treat objects added to the queue as a stream that we can map, filter, compose, etc. on the client side. Interestingly, this is no longer a queue under the hood. ObservableQueue<T> simply forwards all new objects straight to subscribed consumers and doesn’t buffer events when no one is listening (a “hot” observable). ObservableQueue<T> is not a queue per se, it’s just a bridge between one API and the other. It’s similar to java.util.concurrent.SynchronousQueue, but if no one is interested in consuming, the object is simply discarded.
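
The comparison with SynchronousQueue can be checked with the JDK alone. In this small sketch (SynchronousQueueDemo is a made-up name) offer() hands an item over only if a consumer is already waiting. Unlike ObservableQueue, which silently discards the item, SynchronousQueue reports the failed hand-off by returning false, and its put() would block until a consumer arrives:

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original article
class SynchronousQueueDemo {

  // No thread is waiting in take() or poll(), so the hand-off cannot happen.
  static boolean offerWithoutConsumer() {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    return queue.offer("nobody is listening");
  }

  // A SynchronousQueue has no internal capacity, so it always reports size 0.
  static int sizeAfterFailedOffer() {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    queue.offer("nobody is listening");
    return queue.size();
  }

  public static void main(String[] args) {
    System.out.println("Accepted: " + offerWithoutConsumer());
    System.out.println("Size: " + sizeAfterFailedOffer());
  }
}
```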

Here is a first experimental implementation. It’s just toy code, so do not consider it production ready. We’ll also greatly simplify it later:

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
 
  private final Set<Subscriber<? super T>> subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());
  private final Observable<T> observable = Observable.create(subscriber -> {
    // Register first: an unsubscribe() racing ahead of subscribers.add() would otherwise leak the subscriber
    subscribers.add(subscriber);
    // Subscriptions.create() (from rx.subscriptions) keeps track of the unsubscribed state for us
    subscriber.add(Subscriptions.create(() -> subscribers.remove(subscriber)));
  });
 
  public Observable<T> observe() {
    return observable;
  }
 
  @Override
  public boolean add(T t) {
    return offer(t);
  }
 
  @Override
  public boolean offer(T t) {
    subscribers.forEach(subscriber -> subscriber.onNext(t));
    return true;
  }
 
  @Override
  public T remove() {
    return noSuchElement();
  }
 
  @Override
  public T poll() {
    return null;
  }
 
  @Override
  public T element() {
    return noSuchElement();
  }
 
  private T noSuchElement() {
    throw new NoSuchElementException();
  }
 
  @Override
  public T peek() {
    return null;
  }
 
  @Override
  public void put(T t) throws InterruptedException {
    offer(t);
  }
 
  @Override
  public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
    return offer(t);
  }
 
  @Override
  public T take() throws InterruptedException {
    throw new UnsupportedOperationException("Use observe() instead");
  }
 
  @Override
  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
    return null;
  }
 
  @Override
  public int remainingCapacity() {
    return 0;
  }
 
  @Override
  public boolean remove(Object o) {
    return false;
  }
 
  @Override
  public boolean containsAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean addAll(Collection<? extends T> c) {
    c.forEach(this::offer);
    return true;
  }
 
  @Override
  public boolean removeAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean retainAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public void clear() {
  }
 
  @Override
  public int size() {
    return 0;
  }
 
  @Override
  public boolean isEmpty() {
    return true;
  }
 
  @Override
  public boolean contains(Object o) {
    return false;
  }
 
  @Override
  public Iterator<T> iterator() {
    return Collections.emptyIterator();
  }
 
  @Override
  public Object[] toArray() {
    return new Object[0];
  }
 
  @Override
  public <T> T[] toArray(T[] a) {
    return a;
  }
 
  @Override
  public int drainTo(Collection<? super T> c) {
    return 0;
  }
 
  @Override
  public int drainTo(Collection<? super T> c, int maxElements) {
    return 0;
  }
 
  @Override
  public void close() throws IOException {
    subscribers.forEach(rx.Observer::onCompleted);
  }
}

There are a couple of interesting facts about it:

  1. We must keep track of all subscribers, i.e. consumers that are willing to receive new items. If one of the subscribers is no longer interested, we must remove it, otherwise a memory leak will occur (keep reading!).
  2. This queue behaves as if it were always empty. It never holds any items: when you put something into this queue, it is immediately passed to the subscribers and forgotten.
  3. Technically this queue is unbounded (!), meaning you can put as many items as you want. However, since items are passed to all subscribers (if any) and immediately discarded, this queue is actually always empty (see above).
  4. It’s still possible that the producer generates more events than the consumers can keep up with. RxJava has back pressure support for that, which is not covered in this article.
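
The first three points can be reproduced without RxJava. The sketch below is a deliberately simplified, JDK-only stand-in: HotBridge, subscribe(), emit() and demo() are hypothetical names, and java.util.function.Consumer plays the role of a subscriber. It only illustrates the forward-or-drop behaviour and why a subscriber has to be removed again:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// Hypothetical, JDK-only stand-in for the idea behind ObservableQueue; this is not RxJava code
class HotBridge<T> {

  private final Set<Consumer<? super T>> listeners = new CopyOnWriteArraySet<>();

  // Registers a listener and returns a handle that removes it again.
  Runnable subscribe(Consumer<? super T> listener) {
    listeners.add(listener);
    return () -> listeners.remove(listener);
  }

  // Forwards the item to the current listeners; with no listeners the item is dropped.
  void emit(T item) {
    listeners.forEach(listener -> listener.accept(item));
  }

  int listenerCount() {
    return listeners.size();
  }

  static List<String> demo() {
    HotBridge<String> bridge = new HotBridge<>();
    List<String> received = new ArrayList<>();
    bridge.emit("A");                        // nobody is subscribed yet: dropped
    Runnable unsubscribe = bridge.subscribe(received::add);
    bridge.emit("B");
    bridge.emit("C");
    unsubscribe.run();                       // without this the bridge keeps the listener alive
    bridge.emit("D");                        // dropped again
    return received;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [B, C]
  }
}
```

Nothing is ever stored inside the bridge, which is why size() and isEmpty() of the real class can return constants.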

A producer can use ObservableQueue<T> just like any other BlockingQueue<T>, assuming I implemented the queue contract correctly. However, the consumer looks much lighter and smarter:

final ObservableQueue<User> users = new ObservableQueue<>();
final Observable<User> observable = users.observe();
 
users.offer(new User("A"));
observable.subscribe(user -> log.info("User logged in: {}", user));
users.offer(new User("B"));
users.offer(new User("C"));

The code above logs users "B" and "C" only. "A" is lost by design, since ObservableQueue drops items when no one is listening. Obviously the Producer class now uses the users queue. Everything works fine: you can call users.observe() at any point and apply any of the dozens of Observable operators. However, there is one caveat: by default RxJava doesn’t enforce any threading, so consuming happens in the same thread as producing! We have lost the most important feature of the producer-consumer pattern, i.e. thread decoupling. Luckily everything is declarative in RxJava, thread scheduling included:

users
    .observe()
    .observeOn(Schedulers.computation())
    .forEach(user ->
            log.info("User logged in: {}", user)
    );
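
To see in plain JDK terms what moving the consumer to another thread means, the following sketch compares a direct callback with one handed over to a worker thread. It is only an analogy for the caveat above, not a description of how observeOn() is implemented. ThreadHandOff and the thread name "consumer-thread" are invented for the example:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical demo class, not part of the original article
class ThreadHandOff {

  // Calls the consumer directly, so it runs on the caller's (the producer's) thread.
  static String consumeDirectly() {
    String[] seenOn = new String[1];
    Consumer<String> consumer = user -> seenOn[0] = Thread.currentThread().getName();
    consumer.accept("A");
    return seenOn[0];
  }

  // Hands the work to a dedicated worker thread, so the consumer runs over there.
  static String consumeOnWorker() {
    ExecutorService worker =
        Executors.newSingleThreadExecutor(task -> new Thread(task, "consumer-thread"));
    try {
      Callable<String> consumer = () -> Thread.currentThread().getName();
      return worker.submit(consumer).get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException("Hand-off failed", e);
    } finally {
      worker.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("Direct call ran on: " + consumeDirectly());
    System.out.println("Hand-off ran on: " + consumeOnWorker());
  }
}
```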

Now let’s see some real RxJava power. Imagine you want to count how many users log in per second, where each login is placed as an event in a queue:

users
  .observe()
  .map(User::getName)
  .filter(name -> !name.isEmpty())
  .window(1, TimeUnit.SECONDS)
  .flatMap(Observable::count)
  .doOnCompleted(() -> log.info("System shuts down"))
  .forEach(c -> log.info("Logins in last second: {}", c));
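
To make it clearer what the pipeline above computes, here is a batch analogue using only java.util.stream. It is a sketch under simplifying assumptions: logins are given up front as millisecond timestamps, LoginsPerSecond and countPerSecond are hypothetical names, and seconds without any login are simply missing from the result, whereas a time-based window() still emits an (empty) window for them:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the original article
class LoginsPerSecond {

  // Groups login timestamps (in milliseconds) into one-second buckets and counts each bucket.
  static Map<Long, Long> countPerSecond(List<Long> loginTimesMillis) {
    return loginTimesMillis.stream()
        .collect(Collectors.groupingBy(
            millis -> millis / 1000,   // bucket key: the second the login falls into
            TreeMap::new,              // keep the buckets in chronological order
            Collectors.counting()));
  }

  public static void main(String[] args) {
    List<Long> logins = Arrays.asList(100L, 250L, 900L, 1_200L, 3_050L, 3_999L);
    System.out.println(countPerSecond(logins)); // prints {0=3, 1=1, 3=2}
  }
}
```

The reactive version does the same bookkeeping continuously, on events as they arrive, without anyone collecting them into a list first.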

The performance is also acceptable: such a queue can accept around 3 million objects per second on my laptop with one subscriber. Treat this class as an adapter from legacy systems using queues to the modern reactive world. But wait! Using ObservableQueue<T> is easy, but the implementation with the concurrent set of subscribers seems too low-level. Luckily there is Subject<T, T>. A Subject is “the other side” of an Observable: you can push events to a Subject, but it is also an Observable itself, so you can easily create an arbitrary Observable from it. Look how much simpler ObservableQueue becomes with one of the Subject implementations:

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
 
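  // Several producer threads may call offer() at once, so serialize access to onNext()
  private final Subject<T, T> subject = PublishSubject.<T>create().toSerialized();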
 
  public Observable<T> observe() {
    return subject;
  }
 
  @Override
  public boolean add(T t) {
    return offer(t);
  }
 
  @Override
  public boolean offer(T t) {
    subject.onNext(t);
    return true;
  }
 
  @Override
  public void close() throws IOException {
    subject.onCompleted();
  }

  @Override
  public T remove() {
    return noSuchElement();
  }
 
  @Override
  public T poll() {
    return null;
  }
 
  @Override
  public T element() {
    return noSuchElement();
  }
 
  private T noSuchElement() {
    throw new NoSuchElementException();
  }
 
  @Override
  public T peek() {
    return null;
  }
 
  @Override
  public void put(T t) throws InterruptedException {
    offer(t);
  }
 
  @Override
  public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
    return offer(t);
  }
 
  @Override
  public T take() throws InterruptedException {
    throw new UnsupportedOperationException("Use observe() instead");
  }
 
  @Override
  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
    return null;
  }
 
  @Override
  public int remainingCapacity() {
    return 0;
  }
 
  @Override
  public boolean remove(Object o) {
    return false;
  }
 
  @Override
  public boolean containsAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean addAll(Collection<? extends T> c) {
    c.forEach(this::offer);
    return true;
  }
 
  @Override
  public boolean removeAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean retainAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public void clear() {
  }
 
  @Override
  public int size() {
    return 0;
  }
 
  @Override
  public boolean isEmpty() {
    return true;
  }
 
  @Override
  public boolean contains(Object o) {
    return false;
  }
 
  @Override
  public Iterator<T> iterator() {
    return Collections.emptyIterator();
  }
 
  @Override
  public Object[] toArray() {
    return new Object[0];
  }
 
  @Override
  public <T> T[] toArray(T[] a) {
    return a;
  }
 
  @Override
  public int drainTo(Collection<? super T> c) {
    return 0;
  }
 
  @Override
  public int drainTo(Collection<? super T> c, int maxElements) {
    return 0;
  }
 
}

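The implementation above is much cleaner, and we no longer have to track subscribers ourselves. Keep in mind, however, that a Subject expects serialized calls to its onNext(), onError() and onCompleted() methods, so a subject fed by several producer threads has to be wrapped with toSerialized().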
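
What “serialized access” means can be sketched with the JDK alone. The helper below is hypothetical and much cruder than what RxJava does internally: it simply uses a lock so that only one thread at a time calls the downstream listener. SerializedEmitter and emitConcurrently are made-up names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of RxJava: lets only one thread at a time call the listener
class SerializedEmitter<T> {

  private final Consumer<? super T> downstream;

  SerializedEmitter(Consumer<? super T> downstream) {
    this.downstream = downstream;
  }

  // The lock guarantees that calls to the downstream listener never overlap.
  synchronized void emit(T item) {
    downstream.accept(item);
  }

  // Emits from several threads into a plain ArrayList, which is not thread-safe on its own.
  static int emitConcurrently(int threads, int itemsPerThread) {
    List<Integer> received = new ArrayList<>();
    SerializedEmitter<Integer> emitter = new SerializedEmitter<>(received::add);
    List<Thread> producers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      Thread producer = new Thread(() -> {
        for (int i = 0; i < itemsPerThread; i++) {
          emitter.emit(i);
        }
      });
      producers.add(producer);
      producer.start();
    }
    for (Thread producer : producers) {
      try {
        producer.join();                    // join() also makes the producers' writes visible here
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return received.size();
  }

  public static void main(String[] args) {
    System.out.println("Received: " + emitConcurrently(4, 1_000));
  }
}
```

Without the synchronized keyword the unsynchronized ArrayList could lose or corrupt entries, which is the kind of problem an unserialized subject is exposed to when several producers call offer() at once.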

Tomasz Nurkiewicz

Java EE developer, Scala enthusiast. Enjoying data analysis and visualization. Strongly believes in the power of testing and automation.

2 Comments
Dávid Karnok
8 years ago

I have 3 comments:

1) You should call subscribers.add() before you setup the Subscription on the Subscriber. In the current order, an asynchronous unsubscribe() call won’t remove the Subscriber if said call happens just before the subscribers.add().

2) You can use Subscriptions.create(() -> subscribers.remove(subscriber)) instead of implementing the Subscription interface directly.

3) Adding values to a BlockingQueue can happen from multiple threads but PublishSubject requires serialized access to its onXXX methods. Use PublishSubject.create().toSerialized() instead.

Tomasz Nurkiewicz
8 years ago

Thank you very much for this valuable comment. I will fix all found issues in original article (http://www.nurkiewicz.com/2015/07/consuming-javautilconcurrentblockingque.html) since I can’t edit this one.
