From 6b26ed93189fc9591c118bd24e4be601b8c57648 Mon Sep 17 00:00:00 2001 From: Priec Date: Mon, 19 Jan 2026 13:09:46 +0100 Subject: [PATCH] working with enums passed as a strings to the library --- mqtt_display/src/bus/mod.rs | 4 +- mqtt_display/src/display/api.rs | 4 +- mqtt_display/src/display/task.rs | 14 ++-- mqtt_display/src/display/tui.rs | 30 +++++-- mqtt_display/src/mqtt/client.rs | 133 ++++++++++++++----------------- 5 files changed, 93 insertions(+), 92 deletions(-) diff --git a/mqtt_display/src/bus/mod.rs b/mqtt_display/src/bus/mod.rs index bdc6d2a..52638a5 100644 --- a/mqtt_display/src/bus/mod.rs +++ b/mqtt_display/src/bus/mod.rs @@ -5,13 +5,13 @@ use embedded_hal_bus::i2c::RefCellDevice; use esp_hal::i2c::master::I2c; use esp_hal::Async; -/// The underlying I2C peripheral type +/// I2C peripheral type pub type I2cInner = I2c<'static, Async>; /// RefCell to share the bus on a single core. pub type SharedI2c = RefCell; -/// A handle to a shared I2C device. +/// Shared I2C device. pub type I2cDevice = RefCellDevice<'static, I2cInner>; /// New I2C device handle from the shared bus. diff --git a/mqtt_display/src/display/api.rs b/mqtt_display/src/display/api.rs index e870f45..563a8b0 100644 --- a/mqtt_display/src/display/api.rs +++ b/mqtt_display/src/display/api.rs @@ -1,5 +1,5 @@ // src/display/api.rs -//! Public API for the display feature. +//! display API use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::channel::{Channel, Receiver, TrySendError}; @@ -29,7 +29,7 @@ pub(crate) fn receiver() -> Receiver<'static, CriticalSectionRawMutex, DisplayCo /// Show IMU data - NON-BLOCKING to prevent backpressure deadlock pub fn show_imu(reading: ImuReading) { - // CRITICAL FIX: Use try_send instead of send().await + // try_send instead of send().await // If display is slow, we drop the reading rather than blocking the main loop let _ = try_send(DisplayCommand::SetImu(reading)); } diff --git a/mqtt_display/src/display/task.rs b/mqtt_display/src/display/task.rs index 914f4db..c8aea67 100644 --- a/mqtt_display/src/display/task.rs +++ b/mqtt_display/src/display/task.rs @@ -9,7 +9,7 @@ use ssd1306::{mode::BufferedGraphicsMode, prelude::*, I2CDisplayInterface, Ssd13 use crate::bus::I2cDevice; use crate::display::api::receiver; -use crate::display::tui::{render_frame, next_page_id, prev_page_id, DisplayState, Screen, ScreenEvent}; +use crate::display::tui::{render_frame, DisplayState, Screen, ScreenEvent}; use crate::contracts::DisplayCommand; use pages_tui::prelude::*; @@ -71,15 +71,15 @@ pub async fn display_task(i2c: I2cDevice) { let _ = orchestrator.navigate_to("chat".into()); } ScreenEvent::NavigatePrev => { - if let Some(cur) = orchestrator.current_id() { - let prev = prev_page_id(cur.as_str()); - let _ = orchestrator.navigate_to(prev.into()); + if let Some(cur) = orchestrator.current() { + let prev = cur.prev(); + let _ = orchestrator.navigate_to(prev.to_str().into()); } } ScreenEvent::NavigateNext => { - if let Some(cur) = orchestrator.current_id() { - let next = next_page_id(cur.as_str()); - let _ = orchestrator.navigate_to(next.into()); + if let Some(cur) = orchestrator.current() { + let next = cur.next(); + let _ = orchestrator.navigate_to(next.to_str().into()); } } } diff --git a/mqtt_display/src/display/tui.rs b/mqtt_display/src/display/tui.rs index 4b2d17f..92ed593 100644 --- a/mqtt_display/src/display/tui.rs +++ b/mqtt_display/src/display/tui.rs @@ -137,16 +137,30 @@ impl Component for Screen { } // PAGE ORDER -const PAGE_ORDER: &[&str] = &["menu", "imu", "chat"]; +const PAGE_ORDER: &[Screen] = &[Screen::Menu, Screen::Imu, Screen::Chat]; -pub fn next_page_id(current: &str) -> &'static str { - let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0); - PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()] -} +impl Screen { + pub fn next(&self) -> Screen { + let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0); + PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()].clone() + } -pub fn prev_page_id(current: &str) -> &'static str { - let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0); - if idx == 0 { PAGE_ORDER[PAGE_ORDER.len() - 1] } else { PAGE_ORDER[idx - 1] } + pub fn prev(&self) -> Screen { + let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0); + if idx == 0 { + PAGE_ORDER[PAGE_ORDER.len() - 1].clone() + } else { + PAGE_ORDER[idx - 1].clone() + } + } + + pub fn to_str(&self) -> &'static str { + match self { + Screen::Menu => "menu", + Screen::Imu => "imu", + Screen::Chat => "chat", + } + } } // RENDERING diff --git a/mqtt_display/src/mqtt/client.rs b/mqtt_display/src/mqtt/client.rs index 2f39a01..ae77e21 100644 --- a/mqtt_display/src/mqtt/client.rs +++ b/mqtt_display/src/mqtt/client.rs @@ -1,8 +1,7 @@ // src/mqtt/client.rs -use embassy_futures::select::{select, Either}; use embassy_net::{tcp::TcpSocket, Stack}; -use embassy_time::{Duration, Timer, with_timeout, Instant}; +use embassy_time::{Duration, Timer, Instant}; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::packet::v5::publish_packet::QualityOfService; @@ -21,7 +20,8 @@ use crate::mqtt::config::mqtt_broker_endpoint; const RECONNECT_DELAY_SECS: u64 = 5; const KEEPALIVE_SECS: u64 = 60; const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2); -const PING_TIMEOUT: Duration = Duration::from_secs(2); +const SOCKET_POLL_TIMEOUT: Duration = Duration::from_secs(1); +const SUBS_MAX: usize = 8; // Limits for static buffers pub const TOPIC_MAX: usize = 128; @@ -68,9 +68,11 @@ static EVT_CHAN: Channel = Ch static IMU_LATEST: Mutex>> = Mutex::new(None); +static SUBS: Mutex, SUBS_MAX>> = + Mutex::new(Vec::new()); + /// Public API -/// Blocking publish for command/response messages. pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) { CMD_CHAN .send(Command::Publish(PublishMsg { @@ -82,7 +84,6 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re .await; } -/// Non-blocking publish for other traffic (fire-and-forget) pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool { CMD_CHAN .try_send(Command::Publish(PublishMsg { @@ -94,7 +95,6 @@ pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, reta .is_ok() } -/// Set latest IMU telemetry payload (non-blocking, overwrites previous) pub fn mqtt_set_imu(payload: &[u8]) { if let Ok(mut guard) = IMU_LATEST.try_lock() { let mut buf: Vec = Vec::new(); @@ -104,9 +104,15 @@ pub fn mqtt_set_imu(payload: &[u8]) { } pub async fn mqtt_subscribe(topic: &str) { - CMD_CHAN - .send(Command::Subscribe(truncate_str::(topic))) - .await; + let t = truncate_str::(topic); + { + let mut subs = SUBS.lock().await; + let exists = subs.iter().any(|s| s.as_str() == t.as_str()); + if !exists { + let _ = subs.push(t.clone()); + } + } + CMD_CHAN.send(Command::Subscribe(t)).await; } pub fn mqtt_events( @@ -126,7 +132,6 @@ fn truncate_str(s: &str) -> String { return h; } - // Cut on a UTF-8 char boundary to avoid panics. let mut cut = N; while cut > 0 && !s.is_char_boundary(cut) { cut -= 1; @@ -144,36 +149,16 @@ fn truncate_payload(data: &[u8]) -> Vec { async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> { match cmd { Command::Publish(msg) => { - match with_timeout( - Duration::from_secs(5), - client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain), - ) - .await - { - Ok(result) => result, - Err(_) => { - warn!("MQTT send timed out, forcing reconnect"); - Err(ReasonCode::UnspecifiedError) - } - } + client + .send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain) + .await } Command::Subscribe(topic) => { - match with_timeout( - Duration::from_secs(5), - client.subscribe_to_topic(topic.as_str()), - ) - .await - { - Ok(result) => { - result?; - info!("Subscribed to '{}'", topic); - Ok(()) - } - Err(_) => { - warn!("MQTT subscribe timed out"); - Err(ReasonCode::UnspecifiedError) - } + let res = client.subscribe_to_topic(topic.as_str()).await; + if res.is_ok() { + info!("Subscribed to '{}'", topic); } + res } } } @@ -198,7 +183,7 @@ async fn run_one_session( mqtt_rx: &mut [u8], ) -> Result<(), ()> { let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx); - socket.set_timeout(Some(Duration::from_secs(20))); + socket.set_timeout(Some(SOCKET_POLL_TIMEOUT)); match socket.connect(mqtt_broker_endpoint()).await { Ok(_) => info!("Connected TCP to MQTT broker"), Err(e) => { @@ -207,7 +192,6 @@ async fn run_one_session( } } - // MQTT configuration and client setup let mut cfg: ClientConfig<8, CountingRng> = ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0)); cfg.keep_alive = KEEPALIVE_SECS as u16; @@ -224,6 +208,24 @@ async fn run_one_session( } } + // Re-subscribe after every (re)connect + let mut subs_snapshot: Vec, SUBS_MAX> = Vec::new(); + { + let subs = SUBS.lock().await; + for t in subs.iter() { + let _ = subs_snapshot.push(t.clone()); + } + } + for t in subs_snapshot.iter() { + match client.subscribe_to_topic(t.as_str()).await { + Ok(_) => info!("Subscribed to '{}'", t), + Err(e) => { + warn!("MQTT resubscribe failed: {:?}", e); + return Err(()); + } + } + } + let mut next_ping_at = Instant::now() + PING_PERIOD; loop { @@ -231,29 +233,19 @@ async fn run_one_session( if let Ok(mut guard) = IMU_LATEST.try_lock() { if let Some(payload) = guard.take() { drop(guard); - log::info!("MQTT IMU TX start ({} bytes)", payload.len()); - - // Limit send to max 2 seconds to catch network stalls - let send_res = with_timeout( - Duration::from_secs(2), - client.send_message("esp32/imu", &payload, QualityOfService::QoS0, false), - ) - .await; - - match send_res { - Ok(Ok(_)) => { + match client + .send_message("esp32/imu", &payload, QualityOfService::QoS0, false) + .await + { + Ok(_) => { log::info!("MQTT IMU TX ok"); next_ping_at = Instant::now() + PING_PERIOD; } - Ok(Err(e)) => { + Err(e) => { log::warn!("MQTT IMU TX failed: {:?}", e); return Err(()); } - Err(_) => { - log::warn!("MQTT IMU TX timed out, restarting session"); - return Err(()); - } } } } @@ -264,39 +256,34 @@ async fn run_one_session( next_ping_at = Instant::now() + PING_PERIOD; } - // Check for incoming messages with timeout - match with_timeout(Duration::from_millis(100), client.receive_message()).await { - Ok(Ok((topic, payload))) => { - // Handle incoming message + // Receive tick: socket timeout turns "no data" into ReasonCode::NetworkError. + // Treat that as idle, not as a broken session. + match client.receive_message().await { + Ok((topic, payload)) => { let _ = handle_incoming(Ok((topic, payload))).await; next_ping_at = Instant::now() + PING_PERIOD; } - Ok(Err(e)) => { - log::warn!("MQTT receive error: {:?}", e); + Err(ReasonCode::NetworkError) => { + // No incoming data during SOCKET_POLL_TIMEOUT -> ignore. + } + Err(e) => { + warn!("MQTT receive error (fatal): {:?}", e); return Err(()); } - Err(_) => { - } } + if Instant::now() >= next_ping_at { - match with_timeout(PING_TIMEOUT, client.send_ping()).await { - Ok(Ok(_)) => { - next_ping_at = Instant::now() + PING_PERIOD; - } - Ok(Err(e)) => { + match client.send_ping().await { + Ok(_) => next_ping_at = Instant::now() + PING_PERIOD, + Err(e) => { warn!("MQTT ping failed: {:?}", e); return Err(()); } - Err(_) => { - warn!("MQTT ping timed out, restarting session"); - return Err(()); - } } } } } -/// Main MQTT embassy task #[embassy_executor::task] pub async fn mqtt_task(stack: Stack<'static>) { info!("MQTT task starting...");