Voltar para o blog
IoT5 min de leituraPor Conecta-Tech

Arquitetura MQTT - Um Guia Aprofundado

Guia completo sobre MQTT com exemplos práticos em Python, Node.js e ESP32. Aprenda sobre QoS, segurança, escalabilidade e casos de uso reais

  • MQTT
  • IoT
  • Messaging
  • Broker
  • Mosquitto
  • Python
  • Node.js
  • ESP32

Arquitetura MQTT - Um Guia Aprofundado

Vou explicar a arquitetura MQTT de forma detalhada e prática, com exemplos reais de implementação.

O que é MQTT?

MQTT (Message Queuing Telemetry Transport) é um protocolo de mensagens leve, baseado no padrão publish/subscribe (publicar/assinar), projetado para conexões com largura de banda limitada e dispositivos com recursos restritos. Foi criado em 1999 pela IBM e é padrão OASIS desde 2013.

Por que MQTT é tão popular na IoT?

  • Overhead mínimo de rede (pacotes pequenos a partir de 2 bytes)
  • Funciona bem em redes instáveis (3G, satélite, redes mesh)
  • Suporta milhares de clientes simultâneos
  • Desacoplamento total entre publicadores e assinantes
  • Três níveis de QoS para diferentes necessidades
  • Suporte nativo a reconexão automática

Arquitetura Fundamental

Componentes Principais

1. Broker (Servidor)

  • O "coração" do MQTT
  • Responsável por receber todas as mensagens, filtrar e distribuir aos clientes interessados
  • Gerencia conexões e autenticação
  • Exemplos: Mosquitto, HiveMQ, EMQX

2. Clients (Clientes)

  • Qualquer dispositivo que conecta ao broker
  • Pode ser publisher (publicador), subscriber (assinante) ou ambos
  • Exemplos: sensores IoT, aplicações mobile, servidores

3. Topics (Tópicos)

  • Estrutura hierárquica para organizar mensagens
  • Funciona como "endereços" para as mensagens
  • Exemplo: casa/sala/temperatura

Modelo Publish/Subscribe

Diferente do modelo cliente-servidor tradicional, no MQTT:

  1. Publishers publicam mensagens em tópicos específicos
  2. Broker recebe e roteia as mensagens
  3. Subscribers se inscrevem em tópicos de interesse
  4. Publishers e subscribers não se conhecem diretamente (desacoplamento)
[Sensor Temp] --publish--> [BROKER] --subscribe--> [App Dashboard]
    (Publisher)          (casa/sala/temp)           (Subscriber)

Hierarquia de Tópicos

Os tópicos usam "/" como separador hierárquico:

empresa/
├── predio1/
│   ├── andar1/
│   │   ├── sala101/temperatura
│   │   └── sala101/umidade
│   └── andar2/
│       └── sala201/luz
└── predio2/
    └── garagem/vagas

Wildcards

Single-level (+): substitui um nível

  • casa/+/temperaturacasa/sala/temperatura, casa/quarto/temperatura

Multi-level (#): substitui múltiplos níveis (deve ser o último caractere)

  • casa/# → todos os tópicos que começam com casa/

Quality of Service (QoS)

O MQTT oferece 3 níveis de garantia de entrega:

QoS 0 - At most once (No máximo uma vez)

  • "Fire and forget" (disparar e esquecer)
  • Sem confirmação de recebimento
  • Mais rápido, menor overhead
  • Uso: dados não críticos, leituras frequentes de sensores

QoS 1 - At least once (Pelo menos uma vez)

  • Garante que a mensagem chegue
  • Pode haver duplicatas
  • Requer ACK (acknowledgment)
  • Uso: dados importantes que toleram duplicatas

QoS 2 - Exactly once (Exatamente uma vez)

  • Garante entrega única
  • Mais lento, maior overhead
  • Processo de handshake em 4 etapas
  • Uso: dados críticos, transações financeiras

Conexão e Sessão

Processo de Conexão

  1. CONNECT: Cliente envia pacote de conexão
  2. CONNACK: Broker responde confirmando
  3. Troca de mensagens PUBLISH/SUBSCRIBE
  4. DISCONNECT: Encerramento limpo (opcional)

Parâmetros da Conexão

Client ID: identificador único do cliente Clean Session:

  • true: descarta sessão anterior
  • false: mantém subscrições e mensagens pendentes

Keep Alive: intervalo em segundos para verificar se a conexão está ativa

  • Cliente envia PINGREQ se não houver atividade
  • Broker responde com PINGRESP

Will Message (Mensagem de última vontade):

  • Mensagem enviada pelo broker se o cliente desconectar inesperadamente
  • Útil para notificar falhas: dispositivo/status → "offline"

Retained Messages (Mensagens Retidas)

  • Broker armazena a última mensagem de um tópico
  • Novos subscribers recebem imediatamente
  • Útil para estados: sala/luz → "ligada"
  • Definida com flag retain=true

Segurança

Camadas de Segurança

1. Autenticação

  • Username/Password
  • Client certificates (TLS mutual auth)
  • Tokens OAuth

2. Criptografia

  • TLS/SSL (porta 8883)
  • Protege dados em trânsito

3. Autorização

  • ACL (Access Control Lists)
  • Define quem pode pub/sub em quais tópicos

Fluxo Completo de Comunicação

1. Cliente A conecta ao broker
   CLIENT A → CONNECT → BROKER
   BROKER → CONNACK → CLIENT A

2. Cliente A subscreve a um tópico
   CLIENT A → SUBSCRIBE(casa/temperatura) → BROKER
   BROKER → SUBACK → CLIENT A

3. Cliente B publica mensagem
   CLIENT B → PUBLISH(casa/temperatura, "22°C") → BROKER

4. Broker distribui aos subscribers
   BROKER → PUBLISH(casa/temperatura, "22°C") → CLIENT A

Casos de Uso Reais

IoT e Automação Residencial

  • Sensores publicam em casa/sensores/+
  • App subscreve a todos os sensores
  • Comandos enviados para casa/dispositivos/luz/cmd

Monitoramento Industrial

  • Máquinas publicam telemetria
  • Sistema central monitora fabrica/#
  • Alertas em tópicos específicos

Chat e Notificações

  • Usuários subscrevem a chat/sala123
  • Mensagens publicadas no mesmo tópico
  • Presença via will messages

Vantagens do MQTT

  • Leve: overhead mínimo (2 bytes de header)
  • Eficiente: ideal para redes instáveis
  • Escalável: milhares de clientes simultâneos
  • Desacoplado: publishers/subscribers independentes
  • Bidirecional: comunicação em ambas direções

Limitações

  • Sem descoberta automática de serviços
  • Payload opaco (sem estrutura definida)
  • Broker é ponto único de falha (requer redundância)
  • Sem suporte nativo a RPC (request/response)

Comparação com Outros Protocolos

HTTP/REST:

  • MQTT: conexão persistente, push
  • HTTP: request/response, polling necessário

WebSocket:

  • Similar em conexão persistente
  • MQTT adiciona QoS, retained messages, estrutura de tópicos

AMQP:

  • Mais complexo e robusto
  • MQTT mais leve e simples

Implementação Prática

Configurando um Broker (Mosquitto)

Instalação no Linux/Ubuntu:

sudo apt-get update
sudo apt-get install mosquitto mosquitto-clients

# Iniciar o serviço
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

# Verificar status
sudo systemctl status mosquitto

Configuração Básica (/etc/mosquitto/mosquitto.conf):

# Porta padrão
listener 1883

# Porta com TLS
listener 8883
cafile /etc/mosquitto/ca_certificates/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key

# WebSocket
listener 9001
protocol websockets

# Autenticação
allow_anonymous false
password_file /etc/mosquitto/passwd

# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type all

# Persistência
persistence true
persistence_location /var/lib/mosquitto/

# Limites
max_connections 1000
max_queued_messages 1000

Criar usuários:

sudo mosquitto_passwd -c /etc/mosquitto/passwd usuario1
sudo mosquitto_passwd /etc/mosquitto/passwd usuario2
sudo systemctl restart mosquitto

Cliente Python (Paho MQTT)

Instalação:

pip install paho-mqtt

Publisher Simples:

import paho.mqtt.client as mqtt
import time
import json

# Callback quando conectado
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Conectado ao broker!")
    else:
        print(f"Falha na conexão. Código: {rc}")

# Callback quando publicado
def on_publish(client, userdata, mid):
    print(f"Mensagem {mid} publicada")

# Configurar cliente
client = mqtt.Client(client_id="sensor_temperatura_01")
client.username_pw_set("usuario1", "senha123")
client.on_connect = on_connect
client.on_publish = on_publish

# Conectar
client.connect("localhost", 1883, 60)
client.loop_start()

# Publicar dados simulados
try:
    while True:
        temperatura = 20 + (time.time() % 10)  # Simular variação
        payload = json.dumps({
            "sensor_id": "temp_01",
            "temperatura": round(temperatura, 2),
            "unidade": "Celsius",
            "timestamp": time.time()
        })

        result = client.publish(
            topic="casa/sala/temperatura",
            payload=payload,
            qos=1,
            retain=True
        )

        print(f"Publicado: {payload}")
        time.sleep(5)

except KeyboardInterrupt:
    print("\nEncerrando...")
    client.loop_stop()
    client.disconnect()

Subscriber Completo:

import paho.mqtt.client as mqtt
import json

def on_connect(client, userdata, flags, rc):
    print(f"Conectado com código: {rc}")

    # Subscrever a múltiplos tópicos
    client.subscribe([
        ("casa/+/temperatura", 1),
        ("casa/+/umidade", 1),
        ("casa/alertas/#", 2)
    ])
    print("Inscrito nos tópicos")

def on_message(client, userdata, msg):
    print(f"\n--- Nova Mensagem ---")
    print(f"Tópico: {msg.topic}")
    print(f"QoS: {msg.qos}")
    print(f"Retained: {msg.retain}")

    try:
        payload = json.loads(msg.payload.decode())
        print(f"Dados: {json.dumps(payload, indent=2)}")
    except:
        print(f"Payload: {msg.payload.decode()}")

def on_subscribe(client, userdata, mid, granted_qos):
    print(f"Subscrição confirmada - QoS: {granted_qos}")

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print(f"Desconexão inesperada. Código: {rc}")
        print("Tentando reconectar...")

# Configurar cliente com sessão persistente
client = mqtt.Client(
    client_id="monitor_casa_01",
    clean_session=False  # Mantém subscrições
)

client.username_pw_set("usuario1", "senha123")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.on_disconnect = on_disconnect

# Will message - notifica se desconectar inesperadamente
client.will_set(
    topic="casa/monitor/status",
    payload="offline",
    qos=1,
    retain=True
)

# Conectar e manter loop
client.connect("localhost", 1883, 60)

try:
    client.loop_forever()
except KeyboardInterrupt:
    print("\nEncerrando...")
    client.disconnect()

Cliente Node.js (MQTT.js)

Instalação:

npm install mqtt

Publisher:

const mqtt = require('mqtt');

const client = mqtt.connect('mqtt://localhost:1883', {
  clientId: 'sensor_node_01',
  username: 'usuario1',
  password: 'senha123',
  clean: true,
  keepalive: 60,
  reconnectPeriod: 1000,
  will: {
    topic: 'casa/sensores/node01/status',
    payload: 'offline',
    qos: 1,
    retain: true
  }
});

client.on('connect', () => {
  console.log('Conectado ao broker');

  // Publicar status online
  client.publish('casa/sensores/node01/status', 'online', {
    qos: 1,
    retain: true
  });

  // Publicar dados periodicamente
  setInterval(() => {
    const data = {
      sensor_id: 'node_01',
      temperatura: (20 + Math.random() * 10).toFixed(2),
      umidade: (50 + Math.random() * 20).toFixed(2),
      timestamp: Date.now()
    };

    client.publish(
      'casa/sala/sensores',
      JSON.stringify(data),
      { qos: 1 },
      (err) => {
        if (err) {
          console.error('Erro ao publicar:', err);
        } else {
          console.log('Publicado:', data);
        }
      }
    );
  }, 5000);
});

client.on('error', (err) => {
  console.error('Erro de conexão:', err);
});

client.on('offline', () => {
  console.log('Cliente offline');
});

client.on('reconnect', () => {
  console.log('Reconectando...');
});

process.on('SIGINT', () => {
  console.log('\nEncerrando...');
  client.end(true);
  process.exit();
});

Subscriber:

const mqtt = require('mqtt');

const client = mqtt.connect('mqtt://localhost:1883', {
  clientId: 'app_dashboard',
  username: 'usuario1',
  password: 'senha123',
  clean: false, // Manter subscrições
});

client.on('connect', () => {
  console.log('Conectado ao broker');

  // Subscrever a múltiplos tópicos
  const subscriptions = {
    'casa/#': { qos: 1 },
    'alertas/+/critico': { qos: 2 }
  };

  client.subscribe(subscriptions, (err, granted) => {
    if (err) {
      console.error('Erro ao subscrever:', err);
    } else {
      console.log('Subscrições confirmadas:');
      granted.forEach(g => {
        console.log(`  ${g.topic} - QoS ${g.qos}`);
      });
    }
  });
});

client.on('message', (topic, message, packet) => {
  console.log('\n--- Nova Mensagem ---');
  console.log(`Tópico: ${topic}`);
  console.log(`QoS: ${packet.qos}`);
  console.log(`Retained: ${packet.retain}`);

  try {
    const data = JSON.parse(message.toString());
    console.log('Dados:', JSON.stringify(data, null, 2));
  } catch {
    console.log('Mensagem:', message.toString());
  }
});

client.on('error', (err) => {
  console.error('Erro:', err);
});

process.on('SIGINT', () => {
  console.log('\nEncerrando...');
  client.end();
  process.exit();
});

Cliente ESP32/ESP8266 (Arduino)

Bibliotecas necessárias:

  • PubSubClient
#include <WiFi.h>
#include <PubSubClient.h>

// Configurações WiFi
const char* ssid = "SUA_REDE";
const char* password = "SUA_SENHA";

// Configurações MQTT
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* mqtt_user = "usuario1";
const char* mqtt_password = "senha123";
const char* client_id = "esp32_sensor_01";

WiFiClient espClient;
PubSubClient client(espClient);

// Tópicos
const char* topic_temp = "casa/quarto/temperatura";
const char* topic_status = "casa/quarto/status";
const char* topic_cmd = "casa/quarto/comando";

void setup_wifi() {
  delay(10);
  Serial.println();
  Serial.print("Conectando ao WiFi: ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi conectado");
  Serial.print("Endereço IP: ");
  Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Mensagem recebida no tópico: ");
  Serial.println(topic);

  String message;
  for (int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  Serial.print("Mensagem: ");
  Serial.println(message);

  // Processar comandos
  if (String(topic) == topic_cmd) {
    if (message == "ligar_led") {
      digitalWrite(LED_BUILTIN, HIGH);
      Serial.println("LED ligado");
    } else if (message == "desligar_led") {
      digitalWrite(LED_BUILTIN, LOW);
      Serial.println("LED desligado");
    }
  }
}

void reconnect() {
  while (!client.connected()) {
    Serial.print("Conectando ao MQTT...");

    // Will message
    if (client.connect(client_id, mqtt_user, mqtt_password,
                       topic_status, 1, true, "offline")) {
      Serial.println("conectado");

      // Publicar status online
      client.publish(topic_status, "online", true);

      // Subscrever a tópicos
      client.subscribe(topic_cmd, 1);

    } else {
      Serial.print("falhou, rc=");
      Serial.print(client.state());
      Serial.println(" tentando novamente em 5 segundos");
      delay(5000);
    }
  }
}

void setup() {
  Serial.begin(115200);
  pinMode(LED_BUILTIN, OUTPUT);

  setup_wifi();
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(callback);
}

void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  // Publicar temperatura a cada 10 segundos
  static unsigned long lastMsg = 0;
  unsigned long now = millis();

  if (now - lastMsg > 10000) {
    lastMsg = now;

    // Simular leitura de sensor
    float temperatura = 20.0 + (random(0, 100) / 10.0);

    char msg[50];
    snprintf(msg, 50, "{\"temp\":%.2f,\"unit\":\"C\"}", temperatura);

    Serial.print("Publicando: ");
    Serial.println(msg);

    client.publish(topic_temp, msg, false);
  }
}

Melhores Práticas de Produção

Design de Tópicos

Boas práticas:

✓ empresa/localidade/edificio/andar/sala/dispositivo/metrica
✓ iot/devices/{device_id}/telemetry/{metric}
✓ app/users/{user_id}/notifications
✓ system/logs/{service_name}/{level}

✗ topic123
✗ data
✗ temp
✗ muito/longo/demais/com/muitos/niveis/que/complica/gestao/topico

Convenções recomendadas:

  • Use lowercase e separadores consistentes (/ ou -)
  • Primeiro nível: domínio/namespace (empresa, iot, system)
  • Coloque identificadores únicos no meio (device_id, user_id)
  • Termine com a métrica ou tipo de dado
  • Evite mais de 7-8 níveis
  • Não use caracteres especiais (#, +, $, espaços)

Estratégias de QoS

Escolha o QoS apropriado:

# QoS 0 - Leituras frequentes não críticas
client.publish("sensores/temperatura/ambiente", temp, qos=0)

# QoS 1 - Dados importantes que toleram duplicatas
client.publish("alertas/movimento", alerta, qos=1)

# QoS 2 - Comandos críticos, transações
client.publish("controle/portas/tranca", comando, qos=2)

Gerenciamento de Sessões

Clean Session = True:

  • Usado para clientes temporários
  • Não mantém estado no broker
  • Economiza memória no broker
  • Ideal para publishers que só enviam dados

Clean Session = False:

  • Usado para subscribers importantes
  • Mantém subscrições entre reconexões
  • Recebe mensagens perdidas durante desconexão
  • Requer Client ID único e estável

Retained Messages

Quando usar:

# Estados que novos clientes precisam saber imediatamente
client.publish("casa/sala/luz/estado", "ligada", retain=True)
client.publish("sensores/temp01/ultimo_valor", "23.5", retain=True)
client.publish("dispositivo/config", config_json, retain=True)

# Limpar retained message
client.publish("topico/antigo", None, retain=True)

Will Messages

Implementação de presença:

client = mqtt.Client("device_01")

# Configurar will message
client.will_set(
    topic="devices/device_01/status",
    payload="offline",
    qos=1,
    retain=True
)

# Ao conectar, publicar status online
def on_connect(client, userdata, flags, rc):
    client.publish("devices/device_01/status", "online",
                   qos=1, retain=True)

Tratamento de Erros e Reconexão

Python com retry exponencial:

import time
import random

def connect_with_retry(client, broker, max_retries=10):
    retry_count = 0

    while retry_count < max_retries:
        try:
            client.connect(broker, 1883, 60)
            print("Conectado com sucesso")
            return True
        except Exception as e:
            retry_count += 1
            wait_time = min(300, (2 ** retry_count) + random.uniform(0, 1))
            print(f"Falha na conexão: {e}")
            print(f"Tentativa {retry_count}/{max_retries}")
            print(f"Aguardando {wait_time:.2f}s...")
            time.sleep(wait_time)

    print("Número máximo de tentativas excedido")
    return False

# Usar
client = mqtt.Client()
if connect_with_retry(client, "mqtt.exemplo.com"):
    client.loop_forever()

Node.js com reconexão automática:

const mqtt = require('mqtt');

const options = {
  clientId: 'robust_client',
  reconnectPeriod: 5000,  // Tentar reconectar a cada 5s
  connectTimeout: 30000,  // Timeout de conexão 30s
  keepalive: 60,

  // Tentar múltiplos servidores
  servers: [
    { host: 'mqtt1.exemplo.com', port: 1883 },
    { host: 'mqtt2.exemplo.com', port: 1883 }
  ],

  will: {
    topic: 'clients/robust_client/status',
    payload: 'offline_unexpected',
    qos: 1,
    retain: true
  }
};

const client = mqtt.connect(options);

client.on('reconnect', () => {
  console.log('Tentando reconectar...');
});

client.on('offline', () => {
  console.log('Cliente offline');
});

client.on('error', (error) => {
  console.error('Erro:', error.message);
});

Segurança em Profundidade

TLS/SSL

Gerar certificados para Mosquitto:

# Criar CA
openssl genrsa -out ca.key 2048
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt

# Criar certificado do servidor
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key \
  -CAcreateserial -out server.crt -days 3650

# Criar certificado do cliente
openssl genrsa -out client.key 2048
openssl req -new -key client.key -out client.csr
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key \
  -CAcreateserial -out client.crt -days 3650

Cliente Python com TLS:

import ssl
import paho.mqtt.client as mqtt

client = mqtt.Client()

# Configurar TLS
client.tls_set(
    ca_certs="ca.crt",
    certfile="client.crt",
    keyfile="client.key",
    cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLSv1_2
)

# TLS sem verificação do certificado (apenas para dev!)
# client.tls_insecure_set(True)

client.connect("mqtt.exemplo.com", 8883, 60)

ACL (Access Control Lists)

Configuração Mosquitto (/etc/mosquitto/acl):

# Usuário admin - acesso total
user admin
topic readwrite #

# Sensores - apenas publicam seus dados
user sensor_01
topic write sensores/sensor_01/#
topic read sensores/sensor_01/config

# Dashboard - apenas lê
user dashboard
topic read sensores/#
topic read alertas/#

# Serviço de controle - lê sensores e escreve comandos
user servico_controle
topic read sensores/#
topic write comandos/#

# Padrões com wildcards
user device_%u
topic readwrite devices/%u/#

Ativar ACL no mosquitto.conf:

acl_file /etc/mosquitto/acl

Autenticação com JWT

Exemplo com mosquitto-auth-plugin:

import jwt
import time
import paho.mqtt.client as mqtt

# Gerar token JWT
def generate_token(device_id, secret_key):
    payload = {
        'device_id': device_id,
        'exp': time.time() + 3600,  # Expira em 1 hora
        'iat': time.time()
    }
    return jwt.encode(payload, secret_key, algorithm='HS256')

# Usar token como senha
token = generate_token('device_01', 'meu_secret_key')

client = mqtt.Client(client_id='device_01')
client.username_pw_set(username='device_01', password=token)
client.connect('mqtt.exemplo.com', 1883)

Escalabilidade e Alta Disponibilidade

Clustering

EMQX Cluster (exemplo docker-compose.yml):

version: '3'

services:
  emqx1:
    image: emqx/emqx:latest
    environment:
      - EMQX_NAME=emqx
      - EMQX_CLUSTER__DISCOVERY=static
      - EMQX_CLUSTER__STATIC__SEEDS=emqx@emqx1,emqx@emqx2,emqx@emqx3
    ports:
      - "1883:1883"
      - "18083:18083"
    networks:
      - emqx-net

  emqx2:
    image: emqx/emqx:latest
    environment:
      - EMQX_NAME=emqx
      - EMQX_CLUSTER__DISCOVERY=static
      - EMQX_CLUSTER__STATIC__SEEDS=emqx@emqx1,emqx@emqx2,emqx@emqx3
    ports:
      - "1884:1883"
    networks:
      - emqx-net

  emqx3:
    image: emqx/emqx:latest
    environment:
      - EMQX_NAME=emqx
      - EMQX_CLUSTER__DISCOVERY=static
      - EMQX_CLUSTER__STATIC__SEEDS=emqx@emqx1,emqx@emqx2,emqx@emqx3
    ports:
      - "1885:1883"
    networks:
      - emqx-net

networks:
  emqx-net:
    driver: bridge

Load Balancing

HAProxy para MQTT:

global
    log /dev/log local0
    maxconn 50000

defaults
    log global
    mode tcp
    option tcplog
    timeout connect 5s
    timeout client 50s
    timeout server 50s

frontend mqtt_front
    bind *:1883
    default_backend mqtt_back

backend mqtt_back
    balance leastconn
    option tcp-check
    server mqtt1 192.168.1.101:1883 check
    server mqtt2 192.168.1.102:1883 check
    server mqtt3 192.168.1.103:1883 check backup

Monitoramento

Prometheus + Grafana com EMQX:

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'emqx'
    static_configs:
      - targets: ['emqx1:18083', 'emqx2:18083', 'emqx3:18083']

Métricas importantes:

  • Número de conexões ativas
  • Taxa de mensagens/segundo
  • Latência de entrega
  • Uso de memória/CPU
  • Mensagens na fila
  • Taxa de desconexões

Padrões de Integração

MQTT para Kafka (Ponte)

Confluent MQTT Proxy ou implementação customizada:

import paho.mqtt.client as mqtt
from kafka import KafkaProducer
import json

# Kafka producer
kafka_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def on_message(client, userdata, msg):
    # Receber do MQTT
    data = {
        'topic': msg.topic,
        'payload': msg.payload.decode(),
        'qos': msg.qos,
        'timestamp': time.time()
    }

    # Enviar para Kafka
    kafka_topic = msg.topic.replace('/', '.')
    kafka_producer.send(kafka_topic, value=data)
    print(f"Ponte: {msg.topic} -> Kafka:{kafka_topic}")

mqtt_client = mqtt.Client()
mqtt_client.on_message = on_message
mqtt_client.connect("localhost", 1883)
mqtt_client.subscribe("sensores/#")
mqtt_client.loop_forever()

MQTT para HTTP/REST

Bridge MQTT -> Webhook:

import paho.mqtt.client as mqtt
import requests
import json

def on_message(client, userdata, msg):
    payload = msg.payload.decode()

    # Enviar para API REST
    try:
        response = requests.post(
            'https://api.exemplo.com/mqtt-data',
            json={
                'topic': msg.topic,
                'data': payload,
                'qos': msg.qos
            },
            headers={'Authorization': 'Bearer TOKEN'},
            timeout=5
        )

        if response.status_code == 200:
            print(f"Dados enviados: {msg.topic}")
        else:
            print(f"Erro HTTP: {response.status_code}")

    except requests.exceptions.RequestException as e:
        print(f"Erro ao enviar: {e}")

client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("eventos/#")
client.loop_forever()

Request/Response Pattern

Implementação RPC-like com MQTT:

# Cliente que faz requisição
import paho.mqtt.client as mqtt
import uuid
import json
import time

class MQTTRPCClient:
    def __init__(self, broker):
        self.client = mqtt.Client()
        self.client.on_message = self.on_response
        self.client.connect(broker, 1883)
        self.client.loop_start()

        self.pending_requests = {}
        self.response_topic = f"responses/{uuid.uuid4()}"
        self.client.subscribe(self.response_topic)

    def on_response(self, client, userdata, msg):
        data = json.loads(msg.payload.decode())
        request_id = data['request_id']

        if request_id in self.pending_requests:
            self.pending_requests[request_id] = data['result']

    def call(self, method, params, timeout=10):
        request_id = str(uuid.uuid4())

        request = {
            'method': method,
            'params': params,
            'request_id': request_id,
            'response_topic': self.response_topic
        }

        self.pending_requests[request_id] = None
        self.client.publish('rpc/requests', json.dumps(request))

        # Aguardar resposta
        start_time = time.time()
        while self.pending_requests[request_id] is None:
            if time.time() - start_time > timeout:
                raise TimeoutError("RPC timeout")
            time.sleep(0.1)

        result = self.pending_requests[request_id]
        del self.pending_requests[request_id]
        return result

# Servidor RPC
class MQTTRPCServer:
    def __init__(self, broker):
        self.client = mqtt.Client()
        self.client.on_message = self.on_request
        self.client.connect(broker, 1883)
        self.client.subscribe('rpc/requests')

        self.methods = {}

    def register(self, name, func):
        self.methods[name] = func

    def on_request(self, client, userdata, msg):
        request = json.loads(msg.payload.decode())
        method = request['method']
        params = request['params']
        request_id = request['request_id']
        response_topic = request['response_topic']

        if method in self.methods:
            try:
                result = self.methods[method](**params)
                response = {
                    'request_id': request_id,
                    'result': result,
                    'error': None
                }
            except Exception as e:
                response = {
                    'request_id': request_id,
                    'result': None,
                    'error': str(e)
                }
        else:
            response = {
                'request_id': request_id,
                'result': None,
                'error': f'Method {method} not found'
            }

        self.client.publish(response_topic, json.dumps(response))

    def start(self):
        self.client.loop_forever()

# Uso
if __name__ == '__main__':
    # Servidor
    server = MQTTRPCServer('localhost')
    server.register('add', lambda a, b: a + b)
    server.register('multiply', lambda a, b: a * b)
    # server.start()  # Executar em thread separada

    # Cliente
    client = MQTTRPCClient('localhost')
    result = client.call('add', {'a': 5, 'b': 3})
    print(f"Resultado: {result}")  # 8

Troubleshooting Comum

Problema: Mensagens não chegam

Diagnóstico:

# 1. Verificar se o broker está rodando
sudo systemctl status mosquitto

# 2. Testar conexão
mosquitto_sub -h localhost -t teste -v

# 3. Em outro terminal, publicar
mosquitto_pub -h localhost -t teste -m "mensagem teste"

# 4. Verificar logs
tail -f /var/log/mosquitto/mosquitto.log

# 5. Testar com debug
mosquitto_sub -h localhost -t '#' -v -d

Possíveis causas:

  • QoS incompatível
  • Cliente não está subscrito ao tópico correto (verificar wildcards)
  • Clean session=true e cliente desconectou antes de receber
  • ACL bloqueando
  • Firewall bloqueando porta

Problema: Alta latência

Investigar:

import paho.mqtt.client as mqtt
import time

latencies = []

def on_message(client, userdata, msg):
    sent_time = float(msg.payload.decode())
    latency = (time.time() - sent_time) * 1000  # ms
    latencies.append(latency)
    print(f"Latência: {latency:.2f}ms")

    if len(latencies) >= 100:
        avg = sum(latencies) / len(latencies)
        print(f"\nLatência média: {avg:.2f}ms")
        print(f"Min: {min(latencies):.2f}ms")
        print(f"Max: {max(latencies):.2f}ms")

client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("latency/test")

# Publicar timestamps
import threading
def publish_loop():
    pub_client = mqtt.Client()
    pub_client.connect("localhost", 1883)
    while True:
        pub_client.publish("latency/test", str(time.time()))
        time.sleep(1)

threading.Thread(target=publish_loop, daemon=True).start()
client.loop_forever()

Otimizações:

  • Reduzir QoS se possível
  • Aumentar keep-alive interval
  • Usar conexão local ao broker
  • Verificar sobrecarga do broker
  • Considerar clustering/load balancing

Problema: Broker consumindo muita memória

Verificar:

# Estatísticas do broker
mosquitto_sub -h localhost -t '$SYS/#' -v

# Importantes:
# $SYS/broker/clients/connected
# $SYS/broker/messages/stored
# $SYS/broker/subscriptions/count

Soluções:

  • Limitar mensagens retidas
  • Configurar limites no mosquitto.conf:
max_queued_messages 1000
max_inflight_messages 20
message_size_limit 10240
  • Implementar TTL para retained messages
  • Limpar retained messages antigas:
mosquitto_pub -h localhost -t 'topico/antigo' -n -r

Ferramentas e Brokers Populares

Brokers

Eclipse Mosquitto (Open Source)

  • Leve e rápido
  • Ideal para projetos pequenos/médios
  • Suporta MQTT 3.1.1 e 5.0
  • Fácil configuração

EMQX (Open Source/Enterprise)

  • Alto desempenho (milhões de conexões)
  • Clustering nativo
  • Dashboard web rico
  • Extensível com plugins

HiveMQ (Enterprise)

  • Altamente escalável
  • Suporte empresarial
  • Integrações prontas (Kafka, DBs)
  • Versão community gratuita

VerneMQ (Open Source)

  • Escrito em Erlang
  • Muito escalável
  • Clustering distribuído

AWS IoT Core (Cloud)

  • Totalmente gerenciado
  • Integração com AWS services
  • Segurança robusta
  • Pay-per-use

Ferramentas de Teste

MQTT.fx (Desktop)

  • GUI completa
  • Suporta múltiplas conexões
  • Scripts de teste

MQTT Explorer (Desktop, Open Source)

  • Visualização hierárquica de tópicos
  • Diff de mensagens
  • Histórico

Mosquitto CLI:

# Publicar
mosquitto_pub -h broker.com -t topico -m "mensagem" -q 1 -u user -P pass

# Subscrever
mosquitto_sub -h broker.com -t 'topico/#' -v -u user -P pass

# Benchmark
mosquitto_pub -h localhost -t test -m "test" -r 1000 -c

# Subscrever com timestamp
mosquitto_sub -h localhost -t '#' -v -F '@Y-@m-@dT@H:@M:@S@z : %t : %p'

MQTT.js CLI:

# Instalar
npm install -g mqtt

# Publicar
mqtt pub -h broker.com -t 'topico' -m 'mensagem'

# Subscrever
mqtt sub -h broker.com -t 'topico/#' -v

# Bench
mqtt bench -h localhost -t test -c 100 -m 1000

Performance e Otimização

Benchmarking

Teste de throughput:

import paho.mqtt.client as mqtt
import time
import threading

messages_sent = 0
messages_received = 0
start_time = None

def publisher():
    global messages_sent, start_time
    client = mqtt.Client()
    client.connect("localhost", 1883)
    start_time = time.time()

    for i in range(10000):
        client.publish("benchmark/test", f"message_{i}", qos=0)
        messages_sent += 1

    client.disconnect()

def subscriber():
    global messages_received

    def on_message(client, userdata, msg):
        global messages_received
        messages_received += 1

        if messages_received >= 10000:
            duration = time.time() - start_time
            print(f"\n--- Resultados ---")
            print(f"Mensagens: {messages_received}")
            print(f"Tempo: {duration:.2f}s")
            print(f"Throughput: {messages_received/duration:.2f} msg/s")
            client.disconnect()

    client = mqtt.Client()
    client.on_message = on_message
    client.connect("localhost", 1883)
    client.subscribe("benchmark/test")
    client.loop_forever()

# Executar
threading.Thread(target=subscriber, daemon=True).start()
time.sleep(1)
publisher()
time.sleep(5)

Otimizações de Cliente

Batching de mensagens:

import paho.mqtt.client as mqtt
import time

client = mqtt.Client()
client.connect("localhost", 1883)

# Publicar múltiplas mensagens
messages = []
for i in range(1000):
    info = client.publish(f"sensor/data/{i}", f"value_{i}", qos=0)
    messages.append(info)

# Aguardar todas serem publicadas
for info in messages:
    info.wait_for_publish()

print("Todas as mensagens publicadas")

Conexões persistentes:

# Manter conexão aberta e reutilizar
client = mqtt.Client()
client.connect("localhost", 1883, keepalive=3600)  # 1 hora
client.loop_start()  # Loop em background

# Publicar quando necessário
for data in sensor_readings():
    client.publish("sensors/data", data)
    time.sleep(1)

# Conexão permanece aberta

Otimizações de Broker

Mosquitto otimizado (mosquitto.conf):

# Persistência otimizada
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 300
autosave_on_changes false

# Limites otimizados
max_connections 10000
max_queued_messages 1000
max_inflight_messages 20
max_keepalive 300

# Mensagens
message_size_limit 10240
queue_qos0_messages false

# Logging (menos verbose em produção)
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning

Casos de Uso Avançados

Smart Home Completo

Arquitetura:

Dispositivos IoT → MQTT Broker → Home Assistant → UI/Apps
                         ↓
                    Automações
                         ↓
                  Banco de Dados (InfluxDB)
                         ↓
                    Grafana (Visualização)

Exemplo de automação:

import paho.mqtt.client as mqtt
import json

class HomeAutomation:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.on_message = self.on_message
        self.client.connect("localhost", 1883)

        # Subscrever sensores
        self.client.subscribe("casa/+/temperatura")
        self.client.subscribe("casa/+/movimento")
        self.client.subscribe("casa/+/porta")

        self.estado = {}

    def on_message(self, client, userdata, msg):
        parts = msg.topic.split('/')
        local = parts[1]
        sensor = parts[2]
        valor = msg.payload.decode()

        # Armazenar estado
        if local not in self.estado:
            self.estado[local] = {}
        self.estado[local][sensor] = valor

        # Regras de automação
        self.aplicar_regras(local, sensor, valor)

    def aplicar_regras(self, local, sensor, valor):
        # Regra: Temperatura alta -> ligar AC
        if sensor == "temperatura":
            temp = float(valor)
            if temp > 26:
                self.client.publish(
                    f"casa/{local}/ar_condicionado/comando",
                    "ligar"
                )
                print(f"AC ligado em {local} (temp: {temp}°C)")

        # Regra: Movimento detectado + noite -> ligar luz
        if sensor == "movimento" and valor == "detectado":
            import datetime
            hora = datetime.datetime.now().hour
            if hora >= 18 or hora <= 6:
                self.client.publish(
                    f"casa/{local}/luz/comando",
                    "ligar"
                )
                print(f"Luz ligada em {local} (movimento noturno)")

        # Regra: Porta aberta + ninguém em casa -> alerta
        if sensor == "porta" and valor == "aberta":
            pessoas_casa = any(
                local.get("movimento") == "detectado"
                for local in self.estado.values()
            )
            if not pessoas_casa:
                self.client.publish(
                    "casa/alertas/seguranca",
                    json.dumps({
                        "tipo": "porta_aberta",
                        "local": local,
                        "timestamp": datetime.datetime.now().isoformat()
                    }),
                    qos=2
                )
                print(f"ALERTA: Porta aberta em {local} sem presença!")

    def start(self):
        self.client.loop_forever()

# Executar
automation = HomeAutomation()
automation.start()

Frota de Veículos (Telemetria)

# Veículo publicando telemetria
import paho.mqtt.client as mqtt
import json
import time
import random

class VehicleTelemetry:
    def __init__(self, vehicle_id):
        self.vehicle_id = vehicle_id
        self.client = mqtt.Client(f"vehicle_{vehicle_id}")
        self.client.connect("fleet.mqtt.com", 1883)

        # Will message para detectar perda de sinal
        self.client.will_set(
            f"fleet/vehicles/{vehicle_id}/status",
            "offline",
            qos=1,
            retain=True
        )

        self.client.loop_start()

        # Publicar status online
        self.client.publish(
            f"fleet/vehicles/{vehicle_id}/status",
            "online",
            retain=True
        )

    def send_telemetry(self):
        while True:
            telemetry = {
                "vehicle_id": self.vehicle_id,
                "timestamp": time.time(),
                "position": {
                    "lat": -23.550520 + random.uniform(-0.1, 0.1),
                    "lon": -46.633308 + random.uniform(-0.1, 0.1)
                },
                "speed": random.randint(0, 120),
                "fuel_level": random.randint(0, 100),
                "engine_temp": random.randint(80, 110),
                "rpm": random.randint(800, 4000),
                "odometer": random.randint(10000, 200000)
            }

            # Publicar telemetria (QoS 0 - dados frequentes)
            self.client.publish(
                f"fleet/vehicles/{self.vehicle_id}/telemetry",
                json.dumps(telemetry),
                qos=0
            )

            # Alertas críticos (QoS 2)
            if telemetry["engine_temp"] > 105:
                self.client.publish(
                    f"fleet/alerts/critical",
                    json.dumps({
                        "vehicle_id": self.vehicle_id,
                        "type": "high_temperature",
                        "value": telemetry["engine_temp"],
                        "timestamp": time.time()
                    }),
                    qos=2
                )

            time.sleep(5)  # Telemetria a cada 5s

# Central de monitoramento
class FleetMonitoring:
    def __init__(self):
        self.client = mqtt.Client("fleet_monitoring")
        self.client.on_message = self.on_telemetry
        self.client.connect("fleet.mqtt.com", 1883)

        # Monitorar todos os veículos
        self.client.subscribe("fleet/vehicles/+/telemetry")
        self.client.subscribe("fleet/vehicles/+/status")
        self.client.subscribe("fleet/alerts/#")

        self.vehicles = {}

    def on_telemetry(self, client, userdata, msg):
        if "/telemetry" in msg.topic:
            data = json.loads(msg.payload.decode())
            vehicle_id = data["vehicle_id"]
            self.vehicles[vehicle_id] = data

            # Processar dados...
            self.check_maintenance(vehicle_id, data)

        elif "/alerts/" in msg.topic:
            alert = json.loads(msg.payload.decode())
            self.handle_alert(alert)

    def check_maintenance(self, vehicle_id, data):
        # Verificar necessidade de manutenção
        if data["odometer"] % 10000 < 100:  # Próximo de múltiplo de 10k
            self.client.publish(
                f"fleet/vehicles/{vehicle_id}/maintenance",
                json.dumps({
                    "type": "scheduled",
                    "reason": "odometer_milestone",
                    "odometer": data["odometer"]
                }),
                qos=1
            )

    def handle_alert(self, alert):
        print(f"🚨 ALERTA: {alert}")
        # Enviar notificação, email, etc.

    def start(self):
        self.client.loop_forever()

Industrial IoT (IIoT)

# Máquina industrial
class IndustrialMachine:
    def __init__(self, machine_id, line, station):
        self.machine_id = machine_id
        self.topic_base = f"factory/line{line}/station{station}/{machine_id}"

        self.client = mqtt.Client(f"machine_{machine_id}")
        self.client.connect("factory.mqtt.local", 1883)
        self.client.loop_start()

        # Subscrever comandos
        self.client.subscribe(f"{self.topic_base}/commands")
        self.client.on_message = self.on_command

        self.running = False
        self.parts_produced = 0

    def on_command(self, client, userdata, msg):
        command = msg.payload.decode()

        if command == "start":
            self.running = True
            print(f"Máquina {self.machine_id} iniciada")
        elif command == "stop":
            self.running = False
            print(f"Máquina {self.machine_id} parada")
        elif command == "reset_counter":
            self.parts_produced = 0

    def run(self):
        while True:
            if self.running:
                # Simular produção
                time.sleep(2)
                self.parts_produced += 1

                # Publicar métricas OEE (Overall Equipment Effectiveness)
                metrics = {
                    "machine_id": self.machine_id,
                    "timestamp": time.time(),
                    "status": "running",
                    "parts_produced": self.parts_produced,
                    "cycle_time": 2.0,
                    "temperature": 65 + random.uniform(-5, 5),
                    "vibration": random.uniform(0, 10),
                    "power_consumption": random.uniform(50, 100)
                }

                self.client.publish(
                    f"{self.topic_base}/metrics",
                    json.dumps(metrics),
                    qos=1
                )

                # Publicar eventos de produção
                if self.parts_produced % 100 == 0:
                    self.client.publish(
                        f"{self.topic_base}/events",
                        json.dumps({
                            "type": "milestone",
                            "parts": self.parts_produced
                        }),
                        qos=2
                    )
            else:
                time.sleep(1)

Conclusão

MQTT é um protocolo poderoso e versátil que se tornou o padrão de fato para comunicação em IoT. Sua arquitetura publish/subscribe, combinada com recursos como QoS, retained messages e will messages, o torna ideal para uma ampla gama de aplicações, desde smart homes até sistemas industriais complexos.

Pontos-chave para lembrar:

  • Escolha o QoS apropriado para cada caso de uso
  • Projete uma hierarquia de tópicos clara e escalável
  • Implemente segurança adequada (TLS, autenticação, ACL)
  • Monitore performance e implemente reconexão robusta
  • Considere clustering para alta disponibilidade
  • Use ferramentas de teste e monitoramento

Próximos passos:

  • Experimente diferentes brokers para encontrar o melhor para seu caso
  • Implemente padrões avançados como request/response
  • Integre com outras tecnologias (Kafka, bancos de dados, cloud)
  • Otimize para seu caso de uso específico (latência vs throughput)
  • Implemente observabilidade completa (logs, métricas, tracing)

MQTT continuará evoluindo com a versão 5.0 trazendo novos recursos como propriedades de mensagem, reason codes mais detalhados, e melhor suporte a request/response. É uma tecnologia essencial para qualquer desenvolvedor trabalhando com IoT e sistemas distribuídos.

Gostou do Conteúdo?

Podemos ajudar sua empresa a implementar estas soluções. Entre em contato e descubra como transformar conhecimento em resultados.