Software Development

How to create a pub/sub application with MongoDB ? Introduction

In this article we will see how to create a pub/sub application (messaging, chat, notification), and this fully based on MongoDB (without any message broker like RabbitMQ, JMS, … ).

So, what needs to be done to achieve such thing:

  • an application “publish” a message. In our case, we simply save a document into MongoDB
  • another application, or thread, subscribe to these events and will received message automatically. In our case this means that the application should automatically receive newly created document out of MongoDB

All this is possible with some very cool MongoDB features : capped collections and tailable cursors.

Capped Collections and Tailable Cursors

As you can see in the documentation, Capped Collections are fixed sized collections, that work in a way similar to circular buffers: once a collection fills its allocated space, it makes room for new documents by overwriting the oldest documents.

MongoDB Capped Collections can be queried using Tailable Cursors, that are similar to the unix tail -f command.  Your application continue to retrieve documents as they are inserted into the collection. I also like to call this a “continuous query”. Now that we have seen the basics, let’s implement it.

Building a very basic application

Create the collection

The first thing to do is to create a new capped collection :

$> mongo

use chat

db.messages.drop()

db.createCollection('messages', { capped: true, size: 10000 })

db.messages.insert({"type":"init"});

For simplicity, I am using the MongoDB Shell to create the messages collection in the chat database.

You can see on line #7 how to create a capped collection, with 2 options:

  • capped : true : this one is obvious
  • size : 10000 :  this is a mandatory option when you create a capped collection. This is the maximum size in bytes. (will be raised to a multiple of 256)

Finally, on line #9, I insert a dummy document, this is also mandatory to be able to get the tailable cursor to work.

Write an application

Now that we have the collection, let’s write some code.  First in node.js:

var mongo = require("mongodb");

var mongodbUri = "mongodb://127.0.0.1/chat";

mongo.MongoClient.connect (mongodbUri, function (err, db) {

  db.collection('messages', function(err, collection) {
    // open a tailable cursor
    console.log("== open tailable cursor");
    collection.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1})
                      .sort({ $natural: 1 })
                      .each(function(err, doc) {
      console.log(doc);
    })
  });

});

From lines #1 to 5 I just connect to my local MongoDB instance.

Then on line #7, I get the messages collection.

And on line #10, I execute a find, using a tailable cursor, using specific options:

  • {} : no filter, so all documents will be returned
  • tailable : true : this one is clear, to say that we want to create a tailable cursor
  • awaitdata : true : to say that we wait for data before returning no data to the client
  • numberOfRetries : -1 :  The number of times to retry on time out, -1 is infinite, so the application will keep trying

Line #11 just force the sort to the natural order, then on line #12, the cursor returns the data, and the document is printed in the console each time it is inserted.

Test the Application

Start the application:

node app.js

Insert documents in the messages collection, from the shell or any other tool. You can find below a screencast showing this very basic application working:

https://www.youtube.com/watch?v=uSuiYvssKuo

The source code of this sample application in this Github repository, take the step-01 branch; clone this branch using:

git clone -b step-01 https://github.com/tgrall/mongodb-realtime-pubsub.git

I have also created a gist showing the same behavior in Java:

package org.mongodb.demos.tailable;

import com.mongodb.*;

public class MyApp {

    public static void main(String[] args) throws Exception {

        MongoClient mongoClient = new MongoClient();
        DBCollection coll = mongoClient.getDB("chat").getCollection("messages");

        DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
                .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

        System.out.println("== open cursor ==");

        Runnable task = () -> {
            System.out.println("\tWaiting for events");
            while (cur.hasNext()) {
                DBObject obj = cur.next();
                System.out.println( obj );

            }
        };
        new Thread(task).start();
        
    }
    
}

Mathieu Ancelin has written it in Scala:

package org.mongodb.demos.tailable

import reactivemongo.api._
import reactivemongo.bson._
import play.api.libs.iteratee.Iteratee
import scala.concurrent.ExecutionContext.Implicits.global
import reactivemongo.api.collections.default.BSONCollection

object Capped extends App {

  val driver = new MongoDriver
  val connection = driver.connection(List("localhost"))
  val db = connection("chat")
  val collection = db.collection[BSONCollection]("messages")

  val cursor = collection
        .find(BSONDocument())
          .options(QueryOpts().tailable.awaitData)
            .cursor[BSONDocument]

  println("== open tailable cursor")
  
  cursor.enumerate().apply(Iteratee.foreach { doc =>
    println(s"Document inserted: ${BSONDocument.pretty(doc)}")
  })
}

Add some user interface

We have the basics of a publish subscribe based application:

  • publish by inserting document into MongoDB
  • subscribe by reading document using a tailable cursor

Let’s now push the messages to a user using for example socket.io. For this we need to:

  • add socket.io dependency to our node project
  • add HTML page to show messages

The following gists shows the updated version of the app.js and index.html, let’s take a look:

"use strict";

var mongo = require("mongodb"),
    fs = require("fs"),         // to read static files
    io = require("socket.io"),  // socket io server
    http = require("http");

var mongodbUri = "mongodb://127.0.0.1/chat";

var app = http.createServer(handler);
io = io.listen(app);
app.listen(3000);
console.log("http server on port 3000");

function handler(req, res){
  fs.readFile(__dirname + "/index.html",
  function (err, data) {
    res.writeHead(200);
    res.end(data);
  });
}

mongo.MongoClient.connect (mongodbUri, function (err, db) {

  db.collection('messages', function(err, collection) {

    // open socket
    io.sockets.on("connection", function (socket) {
      // open a tailable cursor
      console.log("== open tailable cursor");
      collection.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1}).sort({ $natural: 1 }).each(function(err, doc) {
        console.log(doc);
        // send message to client
        if (doc.type == "message") {
          socket.emit("message",doc);
        }
      })

    });

  });

});

The node application has been updated with the following features:

  • lines #4-7: import of http, file system and socket.io
  • lines #10-21: configure and start the http server. You can see that I have created a simple handler to serve static html file
  • lines #28-39: I have added support to Web socket using socket.io where I open the tailable cursor, and push/emit the messages on the socket.

As you can see, the code that I have added is simple. I do not use any advanced framework, nor manage exceptions, this for simplicity and readability.

Let’s now look at the client (html page).

<!doctype html>
<html>
<head>
  <title>MongoDB pub/sub</title>
  <style>
  * { margin: 0; padding: 10px; box-sizing: border-box; }
  body { font: 13px Helvetica, Arial; }
  #messages { list-style-type: none; margin: 0; padding: 0; }
  #messages li { padding: 5px 10px; }
  #messages li:nth-child(odd) { background: #eee; }
  </style>
</head>
<body>
  <h2>MongoDB/Socket.io demonstration</h2>

  <ul id="messages"></ul>

  <script src="https://cdn.socket.io/socket.io-1.2.0.js"></script>
  <script src="https://www.javacodegeeks.com/wp-content/litespeed/localres/aHR0cHM6Ly9jb2RlLmpxdWVyeS5jb20vjquery-2.1.3.min.js"></script>
  <script>
  var socket = io();
  socket.on('message', function(doc){
    $('#messages').append($('<li>').text(doc.text));
  });
  </script>
</body>
</html>

Same as the server, it is really simple and does not use any advanced libraries except socket.io client (line #18) and JQuery (line #19), and used:

  • on line #22 to received messages ans print them in the page using JQuery on line #23

I have created a screencast of this version of the application:

You can find the source code in this Github repository, take the step-02 branch; clone this branch using:

git clone -b step-02 https://github.com/tgrall/mongodb-realtime-pubsub.git

Conclusion

In this first post, we have:

  • learned about tailable cursor and capped collection
  • see how it can be used to develop a pub/sub application
  • expose this into a basic web socket based application

Tugdual Grall

Tugdual Grall, an open source advocate and a passionate developer, is a Chief Technical Evangelist EMEA at MapR. He currently works with the European developer communities to ease MapR, Hadoop, and NoSQL adoption. Before joining MapR, Tug was Technical Evangelist at MongoDB and Couchbase. Tug has also worked as CTO at eXo Platform and JavaEE product manager, and software engineer at Oracle. Tugdual is Co-Founder of the Nantes JUG (Java User Group) that holds since 2008 monthly meeting about Java ecosystem. Tugdual also writes a blog available at http://tgrall.github.io/
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments
Back to top button