Connecting Node(s): A distributed EventEmitter

☕️ 7 minute read

Writing modern web applications in TypeScript often results in many services that operate in a connected manner. Many of these services can be regarded as separate entities (called modules) that expose a public API and encapsulate their logic in classes, in an object-oriented way. As the application grows, there is a need to run these modules in separate Node.js processes and deploy them in different environments (e.g. containers) so that each component can be scaled independently. As discussed in previous articles, there are multiple solutions that facilitate such “distributed applications”. At Charp we needed a simple, lightweight, but effective PubSub setup in the form of an EventEmitter, so that we could connect all our backend services without developers having to learn complex frameworks. Having worked with ZeroMQ in the past, I wanted to give nanomsg (one of its offshoots) a shot as the transport layer for my distributed EventEmitter.

Abstractions of the transport layer

Computer scientists who have read Andrew Tanenbaum's masterpiece Computer Networks will remember the layered design of the Open Systems Interconnection (OSI) model. The OSI model is a conceptual model that standardizes the communication functions of computing systems without regard to their underlying internal structure and technology. Both ZeroMQ and nanomsg sit on top of the transport layer of this model and implement their own protocols, respectively called ZMTP (http://zmtp.org/) and the Scalability Protocols (https://nng.nanomsg.org/). Both are socket libraries that abstract away the difficulties of the transport layer into common communication patterns. Their design is brokerless, which intentionally leaves developers free to build custom distributed applications. A brokerless design can lead to a very fast, scalable and manageable interconnected system. The learning curve of the basic building blocks is actually not that steep for a library so deep in the network stack. However, like everything in life, it takes time and deliberate practice to fully master the craft. While ZeroMQ is somewhat more mature, nanomsg deserves credit for its performance optimization efforts. Unlike ZeroMQ, nanomsg stores subscriptions in a radix tree, a space-optimized trie, and it aims to support true zero-copy mechanisms such as RDMA, which transfers data directly from the memory of one machine to another while bypassing the CPU.
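nanomsg's PUBSUB sockets match a subscription against the beginning of each message, which is why a prefix tree fits so naturally. The sketch below is a simplified, uncompressed trie (a radix tree additionally merges chains of single-child nodes into one edge to save space); the class and method names are illustrative and are not part of nanomsg's API.

```typescript
// Simplified, uncompressed prefix trie for topic subscriptions.
// A real radix tree would merge chains of single-child nodes into one edge.
class TrieNode {
  children = new Map<string, TrieNode>();
  subscribers = new Set<string>(); // ids of subscribers whose prefix ends here
}

class SubscriptionTrie {
  private root = new TrieNode();

  // Register `subscriber` for every message that starts with `prefix`.
  subscribe(prefix: string, subscriber: string): void {
    let node = this.root;
    for (const ch of prefix) {
      let next = node.children.get(ch);
      if (!next) {
        next = new TrieNode();
        node.children.set(ch, next);
      }
      node = next;
    }
    node.subscribers.add(subscriber);
  }

  // Walk the message character by character and collect every subscriber
  // whose prefix lies on the path (the empty prefix matches everything).
  match(message: string): Set<string> {
    const result = new Set<string>(this.root.subscribers);
    let node = this.root;
    for (const ch of message) {
      const next = node.children.get(ch);
      if (!next) break;
      next.subscribers.forEach((s) => result.add(s));
      node = next;
    }
    return result;
  }
}

const trie = new SubscriptionTrie();
trie.subscribe("stock", "dashboard");
trie.subscribe("stockprice", "trader");
trie.subscribe("", "auditor");
console.log([...trie.match("stockprice AAPL 255")].join(", ")); // auditor, dashboard, trader
```

Matching costs time proportional to the length of the message prefix that is walked, independent of how many subscriptions exist elsewhere in the tree.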

The following building blocks (scalability protocols) are currently available:

  • PAIR - simple one-to-one communication
  • BUS - simple many-to-many communication
  • REQREP - allows building clusters of stateless services to process user requests
  • PUBSUB - distributes messages to large sets of interested subscribers
  • PIPELINE - aggregates messages from multiple sources and load balances them among many destinations
  • SURVEY - allows querying the state of multiple applications in a single go

Scalability protocols are layered on top of the transport layer in the network stack. At the moment, the nanomsg library supports the following transport mechanisms:

  • INPROC - transport within a process (between threads, modules etc.)
  • IPC - transport between processes on a single machine
  • TCP - network transport via TCP
  • WS - WebSockets over TCP
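nanomsg picks the transport from the scheme of the address string passed to bind or connect, for example `inproc://name`, `ipc:///tmp/app.ipc`, `tcp://127.0.0.1:55555` or `ws://127.0.0.1:8080`. The hypothetical helper below only illustrates that convention by classifying an address by its scheme; it does not call nanomsg itself.

```typescript
// Hypothetical helper: classify a nanomsg-style address by its URL scheme.
type Transport = "inproc" | "ipc" | "tcp" | "ws";

const TRANSPORTS: readonly Transport[] = ["inproc", "ipc", "tcp", "ws"];

function transportOf(address: string): Transport {
  const sep = address.indexOf("://");
  if (sep === -1) {
    throw new Error(`Missing scheme in address: ${address}`);
  }
  const scheme = address.slice(0, sep);
  // Reject schemes that are not among the four transports listed above.
  if (!(TRANSPORTS as readonly string[]).includes(scheme)) {
    throw new Error(`Unsupported transport: ${scheme}`);
  }
  return scheme as Transport;
}

console.log(transportOf("inproc://emitter")); // inproc
console.log(transportOf("ipc:///tmp/emitter.ipc")); // ipc
console.log(transportOf("tcp://127.0.0.1:55555")); // tcp
console.log(transportOf("ws://127.0.0.1:8080")); // ws
```

Because the transport is encoded in the address, the same application code can move from `inproc://` in tests to `tcp://` in production by changing configuration only.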

A simple use case of the Scalability Protocols

At work, we needed a basic setup that interconnects multiple backend services running on Node.js, while leaving open the possibility for services written in other programming languages to join the system later on. It had to be performant yet simple to understand. Plenty of robust solutions already exist for such requirements. However, since we needed to refactor an existing setup that relied on the Node.js EventEmitter, we could save precious development time by simply making an EventEmitter that runs in a distributed fashion. This turned out to be a simple use case of the Scalability Protocols! I was inspired by the creator of the framework, who described how to build a simple broadcasting setup with a bus socket: http://250bpm.com/blog:17.

Simply put, a distributed EventEmitter built on BUS sockets can be constructed in two ways:

  1. By interconnecting all emitters (on construction) in a mesh topology over a range of well-defined ports, whereby each node broadcasts its messages to its neighbours.
  2. By interconnecting all emitters in an Enterprise Service Bus topology: the master node (which acts as the service bus) binds a well-defined port and all clients connect to that port. The master node is responsible for forwarding/broadcasting incoming messages to all nodes.
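To make the second topology concrete, here is an in-process simulation built on Node's built-in `EventEmitter`. A hypothetical `ServiceBus` plays the role of the master node and forwards every incoming message to all attached nodes, which then dispatch it to their local listeners. It stands in for real BUS sockets and is not how nanomitter is actually implemented.

```typescript
import { EventEmitter } from "node:events";

// Message shape mirroring the { topic, data } objects used in this article.
interface BusMessage {
  topic: string;
  data: unknown;
}

// The master node: every client is attached to this single hub.
class ServiceBus {
  private readonly nodes = new Set<BusNode>();

  attach(node: BusNode): void {
    this.nodes.add(node);
  }

  // Broadcast an incoming message to all attached nodes.
  forward(msg: BusMessage): void {
    for (const node of this.nodes) node.deliver(msg);
  }
}

// A client node: local listeners live in an ordinary EventEmitter.
class BusNode {
  private readonly local = new EventEmitter();
  private readonly bus: ServiceBus;

  constructor(bus: ServiceBus) {
    this.bus = bus;
    bus.attach(this);
  }

  on(topic: string, listener: (msg: BusMessage) => void): void {
    this.local.on(topic, listener);
  }

  // Outgoing messages always go through the master, never node-to-node.
  emit(msg: BusMessage): void {
    this.bus.forward(msg);
  }

  // Invoked by the master for every broadcast message.
  deliver(msg: BusMessage): void {
    this.local.emit(msg.topic, msg);
  }
}

const bus = new ServiceBus();
const worker = new BusNode(bus);
const dashboard = new BusNode(bus);

dashboard.on("stockprice", (msg) => console.log("dashboard received", msg.data));
worker.emit({ topic: "stockprice", data: { ticker: "AAPL", price: 252.5 } });
```

Every message passes through `ServiceBus.forward`, so the hub carries all traffic: this is what makes the setup simple, and also what makes the master a single point of failure.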

My DistributedEventEmitter follows the second topology. Notice that this way we build a brokered setup out of brokerless components, thereby losing some of the performance and availability benefits. Since we introduce a single point of failure, namely the master node that broadcasts all messages, it may be worthwhile to look into ways of introducing multiple masters that broadcast in round-robin fashion.
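One way to read the multi-master idea is that each client spreads its outgoing messages over several masters in turn. A minimal round-robin selector is sketched below, assuming every client stays connected to all masters so that whichever master forwards a message still reaches every node. The master addresses are hypothetical and not defaults of nanomitter.

```typescript
// Cycles through a fixed list of targets, e.g. master addresses.
class RoundRobin<T> {
  private index = 0;
  private readonly items: readonly T[];

  constructor(items: readonly T[]) {
    if (items.length === 0) throw new Error("RoundRobin needs at least one item");
    this.items = items;
  }

  // Return the current target and advance, wrapping around at the end.
  next(): T {
    const item = this.items[this.index];
    this.index = (this.index + 1) % this.items.length;
    return item;
  }
}

// Hypothetical master addresses.
const masters = new RoundRobin(["tcp://master-a:55555", "tcp://master-b:55555"]);
console.log(masters.next()); // tcp://master-a:55555
console.log(masters.next()); // tcp://master-b:55555
console.log(masters.next()); // tcp://master-a:55555
```

Round robin spreads the load, but on its own it does not detect a dead master; a production setup would also need to skip unreachable targets.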

Let's take a look at how such an emitter works in practice, starting with the master node:

import { DistributedEventEmitter } from "nanomitter";
import { DistributedEvent } from "nanomitter/dist/src/types";

(async () => {
    const emitter = await new DistributedEventEmitter().connect();
    const logger = ({ topic, data }: DistributedEvent) =>
        console.log(`Broadcasted ${topic} ${JSON.stringify(data)}`);
    emitter.on("*", logger);
})().catch(err => {
    console.error(err);
});

I added a simple listener to the master emitter that receives all messages (wildcard: '*') and logs their contents. When no address is provided to the connect method, the emitter binds port 55555 on tcp://127.0.0.1.
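Node's built-in `EventEmitter` has no notion of a wildcard, so a distributed emitter has to add one itself. A minimal sketch, assuming a hypothetical `dispatch` helper (nanomitter's actual internals may differ): every incoming event is emitted once under its own topic and once under `*`.

```typescript
import { EventEmitter } from "node:events";

// Same shape as the DistributedEvent objects in the examples above.
interface DistributedEventLike {
  topic: string;
  data: unknown;
}

// Hypothetical dispatcher: deliver to topic-specific listeners first,
// then to wildcard listeners registered under "*".
function dispatch(local: EventEmitter, event: DistributedEventLike): void {
  local.emit(event.topic, event);
  if (event.topic !== "*") {
    local.emit("*", event);
  }
}

const local = new EventEmitter();
local.on("*", ({ topic, data }: DistributedEventLike) =>
  console.log(`Broadcasted ${topic} ${JSON.stringify(data)}`)
);
dispatch(local, { topic: "stockprice", data: { ticker: "AAPL", price: 251 } });
// prints: Broadcasted stockprice {"ticker":"AAPL","price":251}
```

The guard on `"*"` keeps an event whose topic is literally `*` from reaching wildcard listeners twice.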

Next, we introduce a worker node with a similar event listener that subscribes to the topic stockprice:

import { DistributedEventEmitter } from "nanomitter";
import { DistributedEvent } from "nanomitter/dist/src/types";

(async () => {
    const emitter = await new DistributedEventEmitter().connect();
    const logger = (msg: DistributedEvent) => console.log(msg);

    emitter.on("stockprice", logger);

    setInterval(
        () =>
            emitter.emit({
                topic: "stockprice",
                data: { ticker: "AAPL", price: 250 + Math.random() * 10 }
            }),
        300
    );
})().catch(err => {
    console.error(err);
});

Every 300 ms, the worker emits a message with the topic stockprice and some JSON data (a ticker and a randomized price).
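Sockets move raw bytes, so each `{ topic, data }` object has to be serialized before it is sent and parsed on arrival. The sketch below assumes a JSON encoding into a `Buffer`; the `encodeEvent`/`decodeEvent` names and the wire format are hypothetical and not taken from nanomitter.

```typescript
import { Buffer } from "node:buffer";

interface WireEvent {
  topic: string;
  data: unknown;
}

// Serialize an event into bytes that can be handed to a socket.
function encodeEvent(event: WireEvent): Buffer {
  return Buffer.from(JSON.stringify(event), "utf8");
}

// Parse bytes received from a socket, rejecting payloads without a string topic.
function decodeEvent(payload: Buffer): WireEvent {
  const parsed: unknown = JSON.parse(payload.toString("utf8"));
  if (
    typeof parsed !== "object" ||
    parsed === null ||
    typeof (parsed as { topic?: unknown }).topic !== "string"
  ) {
    throw new Error("Malformed event payload");
  }
  return parsed as WireEvent;
}

const bytes = encodeEvent({ topic: "stockprice", data: { ticker: "AAPL", price: 255.3 } });
const roundTripped = decodeEvent(bytes);
console.log(roundTripped.topic); // stockprice
```

JSON keeps the payload readable for services written in other languages, which matches the goal of letting non-Node.js services join later.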

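If we now deploy the master node and a few worker nodes, we can observe the expected logs. To run this distributed setup in the cloud, we can dockerize our applications and connect them through a Docker network defined in a docker-compose file. On a single host, Compose places the services on a shared network where the worker can reach the master by its service name; spanning multiple hosts requires an overlay network (e.g. in Docker Swarm mode):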

version: '2'
services:
    master:
        build: ./master/
        environment:
        - NANO_BIND_ADDRESS=tcp://*:55555
    worker:
        build: ./worker/
        environment:
        - NANO_CONN_ADDRESS=tcp://master:55555
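The compose file configures the services only through the `NANO_BIND_ADDRESS` and `NANO_CONN_ADDRESS` environment variables. How nanomitter consumes them is not shown here, so the following is a hypothetical sketch of how a service could decide whether to bind or connect from those variables, falling back to the default `tcp://127.0.0.1:55555` mentioned earlier.

```typescript
// Hypothetical: derive whether this process binds (master) or connects (worker).
type Endpoint =
  | { role: "bind"; address: string }
  | { role: "connect"; address: string };

const DEFAULT_ADDRESS = "tcp://127.0.0.1:55555";

function resolveEndpoint(env: Record<string, string | undefined>): Endpoint {
  if (env.NANO_BIND_ADDRESS) {
    return { role: "bind", address: env.NANO_BIND_ADDRESS };
  }
  if (env.NANO_CONN_ADDRESS) {
    return { role: "connect", address: env.NANO_CONN_ADDRESS };
  }
  // No configuration: bind the local default, as described for the master.
  return { role: "bind", address: DEFAULT_ADDRESS };
}

console.log(resolveEndpoint(process.env));
```

Note that the master binds `tcp://*:55555` (all interfaces) in the container, while the worker connects via the Compose service name `master`.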

Final Remarks

Working with these brokerless frameworks gave me new insights into the limits of TCP and why abstraction can be a useful thing. My DistributedEventEmitter is a work in progress, and it bothers me that the nanomsg library for JavaScript still works in a synchronous fashion. I look forward to working with its successor, NNG, whose asynchronous I/O model should make bindings based on the more modern async/await syntax possible and resolve many of the current issues. As a developer, it is very important to always keep the business requirements in mind when picking a library or framework to solve a problem. At Charp, we try to keep development time as low as possible while resolving the tasks at hand. However, when you have larger dev teams at your disposal and reliability, scalability and performance play a key role, it might be worthwhile to look at more complex alternatives from the Cloud Native movement such as NATS, KubeMQ or Strimzi. Such frameworks provide enterprise-grade message broker solutions that can run in a cloud environment such as the Azure Kubernetes Service.
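Even while the binding itself stays synchronous, application code can already consume incoming messages with async/await. A minimal sketch using Node's `events.once`, with a plain `EventEmitter` standing in for a socket that emits a hypothetical `"data"` event (the real binding's event names are not assumed here):

```typescript
import { EventEmitter, once } from "node:events";

// Stand-in for a socket that emits incoming payloads as "data" events.
const socket = new EventEmitter();

// Resolve with the next payload, or reject if none arrives in time.
async function nextMessage(source: EventEmitter, timeoutMs: number): Promise<string> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const [payload] = await once(source, "data", { signal: controller.signal });
    return String(payload);
  } finally {
    clearTimeout(timer);
  }
}

async function main(): Promise<void> {
  // Simulate a message arriving shortly after we start waiting.
  setTimeout(() => socket.emit("data", "stockprice AAPL 253"), 10);
  const msg = await nextMessage(socket, 1000);
  console.log(msg); // stockprice AAPL 253
}

main().catch((err) => console.error(err));
```

The `AbortController` timeout matters: without it, an awaited `once` on a silent socket would wait forever.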

Thanks for reading this post; I hope you learned something new! Feel free to take a look at my repository to grasp the essence of what can be realised with the nanomsg building blocks.