Running an MQTT Kafka broker

In this guide, you create Kafka topics and use Zilla to mediate MQTT broker messages onto those topics.

Specifically, you will:

Verify prerequisites to run this guide.
Install and run Zilla with Kafka or use your own.
Create topics for the MQTT broker messages.
Watch Kafka for new messages on the topics.
Publish and subscribe with an MQTT client.
Route messages to different Kafka topics.

TL;DR

Download and run the zilla-examples/mqtt.kafka.broker example using this install script. It starts Zilla and everything else you need for this guide.

wget -qO- https://raw.githubusercontent.com/aklivity/zilla-examples/main/startup.sh | sh -s -- mqtt.kafka.broker

Note

Alternatively, download the mqtt.kafka.broker example or the startup.sh script yourself.

Prerequisites

Before proceeding, you should have Docker Compose installed, or optionally Helm and Kubernetes.

Detailed prerequisites
  • A connection to the internet
  • Docker 1.13.0 or later installed and running
  • Docker Desktop or Docker Desktop for Windows on WSL 2
  • Container host resources: 1 CPU, 1GB memory

Optional:

  • Kafka 3.0+ hosted with the Docker network allowed to communicate
  • Helm 3.0+
  • Kubernetes 1.13.0+

Check the Kafka topics

Run the docker command under the Verify the Kafka topics created section of the script output. Verify these topics are listed. Read more on the data in these topics in the overview.

mqtt-messages
mqtt-retained
mqtt-sessions
mqtt-devices

Listen for messages

Run the docker command under the Start a topic consumer to listen for messages section of the script output. If you didn't use your own Kafka, you can also see all the topics in the Kafka UI.

Send a greeting

Using eclipse-mosquitto, subscribe to the zilla topic.

docker run -it --rm eclipse-mosquitto \
mosquitto_sub --url mqtt://host.docker.internal:7183/zilla

In a separate session, publish a message on the zilla topic.

docker run -it --rm eclipse-mosquitto \
mosquitto_pub --url mqtt://host.docker.internal:7183/zilla --message 'Hello, world'

Send a message with the retained flag.

docker run -it --rm eclipse-mosquitto \
mosquitto_pub --url mqtt://host.docker.internal:7183/zilla --message 'Hello, retained' --retain

Then restart the mosquitto_sub command above. Only the latest retained message is delivered; the earlier messages are not.
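The retained-message behavior can be sketched as a store that keeps only the latest retained payload per topic, much like the compacted mqtt-retained Kafka topic. This is an illustration of the semantics, not Zilla's implementation:

```python
# Sketch: retained-message delivery semantics, assuming a store that
# keeps only the latest retained payload per topic (like a compacted
# Kafka topic keyed by MQTT topic).
class RetainedStore:
    def __init__(self):
        self._latest = {}

    def publish(self, topic, payload, retain=False):
        if retain:
            # Latest retained value per topic wins; non-retained
            # messages are never replayed to late subscribers.
            self._latest[topic] = payload

    def on_subscribe(self, topic):
        # A new subscriber receives only the latest retained message, if any.
        return self._latest.get(topic)

store = RetainedStore()
store.publish("zilla", "Hello, world")                  # not retained
store.publish("zilla", "Hello, retained", retain=True)
print(store.on_subscribe("zilla"))  # Hello, retained
```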

Message routing

Send a message from a device and a sensor.

docker run -it --rm eclipse-mosquitto \
mosquitto_pub --url mqtt://host.docker.internal:7183/place/01/device/01 --message 'I am device01'
docker run -it --rm eclipse-mosquitto \
mosquitto_pub --url mqtt://host.docker.internal:7183/place/01/sensor/01 --message 'I am sensor01'

You can check the Kafka UI and see that device01's message was delivered to the mqtt-devices topic while sensor01's message is on the mqtt-messages topic.
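This routing follows standard MQTT wildcard matching, where + matches exactly one topic level and # matches any remaining levels. A minimal matcher, illustrating why device01's topic routes differently from sensor01's:

```python
def topic_matches(pattern, topic):
    """Match an MQTT topic against a filter, where '+' matches one
    topic level and '#' matches this level and everything below it."""
    p_levels = pattern.split("/")
    t_levels = topic.split("/")
    for i, p in enumerate(p_levels):
        if p == "#":
            return True  # multi-level wildcard: matches the rest
        if i >= len(t_levels):
            return False  # topic ran out of levels before the pattern did
        if p != "+" and p != t_levels[i]:
            return False  # literal level mismatch
    return len(p_levels) == len(t_levels)

# The device route's filter matches the device topic but not the sensor one:
print(topic_matches("place/+/device/#", "place/01/device/01"))  # True
print(topic_matches("place/+/device/#", "place/01/sensor/01"))  # False
```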

Creating this example yourself

Start a Kafka instance

You can use your own Kafka or set up a local Kafka with kafka.broker and follow the setup instructions in the README.md.

Export these environment variables, overriding them with your remote Kafka host and port if you skipped the local setup.

export KAFKA_HOST=host.docker.internal
export KAFKA_PORT=9092

Bootstrap Kafka

Create these topics in the Kafka environment.

/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-sessions
/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-messages --config cleanup.policy=compact
/bin/kafka-topics.sh --bootstrap-server $KAFKA_HOST:$KAFKA_PORT --create --if-not-exists --topic mqtt-retained --config cleanup.policy=compact

Create your config

Create a new file called zilla.yaml and add the YAML below to it.

Entrypoint

This configures Zilla to accept all of the MQTT traffic. The tcp binding defines the ports on which Zilla will accept both MQTT and WebSocket connections.

name: zilla-mqtt-kafka-broker
bindings:
  # Proxy service entrypoint
  north_tcp_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port:
        - 7114
        - 7183
    routes:
      - when:
          - port: 7114
        exit: north_http_server
      - when:
          - port: 7183
        exit: north_mqtt_server
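The port routing above amounts to a first-match dispatch from accepted port to exit binding. A toy model of that behavior (the names mirror the config; this is an illustration, not Zilla internals):

```python
# Toy model of the tcp binding's route dispatch shown above:
# the first route whose `when` condition matches decides the exit.
ROUTES = [
    ({"port": 7114}, "north_http_server"),
    ({"port": 7183}, "north_mqtt_server"),
]

def resolve_exit(port):
    for when, exit_binding in ROUTES:
        if when["port"] == port:
            return exit_binding
    return None  # no route matched in this sketch

print(resolve_exit(7183))  # north_mqtt_server
```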

More on binding-tcp

A ws binding is added to handle MQTT over WebSocket using the mqtt protocol. The mqtt binding then handles all of the MQTT message traffic that needs to go to Kafka.

# WebSocket server
north_http_server:
  type: http
  kind: server
  routes:
    - when:
        - headers:
            :scheme: http
            :authority: localhost:7114
            upgrade: websocket
      exit: north_ws_server
north_ws_server:
  type: ws
  kind: server
  routes:
    - when:
        - protocol: mqtt
      exit: north_mqtt_server

# Shared MQTT server
north_mqtt_server:
  type: mqtt
  kind: server
  exit: north_mqtt_kafka_mapping

More on binding-mqtt
More on binding-ws

Service definition

The service definition describes how clients using this service interact with Kafka through Zilla. The required set of Kafka topics is defined in options.topics, where Zilla manages any required MQTT features. A client identity can be determined by pulling the identifier out of the topic using the options.clients property.

# MQTT messages to Kafka topics
north_mqtt_kafka_mapping:
  type: mqtt-kafka
  kind: proxy
  options:
    topics:
      sessions: mqtt-sessions
      messages: mqtt-messages
      retained: mqtt-retained
    clients:
      - place/{identity}/#
  routes:
    - when:
        - publish:
            - topic: place/+/device/#
            - topic: device/#
        - subscribe:
            - topic: place/+/device/#
            - topic: device/#
      with:
        messages: mqtt-devices
      exit: north_kafka_cache_client
  exit: north_kafka_cache_client
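The options.clients pattern above derives a client identity from a single topic level. A minimal sketch of that extraction, assuming the place/{identity}/# pattern from the config (this is illustrative, not Zilla's actual parser):

```python
# Sketch: pull a client identity out of an MQTT topic using a
# place/{identity}/# style pattern, where {identity} captures one
# topic level and '#' leaves the remaining levels unconstrained.
def extract_identity(topic, pattern="place/{identity}/#"):
    t_levels = topic.split("/")
    p_levels = pattern.split("/")
    identity = None
    for i, p in enumerate(p_levels):
        if p == "#":
            return identity  # remaining topic levels are unconstrained
        if i >= len(t_levels):
            return None  # topic is shorter than the pattern requires
        if p == "{identity}":
            identity = t_levels[i]  # capture this level as the identity
        elif p != t_levels[i]:
            return None  # literal level mismatch
    return identity if len(p_levels) == len(t_levels) else None

print(extract_identity("place/01/device/01"))  # 01
```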

More on binding-mqtt-kafka
More on topic data

Additionally, a route is defined to capture any "device" messages and route them to a specific topic called mqtt-devices. Here Zilla enables routing different topic patterns into one Kafka topic using MQTT-supported wildcards. All other messages will use the default exit and end up in the mqtt-messages topic.

routes:
  - when:
      - publish:
          - topic: place/+/device/#
          - topic: device/#
      - subscribe:
          - topic: place/+/device/#
          - topic: device/#
    with:
      messages: mqtt-devices
    exit: north_kafka_cache_client

More on When a route matches
More on binding-mqtt-kafka routing

Add a Kafka sync layer

The Zilla cache_client and cache_server help manage smooth data transfer between the service definition and Kafka. It is important to bootstrap the topics that will broker MQTT messages.

# Kafka sync layer
north_kafka_cache_client:
  type: kafka
  kind: cache_client
  exit: south_kafka_cache_server
south_kafka_cache_server:
  type: kafka
  kind: cache_server
  options:
    bootstrap:
      - mqtt-messages
      - mqtt-retained
      - mqtt-devices
  exit: south_kafka_client

More on binding-kafka cache

Point to a running Kafka instance

This defines the host and port Zilla uses to connect to Kafka.

# Connect to Kafka
south_kafka_client:
  type: kafka
  kind: client
  exit: south_tcp_client
south_tcp_client:
  type: tcp
  kind: client
  options:
    host: ${{env.KAFKA_HOST}}
    port: ${{env.KAFKA_PORT}}
  routes:
    - when:
        - cidr: 0.0.0.0/0

Full zilla.yaml
name: zilla-mqtt-kafka-broker
bindings:
  # Proxy service entrypoint
  north_tcp_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port:
        - 7114
        - 7183
    routes:
      - when:
          - port: 7114
        exit: north_http_server
      - when:
          - port: 7183
        exit: north_mqtt_server

  # WebSocket server
  north_http_server:
    type: http
    kind: server
    routes:
      - when:
          - headers:
              :scheme: http
              :authority: localhost:7114
              upgrade: websocket
        exit: north_ws_server
  north_ws_server:
    type: ws
    kind: server
    routes:
      - when:
          - protocol: mqtt
        exit: north_mqtt_server

  # Shared MQTT server
  north_mqtt_server:
    type: mqtt
    kind: server
    exit: north_mqtt_kafka_mapping

  # MQTT messages to Kafka topics
  north_mqtt_kafka_mapping:
    type: mqtt-kafka
    kind: proxy
    options:
      topics:
        sessions: mqtt-sessions
        messages: mqtt-messages
        retained: mqtt-retained
      clients:
        - place/{identity}/#
    routes:
      - when:
          - publish:
              - topic: place/+/device/#
              - topic: device/#
          - subscribe:
              - topic: place/+/device/#
              - topic: device/#
        with:
          messages: mqtt-devices
        exit: north_kafka_cache_client
    exit: north_kafka_cache_client

  # Kafka sync layer
  north_kafka_cache_client:
    type: kafka
    kind: cache_client
    exit: south_kafka_cache_server
  south_kafka_cache_server:
    type: kafka
    kind: cache_server
    options:
      bootstrap:
        - mqtt-messages
        - mqtt-retained
        - mqtt-devices
    exit: south_kafka_client

  # Connect to Kafka
  south_kafka_client:
    type: kafka
    kind: client
    exit: south_tcp_client
  south_tcp_client:
    type: tcp
    kind: client
    options:
      host: ${{env.KAFKA_HOST}}
      port: ${{env.KAFKA_PORT}}
    routes:
      - when:
          - cidr: 0.0.0.0/0

More on binding-kafka client

Start Zilla

With your zilla.yaml config, follow the Zilla install instructions using your method of choice. Set the necessary Kafka environment variables.

Docker
--env KAFKA_HOST="host.docker.internal" --env KAFKA_PORT="9092"

Adding TLS

You can add TLS to this broker by adding a vault and tls binding as described in the Server Encryption section.

Remove the running containers

Find the path to the teardown.sh script(s) under the Use the teardown script(s) to clean up section of the example output and run them. If you didn't provide an external Kafka endpoint, there will be scripts for both the Zilla and local Kafka installs.