The goal of this short article is to introduce some IoT and Big Data GCP components and to illustrate a basic but effective data pipeline configuration.

We’ll take a common IoT platform use case as an example: environmental monitoring.

We can imagine an enologist friend of ours complaining about the drawbacks of his job. He needs to move continuously from one vineyard to another to monitor air temperature, ground humidity and so on, and those activities take a lot of time.

In addition, his customers are numerous and far apart, so collecting the data ends up taking far more time than the data analysis itself.

Our idea is to propose to our friend a system where several sensors are distributed over the vineyards, and the collected data is organized so that it can be analyzed remotely.

Data flow

+--------+      +--------------+     +---------------+      +-------------------+     +------------------+
| Device +------> IoT registry +-----> Pub/Sub topic +------> Dataflow pipeline +-----> BigQuery dataset |
+--------+      +--------------+     +---------------+      +-------------------+     +------------------+

Configuration

Create a new project (to do so, the command needs to be run from an already authenticated configuration):

gcloud projects create iot-vineyard

Let’s create a dedicated configuration to manage the new project:

gcloud config configurations create iot-vineyard

Project and account should be set up for the new configuration:

gcloud config set project iot-vineyard

gcloud config set account <my_account@gmail.com>
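
To double-check that the new configuration is active and pointing at the right project and account, we can list its properties:

gcloud config list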

Let’s start from our data destination: we’ll create a new dataset and an empty table in BigQuery. We are going to do so using the bq command-line tool and a JSON schema definition.

bq mk --dataset --location=europe-west1 --description "All we need to analyze to make a good wine" iot-vineyard:vineyard

bq mk --table iot-vineyard:vineyard.humidity ./bq_humidity_schema.json

The file ./bq_humidity_schema.json contains the schema of the vineyard.humidity table, here’s the content:

[
  {
    "description": "timestamp",
    "mode": "REQUIRED",
    "name": "time",
    "type": "STRING"
  },
  {
    "description": "streaming device",
    "mode": "REQUIRED",
    "name": "origin",
    "type": "STRING"
  },
  {
    "description": "raw value",
    "mode": "NULLABLE",
    "name": "value",
    "type": "FLOAT"
  }
]

Briefly check what has been created:

bq show vineyard

bq show vineyard.humidity

Now it’s the message broker’s turn: our data will flow through here to be read and consumed by different potential clients. We’ll create a new Pub/Sub topic:

gcloud pubsub topics create vineyard-humidity
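
Optionally, to verify that messages actually reach the topic before the rest of the pipeline is in place, we can attach a throwaway subscription and pull from it; the subscription name below is just an example and the subscription can be deleted afterwards:

gcloud pubsub subscriptions create vineyard-humidity-debug --topic=vineyard-humidity

gcloud pubsub subscriptions pull vineyard-humidity-debug --auto-ack --limit=5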

Now we need to create a Dataflow job that reads from Pub/Sub topic and writes to Bigquery table.

Google distributes a set of ready-made templates for integrating GCP components; one of them is specific to streaming JSON messages from Pub/Sub to BigQuery:

https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubSubToBigQuery.java

At this point, we need to enable the Dataflow API. To find the related service name, we can simply search the list of available services:

gcloud services list --available | grep -i dataflow
dataflow.googleapis.com                               Dataflow API

Enable the Dataflow API with:

gcloud services enable dataflow.googleapis.com
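
As a quick check, the service should now show up among the enabled ones:

gcloud services list --enabled | grep -i dataflow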

In case we still need to activate a billing account for the current project, we can find the available accounts using:

gcloud alpha billing accounts list

And we can link the billing account to our project with:

gcloud alpha billing accounts projects link iot-vineyard --billing-account=<billing_account_ID> 
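
If the beta billing commands are available in our gcloud installation, we can confirm the project is now billing-enabled with:

gcloud beta billing projects describe iot-vineyard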

Before the job can be created, we need to grant Dataflow proper access rights on the BigQuery dataset. We can achieve that by changing the dataset configuration, which we obtain with:

bq show --format=prettyjson iot-vineyard:vineyard > /tmp/dataset.json

The Dataflow service account can be obtained with:

gcloud projects get-iam-policy iot-vineyard
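
The returned policy can be long, so a quick way to spot the Dataflow service agent is to filter the output for its well-known domain:

gcloud projects get-iam-policy iot-vineyard --format=json | grep dataflow-service-producer-prod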

Edit the JSON file, adding a new access entry with the WRITER role and userByEmail set to the Dataflow service account email (it can also be found in the IAM dashboard):

{
  "access": [
    {
      "role": "WRITER",
      "specialGroup": "projectWriters"
    },
    {
      "role": "WRITER",
      "userByEmail": "service-<some_number>@dataflow-service-producer-prod.iam.gserviceaccount.com"
    },
    {
      "role": "OWNER",
      "specialGroup": "projectOwners"
    },
    {
      "role": "OWNER",
      "userByEmail": "gordonfreeman81@gmail.com"
    },
    {
      "role": "READER",
      "specialGroup": "projectReaders"
    }
  ],
  "creationTime": "1595583329241",
  "datasetReference": {
    ...
}

Update dataset definition:

bq update --source /tmp/dataset.json iot-vineyard:vineyard
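
We can verify that the new access entry has been stored by showing the dataset again and filtering for the service account:

bq show --format=prettyjson iot-vineyard:vineyard | grep -B 1 dataflow-service-producer-prod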

Now we can create the job starting from the Google template.

Templates need to exist on Cloud Storage to be used by Dataflow; in this case Google offers a pre-populated public location at gs://dataflow-templates/latest/PubSub_to_BigQuery:

gcloud dataflow jobs run topic2BQ --region=europe-west1 --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery --parameters inputTopic=projects/iot-vineyard/topics/vineyard-humidity,outputTableSpec=iot-vineyard:vineyard.humidity
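
Before enabling any device, we can check that the streaming job is up and, optionally, publish a hand-crafted message straight to the topic to see it land in the table; the sample values below are made up and follow the schema defined earlier:

gcloud dataflow jobs list --region=europe-west1 --status=active

gcloud pubsub topics publish vineyard-humidity --message='{"time": "2020-07-27T08:00:00Z", "origin": "test-device", "value": 0.5}'

Keep in mind the job may take a couple of minutes after creation before it starts consuming messages.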

Now everything is ready to receive our data, so we’ll proceed to enable a device. Let’s enable the IoT Core API:

gcloud services enable cloudiot.googleapis.com

Create a new IoT registry and define the topic that data streamed from allowed devices will flow through:

 gcloud iot registries create vineyard-registry --region=europe-west1 --no-enable-http-config --event-notification-config=topic=projects/iot-vineyard/topics/vineyard-humidity
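
We can confirm the registry points at the right topic with:

gcloud iot registries describe vineyard-registry --region=europe-west1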

To add a new device to the registry, we need to generate a key pair to authenticate the new device:

mkdir .ssh
openssl ecparam -genkey -name prime256v1 -noout -out .ssh/ec_private.pem
openssl ec -in .ssh/ec_private.pem -pubout -out .ssh/ec_public.pem
wget -O .ssh/roots.pem https://pki.goog/roots.pem

And we can create a new device:

gcloud iot devices create vineyard-hygrometer --region=europe-west1 --registry=vineyard-registry --public-key path=.ssh/ec_public.pem,type=es256-pem
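
The new device should now show up in the registry:

gcloud iot devices list --registry=vineyard-registry --region=europe-west1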

Not having a real device, we are going to test our environment using a script that simulates data transmission to the registry. Download the Python script together with its requirements file, and install the needed prerequisites.

sudo apt-get -y install build-essential libssl-dev python-dev libffi-dev git wget openssl python2
curl https://bootstrap.pypa.io/pip/2.7/get-pip.py --output get-pip.py
sudo python2 get-pip.py
wget https://raw.githubusercontent.com/googlecodelabs/iotcore-heartrate/master/heartrateSimulator.py
wget https://raw.githubusercontent.com/googlecodelabs/iotcore-heartrate/master/requirements.txt
sudo pip install -r requirements.txt

As its name reveals, the script was originally written to simulate a heart rate sensor, but it simply streams every valid JSON line it can read from the sample file.

Now we need to prepare a file containing the data we are about to send:

cat <<EOF >>SampleData.json
{"time": "2020-07-27T08:37:38Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.88}
{"time": "2020-07-27T08:37:39Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.78}
{"time": "2020-07-27T08:37:40Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.87}
{"time": "2020-07-27T08:37:41Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.68}
{"time": "2020-07-27T08:37:42Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.78}
{"time": "2020-07-27T08:37:43Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.77}
{"time": "2020-07-27T08:37:44Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.86}
{"time": "2020-07-27T08:37:45Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.76}
{"time": "2020-07-27T08:37:46Z", "origin": "6c0a4f94-de7f-44e6-aa14-4c5078fc20a1", "value": 0.67}
EOF

Finally, let’s run the script to start writing data from SampleData.json to the registry.

python heartrateSimulator.py --project_id=iot-vineyard --registry_id=vineyard-registry --device_id=vineyard-hygrometer --private_key_file=.ssh/ec_private.pem --ca_certs=.ssh/roots.pem --cloud_region=europe-west1 --json_data_file=SampleData.json

Conclusions

At this point, we can check whether our data is actually reaching the BigQuery table by running a query on it from the bq CLI tool. As an alternative, we can run the same query from the BigQuery dashboard in the GCP console.

bq query --use_legacy_sql=false \
'SELECT *
 FROM
   `vineyard.humidity`'

The collected data is now available to support further analysis with additional tools; for example, we can use Google Data Studio to render a chart or set up a report.
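
Even before reaching for those tools, simple aggregations already give a feel for the data; for instance, here is a sketch of a query computing the average humidity per device (column names follow the schema defined above):

bq query --use_legacy_sql=false \
'SELECT origin, AVG(value) AS avg_humidity, COUNT(*) AS samples
 FROM `vineyard.humidity`
 GROUP BY origin
 ORDER BY avg_humidity DESC'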