working with enums passed as strings to the library

This commit is contained in:
Priec
2026-01-19 13:09:46 +01:00
parent 054b42547e
commit 6b26ed9318
5 changed files with 93 additions and 92 deletions

View File

@@ -5,13 +5,13 @@ use embedded_hal_bus::i2c::RefCellDevice;
use esp_hal::i2c::master::I2c;
use esp_hal::Async;
/// The underlying I2C peripheral type
/// I2C peripheral type
pub type I2cInner = I2c<'static, Async>;
/// RefCell to share the bus on a single core.
pub type SharedI2c = RefCell<I2cInner>;
/// A handle to a shared I2C device.
/// Shared I2C device.
pub type I2cDevice = RefCellDevice<'static, I2cInner>;
/// New I2C device handle from the shared bus.

View File

@@ -1,5 +1,5 @@
// src/display/api.rs
//! Public API for the display feature.
//! display API
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::channel::{Channel, Receiver, TrySendError};
@@ -29,7 +29,7 @@ pub(crate) fn receiver() -> Receiver<'static, CriticalSectionRawMutex, DisplayCo
/// Show IMU data - NON-BLOCKING to prevent backpressure deadlock
pub fn show_imu(reading: ImuReading) {
// CRITICAL FIX: Use try_send instead of send().await
// try_send instead of send().await
// If display is slow, we drop the reading rather than blocking the main loop
let _ = try_send(DisplayCommand::SetImu(reading));
}

View File

@@ -9,7 +9,7 @@ use ssd1306::{mode::BufferedGraphicsMode, prelude::*, I2CDisplayInterface, Ssd13
use crate::bus::I2cDevice;
use crate::display::api::receiver;
use crate::display::tui::{render_frame, next_page_id, prev_page_id, DisplayState, Screen, ScreenEvent};
use crate::display::tui::{render_frame, DisplayState, Screen, ScreenEvent};
use crate::contracts::DisplayCommand;
use pages_tui::prelude::*;
@@ -71,15 +71,15 @@ pub async fn display_task(i2c: I2cDevice) {
let _ = orchestrator.navigate_to("chat".into());
}
ScreenEvent::NavigatePrev => {
if let Some(cur) = orchestrator.current_id() {
let prev = prev_page_id(cur.as_str());
let _ = orchestrator.navigate_to(prev.into());
if let Some(cur) = orchestrator.current() {
let prev = cur.prev();
let _ = orchestrator.navigate_to(prev.to_str().into());
}
}
ScreenEvent::NavigateNext => {
if let Some(cur) = orchestrator.current_id() {
let next = next_page_id(cur.as_str());
let _ = orchestrator.navigate_to(next.into());
if let Some(cur) = orchestrator.current() {
let next = cur.next();
let _ = orchestrator.navigate_to(next.to_str().into());
}
}
}

View File

@@ -137,16 +137,30 @@ impl Component for Screen {
}
// PAGE ORDER
const PAGE_ORDER: &[&str] = &["menu", "imu", "chat"];
const PAGE_ORDER: &[Screen] = &[Screen::Menu, Screen::Imu, Screen::Chat];
pub fn next_page_id(current: &str) -> &'static str {
let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0);
PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()]
impl Screen {
pub fn next(&self) -> Screen {
let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0);
PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()].clone()
}
pub fn prev_page_id(current: &str) -> &'static str {
let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0);
if idx == 0 { PAGE_ORDER[PAGE_ORDER.len() - 1] } else { PAGE_ORDER[idx - 1] }
pub fn prev(&self) -> Screen {
let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0);
if idx == 0 {
PAGE_ORDER[PAGE_ORDER.len() - 1].clone()
} else {
PAGE_ORDER[idx - 1].clone()
}
}
pub fn to_str(&self) -> &'static str {
match self {
Screen::Menu => "menu",
Screen::Imu => "imu",
Screen::Chat => "chat",
}
}
}
// RENDERING

View File

@@ -1,8 +1,7 @@
// src/mqtt/client.rs
use embassy_futures::select::{select, Either};
use embassy_net::{tcp::TcpSocket, Stack};
use embassy_time::{Duration, Timer, with_timeout, Instant};
use embassy_time::{Duration, Timer, Instant};
use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
@@ -21,7 +20,8 @@ use crate::mqtt::config::mqtt_broker_endpoint;
const RECONNECT_DELAY_SECS: u64 = 5;
const KEEPALIVE_SECS: u64 = 60;
const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const SOCKET_POLL_TIMEOUT: Duration = Duration::from_secs(1);
const SUBS_MAX: usize = 8;
// Limits for static buffers
pub const TOPIC_MAX: usize = 128;
@@ -68,9 +68,11 @@ static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> = Ch
static IMU_LATEST: Mutex<CriticalSectionRawMutex, Option<Vec<u8, PAYLOAD_MAX>>> =
Mutex::new(None);
static SUBS: Mutex<CriticalSectionRawMutex, Vec<String<TOPIC_MAX>, SUBS_MAX>> =
Mutex::new(Vec::new());
/// Public API
/// Blocking publish for command/response messages.
pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) {
CMD_CHAN
.send(Command::Publish(PublishMsg {
@@ -82,7 +84,6 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re
.await;
}
/// Non-blocking publish for other traffic (fire-and-forget)
pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool {
CMD_CHAN
.try_send(Command::Publish(PublishMsg {
@@ -94,7 +95,6 @@ pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, reta
.is_ok()
}
/// Set latest IMU telemetry payload (non-blocking, overwrites previous)
pub fn mqtt_set_imu(payload: &[u8]) {
if let Ok(mut guard) = IMU_LATEST.try_lock() {
let mut buf: Vec<u8, PAYLOAD_MAX> = Vec::new();
@@ -104,9 +104,15 @@ pub fn mqtt_set_imu(payload: &[u8]) {
}
pub async fn mqtt_subscribe(topic: &str) {
CMD_CHAN
.send(Command::Subscribe(truncate_str::<TOPIC_MAX>(topic)))
.await;
let t = truncate_str::<TOPIC_MAX>(topic);
{
let mut subs = SUBS.lock().await;
let exists = subs.iter().any(|s| s.as_str() == t.as_str());
if !exists {
let _ = subs.push(t.clone());
}
}
CMD_CHAN.send(Command::Subscribe(t)).await;
}
pub fn mqtt_events(
@@ -126,7 +132,6 @@ fn truncate_str<const N: usize>(s: &str) -> String<N> {
return h;
}
// Cut on a UTF-8 char boundary to avoid panics.
let mut cut = N;
while cut > 0 && !s.is_char_boundary(cut) {
cut -= 1;
@@ -144,36 +149,16 @@ fn truncate_payload(data: &[u8]) -> Vec<u8, PAYLOAD_MAX> {
async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> {
match cmd {
Command::Publish(msg) => {
match with_timeout(
Duration::from_secs(5),
client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain),
)
client
.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain)
.await
{
Ok(result) => result,
Err(_) => {
warn!("MQTT send timed out, forcing reconnect");
Err(ReasonCode::UnspecifiedError)
}
}
}
Command::Subscribe(topic) => {
match with_timeout(
Duration::from_secs(5),
client.subscribe_to_topic(topic.as_str()),
)
.await
{
Ok(result) => {
result?;
let res = client.subscribe_to_topic(topic.as_str()).await;
if res.is_ok() {
info!("Subscribed to '{}'", topic);
Ok(())
}
Err(_) => {
warn!("MQTT subscribe timed out");
Err(ReasonCode::UnspecifiedError)
}
}
res
}
}
}
@@ -198,7 +183,7 @@ async fn run_one_session(
mqtt_rx: &mut [u8],
) -> Result<(), ()> {
let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx);
socket.set_timeout(Some(Duration::from_secs(20)));
socket.set_timeout(Some(SOCKET_POLL_TIMEOUT));
match socket.connect(mqtt_broker_endpoint()).await {
Ok(_) => info!("Connected TCP to MQTT broker"),
Err(e) => {
@@ -207,7 +192,6 @@ async fn run_one_session(
}
}
// MQTT configuration and client setup
let mut cfg: ClientConfig<8, CountingRng> =
ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0));
cfg.keep_alive = KEEPALIVE_SECS as u16;
@@ -224,6 +208,24 @@ async fn run_one_session(
}
}
// Re-subscribe after every (re)connect
let mut subs_snapshot: Vec<String<TOPIC_MAX>, SUBS_MAX> = Vec::new();
{
let subs = SUBS.lock().await;
for t in subs.iter() {
let _ = subs_snapshot.push(t.clone());
}
}
for t in subs_snapshot.iter() {
match client.subscribe_to_topic(t.as_str()).await {
Ok(_) => info!("Subscribed to '{}'", t),
Err(e) => {
warn!("MQTT resubscribe failed: {:?}", e);
return Err(());
}
}
}
let mut next_ping_at = Instant::now() + PING_PERIOD;
loop {
@@ -231,29 +233,19 @@ async fn run_one_session(
if let Ok(mut guard) = IMU_LATEST.try_lock() {
if let Some(payload) = guard.take() {
drop(guard);
log::info!("MQTT IMU TX start ({} bytes)", payload.len());
// Limit send to max 2 seconds to catch network stalls
let send_res = with_timeout(
Duration::from_secs(2),
client.send_message("esp32/imu", &payload, QualityOfService::QoS0, false),
)
.await;
match send_res {
Ok(Ok(_)) => {
match client
.send_message("esp32/imu", &payload, QualityOfService::QoS0, false)
.await
{
Ok(_) => {
log::info!("MQTT IMU TX ok");
next_ping_at = Instant::now() + PING_PERIOD;
}
Ok(Err(e)) => {
Err(e) => {
log::warn!("MQTT IMU TX failed: {:?}", e);
return Err(());
}
Err(_) => {
log::warn!("MQTT IMU TX timed out, restarting session");
return Err(());
}
}
}
}
@@ -264,39 +256,34 @@ async fn run_one_session(
next_ping_at = Instant::now() + PING_PERIOD;
}
// Check for incoming messages with timeout
match with_timeout(Duration::from_millis(100), client.receive_message()).await {
Ok(Ok((topic, payload))) => {
// Handle incoming message
// Receive tick: socket timeout turns "no data" into ReasonCode::NetworkError.
// Treat that as idle, not as a broken session.
match client.receive_message().await {
Ok((topic, payload)) => {
let _ = handle_incoming(Ok((topic, payload))).await;
next_ping_at = Instant::now() + PING_PERIOD;
}
Ok(Err(e)) => {
log::warn!("MQTT receive error: {:?}", e);
Err(ReasonCode::NetworkError) => {
// No incoming data during SOCKET_POLL_TIMEOUT -> ignore.
}
Err(e) => {
warn!("MQTT receive error (fatal): {:?}", e);
return Err(());
}
Err(_) => {
}
}
if Instant::now() >= next_ping_at {
match with_timeout(PING_TIMEOUT, client.send_ping()).await {
Ok(Ok(_)) => {
next_ping_at = Instant::now() + PING_PERIOD;
}
Ok(Err(e)) => {
match client.send_ping().await {
Ok(_) => next_ping_at = Instant::now() + PING_PERIOD,
Err(e) => {
warn!("MQTT ping failed: {:?}", e);
return Err(());
}
Err(_) => {
warn!("MQTT ping timed out, restarting session");
return Err(());
}
}
}
}
}
/// Main MQTT embassy task
#[embassy_executor::task]
pub async fn mqtt_task(stack: Stack<'static>) {
info!("MQTT task starting...");