Previous rt-ai Edge designs, such as the driveway monitor, are static in the sense that they just sit there, running 24/7. Another mode of operation is dynamic, where stream processing networks are created on demand and accessed via standard interfaces. This is appropriate, for example, for offloading inference from mobile devices in a sentient space. As users enter the space, apps on their mobile devices (XR headsets, tablets, phones etc) can access inference and other processing resources from the edge compute system supporting the space.
There are three main components in a dynamic rt-ai Edge system:
- Composable Processing Pipeline (CPP). This is the dynamic analog of the static Stream Processing Network (SPN). A CPP is a set of Stream Processing Elements (SPEs) that has been designed using rtaiDesigner. The main difference between a CPP and an SPN is that, in general, the CPP contains no data sources or sinks: these are provided by the user app.
- Conductor. The Conductor is responsible for managing an allocated resource session. User apps interact directly with the Conductor via a Websocket API, while the Conductor maps data flowing on that API to and from the MQTT interfaces of the CPP(s) allocated to the session.
- Orchestrator. The Orchestrator manages the dynamic system. User apps interact with the Orchestrator to request resources (a hypothetical exchange is sketched below). The Orchestrator allocates the necessary CPP resources and creates a Conductor instance to act as the source and sink for the CPP(s). The user apps are then redirected to the Websocket API on the new Conductor instance, at which point data can flow to and from the user. The Orchestrator is responsible for managing all of the rt-ai Edge nodes allocated to the edge compute system, assigning CPPs to nodes dynamically based on available resources and hardware (e.g. GPU or embedded inference hardware).
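To make this concrete, here is a minimal sketch of the request/response exchange from the user app's point of view. The endpoint, field names and response format are all assumptions for illustration, not the actual Orchestrator API:

import requests

# Hypothetical endpoint and payload; the real Orchestrator API may differ
resp = requests.post('http://orchestrator.local:8080/api/session',
                     json={'user': 'headset-42',
                           'resources': [{'type': 'object_detect'}]})
session = resp.json()
conductorUrl = session['conductorUrl']   # Websocket endpoint of the allocated Conductor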
The diagram above shows the idle state. The heart of this design is the Orchestrator as it directs all operations. When a user (via an app or browser) wants to use some edge resource, the app uses the Orchestrator's RESTful API to identify itself and define the details of the resources it requires. The requested resources are then mapped to one or more CPP types. In this example, the Orchestrator maintains a hot pool of CPPs to minimize startup latency. Hot pool CPPs are instantiated but idle, as they have no data sources. As the Orchestrator allocates CPPs from the pool, it creates new CPP instances to replace them. This matters because inference SPEs can have startup times of several seconds; the hot pool hides this delay from the user. Note that the hot pool could contain multiple types of CPPs that perform different functions – the Orchestrator just selects the correct type to satisfy the resource request. Alternatively, there could be a fixed set of CPP instances to which users are allocated, or CPPs could be instantiated on demand if startup latency is not an issue.
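In outline, the pool logic might look something like the following sketch, where start_cpp() and the CPP type name are hypothetical stand-ins for the real instantiation machinery:

import collections
import itertools

_ids = itertools.count()

def start_cpp(cppType):
    # Stand-in for instantiating a CPP on an edge node; for inference
    # SPEs this is the step that can take several seconds
    return '%s-%d' % (cppType, next(_ids))

hotPool = collections.defaultdict(list)   # CPP type -> idle, pre-started instances

def allocate_cpp(cppType):
    # Hand out a pre-started instance when available, then replenish the pool
    instance = hotPool[cppType].pop() if hotPool[cppType] else start_cpp(cppType)
    hotPool[cppType].append(start_cpp(cppType))   # would be asynchronous in practice
    return instance

# Pre-warm two object detection CPPs, then allocate one with no startup delay
hotPool['object_detect'] = [start_cpp('object_detect') for _ in range(2)]
print(allocate_cpp('object_detect'))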
Once the Orchestrator has identified one or more CPPs to satisfy the resource request, it creates a Conductor instance for the request. The Conductor presents a Websocket API to the user while connecting into rt-ai Edge’s MQTT infrastructure to communicate with the CPPs. If there is only a single CPP involved, the input pin of the CPP is connected to the output pin of the Conductor and the input pin of the Conductor is connected to the output pin(s) of the CPP. If there is more than one CPP required, the CPPs are connected together as required (this can be an arbitrary graph, not just a pipeline) and the input and output pin(s) at the edges connected to the Conductor. Once this is all set up, the Orchestrator redirects the user app to the new Conductor instance and the session can begin as shown below:
As an example, suppose an AR headset user wants to identify and annotate objects in the real world using an AR overlay. In this case, the user app might request a CPP that performs the appropriate object detection and returns the box coordinates of each object along with an identified label. The user app would stream the video feed from the AR headset to the Conductor over the Websocket connection, and the Conductor would pass the video frames on to the CPP. The output of the CPP would contain the detected object metadata, which is passed via the Conductor back over the Websocket connection to the user app for rendering.
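The user app side of this could be as simple as the sketch below, assuming the websockets Python package. The Conductor URL, the one-reply-per-frame pattern and the shape of the detection metadata are assumptions for illustration:

import asyncio
import json
import websockets

async def annotate(conductorUrl, frames):
    # frames: an iterable of JPEG-encoded bytes from the headset camera
    async with websockets.connect(conductorUrl) as ws:
        for frame in frames:
            await ws.send(frame)                     # frame -> Conductor -> CPP
            metadata = json.loads(await ws.recv())   # detections back from the CPP
            for obj in metadata.get('objects', []):
                print(obj['label'], obj['box'])      # hypothetical metadata fields

# asyncio.run(annotate('ws://conductor.local:9000/session/abc123', frames))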
The MQTT-based heart of rt-ai Edge is ideal for constructing stream processing networks (SPNs) that are intended to run continuously. rt-ai Edge tools (such as rtaiDesigner) make it easy to modify and re-deploy SPNs across multiple nodes during the design phase but, once in full-time operation, these SPNs just run by themselves. An existing stream processing element (SPE), PutNiFi, allows data from an rt-ai Edge network to be stored and processed by big data tools – Elasticsearch, for example. However, these types of big data tools aren't always appropriate, especially if low latency access is required, as Java garbage collection can cause unpredictable delays.
For many applications, much simpler but reliably low latency storage is desirable. The Manifold system already has a storage app, ManifoldStore, that is optimized for timestamp-based searches of historical data. A new SPE called PutManifold allows data from an SPN to flow onto a Manifold networking surface. The SPN screen capture above shows two instances of the PutManifold SPE used to transfer audio and video data out of the SPN. ManifoldStore grabs passing data and stores it using the timestamp as the key. Manifold applications can then access historical data flows using streamId/timestamp pairs. This makes it particularly simple to coordinate access across multiple data streams, which is very useful when trying to correlate events across multiple data sources at a particular point or window in time.
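The access pattern is easy to illustrate with plain Python. This is purely a sketch of the streamId/timestamp idea, not the ManifoldStore API:

import bisect

# streamId -> parallel lists of timestamps (sorted) and records
store = {
    'video': ([100, 140, 180], ['frame1', 'frame2', 'frame3']),
    'audio': ([105, 138, 181], ['chunk1', 'chunk2', 'chunk3']),
}

def at_or_before(streamId, timestamp):
    # Return the most recent record at or before the given timestamp
    timestamps, records = store[streamId]
    i = bisect.bisect_right(timestamps, timestamp) - 1
    return records[i] if i >= 0 else None

# Correlate two streams at the same instant
print(at_or_before('video', 150), at_or_before('audio', 150))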
ManifoldStore is intrinsically schemaless in that it can store anything that consists of a JSON part and a binary data part, as used in rt-ai Edge. A new application called rtaiView is a universal viewer that allows multiple streams of all types to be displayed in a traditional split-screen monitoring format. It uses ManifoldStore for its underlying storage and provides a window into the operation of the SPN.
Manifold is designed to be very flexible, with various features that reduce configuration for ad-hoc uses. This makes it very easy to perform offline processing of stored data as and when required, which is ideal for offline machine learning applications.
The main reason for rt-ai Edge's existence is to reduce large volumes of raw data to much smaller amounts of data with high semantic content. Sometimes this can be acted upon in the local loop (i.e. within the edge space) when that makes sense or when low latency is critical. Even then, it may still be useful to store the information extracted from the raw streams for later offline processing, such as machine learning. Since Apache NiFi has all the required interfaces, it makes sense for rt-ai Edge to pass data into Apache NiFi, using it as a gateway to big data applications.
For this simple example, I am storing recovered license plate data in Elasticsearch. The screen capture above shows the rt-ai Edge stream processing network (SPN) with the new PutNiFi stream processing element (SPE). PutNiFi transfers any rt-ai message desired into an Apache NiFi instance using MQTT for transport.
This screen capture shows the very simple Apache NiFi design. The ConsumeMQTT processor is used to collect messages from the PutNiFi SPE and then passes these to Elasticsearch for storage. Obviously a lot more could be going on here if required.
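For reference, feeding ConsumeMQTT with a plate message takes only a few lines using paho-mqtt; the topic name and message fields here are invented for illustration:

import json
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect('localhost', 1883)   # the broker that ConsumeMQTT subscribes to
plate = {'plate': 'ABC1234', 'confidence': 0.93, 'timestamp': 1528212345.2}
client.publish('rtai/plates', json.dumps(plate))
client.disconnect()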
I really like using JSON encoding as a way of transferring messages between processes as it is machine and language independent. Plus, it is very well suited to stream processing networks (such as rt-ai Edge), as arbitrary fields can be added to existing JSON messages and passed along. Contrast this with compiled IDLs, which typically have no flexibility whatsoever.
One problem, though, is that binary data cannot be included in JSON messages directly. Typically, base64 encoding is used to convert binary data into text. However, this is inefficient, especially in a stream processing network where base64 decoding and encoding might have to be done several times.
There are a variety of binary-friendly JSON alternatives around, but it is very simple to just append the binary data to the end of a JSON message – with a length prefix marking where the JSON ends – to form a complete message that can be transferred via MQTT, for example.
In Python, an MQTT message can be published like this:
import json
import struct

def publish(client, topic, jsonData, binData=b''):
    jsonDump = json.dumps(jsonData).encode('utf-8')
    # 4-byte length prefix tells the receiver where the JSON ends and the binary begins
    client.publish(topic, struct.pack('>I', len(jsonDump)) + jsonDump + binData)
Here, client is a connected MQTT client (a paho-mqtt instance, for example), jsonData contains the normal JSON message content and binData contains the binary data to be sent along with it. To receive the message, use something like this:
def onMessage(client, userdata, message):
    # Recover the JSON length from the 4-byte prefix, then split JSON from binary
    jsonLength = struct.unpack('>I', message.payload[0:4])[0]
    jsonData = json.loads(message.payload[4:4 + jsonLength])
    binData = message.payload[4 + jsonLength:]
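A quick way to sanity-check the framing itself, independent of any broker:

import json
import struct

jsonDump = json.dumps({'streamId': 'cam0'}).encode('utf-8')
payload = struct.pack('>I', len(jsonDump)) + jsonDump + b'\x00\x01\x02'
jsonLength = struct.unpack('>I', payload[:4])[0]
assert json.loads(payload[4:4 + jsonLength]) == {'streamId': 'cam0'}
assert payload[4 + jsonLength:] == b'\x00\x01\x02'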
Turned out to be really easy to drive Apache NiFi from the Bosch XDK sensor node via MQTT. The XDK actually has an MQTT example project that does pretty much everything for you – it’s the MQTT Paho demo on this page. I am using it with the mosquitto broker on Ubuntu.
The screen capture shows the output of a simple display application that subscribes to the MQTT topic and graphs the sensor values. This is actually a version of sensorview in the rtndf GitHub repo. It doesn't display the XDK's gyro or magnetometer data but does display everything else.
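A minimal subscriber along the same lines might look like this with paho-mqtt; the topic and field names are assumptions rather than what the XDK demo actually publishes:

import json
import paho.mqtt.client as mqtt

def on_message(client, userdata, message):
    data = json.loads(message.payload)
    # Pick out a few of the XDK's sensor values (field names assumed)
    print(data.get('temperature'), data.get('humidity'), data.get('pressure'))

client = mqtt.Client()
client.on_message = on_message
client.connect('localhost', 1883)   # local mosquitto broker
client.subscribe('xdk/sensors')
client.loop_forever()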
On the Apache NiFi side of things, I am using the ConsumeMQTT processor. This is an example of a data record recovered from the provenance data.
One day, if I am feeling really keen, I could port the old RTIMULib2 software onto the XDK and fill out the sensorpose field in the message with something other than zeroes.
Just got a Bosch XDK sensor to try out – seems like it could be the basis of a very functional environmental sensor. It has a bunch of hardware sensors in a nice little package that includes a battery, WiFi and Bluetooth connectivity and can be wall mounted. You can certainly do the same thing with a Raspberry Pi but it is not so convenient, especially when you need battery power.
The plan is to get MQTT working and hook it up to Apache NiFi to integrate it with my other Raspberry Pi based environmental sensors.
While I have been using MQTT so far for rtndf, I always had in mind using my own infrastructure. I have been developing the concepts on and off since about 2003 and there’s a direct line from the early versions (intended for clusters of robots to form ad-hoc meshes), through SyntroNet and SNC to the latest incarnation called Manifold. It has some nice features such as auto-discovery, optimized distributed multicast, easy resilience and a distributed directory system that makes node discovery really easy.
The Manifold is made up of nodes. The most important node is ManifoldNexus which forms the hyper-connected fabric of the Manifold. The plan is for rtndf apps to become Manifold nodes to take advantage of the capabilities of Manifold. Manifold has APIs for C++ and Python.
Even though it is very new, Manifold is working quite well. Using Python source and sink scripts, it's possible to get throughput of around 2 Gbytes per second for both end to end (E2E) and multicast traffic. This figure was obtained by sending 5,000 packets of 400,000 bytes each per second (5,000 × 400,000 = 2 × 10⁹ bytes per second) on an i7-5820K machine. Between machines, rates are obviously limited by link speeds for large packets. Round-trip E2E latency is around 50µs for small packets, which could probably be improved. The maximum E2E message rate is about 100,000 messages per second between two nodes.
Manifold potentially lends itself to being used with poll mode Ethernet links and shared memory links. Poll mode shared memory links are especially effective as latency is minimized and data predominantly bounces off the CPU's caches, not to mention DPDK links for inter-machine connectivity. Plenty of work left to do…