From 054b42547e54146cf601cff850e8a0643ab4030a Mon Sep 17 00:00:00 2001 From: Priec Date: Sun, 18 Jan 2026 23:58:17 +0100 Subject: [PATCH] working now --- mqtt_display/src/bin/main.rs | 23 ++++++++------ mqtt_display/src/mqtt/client.rs | 55 ++++++++++++++++++++++++--------- 2 files changed, 54 insertions(+), 24 deletions(-) diff --git a/mqtt_display/src/bin/main.rs b/mqtt_display/src/bin/main.rs index 3de57d1..b6cfcb8 100644 --- a/mqtt_display/src/bin/main.rs +++ b/mqtt_display/src/bin/main.rs @@ -13,7 +13,7 @@ use embassy_futures::select::{select, Either, select3, Either3}; use embassy_net::{Runner, StackResources}; use embassy_sync::signal::Signal; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; -use embassy_time::{Duration, Timer}; +use embassy_time::{Duration, Timer, Instant}; use projekt_final::bus::I2cInner; use projekt_final::mqtt::client::mqtt_set_imu; @@ -136,7 +136,7 @@ async fn main(spawner: Spawner) -> ! { spawner.spawn(mpu::task::mpu_task(mpu_i2c)).expect("spawn mpu_task"); display::api::set_status("Booting...").await; - mqtt_subscribe("esp32/imu").await; + mqtt_subscribe("esp32/read").await; mqtt_publish("esp32/imu", b"online", QualityOfService::QoS1, false).await; display::api::set_status("Running").await; @@ -147,6 +147,8 @@ async fn main(spawner: Spawner) -> ! { let mut imu_reading_count: u32 = 0; let mut mqtt_msg_count: u32 = 0; let mut mqtt_publish_drops: u32 = 0; + let mut last_mqtt_publish = Instant::now(); + let mqtt_publish_interval = Duration::from_secs(3); loop { match select3( @@ -160,26 +162,27 @@ async fn main(spawner: Spawner) -> ! { display::api::set_mqtt_status(true, mqtt_msg_count).await; } Either3::Second(mut reading) => { - // Drain any queued IMU messages and keep only the latest + // Drain zabezpečuje, že 'reading' je najčerstvejšia možná hodnota let mut drained = 0; while let Ok(next) = imu_rx.try_receive() { reading = next; drained += 1; } - if drained > 0 { - log::info!("IMU drained {} stale readings before display", drained); - } + imu_reading_count += 1; - - // Show_imu is now non-blocking (uses try_send internally) display::api::show_imu(reading); - - if imu_reading_count % MQTT_PUBLISH_DIVIDER == 0 { + + // 3. Nahraďte pôvodnú podmienku týmto časovým zámkom + if last_mqtt_publish.elapsed() >= mqtt_publish_interval { let payload = format!( "{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}", reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c ); mqtt_set_imu(payload.as_bytes()); + + // Aktualizujeme čas posledného odoslania + last_mqtt_publish = Instant::now(); + log::info!("MQTT IMU payload buffered (freshest data)"); } } Either3::Third(_) => { diff --git a/mqtt_display/src/mqtt/client.rs b/mqtt_display/src/mqtt/client.rs index 5436526..2f39a01 100644 --- a/mqtt_display/src/mqtt/client.rs +++ b/mqtt_display/src/mqtt/client.rs @@ -2,7 +2,7 @@ use embassy_futures::select::{select, Either}; use embassy_net::{tcp::TcpSocket, Stack}; -use embassy_time::{Duration, Timer, with_timeout}; +use embassy_time::{Duration, Timer, with_timeout, Instant}; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::packet::v5::publish_packet::QualityOfService; @@ -21,6 +21,7 @@ use crate::mqtt::config::mqtt_broker_endpoint; const RECONNECT_DELAY_SECS: u64 = 5; const KEEPALIVE_SECS: u64 = 60; const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2); +const PING_TIMEOUT: Duration = Duration::from_secs(2); // Limits for static buffers pub const TOPIC_MAX: usize = 128; @@ -117,7 +118,20 @@ pub fn mqtt_events( fn truncate_str(s: &str) -> String { let mut h = String::new(); - let _ = h.push_str(&s[..s.len().min(N)]); + if N == 0 { + return h; + } + if s.len() <= N { + let _ = h.push_str(s); + return h; + } + + // Cut on a UTF-8 char boundary to avoid panics. + let mut cut = N; + while cut > 0 && !s.is_char_boundary(cut) { + cut -= 1; + } + let _ = h.push_str(&s[..cut]); h } @@ -166,12 +180,13 @@ async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), async fn handle_incoming(result: Result<(&str, &[u8]), ReasonCode>) -> Result<(), ReasonCode> { let (topic, payload) = result?; - EVT_CHAN - .send(IncomingMsg { - topic: truncate_str::(topic), - payload: truncate_payload(payload), - }) - .await; + let msg = IncomingMsg { + topic: truncate_str::(topic), + payload: truncate_payload(payload), + }; + if EVT_CHAN.try_send(msg).is_err() { + warn!("MQTT EVT queue full, dropping incoming message"); + } Ok(()) } @@ -209,7 +224,7 @@ async fn run_one_session( } } - let mut ping_countdown: u32 = (KEEPALIVE_SECS * 10) as u32; // ticks until ping (100 ms each) + let mut next_ping_at = Instant::now() + PING_PERIOD; loop { // Send latest IMU payload if available @@ -229,6 +244,7 @@ async fn run_one_session( match send_res { Ok(Ok(_)) => { log::info!("MQTT IMU TX ok"); + next_ping_at = Instant::now() + PING_PERIOD; } Ok(Err(e)) => { log::warn!("MQTT IMU TX failed: {:?}", e); @@ -245,6 +261,7 @@ async fn run_one_session( // Process any queued control commands while let Ok(cmd) = CMD_CHAN.try_receive() { handle_command(&mut client, cmd).await.map_err(|_| ())?; + next_ping_at = Instant::now() + PING_PERIOD; } // Check for incoming messages with timeout @@ -252,17 +269,27 @@ async fn run_one_session( Ok(Ok((topic, payload))) => { // Handle incoming message let _ = handle_incoming(Ok((topic, payload))).await; + next_ping_at = Instant::now() + PING_PERIOD; } Ok(Err(e)) => { log::warn!("MQTT receive error: {:?}", e); return Err(()); } Err(_) => { - // Timeout -> no incoming message this period, check ping - ping_countdown = ping_countdown.saturating_sub(1); - if ping_countdown == 0 { - let _ = client.send_ping().await; - ping_countdown = (KEEPALIVE_SECS * 10) as u32; + } + } + if Instant::now() >= next_ping_at { + match with_timeout(PING_TIMEOUT, client.send_ping()).await { + Ok(Ok(_)) => { + next_ping_at = Instant::now() + PING_PERIOD; + } + Ok(Err(e)) => { + warn!("MQTT ping failed: {:?}", e); + return Err(()); + } + Err(_) => { + warn!("MQTT ping timed out, restarting session"); + return Err(()); } } }