FLiP-Py-Pi-GasThermal: Building an IoT Edge Application with Apache Pulsar and Python for TVOC and CO2 Ingest

Timothy Spann. 🇺🇦 - Apr 27 '22 - - Dev Community

FLiP-Py-Pi-GasThermal

Source: https://github.com/tspannhw/FLiP-Py-Pi-GasThermal

tags: Apache Pulsar, Python, Raspberry Pi, Gas Sensor + Thermal Camera Sensors, Apache Flink, Trino/Presto SQL

ThermalCam

Sensors

  • Pimoroni BreakoutGarden: SGP30
    • Sensiron SGP30 TVOC and eCO2 sensor (datasheet)
    • TVOC sensing from 0-60,000 ppb (parts per billion)
    • CO2 sensing from 400 to 60,000 ppm (parts per million)
  • Pimoroni BreakoutGarden: MLX90640 Thermal Camera

HardWare

Architecture

designthis more sendmoredata

Build

bin/pulsar-admin topics create persistent://public/default/garden3bin/pulsar-client consume "persistent://public/default/garden3" -s "garden3reader" -n 0class Garden(Record): cpu = Float() diskusage = String() endtime = String() equivalentco2ppm = String() host = String() hostname = String() ipaddress = String() macaddress = String() memory = Float() rowid = String() runtime = Integer() starttime = String() systemtime = String() totalvocppb = String() ts = Integer() uuid = String()----- got message -----key:[garden3_uuid_yvs_20220306191528], properties:[], content:{ "cpu": 0.0, "diskusage": "103496.6 MB", "endtime": "1646594128.2460103", "equivalentco2ppm": " 413", "host": "garden3", "hostname": "garden3", "ipaddress": "192.168.1.198", "macaddress": "dc:a6:32:32:98:20", "memory": 9.2, "rowid": "20220306191528_707b34d4-7299-4233-a495-d2d97393e834", "runtime": 0, "starttime": "03/06/2022 14:15:28", "systemtime": "03/06/2022 14:15:29", "totalvocppb": " 5", "ts": 1646594129, "uuid": "garden3_uuid_yvs_20220306191528"}presto> select * from pulsar."public/default"."garden3"; cpu | diskusage | endtime | equivalentco2ppm | host | hostname | ipaddress | macaddress | memory | rowid | runtime | -----+-------------+--------------------+------------------+---------+----------+---------------+-------------------+--------+-----------------------------------------------------+---------+------ 6.5 | 103496.5 MB | 1646594650.7116666 | 418 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192410_197b6b0c-b86c-4191-9e9c-11777767825e | 0 | 03/06 6.7 | 103496.5 MB | 1646594651.7441382 | 418 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192411_4dab3212-423b-46a9-ae39-b10eb363336d | 0 | 03/06 1.3 | 103496.5 MB | 1646594652.7764313 | 421 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192412_d24a819b-4ca1-489a-9683-da48bc37185c | 0 | 03/06 0.2 | 103496.5 MB | 1646594653.810233 | 421 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192413_f4e43177-2486-4a36-b27a-903028d6aacf | 0 | 03/06 0.0 | 103496.5 MB | 1646594654.8467774 | 416 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192414_558d3db8-725a-46ec-ad1f-89f8067142f8 | 0 | 03/06 0.0 | 103496.5 MB | 1646594655.880628 | 420 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192415_1eeea533-7f7e-4945-a089-d4b2f8681e14 | 0 | 03/06 0.0 | 103496.5 MB | 1646594656.9145741 | 418 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192416_d7d8c6ea-2adf-4704-8f49-3108a7328f26 | 0 | 03/06 0.0 | 103496.5 MB | 1646594657.9489982 | 425 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192417_c71cbb6e-c59f-4855-84ae-b796fd3d7a76 | 0 | 03/06 0.0 | 103496.5 MB | 1646594658.9828157 | 420 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192418_c14204dd-4c1c-4e76-a272-8ddecd11a97c | 0 | 03/06 0.0 | 103496.5 MB | 1646594660.0187812 | 420 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192419_c7583063-06e0-4601-806a-96619b6bb136 | 0 | 03/06 0.0 | 103496.5 MB | 1646594661.0531507 | 428 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192421_57e4a8ed-af6f-4dd0-b143-f29f1a211fdb | 0 | 03/06 0.0 | 103496.5 MB | 1646594662.087301 | 421 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192422_cf2ce36e-c996-41ac-baec-db292cffdd37 | 0 | 03/06 6.2 | 103496.5 MB | 1646594663.1214898 | 415 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192423_30b13428-ac2e-4929-b9f5-c4c3fbc3312a | 0 | 03/06 6.5 | 103496.5 MB | 1646594664.1541135 | 424 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192424_d1517315-85fa-4923-b634-9b673c5b20ba | 0 | 03/06 3.6 | 103496.5 MB | 1646594665.1890867 | 417 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192425_3984b0bc-a92e-4458-834c-e65259bb7a4d | 0 | 03/06 0.0 | 103496.5 MB | 1646594666.221572 | 422 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192426_e0debd83-47e8-43f7-8ec5-cf0dbc27b87e | 0 | 03/06 0.0 | 103496.5 MB | 1646594667.2555919 | 424 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192427_6ec920e7-56d2-4730-bddc-e18592cf1210 | 0 | 03/06 0.0 | 103496.5 MB | 1646594668.2893167 | 428 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192428_434b4458-6f89-4161-b967-f29796d8bf5d | 0 | 03/06 0.0 | 103496.5 MB | 1646594669.3234618 | 426 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192429_5bb29c2e-d078-405d-b8f6-967ea4753b3f | 0 | 03/06 0.0 | 103496.5 MB | 1646594670.359024 | 421 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192430_05b86f6b-4d28-497c-acd6-d14f7ce27157 | 0 | 03/06 0.0 | 103496.5 MB | 1646594671.392967 | 432 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192431_b7a2955a-6f39-4439-8eb9-27d7a2633836 | 0 | 03/06 0.0 | 103496.5 MB | 1646594672.4271743 | 426 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192432_29861384-fc30-4deb-bc1d-b7f2d27ef89d | 0 | 03/06 0.0 | 103496.5 MB | 1646594673.4611707 | 424 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192433_25435335-39fe-4294-b913-95c63537a743 | 0 | 03/06 0.0 | 103496.5 MB | 1646594674.4951062 | 424 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192434_a52a90e8-b11c-467b-b87f-936432bc8988 | 0 | 03/06 3.3 | 103496.5 MB | 1646594675.531778 | 436 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192435_48bab6a9-9f2e-443b-9fed-f096f6c24ebd | 0 | 03/06 6.5 | 103496.5 MB | 1646594676.5642908 | 433 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192436_761f79dc-06f6-4ed3-8062-5227b6842b77 | 0 | 03/06 6.2 | 103496.5 MB | 1646594677.5965276 | 421 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192437_6d8d49c9-0519-4825-b6b7-ed435e9fe747 | 0 | 03/06 0.0 | 103496.5 MB | 1646594678.6290672 | 423 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192438_a920872d-f152-4b18-94a7-b4dfe5641482 | 0 | 03/06 0.0 | 103496.5 MB | 1646594679.6629703 | 418 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192439_032ce574-10f6-4b50-b5e8-102b466af65a | 0 | 03/06 0.0 | 103496.5 MB | 1646594680.699419 | 420 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192440_cde6f78a-d028-4f87-a95b-156bac0ee0c2 | 0 | 03/06 0.0 | 103496.5 MB | 1646594681.7338123 | 424 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192441_7400689a-6974-4ff2-a688-96c317d45d03 | 0 | 03/06 0.0 | 103496.5 MB | 1646594682.767776 | 417 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192442_46696fa7-295c-4fe6-bf49-8d8405e21cf5 | 0 | 03/06 0.0 | 103496.5 MB | 1646594683.8017883 | 420 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192443_4de9f79c-0169-49d6-b337-fb57dcb5cf7e | 0 | 03/06 0.0 | 103496.5 MB | 1646594684.8360054 | 422 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192444_63081ac4-2c87-462d-bc16-ab506e9c6db2 | 0 | 03/06 0.0 | 103496.5 MB | 1646594685.8720167 | 420 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192445_fee13e36-26c6-4f06-894c-8b96cb0f3bae | 0 | 03/06 0.0 | 103496.5 MB | 1646594686.90594 | 431 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192446_8aaa423d-bfe3-43e8-8653-d554c2afb8d6 | 0 | 03/06 0.8 | 103496.4 MB | 1646594687.939747 | 416 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192447_bfe339cd-3fb4-4d2a-b612-3a2363e1b83b | 0 | 03/06 6.5 | 103496.4 MB | 1646594688.9728699 | 424 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192448_8809fc3e-75f8-4aca-a141-6e5d4872b0de | 0 | 03/06 6.5 | 103496.4 MB | 1646594690.0051649 | 412 | garden3 | garden3 | 192.168.1.198 | dc:a6:32:32:98:20 | 9.2 | 20220306192449_0d0b4b62-0321-4bcd-952b-14607333d9f5 | 0 | 03/06presto> desc pulsar."public/default"."garden3"; Column | Type | Extra | Comment -------------------+-----------+-------+----------------------------------------------------------------------------- cpu | real | | ["null","float"] diskusage | varchar | | ["null","string"] endtime | varchar | | ["null","string"] equivalentco2ppm | varchar | | ["null","string"] host | varchar | | ["null","string"] hostname | varchar | | ["null","string"] ipaddress | varchar | | ["null","string"] macaddress | varchar | | ["null","string"] memory | real | | ["null","float"] rowid | varchar | | ["null","string"] runtime | integer | | ["null","int"] starttime | varchar | | ["null","string"] systemtime | varchar | | ["null","string"] totalvocppb | varchar | | ["null","string"] ts | integer | | ["null","int"] uuid | varchar | | ["null","string"] __partition__ | integer | | The partition number which the message belongs to __event_time__ | timestamp | | Application defined timestamp in milliseconds of when the event occurred __publish_time__ | timestamp | | The timestamp in milliseconds of when event as published __message_id__ | varchar | | The message ID of the message used to generate this row __sequence_id__ | bigint | | The sequence ID of the message used to generate this row __producer_name__ | varchar | | The name of the producer that publish the message used to generate this row __key__ | varchar | | The partition key for the topic __properties__ | varchar | | User defined properties     
Enter fullscreen mode Exit fullscreen mode

Presto/Trino gives us access to all that tasty meta data for each message/event/row/record/thing/data stuff. All with special names to help prevent collisions.

Meta Data

  • partition - if we have a partitioned topic, which partition does this message belong to.
  • event_time - you know what time it is! Timestamp in ms for event action.
  • publish_time - when was this message published to the topic?
  • message_id - unique id for this message
  • sequence_id - ordering information for this message
  • producer_name - who sent me this?
  • key - did you assign a key like I asked you to?
  • properties - all those extra fields you added around the payload

Spark Structured Streaming

val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://localhost:6650").option("admin.url", "http://localhost:8080").option("topic", "persistent://public/default/garden3").load()dfPulsar.printSchema()val pQuery = dfPulsar.selectExpr("*").writeStream.format("parquet").option("truncate", false) .option("checkpointLocation", "/tmp/checkpoint").option("path", "/opt/demo/gasthermal").start() pQuery.explain()pQuery.awaitTermination()pQuery.stop()// can be "orc", "json", "csv", etc.
Enter fullscreen mode Exit fullscreen mode

Spark

Show Me The Data

We can visualize data from Apache Pulsar by consuming it through the web sockets interface in a simple JQuery Single Page Web Application like below.

JavaScript

Example Parquet Files

pip3 install parquet-tools -Uparquet-tools inspect part-00000-b7e1f8dc-956d-4130-bc59-7b1435e41391-c000.snappy.parquet############ file meta data ############created_by: parquet-mr version 1.12.1 (build 2a5c06c58fa987f85aa22170be14d927d5ff6e7d)num_columns: 23num_rows: 1num_row_groups: 1format_version: 1.0serialized_size: 5071############ Columns ############cpudiskusageendtimeequivalentco2ppmhosthostnameipaddressmacaddressmemoryrowidruntimestarttimesystemtimetotalvocppbtsuuid __key__ topic __messageId__ publishTime__eventTimekeyvalue############ Column(cpu) ############name: cpupath: cpumax_definition_level: 1max_repetition_level: 0physical_type: FLOATlogical_type: Noneconverted_type (legacy): NONE############ Column(diskusage) ############name: diskusagepath: diskusagemax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(endtime) ############name: endtimepath: endtimemax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(equivalentco2ppm) ############name: equivalentco2ppmpath: equivalentco2ppmmax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(host) ############name: hostpath: hostmax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(hostname) ############name: hostnamepath: hostnamemax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(ipaddress) ############name: ipaddresspath: ipaddressmax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(macaddress) ############name: macaddresspath: macaddressmax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(memory) ############name: memorypath: memorymax_definition_level: 1max_repetition_level: 0physical_type: FLOATlogical_type: Noneconverted_type (legacy): NONE############ Column(rowid) ############name: rowidpath: rowidmax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(runtime) ############name: runtimepath: runtimemax_definition_level: 1max_repetition_level: 0physical_type: INT32logical_type: Noneconverted_type (legacy): NONE############ Column(starttime) ############name: starttimepath: starttimemax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(systemtime) ############name: systemtimepath: systemtimemax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(totalvocppb) ############name: totalvocppbpath: totalvocppbmax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(ts) ############name: tspath: tsmax_definition_level: 1max_repetition_level: 0physical_type: INT32logical_type: Noneconverted_type (legacy): NONE############ Column(uuid) ############name: uuidpath: uuidmax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(__key) ############name: __keypath:__ keymax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Noneconverted_type (legacy): NONE############ Column(__topic) ############name:__topicpath: __topicmax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(__messageId) ############name: __messageIdpath:__ messageIdmax_definition_level: 1max_repetition_level: 0physical_type: BYTE_ARRAYlogical_type: Noneconverted_type (legacy): NONE############ Column(__publishTime) ############name:__publishTimepath: __publishTimemax_definition_level: 1max_repetition_level: 0physical_type: INT96logical_type: Noneconverted_type (legacy): NONE############ Column(__eventTime) ############name: __eventTimepath:__ eventTimemax_definition_level: 1max_repetition_level: 0physical_type: INT96logical_type: Noneconverted_type (legacy): NONE############ Column(key) ############name: keypath: __messageProperties.key_value.keymax_definition_level: 2max_repetition_level: 1physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8############ Column(value) ############name: valuepath:__messageProperties.key_value.valuemax_definition_level: 3max_repetition_level: 1physical_type: BYTE_ARRAYlogical_type: Stringconverted_type (legacy): UTF8
Enter fullscreen mode Exit fullscreen mode

Flink

CREATE CATALOG pulsar WITH ( 'type' = 'pulsar', 'service-url' = 'pulsar://pulsar1:6650', 'admin-url' = 'http://pulsar1:8080', 'format' = 'json');USE CATALOG pulsar;SHOW TABLES;Flink SQL> describe garden3;+------------------+--------+------+-----+--------+-----------+| name | type | null | key | extras | watermark |+------------------+--------+------+-----+--------+-----------+| cpu | FLOAT | true | | | || diskusage | STRING | true | | | || endtime | STRING | true | | | || equivalentco2ppm | STRING | true | | | || host | STRING | true | | | || hostname | STRING | true | | | || ipaddress | STRING | true | | | || macaddress | STRING | true | | | || memory | FLOAT | true | | | || rowid | STRING | true | | | || runtime | INT | true | | | || starttime | STRING | true | | | || systemtime | STRING | true | | | || totalvocppb | STRING | true | | | || ts | INT | true | | | || uuid | STRING | true | | | |+------------------+--------+------+-----+--------+-----------+16 rows in setselect equivalentco2ppm, totalvocppb, cpu, starttime, systemtime, ts, cpu, diskusage, endtime, memory, uuid from garden3;select max(equivalentco2ppm) as MaxCO2, max(totalvocppb) as MaxVocPPB from garden3;// TODOAdd your own table publishTime TIMESTAMP(3) METADATA, WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND  
Enter fullscreen mode Exit fullscreen mode

Flink Flink2 Flink3

Apache NiFi (FLiPN)

  • Choose a processor from the palette

ShowNiFi

  • Consume messages from Apache Pulsar

ConsumePulsar

  • Full Apache NiFi Data Flow with Apache Pulsar and MongoDB

Flow

MongoDB

mongo -u username1 -p password1 --authenticationDatabase admin pulsar1:27017/inventoryshow databasesdb.createCollection("garden3")show collectionsdb.garden3.find().pretty(){ "_id" : ObjectId("622f7315f99b9a338d60592f"), "cpu" : 0, "diskusage" : "101615.9 MB", "endtime" : "1647276083.2033532", "equivalentco2ppm" : " 407", "host" : "garden3", "hostname" : "garden3", "ipaddress" : "192.168.1.199", "macaddress" : "dc:a6:32:32:98:20", "memory" : 8.8, "rowid" : "20220314164123_0e19c5e6-45f5-405e-bd93-9aed05b37630", "runtime" : 0, "starttime" : "03/14/2022 12:41:23", "systemtime" : "03/14/2022 12:41:24", "totalvocppb" : " 5", "ts" : 1647276084, "uuid" : "garden3_uuid_xrl_20220314164123"}
Enter fullscreen mode Exit fullscreen mode

mongo mongoquery

References

Additional Heat Images

hot hot2 more3 doesanyonereadthis4

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .