
Basic Streaming Data Enrichment on Google Cloud with Dataflow SQL

Let’s imagine we run a small business and want to keep track of product demand, so we need to see the top 10 products and the top 10 branches in real time.

[Image: our simple architecture]

To reach this goal, we sketch a data architecture with all the services needed. Dataflow is where the action happens: using Dataflow SQL we will join the streaming data with a table from our data warehouse (BigQuery) to calculate the top 10.

Step 1 – Set up the message publisher

Since we don’t have a business yet, let’s build a Python script to simulate transactions.

To test it, copy the code without any modifications, create a .py file in your Cloud Shell, and run it. The last line printed in your Cloud Shell shows the gcloud command that sends the message to Pub/Sub.

When we test the whole architecture, we will uncomment line 29 and complete line 7 with our GCP project ID.
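The script itself isn’t reproduced in this copy of the post, so here is a minimal sketch of the same idea. The field names (product, branch_id, amount), the product and branch lists, and the project ID placeholder are assumptions, not the author’s code: it prints the gcloud publish command and only actually sends the message when the publish line is uncommented.

```python
import json
import random
import shlex
import subprocess  # only needed if you uncomment the publish line below
import time

PROJECT_ID = "your-gcp-project-id"   # assumption: replace with your GCP project ID
TOPIC = "sales"

PRODUCTS = ["shirt", "shoes", "hat", "jacket"]   # hypothetical product catalogue
BRANCH_IDS = [1, 2, 3, 4, 5]                     # hypothetical branch IDs

def publish_fake_sale() -> None:
    """Build a fake transaction and show (or run) the gcloud command that publishes it."""
    message = {
        "product": random.choice(PRODUCTS),
        "branch_id": random.choice(BRANCH_IDS),
        "amount": round(random.uniform(5, 100), 2),
    }
    cmd = [
        "gcloud", "pubsub", "topics", "publish", TOPIC,
        "--project", PROJECT_ID,
        "--message", json.dumps(message),
    ]
    print(shlex.join(cmd))             # the last printed line is the gcloud command itself
    # subprocess.run(cmd, check=True)  # uncomment to actually send the message

if __name__ == "__main__":
    while True:
        publish_fake_sale()
        time.sleep(1)
```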

[Image: Cloud Shell]

Step 2 – Create the Pub/Sub topic

Pub/Sub is an asynchronous messaging service. It offers durable message storage and real-time message delivery [Pub/Sub docs].

In Step 1 our script sends messages to the topic ‘sales’, so let’s create it.

gcloud pubsub topics create sales

Run the command above in Cloud Shell.

[Image: creating the Pub/Sub topic]
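If you prefer to do this from code instead of the CLI, a roughly equivalent sketch with the google-cloud-pubsub client library (project ID assumed) would be:

```python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("your-gcp-project-id", "sales")  # assumed project ID

# Create the 'sales' topic, equivalent to `gcloud pubsub topics create sales`
topic = publisher.create_topic(request={"name": topic_path})
print(f"Created topic: {topic.name}")
```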

Step 3 – Add the Pub/Sub source and give it a schema

Open the BigQuery UI, go to the query settings, and change the query engine to Cloud Dataflow. If it is your first time, you will need to enable the APIs.

[Image: activating the Dataflow and Data Catalog APIs]

Now go to ‘ADD DATA’ and click ‘Cloud Dataflow sources’.


Select the topic we’ve created


To assign a schema, reload the page, look for the sales topic under Resources, and click ‘Edit schema’.


Paste the schema code and save it. This gives your messages a structure, which is mandatory for Dataflow SQL.
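The schema snippet is not reproduced in this copy of the post. Purely as an illustration, and assuming the hypothetical payload fields from the Step 1 sketch above, the columns you declare might look something like this (the exact format in the ‘Edit schema’ panel may differ):

```python
# Illustration only: columns a 'sales' Pub/Sub source could expose to Dataflow SQL.
# event_timestamp comes from the Pub/Sub publish time and is what the windowing
# functions use; the remaining names are assumptions matching the Step 1 sketch.
sales_schema = [
    {"name": "event_timestamp", "type": "TIMESTAMP"},
    {"name": "product",         "type": "STRING"},
    {"name": "branch_id",       "type": "INT64"},
    {"name": "amount",          "type": "FLOAT64"},
]
```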

Step 4 – Create the branches table

Let’s create the BigQuery table that will be joined with our stream. If it’s your first time, you will need to create a dataset first.


Create a local file branches.csv using the data below.

Create a BigQuery table using the UI
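If you would rather script this step than click through the UI, a minimal sketch using the google-cloud-bigquery client (the project, dataset, and table names are assumptions) could be:

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-gcp-project-id")   # assumed project ID
table_id = "your-gcp-project-id.sales_dataset.branches"   # assumed dataset and table names

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,   # assumes branches.csv has a header row
    autodetect=True,       # let BigQuery infer column names and types
)

# Load the local CSV into the branches table
with open("branches.csv", "rb") as f:
    load_job = client.load_table_from_file(f, table_id, job_config=job_config)

load_job.result()  # wait for the load to finish
print(client.get_table(table_id).num_rows, "rows loaded")
```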


Step 5 – Create a Dataflow job!

Yes! As you can see, the Dataflow job is as simple as a SQL join. It joins the data from BigQuery with the stream of events to produce the top_sales_agg table.
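The author’s exact query isn’t reproduced in this copy, but as a sketch of the kind of join-plus-windowed-aggregation Dataflow SQL runs (project, dataset, and column names are assumptions matching the earlier sketches), the query you paste into the editor might look roughly like this:

```python
# Rough sketch of a Dataflow SQL query joining the Pub/Sub stream with the
# BigQuery branches table over a tumbling window. All identifiers below are
# assumptions, not the article's actual query.
DATAFLOW_SQL_QUERY = """
SELECT
  s.product,
  b.branch_name,
  TUMBLE_START("INTERVAL 1 MINUTE") AS period_start,
  SUM(s.amount) AS total_amount
FROM pubsub.topic.`your-gcp-project-id`.sales AS s
INNER JOIN bigquery.table.`your-gcp-project-id`.sales_dataset.branches AS b
  ON s.branch_id = b.branch_id
GROUP BY
  s.product,
  b.branch_name,
  TUMBLE(s.event_timestamp, "INTERVAL 1 MINUTE")
"""
print(DATAFLOW_SQL_QUERY)
```

Ranking the top 10 products or branches can then happen downstream, for example in Data Studio on top of the destination table.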

Click the “Create Cloud Dataflow job” button and enter a job name. There you will find two kinds of destinations: a BigQuery table or a Pub/Sub topic. Since we will be reading the data with Data Studio, we select BigQuery. There are other optional parameters, such as defining a service account or choosing a machine type.


It could take several minutes to deploy your pipeline. The query will be transformed into a series of Dataflow steps.


Step 6 – Validate the process

Important

Run your Python script in Cloud Shell with line 29 uncommented and line 7 completed with your GCP project ID (see Step 1).

Go to the Dataflow page; there you should see your Dataflow job running.


Inside, there is a job graph detailing all the steps your job is performing.


And of course, you can see the results in the destination BigQuery table.
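If you want to check from code rather than the console, a quick sketch that reads the destination table with the google-cloud-bigquery client (project and table names assumed; use whatever destination you chose when creating the Dataflow job) might be:

```python
from google.cloud import bigquery

client = bigquery.Client(project="your-gcp-project-id")  # assumed project ID

# Assumed destination table and columns, matching the earlier query sketch.
query = """
SELECT product, branch_name, total_amount
FROM `your-gcp-project-id.sales_dataset.top_sales_agg`
ORDER BY total_amount DESC
LIMIT 10
"""

for row in client.query(query).result():
    print(dict(row))
```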
