Running an MQTT Kafka broker
In this guide, you create Kafka topics and use Zilla to map MQTT broker messages onto those topics.
Specifically, you will:
- Verify prerequisites to run this guide.
- Install and run Zilla with Kafka or use your own.
- Create topics for the MQTT broker messages.
- Watch Kafka for new messages on the topics.
- Pub & Sub with an MQTT client.
- Route messages to different Kafka topics.
TL;DR
Download and run the Zilla mqtt.kafka.broker cookbook. It will start Zilla and everything you need for this guide.
Download mqtt.kafka.broker and follow the README yourself.
Prerequisites
Before proceeding, you should have Docker Compose installed.
Detailed prerequisites
- A connection to the internet
- Docker version 1.13.0 or later is installed and running
- Docker Desktop or Docker Desktop for Windows on WSL 2
- Container host resources: 1 CPU, 1GB memory
Optional:
- A hosted Kafka 3.0+ instance that the Docker network is allowed to communicate with
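The prerequisites above can be checked from a terminal before starting. The following is a minimal POSIX shell sketch, not part of the cookbook: the helper names `have_cmd` and `check_prereqs` are hypothetical, and it assumes the Compose v2 plugin (`docker compose`) rather than the standalone `docker-compose` binary.

```shell
# Return success if the named command is available on PATH.
have_cmd() {
  command -v "$1" >/dev/null 2>&1
}

# Check that Docker is installed, the daemon is reachable, and Compose is present.
# Assumption: Compose is installed as the v2 plugin ("docker compose").
check_prereqs() {
  have_cmd docker || { echo "docker not found on PATH" >&2; return 1; }
  docker info >/dev/null 2>&1 || { echo "Docker daemon is not running" >&2; return 1; }
  docker compose version >/dev/null 2>&1 || { echo "Docker Compose plugin not found" >&2; return 1; }
  docker --version
  docker compose version
}

# Usage: check_prereqs && echo "ready to run the guide"
```

If `check_prereqs` reports a problem, resolve it before running the cookbook scripts.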
Check the Kafka topics
Run the docker command under the "Verify the Kafka topics created" section of the script output and verify that the topics below are listed. Read more about the data in these topics in the overview.
mqtt-messages
mqtt-devices
mqtt-retained
mqtt-sessions
Listen for messages
Run the docker command under the "Start a topic consumer to listen for messages" section of the script output. If you didn't use your own Kafka, you can also see all the topics in the Kafka UI.
Send a greeting
Using eclipse-mosquitto, subscribe to the zilla topic.
docker run -it --rm eclipse-mosquitto \
  mosquitto_sub --url mqtt://host.docker.internal:7183/zilla
In a separate session, publish a message on the zilla topic.
docker run -it --rm eclipse-mosquitto \
  mosquitto_pub --url mqtt://host.docker.internal:7183/zilla --message 'Hello, world'
Send messages with the retained flag.
docker run -it --rm eclipse-mosquitto \
  mosquitto_pub --url mqtt://host.docker.internal:7183/zilla --message 'Hello, retained' --retain
Then restart the mosquitto_sub above. The latest retained message is delivered, and the other messages are not.
Message routing
Send a message from a device and a sensor.
docker run -it --rm eclipse-mosquitto \
  mosquitto_pub --url mqtt://host.docker.internal:7183/place/01/device/01 --message 'I am device01'
docker run -it --rm eclipse-mosquitto \
  mosquitto_pub --url mqtt://host.docker.internal:7183/place/01/sensor/01 --message 'I am sensor01'
You can check the Kafka UI and see that device01's message was delivered to the mqtt-devices topic while sensor01's message is on the mqtt-messages topic.
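The publish and subscribe commands in this guide repeat the same `docker run` prefix and broker URL. A small wrapper can cut down on typing; the sketch below is only a convenience and not part of the cookbook. The function names (`mqtt_url`, `mqtt_pub`, `mqtt_sub`) and the `MQTT_HOST` and `MQTT_PORT` variables are hypothetical, with defaults copied from the commands above.

```shell
# Defaults taken from the commands in this guide; override via the environment.
MQTT_HOST=${MQTT_HOST:-host.docker.internal}
MQTT_PORT=${MQTT_PORT:-7183}

# Build the broker URL for a given MQTT topic.
mqtt_url() {
  printf 'mqtt://%s:%s/%s\n' "$MQTT_HOST" "$MQTT_PORT" "$1"
}

# Publish MESSAGE on TOPIC; any further arguments (for example --retain)
# are passed through to mosquitto_pub unchanged.
mqtt_pub() {
  pub_topic=$1
  pub_message=$2
  shift 2
  docker run -it --rm eclipse-mosquitto \
    mosquitto_pub --url "$(mqtt_url "$pub_topic")" --message "$pub_message" "$@"
}

# Subscribe to TOPIC and print incoming messages.
mqtt_sub() {
  docker run -it --rm eclipse-mosquitto \
    mosquitto_sub --url "$(mqtt_url "$1")"
}

# Usage:
#   mqtt_sub zilla
#   mqtt_pub zilla 'Hello, retained' --retain
#   mqtt_pub place/01/device/01 'I am device01'
```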
Creating this example yourself
Start a Kafka or Redpanda instance
You will need to create the required topics below.
mqtt-messages
mqtt-devices cleanup.policy=compact
mqtt-retained cleanup.policy=compact
mqtt-sessions cleanup.policy=compact
Create your config
Create a new file called zilla.yaml and append the YAML below to it.
Entrypoint
This configures Zilla to accept all of the MQTT traffic. The tcp binding defines the ports on which Zilla accepts traffic for both MQTT and WebSocket connections.
  # Proxy service entrypoint
  north_tcp_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port:
        - 7114
        - 7183
    routes:
      - when:
          - port: 7114
        exit: north_http_server
      - when:
          - port: 7183
        exit: north_mqtt_server
A ws binding is added to handle any MQTT over WebSocket using the mqtt protocol. The mqtt binding then handles all of the MQTT message traffic that needs to go to Kafka.
  # WebSocket server
  north_http_server:
    type: http
    kind: server
    routes:
      - when:
          - headers:
              :scheme: http
              :authority: localhost:7114
              upgrade: websocket
        exit: north_ws_server
  north_ws_server:
    type: ws
    kind: server
    routes:
      - when:
          - protocol: mqtt
        exit: north_mqtt_server
  # Shared MQTT server
  north_mqtt_server:
    type: mqtt
    kind: server
    exit: north_mqtt_kafka_mapping
Service definition
The service definition describes how clients using this service interact with Kafka through Zilla. The required set of Kafka topics is defined in options.topics, where Zilla manages any features that MQTT requires. A client identity can be determined by pulling the identifier out of the topic using the options.clients property.
  # MQTT messages to Kafka topics
  north_mqtt_kafka_mapping:
    type: mqtt-kafka
    kind: proxy
    options:
      topics:
        sessions: mqtt-sessions
        messages: mqtt-messages
        retained: mqtt-retained
      clients:
        - place/{identity}/#
    exit: north_kafka_cache_client
Additionally, a route is defined to capture any "device" messages and route them to a specific topic called mqtt-devices. Here Zilla enables routing different topic patterns into one Kafka topic using MQTT supported wildcards. All other messages will use the default exit and end up in the mqtt-messages topic.
    routes:
      - when:
          - publish:
              - topic: place/+/device/#
              - topic: device/#
          - subscribe:
              - topic: place/+/device/#
              - topic: device/#
        with:
          messages: mqtt-devices
        exit: north_kafka_cache_client
More on When a route matches | More on mqtt-kafka binding routes
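To make the wildcard routing and the client identity pattern concrete, the POSIX shell sketch below imitates both outside of Zilla. It is an illustration only, not Zilla's implementation. The function names are hypothetical, the matcher covers just the standard MQTT `+` (one level) and `#` (remaining levels) wildcards, and the filters and topic names are copied from the configuration above.

```shell
# Return success if TOPIC matches the MQTT topic FILTER.
# '+' matches exactly one level; '#' matches the parent and all remaining levels.
mqtt_matches() {
  m_filter=$1
  m_topic=$2
  while :; do
    m_f=${m_filter%%/*}   # current filter level
    m_t=${m_topic%%/*}    # current topic level
    case $m_f in
      '#') return 0 ;;
      '+') ;;
      *) [ "$m_f" = "$m_t" ] || return 1 ;;
    esac
    # Advance both sides by one level, noting whether more levels remain.
    case $m_filter in */*) m_filter=${m_filter#*/}; m_fmore=1 ;; *) m_fmore=0 ;; esac
    case $m_topic in */*) m_topic=${m_topic#*/}; m_tmore=1 ;; *) m_tmore=0 ;; esac
    if [ "$m_fmore" = 0 ] && [ "$m_tmore" = 0 ]; then return 0; fi
    if [ "$m_fmore" = 0 ]; then return 1; fi
    if [ "$m_tmore" = 0 ]; then
      # Topic is exhausted: only a trailing '#' can still match.
      if [ "$m_filter" = '#' ]; then return 0; else return 1; fi
    fi
  done
}

# Mirror the route above: device topics go to mqtt-devices, the rest to mqtt-messages.
kafka_topic_for() {
  for device_filter in 'place/+/device/#' 'device/#'; do
    if mqtt_matches "$device_filter" "$1"; then
      echo mqtt-devices
      return 0
    fi
  done
  echo mqtt-messages
}

# Mirror the clients pattern place/{identity}/#: print the level after "place/".
client_identity() {
  case $1 in
    place/*) id_rest=${1#place/}; echo "${id_rest%%/*}" ;;
    *) return 1 ;;
  esac
}

# kafka_topic_for place/01/device/01   prints mqtt-devices
# kafka_topic_for place/01/sensor/01   prints mqtt-messages
# client_identity place/01/sensor/01   prints 01
```

This matches the behavior seen in the Message routing step: device01's message lands on mqtt-devices, while sensor01's message falls through to the default mqtt-messages topic.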
Add a Kafka sync layer
The Zilla cache_client and cache_server help manage the smooth transfer of data between the service definition and Kafka. It is important to bootstrap the topics that will be brokering MQTT messages.
  # Kafka sync layer
  north_kafka_cache_client:
    type: kafka
    kind: cache_client
    exit: south_kafka_cache_server
  south_kafka_cache_server:
    type: kafka
    kind: cache_server
    options:
      bootstrap:
        - mqtt-messages
        - mqtt-retained
        - mqtt-devices
    exit: south_kafka_client
Point to a Running Kafka instance
This will define the location and connection for Zilla to communicate with Kafka.
  # Connect to Kafka
  south_kafka_client:
    type: kafka
    kind: client
    options:
      servers:
        - ${{env.KAFKA_BOOTSTRAP_SERVER}}
    exit: south_kafka_tcp_client
  south_kafka_tcp_client:
    type: tcp
    kind: client
Full zilla.yaml
name: zilla-mqtt-kafka-broker
bindings:
  #region entrypoint
  # Proxy service entrypoint
  north_tcp_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port:
        - 7114
        - 7183
    routes:
      - when:
          - port: 7114
        exit: north_http_server
      - when:
          - port: 7183
        exit: north_mqtt_server
  #endregion entrypoint
  #region server
  # WebSocket server
  north_http_server:
    type: http
    kind: server
    routes:
      - when:
          - headers:
              :scheme: http
              :authority: localhost:7114
              upgrade: websocket
        exit: north_ws_server
  north_ws_server:
    type: ws
    kind: server
    routes:
      - when:
          - protocol: mqtt
        exit: north_mqtt_server
  # Shared MQTT server
  north_mqtt_server:
    type: mqtt
    kind: server
    exit: north_mqtt_kafka_mapping
  #endregion server
  #region kafka_mapping
  # MQTT messages to Kafka topics
  north_mqtt_kafka_mapping:
    type: mqtt-kafka
    kind: proxy
    options:
      topics:
        sessions: mqtt-sessions
        messages: mqtt-messages
        retained: mqtt-retained
      clients:
        - place/{identity}/#
    exit: north_kafka_cache_client
    #endregion kafka_mapping
    #region device_mapping
    routes:
      - when:
          - publish:
              - topic: place/+/device/#
              - topic: device/#
          - subscribe:
              - topic: place/+/device/#
              - topic: device/#
        with:
          messages: mqtt-devices
        exit: north_kafka_cache_client
    #endregion device_mapping
  #region kafka_sync
  # Kafka sync layer
  north_kafka_cache_client:
    type: kafka
    kind: cache_client
    exit: south_kafka_cache_server
  south_kafka_cache_server:
    type: kafka
    kind: cache_server
    options:
      bootstrap:
        - mqtt-messages
        - mqtt-retained
        - mqtt-devices
    exit: south_kafka_client
  #endregion kafka_sync
  #region kafka_client
  # Connect to Kafka
  south_kafka_client:
    type: kafka
    kind: client
    options:
      servers:
        - ${{env.KAFKA_BOOTSTRAP_SERVER}}
    exit: south_kafka_tcp_client
  south_kafka_tcp_client:
    type: tcp
    kind: client
  #endregion kafka_client
telemetry:
  exporters:
    stdout_logs_exporter:
      type: stdout
Start Zilla
With your zilla.yaml config, follow the Zilla install instructions using your method of choice. Set the necessary KAFKA_BOOTSTRAP_SERVER environment variable to your running Kafka instance.
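As one possible way to do this with Docker, the sketch below wraps the launch in a function that first checks the environment variable and config file. Treat the details as assumptions to verify against the Zilla install instructions: the image name `ghcr.io/aklivity/zilla:latest`, the `start -v` arguments, and the in-container config path `/etc/zilla/zilla.yaml` are not stated in this guide, and the function name `start_zilla` is hypothetical. The published ports match the tcp binding above.

```shell
# Start Zilla in Docker using a local zilla.yaml (default: ./zilla.yaml).
# Requires KAFKA_BOOTSTRAP_SERVER to be set, for example "host.docker.internal:9092".
start_zilla() {
  zilla_config=${1:-./zilla.yaml}
  if [ -z "${KAFKA_BOOTSTRAP_SERVER:-}" ]; then
    echo "KAFKA_BOOTSTRAP_SERVER is not set" >&2
    return 1
  fi
  if [ ! -f "$zilla_config" ]; then
    echo "config file not found: $zilla_config" >&2
    return 1
  fi
  # Docker bind mounts need an absolute host path.
  zilla_dir=$(cd "$(dirname "$zilla_config")" && pwd)
  zilla_file=$(basename "$zilla_config")
  # Assumed image, container config path, and start arguments; adjust as needed.
  docker run --rm \
    -p 7114:7114 -p 7183:7183 \
    -e "KAFKA_BOOTSTRAP_SERVER=$KAFKA_BOOTSTRAP_SERVER" \
    -v "$zilla_dir/$zilla_file:/etc/zilla/zilla.yaml" \
    ghcr.io/aklivity/zilla:latest start -v
}

# Usage:
#   export KAFKA_BOOTSTRAP_SERVER=host.docker.internal:9092
#   start_zilla ./zilla.yaml
```

The guard clauses fail fast with a message instead of letting Zilla start with an unresolved `${{env.KAFKA_BOOTSTRAP_SERVER}}` value.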
Adding TLS
You can add TLS to this MQTT broker by adding a vault and tls binding as described in the Server Encryption section.
Remove the running containers
Find the path to the teardown.sh script(s) in the "use the teardown script(s) to clean up" section of the example output and run them. If you didn't provide an external Kafka endpoint, there will be scripts for both the Zilla and the local Kafka installs.

