From ba2b3b188aad8ad7a266d6167defe71ef0c0ed6c Mon Sep 17 00:00:00 2001 From: Priec Date: Sun, 18 Jan 2026 22:21:33 +0100 Subject: [PATCH] big improvement, not theere yet --- mqtt_display/src/bin/main.rs | 10 +-- mqtt_display/src/mqtt/client.rs | 154 ++++++++++++++++++++------------ 2 files changed, 101 insertions(+), 63 deletions(-) diff --git a/mqtt_display/src/bin/main.rs b/mqtt_display/src/bin/main.rs index 05a2a33..b4b73af 100644 --- a/mqtt_display/src/bin/main.rs +++ b/mqtt_display/src/bin/main.rs @@ -15,6 +15,7 @@ use embassy_sync::signal::Signal; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_time::{Duration, Timer}; use projekt_final::bus::I2cInner; +use projekt_final::mqtt::client::mqtt_set_imu; use esp_alloc as _; use esp_backtrace as _; @@ -179,14 +180,7 @@ async fn main(spawner: Spawner) -> ! { "{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}", reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c ); - // CRITICAL FIX: Use non-blocking publish for sensor data - // If MQTT channel is full, we drop this reading rather than blocking - if !mqtt_try_publish("esp32/imu", payload.as_bytes(), QualityOfService::QoS0, false) { - mqtt_publish_drops += 1; - if mqtt_publish_drops % 10 == 1 { - log::warn!("MQTT publish dropped (total: {})", mqtt_publish_drops); - } - } + mqtt_set_imu(payload.as_bytes()); } } Either3::Third(_) => { diff --git a/mqtt_display/src/mqtt/client.rs b/mqtt_display/src/mqtt/client.rs index 6d75df0..df39601 100644 --- a/mqtt_display/src/mqtt/client.rs +++ b/mqtt_display/src/mqtt/client.rs @@ -1,8 +1,8 @@ // src/mqtt/client.rs -use embassy_futures::select::{select, select3, Either, Either3}; + +use embassy_futures::select::{select, Either}; use embassy_net::{tcp::TcpSocket, Stack}; -use embassy_time::{Duration, Timer}; -use embassy_time::with_timeout; +use embassy_time::{Duration, Timer, with_timeout}; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::packet::v5::publish_packet::QualityOfService; @@ -10,11 +10,11 @@ use rust_mqtt::packet::v5::reason_codes::ReasonCode; use rust_mqtt::utils::rng_generator::CountingRng; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::mutex::Mutex; use embassy_sync::channel::{Channel, Receiver}; -use embassy_sync::signal::Signal; use heapless::{String, Vec}; use static_cell::ConstStaticCell; -use log::info; +use log::{info, warn}; use crate::mqtt::config::mqtt_broker_endpoint; @@ -59,14 +59,17 @@ enum Command { Subscribe(String), } +// Standard command/info channels static CMD_CHAN: Channel = Channel::new(); static EVT_CHAN: Channel = Channel::new(); -/// Signal for latest IMU telemetry - overwrites instead of queuing -/// This ensures we always send the FRESHEST data, never queue up stale readings -static IMU_TELEMETRY: Signal> = Signal::new(); +/// Shared latest IMU payload (non-blocking, latest-value semantics) +static IMU_LATEST: Mutex>> = + Mutex::new(None); -// Public API - BLOCKING version (waits for channel space) +/// ---------- Public API ---------- + +/// Blocking publish for command/response messages. pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) { CMD_CHAN .send(Command::Publish(PublishMsg { @@ -74,11 +77,11 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re payload: truncate_payload(payload), qos, retain, - })).await; + })) + .await; } -/// NON-BLOCKING publish - returns false if channel is full -/// Use this for high-frequency sensor data to avoid backpressure deadlock +/// Non-blocking publish for other traffic (fire-and-forget) pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool { CMD_CHAN .try_send(Command::Publish(PublishMsg { @@ -90,15 +93,19 @@ pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, reta .is_ok() } -/// Set the latest IMU telemetry to publish. -/// This OVERWRITES any pending data - we always send the freshest reading. -/// Non-blocking, fire-and-forget. -pub fn mqtt_set_imu_telemetry(payload: &[u8]) { - IMU_TELEMETRY.signal(truncate_payload(payload)); +/// Set latest IMU telemetry payload (non-blocking, overwrites previous) +pub fn mqtt_set_imu(payload: &[u8]) { + if let Ok(mut guard) = IMU_LATEST.try_lock() { + let mut buf: Vec = Vec::new(); + let _ = buf.extend_from_slice(&payload[..payload.len().min(PAYLOAD_MAX)]); + *guard = Some(buf); + } } pub async fn mqtt_subscribe(topic: &str) { - CMD_CHAN.send(Command::Subscribe(truncate_str::(topic))).await; + CMD_CHAN + .send(Command::Subscribe(truncate_str::(topic))) + .await; } pub fn mqtt_events( @@ -106,7 +113,8 @@ pub fn mqtt_events( EVT_CHAN.receiver() } -// Helper functions for memory-safe truncation +/// ---------- Internals ---------- + fn truncate_str(s: &str) -> String { let mut h = String::new(); let _ = h.push_str(&s[..s.len().min(N)]); @@ -124,11 +132,13 @@ async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), Command::Publish(msg) => { match with_timeout( Duration::from_secs(5), - client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain) - ).await { + client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain), + ) + .await + { Ok(result) => result, Err(_) => { - log::warn!("MQTT send timed out, forcing reconnect"); + warn!("MQTT send timed out, forcing reconnect"); Err(ReasonCode::UnspecifiedError) } } @@ -136,15 +146,17 @@ async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), Command::Subscribe(topic) => { match with_timeout( Duration::from_secs(5), - client.subscribe_to_topic(topic.as_str()) - ).await { + client.subscribe_to_topic(topic.as_str()), + ) + .await + { Ok(result) => { result?; info!("Subscribed to '{}'", topic); Ok(()) } Err(_) => { - log::warn!("MQTT subscribe timed out"); + warn!("MQTT subscribe timed out"); Err(ReasonCode::UnspecifiedError) } } @@ -158,11 +170,13 @@ async fn handle_incoming(result: Result<(&str, &[u8]), ReasonCode>) -> Result<() .send(IncomingMsg { topic: truncate_str::(topic), payload: truncate_payload(payload), - }).await; + }) + .await; Ok(()) } -// Session and reconnect control +/// ---------- Session Management ---------- + async fn run_one_session( stack: Stack<'static>, tcp_rx: &mut [u8], @@ -181,11 +195,14 @@ async fn run_one_session( } // MQTT configuration and client setup - let mut cfg: ClientConfig<8, CountingRng> = ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0)); + let mut cfg: ClientConfig<8, CountingRng> = + ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0)); cfg.keep_alive = KEEPALIVE_SECS as u16; cfg.add_client_id("esp32-client"); - let mut client = MqttClient::new(socket, mqtt_tx, mqtt_tx.len(), mqtt_rx, mqtt_rx.len(), cfg); + let mut client = + MqttClient::new(socket, mqtt_tx, mqtt_tx.len(), mqtt_rx, mqtt_rx.len(), cfg); + match client.connect_to_broker().await { Ok(_) => info!("MQTT CONNACK received"), Err(reason) => { @@ -194,36 +211,59 @@ async fn run_one_session( } } - // Operational loop - let mut ping_countdown: u32 = (KEEPALIVE_SECS * 10) as u32; // ticks until ping (at 100ms per tick) + let mut ping_countdown: u32 = (KEEPALIVE_SECS * 10) as u32; // ticks until ping (100 ms each) loop { - info!("MQTT loop tick"); - // Drain all pending commands - let mut processed = 0; - while let Ok(cmd) = CMD_CHAN.try_receive() { - info!("MQTT processing command"); - handle_command(&mut client, cmd).await.map_err(|_| ())?; - processed += 1; - } - if processed > 0 { - info!("MQTT processed {} commands", processed); + // 1. Send latest IMU payload if available + if let Ok(mut guard) = IMU_LATEST.try_lock() { + if let Some(payload) = guard.take() { + drop(guard); + + log::info!("MQTT IMU TX start ({} bytes)", payload.len()); + + // Limit send to max 2 seconds to catch network stalls + let send_res = with_timeout( + Duration::from_secs(2), + client.send_message("esp32/imu", &payload, QualityOfService::QoS0, false), + ) + .await; + + match send_res { + Ok(Ok(_)) => { + log::info!("MQTT IMU TX ok"); + } + Ok(Err(e)) => { + log::warn!("MQTT IMU TX failed: {:?}", e); + return Err(()); + } + Err(_) => { + log::warn!("MQTT IMU TX timed out, restarting session"); + return Err(()); + } + } + } } - match select( - client.receive_message(), - Timer::after(Duration::from_millis(100)), - ).await { - Either::First(result) => { - info!("MQTT received message from broker"); - handle_incoming(result).await.map_err(|_| ())?; + // 2. Process any queued control commands + while let Ok(cmd) = CMD_CHAN.try_receive() { + handle_command(&mut client, cmd).await.map_err(|_| ())?; + } + + // 3. Check for incoming messages with timeout + match with_timeout(Duration::from_millis(100), client.receive_message()).await { + Ok(Ok((topic, payload))) => { + // Handle incoming message + let _ = handle_incoming(Ok((topic, payload))).await; } - Either::Second(_) => { - // Check if ping needed + Ok(Err(e)) => { + log::warn!("MQTT receive error: {:?}", e); + return Err(()); + } + Err(_) => { + // Timeout -> no incoming message this period, check ping ping_countdown = ping_countdown.saturating_sub(1); if ping_countdown == 0 { - info!("MQTT sending ping"); - client.send_ping().await.map_err(|_| ())?; + let _ = client.send_ping().await; ping_countdown = (KEEPALIVE_SECS * 10) as u32; } } @@ -231,7 +271,7 @@ async fn run_one_session( } } -// Main MQTT embassy task +/// Main MQTT embassy task #[embassy_executor::task] pub async fn mqtt_task(stack: Stack<'static>) { info!("MQTT task starting..."); @@ -248,9 +288,13 @@ pub async fn mqtt_task(stack: Stack<'static>) { &mut tcp_tx[..], &mut mqtt_tx[..], &mut mqtt_rx[..], - ).await; + ) + .await; - info!("Reconnecting in {}s after session end/failure", RECONNECT_DELAY_SECS); + info!( + "Reconnecting in {}s after session end/failure", + RECONNECT_DELAY_SECS + ); Timer::after(Duration::from_secs(RECONNECT_DELAY_SECS)).await; } }