
MQTT Source Connector

The MQTT connector reads records from an MQTT topic and writes them to a Fluvio topic. It supports the MQTT v3.1.1 and v5 protocols.

Check out the tutorial [MQTT to SQL Pipeline] for a running example.

Configuration

| Option | Default | Type | Description |
|---|---|---|---|
| timeout | 60s | Duration | MQTT broker connect timeout, in seconds and nanoseconds |
| url | - | SecretString | MQTT URL, including scheme, domain, port, and credentials such as username and password |
| topic | - | String | MQTT topic to subscribe to and source events from |
| client_id | UUID V4 | String | MQTT client ID. Using the same client ID in different connectors may close the connection |
| payload_output_type | binary | String | Controls how the payload field of the output record is produced |

The url option, of type SecretString, can be set as a raw string value:

url: "mqtt://test.mosquitto.org/"

or, as a reference to a secret with the given name:

url:
  secret:
    name: "URL_SECRET_NAME"

Record Type Output

Each record is a JSON-serialized string with the fields mqtt_topic and payload.

Payload Output Type

| Value | Output |
|---|---|
| binary | Array of bytes |
| json | UTF-8 JSON-serialized string |
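
To make the difference between the two output types concrete, here is a minimal Python sketch of how a downstream consumer might decode the payload field. It assumes that binary payloads appear in the record as a JSON array of byte values (0 to 255) and that json payloads appear as a nested JSON value; the helper decode_payload and the sample records are hypothetical, not part of the connector.

```python
import json


def decode_payload(record_json: str, payload_output_type: str):
    """Return the MQTT payload carried by one connector record (hypothetical helper)."""
    record = json.loads(record_json)
    payload = record["payload"]
    if payload_output_type == "binary":
        # Assumption: the payload is a JSON array of byte values (0-255).
        return bytes(payload)
    # Assumption: with "json", the payload is already a nested JSON value.
    return payload


# Hand-built sample records for illustration; not captured from a real connector.
message = b'{"device_id": 1}'
binary_record = json.dumps(
    {"mqtt_topic": "mqtt-to-fluvio", "payload": list(message)}
)
json_record = json.dumps(
    {"mqtt_topic": "mqtt-to-fluvio", "payload": json.loads(message)}
)

print(decode_payload(binary_record, "binary"))
print(decode_payload(json_record, "json"))
```

With binary, the consumer gets the raw bytes back and must parse them itself; with json, the payload can be used directly as structured data.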

Usage Example

All versions are marked with x.y.z. To find the latest version, run:

  • fluvio hub connector list
  • fluvio hub smartmodule list

This is an example of a connector config file:

# sample-config.yaml
apiVersion: 0.1.0
meta:
  version: x.y.z
  name: my-mqtt-connector
  type: mqtt-source
  topic: mqtt-topic
mqtt:
  url: "mqtt://test.mosquitto.org/"
  topic: "mqtt-to-fluvio"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json

Run the connector locally using the cdk tool (from the root directory or any sub-directory):

cdk hub download infinyon/mqtt-source@x.y.z
cdk deploy start --config sample-config.yaml --ipkg infinyon/mqtt-source@x.y.z

cdk deploy list # to see the status
cdk deploy log my-mqtt-connector # to see connector's logs

Install an MQTT client such as mosquitto:

# on macOS; this can take a while
brew install mosquitto

Insert records:

mosquitto_pub -h test.mosquitto.org -t mqtt-to-fluvio -m '{"device": {"device_id":1, "name":"device1"}}'

The record produced in the Fluvio topic will be:

{
  "mqtt_topic": "mqtt-to-fluvio",
  "payload": {
    "device": {
      "device_id": 1,
      "name": "device1"
    }
  }
}

Transformations

The Fluvio MQTT Source Connector supports Transformations. Records can be modified before they are sent to the Fluvio topic.

The previous example can be extended to add extra transformations to outgoing records:

# sample-config.yaml
apiVersion: 0.1.0
meta:
  version: x.y.z
  name: my-mqtt-connector
  type: mqtt-source
  topic: mqtt-topic

mqtt:
  url: "mqtt://test.mosquitto.org/"
  topic: "mqtt-to-fluvio"
  timeout:
    secs: 30
    nanos: 0
  payload_output_type: json
transforms:
  - uses: infinyon/jolt@x.y.z
    with:
      spec:
        - operation: shift
          spec:
            payload:
              device: "device"
        - operation: default
          spec:
            source: "mqtt-connector"

The device object in the resulting record will be "unwrapped" from payload, and an additional field source with the value mqtt-connector will be added.
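
The effect of the two operations on the sample record can be approximated in plain Python. This is only an emulation for illustration, not the jolt SmartModule itself: the functions shift_device and apply_defaults are hypothetical, and the sketch assumes that shift carries over only the fields named in its spec (so mqtt_topic is not copied) and that default adds a key only when it is absent.

```python
import json


def shift_device(record: dict) -> dict:
    """Emulate the `shift` step: move payload.device to a top-level "device" key."""
    out = {}
    device = record.get("payload", {}).get("device")
    if device is not None:
        out["device"] = device
    # Assumption: fields not named in the shift spec are not carried over.
    return out


def apply_defaults(record: dict, defaults: dict) -> dict:
    """Emulate the `default` step: add each key only if it is missing."""
    out = dict(record)
    for key, value in defaults.items():
        out.setdefault(key, value)
    return out


# The record produced by the connector in the usage example.
record = {
    "mqtt_topic": "mqtt-to-fluvio",
    "payload": {"device": {"device_id": 1, "name": "device1"}},
}

result = apply_defaults(shift_device(record), {"source": "mqtt-connector"})
print(json.dumps(result, indent=2))
```

Under these assumptions the output contains only the unwrapped device object and the source field. To check the behavior of the real transformation, consume the Fluvio topic after deploying the connector.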

Read more about [JSON to JSON transformations].