Bacon.js + Node.js + MongoDB: Functional Reactive Programming on the Server

Posted in Development

In this article, we’ll demonstrate the Bacon.js library by implementing a Node.js chat application in functional reactive programming style. We’ll use the Socket.IO chat application example as our baseline, adding some asynchronous workflows, interaction with MongoDB, error handling, and logging.

Why

This document is largely the result of my frustration at finding few examples of using Bacon.js on the server; most of what I’ve seen has to do with building reactive user interfaces and games. You should expect to get not only an overview of the Bacon.js API, but also an idea of how one might model a program around a collection of high-level streams and properties.

Our Demo App

In this blog post we’ll be building a Socket.IO-powered chat application. On top of basic chat functionality (sending messages to the room, being notified when users enter/leave the room), the application will:

  1. Broadcast a weather-related fact to the chat room after every 20 messages sent between users
  2. Send all message-text to an external logging server after every 20 messages
  3. Retry failed attempts at logging messages up to 10 times, pausing 100 ms between retries

To use as a reference, I’ve built a vanilla-JavaScript version of this application, viewable here. The final version, implemented with Bacon.js, is viewable here.

EventStreams and Properties

Using Bacon.js, we’ll be restructuring our application around two “reactive” abstractions: EventStream and Property. The difference between the two is nicely summarized by the Bacon.js author on his GitHub page:

Each EventStream represents a stream of events. […] In addition to EventStreams, bacon.js has a thing called Property, that is almost like an EventStream, but has a “current value”. So things that change and have a current state are Properties, while things that consist of discrete events are EventStreams.

Differently put, events in an EventStream may represent things like the moment in time in which a user clicks a mouse, a timer expires, or the result of an asynchronous file-read operation has been made available. A Property on the other hand represents a value that might change over time. You might use a Property to represent the current value of an HTML input-field or an array of a system’s currently logged-in users.

So what are the properties and EventStreams in our system? To achieve parity with our original, vanilla-JavaScript implementation, we’ll want:

  1. an EventStream “connections” representing incoming socket connections
  2. an EventStream “messages” representing all messages sent by all users
  3. an EventStream “logEntries” representing the result of HTTP POSTing our messages to the logging server
  4. a Property “funFact” whose value will change after every 20 messages sent between users

Building EventStreams and Properties

Our First EventStream

Creating our first EventStream – which represents inbound connections to our Socket.IO server – is a fairly straightforward affair:

To use Bacon.fromBinder (which returns an EventStream), we simply pass the sink function as the callback to our io.on method and, voilà, we’ve got an EventStream of events representing Socket.IO socket connections.
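A minimal sketch of that idea follows. It assumes `io` is a Socket.IO server that exposes the usual `on`/`removeListener` emitter methods; the helper names `connectionBinder` and `buildConnections` are hypothetical, and `Bacon` is passed in as a parameter so the sketch makes no assumption about how the library is loaded.

```javascript
// Build the subscribe function that Bacon.fromBinder expects.
// `io` is assumed to be a Socket.IO server (any emitter with on/removeListener works).
function connectionBinder(io) {
  return function subscribe(sink) {
    // Every new socket is pushed into the stream as a value.
    io.on('connection', sink);
    // fromBinder expects an unsubscribe function back, used for cleanup.
    return function unsubscribe() {
      io.removeListener('connection', sink);
    };
  };
}

// Hypothetical wrapper: turns the binder into an EventStream of sockets.
function buildConnections(Bacon, io) {
  return Bacon.fromBinder(connectionBinder(io));
}
```

Returning the unsubscribe function is optional for a server that lives as long as the process, but it keeps the listener from leaking if the stream is ever torn down.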

Collapsing Streams of Streams with flatMap

Combining EventStreams is where things really start to get interesting with Bacon.js. Take, for example, our desire to create a single stream of events representing all messages sent in our system. We’ve got connections, an EventStream with one event per Socket.IO connection. Using Bacon.fromBinder, we already know how to create a stream of each connection’s messages – but how can we collapse them down into a single, application-level EventStream? With Observable#flatMap, naturally.

I’ll try to diagram out what’s happening here, starting with the connections stream:

If each one of those sockets is used to create a new stream (of their messages), we’d have something multi-dimensional, like this:

Using flatMap, and the combiner-function passed to Socket#on, we can collapse these events into a single stream, which we call messages:

Our chat application now has a single EventStream containing both the message-text and its originating socket.
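As a rough sketch of the flatMap step, assuming the `'chat message'` event name from the Socket.IO chat example and a hypothetical `{ socket, text }` event shape:

```javascript
// Binder for a single socket's messages. The event name follows the Socket.IO
// chat example; the { socket, text } shape is an assumption for illustration.
function messageBinder(socket) {
  return function subscribe(sink) {
    // Combine the message text with the socket it came from.
    const onMessage = (text) => sink({ socket: socket, text: text });
    socket.on('chat message', onMessage);
    return function unsubscribe() {
      socket.removeListener('chat message', onMessage);
    };
  };
}

// One inner stream per connection, flattened into a single application-level stream.
function buildMessages(Bacon, connections) {
  return connections.flatMap((socket) => Bacon.fromBinder(messageBinder(socket)));
}
```

Keeping the socket alongside the text means downstream subscribers can still tell who sent each message after the per-socket streams have been collapsed.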

Accumulating State with Scan

Bacon.js provides a few functions that can be used to combine events into a Property. We’ll be using the scan function to build up an array of messages sent between users of our system. We only care about these messages in chunks of 20, which we’ll add to our accumulator function’s implementation:
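One way to write such an accumulator is sketched below. The reset rule (start a fresh window once the previous one holds 20 messages) is an assumption about how the chunking is meant to work, and the names `addToWindow` and `buildMessageWindow` are hypothetical.

```javascript
const WINDOW_SIZE = 20;

// Accumulator for scan: keep collecting until the window holds 20 messages,
// then start a fresh window with the next message.
function addToWindow(batch, message) {
  return batch.length >= WINDOW_SIZE ? [message] : batch.concat([message]);
}

// scan(seed, f) returns a Property whose value is the running accumulation.
function buildMessageWindow(messages) {
  return messages.scan([], addToWindow);
}
```

With this rule the Property’s value grows from an empty array to 20 messages, and the 21st message replaces it with a one-element array.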

The relation between events coming through the messages EventStream and the value of the messageWindow Property can be demonstrated by the following table:

Using this new Property, we build a new EventStream representing getting a new “fun fact” from Mongo after every 20 messages sent between users:
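A hedged sketch of that stream: `fetchFact(callback)` is a hypothetical function wrapping the MongoDB query and calling back with `(err, fact)`, since the actual query is not shown here.

```javascript
const FULL_WINDOW = 20;

// True only when a window has collected exactly 20 messages.
function isFullWindow(batch) {
  return batch.length === FULL_WINDOW;
}

// fetchFact(callback) is hypothetical: it should call back with (err, fact).
function buildFunFacts(Bacon, messageWindow, fetchFact) {
  return messageWindow
    .changes()              // updates only, without the Property's initial value
    .filter(isFullWindow)   // react once per completed window
    .flatMap(() => Bacon.fromNodeCallback(fetchFact));
}
```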

Note the usage of Bacon.fromNodeCallback in the example above, which bridges the gap between Bacon.js and a function that expects to be passed a Node.js-style, error-first callback. For full documentation, including how to apply the function to arguments or to call an object’s method that expects a callback, check out the API docs.

Custom Retry Logic

Last comes the logEntries stream, which represents the result of HTTP POSTing batches of 20 messages to our logging server and is implemented in terms of messageWindow. The logging server is flaky, so we might need to retry our request; we’ll do so using a combination of flatMap and Bacon.retry:
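A sketch of the retry wiring, under the assumption that `postBatch(batch, callback)` is a hypothetical function that POSTs a batch to the logging server and calls back with `(err, result)`:

```javascript
// Options object for Bacon.retry. postBatch(batch, callback) is hypothetical.
function retryOptions(Bacon, batch, postBatch) {
  return {
    // Called once per attempt; must return an EventStream.
    source: () => Bacon.fromNodeCallback(postBatch, batch),
    // Retry a failed attempt up to 10 times.
    retries: 10,
    // Milliseconds to wait before each retry.
    delay: () => 100
  };
}

// One retried POST per completed window of 20 messages.
function buildLogEntries(Bacon, messageWindow, postBatch) {
  return messageWindow
    .changes()
    .filter((batch) => batch.length === 20)
    .flatMap((batch) => Bacon.retry(retryOptions(Bacon, batch, postBatch)));
}
```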

The API is pretty straightforward: the function assigned to the source property must return an EventStream, and in the event of a failure it will be called again, up to the number of times given by the retries property. The return value of the delay function represents the number of milliseconds to wait before retrying a failed operation, in this case 100.

Consuming Streams and Properties

So far we’ve spent the bulk of our effort creating streams and properties. Bacon.js streams are lazy: a source is not activated until something downstream subscribes to it, so we’ll need to implement some subscribers in order for events to start flowing through our network of transformers.

When an event comes through the connections stream, broadcast to users already in the room that a new user has connected:
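A minimal sketch of such a subscriber; the `'user connected'` event name is hypothetical.

```javascript
// socket.broadcast.emit sends to every connected client except this socket.
function announceConnection(socket) {
  socket.broadcast.emit('user connected');
}

// onValue registers the handler and returns an unsubscribe function.
function subscribeToConnections(connections) {
  return connections.onValue(announceConnection);
}
```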

When a user sends a message, broadcast it to all connected users:
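Sketched below, assuming each event in the messages stream carries a `text` field and that clients listen for the `'chat message'` event used in the Socket.IO chat example:

```javascript
// io.emit sends to every connected client, including the sender.
function broadcastMessage(io, message) {
  io.emit('chat message', message.text);
}

function subscribeToMessages(io, messages) {
  return messages.onValue((message) => broadcastMessage(io, message));
}
```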

When a fun fact is ready (after 20 messages), broadcast it to all connected users:
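The same pattern applies here. In this sketch the `'fun fact'` event name and the `funFacts` stream name are assumptions:

```javascript
// Push a fact, as produced by the fun-fact stream, to every connected client.
function broadcastFact(io, fact) {
  io.emit('fun fact', fact);
}

function subscribeToFunFacts(io, funFacts) {
  return funFacts.onValue((fact) => broadcastFact(io, fact));
}
```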

Implement a no-op subscriber, which serves only to “pull” events through the stream (causing HTTP POSTs to be fired to the logging server):
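For illustration, assuming the logging stream is available as `logEntries`:

```javascript
// Does nothing with the value; its only job is to exist as a subscriber.
function noop() {}

// Subscribing activates the lazy stream, which triggers the HTTP POSTs.
function activateLogging(logEntries) {
  return logEntries.onValue(noop);
}
```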

Error Handling

Important to note is that EventStream and Property compose such that errors from an inner stream will bubble up to the outer stream. A somewhat contrived example:
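One contrived sketch of this behavior: the inner stream below fails for odd numbers, and the failure surfaces on the stream returned by flatMap. The helper names are hypothetical.

```javascript
// Inner stream: a single value for even input, a single error for odd input.
function innerStream(Bacon, n) {
  return n % 2 === 0
    ? Bacon.once(n * 10)
    : Bacon.once(new Bacon.Error('odd input: ' + n));
}

// Errors produced by the inner streams are passed through to the outer stream.
function buildOuter(Bacon, numbers) {
  return numbers.flatMap((n) => innerStream(Bacon, n));
}

// An onError subscriber on the outer stream therefore sees the inner failures.
function watchErrors(outer, handler) {
  return outer.onError(handler);
}
```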

The same rules apply for merged event streams. As such, we’ll merge our two potentially error-producing streams and register a single onError handler:
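A sketch of the merge, using Bacon.mergeAll and onError; which two streams are merged and what the handler does are left as parameters here, since both are assumptions:

```javascript
// Merge any number of streams and attach one error handler to the result.
// `streams` is an array of EventStreams, e.g. the fun-fact and logging streams.
function handleErrors(Bacon, streams, onFailure) {
  return Bacon.mergeAll(streams).onError(onFailure);
}
```

A handler as simple as `(err) => console.error(err)` is enough to see failures from either source in one place.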

Errors thrown in a combinator, however, will not be caught and passed along by Bacon.js:
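One possible workaround, which is my own assumption rather than something Bacon.js does for you, is to catch the exception yourself and turn it into an error event. The `safely` helper below is hypothetical:

```javascript
// Wrap a function that may throw so it can be used with flatMap:
// a normal return becomes a value event, a thrown exception becomes an error event.
function safely(Bacon, fn) {
  return function (value) {
    try {
      return Bacon.once(fn(value));
    } catch (err) {
      return Bacon.once(new Bacon.Error(err));
    }
  };
}

// Example wiring: parse each incoming string, routing bad input to onError.
function parseAll(Bacon, strings) {
  return strings.flatMap(safely(Bacon, JSON.parse));
}
```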

Logging + Debugging

If desired, centralized logging can be implemented by mapping events through a higher-order “tagging” function and merging them into a single stream using Bacon.mergeAll. Merging into a single stream has the added benefit of enabling the developer to get a feel for the order in which events are flowing through her system:
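A sketch of the tagging approach; the `{ label, value }` entry shape and the helper names are assumptions:

```javascript
// Higher-order tagging function: tag('messages') returns a mapper that
// wraps each value with the name of the stream it came from.
function tag(label) {
  return (value) => ({ label: label, value: value });
}

// streamsByLabel is an object such as { connections: ..., messages: ... }.
function buildLog(Bacon, streamsByLabel) {
  const tagged = Object.keys(streamsByLabel).map((label) =>
    streamsByLabel[label].map(tag(label))
  );
  return Bacon.mergeAll(tagged);
}

// Print each entry in the order it flowed through the system.
function printLog(log) {
  return log.onValue((entry) => console.log(entry.label, entry.value));
}
```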

Takeaways

My hope is that you now have a better feel for the Bacon.js API and how you might use it to build a system composed of properties and event streams. Check out the GitHub repo, experiment with functions I haven’t explained, and let me know what you think!