MQTT Deep Dive

Từ wire format 2-byte đến persistent sessions, QoS guarantees và drone fleet patterns. MQTT = lightweight pub/sub + persistent TCP + broker fan-out — sinh ra cho IoT từ năm 1999.

Temp Sensor home/+/temp Humid Sensor home/+/humidity Drone 001 drone/001/telemetry QoS 0 QoS 0 QoS 0 MQTT Broker pub/sub router · fan-out · QoS persistent sessions · retained msgs cmd QoS 1 Dashboard home/# · drone/+/telemetry Ground Station drone/+/telemetry · status
01

MQTT là gì?

MQTT (Message Queuing Telemetry Transport) được thiết kế năm 1999 bởi Andy Stanford-Clark (IBM) và Arlen Nipper cho pipeline monitoring qua vệ tinh — nơi bandwidth cực hạn chế, connection không ổn định, và thiết bị chạy bằng pin. Đây là lý do tại sao MQTT tối ưu cho IoT hơn HTTP: minimum header chỉ 2 bytes, persistent TCP connection, và broker fan-out native.

Publish/Subscribe Model

Publisher gửi message đến topic trên broker, không biết subscriber là ai. Subscriber đăng ký topic, không biết publisher là ai. Broker là intermediary duy nhất — decouples producer và consumer hoàn toàn về không gian, thời gian và đồng bộ hóa.

2 Mini-Scenarios

IoT Sensor Network

1000 temperature sensors gửi home/{room}/temp mỗi 10 giây với QoS 0. Dashboard subscribe home/# nhận tất cả. Sensor mất kết nối → broker giữ session, gửi lại khi reconnect (QoS 1). Toàn bộ chỉ cần 1 broker.

Drone Fleet

Drone publish drone/001/telemetry (GPS/battery/altitude) mỗi 200ms QoS 0. Ground station subscribe và hiện real-time. LWT tự động publish drone/001/status = "offline" khi mất kết nối. Command channel drone/001/command dùng QoS 1 đảm bảo đến.

So sánh với HTTP & WebSocket

Tiêu chíMQTTHTTPWebSocket
PatternPub/SubRequest/ResponseBidirectional
Min header overhead2 bytes~200 bytes2–14 bytes
ConnectionPersistent TCPShort-lived (HTTP/1.1 keep-alive)Persistent TCP
Fan-out 1→NNative (broker)ManualManual
Offline supportPersistent session + QoS queue
QoS guarantees0 / 1 / 2
Wildcard subscribe+ và #
Born forIoT, M2M, sensor telemetryWeb APIs, browsersReal-time web apps
02

Fixed Header & Packet Types

Mọi MQTT packet đều bắt đầu bằng Fixed Header tối thiểu 2 bytes — đây là phần nhỏ nhất của giao thức, cho phép broker parse loại packet chỉ từ 1 byte đầu tiên.

Byte 1 — Type + Flags

Bit
7
6
5
4
3
2
1
0
Field
Type
Type
Type
Type
Flag
Flag
Flag
Flag

Bits 7–4 = Packet Type (4 bits → 16 giá trị). Bits 3–0 = Flags — ý nghĩa khác nhau tùy packet type. PUBLISH dùng cả 4 flag bits (DUP / QoS / QoS / RETAIN); các packet khác có flags cố định.

Byte 2+ — Remaining Length (VLE)

Remaining Length = số bytes còn lại sau fixed header (variable header + payload). Dùng Variable-Length Encoding: mỗi byte dùng 7 bits thấp cho value, bit MSB = 1 nếu còn byte tiếp theo. Tối đa 4 bytes → max value = 268,435,455 bytes (~256 MB).

Giá trị 64   → 0x40          (1 byte,  bit 7 = 0)
Giá trị 321  → 0xC1 0x02     (2 bytes, byte đầu bit 7 = 1: "còn tiếp")
               0x41 | 0x80 = 0xC1  →  value = 0x41 = 65
               0x02               →  value = 2 × 128 = 256
               total = 65 + 256 = 321 ✓
Giá trị 16383 → 0xFF 0x7F    (2 bytes, max 2-byte value)

14 Packet Types

TypeValueDirectionMô tả
CONNECT1C → SKhởi tạo connection — gửi sau khi TCP connected
CONNACK2S → CBroker xác nhận connection + session present flag
PUBLISH3BothGửi message đến topic
PUBACK4BothQoS 1 — acknowledge PUBLISH
PUBREC5BothQoS 2 — step 1: "đã nhận"
PUBREL6BothQoS 2 — step 2: "OK deliver"
PUBCOMP7BothQoS 2 — step 3: "đã deliver xong"
SUBSCRIBE8C → SĐăng ký topic filter
SUBACK9S → CXác nhận subscription + granted QoS
UNSUBSCRIBE10C → SHủy đăng ký topic filter
UNSUBACK11S → CXác nhận unsubscribe
PINGREQ12C → SKeepalive ping (2 bytes, không có payload)
PINGRESP13S → CKeepalive pong
DISCONNECT14Both (5.0)Normal disconnect — broker không gửi LWT
PUBLISH Flags (bits 3–0)
Bit 3 — DUP: = 1 khi retransmit QoS 1/2 packet. Chỉ informational — không dùng để detect duplicates.
Bits 2–1 — QoS: 00=QoS0, 01=QoS1, 10=QoS2, 11=reserved.
Bit 0 — RETAIN: = 1 → broker lưu message này làm "last known value" cho topic.
03

CONNECT Packet

CONNECT là packet quan trọng nhất — thiết lập toàn bộ session parameters. Phải là packet đầu tiên sau TCP connection. Nếu broker nhận bất kỳ packet nào khác trước CONNECT → ngắt kết nối.

Wire Format — Byte Layout

Offset
0
1
2
3
4
5
6
7
8
9
10
11
12+
Hex
0x10
RL
0x00
0x04
'M'
'Q'
'T'
'T'
0x04
CF
KA₁
KA₂
Payload…
Field
Type+Flags
Rem Len
Proto Name Len
Proto Name Len
Protocol
Name
"MQTT"
(4 bytes)
Level 3.1.1
Conn Flags
Keep Alive (big-endian)
Keep Alive
ClientID + Will + Auth

Byte 0 = 0x10: type=1 (CONNECT), flags=0000. Variable header là 10 bytes cố định cho MQTT 3.1.1 (Protocol Level = 0x04). MQTT 5.0 dùng level 0x05 và thêm Properties section.

Connect Flags Byte (byte 9)

Bit
7
6
5
4
3
2
1
0
Flag
User
Name
Pass
word
Will
Retain
Will
QoS₁
Will
QoS₀
Will
Flag
Clean
Session
Res
(0)

Bit 0 luôn phải = 0 (reserved). Clean Session = 1 → broker xóa session khi disconnect; = 0 → lưu session state. Will Flag = 1 → payload chứa Will Topic + Will Message (Last Will and Testament).

Payload Order

Mỗi field có 2 bytes length prefix + data: [ClientID] (bắt buộc) → [Will Topic] (nếu Will Flag=1) → [Will Message][Username] (nếu Username Flag=1) → [Password]. Thứ tự này cố định, không thể đảo lộn.

import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="drone-001", clean_session=False)
client.username_pw_set("fleet-user", "s3cr3t")

# Last Will — broker gửi tự động nếu TCP bị drop bất thường
client.will_set("drone/001/status",
                payload='{"status":"offline"}',
                qos=1, retain=True)

client.connect("broker.emqx.io", port=1883, keepalive=60)
client.loop_start()
04

Topics & Wildcards

Topic là UTF-8 string, phân cấp bằng ký tự /. Case-sensitive. Tránh leading slash (/home/temp tạo ra level rỗng đầu tiên). Convention tốt nhất: {context}/{deviceId}/{metric}.

Topic Tree home drone $SYS kitchen/temp living/humidity 001/telemetry 001/command home/+/temp matches: kitchen/temp, living/temp drone/# matches: 001/telemetry, 001/command, ... $SYS/# — không match bởi # phải subscribe explicit

Wildcard Rules

  • + (single-level): thay thế đúng 1 level. drone/+/telemetry match drone/001/telemetry nhưng không match drone/001/sensor/gps.
  • # (multi-level): thay thế 0 hoặc nhiều levels, phải ở cuối. drone/# match drone/001/telemetry, drone/002/command, v.v.
  • Wildcards chỉ dùng trong SUBSCRIBE — không được publish đến topic có wildcard.
  • $SYS: Namespace reserved của broker ($SYS/broker/clients/connected). Subscribe # không match $SYS — phải subscribe explicit.
def on_message(client, userdata, msg):
    parts = msg.topic.split("/")   # ["drone", "003", "telemetry"]
    drone_id = parts[1]            # "003"
    payload  = json.loads(msg.payload)
    print(f"[{drone_id}] lat={payload['lat']:.4f} bat={payload['battery']}%")

client.subscribe("drone/+/telemetry", qos=0)  # tất cả drones
client.subscribe("drone/+/status",    qos=1)  # LWT status
client.subscribe("$SYS/broker/clients/connected", qos=0)  # broker metrics
client.on_message = on_message
05

PUBLISH Packet

PUBLISH là packet được dùng nhiều nhất — mang data từ publisher đến broker, rồi broker forward đến tất cả matching subscribers. Fixed header flags encode QoS, DUP, RETAIN trực tiếp trong nibble thấp của byte đầu tiên.

Fixed Header Flags cho PUBLISH

FlagBitGiá trịÝ nghĩa
DUP30/11 = đây là retransmit. Chỉ mang tính informational — không dùng để deduplicate
QoS2–100/01/1000=fire-forget, 01=at-least-once, 10=exactly-once, 11=reserved (lỗi)
RETAIN00/11 = broker lưu message này, gửi ngay cho subscriber mới

Variable Header

  • Topic Name: 2 bytes length + UTF-8 string. Không có wildcard. Length tính vào Remaining Length.
  • Packet Identifier: 2 bytes big-endian — chỉ có khi QoS > 0. Range 1–65535. QoS 0 không có Packet ID nên nhỏ hơn 2 bytes.

Payload

Binary data tùy ý — không có length prefix riêng, length được tính từ Remaining Length trừ đi variable header size. Empty payload (0 bytes) hợp lệ — dùng để clear retained message (RETAIN=1 + empty payload).

MQTT 5.0 Properties
Thêm Properties section trong variable header (trước payload): Properties Length (VLE) + danh sách properties. Key properties: Message Expiry Interval, Topic Alias, Response Topic, Correlation Data, Content Type, User Properties.
import json, time, math, paho.mqtt.client as mqtt

client = mqtt.Client("drone-publisher")
client.connect("broker.emqx.io", 1883)
client.loop_start()

flight_time = 0
while True:
    telemetry = {
        "lat":     10.7769 + math.sin(flight_time * 0.1) * 0.01,
        "lng":     106.7009 + math.cos(flight_time * 0.1) * 0.01,
        "alt_m":   round(50 + math.sin(flight_time * 0.05) * 10, 1),
        "bat_pct": round(max(0, 100 - flight_time * 0.5), 1),
        "ts":      int(time.time() * 1000)
    }
    # QoS 0: no packet ID, no ACK → minimum overhead
    client.publish("drone/001/telemetry", json.dumps(telemetry), qos=0)
    flight_time += 1
    time.sleep(0.2)   # 5 Hz
06

QoS 0 — At-most-once

Fire-and-forget: publisher gửi PUBLISH một lần, không đợi ACK. Broker nhận xong forward ngay cho subscribers, không lưu. Nếu connection drop ở bất kỳ điểm nào → message mất vĩnh viễn.

Wire

Chỉ 1 PUBLISH packet. Fixed header byte 1 = 0x30 (type=3, QoS=00, RETAIN=0). Không có Packet Identifier — variable header chỉ có Topic Name. Payload là application data trực tiếp.

Semantics

  • Broker forward at most once — không retry, không queue.
  • Subscriber nhận at most once — không thể nhận duplicate.
  • Không cần session state để track QoS 0 messages.

Dùng khi

  • Telemetry liên tục — mất 1 reading OK (GPS, temp, humidity)
  • Real-time dashboard — stale data tệ hơn missing data
  • Logging metrics — volume cao, mất ít packet chấp nhận được
  • Drone flight telemetry (5 Hz) — mất vài packet không ảnh hưởng display

Không dùng khi

  • Command điều khiển thiết bị — phải đến ít nhất 1 lần
  • Alerts/notifications quan trọng
  • Firmware OTA commands
  • Financial transactions
07

QoS 1 — At-least-once

Publisher gửi PUBLISH (với Packet Identifier), chờ PUBACK. Nếu không nhận PUBACK trong timeout → retransmit với DUP=1. Kết quả: subscriber có thể nhận duplicate — application phải xử lý idempotency.

Packet Identifier

2 bytes big-endian, range 1–65535. Client tự quản lý, reuse sau khi nhận PUBACK. Cùng lúc có thể có nhiều in-flight QoS 1 messages với Packet ID khác nhau. Broker giữ lại message cho subscriber offline có persistent session.

Broker Behavior

Broker forward PUBLISH đến tất cả matching subscribers với QoS ≤ subscribed QoS (broker có thể downgrade). Nếu subscriber offline + persistent session → broker queue message, gửi khi reconnect.

import json, time, paho.mqtt.client as mqtt

def send_command(client, drone_id: str, command: str, params: dict = {}):
    payload = json.dumps({
        "cmd": command,
        "params": params,
        "ts": int(time.time() * 1000)
    })
    result = client.publish(f"drone/{drone_id}/command", payload, qos=1)
    result.wait_for_publish()  # block until broker ACK'd (PUBACK received)
    print(f"Command '{command}' delivered to {drone_id}")

client = mqtt.Client("ground-station-001")
client.connect("broker.emqx.io", 1883)
client.loop_start()
time.sleep(1)

send_command(client, "001", "goto_waypoint", {"lat": 10.780, "lng": 106.705, "alt": 100})
send_command(client, "001", "return_home")
08

QoS 2 — Exactly-once

4-step handshake đảm bảo message được deliver đúng một lần. Cả sender và receiver đều duy trì state. Đây là QoS level tốn kém nhất: 4 packets, 2 round-trips, state cả 2 phía.

4-Step Handshake

QoS 0 — At-most-once QoS 1 — At-least-once QoS 2 — Exactly-once Client Broker PUBLISH no ACK · 1 packet Best performance May lose messages No packet ID Client Broker PUBLISH (id=42) PUBACK (id=42) retransmit if no ACK (DUP=1) Guaranteed delivery May duplicate 2 packets, 1 RTT Client Broker PUBLISH (id=7) PUBREC (id=7) PUBREL (id=7) PUBCOMP (id=7) Exactly once 4 packets, 2 RTT State both sides

Guarantee Mechanism

  1. PUBLISH (sender → receiver): gửi message, lưu vào "sent" store với Packet ID.
  2. PUBREC (receiver → sender): "đã nhận, chưa deliver" — receiver lưu Packet ID để deduplicate.
  3. PUBREL (sender → receiver): "OK, deliver đi" — sender có thể xóa message khỏi store sau khi gửi PUBREL.
  4. PUBCOMP (receiver → sender): "đã deliver xong" — receiver xóa Packet ID khỏi store.

Bước 1–2 có thể retry (PUBLISH với DUP=1 nếu không nhận PUBREC). Receiver chỉ deliver một lần vì nó lưu Packet ID — nhận PUBLISH lần 2 với cùng ID → discard (đã deliver rồi, chờ PUBREL).

client.publish("drone/001/firmware/update",
               payload=json.dumps({"version": "2.1.0", "url": "..."}),
               qos=2)   # exactly-once — firmware command không được duplicate
09

Retained Messages & LWT

Retained Messages

PUBLISH với RETAIN=1 → broker lưu message này làm "last known value" cho topic. Subscriber mới kết nối và subscribe topic đó → nhận ngay retained message, không cần chờ publisher gửi lần tới. Chỉ 1 retained message/topic — message mới overwrite.

Clear retained message: publish empty payload (0 bytes) với RETAIN=1 đến cùng topic.

Use Case
Dashboard khởi động lúc 3 AM → subscribe home/# → nhận ngay tất cả giá trị hiện tại của sensors (retained) mà không cần đợi chúng gửi reading tiếp theo. Không có retained → dashboard trống cho đến khi từng sensor gửi lần tới.

Last Will and Testament (LWT)

Client khai báo Will trong CONNECT: nếu TCP connection bị drop bất thường (không gửi DISCONNECT), broker sẽ publish will message thay cho client. LWT không được gửi nếu client disconnect bình thường (gửi DISCONNECT packet).

  • Will Topic, Will Message: khai báo trong CONNECT payload.
  • Will QoS, Will Retain: trong Connect Flags byte.
  • Will Delay Interval (MQTT 5.0): broker chờ N giây trước khi publish will — cho phép client reconnect kịp nếu chỉ là short network blip.
def setup_drone_client(drone_id: str) -> mqtt.Client:
    client = mqtt.Client(drone_id, clean_session=False)

    # LWT — broker publish tự động nếu TCP drop bất thường
    client.will_set(
        topic=f"drone/{drone_id}/status",
        payload=json.dumps({"status": "offline", "ts": int(time.time() * 1000)}),
        qos=1, retain=True
    )

    def on_connect(c, userdata, flags, rc):
        if rc == 0:
            # Birth message — gửi khi kết nối thành công
            c.publish(f"drone/{drone_id}/status",
                      json.dumps({"status": "online", "ts": int(time.time() * 1000)}),
                      qos=1, retain=True)

    client.on_connect = on_connect
    return client
10

Persistent Sessions

Persistent session cho phép broker lưu trữ state của client khi offline — subscriptions, queued QoS 1/2 messages, in-flight handshake state. Khi client reconnect, mọi thứ được khôi phục.

clean_session Flag

clean_session = True

Broker xóa toàn bộ session state khi client disconnect. Reconnect = new session. Subscriptions mất, queued messages mất.

Dùng cho: Stateless consumers, browsers, temporary subscribers.

clean_session = False

Broker lưu session state. CONNACK trả về Session Present flag = 1 nếu session đã tồn tại. Client biết mình không cần re-subscribe.

Dùng cho: IoT devices, drone command channel, offline-capable consumers.

Session State Broker Lưu

  • Subscription list (topic filters + QoS)
  • QoS 1 messages đang chờ deliver (offline queue)
  • QoS 1 messages đã gửi nhưng chưa nhận PUBACK
  • QoS 2 messages trong handshake (PUBREC received, chờ PUBREL)

MQTT 5.0 Session Expiry Interval

Property trong CONNECT (và DISCONNECT) — broker xóa session sau N giây offline. 0 = xóa ngay khi disconnect; 0xFFFFFFFF = giữ mãi mãi. Thay thế hành vi "giữ mãi" mơ hồ của 3.1.1.

Client Takeover

Nếu 2 clients dùng cùng ClientID: broker terminate kết nối cũ, accept kết nối mới. Tránh bằng unique ClientID per device — ví dụ dùng MAC address hoặc UUID.

client = mqtt.Client("drone-001", clean_session=False)

def on_connect(client, userdata, flags, rc):
    if rc != 0:
        print(f"Connect failed: {rc}")
        return
    if flags.get("session present"):
        print("Session restored — broker có queued messages, không cần re-subscribe")
    else:
        print("New session — re-subscribe")
        client.subscribe([
            ("drone/001/command", 1),
            ("drone/001/config",  1),
        ])

client.on_connect = on_connect
client.reconnect_delay_set(min_delay=1, max_delay=30)  # exponential backoff
client.connect("broker.emqx.io", 1883, keepalive=60)
11

SUBSCRIBE & SUBACK

SUBSCRIBE packet có fixed header 0x82 (type=8, flags=0010 — flags cố định, không giống PUBLISH). Variable header: Packet Identifier (2 bytes). Payload: list topic filters.

Payload Structure

Mỗi topic filter = 2 bytes length + UTF-8 string + 1 byte options. Có thể gửi nhiều filters trong 1 SUBSCRIBE packet. SUBACK trả về list return codes (1 byte/filter): 0x00=QoS0, 0x01=QoS1, 0x02=QoS2, 0x80=Failure.

QoS Negotiation

Client request QoS X, broker có thể grant QoS ≤ X — broker có quyền downgrade. Ví dụ: client request QoS 2 nhưng broker chỉ support QoS 1 → SUBACK trả về QoS 1. Application cần check granted QoS.

MQTT 5.0 Subscription Options

OptionBitsMô tả
No-local2= 1 → broker không gửi message từ chính client đó về lại (tránh echo)
Retain-as-published3= 1 → broker giữ nguyên RETAIN flag khi forward, không strip
Retain Handling5–40=gửi retained khi subscribe, 1=chỉ gửi nếu subscription mới, 2=không gửi retained
def on_subscribe(client, userdata, mid, granted_qos):
    print(f"Subscribed mid={mid}, granted QoS: {granted_qos}")
    # granted_qos = (1, 0, 1) cho 3 topics được grant

client.on_subscribe = on_subscribe
client.subscribe([
    ("drone/+/telemetry", 0),  # high-frequency, QoS 0 OK
    ("drone/+/status",    1),  # status/LWT, QoS 1
    ("drone/+/command",   1),  # command echo, QoS 1
])
12

MQTT 5.0 Core Features

MQTT 5.0 (OASIS standard 2019) giải quyết những hạn chế lớn của 3.1.1 bằng cách thêm Properties system vào hầu hết packets — tương đương HTTP headers cho MQTT.

Reason Codes

3.1.1 CONNACK chỉ có 6 return codes; 5.0 thêm reason code đầy đủ cho mọi packet (PUBACK, SUBACK, DISCONNECT, AUTH). Giờ server có thể nói rõ lý do disconnect: 0x8D=Keep Alive timeout, 0x9C=Use Another Server, 0x89=Server Busy.

User Properties

UTF-8 key-value pairs trong bất kỳ packet nào — tương đương custom HTTP headers. Dùng cho: routing metadata, distributed tracing IDs (X-Trace-ID), device firmware version, region tag.

Message Expiry Interval

PUBLISH property — broker drop message sau N giây nếu chưa deliver. Ngăn stale commands chạy trên device sau khi reconnect. Ví dụ: command "hover for 5 seconds" hết hạn sau 10 giây.

Topic Aliases

Map topic string → integer (1–N). Sau khi map lần đầu, gửi integer thay chuỗi. Topic drone/fleet/001/sensors/gps/telemetry/v2 → alias 1. Tiết kiệm bandwidth đáng kể với high-frequency messages.

Response Topic + Correlation Data

Request/response pattern: requester thêm Response TopicCorrelation Data (opaque bytes để match) vào PUBLISH. Responder xử lý xong → publish response về Response Topic kèm Correlation Data. Không cần unique topic per request.

Flow Control — Receive Maximum

CONNECT property — client báo broker số lượng max in-flight QoS 1/2 messages có thể handle đồng thời. Tránh overwhelm client yếu (embedded device với 4KB RAM). Broker tự throttle theo giá trị này.

Content Type

UTF-8 string mô tả payload format — application/json, application/cbor, application/protobuf. Subscriber có thể inspect trước khi parse.

13

Shared Subscriptions & Enhanced Auth

Shared Subscriptions

Syntax: $share/{ShareName}/{TopicFilter}. Nhiều clients subscribe cùng group → broker load-balance messages (round-robin hoặc random). Không phải fan-out — mỗi message chỉ đến 1 subscriber trong group.

Chính thức trong MQTT 5.0, nhưng EMQX và nhiều broker 3.1.1 cũng hỗ trợ.

import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print(f"[{client._client_id.decode()}] {msg.topic}: {msg.payload.decode()}")

# Ground station 1 — cùng group "groundcrew"
gs1 = mqtt.Client("ground-station-1")
gs1.on_message = on_message
gs1.connect("broker.emqx.io", 1883)
gs1.subscribe("$share/groundcrew/drone/+/telemetry", qos=1)
gs1.loop_start()

# Ground station 2 — cùng group → mỗi telemetry message chỉ đến 1 trong 2
gs2 = mqtt.Client("ground-station-2")
gs2.on_message = on_message
gs2.connect("broker.emqx.io", 1883)
gs2.subscribe("$share/groundcrew/drone/+/telemetry", qos=1)
gs2.loop_forever()
Use Case — Fleet Scale-out
100 drones gửi 5 Hz telemetry = 500 messages/giây. 1 ground station không xử lý kịp. Thêm 4 GS nữa vào group $share/groundcrew/drone/+/telemetry → mỗi GS nhận ~100 msg/s. Scale horizontal mà không đổi publisher.

Enhanced Authentication (AUTH packet — MQTT 5.0)

Cho phép multi-step auth flow (challenge-response) thay vì chỉ username/password một lần trong CONNECT. Broker và client trao đổi nhiều AUTH packets trước khi CONNACK.

Flow SCRAM-SHA-256:

  1. Client gửi CONNECT (authMethod="SCRAM-SHA-256", authData=client-first-message)
  2. Broker gửi AUTH (reason=0x18 "Continue", authData=server-first-message)
  3. Client gửi AUTH (authData=client-final-message)
  4. Broker gửi CONNACK (success) hoặc AUTH failure
14

Security

TLS Transport (MQTTS)

Port 8883 = MQTT over TLS. Port 8884 = MQTT over WSS. Không dùng plaintext port 1883 trong production — username/password gửi cleartext.

import ssl, paho.mqtt.client as mqtt

client = mqtt.Client("drone-001-secure")
client.tls_set(
    ca_certs  = "/etc/ssl/certs/ca-certificates.crt",
    certfile  = "/path/to/drone-001.crt",   # mTLS: client cert
    keyfile   = "/path/to/drone-001.key",   # mTLS: client private key
    cert_reqs = ssl.CERT_REQUIRED,
    tls_version = ssl.PROTOCOL_TLS_CLIENT
)
client.connect("broker.example.com", port=8883)

mTLS cho Device Identity

Mỗi drone có certificate riêng signed bởi private CA. Broker verify certificate → xác định device identity mà không cần username/password. Phù hợp IoT vì không cần secret management trên constrained device.

ACL — Access Control List

Authorization ở broker level — mỗi client chỉ được publish/subscribe topics cụ thể. Ví dụ Mosquitto ACL file:

# Drone 001: chỉ publish topic của mình, subscribe lệnh của mình
user drone-001
topic write drone/001/telemetry
topic write drone/001/status
topic read  drone/001/command
topic read  drone/001/config

# Dashboard: read-only tất cả drones
user dashboard-svc
topic read drone/+/telemetry
topic read drone/+/status
Attack Vectors Cần Biết
  • ClientID takeover: Kẻ tấn công kết nối với cùng ClientID → broker terminate connection cũ. Dùng random suffix hoặc certificate-bound ClientID.
  • Retained message poisoning: Publisher ghi đè retained message quan trọng. ACL phải restrict write trên retained topics.
  • Subscription storm: Client subscribe quá nhiều topics → CPU spike. Giới hạn max subscriptions per client.
  • Payload injection: Broker không validate payload — validate JSON schema ở application layer.
15

IoT & Drone Patterns

Drone 001 publisher + subscriber drone/001/telemetry → QoS 0 drone/001/status → QoS 1 retain ← drone/001/command QoS 1 MQTT Broker Routes by topic filter Persistent sessions Retained messages broker.emqx.io:1883 Ground Station drone/+/telemetry (sub) drone/001/command (pub) Dashboard drone/# (sub) home/# (sub) telemetry status/LWT

Pattern 1 — Telemetry (QoS 0)

# drone_publisher.py — chạy: python drone_publisher.py
import json, time, math, paho.mqtt.client as mqtt

BROKER   = "broker.emqx.io"
DRONE_ID = "drone-001"

client = mqtt.Client(DRONE_ID, clean_session=False)
client.will_set(f"drone/{DRONE_ID}/status",
                json.dumps({"status": "offline", "ts": 0}), qos=1, retain=True)
client.connect(BROKER, 1883, keepalive=30)
client.loop_start()

client.publish(f"drone/{DRONE_ID}/status",
               json.dumps({"status": "online", "ts": int(time.time()*1000)}),
               qos=1, retain=True)

t = 0
while True:
    data = {
        "lat":      10.7769 + math.sin(t * 0.05) * 0.005,
        "lng":      106.7009 + math.cos(t * 0.05) * 0.005,
        "alt_m":    round(80 + math.sin(t * 0.02) * 20, 1),
        "bat_pct":  round(max(0, 100 - t * 0.1), 1),
        "speed_ms": round(abs(math.sin(t * 0.03)) * 15, 1),
        "ts":       int(time.time() * 1000)
    }
    client.publish(f"drone/{DRONE_ID}/telemetry", json.dumps(data), qos=0)
    t += 1
    time.sleep(0.2)   # 5 Hz

Pattern 2 — Command/Control (QoS 1)

# ground_station.py — chạy: python ground_station.py
import json, time, paho.mqtt.client as mqtt

BROKER = "broker.emqx.io"
GS_ID  = "ground-station-001"

def on_message(client, userdata, msg):
    parts = msg.topic.split("/")
    drone_id = parts[1]
    data = json.loads(msg.payload)
    if parts[2] == "telemetry":
        print(f"[{drone_id}] lat={data['lat']:.4f} lng={data['lng']:.4f} "
              f"alt={data['alt_m']}m bat={data['bat_pct']}%")
    elif parts[2] == "status":
        print(f"[{drone_id}] *** STATUS: {data['status'].upper()} ***")

def send_command(drone_id: str, command: str, params: dict = {}):
    payload = json.dumps({"cmd": command, "params": params,
                          "ts": int(time.time() * 1000)})
    result = client.publish(f"drone/{drone_id}/command", payload, qos=1)
    result.wait_for_publish()
    print(f"Command '{command}' sent → {drone_id}")

client = mqtt.Client(GS_ID)
client.on_message = on_message
client.connect(BROKER, 1883)
client.subscribe([("drone/+/telemetry", 0), ("drone/+/status", 1)])
client.loop_start()

time.sleep(3)
send_command("drone-001", "goto_waypoint", {"lat": 10.780, "lng": 106.705, "alt": 100})
time.sleep(5)
send_command("drone-001", "return_home")
client.loop_forever()

Pattern 3 — Device Shadow

# Device Shadow = "desired state" vs "reported state"
# drone/{id}/shadow/update   — publish desired state
# drone/{id}/shadow/reported — drone publish actual state
# drone/{id}/shadow/delta    — service publish diff (desired ≠ reported)

desired  = {"altitude": 100, "speed": 10, "mode": "autonomous"}
reported = {"altitude": 85,  "speed": 8,  "mode": "manual"}
delta    = {k: desired[k] for k in desired if desired.get(k) != reported.get(k)}
# delta = {"altitude": 100, "speed": 10, "mode": "autonomous"}

# Ground station publish desired
client.publish("drone/001/shadow/update",
               json.dumps({"desired": desired}), qos=1)

# Drone subscribe delta, execute, publish reported
def on_delta(client, userdata, msg):
    delta = json.loads(msg.payload)
    execute_changes(delta)           # apply desired state
    client.publish("drone/001/shadow/reported",
                   json.dumps({"reported": get_current_state()}), qos=1)

client.message_callback_add("drone/001/shadow/delta", on_delta)

Pattern 4 — Multi-Drone Dashboard

fleet_state = {}

def on_telemetry(client, userdata, msg):
    drone_id = msg.topic.split("/")[1]
    fleet_state[drone_id] = {
        "data": json.loads(msg.payload),
        "last_seen": time.time()
    }
    # Check offline drones (không nhận telemetry > 2 giây)
    offline = [d for d, v in fleet_state.items()
               if time.time() - v["last_seen"] > 2.0]
    if offline:
        print(f"Drones possibly offline: {offline}")

client.message_callback_add("drone/+/telemetry", on_telemetry)
client.subscribe("drone/+/telemetry", qos=0)
16

MQTT over WebSocket

Browser không có raw TCP socket API → không kết nối MQTT port 1883 trực tiếp. Giải pháp: MQTT frames gói trong WebSocket binary frames — MQTT protocol hoàn toàn không đổi, chỉ transport layer thay đổi.

WebSocket Upgrade

Client gửi HTTP Upgrade request với header Sec-WebSocket-Protocol: mqtt. Broker phải hỗ trợ WebSocket listener. Sau khi upgrade thành công → gửi CONNECT packet như bình thường qua WS binary frames.

ModePortURLKhi nào dùng
MQTT over WS8083ws://host:8083/mqttDev/local only
MQTT over WSS8084wss://host:8084/mqttProduction (TLS)
MQTT over TCP1883Server-to-server, IoT devices
MQTT over TLS8883Production IoT devices

paho-mqtt với WebSocket Transport

import paho.mqtt.client as mqtt

# Plain WebSocket (dev)
client = mqtt.Client(transport="websockets")
client.connect("broker.emqx.io", port=8083)

# Secure WebSocket (production)
client = mqtt.Client(transport="websockets")
client.tls_set()   # sử dụng system CA store
client.connect("broker.emqx.io", port=8084)
client.loop_start()

Browser Client (MQTT.js)

// npm install mqtt  hoặc CDN: https://unpkg.com/mqtt/dist/mqtt.min.js
import mqtt from 'mqtt'

const client = mqtt.connect('wss://broker.emqx.io:8084/mqtt', {
  clientId: 'browser-dashboard-' + Math.random().toString(16).slice(2),
  clean: true,
  reconnectPeriod: 3000,
  connectTimeout: 10000,
})

client.on('connect', () => {
  console.log('Connected')
  client.subscribe('drone/+/telemetry', { qos: 0 })
  client.subscribe('drone/+/status',    { qos: 1 })
})

client.on('message', (topic, payload) => {
  const parts = topic.split('/')
  const droneId = parts[1]
  const data = JSON.parse(payload.toString())
  updateFleetMap(droneId, data)   // update map UI
})

client.on('error', err => console.error('MQTT error:', err))
Mosquitto WebSocket Config
listener 8083
protocol websockets

listener 8084
protocol websockets
cafile /etc/mosquitto/ca.crt
certfile /etc/mosquitto/server.crt
keyfile /etc/mosquitto/server.key
17

Brokers

Mosquitto Language: C ~100K connections Eclipse Foundation OSS 250KB RAM · RPi-friendly Best for: dev · edge · IoT ✓ bridge support EMQX Language: Erlang/OTP 100M+ connections Built-in dashboard + metrics Rule engine (SQL-like) Best for: large-scale IoT ✓ clustering · MQTT 5.0 HiveMQ Language: Java ~1M connections Enterprise plugins SDK Kubernetes-native Best for: enterprise ✓ extensions API AWS IoT Core Managed / Serverless Unlimited (pay per msg) Device Shadow built-in Rules Engine → Lambda/S3 Best for: AWS ecosystem ✓ mTLS · X.509 device cert
BrokerMQTT 5.0ClusteringWebSocketMonitoring
Mosquitto 2.xManual bridge$SYS topics
EMQX 5.xNative clusteringDashboard + Prometheus
HiveMQ 4.xNative clusteringControl Center UI
AWS IoT Core✓ (partial)ManagedCloudWatch
VerneMQNative (Erlang)Prometheus
brew install mosquitto   # macOS
# hoặc: sudo apt install mosquitto mosquitto-clients

# Subscribe tất cả drone telemetry
mosquitto_sub -h broker.emqx.io -t "drone/+/telemetry" -v

# Publish command đến drone
mosquitto_pub -h broker.emqx.io -t "drone/001/command" \
              -m '{"cmd":"return_home"}' -q 1

# Xem broker system metrics
mosquitto_sub -h localhost -t '$SYS/#' -v
18

Production & Python Patterns

MQTTService Class Pattern

import paho.mqtt.client as mqtt
import json, time

class MQTTService:
    """Thread-safe MQTT service với auto-reconnect và topic dispatch."""

    def __init__(self, broker: str, port: int, client_id: str,
                 username: str = None, password: str = None):
        self._client = mqtt.Client(client_id, clean_session=False)
        if username:
            self._client.username_pw_set(username, password)
        self._client.on_connect    = self._on_connect
        self._client.on_disconnect = self._on_disconnect
        self._client.on_message    = self._on_unhandled
        self._handlers = {}
        self._broker, self._port = broker, port

    def add_handler(self, topic_pattern: str, fn, qos: int = 1):
        """Register callback cho topic pattern. Tự động re-subscribe on reconnect."""
        self._handlers[topic_pattern] = (fn, qos)
        self._client.message_callback_add(topic_pattern, fn)

    def _on_connect(self, client, userdata, flags, rc):
        if rc != 0:
            print(f"[MQTT] Connect failed rc={rc}")
            return
        print(f"[MQTT] Connected (session={'restored' if flags.get('session present') else 'new'})")
        for pattern, (_, qos) in self._handlers.items():
            client.subscribe(pattern, qos=qos)

    def _on_disconnect(self, client, userdata, rc):
        if rc != 0:
            print(f"[MQTT] Unexpected disconnect rc={rc}, auto-reconnecting...")

    def _on_unhandled(self, client, userdata, msg):
        print(f"[MQTT] Unhandled: {msg.topic}")

    def start(self):
        self._client.reconnect_delay_set(min_delay=1, max_delay=30)
        self._client.connect(self._broker, self._port, keepalive=60)
        self._client.loop_start()   # non-blocking background thread

    def publish(self, topic: str, payload, qos: int = 0, retain: bool = False):
        data = json.dumps(payload) if not isinstance(payload, (str, bytes)) else payload
        return self._client.publish(topic, data, qos=qos, retain=retain)

    def stop(self):
        self._client.disconnect()
        self._client.loop_stop()

$SYS Monitoring Topics

# Mosquitto system metrics — subscribe để monitor broker health
mosquitto_sub -h localhost -t '$SYS/#' -v

# Key metrics:
# $SYS/broker/clients/connected     — active connections
# $SYS/broker/clients/maximum       — peak connections ever
# $SYS/broker/messages/received     — total PUBLISH received
# $SYS/broker/publish/messages/sent — total PUBLISH forwarded
# $SYS/broker/subscriptions/count   — active subscriptions
# $SYS/broker/retained messages/count — retained messages stored
# $SYS/broker/bytes/received        — total bytes received
# $SYS/broker/uptime                — broker uptime (seconds)

End-to-End Drone Simulation

# Cài đặt:  pip install paho-mqtt
# Chạy terminal 1: python drone_publisher.py
# Chạy terminal 2: python ground_station.py
# Hoặc xem raw data:
mosquitto_sub -h broker.emqx.io -t "drone/+/telemetry" -v | python -m json.tool

Capacity Planning

  • Mosquitto: ~1 MB RAM / 1000 connections (default buffer). Single-threaded → CPU bound trên 1 core.
  • QoS 0: negligible broker storage. QoS 1/2 offline queues: cấu hình max_queued_messages 1000 (Mosquitto) để tránh OOM.
  • Persistent sessions: cần disk persistence khi broker restart. Cấu hình persistence true + persistence_location /var/lib/mosquitto.
  • Fan-out bottleneck: 1000 subscribers × 100 msg/s = 100,000 forwards/s → dùng shared subscriptions và horizontal scaling.
  • Keepalive tuning: keepalive=60 (60 giây) = PINGREQ mỗi 60s. Mạng cellular unstable → giảm xuống 30s.
Production Checklist
  • ✓ Dùng TLS (port 8883) hoặc WSS (8084) — không plaintext 1883
  • ✓ ClientID unique per device (MAC/UUID) — tránh client takeover
  • ✓ LWT cho mỗi device quan trọng với retain=True
  • ✓ ACL restrict write/read per client
  • ✓ clean_session=False cho IoT devices + Session Expiry Interval hợp lý
  • ✓ Message Expiry Interval cho commands (tránh stale commands)
  • ✓ Monitor $SYS topics hoặc Prometheus exporter
  • ✓ Test reconnect behavior với network partition

Các điểm cốt lõi cần nhớ

Bản chắt lọc từ 18 sections — những điểm quan trọng nhất, hay bị nhầm, hay gặp trong interviews và hay phát sinh bug trong production MQTT.

Packet Format

  • Fixed header: byte 1 = type (bits 7–4) + flags (bits 3–0), byte 2+ = remaining length (VLE)
  • VLE encoding: 7 bits value + bit 7 = "còn tiếp". Tối đa 4 bytes = 256 MB
  • 14 packet types — PUBLISH dùng cả 4 flag bits (DUP/QoS/QoS/RETAIN)
  • CONNECT variable header: 10 bytes cố định (Protocol Name 6B + Level 1B + Flags 1B + KeepAlive 2B)
  • QoS 0 PUBLISH không có Packet Identifier — nhỏ hơn 2 bytes so với QoS 1/2

QoS State Machines

  • QoS 0: 1 packet, no ACK, may lose — telemetry, metrics
  • QoS 1: 2 packets (PUBLISH + PUBACK), may duplicate — commands, alerts
  • QoS 2: 4 packets, 2 RTT, exactly-once — firmware OTA, financial
  • Broker có thể downgrade QoS khi forward — luôn check granted QoS trong SUBACK
  • QoS 1/2 messages được queue cho offline persistent session clients

Topics & Sessions

  • + = đúng 1 level; # = 0+ levels, phải ở cuối — chỉ dùng trong SUBSCRIBE
  • $SYS không match bởi # — phải subscribe explicit
  • Retained: broker lưu last value — clear bằng PUBLISH empty payload với RETAIN=1
  • LWT chỉ được gửi khi TCP drop bất thường — không gửi nếu client gửi DISCONNECT
  • Persistent session (clean_session=False) → check Session Present trong CONNACK

MQTT 5.0

  • Reason codes trên mọi packet — biết chính xác lý do disconnect/failure
  • User Properties = custom key-value metadata trên bất kỳ packet nào
  • Message Expiry: broker drop stale messages — dùng cho commands có TTL
  • Topic Aliases: map topic string → integer, tiết kiệm bandwidth đáng kể
  • Shared subscriptions: $share/group/topic — load balance, không fan-out

Security

  • TLS port 8883 bắt buộc trong production — 1883 là plaintext
  • mTLS cho device identity — cert per device thay vì shared password
  • ACL restrict write/read per ClientID và topic pattern
  • ClientID takeover: unique ClientID per device để tránh bị disconnect
  • username/password trong CONNECT payload — cleartext nếu không có TLS

IoT & Drone Patterns

  • Telemetry = QoS 0 (high-freq, loss OK): drone/001/telemetry
  • Command = QoS 1 (must arrive once): drone/001/command
  • LWT birth/death: will_set offline + on_connect publish online, cả 2 retain=True
  • Device shadow: desired vs reported, delta topic cho sync
  • Shared sub: $share/fleet/drone/+/telemetry scale ground stations
Câu hỏi hay gặp trong interviews
  • QoS 1 vs QoS 2 — khi nào chọn cái nào? — QoS 1 đủ cho hầu hết cases (commands, alerts) nếu application xử lý được duplicate; QoS 2 chỉ khi duplicate thực sự gây hại (firmware flash, financial transaction) — cost 4x packet
  • Retained message vs persistent session — khác gì? — Retained = broker lưu 1 message per topic, gửi cho mọi subscriber mới. Persistent session = broker queue messages cho 1 client cụ thể khi offline
  • LWT trigger trong trường hợp nào? — Chỉ khi TCP connection bị drop mà không có DISCONNECT packet (network failure, crash). Normal disconnect (gửi DISCONNECT) → broker không gửi LWT
  • Shared subscription vs normal subscription — khác gì? — Normal sub: tất cả subscribers nhận message (fan-out). Shared sub ($share/group/topic): chỉ 1 subscriber trong group nhận mỗi message (load balance)
  • MQTT vs WebSocket — khi nào chọn cái nào? — MQTT khi cần pub/sub native, offline support, QoS guarantee, IoT/M2M, fan-out 1→N; WebSocket khi cần bidirectional custom protocol, browser-native, không cần broker intermediary