I really like using JSON encoding as a way of transferring messages between processes as it is machine and language independent. Plus, it is very well suited to stream processing networks (such as rt-ai Edge) as arbitrary fields can be added to existing JSON messages and passed along. Contrast this with compiled IDLs which typically have no flexibility whatsoever.
One problem though is that binary data cannot be included in JSON messages directly. Typically base64 encoding is used to convert binary data into text. However, this is inefficient, especially in a stream processing network where base64 decoding and encoding might have to be done several times.
There are a variety of modifications to JSON around but it is very simple to just add binary data on to the end of a JSON message to form a complete message that can be transferred via MQTT for example.
In Python, an MQTT message can be published like this:
def publish(topic, jsonData, binData = None):
jsonDump = json.dumps(jsonData)
jsonString = struct.pack('>I', len(jsonDump)) + jsonDump + binData
Here, jsonData contains the normal JSON message text, binData contains the binary data to be sent along with it. To receive the message, use something like this:
def onMessage(client, userdata, message):
jsonLength = struct.unpack('>I', message.payload[0:4])
jsonData = json.loads(message.payload[4:4+jsonLength])
binData = message.payload[4+jsonLength:]
rtndf now has Python PPEs that support streaming data from a variety of environmental sensors. The sensehat PPE streams data from all of the sensors on the Raspberry Pi Sense HAT. The sensors PPE streams data from a variety of common environmental sensors:
- ADX345 accelerometer
- BMP180 pressure/temperature sensor
- HTU21D humidity sensor
- MCP9808 temperature sensor
- TMP102 temperature sensor
- TSL2561 light sensor
The specific sensors in use can be enabled by selectively commenting out lines in the sensors Python script.
sensorview is another new PPE that can display the sensor streams generated by sensehat and sensors. The screenshot shows the data from a sensehat for example.
rtndf now has a simple Python script that allows a Raspberry Pi fitted with PiCamera to be used as a PPE to generate an MJPEG video stream over MQTT. The resulting stream looks identical to that generated by uvccam (which works with UVC webcams) so picam and uvccam can be used interchangeably as video sources in rtndf pipelines.
The idea of joining together separate, lightweight processing elements to form complex pipelines is nothing new. DirectX and GStreamer have been doing this kind of thing for a long time. More recently, Apache NiFi has done a similar kind of thing but with Java classes. While Apache NiFi does have a lot of nice features, I really don’t want to live in Java hell.
I have been playing with MQTT for some time now and it is a very easy to use publish/subscribe system that’s used in all kinds of places. Seemed like it could be the glue for something…
So that’s really the background for rtnDataFlow or rtndf as it is now called. It currently uses MQTT as its pub/sub infrastructure but there’s nothing too specific there – MQTT could easily be swapped out for something else if required. The repo consists of a number of pipeline processing elements that can be used to do some (hopefully) useful things. The primary language is Python although there’s nothing stopping anything being used provided it has an MQTT client and handles the JSON messages correctly. It will even be able to include pipeline processing elements in Docker containers. This will make deployment of new, complex, pipeline processing elements very simple.
The pipeline processing elements are all joined up using topics. Pipeline processing elements can publish to one or more topics and/or subscribe to one or more topics. Because pub/sub systems are intrinsically multicasting, it’s very easy to process data in multiple ways in parallel (for redundancy, performance or functionality). MQTT also allows pipeline processing elements to be distributed on multiple systems, allowing load sharing and heterogeneous computing systems (where only some machines might be fitted with GPUs for example).
Obviously, tools are required to design the pipelines and also to manage them at runtime. The design aspect will come from an old code generation project. While that actually generates C and Python code from a design that the user inputs via a graphical interface, the rtnDataFlow version will just make sure all topic names and broker addresses line up correctly and then produce a pipeline configuration file. A special app, rtnFlowControl, will run on each system and will be responsible for implementing the pipeline design specified.
So what’s the point of all of this? I’m tired of writing (or reworking) code multiple times for slightly different applications. My goal is to keep the pipeline processing elements simple enough and tightly focused so that the specific application can be achieved by just wiring together pipeline processing elements. There’ll end up being quite a few of these of course and probably most applications will still need custom elements but it’s better than nothing. My initial use of rtnDataFlow will be to assist with experiments to see how machine learning tools can be used with IoT devices to do interesting things.
I found this interesting tutorial describing ways to use OpenCV to implement motion detection. I thought that this might form the basis of a nice pipeline processing element for rtnDataFlow. Pipeline processing elements receive a stream from an MQTT topic, process it in some way and then output the modified stream on a new MQTT topic, usually in the same form but with appropriate changes. The new script is called modet.py and it takes a Jpeg over MQTT video stream and performs motion detection using OpenCV’s BackgroundSubtractorMOG2. The output stream consists of the input frames annotated with boxes around objects in motion in the frame. The screenshot shows an example. The small box is actually where the code has detected a moving screen saver on the monitor.
It can be tricky to get stable, large boxes rather than a whole bunch of smaller ones that percolate around. The code contains seven tunable parameters that can be modified as required – comments are in the code. Some will be dependent on frame size, some on frame rate. I tuned these parameters for 1280 x 720 frames at 30 frames per second, the default for the uvccam script.
The pipeline I was using for this test looked like this:
uvccam -> modet -> avview
I also tried it with the imageproc pipeline processor just for fun:
uvccam -> imageproc -> modet ->avview
This actually works pretty well too.