EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi
*Building MiNiFi IoT Apps with the new Cloudera EFM *
It is very easy to build a drag and drop EdgeAI application with EFM and then push to all your MiNiFi agents.
Cloudera Edge Management CEM-1.1.1
Download the newest CEM today!
https://www.cloudera.com/downloads/cdf/cem.html
https://docs.cloudera.com/cem/1.1.1/release-notes/topics/cem-whats-new.html
NiFi Flow Receiving From MiNiFi Java Agent
In a cluster in my CDP-DC Cluster I consume Kafka messages sent from my remote NiFi gateway to publish alerts to Kafka and push records to Apache HBase and Apache Kudu. We filter our data with Streaming SQL.
*We can use SQL to route, create aggregates like averages, chose a subset of fields and limit data returned. Using the power of Apache Calcite, Streaming SQL in NiFi is a game changer against Record Data Types including CSV, XML, Avro, Parquet, JSON and Grokable text. Read and write different formats and convert when your SQL is done. Or just to SELECT * FROM FLOWFILE to get everything. *
We can see this flow from Atlas as we trace the data lineage and provenance from Kafka topic.
We can search Atlas for Kafka Topics.
From coral Kafka topic to NiFi to Kudu.
Details on Coral Kafka Topic
Examining the Hive Metastore Data on the Coral Kudu Table
NiFi Flow Details in Atlas
Details on Alerts Topic
Statistics from Atlas
*See: * https://www.datainmotion.dev/2020/02/connecting-apache-nifi-to-apache-atlas.html
Example Web Camera Image
** Example JSON Record**
[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"127.0.1.1","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]
Querying Kudu results in Hue
Pushing Alerts to Slack from NiFi
I am running on Apache NiFi 1.11.1 and wanted to point out a new feature. Download flow: Will download the highlighted flow/pgroup as JSON.
Looking at NiFi counters to monitor progress:
We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.
Shell (coralrun.sh)
p.p1 {margin: 0.0px 0.0px 0.0px 0.0px; font: 18.0px Menlo; color: #ffffff; background-color: #224fbc} span.s1 {font-variant-ligatures: no-common-ligatures}
!/bin/bash
DATE=$(date +"%Y-%m-%d_%H%M%S")
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/demo/test.py --image /opt/demo/images/$DATE.jpg 2>/dev/null
Kudu Table DDL
https://github.com/tspannhw/table-ddl
Python 3 (test.py)
p.p1 {margin: 0.0px 0.0px 0.0px 0.0px; font: 18.0px Menlo; color: #ffffff; background-color: #224fbc} p.p2 {margin: 0.0px 0.0px 0.0px 0.0px; font: 18.0px Menlo; color: #ffffff; background-color: #224fbc; min-height: 21.0px} span.s1 {font-variant-ligatures: no-common-ligatures}import time
import sys
import subprocess
import os
import base64
import uuid
import datetime
import traceback
import base64
import json
from time import gmtime, strftime
import math
import random, string
import time
import psutil
import uuid
from getmac import get_mac_address
from coral.enviro.board import EnviroBoard
from luma.core.render import canvas
from PIL import Image, ImageDraw, ImageFont
import os
import argparse
from edgetpu.classification.engine import ClassificationEngine
Importing socket library
import socket
start = time.time()
starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
def ReadLabelFile(file_path):
with open(file\_path, 'r') as f: lines = f.readlines() ret = {} for line in lines: pair = line.strip().split(maxsplit=1) ret[int(pair[0])] = pair[1].strip() return ret
Google Example Code
def update_display(display, msg):
with canvas(display) as draw: draw.text((0, 0), msg, fill='white')
def getCPUtemperature():
res = os.popen('vcgencmd measure\_temp').readline() return(res.replace("temp=","").replace("'C\n",""))
Get MAC address of a local interfaces
def psutil_iface(iface):
# type: (str) -> Optional[str] import psutil nics = psutil.net\_if\_addrs() if iface in nics: nic = nics[iface] for i in nic: if i.family == psutil.AF\_LINK: return i.address
/opt/demo/examples-camera/all_models
row = { }
try:
i = 1
while i == 1:
parser = argparse.ArgumentParser() parser.add\_argument('--image', help='File path of the image to be recognized.', required=True) args = parser.parse\_args() # Prepare labels. labels = ReadLabelFile('/opt/demo/examples-camera/all\_models/imagenet\_labels.txt') # Initialize engine. engine = ClassificationEngine('/opt/demo/examples-camera/all\_models/inception\_v4\_299\_quant\_edgetpu.tflite') # Run inference. img = Image.open(args.image) scores = {} kCount = 1 # Iterate Inference Results for result in engine.ClassifyWithImage(img, top\_k=5): scores['label\_' + str(kCount)] = labels[result[0]] scores['score\_' + str(kCount)] = "{:.2f}".format(result[1]) kCount = kCount + 1 enviro = EnviroBoard() host\_name = socket.gethostname() host\_ip = socket.gethostbyname(host\_name) cpuTemp=int(float(getCPUtemperature())) uuid2 = '{0}\_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4()) usage = psutil.disk\_usage("/") end = time.time() row.update(scores) row['host'] = os.uname()[1] row['ip'] = host\_ip row['macaddress'] = psutil\_iface('wlan0') row['cputemp'] = round(cpuTemp,2) row['te'] = "{0:.2f}".format((end-start)) row['starttime'] = starttf row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S') row['cpu'] = psutil.cpu\_percent(interval=1) row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024) row['memory'] = psutil.virtual\_memory().percent row['id'] = str(uuid2) row['message'] = "Success" row['temperature'] = '{0:.2f}'.format(enviro.temperature) row['humidity'] = '{0:.2f}'.format(enviro.humidity) row['tempf'] = '{0:.2f}'.format((enviro.temperature \* 1.8) + 32) row['ambient\_light'] = '{0}'.format(enviro.ambient\_light) row['pressure'] = '{0:.2f}'.format(enviro.pressure) msg = 'Temp: {0}'.format(row['temperature']) msg += 'IP: {0}'.format(row['ip']) update\_display(enviro.display, msg)
i = 2
except:
row['message'] = "Error"
print(json.dumps(row))
Source Code:
https://github.com/tspannhw/nifi-minifi-coral-env
Sensors / Devices / Hardware:
- Humdity-HDC2010 humidity sensor
- Light-OPT3002 ambient light sensor
- Barometric-BMP280 barometric pressure sensor
- PS3 Eye Camera and Microphone USB
- Raspberry Pi 3B+
- Google Coral Environmental Sensor Board
- Google Coral USB Accelerator TPU
References:
- https://coral.ai/docs/enviro-board/get-started/
- https://coral.ai/products/accelerator/
- https://coral.ai/docs/enviro-board/datasheet/
- https://github.com/tspannhw/nifi-minifi-coral-env
- https://github.com/tspannhw/nifi-minifi-coral
- https://www.datainmotion.dev/2019/08/google-coral-tpu-with-edge-devices-and.html
- https://github.com/tspannhw/minifi-grove-sensors
- https://coral.ai/docs/enviro-board/get-started/