The main reason for rt-ai Edge‘s existence is to reduce large volumes of raw data into much smaller amounts of data with high semantic content. Sometimes this can be acted upon in the local loop (i.e. within the edge space) when that makes sense or low latency is critical. Even if it is, it may still be useful to store the information extracted from the raw streams for later offline processing such as machine learning. Since Apache NiFi has all the required interfaces, it makes sense that rt-ai Edge can pass data into Apache NiFi, using it as a gateway to big data type applications.
For this simple example, I am storing recovered license plate data in Elasticsearch. The screen capture above shows the rt-ai Edge stream processing network (SPN) with the new PutNiFi stream processing element (SPE). PutNiFi transfers any rt-ai message desired into an Apache NiFi instance using MQTT for transport.
This screen capture shows the very simple Apache NiFi design. The ConsumeMQTT processor is used to collect messages from the PutNiFi SPE and then passes these to Elasticsearch for storage. Obviously a lot more could be going on here if required.
Turned out to be really easy to drive Apache NiFi from the Bosch XDK sensor node via MQTT. The XDK actually has an MQTT example project that does pretty much everything for you – it’s the MQTT Paho demo on this page. I am using it with the mosquitto broker on Ubuntu.
The screen capture shows the output of a simple display application that subscribes to the MQTT topic and graphs the sensor values. This is actually a version of sensorview in the rtndf GitHub repo. It doesn’t display the gyro or magnetometer data from the XDK but it displays the rest of the data.
On the Apache NiFi side of things, I am using the ConsumeMQTT processor. This is an example of a data record recovered from the provenance data.
One day, if I am feeling really keen, I could port the old RTIMULib2 software onto the XDK and fill out the sensorpose field in the message with something other than zeroes.
Just got a Bosch XDK sensor to try out – seems like it could be the basis of a very functional environmental sensor. It has a bunch of hardware sensors in a nice little package that includes a battery, WiFi and Bluetooth connectivity and can be wall mounted. You can certainly do the same thing with a Raspberry Pi but it is not so convenient, especially when you need battery power.
The plan is to get MQTT working and hook it up to Apache NiFi to integrate it with my other Raspberry Pi based environmental sensors.
The normal way to build Apache NiFi from source on Linux is to use:
mvn -T C2.0 clean install
More info is here incidentally. One issue with this is that it also runs all the tests which gives rise to a couple of problems. One is that some of the tests take a while and slow down the build process. The other is that, should some arcane test in code that isn’t interesting fail, the build aborts. To avoid this, build with tests turned off:
mvn -T C2.0 clean install -Dmaven.test.skip=true
Saves quite a bit of time!
The idea of joining together separate, lightweight processing elements to form complex pipelines is nothing new. DirectX and GStreamer have been doing this kind of thing for a long time. More recently, Apache NiFi has done a similar kind of thing but with Java classes. While Apache NiFi does have a lot of nice features, I really don’t want to live in Java hell.
I have been playing with MQTT for some time now and it is a very easy to use publish/subscribe system that’s used in all kinds of places. Seemed like it could be the glue for something…
So that’s really the background for rtnDataFlow or rtndf as it is now called. It currently uses MQTT as its pub/sub infrastructure but there’s nothing too specific there – MQTT could easily be swapped out for something else if required. The repo consists of a number of pipeline processing elements that can be used to do some (hopefully) useful things. The primary language is Python although there’s nothing stopping anything being used provided it has an MQTT client and handles the JSON messages correctly. It will even be able to include pipeline processing elements in Docker containers. This will make deployment of new, complex, pipeline processing elements very simple.
The pipeline processing elements are all joined up using topics. Pipeline processing elements can publish to one or more topics and/or subscribe to one or more topics. Because pub/sub systems are intrinsically multicasting, it’s very easy to process data in multiple ways in parallel (for redundancy, performance or functionality). MQTT also allows pipeline processing elements to be distributed on multiple systems, allowing load sharing and heterogeneous computing systems (where only some machines might be fitted with GPUs for example).
Obviously, tools are required to design the pipelines and also to manage them at runtime. The design aspect will come from an old code generation project. While that actually generates C and Python code from a design that the user inputs via a graphical interface, the rtnDataFlow version will just make sure all topic names and broker addresses line up correctly and then produce a pipeline configuration file. A special app, rtnFlowControl, will run on each system and will be responsible for implementing the pipeline design specified.
So what’s the point of all of this? I’m tired of writing (or reworking) code multiple times for slightly different applications. My goal is to keep the pipeline processing elements simple enough and tightly focused so that the specific application can be achieved by just wiring together pipeline processing elements. There’ll end up being quite a few of these of course and probably most applications will still need custom elements but it’s better than nothing. My initial use of rtnDataFlow will be to assist with experiments to see how machine learning tools can be used with IoT devices to do interesting things.
This one looks quite a bit nicer than my previous attempt at this design! The functionality is the same but now a lot of the heavier processing has been moved into a new infrastructure that’s been developed to integrate artificial intelligence and machine learning functions into data flows very efficiently. Now I am able to leverage Apache NiFi‘s extensive range of processors to interface to all kinds of things but also escape the JVM environment to get bare metal performance for the higher level functions including access to GPUs and things like that. In this design I am just using NiFi’s MQTT and Elasticsearch processors but it could just as easily fire processed data into HDFS, Kafka etc.
Could not resist posting this Apache NiFi graph. What’s even better is that it actually works just fine (I tend to use error log messages for debugging, that’s why a few of the boxes have error icons). NiFi makes it very easy to keep adding functionality and this is what happens when you get carried away.