IoT Pipeline
The goal of this short article is to introduce some IoT and Big Data GCP components and illustrate a basic but effective data pipeline configuration.
We’ll take a common IoT platform use case as an example: environmental monitoring.
Imagine an enologist friend of ours complaining about the drawbacks of his job. He needs to move continuously from one vineyard to another to monitor air temperature, ground humidity and so on, and those activities take a lot of time.
In addition, his customers are numerous and far apart, so data collection ends up taking far more time than the data analysis itself.
Our idea is to propose to our friend a system where several sensors are distributed over the vineyards, and the collected data is organized so that it can be analyzed remotely.
Data flow
+--------+ +--------------+ +---------------+ +-------------------+ +------------------+
| Device +------> IoT registry +-----> Pub/Sub topic +------> Dataflow pipeline +-----> BigQuery dataset |
+--------+ +--------------+ +---------------+ +-------------------+ +------------------+
Configuration
Create a new project (to do so, the command needs to be run from an already authenticated configuration):
gcloud projects create iot-vineyard
Let’s create a dedicated configuration to manage the new project:
gcloud config configurations create iot-vineyard
Project and account should be set up for the new configuration:
gcloud config set project iot-vineyard
gcloud config set account <my_account@gmail.com>
Let’s start from our data destination: we’ll create a new dataset and an empty table in BigQuery. We are going to do so with the bq command-line tool, passing the table schema as a JSON file.
bq mk --dataset --description "All we need to analyze to make a good wine" iot-vineyard:vineyard
bq mk --location=europe-west1 --table iot-vineyard:vineyard.humidity ./bq_humidity_schema.json
File ./bq_humidity_schema.json contains the vineyard.humidity table schema; here’s its content:
[
{
"description": "timestamp",
"mode": "REQUIRED",
"name": "time",
"type": "STRING"
},
{
"description": "streaming device",
"mode": "REQUIRED",
"name": "origin",
"type": "STRING"
},
{
"description": "raw value",
"mode": "NULLABLE",
"name": "value",
"type": "FLOAT"
}
]
Briefly check what has been created:
bq show vineyard
bq show vineyard.humidity
Now it’s the message broker’s turn: our data will flow through here to be read and consumed by different potential clients. We’ll create a new Pub/Sub topic:
gcloud pubsub topics create vineyard-humidity
Now we need to create a Dataflow job that reads from the Pub/Sub topic and writes to the BigQuery table.
Google distributes a set of ready-made templates for GCP components integration; one of them is specifically meant for streaming JSON messages from Pub/Sub to BigQuery.
At this point, we need to enable the Dataflow API. To spot the related API service name we can simply grep the list of available services:
gcloud services list --available | grep -i dataflow
dataflow.googleapis.com Dataflow API
Enable the Dataflow API with:
gcloud services enable dataflow.googleapis.com
In case we still need to attach a billing account to the current project, we can find the available accounts using:
gcloud alpha billing accounts list
And we can link the billing account to our project with:
gcloud alpha billing accounts projects link iot-vineyard --billing-account=<billing_account_ID>
Before the job can be created, we need to grant Dataflow proper access rights on the BigQuery dataset. We can achieve that by editing the dataset configuration, which can be dumped with:
bq show --format=prettyjson iot-vineyard:vineyard > /tmp/dataset.json
The Dataflow service account can be found in the project IAM policy:
gcloud projects get-iam-policy iot-vineyard
Edit the JSON file, adding a new access entry with the WRITER role and userByEmail set to the Dataflow service account email (it can also be found in the IAM dashboard):
{
"access": [
{
"role": "WRITER",
"specialGroup": "projectWriters"
},
{
"role": "WRITER",
"userByEmail": "service-<some_number>@dataflow-service-producer-prod.iam.gserviceaccount.com"
},
{
"role": "OWNER",
"specialGroup": "projectOwners"
},
{
"role": "OWNER",
"userByEmail": "gordonfreeman81@gmail.com"
},
{
"role": "READER",
"specialGroup": "projectReaders"
}
],
"creationTime": "1595583329241",
"datasetReference": {
...
}
Update dataset definition:
bq update --source /tmp/dataset.json iot-vineyard:vineyard
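As an alternative to editing the JSON by hand, the same access entry can be added programmatically. Here is a minimal sketch using the google-cloud-bigquery Python client library (assuming it is installed, e.g. with pip install google-cloud-bigquery; the service account address is the same placeholder used above):
# grant_dataflow_writer.py - add a WRITER access entry for the Dataflow
# service account to the vineyard dataset (sketch, placeholder email).
from google.cloud import bigquery

client = bigquery.Client(project="iot-vineyard")
dataset = client.get_dataset("iot-vineyard.vineyard")

entries = list(dataset.access_entries)
entries.append(
    bigquery.AccessEntry(
        role="WRITER",
        entity_type="userByEmail",
        entity_id="service-<some_number>@dataflow-service-producer-prod.iam.gserviceaccount.com",
    )
)
dataset.access_entries = entries

# Only the access entries are sent in the update; the other dataset
# properties are left untouched.
client.update_dataset(dataset, ["access_entries"])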
Now we can create the job starting from the Google template.
Templates need to exist on Cloud Storage to be used by Dataflow; in this case Google offers a pre-populated public bucket at gs://dataflow-templates/latest/PubSub_to_BigQuery:
gcloud dataflow jobs run topic2BQ --region=europe-west1 --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery --parameters inputTopic=projects/iot-vineyard/topics/vineyard-humidity,outputTableSpec=iot-vineyard:vineyard.humidity
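Before involving any device, we can optionally verify the Pub/Sub → Dataflow → BigQuery leg in isolation by publishing a schema-compliant test message directly to the topic. A minimal sketch using the google-cloud-pubsub Python client (assuming it is installed, e.g. with pip install google-cloud-pubsub):
# publish_test_message.py - push one JSON message matching the
# vineyard.humidity schema to the topic, so the Dataflow template
# can stream it into BigQuery (sketch).
import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("iot-vineyard", "vineyard-humidity")

message = {
    "time": "2020-07-27T08:00:00Z",
    "origin": "test-client",
    "value": 0.75,
}

# The PubSub_to_BigQuery template expects one JSON object per message body.
future = publisher.publish(topic_path, data=json.dumps(message).encode("utf-8"))
print("Published message id:", future.result())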
Now everything is ready to receive our data, and we’ll proceed to enable a device. Let’s enable the IoT Core API:
gcloud services enable cloudiot.googleapis.com
Create a new IoT registry, defining the topic that data streamed from allowed devices will flow into:
gcloud iot registries create vineyard-registry --region=europe-west1 --no-enable-http-config --event-notification-config=topic=projects/iot-vineyard/topics/vineyard-humidity
To add a new device to the registry, we first need to generate a key pair to authenticate it:
mkdir .ssh
openssl ecparam -genkey -name prime256v1 -noout -out .ssh/ec_private.pem
openssl ec -in .ssh/ec_private.pem -pubout -out .ssh/ec_public.pem
wget -O .ssh/roots.pem https://pki.goog/roots.pem
And we can create a new device:
gcloud iot devices create vineyard-hygrometer --region=europe-west1 --registry=vineyard-registry --public-key path=.ssh/ec_public.pem,type=es256-pem
Not having a real device, we are going to test our environment using a script that simulates data transmission to the registry. Let’s install the needed prerequisites and download the Python script (together with its requirements.txt, from the same googlecodelabs/iotcore-heartrate repository):
sudo apt-get -y install build-essential libssl-dev python-dev libffi-dev git wget openssl python2
curl https://bootstrap.pypa.io/get-pip.py --output get-pip.py
sudo python2 get-pip.py
wget https://raw.githubusercontent.com/googlecodelabs/iotcore-heartrate/master/heartrateSimulator.py
wget https://raw.githubusercontent.com/googlecodelabs/iotcore-heartrate/master/requirements.txt
sudo pip install -r requirements.txt
As its name reveals, the script was originally written to simulate a heart rate sensor, but it simply streams every valid JSON line it can read from the sample file.
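For the curious, here is roughly what the simulator does under the hood: it signs a short-lived JWT with the device private key and publishes each line of the sample file as a telemetry message over the Cloud IoT Core MQTT bridge. A minimal Python 3 sketch (assuming the paho-mqtt and pyjwt packages are installed, and reading the SampleData.json file we prepare below), not the original script:
# mqtt_publish_sketch.py - rough outline of what the simulator does (sketch).
import datetime
import ssl

import jwt                       # pyjwt
import paho.mqtt.client as mqtt

PROJECT, REGION = "iot-vineyard", "europe-west1"
REGISTRY, DEVICE = "vineyard-registry", "vineyard-hygrometer"

# IoT Core authenticates devices with a JWT signed by the device private key;
# the audience claim must be the project id.
now = datetime.datetime.utcnow()
token = jwt.encode(
    {"iat": now, "exp": now + datetime.timedelta(minutes=60), "aud": PROJECT},
    open(".ssh/ec_private.pem").read(),
    algorithm="ES256",
)

client = mqtt.Client(
    client_id=f"projects/{PROJECT}/locations/{REGION}/registries/{REGISTRY}/devices/{DEVICE}"
)
client.username_pw_set(username="unused", password=token)
client.tls_set(ca_certs=".ssh/roots.pem", tls_version=ssl.PROTOCOL_TLSv1_2)
client.connect("mqtt.googleapis.com", 8883)
client.loop_start()

# Every non-empty line of the sample file becomes one telemetry message on the
# device events topic; IoT Core forwards it to the configured Pub/Sub topic.
with open("SampleData.json") as sample:
    for line in sample:
        if line.strip():
            client.publish(f"/devices/{DEVICE}/events", line.strip(), qos=1)

client.loop_stop()
client.disconnect()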
Now we need to prepare a file containing the data we are about to send:
cat <<EOF >SampleData.json
{"time": "2020-07-27T08:37:38Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.88}
{"time": "2020-07-27T08:37:39Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.78}
{"time": "2020-07-27T08:37:40Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.87}
{"time": "2020-07-27T08:37:41Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.68}
{"time": "2020-07-27T08:37:42Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.78}
{"time": "2020-07-27T08:37:43Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.77}
{"time": "2020-07-27T08:37:44Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.86}
{"time": "2020-07-27T08:37:45Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.76}
{"time": "2020-07-27T08:37:46Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.67}
EOF
Finally, let’s run the script to start writing data from SampleData.json to the registry.
python heartrateSimulator.py --project_id=iot-vineyard --registry_id=vineyard-registry --device_id=vineyard-hygrometer --private_key_file=.ssh/ec_private.pem --ca_certs=.ssh/roots.pem --cloud_region=europe-west1 --json_data_file=SampleData.json
Conclusions
At this point, we can check whether our data is actually reaching the BigQuery table by running a query on it from the bq CLI tool. As an alternative, we can run the same query from the BigQuery dashboard in the GCP console.
bq query --use_legacy_sql=false \
'SELECT *
FROM
`vineyard.humidity`'
The collected data is now available to support further analysis with proper additional tools: for example, we can use Google Data Studio to render a chart or set up a report.
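As another example, the readings can be pulled into a Python analysis environment through the google-cloud-bigquery client; a minimal sketch (assuming the client library and pandas are installed):
# read_humidity.py - load the collected readings into a pandas DataFrame
# for further exploration or plotting (sketch).
from google.cloud import bigquery

client = bigquery.Client(project="iot-vineyard")

query = """
    SELECT time, origin, value
    FROM `iot-vineyard.vineyard.humidity`
    ORDER BY time
"""

# to_dataframe() requires pandas (and pyarrow) to be installed.
df = client.query(query).to_dataframe()
print(df.describe())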