6 Commits

Author SHA1 Message Date
Priec
8411977751 trying to fix, not success yet 2026-01-19 23:24:41 +01:00
Priec
fd5dd6e888 mermaids 2026-01-19 16:43:58 +01:00
Priec
2774e83d99 ported into a generic library 2026-01-19 15:53:19 +01:00
Priec
9910bf9402 enum to string that is passed to the library, waiting for the enum based generic library 2026-01-19 15:23:39 +01:00
Priec
424c795170 cleanup 2026-01-19 13:56:39 +01:00
Priec
6b26ed9318 working with enums passed as a strings to the library 2026-01-19 13:09:46 +01:00
15 changed files with 424 additions and 227 deletions

View File

@@ -25,6 +25,7 @@
cargo-espflash
espup
mosquitto
mermaid-cli
];
shellHook = ''

View File

@@ -0,0 +1,37 @@
graph TD
START[MPU reads @50ms = 20 Hz] --> A{IMU_CHANNEL<br/>try_send}
A -->|Full| DROP1[❌ DROP: Channel full<br/>16 slots @ 50ms = 800ms buffer]
A -->|OK| B[Main Loop receive]
B --> C[Drain loop:<br/>Get freshest reading]
C --> D{Time check:<br/>≥3s since last?}
D -->|No| E[Skip MQTT]
D -->|Yes| F{mqtt_set_imu<br/>try_lock IMU_LATEST}
F -->|Locked| DROP2[❌ SKIP: Mutex busy]
F -->|OK| G[Store payload]
E --> H[show_imu]
G --> H
H --> I{DISPLAY_CHANNEL<br/>try_send}
I -->|Full| DROP3[❌ DROP: Display slow<br/>8 slots @ 100ms = 800ms buffer]
I -->|OK| J[Display renders]
G --> K[MQTT Task loop]
K --> L{IMU_LATEST<br/>try_lock}
L -->|Empty/Locked| M[Skip this iteration]
L -->|Has data| N[Send to broker<br/>QoS0 no retain]
N --> O{TCP send result}
O -->|Fail| RECONNECT[❌ Session dies<br/>Reconnect in 5s]
O -->|OK| P[✅ Data sent]
style DROP1 fill:#ffcccc
style DROP2 fill:#ffcccc
style DROP3 fill:#ffcccc
style RECONNECT fill:#ffcccc
style P fill:#ccffcc

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 32 KiB

View File

@@ -0,0 +1,54 @@
sequenceDiagram
participant HW as MPU6050 Hardware
participant MPU as MPU Task<br/>(Core 0)
participant CH as IMU_CHANNEL<br/>[16 slots]
participant MAIN as Main Loop<br/>(Core 0)
participant LATEST as IMU_LATEST<br/>(Mutex)
participant MQTT as MQTT Task<br/>(Core 1)
participant DISP as Display API
Note over MPU: Every 50ms
MPU->>HW: Read sensor (I2C)
HW-->>MPU: Raw data
MPU->>MPU: Convert to ImuReading
MPU->>CH: try_send(reading)
alt Channel full
CH--xMPU: Drop (log warning)
else Channel has space
CH-->>MPU: OK
end
Note over MAIN: Continuous loop
MAIN->>CH: receive() [blocking]
CH-->>MAIN: reading
Note over MAIN: Drain queue
loop While available
MAIN->>CH: try_receive()
CH-->>MAIN: newer reading
end
MAIN->>DISP: show_imu(reading)<br/>[try_send]
Note over MAIN: Every 3 seconds
alt Time >= 3s since last
MAIN->>MAIN: Format JSON payload
MAIN->>LATEST: mqtt_set_imu()<br/>[try_lock]
alt Mutex available
LATEST-->>MAIN: Stored
else Mutex locked
LATEST--xMAIN: Skip
end
end
Note over MQTT: Continuous loop
MQTT->>LATEST: try_lock()
alt Data available
LATEST-->>MQTT: payload
MQTT->>MQTT: send_message("esp32/imu")
else No data
LATEST--xMQTT: None
end

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 37 KiB

View File

@@ -0,0 +1,42 @@
graph TB
subgraph "Core 0 - Application"
MPU[MPU Task<br/>50ms sampling]
DISPLAY[Display Task<br/>100ms refresh]
MAIN[Main Loop]
BUTTONS[Button Task]
end
subgraph "Core 1 - Network"
WIFI[WiFi Connection Task]
NETWORK[Network Stack Runner]
MQTT[MQTT Task]
end
subgraph "Shared Channels"
IMU_CH[(IMU_CHANNEL<br/>size: 16)]
DISP_CH[(DISPLAY_CHANNEL<br/>size: 8)]
CMD_CH[(CMD_CHAN<br/>size: 8)]
EVT_CH[(EVT_CHAN<br/>size: 8)]
IMU_LATEST[(IMU_LATEST<br/>Mutex)]
end
subgraph "Hardware"
MPU_HW[MPU6050<br/>I2C 0x68]
OLED[SSD1306<br/>I2C]
BROKER[MQTT Broker]
end
MPU_HW -->|I2C Read| MPU
MPU -->|send| IMU_CH
IMU_CH -->|receive| MAIN
MAIN -->|try_send| DISP_CH
MAIN -->|mqtt_set_imu| IMU_LATEST
DISP_CH -->|receive| DISPLAY
DISPLAY -->|I2C Write| OLED
IMU_LATEST -->|try_lock| MQTT
MQTT <-->|TCP/IP| BROKER
BUTTONS -->|push_key| DISP_CH
style IMU_CH fill:#ff9999
style DISP_CH fill:#99ccff
style IMU_LATEST fill:#ffcc99

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 24 KiB

View File

@@ -15,7 +15,7 @@ use embassy_sync::signal::Signal;
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_time::{Duration, Timer, Instant};
use projekt_final::bus::I2cInner;
use projekt_final::mqtt::client::mqtt_set_imu;
use projekt_final::mqtt::client;
use esp_alloc as _;
use esp_backtrace as _;
@@ -39,6 +39,8 @@ use log::info;
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
use static_cell::StaticCell;
use core::cell::RefCell;
use heapless::String;
use core::fmt::Write;
use projekt_final::{
bus,
@@ -106,7 +108,7 @@ async fn main(spawner: Spawner) -> ! {
let wifi_interface = interfaces.sta;
let timg1 = TimerGroup::new(peripherals.TIMG1);
esp_hal_embassy::init(timg1.timer0);
esp_hal_embassy::init([timg1.timer0, timg1.timer1]);
let seed = (rng.random() as u64) << 32 | rng.random() as u64;
@@ -174,15 +176,9 @@ async fn main(spawner: Spawner) -> ! {
// 3. Nahraďte pôvodnú podmienku týmto časovým zámkom
if last_mqtt_publish.elapsed() >= mqtt_publish_interval {
let payload = format!(
"{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}",
reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c
);
mqtt_set_imu(payload.as_bytes());
// Aktualizujeme čas posledného odoslania
let payload = client::encode_imu_json(&reading);
client::mqtt_set_imu_payload(payload);
last_mqtt_publish = Instant::now();
log::info!("MQTT IMU payload buffered (freshest data)");
}
}
Either3::Third(_) => {

View File

@@ -1,17 +1,19 @@
// src/bus/mod.rs
//! Shared access to the hardware I2C peripheral on a
//! single core.
use core::cell::RefCell;
use embedded_hal_bus::i2c::RefCellDevice;
use esp_hal::i2c::master::I2c;
use esp_hal::Async;
/// The underlying I2C peripheral type
/// I2C peripheral type
pub type I2cInner = I2c<'static, Async>;
/// RefCell to share the bus on a single core.
pub type SharedI2c = RefCell<I2cInner>;
/// A handle to a shared I2C device.
/// Shared I2C device.
pub type I2cDevice = RefCellDevice<'static, I2cInner>;
/// New I2C device handle from the shared bus.

View File

@@ -1,10 +1,9 @@
// src/contracts.rs
//! Cross-feature message contracts.
//!
//! This is the ONLY coupling point between features.
//! Features depend on these types, not on each other.
use heapless::String as HString;
use heapless::String;
use pages_tui::input::Key;
/// IMU sensor reading from MPU6050
@@ -20,17 +19,13 @@ pub struct ImuReading {
/// Commands that can be sent to the display actor
#[derive(Clone, Debug)]
pub enum DisplayCommand {
/// Show IMU sensor data
SetImu(ImuReading),
/// Show a status line (max 32 chars)
SetStatus(HString<32>),
/// Show an error message (max 64 chars)
ShowError(HString<64>),
/// Show MQTT connection status
SetStatus(String<32>),
ShowError(String<64>),
SetMqttStatus { connected: bool, msg_count: u32 },
/// Clear the display to default state
Clear,
PushKey(Key),
AddChatMessage(HString<24>),
AddChatMessage(String<24>),
}

View File

@@ -1,5 +1,5 @@
// src/display/api.rs
//! Public API for the display feature.
//! display API
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::channel::{Channel, Receiver, TrySendError};
@@ -29,7 +29,7 @@ pub(crate) fn receiver() -> Receiver<'static, CriticalSectionRawMutex, DisplayCo
/// Show IMU data - NON-BLOCKING to prevent backpressure deadlock
pub fn show_imu(reading: ImuReading) {
// CRITICAL FIX: Use try_send instead of send().await
// try_send instead of send().await
// If display is slow, we drop the reading rather than blocking the main loop
let _ = try_send(DisplayCommand::SetImu(reading));
}

View File

@@ -9,7 +9,7 @@ use ssd1306::{mode::BufferedGraphicsMode, prelude::*, I2CDisplayInterface, Ssd13
use crate::bus::I2cDevice;
use crate::display::api::receiver;
use crate::display::tui::{render_frame, next_page_id, prev_page_id, DisplayState, Screen, ScreenEvent};
use crate::display::tui::{render_frame, DisplayState, Screen, ScreenEvent};
use crate::contracts::DisplayCommand;
use pages_tui::prelude::*;
@@ -43,14 +43,15 @@ pub async fn display_task(i2c: I2cDevice) {
let rx = receiver();
// Register pages
orchestrator.register_page("menu".into(), Screen::Menu);
orchestrator.register_page("imu".into(), Screen::Imu);
orchestrator.register_page("chat".into(), Screen::Chat);
// Enum-based registration
orchestrator.register(Screen::Menu);
orchestrator.register(Screen::Imu);
orchestrator.register(Screen::Chat);
orchestrator.bind(Key::tab(), ComponentAction::Next);
orchestrator.bind(Key::enter(), ComponentAction::Select);
let _ = orchestrator.navigate_to("menu".into());
let _ = orchestrator.navigate_to(Screen::Menu);
info!("Display ready");
@@ -59,27 +60,27 @@ pub async fn display_task(i2c: I2cDevice) {
match cmd {
DisplayCommand::PushKey(key) => {
if key == Key::tab() {
orchestrator.focus_manager_mut().wrap_next();
orchestrator.focus_manager_mut().next();
} else if key == Key::enter() {
if let Ok(events) = orchestrator.process_frame(key) {
for event in events {
match event {
ScreenEvent::GoToImu => {
let _ = orchestrator.navigate_to("imu".into());
let _ = orchestrator.navigate_to(Screen::Imu);
}
ScreenEvent::GoToChat => {
let _ = orchestrator.navigate_to("chat".into());
let _ = orchestrator.navigate_to(Screen::Chat);
}
ScreenEvent::NavigatePrev => {
if let Some(cur) = orchestrator.current_id() {
let prev = prev_page_id(cur.as_str());
let _ = orchestrator.navigate_to(prev.into());
if let Some(cur) = orchestrator.current() {
let prev = cur.prev();
let _ = orchestrator.navigate_to(prev);
}
}
ScreenEvent::NavigateNext => {
if let Some(cur) = orchestrator.current_id() {
let next = next_page_id(cur.as_str());
let _ = orchestrator.navigate_to(next.into());
if let Some(cur) = orchestrator.current() {
let next = cur.next();
let _ = orchestrator.navigate_to(next);
}
}
}

View File

@@ -1,7 +1,7 @@
// src/display/tui.rs
use crate::contracts::{DisplayCommand, ImuReading};
use alloc::format;
use heapless::String as HString;
use heapless::String;
use pages_tui::prelude::*;
use ratatui::{
layout::Rect,
@@ -27,25 +27,25 @@ const NAV_TARGETS: &[PageFocus] = &[PageFocus::NavPrev, PageFocus::NavNext];
/// Display state
pub struct DisplayState {
pub(crate) status: HString<32>,
pub(crate) status: String<32>,
pub(crate) last_imu: Option<ImuReading>,
pub(crate) last_error: Option<HString<64>>,
pub(crate) last_error: Option<String<64>>,
pub(crate) mqtt_connected: bool,
pub(crate) mqtt_msg_count: u32,
pub(crate) chat_msg1: HString<24>,
pub(crate) chat_msg2: HString<24>,
pub(crate) chat_msg1: String<24>,
pub(crate) chat_msg2: String<24>,
}
impl Default for DisplayState {
fn default() -> Self {
Self {
status: HString::new(),
status: String::new(),
last_imu: None,
last_error: None,
mqtt_connected: false,
mqtt_msg_count: 0,
chat_msg1: HString::new(),
chat_msg2: HString::new(),
chat_msg1: String::new(),
chat_msg2: String::new(),
}
}
}
@@ -66,7 +66,7 @@ impl DisplayState {
DisplayCommand::Clear => {
self.last_imu = None;
self.last_error = None;
self.status = HString::new();
self.status = String::new();
}
DisplayCommand::PushKey(_) => {}
DisplayCommand::AddChatMessage(msg) => {
@@ -137,16 +137,30 @@ impl Component for Screen {
}
// PAGE ORDER
const PAGE_ORDER: &[&str] = &["menu", "imu", "chat"];
const PAGE_ORDER: &[Screen] = &[Screen::Menu, Screen::Imu, Screen::Chat];
pub fn next_page_id(current: &str) -> &'static str {
let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0);
PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()]
impl Screen {
pub fn next(&self) -> Screen {
let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0);
PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()].clone()
}
pub fn prev_page_id(current: &str) -> &'static str {
let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0);
if idx == 0 { PAGE_ORDER[PAGE_ORDER.len() - 1] } else { PAGE_ORDER[idx - 1] }
pub fn prev(&self) -> Screen {
let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0);
if idx == 0 {
PAGE_ORDER[PAGE_ORDER.len() - 1].clone()
} else {
PAGE_ORDER[idx - 1].clone()
}
}
pub fn to_str(&self) -> &'static str {
match self {
Screen::Menu => "menu",
Screen::Imu => "imu",
Screen::Chat => "chat",
}
}
}
// RENDERING

View File

@@ -5,73 +5,8 @@ use esp_hal::{
i2c::master::{Config, I2c},
peripherals::Peripherals,
};
use ssd1306::mode::BufferedGraphicsMode;
use log::info;
#[embassy_executor::task]
pub async fn display_task() {
use mousefood::{EmbeddedBackend, EmbeddedBackendConfig};
use ratatui::{
layout::{Constraint, Direction, Layout},
widgets::{Block, Borders, Paragraph},
Terminal,
};
use ssd1306::{prelude::*, I2CDisplayInterface, Ssd1306};
let peripherals = unsafe { Peripherals::steal() };
let i2c = I2c::new(peripherals.I2C0, Config::default())
.expect("Failed to initialize I2C")
.with_sda(peripherals.GPIO21)
.with_scl(peripherals.GPIO22);
let interface = I2CDisplayInterface::new(i2c);
let mut display = Ssd1306::new(interface, DisplaySize128x64, DisplayRotation::Rotate0)
.into_buffered_graphics_mode();
display.init().unwrap();
let config = EmbeddedBackendConfig {
flush_callback: alloc::boxed::Box::new(|display| {
let d: &mut Ssd1306<_, _, BufferedGraphicsMode<DisplaySize128x64>> = display;
d.flush().unwrap();
}),
..Default::default()
};
let backend = EmbeddedBackend::new(&mut display, config);
let mut terminal = Terminal::new(backend).unwrap();
let mut counter = 0;
loop {
terminal
.draw(|f| {
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Length(3), Constraint::Min(0)])
.split(f.area());
f.render_widget(
Block::default().title(" ESP32 Status ").borders(Borders::ALL),
chunks[0],
);
let content = alloc::format!("MQTT Active\nCounter: {}", counter);
f.render_widget(
Paragraph::new(content).block(
Block::default().borders(Borders::LEFT | Borders::RIGHT | Borders::BOTTOM),
),
chunks[1],
);
})
.unwrap();
counter += 1;
Timer::after(Duration::from_millis(1000)).await;
}
}
#[embassy_executor::task]
pub async fn i2c_check() {
let peripherals = unsafe { Peripherals::steal() };

View File

@@ -1,8 +1,8 @@
// src/mqtt/client.rs
use embassy_futures::select::{select, Either};
use embassy_net::{tcp::TcpSocket, Stack};
use embassy_time::{Duration, Timer, with_timeout, Instant};
use embassy_time::{Duration, Timer, Instant};
use embassy_futures::select::{select, Either};
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
@@ -12,16 +12,24 @@ use rust_mqtt::utils::rng_generator::CountingRng;
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::mutex::Mutex;
use embassy_sync::channel::{Channel, Receiver};
use embassy_sync::signal::Signal;
use heapless::{String, Vec};
use static_cell::ConstStaticCell;
use core::fmt::Write;
use log::{info, warn};
use crate::mqtt::config::mqtt_broker_endpoint;
use crate::contracts::ImuReading;
const RECONNECT_DELAY_SECS: u64 = 5;
const KEEPALIVE_SECS: u64 = 60;
const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const SOCKET_POLL_TIMEOUT: Duration = Duration::from_secs(1);
const PING_TIMEOUT: Duration = Duration::from_secs(5);
// Must be > PING_PERIOD, ideally > KEEPALIVE
const NO_SUCCESS_TIMEOUT: Duration = Duration::from_secs(120);
const NO_IMU_SIG_WARN: Duration = Duration::from_secs(10);
const SUBS_MAX: usize = 8;
// Limits for static buffers
pub const TOPIC_MAX: usize = 128;
@@ -64,13 +72,14 @@ enum Command {
static CMD_CHAN: Channel<CriticalSectionRawMutex, Command, COMMAND_QUEUE> = Channel::new();
static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> = Channel::new();
/// Shared latest IMU payload (non-blocking, latest-value semantics)
static IMU_LATEST: Mutex<CriticalSectionRawMutex, Option<Vec<u8, PAYLOAD_MAX>>> =
Mutex::new(None);
/// Latest-value + wake-up semantics for IMU publish payload (single consumer: MQTT task)
static IMU_SIG: Signal<CriticalSectionRawMutex, Vec<u8, PAYLOAD_MAX>> = Signal::new();
static SUBS: Mutex<CriticalSectionRawMutex, Vec<String<TOPIC_MAX>, SUBS_MAX>> =
Mutex::new(Vec::new());
/// Public API
/// Blocking publish for command/response messages.
pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) {
CMD_CHAN
.send(Command::Publish(PublishMsg {
@@ -82,7 +91,6 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re
.await;
}
/// Non-blocking publish for other traffic (fire-and-forget)
pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool {
CMD_CHAN
.try_send(Command::Publish(PublishMsg {
@@ -94,19 +102,41 @@ pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, reta
.is_ok()
}
/// Set latest IMU telemetry payload (non-blocking, overwrites previous)
pub fn mqtt_set_imu(payload: &[u8]) {
if let Ok(mut guard) = IMU_LATEST.try_lock() {
let mut buf: Vec<u8, PAYLOAD_MAX> = Vec::new();
let _ = buf.extend_from_slice(&payload[..payload.len().min(PAYLOAD_MAX)]);
*guard = Some(buf);
pub fn mqtt_set_imu(reading: ImuReading) {
// Encode JSON into a bounded buffer (no alloc::format!)
let payload = encode_imu_json(&reading);
IMU_SIG.signal(payload);
}
pub fn mqtt_set_imu_payload(payload: Vec<u8, PAYLOAD_MAX>) {
IMU_SIG.signal(payload);
}
pub fn encode_imu_json(reading: &ImuReading) -> Vec<u8, PAYLOAD_MAX> {
let mut s: String<256> = String::new();
let _ = write!(
&mut s,
"{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"gx\":{:.1},\"gy\":{:.1},\"gz\":{:.1},\"t\":{:.1},\"ts\":{}}}",
reading.accel_g[0], reading.accel_g[1], reading.accel_g[2],
reading.gyro_dps[0], reading.gyro_dps[1], reading.gyro_dps[2],
reading.temp_c,
reading.timestamp_ms
);
let mut v: Vec<u8, PAYLOAD_MAX> = Vec::new();
let _ = v.extend_from_slice(s.as_bytes());
v
}
pub async fn mqtt_subscribe(topic: &str) {
CMD_CHAN
.send(Command::Subscribe(truncate_str::<TOPIC_MAX>(topic)))
.await;
let t = truncate_str::<TOPIC_MAX>(topic);
{
let mut subs = SUBS.lock().await;
let exists = subs.iter().any(|s| s.as_str() == t.as_str());
if !exists {
let _ = subs.push(t.clone());
}
}
CMD_CHAN.send(Command::Subscribe(t)).await;
}
pub fn mqtt_events(
@@ -126,7 +156,6 @@ fn truncate_str<const N: usize>(s: &str) -> String<N> {
return h;
}
// Cut on a UTF-8 char boundary to avoid panics.
let mut cut = N;
while cut > 0 && !s.is_char_boundary(cut) {
cut -= 1;
@@ -144,36 +173,16 @@ fn truncate_payload(data: &[u8]) -> Vec<u8, PAYLOAD_MAX> {
async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> {
match cmd {
Command::Publish(msg) => {
match with_timeout(
Duration::from_secs(5),
client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain),
)
client
.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain)
.await
{
Ok(result) => result,
Err(_) => {
warn!("MQTT send timed out, forcing reconnect");
Err(ReasonCode::UnspecifiedError)
}
}
}
Command::Subscribe(topic) => {
match with_timeout(
Duration::from_secs(5),
client.subscribe_to_topic(topic.as_str()),
)
.await
{
Ok(result) => {
result?;
let res = client.subscribe_to_topic(topic.as_str()).await;
if res.is_ok() {
info!("Subscribed to '{}'", topic);
Ok(())
}
Err(_) => {
warn!("MQTT subscribe timed out");
Err(ReasonCode::UnspecifiedError)
}
}
res
}
}
}
@@ -198,7 +207,7 @@ async fn run_one_session(
mqtt_rx: &mut [u8],
) -> Result<(), ()> {
let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx);
socket.set_timeout(Some(Duration::from_secs(20)));
socket.set_timeout(Some(SOCKET_POLL_TIMEOUT * 10));
match socket.connect(mqtt_broker_endpoint()).await {
Ok(_) => info!("Connected TCP to MQTT broker"),
Err(e) => {
@@ -207,7 +216,6 @@ async fn run_one_session(
}
}
// MQTT configuration and client setup
let mut cfg: ClientConfig<8, CountingRng> =
ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0));
cfg.keep_alive = KEEPALIVE_SECS as u16;
@@ -224,79 +232,188 @@ async fn run_one_session(
}
}
// Re-subscribe after every (re)connect
let mut subs_snapshot: Vec<String<TOPIC_MAX>, SUBS_MAX> = Vec::new();
{
let subs = SUBS.lock().await;
for t in subs.iter() {
let _ = subs_snapshot.push(t.clone());
}
}
for t in subs_snapshot.iter() {
match client.subscribe_to_topic(t.as_str()).await {
Ok(_) => info!("Subscribed to '{}'", t),
Err(e) => {
warn!("MQTT resubscribe failed: {:?}", e);
return Err(());
}
}
}
let mut next_ping_at = Instant::now() + PING_PERIOD;
let cmd_rx = CMD_CHAN.receiver();
// Only restart the session after N consecutive IMU publish failures
let mut imu_tx_fail_streak: u8 = 0;
let mut hb_at = Instant::now() + Duration::from_secs(10);
let mut tx_ok: u32 = 0;
let mut tx_err: u32 = 0;
let mut rx_ok: u32 = 0;
let mut ping_ok: u32 = 0;
let mut last_ok = Instant::now(); // last successful MQTT I/O (tx/rx/ping)
let mut last_imu_sig = Instant::now(); // last time we received IMU_SIG in MQTT task
loop {
// Send latest IMU payload if available
if let Ok(mut guard) = IMU_LATEST.try_lock() {
if let Some(payload) = guard.take() {
drop(guard);
let now = Instant::now();
log::info!("MQTT IMU TX start ({} bytes)", payload.len());
if now - last_ok > NO_SUCCESS_TIMEOUT {
warn!(
"MQTT no successful I/O for {:?} -> restart session",
now - last_ok
);
return Err(());
}
// Limit send to max 2 seconds to catch network stalls
let send_res = with_timeout(
Duration::from_secs(2),
client.send_message("esp32/imu", &payload, QualityOfService::QoS0, false),
if now - last_imu_sig > NO_IMU_SIG_WARN {
// This is diagnostic: if core0 claims it's sending, but this prints, you have cross-core signaling loss.
warn!(
"MQTT hasn't received IMU_SIG for {:?} (core0->core1 sync likely broken)",
now - last_imu_sig
);
// Rate-limit the warning
last_imu_sig = now;
}
let ping_in = if next_ping_at > now { next_ping_at - now } else { Duration::from_secs(0) };
// Timebox receive_message() even if lower layers misbehave.
let recv_fut = async {
match select(client.receive_message(), Timer::after(SOCKET_POLL_TIMEOUT)).await {
Either::First(res) => Some(res),
Either::Second(_) => None,
}
};
// 4-way select using nested selects to avoid relying on select4()
match select(
select(cmd_rx.receive(), IMU_SIG.wait()),
select(recv_fut, Timer::after(ping_in)),
)
.await;
match send_res {
Ok(Ok(_)) => {
log::info!("MQTT IMU TX ok");
next_ping_at = Instant::now() + PING_PERIOD;
}
Ok(Err(e)) => {
log::warn!("MQTT IMU TX failed: {:?}", e);
return Err(());
}
Err(_) => {
log::warn!("MQTT IMU TX timed out, restarting session");
return Err(());
}
}
}
}
// Process any queued control commands
while let Ok(cmd) = CMD_CHAN.try_receive() {
.await
{
// Command received
Either::First(Either::First(cmd)) => {
handle_command(&mut client, cmd).await.map_err(|_| ())?;
next_ping_at = Instant::now() + PING_PERIOD;
last_ok = Instant::now();
// Drain any additional queued commands quickly
while let Ok(cmd) = CMD_CHAN.try_receive() {
handle_command(&mut client, cmd).await.map_err(|_| ())?;
last_ok = Instant::now();
next_ping_at = Instant::now() + PING_PERIOD;
}
}
// Check for incoming messages with timeout
match with_timeout(Duration::from_millis(100), client.receive_message()).await {
Ok(Ok((topic, payload))) => {
// Handle incoming message
// IMU update signaled (latest value semantics)
Either::First(Either::Second(payload)) => {
last_imu_sig = Instant::now();
// Enable temporarily for diagnostics:
// info!("IMU_SIG received in MQTT task (len={})", payload.len());
// Timebox the publish so we don't hang forever inside send_message().await
let send_res = match select(
client.send_message("esp32/imu", &payload, QualityOfService::QoS0, false),
Timer::after(Duration::from_secs(5)),
)
.await
{
Either::First(res) => res,
Either::Second(_) => Err(ReasonCode::NetworkError),
};
match send_res {
Ok(_) => {
next_ping_at = Instant::now() + PING_PERIOD;
imu_tx_fail_streak = 0;
last_ok = Instant::now();
tx_ok = tx_ok.wrapping_add(1);
if (tx_ok % 10) == 0 {
info!(
"MQTT alive: tx_ok={} tx_err={} rx_ok={} streak={}",
tx_ok, tx_err, rx_ok, imu_tx_fail_streak
);
}
}
Err(e) => {
tx_err = tx_err.wrapping_add(1);
imu_tx_fail_streak = imu_tx_fail_streak.saturating_add(1);
warn!("MQTT IMU TX fail {}/5: {:?}", imu_tx_fail_streak, e);
if imu_tx_fail_streak >= 5 {
warn!("MQTT IMU TX fail 5x -> restart session");
return Err(());
}
}
}
}
// Incoming message (or None on timeout)
Either::Second(Either::First(opt)) => {
if let Some(res) = opt {
match res {
Ok((topic, payload)) => {
rx_ok = rx_ok.wrapping_add(1);
let _ = handle_incoming(Ok((topic, payload))).await;
last_ok = Instant::now();
imu_tx_fail_streak = 0;
next_ping_at = Instant::now() + PING_PERIOD;
}
Ok(Err(e)) => {
log::warn!("MQTT receive error: {:?}", e);
Err(ReasonCode::NetworkError) => {
// idle tick
}
Err(e) => {
warn!("MQTT receive error (fatal): {:?}", e);
return Err(());
}
Err(_) => {
}
}
if Instant::now() >= next_ping_at {
match with_timeout(PING_TIMEOUT, client.send_ping()).await {
Ok(Ok(_)) => {
}
// Ping timer fired
Either::Second(Either::Second(_)) => {
if Instant::now() >= hb_at {
info!(
"MQTT hb tx_ok={} tx_err={} rx_ok={} ping_ok={} streak={}",
tx_ok, tx_err, rx_ok, ping_ok, imu_tx_fail_streak
);
hb_at = Instant::now() + Duration::from_secs(10);
}
let ping_res = match select(client.send_ping(), Timer::after(PING_TIMEOUT)).await {
Either::First(res) => res,
Either::Second(_) => Err(ReasonCode::NetworkError),
};
match ping_res {
Ok(_) => {
ping_ok = ping_ok.wrapping_add(1);
imu_tx_fail_streak = 0;
last_ok = Instant::now();
next_ping_at = Instant::now() + PING_PERIOD;
}
Ok(Err(e)) => {
warn!("MQTT ping failed: {:?}", e);
Err(e) => {
warn!("MQTT ping failed/timeout: {:?}", e);
return Err(());
}
Err(_) => {
warn!("MQTT ping timed out, restarting session");
return Err(());
}
}
}
}
}
/// Main MQTT embassy task
#[embassy_executor::task]
pub async fn mqtt_task(stack: Stack<'static>) {
info!("MQTT task starting...");