
Near-Realtime Analytics with MongoDB, Node.js & SmoothieCharts

21.1.2014 | 4 minutes of reading time

In this blog post we’ll have a look at how easy it is to do some (near-)realtime analytics with your (big) data. I will use some well-known technologies like MongoDB and node.js and a lesser-known JavaScript library called Smoothie Charts for realtime visualization.


Streaming Data from MongoDB

At the time of writing, there is no official API to stream data out of MongoDB in an asynchronous manner. But it is possible to do so by tailing the oplog, MongoDB’s internal collection for replication (which is basically an implementation of a distributed log). When using a replica set, each inserted document (and every update or remove operation) will show up in that collection. So let’s set up a minimal replica set with 2 members:

$ mkdir -p /data/rs01
$ mkdir -p /data/rs02
# start each mongod in its own terminal, since both run in the foreground
$ mongod --port 27017 --dbpath /data/rs01 --replSet rs
$ mongod --port 27018 --dbpath /data/rs02 --replSet rs

After the instances are up, we have to initiate the replica set. We connect to one of the instances using the mongo shell and issue the following command:

$ mongo
...
> rs.initiate({
  _id: "rs",
  members: [
    {_id: 0, host: "localhost:27017"},
    {_id: 1, host: "localhost:27018"}
  ]})

After some time, one of the instances becomes the primary node, the other the secondary.

Watching the Oplog

There are several node modules out there that allow you to easily watch the oplog. We’ll use …

npm install mongo-oplog-watcher

This module allows us to register a callback for each replication operation (insert, update, delete) quite easily:

var OplogWatcher = require('mongo-oplog-watcher');

var oplog = new OplogWatcher({
  host: "127.0.0.1", ns: "test.orders"
});

oplog.on('insert', function(doc) {
    console.log(doc);
});

We connect to the local replica set and are only interested in insert operations on the collection test.orders. Right now, we are just logging the received document.
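
To make the idea behind such a watcher more concrete, here is a minimal sketch of how raw oplog entries could be narrowed down to inserts on a single collection. This is not the implementation of mongo-oplog-watcher; the helper `createInsertFilter` and the sample entries are made up for illustration. The sketch only relies on the general shape of an oplog entry: `op` holds the operation type (`i` for insert, `u` for update, `d` for delete), `ns` the namespace and `o` the document.

```javascript
// Hypothetical helper, not part of mongo-oplog-watcher.
// Returns a function that hands the document of every insert on the
// given namespace to onInsert and ignores all other entries.
function createInsertFilter(ns, onInsert) {
    return function(entry) {
        if (entry.op === 'i' && entry.ns === ns) {
            onInsert(entry.o);
            return true;
        }
        return false;
    };
}

// Made-up sample entries in the shape of oplog documents
var sampleEntries = [
    { op: 'i', ns: 'test.orders', o: { _id: 1, cid: 'BK' } },
    { op: 'u', ns: 'test.orders', o2: { _id: 1 }, o: { $set: { cid: 'EL' } } },
    { op: 'i', ns: 'test.users',  o: { _id: 7, name: 'alice' } },
    { op: 'd', ns: 'test.orders', o: { _id: 1 } }
];

var received = [];
var insertFilter = createInsertFilter('test.orders', function(doc) {
    received.push(doc);
});

sampleEntries.forEach(function(entry) {
    insertFilter(entry);
});

// Only the first entry is an insert on test.orders
console.log(received);
```

In a real setup the entries would come from a tailable cursor on the oplog collection instead of a fixed array.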

Simple Analytics

For our demo, we use one of the simplest aggregations available: we will count things. To be more specific, we’ll count product orders grouped by category (books, electronics, shoes, etc.). Each document in test.orders has a field cid that holds the product category:

{
  _id: ObjectId("..."),
  cid: "BK", // "BK" = books, "SH" = shoes, "EL" = electronics
  ...
}

The aggregation implementation is not that complicated. If we get an order for a category we have not seen yet, its counter is initialized with 1; otherwise the existing counter is incremented:

// order counts per product category, e.g. { BK: 3, SH: 1 }
var categoryCounter = {};

exports.addData = function( document ) {
    // declare locals with var so they do not leak into the global scope
    var cat = document.cid;
    if ( cat ) {
        var catCount = categoryCounter[cat];

        if (catCount) {
            categoryCounter[cat]++;
        } else {
            categoryCounter[cat] = 1;
        }
    }

};

exports.aggregate = function() {
    return categoryCounter;
};

This aggregation can easily be extended with a timestamp to count all orders in a time frame, like orders per minute, orders per hour and so on. Do this as an exercise.
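
As a starting point for that exercise, here is one possible sketch of a per-minute counter. It is not taken from the demo application: the names `addTimedData` and `aggregatePerMinute` are made up, and the order timestamp is assumed to be available in milliseconds since the epoch (where it comes from is left open).

```javascript
// Illustrative sketch, not part of the demo application.
var MINUTE = 60 * 1000;

// counts per minute bucket and category,
// keyed by the start of the minute in milliseconds
var countsPerMinute = {};

// ts: assumed order time in milliseconds since the epoch
function addTimedData(doc, ts) {
    var cat = doc.cid;
    if (!cat) {
        return;
    }
    // round the timestamp down to the start of its minute
    var bucket = Math.floor(ts / MINUTE) * MINUTE;
    if (!countsPerMinute[bucket]) {
        countsPerMinute[bucket] = {};
    }
    countsPerMinute[bucket][cat] = (countsPerMinute[bucket][cat] || 0) + 1;
}

function aggregatePerMinute() {
    return countsPerMinute;
}

// Two book orders in the first minute, one shoe order in the second
addTimedData({ cid: 'BK' }, 0);
addTimedData({ cid: 'BK' }, 59999);
addTimedData({ cid: 'SH' }, 60000);
console.log(aggregatePerMinute());
```

Orders per hour work the same way with a larger bucket size. In a long-running process, old buckets would have to be dropped at some point to keep memory usage bounded.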

Client Notification

With each emitted document (this frequency may be reduced in production under heavy write load), the dispatcher pushes the updates to all registered web clients, which use web sockets to connect to the node server process. We are using the node module socket.io:

npm install socket.io

The code on the server side looks basically like this:

// init web sockets
var clients = [];
var io = require('socket.io').listen(httpd); // httpd: the HTTP server of the app

io.sockets.on('connection', function(socket) {
    clients.push(socket);

    // 'disconnect' is emitted on the individual socket
    socket.on('disconnect', function() {
        var index = clients.indexOf(socket);
        if (index !== -1) {
            clients.splice(index, 1);
        }
    });
});

// client notification
// data_key (the event name, e.g. 'order_aggregates') and data
// (the current aggregate) are supplied by the dispatcher
var async = require('async');

if ( clients.length > 0 ) {
    console.info("Starting push to clients ...");
    async.eachSeries(
        clients,
        function(socket, callback) {
            socket.emit(data_key, data);
            callback();
        },
        function(err) {
            // only log if something actually went wrong
            if (err) {
                console.error(err);
            }
        }
    );
}

The connected clients in the array are notified one after another with the help of the async module.
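
Reducing the push frequency under heavy write load, as mentioned above, could be done with a small throttle around the push. The following is only a sketch under that assumption and not code from the demo application: `throttle` is a hypothetical helper, and the clock is passed in as a parameter so that the behaviour can be checked without waiting.

```javascript
// Hypothetical helper: wraps a push function so that it runs at most
// once per intervalMs. Calls in between are dropped. Because the
// aggregate is cumulative, the next push that goes through carries
// the latest counts anyway.
function throttle(push, intervalMs, now) {
    now = now || Date.now;
    var last = null;
    return function(data) {
        var t = now();
        if (last === null || t - last >= intervalMs) {
            last = t;
            push(data);
            return true;
        }
        return false;
    };
}

// Usage with a fake clock instead of real time
var fakeTime = 0;
var pushed = [];
var throttledPush = throttle(
    function(data) { pushed.push(data); },
    1000,
    function() { return fakeTime; }
);

throttledPush({ BK: 1 });   // goes through, first call
fakeTime = 500;
throttledPush({ BK: 2 });   // dropped, only 500 ms since the last push
fakeTime = 1000;
throttledPush({ BK: 3 });   // goes through, 1000 ms since the last push
console.log(pushed);
```

One limitation of this simple variant: if no further order arrives after a dropped call, the clients keep showing the older counts until the next push.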

Visualization – Smoothie Charts

The visualization is done by a JavaScript library called Smoothie Charts, which makes it easy to draw realtime graphs. After opening a web socket to the node process, the graph is initialized. We are plotting one line for each product category:

var CATEGORIES = [
    'BK', // books
    'EL', // electronics
    'SH'  // shoes
];
var STROKES = {'BK': 'rgba(0, 255, 0, 1)', 'EL': 'rgba(255, 0, 0, 1)', 'SH': 'rgba(0, 0, 255, 1)' };
var FILLS = {'BK': 'rgba(0, 255, 0, 0.2)', 'EL': 'rgba(255, 0, 0, 0.2)', 'SH': 'rgba(0, 0, 255, 0.2)'};
var orders = {}; // one TimeSeries per category, keyed by category id
var current_state = null;

// forEachCategory and autoUpdate are helpers that are not shown here
function initChart() {
    var chart = new SmoothieChart();
    forEachCategory( function(category) {
        orders[category] = new TimeSeries();
        chart.addTimeSeries(orders[category], {
            strokeStyle : STROKES[category],
            fillStyle : FILLS[category],
            lineWidth : 2
        });
    });
    // render into the canvas element with a delay of 500 ms
    chart.streamTo(document.getElementById("orders-category"), 500);

    setInterval( autoUpdate, 2000 );
}

function initWebsocket() {
    var socket = io.connect('http://localhost:8080');
    socket.on('order_aggregates', function (data) {
        // remember the latest aggregate and plot it
        current_state = data;
        updateChart(data);
    });
}

function updateChart(data) {
    if (data) {
        forEachCategory( function(category) {
            orders[category].append(new Date().getTime(), data[category] );
        });
    }
}
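
The client code uses two helpers, forEachCategory and autoUpdate, without defining them. How they look in the demo application is not shown in this post, so the following is only a guess at their behaviour: forEachCategory is assumed to call a callback once per category, and autoUpdate is assumed to re-append the last known state so that the lines keep moving while no new orders arrive. The factory `createAutoUpdate` and the stand-in `updateChart` are made up so that the sketch runs without a browser or a chart.

```javascript
var CATEGORIES = ['BK', 'EL', 'SH'];

// Assumed helper: calls the callback once for every category id
function forEachCategory(callback) {
    CATEGORIES.forEach(function(category) {
        callback(category);
    });
}

// Assumed behaviour of autoUpdate: feed the last known state to the
// chart again. Written as a factory (hypothetical) so that state and
// update function can be passed in.
function createAutoUpdate(getState, update) {
    return function() {
        var state = getState();
        if (state) {
            update(state);
        }
    };
}

// Stand-ins for the chart code, for illustration only
var current_state = null;
var appended = [];
function updateChart(data) {
    forEachCategory(function(category) {
        appended.push(category + '=' + data[category]);
    });
}

var autoUpdate = createAutoUpdate(
    function() { return current_state; },
    updateChart
);

autoUpdate();                              // no state yet, nothing is appended
current_state = { BK: 3, EL: 1, SH: 2 };
autoUpdate();                              // appends one value per category
console.log(appended);
```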

This results in three monotonically increasing graphs, one per product category.

Conclusion

Of course, this example is simple. But it illustrates how to set up a (near-)realtime view of your data. After streaming your data out of your NoSQL datastore, you may want to use more mature solutions like Storm or Splunk to process that data. Your system may not only visualize your data, it can also perform actions, such as increasing the number of ad banners for a product category that does not sell very well.

The full source code for our demo application can be found at this GitHub repo.
