Transit Monitoring


Some data ingest looks difficult but is easy with Apache NiFi, and transit data is a good example. Here I ingest data from NJ Transit, New Jersey's (USA) transit system for trains, light rail and buses. NJ Transit publishes several feeds as RSS (a type of XML). Let's pull them over REST, convert the RSS to JSON, then store the data and use it for queries and maps.

This is easy and requires no code.

Step by Step Instructions


  • Build a list of URLs:

[{"url":"https://www.njtransit.com/rss/RailAdvisories_feed.xml"},
{"url":"https://www.njtransit.com/rss/BusAdvisories_feed.xml"},
{"url":"https://www.njtransit.com/rss/LightRailAdvisories_feed.xml"},
{"url":"https://www.njtransit.com/rss/CustomerNotices_feed.xml"}
]

  • Call those URLs with InvokeHTTP

  • Use QueryRecord to convert from XML to JSON (you could also write Avro, Parquet or other formats).

  • Use FlattenJson to flatten any nested fields so the records are cleaner.

  • Next, use UpdateRecord to add some fields that are useful for queries: a timestamp (${now():toNumber()}), a unique ID (${uuid}), the service name and the location name.

  • Finally, I store the records in Kudu via PutKudu, which writes to my table impala::default.transitstatus using the JSONReader. I don't need to specify field names or anything else about the table. DDL: https://github.com/tspannhw/CloudDemo2021/blob/main/sql/kudu.sql.

  • I also send the records to Kafka via PublishKafkaRecord_2.

  • If you want, you could also send the data to any cloud (AWS, Azure, Google Cloud) storage or service, to databases via PutDatabaseRecord, or to MongoDB, Elasticsearch, Solr, file systems, JMS, TCP/IP, MQTT and more. You can add as many destinations as you wish.
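
For readers who want to see what the flow does to the data, here is a minimal Python sketch of the same steps. It is not how NiFi implements them and none of it is needed to build the flow. The function names (fetch, rss_to_records, flatten, enrich, ingest) are made up for illustration, uuid4() stands in for the FlowFile's ${uuid}, and the "NJ" location and file-name service name are assumptions taken from the example data.

```python
import time
import uuid
import urllib.request
import xml.etree.ElementTree as ET

# The feed list from the first step.
FEEDS = [
    {"url": "https://www.njtransit.com/rss/RailAdvisories_feed.xml"},
    {"url": "https://www.njtransit.com/rss/BusAdvisories_feed.xml"},
    {"url": "https://www.njtransit.com/rss/LightRailAdvisories_feed.xml"},
    {"url": "https://www.njtransit.com/rss/CustomerNotices_feed.xml"},
]


def fetch(url, timeout=30.0):
    """Stand-in for InvokeHTTP: GET the feed and return the body as text."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read().decode("utf-8")


def rss_to_records(rss_xml):
    """Stand-in for the XML-to-JSON step: one dict per <item> element."""
    root = ET.fromstring(rss_xml)
    return [
        {child.tag: (child.text or "").strip() for child in item}
        for item in root.iter("item")
    ]


def flatten(obj, prefix="", sep="."):
    """Stand-in for FlattenJson: collapse nested dicts into one level."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


def enrich(record, url, location="NJ"):
    """Stand-in for UpdateRecord: add ts, uuid, servicename, locationname."""
    out = dict(record)
    out["ts"] = str(int(time.time() * 1000))     # epoch millis, like ${now():toNumber()}
    out["uuid"] = str(uuid.uuid4())              # assumption: a fresh random ID
    out["servicename"] = url.rsplit("/", 1)[-1]  # e.g. "BusAdvisories_feed.xml"
    out["locationname"] = location
    return out


def ingest(feeds=FEEDS):
    """Run every feed through the steps and yield records ready to store."""
    for feed in feeds:
        for record in rss_to_records(fetch(feed["url"])):
            yield enrich(flatten(record), feed["url"])
```

Calling list(ingest()) needs network access to njtransit.com. The other functions work on plain strings and dicts, so you can try them on a saved copy of a feed.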


SQL Table Create for Apache Kudu / Apache Impala

CREATE TABLE transitstatus (
  `uuid` STRING,
  `ts` TIMESTAMP,
  `locationname` STRING,
  `servicename` STRING,
  `title` STRING,
  `description` STRING,
  `pubdate` STRING,
  `link` STRING,
  `guid` STRING,
  PRIMARY KEY (`uuid`, `ts`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
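
To see how a JSON record lines up with these columns, here is a small Python sketch. It assumes field names are matched case-insensitively (the feed uses pubDate, the table uses pubdate) and that ts arrives as epoch milliseconds. Both are assumptions made explicit for illustration, not a description of what PutKudu does internally, and to_row is a hypothetical helper.

```python
from datetime import datetime, timezone

# Column names copied from the CREATE TABLE statement above.
COLUMNS = ("uuid", "ts", "locationname", "servicename", "title",
           "description", "pubdate", "link", "guid")
KEY_COLUMNS = ("uuid", "ts")


def to_row(record):
    """Map one JSON record onto the table's columns."""
    lowered = {key.lower(): value for key, value in record.items()}
    row = {col: lowered.get(col) for col in COLUMNS}

    # Primary key columns cannot be empty.
    missing = [col for col in KEY_COLUMNS if row[col] is None]
    if missing:
        raise ValueError(f"missing key column(s): {missing}")

    # Assumption: ts is epoch milliseconds, possibly as a string.
    millis = int(row["ts"])
    row["ts"] = datetime.fromtimestamp(
        millis // 1000, tz=timezone.utc
    ).replace(microsecond=(millis % 1000) * 1000)
    return row
```

Fields that are not in the table are dropped and columns missing from the record come back as None, which makes mismatches between a feed and the DDL easy to spot.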

Example Data


A record after processing (JSON):

[ {
  "title" : "BUS 11 - May 21, 2021 01:50:45 PM",
  "description" : "Social Distancing Barriers to be Removed from Buses – Effective Saturday, May 22, 2021",
  "link" : "https://www.njtransit.com/node/1360048",
  "guid" : "https://www.njtransit.com/node/1360048",
  "pubDate" : "May 21, 2021 01:50:45 PM",
  "ts" : "1622581895034",
  "locationname" : "NJ",
  "uuid" : "1b9f11e4-b50e-495f-b6ef-50475580d374",
  "servicename" : "BusAdvisories_feed.xml"
} ]

A source RSS item:

<item>
  <title>Feb 08, 2021 01:11:40 PM</title>
  <description>NJ TRANSIT Introduces New “FlexPass” Ticket Option – Effective Monday, February 8, 2021</description>
  <link>https://www.njtransit.com/node/1321226</link>
  <guid>https://www.njtransit.com/node/1321226</guid>
  <pubDate>Feb 08, 2021 01:11:40 PM</pubDate>
</item>