4 Commits

Author SHA1 Message Date
Priec
2774e83d99 ported into a generic library 2026-01-19 15:53:19 +01:00
Priec
9910bf9402 enum to string that is passed to the library, waiting for the enum based generic library 2026-01-19 15:23:39 +01:00
Priec
424c795170 cleanup 2026-01-19 13:56:39 +01:00
Priec
6b26ed9318 working with enums passed as a strings to the library 2026-01-19 13:09:46 +01:00
7 changed files with 114 additions and 117 deletions

View File

@@ -1,17 +1,19 @@
// src/bus/mod.rs // src/bus/mod.rs
//! Shared access to the hardware I2C peripheral on a
//! single core.
use core::cell::RefCell; use core::cell::RefCell;
use embedded_hal_bus::i2c::RefCellDevice; use embedded_hal_bus::i2c::RefCellDevice;
use esp_hal::i2c::master::I2c; use esp_hal::i2c::master::I2c;
use esp_hal::Async; use esp_hal::Async;
/// The underlying I2C peripheral type /// I2C peripheral type
pub type I2cInner = I2c<'static, Async>; pub type I2cInner = I2c<'static, Async>;
/// RefCell to share the bus on a single core. /// RefCell to share the bus on a single core.
pub type SharedI2c = RefCell<I2cInner>; pub type SharedI2c = RefCell<I2cInner>;
/// A handle to a shared I2C device. /// Shared I2C device.
pub type I2cDevice = RefCellDevice<'static, I2cInner>; pub type I2cDevice = RefCellDevice<'static, I2cInner>;
/// New I2C device handle from the shared bus. /// New I2C device handle from the shared bus.

View File

@@ -1,10 +1,9 @@
// src/contracts.rs // src/contracts.rs
//! Cross-feature message contracts. //! Cross-feature message contracts.
//! //!
//! This is the ONLY coupling point between features.
//! Features depend on these types, not on each other. //! Features depend on these types, not on each other.
use heapless::String as HString; use heapless::String;
use pages_tui::input::Key; use pages_tui::input::Key;
/// IMU sensor reading from MPU6050 /// IMU sensor reading from MPU6050
@@ -20,17 +19,13 @@ pub struct ImuReading {
/// Commands that can be sent to the display actor /// Commands that can be sent to the display actor
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum DisplayCommand { pub enum DisplayCommand {
/// Show IMU sensor data
SetImu(ImuReading), SetImu(ImuReading),
/// Show a status line (max 32 chars) SetStatus(String<32>),
SetStatus(HString<32>), ShowError(String<64>),
/// Show an error message (max 64 chars)
ShowError(HString<64>),
/// Show MQTT connection status
SetMqttStatus { connected: bool, msg_count: u32 }, SetMqttStatus { connected: bool, msg_count: u32 },
/// Clear the display to default state /// Clear the display to default state
Clear, Clear,
PushKey(Key), PushKey(Key),
AddChatMessage(HString<24>), AddChatMessage(String<24>),
} }

View File

@@ -1,5 +1,5 @@
// src/display/api.rs // src/display/api.rs
//! Public API for the display feature. //! display API
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::channel::{Channel, Receiver, TrySendError}; use embassy_sync::channel::{Channel, Receiver, TrySendError};
@@ -29,7 +29,7 @@ pub(crate) fn receiver() -> Receiver<'static, CriticalSectionRawMutex, DisplayCo
/// Show IMU data - NON-BLOCKING to prevent backpressure deadlock /// Show IMU data - NON-BLOCKING to prevent backpressure deadlock
pub fn show_imu(reading: ImuReading) { pub fn show_imu(reading: ImuReading) {
// CRITICAL FIX: Use try_send instead of send().await // try_send instead of send().await
// If display is slow, we drop the reading rather than blocking the main loop // If display is slow, we drop the reading rather than blocking the main loop
let _ = try_send(DisplayCommand::SetImu(reading)); let _ = try_send(DisplayCommand::SetImu(reading));
} }

View File

@@ -9,7 +9,7 @@ use ssd1306::{mode::BufferedGraphicsMode, prelude::*, I2CDisplayInterface, Ssd13
use crate::bus::I2cDevice; use crate::bus::I2cDevice;
use crate::display::api::receiver; use crate::display::api::receiver;
use crate::display::tui::{render_frame, next_page_id, prev_page_id, DisplayState, Screen, ScreenEvent}; use crate::display::tui::{render_frame, DisplayState, Screen, ScreenEvent};
use crate::contracts::DisplayCommand; use crate::contracts::DisplayCommand;
use pages_tui::prelude::*; use pages_tui::prelude::*;
@@ -43,14 +43,15 @@ pub async fn display_task(i2c: I2cDevice) {
let rx = receiver(); let rx = receiver();
// Register pages // Register pages
orchestrator.register_page("menu".into(), Screen::Menu); // Enum-based registration
orchestrator.register_page("imu".into(), Screen::Imu); orchestrator.register(Screen::Menu);
orchestrator.register_page("chat".into(), Screen::Chat); orchestrator.register(Screen::Imu);
orchestrator.register(Screen::Chat);
orchestrator.bind(Key::tab(), ComponentAction::Next); orchestrator.bind(Key::tab(), ComponentAction::Next);
orchestrator.bind(Key::enter(), ComponentAction::Select); orchestrator.bind(Key::enter(), ComponentAction::Select);
let _ = orchestrator.navigate_to("menu".into()); let _ = orchestrator.navigate_to(Screen::Menu);
info!("Display ready"); info!("Display ready");
@@ -59,27 +60,27 @@ pub async fn display_task(i2c: I2cDevice) {
match cmd { match cmd {
DisplayCommand::PushKey(key) => { DisplayCommand::PushKey(key) => {
if key == Key::tab() { if key == Key::tab() {
orchestrator.focus_manager_mut().wrap_next(); orchestrator.focus_manager_mut().next();
} else if key == Key::enter() { } else if key == Key::enter() {
if let Ok(events) = orchestrator.process_frame(key) { if let Ok(events) = orchestrator.process_frame(key) {
for event in events { for event in events {
match event { match event {
ScreenEvent::GoToImu => { ScreenEvent::GoToImu => {
let _ = orchestrator.navigate_to("imu".into()); let _ = orchestrator.navigate_to(Screen::Imu);
} }
ScreenEvent::GoToChat => { ScreenEvent::GoToChat => {
let _ = orchestrator.navigate_to("chat".into()); let _ = orchestrator.navigate_to(Screen::Chat);
} }
ScreenEvent::NavigatePrev => { ScreenEvent::NavigatePrev => {
if let Some(cur) = orchestrator.current_id() { if let Some(cur) = orchestrator.current() {
let prev = prev_page_id(cur.as_str()); let prev = cur.prev();
let _ = orchestrator.navigate_to(prev.into()); let _ = orchestrator.navigate_to(prev);
} }
} }
ScreenEvent::NavigateNext => { ScreenEvent::NavigateNext => {
if let Some(cur) = orchestrator.current_id() { if let Some(cur) = orchestrator.current() {
let next = next_page_id(cur.as_str()); let next = cur.next();
let _ = orchestrator.navigate_to(next.into()); let _ = orchestrator.navigate_to(next);
} }
} }
} }

View File

@@ -137,16 +137,30 @@ impl Component for Screen {
} }
// PAGE ORDER // PAGE ORDER
const PAGE_ORDER: &[&str] = &["menu", "imu", "chat"]; const PAGE_ORDER: &[Screen] = &[Screen::Menu, Screen::Imu, Screen::Chat];
pub fn next_page_id(current: &str) -> &'static str { impl Screen {
let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0); pub fn next(&self) -> Screen {
PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()] let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0);
PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()].clone()
} }
pub fn prev_page_id(current: &str) -> &'static str { pub fn prev(&self) -> Screen {
let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0); let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0);
if idx == 0 { PAGE_ORDER[PAGE_ORDER.len() - 1] } else { PAGE_ORDER[idx - 1] } if idx == 0 {
PAGE_ORDER[PAGE_ORDER.len() - 1].clone()
} else {
PAGE_ORDER[idx - 1].clone()
}
}
pub fn to_str(&self) -> &'static str {
match self {
Screen::Menu => "menu",
Screen::Imu => "imu",
Screen::Chat => "chat",
}
}
} }
// RENDERING // RENDERING

View File

@@ -5,19 +5,17 @@ use esp_hal::{
i2c::master::{Config, I2c}, i2c::master::{Config, I2c},
peripherals::Peripherals, peripherals::Peripherals,
}; };
use ssd1306::mode::BufferedGraphicsMode;
use log::info;
#[embassy_executor::task]
pub async fn display_task() {
use mousefood::{EmbeddedBackend, EmbeddedBackendConfig}; use mousefood::{EmbeddedBackend, EmbeddedBackendConfig};
use ratatui::{ use ratatui::{
layout::{Constraint, Direction, Layout}, layout::{Constraint, Direction, Layout},
widgets::{Block, Borders, Paragraph}, widgets::{Block, Borders, Paragraph},
Terminal, Terminal,
}; };
use ssd1306::{prelude::*, I2CDisplayInterface, Ssd1306}; use ssd1306::{prelude::*, I2CDisplayInterface, Ssd1306, mode::BufferedGraphicsMode};
use log::info;
#[embassy_executor::task]
pub async fn display_task() {
let peripherals = unsafe { Peripherals::steal() }; let peripherals = unsafe { Peripherals::steal() };
let i2c = I2c::new(peripherals.I2C0, Config::default()) let i2c = I2c::new(peripherals.I2C0, Config::default())

View File

@@ -1,8 +1,7 @@
// src/mqtt/client.rs // src/mqtt/client.rs
use embassy_futures::select::{select, Either};
use embassy_net::{tcp::TcpSocket, Stack}; use embassy_net::{tcp::TcpSocket, Stack};
use embassy_time::{Duration, Timer, with_timeout, Instant}; use embassy_time::{Duration, Timer, Instant};
use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::publish_packet::QualityOfService;
@@ -21,7 +20,8 @@ use crate::mqtt::config::mqtt_broker_endpoint;
const RECONNECT_DELAY_SECS: u64 = 5; const RECONNECT_DELAY_SECS: u64 = 5;
const KEEPALIVE_SECS: u64 = 60; const KEEPALIVE_SECS: u64 = 60;
const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2); const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2);
const PING_TIMEOUT: Duration = Duration::from_secs(2); const SOCKET_POLL_TIMEOUT: Duration = Duration::from_secs(1);
const SUBS_MAX: usize = 8;
// Limits for static buffers // Limits for static buffers
pub const TOPIC_MAX: usize = 128; pub const TOPIC_MAX: usize = 128;
@@ -68,9 +68,11 @@ static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> = Ch
static IMU_LATEST: Mutex<CriticalSectionRawMutex, Option<Vec<u8, PAYLOAD_MAX>>> = static IMU_LATEST: Mutex<CriticalSectionRawMutex, Option<Vec<u8, PAYLOAD_MAX>>> =
Mutex::new(None); Mutex::new(None);
static SUBS: Mutex<CriticalSectionRawMutex, Vec<String<TOPIC_MAX>, SUBS_MAX>> =
Mutex::new(Vec::new());
/// Public API /// Public API
/// Blocking publish for command/response messages.
pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) { pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) {
CMD_CHAN CMD_CHAN
.send(Command::Publish(PublishMsg { .send(Command::Publish(PublishMsg {
@@ -82,7 +84,6 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re
.await; .await;
} }
/// Non-blocking publish for other traffic (fire-and-forget)
pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool { pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool {
CMD_CHAN CMD_CHAN
.try_send(Command::Publish(PublishMsg { .try_send(Command::Publish(PublishMsg {
@@ -94,7 +95,6 @@ pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, reta
.is_ok() .is_ok()
} }
/// Set latest IMU telemetry payload (non-blocking, overwrites previous)
pub fn mqtt_set_imu(payload: &[u8]) { pub fn mqtt_set_imu(payload: &[u8]) {
if let Ok(mut guard) = IMU_LATEST.try_lock() { if let Ok(mut guard) = IMU_LATEST.try_lock() {
let mut buf: Vec<u8, PAYLOAD_MAX> = Vec::new(); let mut buf: Vec<u8, PAYLOAD_MAX> = Vec::new();
@@ -104,9 +104,15 @@ pub fn mqtt_set_imu(payload: &[u8]) {
} }
pub async fn mqtt_subscribe(topic: &str) { pub async fn mqtt_subscribe(topic: &str) {
CMD_CHAN let t = truncate_str::<TOPIC_MAX>(topic);
.send(Command::Subscribe(truncate_str::<TOPIC_MAX>(topic))) {
.await; let mut subs = SUBS.lock().await;
let exists = subs.iter().any(|s| s.as_str() == t.as_str());
if !exists {
let _ = subs.push(t.clone());
}
}
CMD_CHAN.send(Command::Subscribe(t)).await;
} }
pub fn mqtt_events( pub fn mqtt_events(
@@ -126,7 +132,6 @@ fn truncate_str<const N: usize>(s: &str) -> String<N> {
return h; return h;
} }
// Cut on a UTF-8 char boundary to avoid panics.
let mut cut = N; let mut cut = N;
while cut > 0 && !s.is_char_boundary(cut) { while cut > 0 && !s.is_char_boundary(cut) {
cut -= 1; cut -= 1;
@@ -144,36 +149,16 @@ fn truncate_payload(data: &[u8]) -> Vec<u8, PAYLOAD_MAX> {
async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> { async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> {
match cmd { match cmd {
Command::Publish(msg) => { Command::Publish(msg) => {
match with_timeout( client
Duration::from_secs(5), .send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain)
client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain),
)
.await .await
{
Ok(result) => result,
Err(_) => {
warn!("MQTT send timed out, forcing reconnect");
Err(ReasonCode::UnspecifiedError)
}
}
} }
Command::Subscribe(topic) => { Command::Subscribe(topic) => {
match with_timeout( let res = client.subscribe_to_topic(topic.as_str()).await;
Duration::from_secs(5), if res.is_ok() {
client.subscribe_to_topic(topic.as_str()),
)
.await
{
Ok(result) => {
result?;
info!("Subscribed to '{}'", topic); info!("Subscribed to '{}'", topic);
Ok(())
}
Err(_) => {
warn!("MQTT subscribe timed out");
Err(ReasonCode::UnspecifiedError)
}
} }
res
} }
} }
} }
@@ -198,7 +183,7 @@ async fn run_one_session(
mqtt_rx: &mut [u8], mqtt_rx: &mut [u8],
) -> Result<(), ()> { ) -> Result<(), ()> {
let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx); let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx);
socket.set_timeout(Some(Duration::from_secs(20))); socket.set_timeout(Some(SOCKET_POLL_TIMEOUT));
match socket.connect(mqtt_broker_endpoint()).await { match socket.connect(mqtt_broker_endpoint()).await {
Ok(_) => info!("Connected TCP to MQTT broker"), Ok(_) => info!("Connected TCP to MQTT broker"),
Err(e) => { Err(e) => {
@@ -207,7 +192,6 @@ async fn run_one_session(
} }
} }
// MQTT configuration and client setup
let mut cfg: ClientConfig<8, CountingRng> = let mut cfg: ClientConfig<8, CountingRng> =
ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0)); ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0));
cfg.keep_alive = KEEPALIVE_SECS as u16; cfg.keep_alive = KEEPALIVE_SECS as u16;
@@ -224,6 +208,24 @@ async fn run_one_session(
} }
} }
// Re-subscribe after every (re)connect
let mut subs_snapshot: Vec<String<TOPIC_MAX>, SUBS_MAX> = Vec::new();
{
let subs = SUBS.lock().await;
for t in subs.iter() {
let _ = subs_snapshot.push(t.clone());
}
}
for t in subs_snapshot.iter() {
match client.subscribe_to_topic(t.as_str()).await {
Ok(_) => info!("Subscribed to '{}'", t),
Err(e) => {
warn!("MQTT resubscribe failed: {:?}", e);
return Err(());
}
}
}
let mut next_ping_at = Instant::now() + PING_PERIOD; let mut next_ping_at = Instant::now() + PING_PERIOD;
loop { loop {
@@ -231,29 +233,19 @@ async fn run_one_session(
if let Ok(mut guard) = IMU_LATEST.try_lock() { if let Ok(mut guard) = IMU_LATEST.try_lock() {
if let Some(payload) = guard.take() { if let Some(payload) = guard.take() {
drop(guard); drop(guard);
log::info!("MQTT IMU TX start ({} bytes)", payload.len()); log::info!("MQTT IMU TX start ({} bytes)", payload.len());
match client
// Limit send to max 2 seconds to catch network stalls .send_message("esp32/imu", &payload, QualityOfService::QoS0, false)
let send_res = with_timeout( .await
Duration::from_secs(2), {
client.send_message("esp32/imu", &payload, QualityOfService::QoS0, false), Ok(_) => {
)
.await;
match send_res {
Ok(Ok(_)) => {
log::info!("MQTT IMU TX ok"); log::info!("MQTT IMU TX ok");
next_ping_at = Instant::now() + PING_PERIOD; next_ping_at = Instant::now() + PING_PERIOD;
} }
Ok(Err(e)) => { Err(e) => {
log::warn!("MQTT IMU TX failed: {:?}", e); log::warn!("MQTT IMU TX failed: {:?}", e);
return Err(()); return Err(());
} }
Err(_) => {
log::warn!("MQTT IMU TX timed out, restarting session");
return Err(());
}
} }
} }
} }
@@ -264,39 +256,34 @@ async fn run_one_session(
next_ping_at = Instant::now() + PING_PERIOD; next_ping_at = Instant::now() + PING_PERIOD;
} }
// Check for incoming messages with timeout // Receive tick: socket timeout turns "no data" into ReasonCode::NetworkError.
match with_timeout(Duration::from_millis(100), client.receive_message()).await { // Treat that as idle, not as a broken session.
Ok(Ok((topic, payload))) => { match client.receive_message().await {
// Handle incoming message Ok((topic, payload)) => {
let _ = handle_incoming(Ok((topic, payload))).await; let _ = handle_incoming(Ok((topic, payload))).await;
next_ping_at = Instant::now() + PING_PERIOD; next_ping_at = Instant::now() + PING_PERIOD;
} }
Ok(Err(e)) => { Err(ReasonCode::NetworkError) => {
log::warn!("MQTT receive error: {:?}", e); // No incoming data during SOCKET_POLL_TIMEOUT -> ignore.
}
Err(e) => {
warn!("MQTT receive error (fatal): {:?}", e);
return Err(()); return Err(());
} }
Err(_) => {
}
} }
if Instant::now() >= next_ping_at { if Instant::now() >= next_ping_at {
match with_timeout(PING_TIMEOUT, client.send_ping()).await { match client.send_ping().await {
Ok(Ok(_)) => { Ok(_) => next_ping_at = Instant::now() + PING_PERIOD,
next_ping_at = Instant::now() + PING_PERIOD; Err(e) => {
}
Ok(Err(e)) => {
warn!("MQTT ping failed: {:?}", e); warn!("MQTT ping failed: {:?}", e);
return Err(()); return Err(());
} }
Err(_) => {
warn!("MQTT ping timed out, restarting session");
return Err(());
}
} }
} }
} }
} }
/// Main MQTT embassy task
#[embassy_executor::task] #[embassy_executor::task]
pub async fn mqtt_task(stack: Stack<'static>) { pub async fn mqtt_task(stack: Stack<'static>) {
info!("MQTT task starting..."); info!("MQTT task starting...");