LF Edge eKuiper connects external systems with various communication protocols such as MQTT, HTTP, etc. via source and sink. eKuiper supports codecs for configuring source/sink transport data, currently supporting JSON, ProtoBuf and Binary formats. Protocol Buffers (Protobuf) is a language-neutral, platform-neutral and extensible mechanism for serializing structured data in a binary transfer format. Protobuf is widely used because it is more efficient and faster than conventional data transfer formats such as JSON or XML, and saves transfer bandwidth.
This tutorial takes Protobuf format as an example to explain how to set up the codec format in eKuiper, read and parse the data in this format by source, and write using this format to encode data in sink. This tutorial uses eKuiper Manager for rule creation and management, please refer to UI Tutorial. You can also use the REST API or run the command line tool on the same machine of the eKuiper instance to perform the same rule management operation.
Before starting the hands-on operation, the following environment needs to be prepared.
tcp://broker.emqx.io:1883
. It is a public MQTT server provided by EMQ. If you are running eKuiper locally, you need to change etc/mqtt_source.yaml
and change the configuration server to "tcp://broker.emqx.io:1883"; if you are using docker to start, you should set the environment variable MQTT_SOURCEDEFAULTSERVER=" tcp://broker.emqx.io:1883".Compared with the schema-free JSON format, Protobuf needs to define the data structure, i.e. the schema, in advance. In the proto file, it can contain multiple messages and definitions of other entities, but only the definition of message can be used in the configuration of the codec format. In this tutorial, we use the following schema for the definition of data structures. The file defines a message structure named Book, which contains title of string type and price of integer type, and the transmitted data will be coded and decoded as binary data for the book data based on this structure.
message Book {
required string title = 1;
required int32 price = 2;
}
protobuf
; the schema name can be entered as a custom unique name to identify the schema id in the subsequent rule creation; the schema content can be filled in as a file or text content. If you choose file, you need to fill in the url of the file; the schema used in this tutorial is simple, so you can choose content and fill in the text of the proto file in the content box.
At this point, we have registered a schema named schema1
, which defines the type Book
, and the registered schema can be used in the source and sink of the rule. Users can also continue to register and manage more schemas in this page.
In this section, we will use the MQTT source as an example to describe how to access and parse data transmitted based on Protobuf format so that it can be computed by eKuiper rules. Note that in Source, the encoding format is not bound to the transport protocol. Any source type such as MQTT, httpPull, etc. can be used with different encoding formats, such as ProtoBuf and JSON.
Suppose we have an MQTT topic demo
with Protobuf-encoded binary data for the purpose of saving transmission bandwidth. Next, we will configure the eKuiper data source to access the data from this topic and process it.
protobuf
; the schema name is selected as schema1
registered in the previous step; the schema message is set to the message Book
defined in the proto file. This configuration means that the stream protoDemo
will listen to the MQTT topic protoDemo
and when it receives the binary data it will be decoded in protobuf using the format of Book
in schema1
. Clicking submit should list the newly created stream in the stream list.
protoDemo
, sent to the MQTT topic result/protobuf
. {
"id": "ruleDecode",
"sql": "SELECT * FROM protoDemo",
"actions": [{
"mqtt": {
"server": "tcp://broker.emqx.io:1883",
"topic": "result/protobuf",
"sendSingle": true
}
}]
}
protoDemo
topic and observe if the result received is the correct data after decoding.
tcp://broker.emqx.io:1883
.result/protobuf
which is the topic defined in the rule action.protoDemo
and the Payload format to Hex
, sending binary data encoded according to the Book format in schema1, e.g. 0a1073747265616d696e672073797374656d107b
.
At this point, we have finished reading and decoding Protobuf data and processing the output with simple rules. The user can create a variety of rules just like with normal JSON format data. If you do not get the expected result, you can check the rule status in the rule list page of the management console to ensure that the rule data is coming in and out with the expected metrics.
In this section, we show the usage of reading JSON formatted data, processing it and sending it to the cloud MQTT broker using Protobuf format. In the IoT edge-cloud collaboration scenario, this usage saves the bandwidth overhead of edge-cloud transfers. The eKuiper deployed at the edge accesses the local MQTT broker without consuming bandwidth and can be accessed through the faster processing JSON format. After the rules are calculated, the results need to be sent to the cloud MQTT broker and can be encoded using Protobuf to save bandwidth.
SELECT * FROM demo
.result/protobufOut
; the data is sent by item is configured as true to ensure that the data received is a single item in matching format; the stream format is configured as protobuf
, the schema name is schema1
registered in the first section, and the schema message is Book
. The rule will read the JSON data and then encode it into binary data in the Book format and send it to the result/protobufOut
topic. Click Submit to complete the action configuration.
result/protobufOut
topic. As you can see in the figure below, pay attention to the configuration of the data format to avoid displaying garbled code.
This tutorial describes how to read and write Protobuf data in eKuiper. ProtoBuf format is one of the formats that eKuiper connects to external systems and any combination of formats can be used as the internal format representation is used after integrating into the system. Firstly, the user needs to define the schema of Protobuf; after that, the Protobuf format can be configured in the stream creation and action creation, and the defined schema can be selected to encode and decode the data.