working now

This commit is contained in:
Priec
2026-01-18 23:58:17 +01:00
parent b729d3f23d
commit 054b42547e
2 changed files with 54 additions and 24 deletions

View File

@@ -13,7 +13,7 @@ use embassy_futures::select::{select, Either, select3, Either3};
use embassy_net::{Runner, StackResources}; use embassy_net::{Runner, StackResources};
use embassy_sync::signal::Signal; use embassy_sync::signal::Signal;
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_time::{Duration, Timer}; use embassy_time::{Duration, Timer, Instant};
use projekt_final::bus::I2cInner; use projekt_final::bus::I2cInner;
use projekt_final::mqtt::client::mqtt_set_imu; use projekt_final::mqtt::client::mqtt_set_imu;
@@ -136,7 +136,7 @@ async fn main(spawner: Spawner) -> ! {
spawner.spawn(mpu::task::mpu_task(mpu_i2c)).expect("spawn mpu_task"); spawner.spawn(mpu::task::mpu_task(mpu_i2c)).expect("spawn mpu_task");
display::api::set_status("Booting...").await; display::api::set_status("Booting...").await;
mqtt_subscribe("esp32/imu").await; mqtt_subscribe("esp32/read").await;
mqtt_publish("esp32/imu", b"online", QualityOfService::QoS1, false).await; mqtt_publish("esp32/imu", b"online", QualityOfService::QoS1, false).await;
display::api::set_status("Running").await; display::api::set_status("Running").await;
@@ -147,6 +147,8 @@ async fn main(spawner: Spawner) -> ! {
let mut imu_reading_count: u32 = 0; let mut imu_reading_count: u32 = 0;
let mut mqtt_msg_count: u32 = 0; let mut mqtt_msg_count: u32 = 0;
let mut mqtt_publish_drops: u32 = 0; let mut mqtt_publish_drops: u32 = 0;
let mut last_mqtt_publish = Instant::now();
let mqtt_publish_interval = Duration::from_secs(3);
loop { loop {
match select3( match select3(
@@ -160,26 +162,27 @@ async fn main(spawner: Spawner) -> ! {
display::api::set_mqtt_status(true, mqtt_msg_count).await; display::api::set_mqtt_status(true, mqtt_msg_count).await;
} }
Either3::Second(mut reading) => { Either3::Second(mut reading) => {
// Drain any queued IMU messages and keep only the latest // Drain zabezpečuje, že 'reading' je najčerstvejšia možná hodnota
let mut drained = 0; let mut drained = 0;
while let Ok(next) = imu_rx.try_receive() { while let Ok(next) = imu_rx.try_receive() {
reading = next; reading = next;
drained += 1; drained += 1;
} }
if drained > 0 {
log::info!("IMU drained {} stale readings before display", drained);
}
imu_reading_count += 1; imu_reading_count += 1;
// Show_imu is now non-blocking (uses try_send internally)
display::api::show_imu(reading); display::api::show_imu(reading);
if imu_reading_count % MQTT_PUBLISH_DIVIDER == 0 { // 3. Nahraďte pôvodnú podmienku týmto časovým zámkom
if last_mqtt_publish.elapsed() >= mqtt_publish_interval {
let payload = format!( let payload = format!(
"{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}", "{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}",
reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c
); );
mqtt_set_imu(payload.as_bytes()); mqtt_set_imu(payload.as_bytes());
// Aktualizujeme čas posledného odoslania
last_mqtt_publish = Instant::now();
log::info!("MQTT IMU payload buffered (freshest data)");
} }
} }
Either3::Third(_) => { Either3::Third(_) => {

View File

@@ -2,7 +2,7 @@
use embassy_futures::select::{select, Either}; use embassy_futures::select::{select, Either};
use embassy_net::{tcp::TcpSocket, Stack}; use embassy_net::{tcp::TcpSocket, Stack};
use embassy_time::{Duration, Timer, with_timeout}; use embassy_time::{Duration, Timer, with_timeout, Instant};
use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::publish_packet::QualityOfService;
@@ -21,6 +21,7 @@ use crate::mqtt::config::mqtt_broker_endpoint;
const RECONNECT_DELAY_SECS: u64 = 5; const RECONNECT_DELAY_SECS: u64 = 5;
const KEEPALIVE_SECS: u64 = 60; const KEEPALIVE_SECS: u64 = 60;
const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2); const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
// Limits for static buffers // Limits for static buffers
pub const TOPIC_MAX: usize = 128; pub const TOPIC_MAX: usize = 128;
@@ -117,7 +118,20 @@ pub fn mqtt_events(
fn truncate_str<const N: usize>(s: &str) -> String<N> { fn truncate_str<const N: usize>(s: &str) -> String<N> {
let mut h = String::new(); let mut h = String::new();
let _ = h.push_str(&s[..s.len().min(N)]); if N == 0 {
return h;
}
if s.len() <= N {
let _ = h.push_str(s);
return h;
}
// Cut on a UTF-8 char boundary to avoid panics.
let mut cut = N;
while cut > 0 && !s.is_char_boundary(cut) {
cut -= 1;
}
let _ = h.push_str(&s[..cut]);
h h
} }
@@ -166,12 +180,13 @@ async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(),
async fn handle_incoming(result: Result<(&str, &[u8]), ReasonCode>) -> Result<(), ReasonCode> { async fn handle_incoming(result: Result<(&str, &[u8]), ReasonCode>) -> Result<(), ReasonCode> {
let (topic, payload) = result?; let (topic, payload) = result?;
EVT_CHAN let msg = IncomingMsg {
.send(IncomingMsg { topic: truncate_str::<TOPIC_MAX>(topic),
topic: truncate_str::<TOPIC_MAX>(topic), payload: truncate_payload(payload),
payload: truncate_payload(payload), };
}) if EVT_CHAN.try_send(msg).is_err() {
.await; warn!("MQTT EVT queue full, dropping incoming message");
}
Ok(()) Ok(())
} }
@@ -209,7 +224,7 @@ async fn run_one_session(
} }
} }
let mut ping_countdown: u32 = (KEEPALIVE_SECS * 10) as u32; // ticks until ping (100 ms each) let mut next_ping_at = Instant::now() + PING_PERIOD;
loop { loop {
// Send latest IMU payload if available // Send latest IMU payload if available
@@ -229,6 +244,7 @@ async fn run_one_session(
match send_res { match send_res {
Ok(Ok(_)) => { Ok(Ok(_)) => {
log::info!("MQTT IMU TX ok"); log::info!("MQTT IMU TX ok");
next_ping_at = Instant::now() + PING_PERIOD;
} }
Ok(Err(e)) => { Ok(Err(e)) => {
log::warn!("MQTT IMU TX failed: {:?}", e); log::warn!("MQTT IMU TX failed: {:?}", e);
@@ -245,6 +261,7 @@ async fn run_one_session(
// Process any queued control commands // Process any queued control commands
while let Ok(cmd) = CMD_CHAN.try_receive() { while let Ok(cmd) = CMD_CHAN.try_receive() {
handle_command(&mut client, cmd).await.map_err(|_| ())?; handle_command(&mut client, cmd).await.map_err(|_| ())?;
next_ping_at = Instant::now() + PING_PERIOD;
} }
// Check for incoming messages with timeout // Check for incoming messages with timeout
@@ -252,17 +269,27 @@ async fn run_one_session(
Ok(Ok((topic, payload))) => { Ok(Ok((topic, payload))) => {
// Handle incoming message // Handle incoming message
let _ = handle_incoming(Ok((topic, payload))).await; let _ = handle_incoming(Ok((topic, payload))).await;
next_ping_at = Instant::now() + PING_PERIOD;
} }
Ok(Err(e)) => { Ok(Err(e)) => {
log::warn!("MQTT receive error: {:?}", e); log::warn!("MQTT receive error: {:?}", e);
return Err(()); return Err(());
} }
Err(_) => { Err(_) => {
// Timeout -> no incoming message this period, check ping }
ping_countdown = ping_countdown.saturating_sub(1); }
if ping_countdown == 0 { if Instant::now() >= next_ping_at {
let _ = client.send_ping().await; match with_timeout(PING_TIMEOUT, client.send_ping()).await {
ping_countdown = (KEEPALIVE_SECS * 10) as u32; Ok(Ok(_)) => {
next_ping_at = Instant::now() + PING_PERIOD;
}
Ok(Err(e)) => {
warn!("MQTT ping failed: {:?}", e);
return Err(());
}
Err(_) => {
warn!("MQTT ping timed out, restarting session");
return Err(());
} }
} }
} }