
Processing protobuf messages with AWS IoT Core

2.7.2020 | 15 minutes of reading time

Introduction

The Internet of Things (IoT) is gradually changing an ever-increasing number of aspects of modern life. From connected vehicles to sensors monitoring all sorts of metrics in our homes: chips can be put to use almost everywhere, and they are getting cheaper and more powerful every day. Moreover, these chips are eager to transmit the data they have collected. As easy and fun as it is to toy with chips and metrics, connecting them to services with more compute power and storage capacity can be just as cumbersome. Furthermore, if you have a large number of devices, each sending messages every second or more frequently, you will sooner or later run into performance issues.
How, then, can we connect devices and collect data in a flexible, scalable way while simultaneously having the collected data readily available for processing within a cloud infrastructure? This is where AWS IoT Core comes in.

In this blog post, I demonstrate how to set up AWS IoT Core, how to connect a test client via MQTT, and how to send messages to the endpoint. In addition, in order to take full advantage of the MQTT protocol, we will introduce protocol buffers and show how to serialize data, send it, and eventually process it within our AWS infrastructure.

Prerequisites

This blog post is intended for developers and solution architects familiar with cloud infrastructure. The post assumes that you have AWS knowledge and an AWS account, as well as some basic understanding of the Go programming language (Golang).

The Plan

This is what our finished project will look like at a high level of abstraction:

AWS IoT Core Architecture

A Golang test client connects to AWS IoT Core and sends protobuf messages to the MQTT endpoint. An IoT Rule listens to a specific MQTT topic and routes the messages to a Kinesis Data Stream. Kinesis in turn invokes a Lambda function, which deserializes the protobuf data and can thus process it further and store it in DynamoDB.

I will briefly describe each part of the architecture before we dive into the hands-on part.

MQTT

MQTT is a lightweight publish-subscribe network protocol designed for small sensors and mobile devices. It is optimized for high-latency or unreliable networks. MQTT significantly outperforms HTTP if we manage to increase the information density of each payload message for a stream of data over a longer-lasting connection, i.e. transferring more data while simultaneously reducing the payload size. One way to achieve this, for instance, is by using protocol buffers. We will learn more about protocol buffers in the next chapter. If you want to go into more depth about HTTP vs. MQTT, I can recommend this article by Google.

One of the most popular MQTT libraries is Eclipse Paho, with well-documented and well-maintained source code repositories and broad availability across several programming languages. This post will show how to use the library to connect to AWS IoT Core with Golang, since we will also be using Golang later on for processing the IoT messages in our Lambda function (for short cold start times, small package size, etc.).
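
To give a first impression, here is a minimal sketch of how such a connection could be established with Paho over TLS. The certificate file names and the endpoint are placeholders; the actual test client introduced later reads these values from a .env file.

package main

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

// newTLSConfig loads the AWS root CA and the device certificate.
// The file names are placeholders for the certificates we will
// download during the IoT Core setup.
func newTLSConfig() (*tls.Config, error) {
    rootCAs := x509.NewCertPool()
    ca, err := ioutil.ReadFile("AmazonRootCA1.pem")
    if err != nil {
        return nil, err
    }
    rootCAs.AppendCertsFromPEM(ca)

    cert, err := tls.LoadX509KeyPair("device-certificate.pem.crt", "device-private.pem.key")
    if err != nil {
        return nil, err
    }
    return &tls.Config{RootCAs: rootCAs, Certificates: []tls.Certificate{cert}}, nil
}

func main() {
    tlsConfig, err := newTLSConfig()
    if err != nil {
        panic(err)
    }
    opts := mqtt.NewClientOptions().
        AddBroker("tls://<your-iot-endpoint>.iot.eu-central-1.amazonaws.com:8883").
        SetClientID("myClientId"). // must match the client ID allowed in the IoT policy
        SetTLSConfig(tlsConfig)

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    fmt.Println("connected to AWS IoT Core")
}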

Protocol Buffers

Protocol buffers (or protobufs for short) are a language- and platform-neutral, extensible way of serializing structured data. They are available for quite a few programming languages, and if you are not yet familiar with them, you can get a quick overview here. They satisfy our demand for sending more data while decreasing payload size and are thus an ideal fit for our endeavour. As it happens, the combination of MQTT and protobufs has become quite popular in the world of IoT. The basic idea is to create a protobuf configuration file (.proto) that defines which information of which type will be contained in the message. With the .proto file, we can use the protobuf compiler to generate data structures for usage in our respective programming language. These data structures can then effortlessly be serialized (and deserialized).
This is the .proto file we will use in our example:

syntax = "proto3";
package iot;

import "google/protobuf/timestamp.proto";

option go_package = "proto;pb";

message Device {
    string name = 1;
    int64 id = 2;
    double lat = 3;
    double long = 4;

    enum DeviceType {
        MOBILE = 0;
        DESKTOP = 1;
        IOT = 2;
    }

    google.protobuf.Timestamp last_updated = 5;
}

We will not go into further detail about the .proto format. But it should be quite readable, and it is easy to reason about what the file is supposed to do or store. One additional benefit of protobufs is that once we have created the .proto file, we can generate data structures for every supported language and get safe code for de- and serialization (also to/from JSON) for free.
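
As a quick illustration of the JSON round trip, here is a hedged sketch using the jsonpb package from the github.com/golang/protobuf module; the device variable is an instance of the generated iot.Device struct that we will meet again in the test client section:

// A sketch of the JSON round trip with the jsonpb package
// (error handling omitted for brevity).
marshaler := &jsonpb.Marshaler{}
jsonString, _ := marshaler.MarshalToString(device) // proto message -> JSON string

restored := &iot.Device{}
_ = jsonpb.UnmarshalString(jsonString, restored) // JSON string -> proto message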

IoT Core

AWS IoT Core is a managed cloud service that connects IoT devices to your AWS infrastructure. It acts as an MQTT message gateway that can send messages to and receive messages from devices. With AWS IoT Core, you can manage and monitor devices or entire fleets. Connected devices can then easily and securely interact with cloud applications and even communicate with each other. Furthermore, you can test connected devices with the built-in web-based MQTT client, with which you can send and receive messages yourself.

Kinesis

Protobufs are serialized to raw bytes (i.e. binary data). That’s what makes them so space-efficient. This comes at the cost of de- and serialization of each message, in addition to having to send and receive raw bytes. Unfortunately, it is currently not possible to directly pass binary payload data between IoT Core and a Lambda function. However, with Kinesis Data Streams we can circumvent this limitation. Kinesis Data Streams is a managed data streaming service which enables us to send and process binary data and acts as a buffer between incoming protobuf messages and the Lambda function. Moreover, we can influence parallel processing of the function via Kinesis shards (but this will not be covered here).

Lambda

Lambda functions are the “glue” between your “serverless” infrastructure parts. They let you run code without provisioning or managing servers, and you only pay for the compute time you consume. Lambda functions often run in response to events and automatically manage the resources required (such as parallel execution). The function introduced in this blog post, too, will respond to an event, as we will see later on. All we have to do in order to deploy a Lambda function is to satisfy a specific interface in the source code, compile it, and upload it.
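
To make this concrete, here is a minimal sketch of what such a handler could look like in Go. The import path of the generated protobuf code and the TABLE_NAME environment variable are assumptions for illustration; the linked repository may differ in details.

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/golang/protobuf/proto"

    iot "github.com/example/iot-lambda/proto" // hypothetical import path of the generated code
)

var db = dynamodb.New(session.Must(session.NewSession()))

// handler deserializes each protobuf record from the Kinesis batch
// and writes the device data to DynamoDB.
func handler(ctx context.Context, event events.KinesisEvent) error {
    for _, record := range event.Records {
        device := &iot.Device{}
        // record.Kinesis.Data is already base64-decoded by the Lambda runtime.
        if err := proto.Unmarshal(record.Kinesis.Data, device); err != nil {
            return fmt.Errorf("failed to unmarshal protobuf message: %w", err)
        }
        _, err := db.PutItem(&dynamodb.PutItemInput{
            TableName: aws.String(os.Getenv("TABLE_NAME")), // assumed to be provided by the CDK stack
            Item: map[string]*dynamodb.AttributeValue{
                "id":   {N: aws.String(fmt.Sprintf("%d", device.Id))},
                "name": {S: aws.String(device.Name)},
                "lat":  {N: aws.String(fmt.Sprintf("%f", device.Lat))},
                "long": {N: aws.String(fmt.Sprintf("%f", device.Long))},
            },
        })
        if err != nil {
            return err
        }
    }
    return nil
}

func main() {
    lambda.Start(handler)
}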

DynamoDB

DynamoDB is a fully managed key-value NoSQL database. The setup of a table is pretty simple: you just need to provide a partition key (also called hash key) and optionally a sort key. We will use it to store the processed protobuf message data.

Set up the infrastructure

Note that the generated protobuf code is already contained in the provided repositories. However, for reference, here is the process to install the compiler.

Install Protobuf Compiler

To generate code from our .proto file, we need to install the protobuf compiler first. You can find it here, and a detailed how-to can be found here.
The easiest way to install it is to download the binary for your OS.

Compile Lambda Code

In the upcoming step, we will deploy the infrastructure stack. The stack requires the Lambda function code to be ready for uploading. Hence, we have to compile and zip the source code to have the binary available. Execute the following steps to build and zip the Lambda code:

  1. Download and install a binary release of the Go programming language
  2. Clone this repository
  3. Execute either the build.sh script or run
    GOOS=linux go build cmd/iot-lambda/main.go && zip function.zip main
    

Afterwards, we take note of the path where we checked out the code to provide it to the infrastructure setup in the next step.

Create infrastructure

The following steps will set up the necessary infrastructure via the AWS Cloud Development Kit (CDK), except for IoT Core. We will configure IoT Core manually, since it is not yet as well supported in the CDK as other services. Besides, we need to download certificates manually anyway, and the setup is pretty easy. To use the CDK configuration, you need to either provide your AWS access key during execution or have config and credentials files ready in place (e.g. on macOS in the ~/.aws folder; see the AWS documentation for further reference). Furthermore, we require Node and npm, i.e. npx. Alternatively, you can also install the CDK via npm and execute the commands noted in the readme of the infrastructure project.

First, clone this repository. If you checked out the Lambda function repository in the same parent directory as this infrastructure repository, you don’t need to take further action. Otherwise, you have to provide the path to the binary zip in the iot-protobuf-cdk-setup-stack.ts file:

code: Code.fromAsset(path.join(__dirname, '../../iot-lambda/function.zip')),

Afterwards execute the following:

  1. npm install – to download dependencies
  2. npm run build – to compile TypeScript to JavaScript
  3. npx cdk synth – to “synthesize” a CloudFormation template from the TypeScript configuration. You can review what will be created in your account
  4. npx cdk deploy – to create the resources and deploy the stack

If you have not used CloudFormation or the CDK before, it is likely that the CDK asks you to deploy the CloudFormation toolkit stack, which is necessary for deploying CDK stacks, i.e. CloudFormation templates. It also conveniently prints the command to do so. After you have finished testing and toying with the stack, don’t forget to execute npx cdk destroy to tear down the stack and avoid incurring any costs.

Set up IoT Core

Of course, you need a “Thing” to use AWS IoT Core. A Thing can be any device or client that wants to send or receive messages. We will create our first Thing in the next step. But first we need to create a policy which we can attach during creation. A policy can also be attached later on, but it makes the creation process easier when you have it available from the start.

Create policy

Almost everything in AWS requires a policy to allow actions. So does our Thing. Hence we do the following in the AWS console:

  1. Go to “Services”, type “IoT” and select “IoT Core”
  2. Expand the “Secure” menu and click on “Policies”
  3. Press the “Create” button in the top right corner
AWS console – Select IoT
IoT console – Select policies
IoT console – Advanced mode policy creation

We want to be able to connect and send messages. We can either build the policy with the wizard or go to “advanced mode” and copy and paste the following (make sure to replace <yourAWSAccountID> with your actual account ID and adjust the region if you are not working in eu-central-1):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Publish",
        "iot:Receive"
      ],
      "Resource": "arn:aws:iot:eu-central-1:<yourAWSAccountID>:topic/deviceLocation"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Subscribe"
      ],
      "Resource": "arn:aws:iot:eu-central-1:<yourAWSAccountID>:topicfilter/deviceLocation"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Connect"
      ],
      "Resource": "arn:aws:iot:eu-central-1:<yourAWSAccountID>:client/myClientId"
    }
  ]
}

The first statement allows publishing to and receiving from our topic. The second allows subscribing to it; note that iot:Subscribe expects a topicfilter resource rather than a topic resource. The third statement allows our client to connect with the given client ID.
You can choose an arbitrary value for your topic (in this case “deviceLocation”) and client ID (in this case “myClientId”).

Create Thing and certificates

Now it is time to create our Thing. While creating it, we should also make things more secure by creating certificates and attaching the policy we just created. To do so, execute the following steps in the IoT Core console:

  1. Click on “Manage” then “Things”.
  2. Select “Create” and afterwards “Single Thing”. Enter a name for your Thing. No need to change other values here.
  3. Press “Next” and “Create Certificate”.
  4. Download all certificates, as well as the “root CA for AWS IoT”.
  5. Click on “Activate” to activate the certificate.
  6. Press “Attach Policy” and select the policy we created in the preliminary step.
  7. Click on “Register Thing”.
Create new thing
Create a single thing
Edit thing
Download certificates
Attach policy

Get the endpoint

To bring our test client and IoT Core together, we have to know the endpoint we will be connecting to. To get the endpoint URL, go to “Test”, click on the arrow beneath the console identifier and select “View endpoint”. Copy the value for use in the .env file of the test client (see next chapter). On the test page, we can later also subscribe to our topic and see if our test client is able to connect and send as intended.

Set up the test client

Clone this repository to get the client code. Note that the generated protobuf data structure is already contained in the repository; you don’t need to execute the protoc command again. For reference, here is the process to generate the code via the compiler. Execute the following command:

protoc --proto_path=/Users/exampleUser/iot-mqtt-client/ --go_out=/Users/exampleUser/iot-mqtt-client/ /Users/exampleUser/iot-mqtt-client/proto/device.proto

Where:

  • --proto_path – defines the directory in which to search for imports
  • --go_out – generates a Go file into the specified directory

Afterwards, we are able to serialize our message with these lines of Go code:

device := &iot.Device{
    Name:        "Test Device",
    Id:          4711,
    Lat:         50.9375, // example coordinates
    Long:        6.9603,
    LastUpdated: ptypes.TimestampNow(),
}
protoDevice, err := proto.Marshal(device)

We can deserialize it analogously with:

deserializedDevice := &iot.Device{}
err := proto.Unmarshal(protoDevice, deserializedDevice)

Note that in a production environment, we would probably not want our core data structures to depend on protobufs. This is usually solved with two separate data types: one for our core logic and the one generated by protobufs. We then introduce a thin mapping layer between the two within the protobuf-dependent code. But let’s keep it simple and continue the example with just the generated data structure.
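
For illustration, such a mapping layer could look like the following sketch; the package name and import path are hypothetical:

package device

import (
    "time"

    "github.com/golang/protobuf/ptypes"

    pb "github.com/example/iot-mqtt-client/proto" // hypothetical import path of the generated code
)

// Device is the core domain type, free of any protobuf dependency.
type Device struct {
    Name        string
    ID          int64
    Lat, Long   float64
    LastUpdated time.Time
}

// fromProto maps the generated protobuf struct to the core type.
func fromProto(p *pb.Device) (Device, error) {
    ts, err := ptypes.Timestamp(p.LastUpdated)
    if err != nil {
        return Device{}, err
    }
    return Device{
        Name:        p.Name,
        ID:          p.Id,
        Lat:         p.Lat,
        Long:        p.Long,
        LastUpdated: ts,
    }, nil
}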

Configure environment variables

The example code comes with an .env file, where we need to enter the paths of the certificates we downloaded during the IoT Core setup, as well as other values necessary for the code to work (a filled-in sample follows below):

  • IOT_ENDPOINT – enter the device endpoint supplied by AWS IoT (see “Get the endpoint”). It looks something like this: 98765.iot.eu-central-1.amazonaws.com
  • CA_CERT_PATH – the absolute path of the downloaded AWS root certificate, e.g. AmazonRootCA1.pem
  • DEVICE_CERT_PATH – the absolute path of the device certificate, e.g. 12345-certificate.pem.crt
  • DEVICE_KEY_PATH – the absolute path of the private key for the certificate, e.g. 12345-private.pem.key
  • TOPIC – the topic to send the protobuf messages to and where the IoT Core rule will listen, i.e. the one we configured in the policy (see “Create policy”)
  • CLIENT_ID – the client identifier configured in the policy
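
Put together, a filled-in .env file could look like this (all values are placeholders):

IOT_ENDPOINT=98765.iot.eu-central-1.amazonaws.com
CA_CERT_PATH=/Users/exampleUser/certs/AmazonRootCA1.pem
DEVICE_CERT_PATH=/Users/exampleUser/certs/12345-certificate.pem.crt
DEVICE_KEY_PATH=/Users/exampleUser/certs/12345-private.pem.key
TOPIC=deviceLocation
CLIENT_ID=myClientId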

Finally, execute the following command and then run the resulting main binary:

go build cmd/iot-mqtt-client/main.go

Connecting

After we have configured the client with the .env file, we should be ready to send the first messages. Currently, the test client sends a message with random lat and long values (floating point numbers) to the endpoint every ten seconds. We go back to the “Test” page in the IoT Core console and subscribe to the topic we have configured. There we can check if we have successfully established a connection. After we have subscribed, we start our Go test client and should see messages coming in. The IoT Core service does not natively support protobufs (i.e. binary data) yet, but the console should display some hexadecimal gibberish nevertheless. Now it is time to bring IoT Core and the stack we created via the CDK together by creating an IoT Rule.
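
The core of the test client is a simple publish loop. Here is a hedged sketch of what it could look like; client is the connected Paho client from the earlier snippet, and the math/rand, time, and log packages are assumed to be imported:

for {
    device := &iot.Device{
        Name:        "Test Device",
        Id:          4711,
        Lat:         rand.Float64()*180 - 90,  // random latitude
        Long:        rand.Float64()*360 - 180, // random longitude
        LastUpdated: ptypes.TimestampNow(),
    }
    payload, err := proto.Marshal(device)
    if err != nil {
        log.Fatal(err)
    }
    // QoS 0, not retained; the topic must match the one from the policy.
    token := client.Publish("deviceLocation", 0, false, payload)
    token.Wait()
    time.Sleep(10 * time.Second)
}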

A rule to process messages

We have connected the test client and sent the first messages. To process the protobuf messages, we need to hook up Kinesis with IoT Core. Enter IoT Rules. Rules enable our IoT things (or in this case our test client) to interact with AWS services. Each rule allows us to specify a SQL-like query on the MQTT topic stream, and actions execute whenever an inbound message matches that query. For example, we could route messages of mobile devices to one Lambda function and messages of desktop clients to another function. Our action will route incoming messages to the Kinesis Data Stream, which invokes our Lambda function to process the messages. Let’s create this rule in the IoT Core console:

  1. Click on “Act” then “Rules”
  2. Select “Create” in the top right corner
  3. Fill out name and query statement SELECT * FROM 'deviceLocation'
  4. Click “Add Action” and select “Send a message to an Amazon Kinesis Stream”
  5. Configure Action
    1. Select the Kinesis Stream created with the CDK
    2. Use “${newuuid()}” as partition key (see explanation below)
    3. Create a new role to allow IoT Core access to Kinesis Stream
Create a rule
An action for the rule
Configure action

Selecting specific fields within a query is only possible for JSON payloads, hence we query all our protobuf messages with a “SELECT *”. The partition key is used to group data by shard in a stream. We just use the built-in generator function “${newuuid()}” to create a random ID, which is sufficient for our example.

Bring it together

After creating the rule, we start our test client again. When the client has sent a couple of messages (see console output), we check our DynamoDB table. If everything went according to plan, the table should contain some device data.

Conclusion

AWS IoT Core is the “API Gateway” for your IoT devices. The limitation to JSON messages at the IoT Core is a bit cumbersome, yet Kinesis enables us to use protobufs nevertheless. With a large number of devices and an increasing message count per minute, it is a good idea to add a buffer between receiving and processing messages anyway. The buffer ensures that we lose no information if an error occurs during processing.

We have seen how to set up a “serverless”, scalable infrastructure for IoT devices with certificates, a sample Golang client that sends protobufs via the MQTT protocol, and how to set up IoT Core and connect it to Kinesis via Rules. We can scale this example to test all sorts of functionality, up to load testing our architecture. Now that we have established the end-to-end connection from the device to the AWS Lambda function, we can build whatever we want within our cloud.
