Arquitetura MQTT - Um Guia Aprofundado
Vou explicar a arquitetura MQTT de forma detalhada e prática, com exemplos reais de implementação.
O que é MQTT?
MQTT (Message Queuing Telemetry Transport) é um protocolo de mensagens leve, baseado no padrão publish/subscribe (publicar/assinar), projetado para conexões com largura de banda limitada e dispositivos com recursos restritos. Foi criado em 1999 pela IBM e é padrão OASIS desde 2013.
Por que MQTT é tão popular na IoT?
- Overhead mínimo de rede (pacotes pequenos a partir de 2 bytes)
- Funciona bem em redes instáveis (3G, satélite, redes mesh)
- Suporta milhares de clientes simultâneos
- Desacoplamento total entre publicadores e assinantes
- Três níveis de QoS para diferentes necessidades
- Suporte nativo a reconexão automática
Arquitetura Fundamental
Componentes Principais
1. Broker (Servidor)
- O "coração" do MQTT
- Responsável por receber todas as mensagens, filtrar e distribuir aos clientes interessados
- Gerencia conexões e autenticação
- Exemplos: Mosquitto, HiveMQ, EMQX
2. Clients (Clientes)
- Qualquer dispositivo que conecta ao broker
- Pode ser publisher (publicador), subscriber (assinante) ou ambos
- Exemplos: sensores IoT, aplicações mobile, servidores
3. Topics (Tópicos)
- Estrutura hierárquica para organizar mensagens
- Funciona como "endereços" para as mensagens
- Exemplo:
casa/sala/temperatura
Modelo Publish/Subscribe
Diferente do modelo cliente-servidor tradicional, no MQTT:
- Publishers publicam mensagens em tópicos específicos
- Broker recebe e roteia as mensagens
- Subscribers se inscrevem em tópicos de interesse
- Publishers e subscribers não se conhecem diretamente (desacoplamento)
[Sensor Temp] --publish--> [BROKER] --subscribe--> [App Dashboard]
(Publisher) (casa/sala/temp) (Subscriber)
Hierarquia de Tópicos
Os tópicos usam "/" como separador hierárquico:
empresa/
├── predio1/
│ ├── andar1/
│ │ ├── sala101/temperatura
│ │ └── sala101/umidade
│ └── andar2/
│ └── sala201/luz
└── predio2/
└── garagem/vagas
Wildcards
Single-level (+): substitui um nível
casa/+/temperatura→casa/sala/temperatura,casa/quarto/temperatura
Multi-level (#): substitui múltiplos níveis (deve ser o último caractere)
casa/#→ todos os tópicos que começam comcasa/
Quality of Service (QoS)
O MQTT oferece 3 níveis de garantia de entrega:
QoS 0 - At most once (No máximo uma vez)
- "Fire and forget" (disparar e esquecer)
- Sem confirmação de recebimento
- Mais rápido, menor overhead
- Uso: dados não críticos, leituras frequentes de sensores
QoS 1 - At least once (Pelo menos uma vez)
- Garante que a mensagem chegue
- Pode haver duplicatas
- Requer ACK (acknowledgment)
- Uso: dados importantes que toleram duplicatas
QoS 2 - Exactly once (Exatamente uma vez)
- Garante entrega única
- Mais lento, maior overhead
- Processo de handshake em 4 etapas
- Uso: dados críticos, transações financeiras
Conexão e Sessão
Processo de Conexão
- CONNECT: Cliente envia pacote de conexão
- CONNACK: Broker responde confirmando
- Troca de mensagens PUBLISH/SUBSCRIBE
- DISCONNECT: Encerramento limpo (opcional)
Parâmetros da Conexão
Client ID: identificador único do cliente Clean Session:
true: descarta sessão anteriorfalse: mantém subscrições e mensagens pendentes
Keep Alive: intervalo em segundos para verificar se a conexão está ativa
- Cliente envia PINGREQ se não houver atividade
- Broker responde com PINGRESP
Will Message (Mensagem de última vontade):
- Mensagem enviada pelo broker se o cliente desconectar inesperadamente
- Útil para notificar falhas:
dispositivo/status→ "offline"
Retained Messages (Mensagens Retidas)
- Broker armazena a última mensagem de um tópico
- Novos subscribers recebem imediatamente
- Útil para estados:
sala/luz→ "ligada" - Definida com flag
retain=true
Segurança
Camadas de Segurança
1. Autenticação
- Username/Password
- Client certificates (TLS mutual auth)
- Tokens OAuth
2. Criptografia
- TLS/SSL (porta 8883)
- Protege dados em trânsito
3. Autorização
- ACL (Access Control Lists)
- Define quem pode pub/sub em quais tópicos
Fluxo Completo de Comunicação
1. Cliente A conecta ao broker
CLIENT A → CONNECT → BROKER
BROKER → CONNACK → CLIENT A
2. Cliente A subscreve a um tópico
CLIENT A → SUBSCRIBE(casa/temperatura) → BROKER
BROKER → SUBACK → CLIENT A
3. Cliente B publica mensagem
CLIENT B → PUBLISH(casa/temperatura, "22°C") → BROKER
4. Broker distribui aos subscribers
BROKER → PUBLISH(casa/temperatura, "22°C") → CLIENT A
Casos de Uso Reais
IoT e Automação Residencial
- Sensores publicam em
casa/sensores/+ - App subscreve a todos os sensores
- Comandos enviados para
casa/dispositivos/luz/cmd
Monitoramento Industrial
- Máquinas publicam telemetria
- Sistema central monitora
fabrica/# - Alertas em tópicos específicos
Chat e Notificações
- Usuários subscrevem a
chat/sala123 - Mensagens publicadas no mesmo tópico
- Presença via will messages
Vantagens do MQTT
- Leve: overhead mínimo (2 bytes de header)
- Eficiente: ideal para redes instáveis
- Escalável: milhares de clientes simultâneos
- Desacoplado: publishers/subscribers independentes
- Bidirecional: comunicação em ambas direções
Limitações
- Sem descoberta automática de serviços
- Payload opaco (sem estrutura definida)
- Broker é ponto único de falha (requer redundância)
- Sem suporte nativo a RPC (request/response)
Comparação com Outros Protocolos
HTTP/REST:
- MQTT: conexão persistente, push
- HTTP: request/response, polling necessário
WebSocket:
- Similar em conexão persistente
- MQTT adiciona QoS, retained messages, estrutura de tópicos
AMQP:
- Mais complexo e robusto
- MQTT mais leve e simples
Implementação Prática
Configurando um Broker (Mosquitto)
Instalação no Linux/Ubuntu:
sudo apt-get update
sudo apt-get install mosquitto mosquitto-clients
# Iniciar o serviço
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
# Verificar status
sudo systemctl status mosquitto
Configuração Básica (/etc/mosquitto/mosquitto.conf):
# Porta padrão
listener 1883
# Porta com TLS
listener 8883
cafile /etc/mosquitto/ca_certificates/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
# WebSocket
listener 9001
protocol websockets
# Autenticação
allow_anonymous false
password_file /etc/mosquitto/passwd
# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type all
# Persistência
persistence true
persistence_location /var/lib/mosquitto/
# Limites
max_connections 1000
max_queued_messages 1000
Criar usuários:
sudo mosquitto_passwd -c /etc/mosquitto/passwd usuario1
sudo mosquitto_passwd /etc/mosquitto/passwd usuario2
sudo systemctl restart mosquitto
Cliente Python (Paho MQTT)
Instalação:
pip install paho-mqtt
Publisher Simples:
import paho.mqtt.client as mqtt
import time
import json
# Callback quando conectado
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Conectado ao broker!")
else:
print(f"Falha na conexão. Código: {rc}")
# Callback quando publicado
def on_publish(client, userdata, mid):
print(f"Mensagem {mid} publicada")
# Configurar cliente
client = mqtt.Client(client_id="sensor_temperatura_01")
client.username_pw_set("usuario1", "senha123")
client.on_connect = on_connect
client.on_publish = on_publish
# Conectar
client.connect("localhost", 1883, 60)
client.loop_start()
# Publicar dados simulados
try:
while True:
temperatura = 20 + (time.time() % 10) # Simular variação
payload = json.dumps({
"sensor_id": "temp_01",
"temperatura": round(temperatura, 2),
"unidade": "Celsius",
"timestamp": time.time()
})
result = client.publish(
topic="casa/sala/temperatura",
payload=payload,
qos=1,
retain=True
)
print(f"Publicado: {payload}")
time.sleep(5)
except KeyboardInterrupt:
print("\nEncerrando...")
client.loop_stop()
client.disconnect()
Subscriber Completo:
import paho.mqtt.client as mqtt
import json
def on_connect(client, userdata, flags, rc):
print(f"Conectado com código: {rc}")
# Subscrever a múltiplos tópicos
client.subscribe([
("casa/+/temperatura", 1),
("casa/+/umidade", 1),
("casa/alertas/#", 2)
])
print("Inscrito nos tópicos")
def on_message(client, userdata, msg):
print(f"\n--- Nova Mensagem ---")
print(f"Tópico: {msg.topic}")
print(f"QoS: {msg.qos}")
print(f"Retained: {msg.retain}")
try:
payload = json.loads(msg.payload.decode())
print(f"Dados: {json.dumps(payload, indent=2)}")
except:
print(f"Payload: {msg.payload.decode()}")
def on_subscribe(client, userdata, mid, granted_qos):
print(f"Subscrição confirmada - QoS: {granted_qos}")
def on_disconnect(client, userdata, rc):
if rc != 0:
print(f"Desconexão inesperada. Código: {rc}")
print("Tentando reconectar...")
# Configurar cliente com sessão persistente
client = mqtt.Client(
client_id="monitor_casa_01",
clean_session=False # Mantém subscrições
)
client.username_pw_set("usuario1", "senha123")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.on_disconnect = on_disconnect
# Will message - notifica se desconectar inesperadamente
client.will_set(
topic="casa/monitor/status",
payload="offline",
qos=1,
retain=True
)
# Conectar e manter loop
client.connect("localhost", 1883, 60)
try:
client.loop_forever()
except KeyboardInterrupt:
print("\nEncerrando...")
client.disconnect()
Cliente Node.js (MQTT.js)
Instalação:
npm install mqtt
Publisher:
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883', {
clientId: 'sensor_node_01',
username: 'usuario1',
password: 'senha123',
clean: true,
keepalive: 60,
reconnectPeriod: 1000,
will: {
topic: 'casa/sensores/node01/status',
payload: 'offline',
qos: 1,
retain: true
}
});
client.on('connect', () => {
console.log('Conectado ao broker');
// Publicar status online
client.publish('casa/sensores/node01/status', 'online', {
qos: 1,
retain: true
});
// Publicar dados periodicamente
setInterval(() => {
const data = {
sensor_id: 'node_01',
temperatura: (20 + Math.random() * 10).toFixed(2),
umidade: (50 + Math.random() * 20).toFixed(2),
timestamp: Date.now()
};
client.publish(
'casa/sala/sensores',
JSON.stringify(data),
{ qos: 1 },
(err) => {
if (err) {
console.error('Erro ao publicar:', err);
} else {
console.log('Publicado:', data);
}
}
);
}, 5000);
});
client.on('error', (err) => {
console.error('Erro de conexão:', err);
});
client.on('offline', () => {
console.log('Cliente offline');
});
client.on('reconnect', () => {
console.log('Reconectando...');
});
process.on('SIGINT', () => {
console.log('\nEncerrando...');
client.end(true);
process.exit();
});
Subscriber:
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883', {
clientId: 'app_dashboard',
username: 'usuario1',
password: 'senha123',
clean: false, // Manter subscrições
});
client.on('connect', () => {
console.log('Conectado ao broker');
// Subscrever a múltiplos tópicos
const subscriptions = {
'casa/#': { qos: 1 },
'alertas/+/critico': { qos: 2 }
};
client.subscribe(subscriptions, (err, granted) => {
if (err) {
console.error('Erro ao subscrever:', err);
} else {
console.log('Subscrições confirmadas:');
granted.forEach(g => {
console.log(` ${g.topic} - QoS ${g.qos}`);
});
}
});
});
client.on('message', (topic, message, packet) => {
console.log('\n--- Nova Mensagem ---');
console.log(`Tópico: ${topic}`);
console.log(`QoS: ${packet.qos}`);
console.log(`Retained: ${packet.retain}`);
try {
const data = JSON.parse(message.toString());
console.log('Dados:', JSON.stringify(data, null, 2));
} catch {
console.log('Mensagem:', message.toString());
}
});
client.on('error', (err) => {
console.error('Erro:', err);
});
process.on('SIGINT', () => {
console.log('\nEncerrando...');
client.end();
process.exit();
});
Cliente ESP32/ESP8266 (Arduino)
Bibliotecas necessárias:
- PubSubClient
#include <WiFi.h>
#include <PubSubClient.h>
// Configurações WiFi
const char* ssid = "SUA_REDE";
const char* password = "SUA_SENHA";
// Configurações MQTT
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* mqtt_user = "usuario1";
const char* mqtt_password = "senha123";
const char* client_id = "esp32_sensor_01";
WiFiClient espClient;
PubSubClient client(espClient);
// Tópicos
const char* topic_temp = "casa/quarto/temperatura";
const char* topic_status = "casa/quarto/status";
const char* topic_cmd = "casa/quarto/comando";
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Conectando ao WiFi: ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.println("WiFi conectado");
Serial.print("Endereço IP: ");
Serial.println(WiFi.localIP());
}
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Mensagem recebida no tópico: ");
Serial.println(topic);
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.print("Mensagem: ");
Serial.println(message);
// Processar comandos
if (String(topic) == topic_cmd) {
if (message == "ligar_led") {
digitalWrite(LED_BUILTIN, HIGH);
Serial.println("LED ligado");
} else if (message == "desligar_led") {
digitalWrite(LED_BUILTIN, LOW);
Serial.println("LED desligado");
}
}
}
void reconnect() {
while (!client.connected()) {
Serial.print("Conectando ao MQTT...");
// Will message
if (client.connect(client_id, mqtt_user, mqtt_password,
topic_status, 1, true, "offline")) {
Serial.println("conectado");
// Publicar status online
client.publish(topic_status, "online", true);
// Subscrever a tópicos
client.subscribe(topic_cmd, 1);
} else {
Serial.print("falhou, rc=");
Serial.print(client.state());
Serial.println(" tentando novamente em 5 segundos");
delay(5000);
}
}
}
void setup() {
Serial.begin(115200);
pinMode(LED_BUILTIN, OUTPUT);
setup_wifi();
client.setServer(mqtt_server, mqtt_port);
client.setCallback(callback);
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
// Publicar temperatura a cada 10 segundos
static unsigned long lastMsg = 0;
unsigned long now = millis();
if (now - lastMsg > 10000) {
lastMsg = now;
// Simular leitura de sensor
float temperatura = 20.0 + (random(0, 100) / 10.0);
char msg[50];
snprintf(msg, 50, "{\"temp\":%.2f,\"unit\":\"C\"}", temperatura);
Serial.print("Publicando: ");
Serial.println(msg);
client.publish(topic_temp, msg, false);
}
}
Melhores Práticas de Produção
Design de Tópicos
Boas práticas:
✓ empresa/localidade/edificio/andar/sala/dispositivo/metrica
✓ iot/devices/{device_id}/telemetry/{metric}
✓ app/users/{user_id}/notifications
✓ system/logs/{service_name}/{level}
✗ topic123
✗ data
✗ temp
✗ muito/longo/demais/com/muitos/niveis/que/complica/gestao/topico
Convenções recomendadas:
- Use lowercase e separadores consistentes (/ ou -)
- Primeiro nível: domínio/namespace (empresa, iot, system)
- Coloque identificadores únicos no meio (device_id, user_id)
- Termine com a métrica ou tipo de dado
- Evite mais de 7-8 níveis
- Não use caracteres especiais (#, +, $, espaços)
Estratégias de QoS
Escolha o QoS apropriado:
# QoS 0 - Leituras frequentes não críticas
client.publish("sensores/temperatura/ambiente", temp, qos=0)
# QoS 1 - Dados importantes que toleram duplicatas
client.publish("alertas/movimento", alerta, qos=1)
# QoS 2 - Comandos críticos, transações
client.publish("controle/portas/tranca", comando, qos=2)
Gerenciamento de Sessões
Clean Session = True:
- Usado para clientes temporários
- Não mantém estado no broker
- Economiza memória no broker
- Ideal para publishers que só enviam dados
Clean Session = False:
- Usado para subscribers importantes
- Mantém subscrições entre reconexões
- Recebe mensagens perdidas durante desconexão
- Requer Client ID único e estável
Retained Messages
Quando usar:
# Estados que novos clientes precisam saber imediatamente
client.publish("casa/sala/luz/estado", "ligada", retain=True)
client.publish("sensores/temp01/ultimo_valor", "23.5", retain=True)
client.publish("dispositivo/config", config_json, retain=True)
# Limpar retained message
client.publish("topico/antigo", None, retain=True)
Will Messages
Implementação de presença:
client = mqtt.Client("device_01")
# Configurar will message
client.will_set(
topic="devices/device_01/status",
payload="offline",
qos=1,
retain=True
)
# Ao conectar, publicar status online
def on_connect(client, userdata, flags, rc):
client.publish("devices/device_01/status", "online",
qos=1, retain=True)
Tratamento de Erros e Reconexão
Python com retry exponencial:
import time
import random
def connect_with_retry(client, broker, max_retries=10):
retry_count = 0
while retry_count < max_retries:
try:
client.connect(broker, 1883, 60)
print("Conectado com sucesso")
return True
except Exception as e:
retry_count += 1
wait_time = min(300, (2 ** retry_count) + random.uniform(0, 1))
print(f"Falha na conexão: {e}")
print(f"Tentativa {retry_count}/{max_retries}")
print(f"Aguardando {wait_time:.2f}s...")
time.sleep(wait_time)
print("Número máximo de tentativas excedido")
return False
# Usar
client = mqtt.Client()
if connect_with_retry(client, "mqtt.exemplo.com"):
client.loop_forever()
Node.js com reconexão automática:
const mqtt = require('mqtt');
const options = {
clientId: 'robust_client',
reconnectPeriod: 5000, // Tentar reconectar a cada 5s
connectTimeout: 30000, // Timeout de conexão 30s
keepalive: 60,
// Tentar múltiplos servidores
servers: [
{ host: 'mqtt1.exemplo.com', port: 1883 },
{ host: 'mqtt2.exemplo.com', port: 1883 }
],
will: {
topic: 'clients/robust_client/status',
payload: 'offline_unexpected',
qos: 1,
retain: true
}
};
const client = mqtt.connect(options);
client.on('reconnect', () => {
console.log('Tentando reconectar...');
});
client.on('offline', () => {
console.log('Cliente offline');
});
client.on('error', (error) => {
console.error('Erro:', error.message);
});
Segurança em Profundidade
TLS/SSL
Gerar certificados para Mosquitto:
# Criar CA
openssl genrsa -out ca.key 2048
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt
# Criar certificado do servidor
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out server.crt -days 3650
# Criar certificado do cliente
openssl genrsa -out client.key 2048
openssl req -new -key client.key -out client.csr
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out client.crt -days 3650
Cliente Python com TLS:
import ssl
import paho.mqtt.client as mqtt
client = mqtt.Client()
# Configurar TLS
client.tls_set(
ca_certs="ca.crt",
certfile="client.crt",
keyfile="client.key",
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2
)
# TLS sem verificação do certificado (apenas para dev!)
# client.tls_insecure_set(True)
client.connect("mqtt.exemplo.com", 8883, 60)
ACL (Access Control Lists)
Configuração Mosquitto (/etc/mosquitto/acl):
# Usuário admin - acesso total
user admin
topic readwrite #
# Sensores - apenas publicam seus dados
user sensor_01
topic write sensores/sensor_01/#
topic read sensores/sensor_01/config
# Dashboard - apenas lê
user dashboard
topic read sensores/#
topic read alertas/#
# Serviço de controle - lê sensores e escreve comandos
user servico_controle
topic read sensores/#
topic write comandos/#
# Padrões com wildcards
user device_%u
topic readwrite devices/%u/#
Ativar ACL no mosquitto.conf:
acl_file /etc/mosquitto/acl
Autenticação com JWT
Exemplo com mosquitto-auth-plugin:
import jwt
import time
import paho.mqtt.client as mqtt
# Gerar token JWT
def generate_token(device_id, secret_key):
payload = {
'device_id': device_id,
'exp': time.time() + 3600, # Expira em 1 hora
'iat': time.time()
}
return jwt.encode(payload, secret_key, algorithm='HS256')
# Usar token como senha
token = generate_token('device_01', 'meu_secret_key')
client = mqtt.Client(client_id='device_01')
client.username_pw_set(username='device_01', password=token)
client.connect('mqtt.exemplo.com', 1883)
Escalabilidade e Alta Disponibilidade
Clustering
EMQX Cluster (exemplo docker-compose.yml):
version: '3'
services:
emqx1:
image: emqx/emqx:latest
environment:
- EMQX_NAME=emqx
- EMQX_CLUSTER__DISCOVERY=static
- EMQX_CLUSTER__STATIC__SEEDS=emqx@emqx1,emqx@emqx2,emqx@emqx3
ports:
- "1883:1883"
- "18083:18083"
networks:
- emqx-net
emqx2:
image: emqx/emqx:latest
environment:
- EMQX_NAME=emqx
- EMQX_CLUSTER__DISCOVERY=static
- EMQX_CLUSTER__STATIC__SEEDS=emqx@emqx1,emqx@emqx2,emqx@emqx3
ports:
- "1884:1883"
networks:
- emqx-net
emqx3:
image: emqx/emqx:latest
environment:
- EMQX_NAME=emqx
- EMQX_CLUSTER__DISCOVERY=static
- EMQX_CLUSTER__STATIC__SEEDS=emqx@emqx1,emqx@emqx2,emqx@emqx3
ports:
- "1885:1883"
networks:
- emqx-net
networks:
emqx-net:
driver: bridge
Load Balancing
HAProxy para MQTT:
global
log /dev/log local0
maxconn 50000
defaults
log global
mode tcp
option tcplog
timeout connect 5s
timeout client 50s
timeout server 50s
frontend mqtt_front
bind *:1883
default_backend mqtt_back
backend mqtt_back
balance leastconn
option tcp-check
server mqtt1 192.168.1.101:1883 check
server mqtt2 192.168.1.102:1883 check
server mqtt3 192.168.1.103:1883 check backup
Monitoramento
Prometheus + Grafana com EMQX:
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'emqx'
static_configs:
- targets: ['emqx1:18083', 'emqx2:18083', 'emqx3:18083']
Métricas importantes:
- Número de conexões ativas
- Taxa de mensagens/segundo
- Latência de entrega
- Uso de memória/CPU
- Mensagens na fila
- Taxa de desconexões
Padrões de Integração
MQTT para Kafka (Ponte)
Confluent MQTT Proxy ou implementação customizada:
import paho.mqtt.client as mqtt
from kafka import KafkaProducer
import json
# Kafka producer
kafka_producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def on_message(client, userdata, msg):
# Receber do MQTT
data = {
'topic': msg.topic,
'payload': msg.payload.decode(),
'qos': msg.qos,
'timestamp': time.time()
}
# Enviar para Kafka
kafka_topic = msg.topic.replace('/', '.')
kafka_producer.send(kafka_topic, value=data)
print(f"Ponte: {msg.topic} -> Kafka:{kafka_topic}")
mqtt_client = mqtt.Client()
mqtt_client.on_message = on_message
mqtt_client.connect("localhost", 1883)
mqtt_client.subscribe("sensores/#")
mqtt_client.loop_forever()
MQTT para HTTP/REST
Bridge MQTT -> Webhook:
import paho.mqtt.client as mqtt
import requests
import json
def on_message(client, userdata, msg):
payload = msg.payload.decode()
# Enviar para API REST
try:
response = requests.post(
'https://api.exemplo.com/mqtt-data',
json={
'topic': msg.topic,
'data': payload,
'qos': msg.qos
},
headers={'Authorization': 'Bearer TOKEN'},
timeout=5
)
if response.status_code == 200:
print(f"Dados enviados: {msg.topic}")
else:
print(f"Erro HTTP: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Erro ao enviar: {e}")
client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("eventos/#")
client.loop_forever()
Request/Response Pattern
Implementação RPC-like com MQTT:
# Cliente que faz requisição
import paho.mqtt.client as mqtt
import uuid
import json
import time
class MQTTRPCClient:
def __init__(self, broker):
self.client = mqtt.Client()
self.client.on_message = self.on_response
self.client.connect(broker, 1883)
self.client.loop_start()
self.pending_requests = {}
self.response_topic = f"responses/{uuid.uuid4()}"
self.client.subscribe(self.response_topic)
def on_response(self, client, userdata, msg):
data = json.loads(msg.payload.decode())
request_id = data['request_id']
if request_id in self.pending_requests:
self.pending_requests[request_id] = data['result']
def call(self, method, params, timeout=10):
request_id = str(uuid.uuid4())
request = {
'method': method,
'params': params,
'request_id': request_id,
'response_topic': self.response_topic
}
self.pending_requests[request_id] = None
self.client.publish('rpc/requests', json.dumps(request))
# Aguardar resposta
start_time = time.time()
while self.pending_requests[request_id] is None:
if time.time() - start_time > timeout:
raise TimeoutError("RPC timeout")
time.sleep(0.1)
result = self.pending_requests[request_id]
del self.pending_requests[request_id]
return result
# Servidor RPC
class MQTTRPCServer:
def __init__(self, broker):
self.client = mqtt.Client()
self.client.on_message = self.on_request
self.client.connect(broker, 1883)
self.client.subscribe('rpc/requests')
self.methods = {}
def register(self, name, func):
self.methods[name] = func
def on_request(self, client, userdata, msg):
request = json.loads(msg.payload.decode())
method = request['method']
params = request['params']
request_id = request['request_id']
response_topic = request['response_topic']
if method in self.methods:
try:
result = self.methods[method](**params)
response = {
'request_id': request_id,
'result': result,
'error': None
}
except Exception as e:
response = {
'request_id': request_id,
'result': None,
'error': str(e)
}
else:
response = {
'request_id': request_id,
'result': None,
'error': f'Method {method} not found'
}
self.client.publish(response_topic, json.dumps(response))
def start(self):
self.client.loop_forever()
# Uso
if __name__ == '__main__':
# Servidor
server = MQTTRPCServer('localhost')
server.register('add', lambda a, b: a + b)
server.register('multiply', lambda a, b: a * b)
# server.start() # Executar em thread separada
# Cliente
client = MQTTRPCClient('localhost')
result = client.call('add', {'a': 5, 'b': 3})
print(f"Resultado: {result}") # 8
Troubleshooting Comum
Problema: Mensagens não chegam
Diagnóstico:
# 1. Verificar se o broker está rodando
sudo systemctl status mosquitto
# 2. Testar conexão
mosquitto_sub -h localhost -t teste -v
# 3. Em outro terminal, publicar
mosquitto_pub -h localhost -t teste -m "mensagem teste"
# 4. Verificar logs
tail -f /var/log/mosquitto/mosquitto.log
# 5. Testar com debug
mosquitto_sub -h localhost -t '#' -v -d
Possíveis causas:
- QoS incompatível
- Cliente não está subscrito ao tópico correto (verificar wildcards)
- Clean session=true e cliente desconectou antes de receber
- ACL bloqueando
- Firewall bloqueando porta
Problema: Alta latência
Investigar:
import paho.mqtt.client as mqtt
import time
latencies = []
def on_message(client, userdata, msg):
sent_time = float(msg.payload.decode())
latency = (time.time() - sent_time) * 1000 # ms
latencies.append(latency)
print(f"Latência: {latency:.2f}ms")
if len(latencies) >= 100:
avg = sum(latencies) / len(latencies)
print(f"\nLatência média: {avg:.2f}ms")
print(f"Min: {min(latencies):.2f}ms")
print(f"Max: {max(latencies):.2f}ms")
client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("latency/test")
# Publicar timestamps
import threading
def publish_loop():
pub_client = mqtt.Client()
pub_client.connect("localhost", 1883)
while True:
pub_client.publish("latency/test", str(time.time()))
time.sleep(1)
threading.Thread(target=publish_loop, daemon=True).start()
client.loop_forever()
Otimizações:
- Reduzir QoS se possível
- Aumentar keep-alive interval
- Usar conexão local ao broker
- Verificar sobrecarga do broker
- Considerar clustering/load balancing
Problema: Broker consumindo muita memória
Verificar:
# Estatísticas do broker
mosquitto_sub -h localhost -t '$SYS/#' -v
# Importantes:
# $SYS/broker/clients/connected
# $SYS/broker/messages/stored
# $SYS/broker/subscriptions/count
Soluções:
- Limitar mensagens retidas
- Configurar limites no mosquitto.conf:
max_queued_messages 1000
max_inflight_messages 20
message_size_limit 10240
- Implementar TTL para retained messages
- Limpar retained messages antigas:
mosquitto_pub -h localhost -t 'topico/antigo' -n -r
Ferramentas e Brokers Populares
Brokers
Eclipse Mosquitto (Open Source)
- Leve e rápido
- Ideal para projetos pequenos/médios
- Suporta MQTT 3.1.1 e 5.0
- Fácil configuração
EMQX (Open Source/Enterprise)
- Alto desempenho (milhões de conexões)
- Clustering nativo
- Dashboard web rico
- Extensível com plugins
HiveMQ (Enterprise)
- Altamente escalável
- Suporte empresarial
- Integrações prontas (Kafka, DBs)
- Versão community gratuita
VerneMQ (Open Source)
- Escrito em Erlang
- Muito escalável
- Clustering distribuído
AWS IoT Core (Cloud)
- Totalmente gerenciado
- Integração com AWS services
- Segurança robusta
- Pay-per-use
Ferramentas de Teste
MQTT.fx (Desktop)
- GUI completa
- Suporta múltiplas conexões
- Scripts de teste
MQTT Explorer (Desktop, Open Source)
- Visualização hierárquica de tópicos
- Diff de mensagens
- Histórico
Mosquitto CLI:
# Publicar
mosquitto_pub -h broker.com -t topico -m "mensagem" -q 1 -u user -P pass
# Subscrever
mosquitto_sub -h broker.com -t 'topico/#' -v -u user -P pass
# Benchmark
mosquitto_pub -h localhost -t test -m "test" -r 1000 -c
# Subscrever com timestamp
mosquitto_sub -h localhost -t '#' -v -F '@Y-@m-@dT@H:@M:@S@z : %t : %p'
MQTT.js CLI:
# Instalar
npm install -g mqtt
# Publicar
mqtt pub -h broker.com -t 'topico' -m 'mensagem'
# Subscrever
mqtt sub -h broker.com -t 'topico/#' -v
# Bench
mqtt bench -h localhost -t test -c 100 -m 1000
Performance e Otimização
Benchmarking
Teste de throughput:
import paho.mqtt.client as mqtt
import time
import threading
messages_sent = 0
messages_received = 0
start_time = None
def publisher():
global messages_sent, start_time
client = mqtt.Client()
client.connect("localhost", 1883)
start_time = time.time()
for i in range(10000):
client.publish("benchmark/test", f"message_{i}", qos=0)
messages_sent += 1
client.disconnect()
def subscriber():
global messages_received
def on_message(client, userdata, msg):
global messages_received
messages_received += 1
if messages_received >= 10000:
duration = time.time() - start_time
print(f"\n--- Resultados ---")
print(f"Mensagens: {messages_received}")
print(f"Tempo: {duration:.2f}s")
print(f"Throughput: {messages_received/duration:.2f} msg/s")
client.disconnect()
client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("benchmark/test")
client.loop_forever()
# Executar
threading.Thread(target=subscriber, daemon=True).start()
time.sleep(1)
publisher()
time.sleep(5)
Otimizações de Cliente
Batching de mensagens:
import paho.mqtt.client as mqtt
import time
client = mqtt.Client()
client.connect("localhost", 1883)
# Publicar múltiplas mensagens
messages = []
for i in range(1000):
info = client.publish(f"sensor/data/{i}", f"value_{i}", qos=0)
messages.append(info)
# Aguardar todas serem publicadas
for info in messages:
info.wait_for_publish()
print("Todas as mensagens publicadas")
Conexões persistentes:
# Manter conexão aberta e reutilizar
client = mqtt.Client()
client.connect("localhost", 1883, keepalive=3600) # 1 hora
client.loop_start() # Loop em background
# Publicar quando necessário
for data in sensor_readings():
client.publish("sensors/data", data)
time.sleep(1)
# Conexão permanece aberta
Otimizações de Broker
Mosquitto otimizado (mosquitto.conf):
# Persistência otimizada
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 300
autosave_on_changes false
# Limites otimizados
max_connections 10000
max_queued_messages 1000
max_inflight_messages 20
max_keepalive 300
# Mensagens
message_size_limit 10240
queue_qos0_messages false
# Logging (menos verbose em produção)
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
Casos de Uso Avançados
Smart Home Completo
Arquitetura:
Dispositivos IoT → MQTT Broker → Home Assistant → UI/Apps
↓
Automações
↓
Banco de Dados (InfluxDB)
↓
Grafana (Visualização)
Exemplo de automação:
import paho.mqtt.client as mqtt
import json
class HomeAutomation:
def __init__(self):
self.client = mqtt.Client()
self.client.on_message = self.on_message
self.client.connect("localhost", 1883)
# Subscrever sensores
self.client.subscribe("casa/+/temperatura")
self.client.subscribe("casa/+/movimento")
self.client.subscribe("casa/+/porta")
self.estado = {}
def on_message(self, client, userdata, msg):
parts = msg.topic.split('/')
local = parts[1]
sensor = parts[2]
valor = msg.payload.decode()
# Armazenar estado
if local not in self.estado:
self.estado[local] = {}
self.estado[local][sensor] = valor
# Regras de automação
self.aplicar_regras(local, sensor, valor)
def aplicar_regras(self, local, sensor, valor):
# Regra: Temperatura alta -> ligar AC
if sensor == "temperatura":
temp = float(valor)
if temp > 26:
self.client.publish(
f"casa/{local}/ar_condicionado/comando",
"ligar"
)
print(f"AC ligado em {local} (temp: {temp}°C)")
# Regra: Movimento detectado + noite -> ligar luz
if sensor == "movimento" and valor == "detectado":
import datetime
hora = datetime.datetime.now().hour
if hora >= 18 or hora <= 6:
self.client.publish(
f"casa/{local}/luz/comando",
"ligar"
)
print(f"Luz ligada em {local} (movimento noturno)")
# Regra: Porta aberta + ninguém em casa -> alerta
if sensor == "porta" and valor == "aberta":
pessoas_casa = any(
local.get("movimento") == "detectado"
for local in self.estado.values()
)
if not pessoas_casa:
self.client.publish(
"casa/alertas/seguranca",
json.dumps({
"tipo": "porta_aberta",
"local": local,
"timestamp": datetime.datetime.now().isoformat()
}),
qos=2
)
print(f"ALERTA: Porta aberta em {local} sem presença!")
def start(self):
self.client.loop_forever()
# Executar
automation = HomeAutomation()
automation.start()
Frota de Veículos (Telemetria)
# Veículo publicando telemetria
import paho.mqtt.client as mqtt
import json
import time
import random
class VehicleTelemetry:
def __init__(self, vehicle_id):
self.vehicle_id = vehicle_id
self.client = mqtt.Client(f"vehicle_{vehicle_id}")
self.client.connect("fleet.mqtt.com", 1883)
# Will message para detectar perda de sinal
self.client.will_set(
f"fleet/vehicles/{vehicle_id}/status",
"offline",
qos=1,
retain=True
)
self.client.loop_start()
# Publicar status online
self.client.publish(
f"fleet/vehicles/{vehicle_id}/status",
"online",
retain=True
)
def send_telemetry(self):
while True:
telemetry = {
"vehicle_id": self.vehicle_id,
"timestamp": time.time(),
"position": {
"lat": -23.550520 + random.uniform(-0.1, 0.1),
"lon": -46.633308 + random.uniform(-0.1, 0.1)
},
"speed": random.randint(0, 120),
"fuel_level": random.randint(0, 100),
"engine_temp": random.randint(80, 110),
"rpm": random.randint(800, 4000),
"odometer": random.randint(10000, 200000)
}
# Publicar telemetria (QoS 0 - dados frequentes)
self.client.publish(
f"fleet/vehicles/{self.vehicle_id}/telemetry",
json.dumps(telemetry),
qos=0
)
# Alertas críticos (QoS 2)
if telemetry["engine_temp"] > 105:
self.client.publish(
f"fleet/alerts/critical",
json.dumps({
"vehicle_id": self.vehicle_id,
"type": "high_temperature",
"value": telemetry["engine_temp"],
"timestamp": time.time()
}),
qos=2
)
time.sleep(5) # Telemetria a cada 5s
# Central de monitoramento
class FleetMonitoring:
def __init__(self):
self.client = mqtt.Client("fleet_monitoring")
self.client.on_message = self.on_telemetry
self.client.connect("fleet.mqtt.com", 1883)
# Monitorar todos os veículos
self.client.subscribe("fleet/vehicles/+/telemetry")
self.client.subscribe("fleet/vehicles/+/status")
self.client.subscribe("fleet/alerts/#")
self.vehicles = {}
def on_telemetry(self, client, userdata, msg):
if "/telemetry" in msg.topic:
data = json.loads(msg.payload.decode())
vehicle_id = data["vehicle_id"]
self.vehicles[vehicle_id] = data
# Processar dados...
self.check_maintenance(vehicle_id, data)
elif "/alerts/" in msg.topic:
alert = json.loads(msg.payload.decode())
self.handle_alert(alert)
def check_maintenance(self, vehicle_id, data):
# Verificar necessidade de manutenção
if data["odometer"] % 10000 < 100: # Próximo de múltiplo de 10k
self.client.publish(
f"fleet/vehicles/{vehicle_id}/maintenance",
json.dumps({
"type": "scheduled",
"reason": "odometer_milestone",
"odometer": data["odometer"]
}),
qos=1
)
def handle_alert(self, alert):
print(f"🚨 ALERTA: {alert}")
# Enviar notificação, email, etc.
def start(self):
self.client.loop_forever()
Industrial IoT (IIoT)
# Máquina industrial
class IndustrialMachine:
def __init__(self, machine_id, line, station):
self.machine_id = machine_id
self.topic_base = f"factory/line{line}/station{station}/{machine_id}"
self.client = mqtt.Client(f"machine_{machine_id}")
self.client.connect("factory.mqtt.local", 1883)
self.client.loop_start()
# Subscrever comandos
self.client.subscribe(f"{self.topic_base}/commands")
self.client.on_message = self.on_command
self.running = False
self.parts_produced = 0
def on_command(self, client, userdata, msg):
command = msg.payload.decode()
if command == "start":
self.running = True
print(f"Máquina {self.machine_id} iniciada")
elif command == "stop":
self.running = False
print(f"Máquina {self.machine_id} parada")
elif command == "reset_counter":
self.parts_produced = 0
def run(self):
while True:
if self.running:
# Simular produção
time.sleep(2)
self.parts_produced += 1
# Publicar métricas OEE (Overall Equipment Effectiveness)
metrics = {
"machine_id": self.machine_id,
"timestamp": time.time(),
"status": "running",
"parts_produced": self.parts_produced,
"cycle_time": 2.0,
"temperature": 65 + random.uniform(-5, 5),
"vibration": random.uniform(0, 10),
"power_consumption": random.uniform(50, 100)
}
self.client.publish(
f"{self.topic_base}/metrics",
json.dumps(metrics),
qos=1
)
# Publicar eventos de produção
if self.parts_produced % 100 == 0:
self.client.publish(
f"{self.topic_base}/events",
json.dumps({
"type": "milestone",
"parts": self.parts_produced
}),
qos=2
)
else:
time.sleep(1)
Conclusão
MQTT é um protocolo poderoso e versátil que se tornou o padrão de fato para comunicação em IoT. Sua arquitetura publish/subscribe, combinada com recursos como QoS, retained messages e will messages, o torna ideal para uma ampla gama de aplicações, desde smart homes até sistemas industriais complexos.
Pontos-chave para lembrar:
- Escolha o QoS apropriado para cada caso de uso
- Projete uma hierarquia de tópicos clara e escalável
- Implemente segurança adequada (TLS, autenticação, ACL)
- Monitore performance e implemente reconexão robusta
- Considere clustering para alta disponibilidade
- Use ferramentas de teste e monitoramento
Próximos passos:
- Experimente diferentes brokers para encontrar o melhor para seu caso
- Implemente padrões avançados como request/response
- Integre com outras tecnologias (Kafka, bancos de dados, cloud)
- Otimize para seu caso de uso específico (latência vs throughput)
- Implemente observabilidade completa (logs, métricas, tracing)
MQTT continuará evoluindo com a versão 5.0 trazendo novos recursos como propriedades de mensagem, reason codes mais detalhados, e melhor suporte a request/response. É uma tecnologia essencial para qualquer desenvolvedor trabalhando com IoT e sistemas distribuídos.