Subscribe Subscribe

yaml
type: "io.kestra.plugin.mqtt.Subscribe"

Subscribe message in a MQTT topic

Examples

yaml
id: mqtt_subscribe
namespace: company.team

tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: tcp://localhost:1883
    clientId: kestraProducer
    topic:
      - kestra/sensors/cpu
      - kestra/sensors/mem
    serdeType: JSON
    maxRecords: 10
yaml
id: mqtt_subscribe
namespace: company.team

tasks:
  - id: subscribe
    type: io.kestra.plugin.mqtt.Subscribe
    server: ssl://localhost:8883
    clientId: kestraProducer
    topic:
      - kestra/sensors/cpu
      - kestra/sensors/mem
    crt: /home/path/to/ca.crt
    serdeType: JSON
    maxRecords: 10

Properties

clientId

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

A client identifier that is unique on the server being connected to

A client identifier clientId must be specified and be less that 65535 characters. It must be unique across all clients connecting to the same server. The clientId is used by the server to store data related to the client, hence it is important that the clientId remain the same when connecting to a server if durable subscriptions or reliable messaging are required. As the client identifier is used by the server to identify a client when it reconnects, the client must use the same identifier between connections if durable subscriptions or reliable delivery of messages is required.

qos

  • Type: integer
  • Dynamic:
  • Required: ✔️
  • Default: 1

Sets the quality of service for this message.

  • Quality of Service 0: indicates that a message should be delivered at most once (zero or one times). The message will not be persisted to disk, and will not be acknowledged across the network. This QoS is the fastest, but should only be used for messages which are not valuable - note that if the server cannot process the message (for example, there is an authorization problem). Also known as "fire and forget".
  • Quality of Service 1: indicates that a message should be delivered at least once (one or more times). The message can only be delivered safely if it can be persisted, so the application must supply a means of persistence using MqttConnectOptions. If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. The message will be acknowledged across the network.
  • Quality of Service 2: indicates that a message should be delivered once. The message will be persisted to disk, and will be subject to a two-phase acknowledgement across the network. The message can only be delivered safely if it can be persisted, so the application must supply a means of persistence using MqttConnectOptions. If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. If persistence is not configured, QoS 1 and 2 messages will still be delivered in the event of a network or server problem as the client will hold state in memory. If the MQTT client is shutdown or fails and persistence is not configured then delivery of QoS 1 and 2 messages can not be maintained as client-side state will be lost.

serdeType

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: JSON
  • Possible Values:
    • STRING
    • JSON
    • BYTES

Serializer / Deserializer used for the payload

server

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The address of the server to connect to, specified as a URI

The serverURI parameter is typically used with the the clientId parameter to form a key. The key is used to store and reference messages while they are being delivered. The address of the server to connect to is specified as a URI. Two types of connection are supported tcp:// for a TCP connection and ssl:// for a TCP connection secured by SSL/TLS. For example:

  • tcp://localhost:1883
  • ssl://localhost:8883 If the port is not specified, it will default to 1883 for tcp://" URIs, and 8883 for ssl:// URIs.

topic

  • Type: object
  • Dynamic: ✔️
  • Required: ✔️

Topic where to consume message

Can be a string or a List of string to consume from multiple topic

version

  • Type: string
  • Dynamic:
  • Required: ✔️
  • Default: V5
  • Possible Values:
    • V3
    • V5

The MQTT version to use.

authMethod

  • Type: string
  • Dynamic: ✔️
  • Required:

The Authentication Method.

Only available if version = V5 If set, this value contains the name of the authentication method to be used for extended authentication. If null, extended authentication is not performed.

connectionTimeout

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The connection timeout.

This value defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. The default timeout is 30 seconds. A value of 0 disables timeout processing meaning the client will wait until the network connection is made successfully or fails.

crt

  • Type: string
  • Dynamic: ✔️
  • Required:

Server certificate file path.

httpsHostnameVerificationEnabled

  • Type: boolean
  • Dynamic:
  • Required:

Disable ssl verification.

This value will allow all ca certificate.

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Format: duration

The max duration waiting for new rows

It's not an hard limit and is evaluated every second

maxRecords

  • Type: integer
  • Dynamic:
  • Required:

The max number of rows to fetch before stopping

It's not an hard limit and is evaluated every second

password

  • Type: string
  • Dynamic: ✔️
  • Required:

The password to use for the connection.

username

  • Type: string
  • Dynamic: ✔️
  • Required:

The user name to use for the connection.

Outputs

messagesCount

  • Type: integer
  • Required:

Number of message produced

uri

  • Type: string
  • Required:
  • Format: uri

URI of a kestra internal storage file

Definitions

Was this page helpful?