Data Flow with Apache Nifi in Google Cloud Platform

Sıla Kazan
6 min read · Jan 16, 2022

Hi everyone,

In today's data world, we will create a pipeline by performing a certain number of operations on real data. Sometimes the data does not come in CSV files, but via a database or via APIs. Now I will tell you how I built an Apache NiFi project on a Virtual Machine that we will create in the Google Cloud Console.

Our goal in this project is to pull data in real time from Twitter, apply certain rules to it, and send it to different channels.

What is Apache Nifi?

Apache NiFi is a flow-based tool that lets us configure the flow of data step by step. It provides real-time control that makes it easy to manage the movement of data between any source and any destination.
NiFi lets us take data from different sources (files, SQL and NoSQL databases, APIs, Kafka, etc.), transform it with the NiFi Expression Language, and load/send it to any target we want. At the same time, we can define actions as outputs of the processes, such as sending messages to Slack or by e-mail.
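For example, the Expression Language can read and test FlowFile attributes. A minimal illustrative snippet (the attribute name twitter.tweet is made up for this example):

${twitter.tweet:contains('economy')}

This returns true when the twitter.tweet attribute contains the word "economy", which is exactly the kind of check we will use later for routing.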

Let’s get started…

Step By Step

STEP 1:

Let’s create a VM instance in the Google Cloud Console.

1-) Go to Compute Engine

2-) Go to Create Instance

Create Instance

STEP 2:

Let’s do the general settings and install Apache NiFi.

1-) Download and extract Apache NiFi (the 1.13.2 binary archive from the Apache archive):

wget https://archive.apache.org/dist/nifi/1.13.2/nifi-1.13.2-bin.tar.gz
tar -xzf nifi-1.13.2-bin.tar.gz

Nifi install

2-) Go into the nifi.properties file and set the following configuration (sketched below).
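A minimal sketch of the kind of properties I mean (the exact names depend on whether your NiFi build uses the HTTP or HTTPS listener; the values here assume the HTTPS listener on port 8443 that we open in the browser later):

nifi.web.https.host=0.0.0.0
nifi.web.https.port=8443

Setting the host to 0.0.0.0 makes NiFi listen on all interfaces so it can be reached via the VM's external IP.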

3-) Create a firewall rule, because in order to open the NiFi UI in your web browser you need to allow your machine's IP address to reach the VM (an example rule is sketched below).
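You can create the rule in the Console UI or, as a rough sketch, with the gcloud CLI (the rule name and source range are placeholders):

gcloud compute firewall-rules create allow-nifi --allow=tcp:8443 --source-ranges=<your-ip>/32

This opens port 8443, the port the NiFi UI listens on in this setup, only to your own IP address.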

STEP 3:

If you’ve come this far, congratulations, let’s start Apache NiFi :)

1-) run bin/nifi.sh start

2-) Open https://localhost:8443/nifi in your browser, replacing localhost with the VM's External IP.

Apache NiFi may take a few minutes to come up.
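If you want to check from the terminal whether it is up, NiFi ships a status command, and you can also follow the startup in the application log:

bin/nifi.sh status
tail -f logs/nifi-app.log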

You will get an empty Apache NiFi canvas.

Let’s pull Twitter data.

STEP 4:

1-) Add a processor (GetTwitter) to the canvas.

2-) Now we should decide on a data source for our task; this could change depending on your own task. I decided to use the Twitter API, which provides live streaming tweet data, in this case tweets about the economy.

  • Going to the PROPERTIES section of the GetTwitter processor, we first set the “Twitter Endpoint” property to Filter Endpoint. If you have successfully created a Twitter Developer account and created an App there, Twitter provides you with Consumer Key, Consumer Secret, Access Token and Access Token Secret values, and we enter that information in these fields. I only want Turkish tweets, so I type “tr” in the Languages section, and I give the keyword “economy” in the Terms to Filter On section to pull only data about the economy. The resulting properties are summarized below.
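Roughly, the GetTwitter properties end up like this (the credential values are of course your own):

Twitter Endpoint    = Filter Endpoint
Consumer Key        = <from your Twitter app>
Consumer Secret     = <from your Twitter app>
Access Token        = <from your Twitter app>
Access Token Secret = <from your Twitter app>
Languages           = tr
Terms to Filter On  = economy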

3-) The incoming tweets are as follows.

  • There is a lot of information in each incoming tweet; I don’t want all of it, so I have to take some action.

4-) JoltTransformJSON: When we examine the data, we see how many fields are coming in. I don’t want to store all of them; I only want the fields I care about. Now we can move on to the JoltTransformJSON processor, which is the next step in our flow.

  • So I want to keep when the user posted the tweet, the tweet text, the username, the number of favorites it received, the total number of followers, and so on. It is thanks to this processor that we perform this operation (an example Jolt spec is sketched below).
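A minimal sketch of a Jolt "shift" spec that keeps only fields like these (the input paths follow the Twitter v1.1 JSON; the output names on the right are my own choice for this example):

[
  {
    "operation": "shift",
    "spec": {
      "created_at": "created_at",
      "text": "tweet",
      "favorite_count": "favorite_count",
      "user": {
        "screen_name": "username",
        "followers_count": "followers_count"
      }
    }
  }
]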

5-) EvaluateJsonPath: I need to assign some of the JSON fields to FlowFile attributes. The EvaluateJsonPath processor is what allows us to do this operation (a sketch follows).
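As a sketch: with the Destination property set to flowfile-attribute, each dynamic property maps an attribute name (the twitter.* names are my own) to a JSONPath expression over the transformed JSON:

twitter.created_at      = $.created_at
twitter.tweet           = $.tweet
twitter.username        = $.username
twitter.favorite_count  = $.favorite_count
twitter.followers_count = $.followers_count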

Now we have come to the part where we will separate the data according to certain rules and then transfer it to the targets.

6-) RouteOnAttribute: We create 5 different rules (sketched below).
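With the Routing Strategy set to Route to Property name, each rule is a dynamic property whose value is an Expression Language condition. A rough sketch along the lines of the channels described in the next step (the rule names, keywords and attribute names here are placeholders, not the exact ones in my flow):

slack_raise     = ${twitter.tweet:contains('raise')}
elastic_ankara  = ${twitter.tweet:contains('speculation'):and(${twitter.location:contains('Ankara')})}
postgres_dollar = ${twitter.tweet:contains('dollar')}
gmail_inflation = ${twitter.tweet:contains('dollar'):and(${twitter.tweet:contains('inflation')})}

Anything that matches none of the rules leaves the processor through the unmatched relationship, which is exactly what we will send to Kafka.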

STEP 5:

Now we have finished half of the project and can start the second part. Up to this stage we have prepared our data; now it is time to trigger events. For this I’ll first share the picture with you and then explain the channels step by step.

1-) SLACK CHANNEL: Sends the tweets that mention a raise to Slack as a notification.
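A sketch of the PutSlack side, assuming you have created an Incoming Webhook for your Slack workspace (the webhook URL is a placeholder):

Webhook URL  = https://hooks.slack.com/services/<your-webhook-path>
Webhook Text = New tweet from ${twitter.username}: ${twitter.tweet}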

2-) Kafka CHANNEL: We want the data that does not match any of the rules to be stored in Kafka. We have created a topic called “unmatched” for this (the commands are sketched after the captions below).

Zookeeper started
Kafka topic created
kafka properties setting
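The commands behind those captions look roughly like this, assuming Kafka is run from its own installation directory (paths and flags depend on your Kafka version; older versions use --zookeeper instead of --bootstrap-server when creating the topic):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --topic unmatched --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

On the NiFi side, the unmatched relationship of RouteOnAttribute is connected to a PublishKafka processor pointing at this broker and topic.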

3-) Elasticsearch CHANNEL: Sends tweets that mention speculation and whose location is Ankara to Elasticsearch.
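A sketch of the Elasticsearch side using, for example, the PutElasticsearchHttp processor (the URL and index name are placeholders):

Elasticsearch URL = http://localhost:9200
Index             = tweets
Type              = _doc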

4-) PutSQL (PostgreSQL) CHANNEL: Sends tweets that mention the dollar to PostgreSQL.
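A sketch of what ends up in PostgreSQL, assuming a hypothetical tweets table; the INSERT statement is typically built into the FlowFile content (for example with a ReplaceText processor) before PutSQL executes it:

INSERT INTO tweets (created_at, username, tweet, favorite_count, followers_count)
VALUES ('${twitter.created_at}', '${twitter.username}', '${twitter.tweet}', ${twitter.favorite_count}, ${twitter.followers_count});

PutSQL also needs a DBCPConnectionPool controller service configured with the PostgreSQL JDBC driver and connection details.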

5-) Gmail CHANNEL: E-mails the tweets about the dollar and inflation via Gmail.
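A sketch of the PutEmail properties for the Gmail channel (credentials are placeholders, property names may vary slightly between NiFi versions; with Gmail you typically need an app password rather than your normal one):

SMTP Hostname = smtp.gmail.com
SMTP Port     = 587
SMTP TLS      = true
SMTP Username = <your-gmail-address>
SMTP Password = <app-password>
From          = <your-gmail-address>
To            = <recipient-address>
Message       = ${twitter.username}: ${twitter.tweet}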

And that’s all. All of our configuration is done and all of our processors are ready to start. The only thing left to do is run them all.

We have created a simple ETL process and learned how to build such a data flow. I hope it is better understood now.

I have tried to explain the data flow and the processors I used in as much detail as I can. I hope it was useful. You can send me any questions and feedback :)

See you in my next project :)
