Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic.
Here I will use NiFi to build a 30-second scheduler that retrieves the CitiBike Station Feed. The data is in JSON format.
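For a quick look at the feed before building the flow, here is a minimal Python sketch; the feed URL is an assumption (substitute whatever endpoint you actually poll), and the two printed fields (`executionTime` and `stationBeanList`) are the ones the flow below relies on:

```python
import requests  # assuming the requests library is installed

# The station feed URL is an assumption; use whatever endpoint your flow polls.
feed = requests.get('https://feeds.citibikenyc.com/stations/stations.json').json()

print(feed['executionTime'])         # timestamp string, e.g. '2016-01-15 10:30:01 AM'
print(len(feed['stationBeanList']))  # number of station records in this snapshot
print(feed['stationBeanList'][0])    # one station record (a dict of station fields)
```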

Install NiFi
We can download the NiFi archive, unzip it, and then start the daemon with `bin/nifi.sh start`.

```
wget http://www.us.apache.org/dist/nifi/0.4.1/nifi-0.4.1-bin.tar.gz
tar xzf nifi-0.4.1-bin.tar.gz
```
Then we can reach the NiFi UI at http://your-ip-address:9090/nifi (9090 is the HTTP port configured in nifi.properties).
See the NiFi documentation and the introductory video to learn more about NiFi.
I have created the following flow to schedule a job every 30 seconds that fetches the JSON file.

Data Flow:
- GetHTTP: this processor runs on a 30-second timer-driven schedule and fetches the data from the CitiBike site; the flow file is named `data.json`.
- getDataTime: an `EvaluateJsonPath` processor that extracts the timestamp from the JSON file by evaluating the JSON path `$.executionTime`. The value is written to a new flow file attribute, `datetime`, so the Destination property should be set to `flowfile-attribute`. Self-terminate: unmatched & failure.
- getContent: this `EvaluateJsonPath` processor extracts the content of the flow file at `$.stationBeanList` and overwrites the flow file with it. The Destination is left unchanged; add a new property `JsonPaths` whose value is `$.stationBeanList`. Self-terminate: unmatched & failure.
- UpdateAttribute: this processor renames the flow file according to the newly created `datetime` attribute. The `filename` property is set to `${datetime:toDate('yyyy-MM-dd hh:mm:ss a'):format('yyyyMMdd_HHmmss')}.json`, and a `folder` attribute is created as well: `${datetime:toDate('yyyy-MM-dd hh:mm:ss a'):format('yyyyMMdd')}`.
- LogAttribute: used to visualize the changes.
- DetectDuplicate: this processor identifies and filters out duplicate JSON flow files, with the Cache Entry Identifier set to `${datetime}`. (A Distributed Map Cache client/server service should be spun up accordingly.)
- PutHDFS: the processor that writes to HDFS; a local file system processor (e.g., PutFile) could be used in its place. Hadoop Configuration Resources is set to `/etc/hadoop/conf/core-site.xml` and the Directory is set to `/bikesharing/NYC/${folder}`.
PS: `${...}` is the Apache NiFi Expression Language; see here.
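As a side note, here is what the `toDate(...):format(...)` chain in the UpdateAttribute expressions does, sketched as the equivalent plain-Python transformation (the sample `executionTime` value is made up for illustration; NiFi evaluates the expressions itself):

```python
from datetime import datetime

# A sample executionTime value in the feed's 'yyyy-MM-dd hh:mm:ss a' format
raw = '2016-01-15 10:30:01 AM'

parsed = datetime.strptime(raw, '%Y-%m-%d %I:%M:%S %p')  # toDate('yyyy-MM-dd hh:mm:ss a')
filename = parsed.strftime('%Y%m%d_%H%M%S') + '.json'    # format('yyyyMMdd_HHmmss') + '.json'
folder = parsed.strftime('%Y%m%d')                       # format('yyyyMMdd')

print(filename)  # 20160115_103001.json
print(folder)    # 20160115
```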
Wrangle the JSON data
Next we will use Spark to read and wrangle the JSON, and store it as Parquet data in HDFS.
In Zeppelin, we can easily use the pyspark interpreter to do the job; here is the script.
```python
from datetime import date, timedelta
```
This script is scheduled at 3 am each day to consolidate the previous day's data.
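The full script is not reproduced above; a minimal sketch of such a daily consolidation job, assuming the HDFS layout created by the NiFi flow (`/bikesharing/NYC/yyyyMMdd/*.json`) and a Parquet target of `/bikesharing/NYC_parquet`, could look like this (in Zeppelin the `sqlContext` is already provided by the pyspark interpreter):

```python
from datetime import date, timedelta

# Yesterday's folder name, matching the ${folder} attribute (yyyyMMdd) set by UpdateAttribute
yesterday = (date.today() - timedelta(days=1)).strftime('%Y%m%d')

# Each snapshot file contains the stationBeanList array written by the flow
stations = sqlContext.read.json('/bikesharing/NYC/%s/*.json' % yesterday)

# Append the day's records to the consolidated Parquet store
stations.write.mode('append').parquet('/bikesharing/NYC_parquet')
```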
Another line of script gets the whole Parquet dataset back for analysis: `nyc = sqlContext.read.parquet('/bikesharing/NYC_parquet')`.
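From there the data can be explored directly; for instance (the `stationName` and `availableBikes` column names are assumptions about the feed's fields, not taken from the flow above):

```python
nyc = sqlContext.read.parquet('/bikesharing/NYC_parquet')

# Average number of available bikes per station (column names assumed from the feed)
(nyc.groupBy('stationName')
    .avg('availableBikes')
    .orderBy('avg(availableBikes)', ascending=False)
    .show(10))
```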

