The MQTT-based heart of rt-ai Edge is ideal for constructing stream processing networks (SPNs) that are intended to run continuously. rt-ai Edge tools (such as rtaiDesigner) make it easy to modify and re-deploy SPNs across multiple nodes during the design phase but, once in full time operation, these SPNs just run by themselves. An existing stream processing element (SPE), PutNiFi, allows data from an rt-ai Edge network to be stored and processed by big data tools – using Elasticsearch for example. However, these types of big data tools aren’t always appropriate, especially if low latency access is required as Java garbage collection can cause random delays.
For many applications, much simpler but reliably low latency storage is desirable. The Manifold system already has a storage app, ManifoldStore, that is optimized for timestamp-based searches of historical data. A new SPE called PutManifold allows data from an SPN to flow into a Manifold networking surface. The SPN screen capture above shows two instances of the PutManifold SPE used to transfer audio and video data from the SPN. ManifoldStore grabs passing data and stores it using timestamp as the key. Manifold applications can then access historical data flows using streamId/timestamp pairs. It is particularly simple to coordinate access across multiple data streams. This is very useful when trying to correlate events across multiple data sources at a particular point or window in time.
ManifoldStore is intrinsically schemaless in that it can store anything that consists of a JSON part and a binary data part, as used in rt-ai Edge. A new application called rtaiView is a universal viewer that allows multiple streams of all types to be displayed in a traditional split-screen monitoring format. It uses ManifoldStore for its underlying storage and provides a window into the operation of the SPN.
Manifold is designed to be very flexible with various features that reduce configuration for ad-hoc uses. This makes it very easy to perform offline processing of stored data as and when required which is ideal for offline machine learning applications.
The main reason for rt-ai Edge‘s existence is to reduce large volumes of raw data into much smaller amounts of data with high semantic content. Sometimes this can be acted upon in the local loop (i.e. within the edge space) when that makes sense or low latency is critical. Even if it is, it may still be useful to store the information extracted from the raw streams for later offline processing such as machine learning. Since Apache NiFi has all the required interfaces, it makes sense that rt-ai Edge can pass data into Apache NiFi, using it as a gateway to big data type applications.
For this simple example, I am storing recovered license plate data in Elasticsearch. The screen capture above shows the rt-ai Edge stream processing network (SPN) with the new PutNiFi stream processing element (SPE). PutNiFi transfers any rt-ai message desired into an Apache NiFi instance using MQTT for transport.
This screen capture shows the very simple Apache NiFi design. The ConsumeMQTT processor is used to collect messages from the PutNiFi SPE and then passes these to Elasticsearch for storage. Obviously a lot more could be going on here if required.
I really like using JSON encoding as a way of transferring messages between processes as it is machine and language independent. Plus, it is very well suited to stream processing networks (such as rt-ai Edge) as arbitrary fields can be added to existing JSON messages and passed along. Contrast this with compiled IDLs which typically have no flexibility whatsoever.
One problem though is that binary data cannot be included in JSON messages directly. Typically base64 encoding is used to convert binary data into text. However, this is inefficient, especially in a stream processing network where base64 decoding and encoding might have to be done several times.
There are a variety of modifications to JSON around but it is very simple to just add binary data on to the end of a JSON message to form a complete message that can be transferred via MQTT for example.
In Python, an MQTT message can be published like this:
def publish(topic, jsonData, binData = None):
jsonDump = json.dumps(jsonData)
jsonString = struct.pack('>I', len(jsonDump)) + jsonDump + binData
Here, jsonData contains the normal JSON message text, binData contains the binary data to be sent along with it. To receive the message, use something like this:
def onMessage(client, userdata, message):
jsonLength = struct.unpack('>I', message.payload[0:4])
jsonData = json.loads(message.payload[4:4+jsonLength])
binData = message.payload[4+jsonLength:]
Turned out to be really easy to drive Apache NiFi from the Bosch XDK sensor node via MQTT. The XDK actually has an MQTT example project that does pretty much everything for you – it’s the MQTT Paho demo on this page. I am using it with the mosquitto broker on Ubuntu.
The screen capture shows the output of a simple display application that subscribes to the MQTT topic and graphs the sensor values. This is actually a version of sensorview in the rtndf GitHub repo. It doesn’t display the gyro or magnetometer data from the XDK but it displays the rest of the data.
On the Apache NiFi side of things, I am using the ConsumeMQTT processor. This is an example of a data record recovered from the provenance data.
One day, if I am feeling really keen, I could port the old RTIMULib2 software onto the XDK and fill out the sensorpose field in the message with something other than zeroes.
Just got a Bosch XDK sensor to try out – seems like it could be the basis of a very functional environmental sensor. It has a bunch of hardware sensors in a nice little package that includes a battery, WiFi and Bluetooth connectivity and can be wall mounted. You can certainly do the same thing with a Raspberry Pi but it is not so convenient, especially when you need battery power.
The plan is to get MQTT working and hook it up to Apache NiFi to integrate it with my other Raspberry Pi based environmental sensors.
While I have been using MQTT so far for rtndf, I always had in mind using my own infrastructure. I have been developing the concepts on and off since about 2003 and there’s a direct line from the early versions (intended for clusters of robots to form ad-hoc meshes), through SyntroNet and SNC to the latest incarnation called Manifold. It has some nice features such as auto-discovery, optimized distributed multicast, easy resilience and a distributed directory system that makes node discovery really easy.
The Manifold is made up of nodes. The most important node is ManifoldNexus which forms the hyper-connected fabric of the Manifold. The plan is for rtndf apps to become Manifold nodes to take advantage of the capabilities of Manifold. Manifold has APIs for C++ and Python.
Even though it is very new, Manifold is working quite well. Using Python source and sink scripts, it’s possible get throughput of around 2G bytes per second for both end to end (E2E) and multicast traffic. This figure was obtained using 5000 400,000 byte packets per second on an I7 5820K machine. Between machines, rates are obviously limited by link speeds for large packets. Round-trip E2E latency is around 50uS for small packets which could probably be improved. Maximum E2E message rate is about 100,000 per second between two nodes.
Manifold does potentially lend itself to being used with poll mode Ethernet links and shared memory links. Poll mode shared memory links are especially effective as latency is minimized and data predominately bounces off the CPU’s caches, not to mention DPDK links for inter-machine connectivity. Plenty of work left to do…
rtndf now has a simple Python script that allows a Raspberry Pi fitted with PiCamera to be used as a PPE to generate an MJPEG video stream over MQTT. The resulting stream looks identical to that generated by uvccam (which works with UVC webcams) so picam and uvccam can be used interchangeably as video sources in rtndf pipelines.