diff --git a/mqtt_display/src/bin/main.rs b/mqtt_display/src/bin/main.rs index fcf9850..05a2a33 100644 --- a/mqtt_display/src/bin/main.rs +++ b/mqtt_display/src/bin/main.rs @@ -43,7 +43,7 @@ use projekt_final::{ bus, display, mpu, - mqtt::client::{mqtt_events, mqtt_publish, mqtt_subscribe, mqtt_task, IncomingMsg}, + mqtt::client::{mqtt_events, mqtt_try_publish, mqtt_publish, mqtt_subscribe, mqtt_task, IncomingMsg}, }; extern crate alloc; @@ -145,6 +145,7 @@ async fn main(spawner: Spawner) -> ! { let imu_rx = mpu::api::events(); let mut imu_reading_count: u32 = 0; let mut mqtt_msg_count: u32 = 0; + let mut mqtt_publish_drops: u32 = 0; loop { match select3( @@ -168,19 +169,30 @@ async fn main(spawner: Spawner) -> ! { log::info!("IMU drained {} stale readings before display", drained); } imu_reading_count += 1; - display::api::show_imu(reading).await; + + // CRITICAL FIX: show_imu is now non-blocking (uses try_send internally) + // This prevents display channel backpressure from blocking the main loop + display::api::show_imu(reading); + if imu_reading_count % MQTT_PUBLISH_DIVIDER == 0 { let payload = format!( "{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}", reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c ); - mqtt_publish("esp32/imu", payload.as_bytes(), QualityOfService::QoS0, false).await; + // CRITICAL FIX: Use non-blocking publish for sensor data + // If MQTT channel is full, we drop this reading rather than blocking + if !mqtt_try_publish("esp32/imu", payload.as_bytes(), QualityOfService::QoS0, false) { + mqtt_publish_drops += 1; + if mqtt_publish_drops % 10 == 1 { + log::warn!("MQTT publish dropped (total: {})", mqtt_publish_drops); + } + } } } Either3::Third(_) => { crate::mpu::api::IMU_CHANNEL.clear(); - info!("IMU heartbeat: force-cleared queue, {} readings total", imu_reading_count); - // info!("Heartbeat: {} IMU readings", imu_reading_count); + info!("IMU heartbeat: force-cleared queue, {} readings total, {} mqtt drops", + imu_reading_count, mqtt_publish_drops); } } } diff --git a/mqtt_display/src/display/api.rs b/mqtt_display/src/display/api.rs index 5d15968..d303805 100644 --- a/mqtt_display/src/display/api.rs +++ b/mqtt_display/src/display/api.rs @@ -29,8 +29,11 @@ pub(crate) fn receiver() -> Receiver<'static, CriticalSectionRawMutex, DisplayCo // ───────────────────────────────────────────────────────────────────────────── -pub async fn show_imu(reading: ImuReading) { - send(DisplayCommand::SetImu(reading)).await; +/// Show IMU data - NON-BLOCKING to prevent backpressure deadlock +pub fn show_imu(reading: ImuReading) { + // CRITICAL FIX: Use try_send instead of send().await + // If display is slow, we drop the reading rather than blocking the main loop + let _ = try_send(DisplayCommand::SetImu(reading)); } pub async fn set_status(text: &str) { diff --git a/mqtt_display/src/mqtt/client.rs b/mqtt_display/src/mqtt/client.rs index 4944761..6d75df0 100644 --- a/mqtt_display/src/mqtt/client.rs +++ b/mqtt_display/src/mqtt/client.rs @@ -1,7 +1,8 @@ // src/mqtt/client.rs -use embassy_futures::select::{select, Either}; +use embassy_futures::select::{select, select3, Either, Either3}; use embassy_net::{tcp::TcpSocket, Stack}; use embassy_time::{Duration, Timer}; +use embassy_time::with_timeout; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::packet::v5::publish_packet::QualityOfService; @@ -10,6 +11,7 @@ use rust_mqtt::utils::rng_generator::CountingRng; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::channel::{Channel, Receiver}; +use embassy_sync::signal::Signal; use heapless::{String, Vec}; use static_cell::ConstStaticCell; use log::info; @@ -60,7 +62,11 @@ enum Command { static CMD_CHAN: Channel = Channel::new(); static EVT_CHAN: Channel = Channel::new(); -// Public API +/// Signal for latest IMU telemetry - overwrites instead of queuing +/// This ensures we always send the FRESHEST data, never queue up stale readings +static IMU_TELEMETRY: Signal> = Signal::new(); + +// Public API - BLOCKING version (waits for channel space) pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) { CMD_CHAN .send(Command::Publish(PublishMsg { @@ -71,6 +77,26 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re })).await; } +/// NON-BLOCKING publish - returns false if channel is full +/// Use this for high-frequency sensor data to avoid backpressure deadlock +pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool { + CMD_CHAN + .try_send(Command::Publish(PublishMsg { + topic: truncate_str::(topic), + payload: truncate_payload(payload), + qos, + retain, + })) + .is_ok() +} + +/// Set the latest IMU telemetry to publish. +/// This OVERWRITES any pending data - we always send the freshest reading. +/// Non-blocking, fire-and-forget. +pub fn mqtt_set_imu_telemetry(payload: &[u8]) { + IMU_TELEMETRY.signal(truncate_payload(payload)); +} + pub async fn mqtt_subscribe(topic: &str) { CMD_CHAN.send(Command::Subscribe(truncate_str::(topic))).await; } @@ -96,14 +122,32 @@ fn truncate_payload(data: &[u8]) -> Vec { async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> { match cmd { Command::Publish(msg) => { - client - .send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain) - .await + match with_timeout( + Duration::from_secs(5), + client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain) + ).await { + Ok(result) => result, + Err(_) => { + log::warn!("MQTT send timed out, forcing reconnect"); + Err(ReasonCode::UnspecifiedError) + } + } } Command::Subscribe(topic) => { - client.subscribe_to_topic(topic.as_str()).await?; - info!("Subscribed to '{}'", topic); - Ok(()) + match with_timeout( + Duration::from_secs(5), + client.subscribe_to_topic(topic.as_str()) + ).await { + Ok(result) => { + result?; + info!("Subscribed to '{}'", topic); + Ok(()) + } + Err(_) => { + log::warn!("MQTT subscribe timed out"); + Err(ReasonCode::UnspecifiedError) + } + } } } } @@ -127,6 +171,7 @@ async fn run_one_session( mqtt_rx: &mut [u8], ) -> Result<(), ()> { let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx); + socket.set_timeout(Some(Duration::from_secs(20))); match socket.connect(mqtt_broker_endpoint()).await { Ok(_) => info!("Connected TCP to MQTT broker"), Err(e) => { @@ -150,13 +195,38 @@ async fn run_one_session( } // Operational loop - loop { - let net_or_ping = select(client.receive_message(), Timer::after(PING_PERIOD)); + let mut ping_countdown: u32 = (KEEPALIVE_SECS * 10) as u32; // ticks until ping (at 100ms per tick) - match select(CMD_CHAN.receive(), net_or_ping).await { - Either::First(cmd) => handle_command(&mut client, cmd).await.map_err(|_| ())?, - Either::Second(Either::First(result)) => handle_incoming(result).await.map_err(|_| ())?, - Either::Second(Either::Second(_)) => client.send_ping().await.map_err(|_| ())?, + loop { + info!("MQTT loop tick"); + // Drain all pending commands + let mut processed = 0; + while let Ok(cmd) = CMD_CHAN.try_receive() { + info!("MQTT processing command"); + handle_command(&mut client, cmd).await.map_err(|_| ())?; + processed += 1; + } + if processed > 0 { + info!("MQTT processed {} commands", processed); + } + + match select( + client.receive_message(), + Timer::after(Duration::from_millis(100)), + ).await { + Either::First(result) => { + info!("MQTT received message from broker"); + handle_incoming(result).await.map_err(|_| ())?; + } + Either::Second(_) => { + // Check if ping needed + ping_countdown = ping_countdown.saturating_sub(1); + if ping_countdown == 0 { + info!("MQTT sending ping"); + client.send_ping().await.map_err(|_| ())?; + ping_countdown = (KEEPALIVE_SECS * 10) as u32; + } + } } } }