// src/mqtt/client.rs use embassy_futures::select::{select, Either}; use embassy_net::{tcp::TcpSocket, Stack}; use embassy_time::{Duration, Timer}; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::reason_codes::ReasonCode; use rust_mqtt::utils::rng_generator::CountingRng; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::channel::{Channel, Receiver}; use heapless::{String, Vec}; use static_cell::ConstStaticCell; use log::info; use crate::mqtt::config::mqtt_broker_endpoint; const RECONNECT_DELAY_SECS: u64 = 5; const KEEPALIVE_SECS: u64 = 60; const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2); // Limits for static buffers pub const TOPIC_MAX: usize = 128; pub const PAYLOAD_MAX: usize = 512; const COMMAND_QUEUE: usize = 8; const EVENT_QUEUE: usize = 8; // TCP socket buffers (for embassy-net TcpSocket) static TCP_RX_BUFFER: ConstStaticCell<[u8; 2048]> = ConstStaticCell::new([0; 2048]); static TCP_TX_BUFFER: ConstStaticCell<[u8; 2048]> = ConstStaticCell::new([0; 2048]); // MQTT client buffers (separate from the TcpSocket buffers) static MQTT_TX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]); static MQTT_RX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]); // Tie TcpSocket lifetime to session type Client<'a, 'net> = MqttClient<'a, TcpSocket<'net>, 8, CountingRng>; #[derive(Clone)] pub struct IncomingMsg { pub topic: String, pub payload: Vec, } #[derive(Clone)] struct PublishMsg { topic: String, payload: Vec, qos: QualityOfService, retain: bool, } #[derive(Clone)] enum Command { Publish(PublishMsg), Subscribe(String), } static CMD_CHAN: Channel = Channel::new(); static EVT_CHAN: Channel = Channel::new(); // Public API pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) { CMD_CHAN .send(Command::Publish(PublishMsg { topic: truncate_str::(topic), payload: truncate_payload(payload), qos, retain, })).await; } pub async fn mqtt_subscribe(topic: &str) { CMD_CHAN.send(Command::Subscribe(truncate_str::(topic))).await; } pub fn mqtt_events( ) -> Receiver<'static, CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> { EVT_CHAN.receiver() } // Helper functions for memory-safe truncation fn truncate_str(s: &str) -> String { let mut h = String::new(); let _ = h.push_str(&s[..s.len().min(N)]); h } fn truncate_payload(data: &[u8]) -> Vec { let mut v = Vec::new(); let _ = v.extend_from_slice(&data[..data.len().min(PAYLOAD_MAX)]); v } async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> { match cmd { Command::Publish(msg) => { client .send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain) .await } Command::Subscribe(topic) => { client.subscribe_to_topic(topic.as_str()).await?; info!("Subscribed to '{}'", topic); Ok(()) } } } async fn handle_incoming(result: Result<(&str, &[u8]), ReasonCode>) -> Result<(), ReasonCode> { let (topic, payload) = result?; EVT_CHAN .send(IncomingMsg { topic: truncate_str::(topic), payload: truncate_payload(payload), }).await; Ok(()) } // Session and reconnect control async fn run_one_session( stack: Stack<'static>, tcp_rx: &mut [u8], tcp_tx: &mut [u8], mqtt_tx: &mut [u8], mqtt_rx: &mut [u8], ) -> Result<(), ()> { let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx); match socket.connect(mqtt_broker_endpoint()).await { Ok(_) => info!("Connected TCP to MQTT broker"), Err(e) => { info!("TCP connect failed: {:#?}", e); return Err(()); } } // MQTT configuration and client setup let mut cfg: ClientConfig<8, CountingRng> = ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0)); cfg.keep_alive = KEEPALIVE_SECS as u16; cfg.add_client_id("esp32-client"); let mut client = MqttClient::new(socket, mqtt_tx, mqtt_tx.len(), mqtt_rx, mqtt_rx.len(), cfg); match client.connect_to_broker().await { Ok(_) => info!("MQTT CONNACK received"), Err(reason) => { info!("MQTT connect failed: {:?}", reason); return Err(()); } } // Operational loop loop { let net_or_ping = select(client.receive_message(), Timer::after(PING_PERIOD)); match select(CMD_CHAN.receive(), net_or_ping).await { Either::First(cmd) => handle_command(&mut client, cmd).await.map_err(|_| ())?, Either::Second(Either::First(result)) => handle_incoming(result).await.map_err(|_| ())?, Either::Second(Either::Second(_)) => client.send_ping().await.map_err(|_| ())?, } } } // Main MQTT embassy task #[embassy_executor::task] pub async fn mqtt_task(stack: Stack<'static>) { info!("MQTT task starting..."); let tcp_rx = TCP_RX_BUFFER.take(); let tcp_tx = TCP_TX_BUFFER.take(); let mqtt_tx = MQTT_TX_BUF.take(); let mqtt_rx = MQTT_RX_BUF.take(); loop { let _ = run_one_session( stack, &mut tcp_rx[..], &mut tcp_tx[..], &mut mqtt_tx[..], &mut mqtt_rx[..], ).await; info!("Reconnecting in {}s after session end/failure", RECONNECT_DELAY_SECS); Timer::after(Duration::from_secs(RECONNECT_DELAY_SECS)).await; } }