From 841197775133a6ab6b3cea348170905b777bc966 Mon Sep 17 00:00:00 2001 From: Priec Date: Mon, 19 Jan 2026 23:24:41 +0100 Subject: [PATCH] trying to fix, not success yet --- mqtt_display/src/bin/main.rs | 16 +-- mqtt_display/src/display/tui.rs | 18 +-- mqtt_display/src/i2c/com.rs | 63 --------- mqtt_display/src/mqtt/client.rs | 224 +++++++++++++++++++++++++------- 4 files changed, 192 insertions(+), 129 deletions(-) diff --git a/mqtt_display/src/bin/main.rs b/mqtt_display/src/bin/main.rs index b6cfcb8..7c9f8a5 100644 --- a/mqtt_display/src/bin/main.rs +++ b/mqtt_display/src/bin/main.rs @@ -15,7 +15,7 @@ use embassy_sync::signal::Signal; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_time::{Duration, Timer, Instant}; use projekt_final::bus::I2cInner; -use projekt_final::mqtt::client::mqtt_set_imu; +use projekt_final::mqtt::client; use esp_alloc as _; use esp_backtrace as _; @@ -39,6 +39,8 @@ use log::info; use rust_mqtt::packet::v5::publish_packet::QualityOfService; use static_cell::StaticCell; use core::cell::RefCell; +use heapless::String; +use core::fmt::Write; use projekt_final::{ bus, @@ -106,7 +108,7 @@ async fn main(spawner: Spawner) -> ! { let wifi_interface = interfaces.sta; let timg1 = TimerGroup::new(peripherals.TIMG1); - esp_hal_embassy::init(timg1.timer0); + esp_hal_embassy::init([timg1.timer0, timg1.timer1]); let seed = (rng.random() as u64) << 32 | rng.random() as u64; @@ -174,15 +176,9 @@ async fn main(spawner: Spawner) -> ! { // 3. Nahraďte pôvodnú podmienku týmto časovým zámkom if last_mqtt_publish.elapsed() >= mqtt_publish_interval { - let payload = format!( - "{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}", - reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c - ); - mqtt_set_imu(payload.as_bytes()); - - // Aktualizujeme čas posledného odoslania + let payload = client::encode_imu_json(&reading); + client::mqtt_set_imu_payload(payload); last_mqtt_publish = Instant::now(); - log::info!("MQTT IMU payload buffered (freshest data)"); } } Either3::Third(_) => { diff --git a/mqtt_display/src/display/tui.rs b/mqtt_display/src/display/tui.rs index 92ed593..7a3c35a 100644 --- a/mqtt_display/src/display/tui.rs +++ b/mqtt_display/src/display/tui.rs @@ -1,7 +1,7 @@ // src/display/tui.rs use crate::contracts::{DisplayCommand, ImuReading}; use alloc::format; -use heapless::String as HString; +use heapless::String; use pages_tui::prelude::*; use ratatui::{ layout::Rect, @@ -27,25 +27,25 @@ const NAV_TARGETS: &[PageFocus] = &[PageFocus::NavPrev, PageFocus::NavNext]; /// Display state pub struct DisplayState { - pub(crate) status: HString<32>, + pub(crate) status: String<32>, pub(crate) last_imu: Option, - pub(crate) last_error: Option>, + pub(crate) last_error: Option>, pub(crate) mqtt_connected: bool, pub(crate) mqtt_msg_count: u32, - pub(crate) chat_msg1: HString<24>, - pub(crate) chat_msg2: HString<24>, + pub(crate) chat_msg1: String<24>, + pub(crate) chat_msg2: String<24>, } impl Default for DisplayState { fn default() -> Self { Self { - status: HString::new(), + status: String::new(), last_imu: None, last_error: None, mqtt_connected: false, mqtt_msg_count: 0, - chat_msg1: HString::new(), - chat_msg2: HString::new(), + chat_msg1: String::new(), + chat_msg2: String::new(), } } } @@ -66,7 +66,7 @@ impl DisplayState { DisplayCommand::Clear => { self.last_imu = None; self.last_error = None; - self.status = HString::new(); + self.status = String::new(); } DisplayCommand::PushKey(_) => {} DisplayCommand::AddChatMessage(msg) => { diff --git a/mqtt_display/src/i2c/com.rs b/mqtt_display/src/i2c/com.rs index a53ada6..485a276 100644 --- a/mqtt_display/src/i2c/com.rs +++ b/mqtt_display/src/i2c/com.rs @@ -5,71 +5,8 @@ use esp_hal::{ i2c::master::{Config, I2c}, peripherals::Peripherals, }; -use mousefood::{EmbeddedBackend, EmbeddedBackendConfig}; -use ratatui::{ - layout::{Constraint, Direction, Layout}, - widgets::{Block, Borders, Paragraph}, - Terminal, -}; -use ssd1306::{prelude::*, I2CDisplayInterface, Ssd1306, mode::BufferedGraphicsMode}; use log::info; -#[embassy_executor::task] -pub async fn display_task() { - let peripherals = unsafe { Peripherals::steal() }; - - let i2c = I2c::new(peripherals.I2C0, Config::default()) - .expect("Failed to initialize I2C") - .with_sda(peripherals.GPIO21) - .with_scl(peripherals.GPIO22); - - let interface = I2CDisplayInterface::new(i2c); - let mut display = Ssd1306::new(interface, DisplaySize128x64, DisplayRotation::Rotate0) - .into_buffered_graphics_mode(); - - display.init().unwrap(); - - let config = EmbeddedBackendConfig { - flush_callback: alloc::boxed::Box::new(|display| { - let d: &mut Ssd1306<_, _, BufferedGraphicsMode> = display; - d.flush().unwrap(); - }), - ..Default::default() - }; - - let backend = EmbeddedBackend::new(&mut display, config); - let mut terminal = Terminal::new(backend).unwrap(); - - let mut counter = 0; - - loop { - terminal - .draw(|f| { - let chunks = Layout::default() - .direction(Direction::Vertical) - .constraints([Constraint::Length(3), Constraint::Min(0)]) - .split(f.area()); - - f.render_widget( - Block::default().title(" ESP32 Status ").borders(Borders::ALL), - chunks[0], - ); - - let content = alloc::format!("MQTT Active\nCounter: {}", counter); - f.render_widget( - Paragraph::new(content).block( - Block::default().borders(Borders::LEFT | Borders::RIGHT | Borders::BOTTOM), - ), - chunks[1], - ); - }) - .unwrap(); - - counter += 1; - Timer::after(Duration::from_millis(1000)).await; - } -} - #[embassy_executor::task] pub async fn i2c_check() { let peripherals = unsafe { Peripherals::steal() }; diff --git a/mqtt_display/src/mqtt/client.rs b/mqtt_display/src/mqtt/client.rs index ae77e21..a7d2f8b 100644 --- a/mqtt_display/src/mqtt/client.rs +++ b/mqtt_display/src/mqtt/client.rs @@ -2,6 +2,7 @@ use embassy_net::{tcp::TcpSocket, Stack}; use embassy_time::{Duration, Timer, Instant}; +use embassy_futures::select::{select, Either}; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::packet::v5::publish_packet::QualityOfService; @@ -11,16 +12,23 @@ use rust_mqtt::utils::rng_generator::CountingRng; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::mutex::Mutex; use embassy_sync::channel::{Channel, Receiver}; +use embassy_sync::signal::Signal; use heapless::{String, Vec}; use static_cell::ConstStaticCell; +use core::fmt::Write; use log::{info, warn}; use crate::mqtt::config::mqtt_broker_endpoint; +use crate::contracts::ImuReading; const RECONNECT_DELAY_SECS: u64 = 5; const KEEPALIVE_SECS: u64 = 60; const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2); const SOCKET_POLL_TIMEOUT: Duration = Duration::from_secs(1); +const PING_TIMEOUT: Duration = Duration::from_secs(5); +// Must be > PING_PERIOD, ideally > KEEPALIVE +const NO_SUCCESS_TIMEOUT: Duration = Duration::from_secs(120); +const NO_IMU_SIG_WARN: Duration = Duration::from_secs(10); const SUBS_MAX: usize = 8; // Limits for static buffers @@ -64,9 +72,8 @@ enum Command { static CMD_CHAN: Channel = Channel::new(); static EVT_CHAN: Channel = Channel::new(); -/// Shared latest IMU payload (non-blocking, latest-value semantics) -static IMU_LATEST: Mutex>> = - Mutex::new(None); +/// Latest-value + wake-up semantics for IMU publish payload (single consumer: MQTT task) +static IMU_SIG: Signal> = Signal::new(); static SUBS: Mutex, SUBS_MAX>> = Mutex::new(Vec::new()); @@ -95,12 +102,29 @@ pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, reta .is_ok() } -pub fn mqtt_set_imu(payload: &[u8]) { - if let Ok(mut guard) = IMU_LATEST.try_lock() { - let mut buf: Vec = Vec::new(); - let _ = buf.extend_from_slice(&payload[..payload.len().min(PAYLOAD_MAX)]); - *guard = Some(buf); - } +pub fn mqtt_set_imu(reading: ImuReading) { + // Encode JSON into a bounded buffer (no alloc::format!) + let payload = encode_imu_json(&reading); + IMU_SIG.signal(payload); +} + +pub fn mqtt_set_imu_payload(payload: Vec) { + IMU_SIG.signal(payload); +} + +pub fn encode_imu_json(reading: &ImuReading) -> Vec { + let mut s: String<256> = String::new(); + let _ = write!( + &mut s, + "{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"gx\":{:.1},\"gy\":{:.1},\"gz\":{:.1},\"t\":{:.1},\"ts\":{}}}", + reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], + reading.gyro_dps[0], reading.gyro_dps[1], reading.gyro_dps[2], + reading.temp_c, + reading.timestamp_ms + ); + let mut v: Vec = Vec::new(); + let _ = v.extend_from_slice(s.as_bytes()); + v } pub async fn mqtt_subscribe(topic: &str) { @@ -183,7 +207,7 @@ async fn run_one_session( mqtt_rx: &mut [u8], ) -> Result<(), ()> { let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx); - socket.set_timeout(Some(SOCKET_POLL_TIMEOUT)); + socket.set_timeout(Some(SOCKET_POLL_TIMEOUT * 10)); match socket.connect(mqtt_broker_endpoint()).await { Ok(_) => info!("Connected TCP to MQTT broker"), Err(e) => { @@ -227,57 +251,163 @@ async fn run_one_session( } let mut next_ping_at = Instant::now() + PING_PERIOD; + let cmd_rx = CMD_CHAN.receiver(); + + // Only restart the session after N consecutive IMU publish failures + let mut imu_tx_fail_streak: u8 = 0; + let mut hb_at = Instant::now() + Duration::from_secs(10); + let mut tx_ok: u32 = 0; + let mut tx_err: u32 = 0; + let mut rx_ok: u32 = 0; + let mut ping_ok: u32 = 0; + let mut last_ok = Instant::now(); // last successful MQTT I/O (tx/rx/ping) + let mut last_imu_sig = Instant::now(); // last time we received IMU_SIG in MQTT task loop { - // Send latest IMU payload if available - if let Ok(mut guard) = IMU_LATEST.try_lock() { - if let Some(payload) = guard.take() { - drop(guard); - log::info!("MQTT IMU TX start ({} bytes)", payload.len()); - match client - .send_message("esp32/imu", &payload, QualityOfService::QoS0, false) + let now = Instant::now(); + + if now - last_ok > NO_SUCCESS_TIMEOUT { + warn!( + "MQTT no successful I/O for {:?} -> restart session", + now - last_ok + ); + return Err(()); + } + + if now - last_imu_sig > NO_IMU_SIG_WARN { + // This is diagnostic: if core0 claims it's sending, but this prints, you have cross-core signaling loss. + warn!( + "MQTT hasn't received IMU_SIG for {:?} (core0->core1 sync likely broken)", + now - last_imu_sig + ); + // Rate-limit the warning + last_imu_sig = now; + } + + let ping_in = if next_ping_at > now { next_ping_at - now } else { Duration::from_secs(0) }; + + // Timebox receive_message() even if lower layers misbehave. + let recv_fut = async { + match select(client.receive_message(), Timer::after(SOCKET_POLL_TIMEOUT)).await { + Either::First(res) => Some(res), + Either::Second(_) => None, + } + }; + + // 4-way select using nested selects to avoid relying on select4() + match select( + select(cmd_rx.receive(), IMU_SIG.wait()), + select(recv_fut, Timer::after(ping_in)), + ) + .await + { + // Command received + Either::First(Either::First(cmd)) => { + handle_command(&mut client, cmd).await.map_err(|_| ())?; + next_ping_at = Instant::now() + PING_PERIOD; + last_ok = Instant::now(); + + // Drain any additional queued commands quickly + while let Ok(cmd) = CMD_CHAN.try_receive() { + handle_command(&mut client, cmd).await.map_err(|_| ())?; + last_ok = Instant::now(); + next_ping_at = Instant::now() + PING_PERIOD; + } + } + + // IMU update signaled (latest value semantics) + Either::First(Either::Second(payload)) => { + last_imu_sig = Instant::now(); + // Enable temporarily for diagnostics: + // info!("IMU_SIG received in MQTT task (len={})", payload.len()); + + // Timebox the publish so we don't hang forever inside send_message().await + let send_res = match select( + client.send_message("esp32/imu", &payload, QualityOfService::QoS0, false), + Timer::after(Duration::from_secs(5)), + ) .await - { + { + Either::First(res) => res, + Either::Second(_) => Err(ReasonCode::NetworkError), + }; + + match send_res { Ok(_) => { - log::info!("MQTT IMU TX ok"); next_ping_at = Instant::now() + PING_PERIOD; + imu_tx_fail_streak = 0; + last_ok = Instant::now(); + + tx_ok = tx_ok.wrapping_add(1); + if (tx_ok % 10) == 0 { + info!( + "MQTT alive: tx_ok={} tx_err={} rx_ok={} streak={}", + tx_ok, tx_err, rx_ok, imu_tx_fail_streak + ); + } } Err(e) => { - log::warn!("MQTT IMU TX failed: {:?}", e); - return Err(()); + tx_err = tx_err.wrapping_add(1); + imu_tx_fail_streak = imu_tx_fail_streak.saturating_add(1); + warn!("MQTT IMU TX fail {}/5: {:?}", imu_tx_fail_streak, e); + + if imu_tx_fail_streak >= 5 { + warn!("MQTT IMU TX fail 5x -> restart session"); + return Err(()); + } } } } - } - // Process any queued control commands - while let Ok(cmd) = CMD_CHAN.try_receive() { - handle_command(&mut client, cmd).await.map_err(|_| ())?; - next_ping_at = Instant::now() + PING_PERIOD; - } + // Incoming message (or None on timeout) + Either::Second(Either::First(opt)) => { + if let Some(res) = opt { + match res { + Ok((topic, payload)) => { + rx_ok = rx_ok.wrapping_add(1); + let _ = handle_incoming(Ok((topic, payload))).await; - // Receive tick: socket timeout turns "no data" into ReasonCode::NetworkError. - // Treat that as idle, not as a broken session. - match client.receive_message().await { - Ok((topic, payload)) => { - let _ = handle_incoming(Ok((topic, payload))).await; - next_ping_at = Instant::now() + PING_PERIOD; + last_ok = Instant::now(); + imu_tx_fail_streak = 0; + next_ping_at = Instant::now() + PING_PERIOD; + } + Err(ReasonCode::NetworkError) => { + // idle tick + } + Err(e) => { + warn!("MQTT receive error (fatal): {:?}", e); + return Err(()); + } + } + } } - Err(ReasonCode::NetworkError) => { - // No incoming data during SOCKET_POLL_TIMEOUT -> ignore. - } - Err(e) => { - warn!("MQTT receive error (fatal): {:?}", e); - return Err(()); - } - } - if Instant::now() >= next_ping_at { - match client.send_ping().await { - Ok(_) => next_ping_at = Instant::now() + PING_PERIOD, - Err(e) => { - warn!("MQTT ping failed: {:?}", e); - return Err(()); + // Ping timer fired + Either::Second(Either::Second(_)) => { + if Instant::now() >= hb_at { + info!( + "MQTT hb tx_ok={} tx_err={} rx_ok={} ping_ok={} streak={}", + tx_ok, tx_err, rx_ok, ping_ok, imu_tx_fail_streak + ); + hb_at = Instant::now() + Duration::from_secs(10); + } + + let ping_res = match select(client.send_ping(), Timer::after(PING_TIMEOUT)).await { + Either::First(res) => res, + Either::Second(_) => Err(ReasonCode::NetworkError), + }; + + match ping_res { + Ok(_) => { + ping_ok = ping_ok.wrapping_add(1); + imu_tx_fail_streak = 0; + last_ok = Instant::now(); + next_ping_at = Instant::now() + PING_PERIOD; + } + Err(e) => { + warn!("MQTT ping failed/timeout: {:?}", e); + return Err(()); + } } } }