Compare commits
4 Commits
054b42547e
...
v0.2.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2774e83d99 | ||
|
|
9910bf9402 | ||
|
|
424c795170 | ||
|
|
6b26ed9318 |
@@ -1,17 +1,19 @@
|
|||||||
// src/bus/mod.rs
|
// src/bus/mod.rs
|
||||||
|
//! Shared access to the hardware I2C peripheral on a
|
||||||
|
//! single core.
|
||||||
|
|
||||||
use core::cell::RefCell;
|
use core::cell::RefCell;
|
||||||
use embedded_hal_bus::i2c::RefCellDevice;
|
use embedded_hal_bus::i2c::RefCellDevice;
|
||||||
use esp_hal::i2c::master::I2c;
|
use esp_hal::i2c::master::I2c;
|
||||||
use esp_hal::Async;
|
use esp_hal::Async;
|
||||||
|
|
||||||
/// The underlying I2C peripheral type
|
/// I2C peripheral type
|
||||||
pub type I2cInner = I2c<'static, Async>;
|
pub type I2cInner = I2c<'static, Async>;
|
||||||
|
|
||||||
/// RefCell to share the bus on a single core.
|
/// RefCell to share the bus on a single core.
|
||||||
pub type SharedI2c = RefCell<I2cInner>;
|
pub type SharedI2c = RefCell<I2cInner>;
|
||||||
|
|
||||||
/// A handle to a shared I2C device.
|
/// Shared I2C device.
|
||||||
pub type I2cDevice = RefCellDevice<'static, I2cInner>;
|
pub type I2cDevice = RefCellDevice<'static, I2cInner>;
|
||||||
|
|
||||||
/// New I2C device handle from the shared bus.
|
/// New I2C device handle from the shared bus.
|
||||||
|
|||||||
@@ -1,10 +1,9 @@
|
|||||||
// src/contracts.rs
|
// src/contracts.rs
|
||||||
//! Cross-feature message contracts.
|
//! Cross-feature message contracts.
|
||||||
//!
|
//!
|
||||||
//! This is the ONLY coupling point between features.
|
|
||||||
//! Features depend on these types, not on each other.
|
//! Features depend on these types, not on each other.
|
||||||
|
|
||||||
use heapless::String as HString;
|
use heapless::String;
|
||||||
use pages_tui::input::Key;
|
use pages_tui::input::Key;
|
||||||
|
|
||||||
/// IMU sensor reading from MPU6050
|
/// IMU sensor reading from MPU6050
|
||||||
@@ -20,17 +19,13 @@ pub struct ImuReading {
|
|||||||
/// Commands that can be sent to the display actor
|
/// Commands that can be sent to the display actor
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum DisplayCommand {
|
pub enum DisplayCommand {
|
||||||
/// Show IMU sensor data
|
|
||||||
SetImu(ImuReading),
|
SetImu(ImuReading),
|
||||||
/// Show a status line (max 32 chars)
|
SetStatus(String<32>),
|
||||||
SetStatus(HString<32>),
|
ShowError(String<64>),
|
||||||
/// Show an error message (max 64 chars)
|
|
||||||
ShowError(HString<64>),
|
|
||||||
/// Show MQTT connection status
|
|
||||||
SetMqttStatus { connected: bool, msg_count: u32 },
|
SetMqttStatus { connected: bool, msg_count: u32 },
|
||||||
/// Clear the display to default state
|
/// Clear the display to default state
|
||||||
Clear,
|
Clear,
|
||||||
|
|
||||||
PushKey(Key),
|
PushKey(Key),
|
||||||
AddChatMessage(HString<24>),
|
AddChatMessage(String<24>),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
// src/display/api.rs
|
// src/display/api.rs
|
||||||
//! Public API for the display feature.
|
//! display API
|
||||||
|
|
||||||
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
||||||
use embassy_sync::channel::{Channel, Receiver, TrySendError};
|
use embassy_sync::channel::{Channel, Receiver, TrySendError};
|
||||||
@@ -29,7 +29,7 @@ pub(crate) fn receiver() -> Receiver<'static, CriticalSectionRawMutex, DisplayCo
|
|||||||
|
|
||||||
/// Show IMU data - NON-BLOCKING to prevent backpressure deadlock
|
/// Show IMU data - NON-BLOCKING to prevent backpressure deadlock
|
||||||
pub fn show_imu(reading: ImuReading) {
|
pub fn show_imu(reading: ImuReading) {
|
||||||
// CRITICAL FIX: Use try_send instead of send().await
|
// try_send instead of send().await
|
||||||
// If display is slow, we drop the reading rather than blocking the main loop
|
// If display is slow, we drop the reading rather than blocking the main loop
|
||||||
let _ = try_send(DisplayCommand::SetImu(reading));
|
let _ = try_send(DisplayCommand::SetImu(reading));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use ssd1306::{mode::BufferedGraphicsMode, prelude::*, I2CDisplayInterface, Ssd13
|
|||||||
|
|
||||||
use crate::bus::I2cDevice;
|
use crate::bus::I2cDevice;
|
||||||
use crate::display::api::receiver;
|
use crate::display::api::receiver;
|
||||||
use crate::display::tui::{render_frame, next_page_id, prev_page_id, DisplayState, Screen, ScreenEvent};
|
use crate::display::tui::{render_frame, DisplayState, Screen, ScreenEvent};
|
||||||
use crate::contracts::DisplayCommand;
|
use crate::contracts::DisplayCommand;
|
||||||
use pages_tui::prelude::*;
|
use pages_tui::prelude::*;
|
||||||
|
|
||||||
@@ -43,14 +43,15 @@ pub async fn display_task(i2c: I2cDevice) {
|
|||||||
let rx = receiver();
|
let rx = receiver();
|
||||||
|
|
||||||
// Register pages
|
// Register pages
|
||||||
orchestrator.register_page("menu".into(), Screen::Menu);
|
// Enum-based registration
|
||||||
orchestrator.register_page("imu".into(), Screen::Imu);
|
orchestrator.register(Screen::Menu);
|
||||||
orchestrator.register_page("chat".into(), Screen::Chat);
|
orchestrator.register(Screen::Imu);
|
||||||
|
orchestrator.register(Screen::Chat);
|
||||||
|
|
||||||
orchestrator.bind(Key::tab(), ComponentAction::Next);
|
orchestrator.bind(Key::tab(), ComponentAction::Next);
|
||||||
orchestrator.bind(Key::enter(), ComponentAction::Select);
|
orchestrator.bind(Key::enter(), ComponentAction::Select);
|
||||||
|
|
||||||
let _ = orchestrator.navigate_to("menu".into());
|
let _ = orchestrator.navigate_to(Screen::Menu);
|
||||||
|
|
||||||
info!("Display ready");
|
info!("Display ready");
|
||||||
|
|
||||||
@@ -59,27 +60,27 @@ pub async fn display_task(i2c: I2cDevice) {
|
|||||||
match cmd {
|
match cmd {
|
||||||
DisplayCommand::PushKey(key) => {
|
DisplayCommand::PushKey(key) => {
|
||||||
if key == Key::tab() {
|
if key == Key::tab() {
|
||||||
orchestrator.focus_manager_mut().wrap_next();
|
orchestrator.focus_manager_mut().next();
|
||||||
} else if key == Key::enter() {
|
} else if key == Key::enter() {
|
||||||
if let Ok(events) = orchestrator.process_frame(key) {
|
if let Ok(events) = orchestrator.process_frame(key) {
|
||||||
for event in events {
|
for event in events {
|
||||||
match event {
|
match event {
|
||||||
ScreenEvent::GoToImu => {
|
ScreenEvent::GoToImu => {
|
||||||
let _ = orchestrator.navigate_to("imu".into());
|
let _ = orchestrator.navigate_to(Screen::Imu);
|
||||||
}
|
}
|
||||||
ScreenEvent::GoToChat => {
|
ScreenEvent::GoToChat => {
|
||||||
let _ = orchestrator.navigate_to("chat".into());
|
let _ = orchestrator.navigate_to(Screen::Chat);
|
||||||
}
|
}
|
||||||
ScreenEvent::NavigatePrev => {
|
ScreenEvent::NavigatePrev => {
|
||||||
if let Some(cur) = orchestrator.current_id() {
|
if let Some(cur) = orchestrator.current() {
|
||||||
let prev = prev_page_id(cur.as_str());
|
let prev = cur.prev();
|
||||||
let _ = orchestrator.navigate_to(prev.into());
|
let _ = orchestrator.navigate_to(prev);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ScreenEvent::NavigateNext => {
|
ScreenEvent::NavigateNext => {
|
||||||
if let Some(cur) = orchestrator.current_id() {
|
if let Some(cur) = orchestrator.current() {
|
||||||
let next = next_page_id(cur.as_str());
|
let next = cur.next();
|
||||||
let _ = orchestrator.navigate_to(next.into());
|
let _ = orchestrator.navigate_to(next);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -137,16 +137,30 @@ impl Component for Screen {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PAGE ORDER
|
// PAGE ORDER
|
||||||
const PAGE_ORDER: &[&str] = &["menu", "imu", "chat"];
|
const PAGE_ORDER: &[Screen] = &[Screen::Menu, Screen::Imu, Screen::Chat];
|
||||||
|
|
||||||
pub fn next_page_id(current: &str) -> &'static str {
|
impl Screen {
|
||||||
let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0);
|
pub fn next(&self) -> Screen {
|
||||||
PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()]
|
let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0);
|
||||||
}
|
PAGE_ORDER[(idx + 1) % PAGE_ORDER.len()].clone()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn prev_page_id(current: &str) -> &'static str {
|
pub fn prev(&self) -> Screen {
|
||||||
let idx = PAGE_ORDER.iter().position(|&p| p == current).unwrap_or(0);
|
let idx = PAGE_ORDER.iter().position(|p| p == self).unwrap_or(0);
|
||||||
if idx == 0 { PAGE_ORDER[PAGE_ORDER.len() - 1] } else { PAGE_ORDER[idx - 1] }
|
if idx == 0 {
|
||||||
|
PAGE_ORDER[PAGE_ORDER.len() - 1].clone()
|
||||||
|
} else {
|
||||||
|
PAGE_ORDER[idx - 1].clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn to_str(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Screen::Menu => "menu",
|
||||||
|
Screen::Imu => "imu",
|
||||||
|
Screen::Chat => "chat",
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RENDERING
|
// RENDERING
|
||||||
|
|||||||
@@ -5,19 +5,17 @@ use esp_hal::{
|
|||||||
i2c::master::{Config, I2c},
|
i2c::master::{Config, I2c},
|
||||||
peripherals::Peripherals,
|
peripherals::Peripherals,
|
||||||
};
|
};
|
||||||
use ssd1306::mode::BufferedGraphicsMode;
|
use mousefood::{EmbeddedBackend, EmbeddedBackendConfig};
|
||||||
|
use ratatui::{
|
||||||
|
layout::{Constraint, Direction, Layout},
|
||||||
|
widgets::{Block, Borders, Paragraph},
|
||||||
|
Terminal,
|
||||||
|
};
|
||||||
|
use ssd1306::{prelude::*, I2CDisplayInterface, Ssd1306, mode::BufferedGraphicsMode};
|
||||||
use log::info;
|
use log::info;
|
||||||
|
|
||||||
#[embassy_executor::task]
|
#[embassy_executor::task]
|
||||||
pub async fn display_task() {
|
pub async fn display_task() {
|
||||||
use mousefood::{EmbeddedBackend, EmbeddedBackendConfig};
|
|
||||||
use ratatui::{
|
|
||||||
layout::{Constraint, Direction, Layout},
|
|
||||||
widgets::{Block, Borders, Paragraph},
|
|
||||||
Terminal,
|
|
||||||
};
|
|
||||||
use ssd1306::{prelude::*, I2CDisplayInterface, Ssd1306};
|
|
||||||
|
|
||||||
let peripherals = unsafe { Peripherals::steal() };
|
let peripherals = unsafe { Peripherals::steal() };
|
||||||
|
|
||||||
let i2c = I2c::new(peripherals.I2C0, Config::default())
|
let i2c = I2c::new(peripherals.I2C0, Config::default())
|
||||||
|
|||||||
@@ -1,8 +1,7 @@
|
|||||||
// src/mqtt/client.rs
|
// src/mqtt/client.rs
|
||||||
|
|
||||||
use embassy_futures::select::{select, Either};
|
|
||||||
use embassy_net::{tcp::TcpSocket, Stack};
|
use embassy_net::{tcp::TcpSocket, Stack};
|
||||||
use embassy_time::{Duration, Timer, with_timeout, Instant};
|
use embassy_time::{Duration, Timer, Instant};
|
||||||
use rust_mqtt::client::client::MqttClient;
|
use rust_mqtt::client::client::MqttClient;
|
||||||
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
|
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
|
||||||
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
|
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
|
||||||
@@ -21,7 +20,8 @@ use crate::mqtt::config::mqtt_broker_endpoint;
|
|||||||
const RECONNECT_DELAY_SECS: u64 = 5;
|
const RECONNECT_DELAY_SECS: u64 = 5;
|
||||||
const KEEPALIVE_SECS: u64 = 60;
|
const KEEPALIVE_SECS: u64 = 60;
|
||||||
const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2);
|
const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2);
|
||||||
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
const SOCKET_POLL_TIMEOUT: Duration = Duration::from_secs(1);
|
||||||
|
const SUBS_MAX: usize = 8;
|
||||||
|
|
||||||
// Limits for static buffers
|
// Limits for static buffers
|
||||||
pub const TOPIC_MAX: usize = 128;
|
pub const TOPIC_MAX: usize = 128;
|
||||||
@@ -68,9 +68,11 @@ static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> = Ch
|
|||||||
static IMU_LATEST: Mutex<CriticalSectionRawMutex, Option<Vec<u8, PAYLOAD_MAX>>> =
|
static IMU_LATEST: Mutex<CriticalSectionRawMutex, Option<Vec<u8, PAYLOAD_MAX>>> =
|
||||||
Mutex::new(None);
|
Mutex::new(None);
|
||||||
|
|
||||||
|
static SUBS: Mutex<CriticalSectionRawMutex, Vec<String<TOPIC_MAX>, SUBS_MAX>> =
|
||||||
|
Mutex::new(Vec::new());
|
||||||
|
|
||||||
/// Public API
|
/// Public API
|
||||||
|
|
||||||
/// Blocking publish for command/response messages.
|
|
||||||
pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) {
|
pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) {
|
||||||
CMD_CHAN
|
CMD_CHAN
|
||||||
.send(Command::Publish(PublishMsg {
|
.send(Command::Publish(PublishMsg {
|
||||||
@@ -82,7 +84,6 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Non-blocking publish for other traffic (fire-and-forget)
|
|
||||||
pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool {
|
pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool {
|
||||||
CMD_CHAN
|
CMD_CHAN
|
||||||
.try_send(Command::Publish(PublishMsg {
|
.try_send(Command::Publish(PublishMsg {
|
||||||
@@ -94,7 +95,6 @@ pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, reta
|
|||||||
.is_ok()
|
.is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set latest IMU telemetry payload (non-blocking, overwrites previous)
|
|
||||||
pub fn mqtt_set_imu(payload: &[u8]) {
|
pub fn mqtt_set_imu(payload: &[u8]) {
|
||||||
if let Ok(mut guard) = IMU_LATEST.try_lock() {
|
if let Ok(mut guard) = IMU_LATEST.try_lock() {
|
||||||
let mut buf: Vec<u8, PAYLOAD_MAX> = Vec::new();
|
let mut buf: Vec<u8, PAYLOAD_MAX> = Vec::new();
|
||||||
@@ -104,9 +104,15 @@ pub fn mqtt_set_imu(payload: &[u8]) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn mqtt_subscribe(topic: &str) {
|
pub async fn mqtt_subscribe(topic: &str) {
|
||||||
CMD_CHAN
|
let t = truncate_str::<TOPIC_MAX>(topic);
|
||||||
.send(Command::Subscribe(truncate_str::<TOPIC_MAX>(topic)))
|
{
|
||||||
.await;
|
let mut subs = SUBS.lock().await;
|
||||||
|
let exists = subs.iter().any(|s| s.as_str() == t.as_str());
|
||||||
|
if !exists {
|
||||||
|
let _ = subs.push(t.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CMD_CHAN.send(Command::Subscribe(t)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn mqtt_events(
|
pub fn mqtt_events(
|
||||||
@@ -126,7 +132,6 @@ fn truncate_str<const N: usize>(s: &str) -> String<N> {
|
|||||||
return h;
|
return h;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cut on a UTF-8 char boundary to avoid panics.
|
|
||||||
let mut cut = N;
|
let mut cut = N;
|
||||||
while cut > 0 && !s.is_char_boundary(cut) {
|
while cut > 0 && !s.is_char_boundary(cut) {
|
||||||
cut -= 1;
|
cut -= 1;
|
||||||
@@ -144,36 +149,16 @@ fn truncate_payload(data: &[u8]) -> Vec<u8, PAYLOAD_MAX> {
|
|||||||
async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> {
|
async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> {
|
||||||
match cmd {
|
match cmd {
|
||||||
Command::Publish(msg) => {
|
Command::Publish(msg) => {
|
||||||
match with_timeout(
|
client
|
||||||
Duration::from_secs(5),
|
.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain)
|
||||||
client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain),
|
|
||||||
)
|
|
||||||
.await
|
.await
|
||||||
{
|
|
||||||
Ok(result) => result,
|
|
||||||
Err(_) => {
|
|
||||||
warn!("MQTT send timed out, forcing reconnect");
|
|
||||||
Err(ReasonCode::UnspecifiedError)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Command::Subscribe(topic) => {
|
Command::Subscribe(topic) => {
|
||||||
match with_timeout(
|
let res = client.subscribe_to_topic(topic.as_str()).await;
|
||||||
Duration::from_secs(5),
|
if res.is_ok() {
|
||||||
client.subscribe_to_topic(topic.as_str()),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(result) => {
|
|
||||||
result?;
|
|
||||||
info!("Subscribed to '{}'", topic);
|
info!("Subscribed to '{}'", topic);
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
warn!("MQTT subscribe timed out");
|
|
||||||
Err(ReasonCode::UnspecifiedError)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -198,7 +183,7 @@ async fn run_one_session(
|
|||||||
mqtt_rx: &mut [u8],
|
mqtt_rx: &mut [u8],
|
||||||
) -> Result<(), ()> {
|
) -> Result<(), ()> {
|
||||||
let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx);
|
let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx);
|
||||||
socket.set_timeout(Some(Duration::from_secs(20)));
|
socket.set_timeout(Some(SOCKET_POLL_TIMEOUT));
|
||||||
match socket.connect(mqtt_broker_endpoint()).await {
|
match socket.connect(mqtt_broker_endpoint()).await {
|
||||||
Ok(_) => info!("Connected TCP to MQTT broker"),
|
Ok(_) => info!("Connected TCP to MQTT broker"),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -207,7 +192,6 @@ async fn run_one_session(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MQTT configuration and client setup
|
|
||||||
let mut cfg: ClientConfig<8, CountingRng> =
|
let mut cfg: ClientConfig<8, CountingRng> =
|
||||||
ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0));
|
ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0));
|
||||||
cfg.keep_alive = KEEPALIVE_SECS as u16;
|
cfg.keep_alive = KEEPALIVE_SECS as u16;
|
||||||
@@ -224,6 +208,24 @@ async fn run_one_session(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Re-subscribe after every (re)connect
|
||||||
|
let mut subs_snapshot: Vec<String<TOPIC_MAX>, SUBS_MAX> = Vec::new();
|
||||||
|
{
|
||||||
|
let subs = SUBS.lock().await;
|
||||||
|
for t in subs.iter() {
|
||||||
|
let _ = subs_snapshot.push(t.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for t in subs_snapshot.iter() {
|
||||||
|
match client.subscribe_to_topic(t.as_str()).await {
|
||||||
|
Ok(_) => info!("Subscribed to '{}'", t),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("MQTT resubscribe failed: {:?}", e);
|
||||||
|
return Err(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut next_ping_at = Instant::now() + PING_PERIOD;
|
let mut next_ping_at = Instant::now() + PING_PERIOD;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@@ -231,29 +233,19 @@ async fn run_one_session(
|
|||||||
if let Ok(mut guard) = IMU_LATEST.try_lock() {
|
if let Ok(mut guard) = IMU_LATEST.try_lock() {
|
||||||
if let Some(payload) = guard.take() {
|
if let Some(payload) = guard.take() {
|
||||||
drop(guard);
|
drop(guard);
|
||||||
|
|
||||||
log::info!("MQTT IMU TX start ({} bytes)", payload.len());
|
log::info!("MQTT IMU TX start ({} bytes)", payload.len());
|
||||||
|
match client
|
||||||
// Limit send to max 2 seconds to catch network stalls
|
.send_message("esp32/imu", &payload, QualityOfService::QoS0, false)
|
||||||
let send_res = with_timeout(
|
.await
|
||||||
Duration::from_secs(2),
|
{
|
||||||
client.send_message("esp32/imu", &payload, QualityOfService::QoS0, false),
|
Ok(_) => {
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
match send_res {
|
|
||||||
Ok(Ok(_)) => {
|
|
||||||
log::info!("MQTT IMU TX ok");
|
log::info!("MQTT IMU TX ok");
|
||||||
next_ping_at = Instant::now() + PING_PERIOD;
|
next_ping_at = Instant::now() + PING_PERIOD;
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Err(e) => {
|
||||||
log::warn!("MQTT IMU TX failed: {:?}", e);
|
log::warn!("MQTT IMU TX failed: {:?}", e);
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
Err(_) => {
|
|
||||||
log::warn!("MQTT IMU TX timed out, restarting session");
|
|
||||||
return Err(());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -264,39 +256,34 @@ async fn run_one_session(
|
|||||||
next_ping_at = Instant::now() + PING_PERIOD;
|
next_ping_at = Instant::now() + PING_PERIOD;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for incoming messages with timeout
|
// Receive tick: socket timeout turns "no data" into ReasonCode::NetworkError.
|
||||||
match with_timeout(Duration::from_millis(100), client.receive_message()).await {
|
// Treat that as idle, not as a broken session.
|
||||||
Ok(Ok((topic, payload))) => {
|
match client.receive_message().await {
|
||||||
// Handle incoming message
|
Ok((topic, payload)) => {
|
||||||
let _ = handle_incoming(Ok((topic, payload))).await;
|
let _ = handle_incoming(Ok((topic, payload))).await;
|
||||||
next_ping_at = Instant::now() + PING_PERIOD;
|
next_ping_at = Instant::now() + PING_PERIOD;
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Err(ReasonCode::NetworkError) => {
|
||||||
log::warn!("MQTT receive error: {:?}", e);
|
// No incoming data during SOCKET_POLL_TIMEOUT -> ignore.
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("MQTT receive error (fatal): {:?}", e);
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
Err(_) => {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if Instant::now() >= next_ping_at {
|
if Instant::now() >= next_ping_at {
|
||||||
match with_timeout(PING_TIMEOUT, client.send_ping()).await {
|
match client.send_ping().await {
|
||||||
Ok(Ok(_)) => {
|
Ok(_) => next_ping_at = Instant::now() + PING_PERIOD,
|
||||||
next_ping_at = Instant::now() + PING_PERIOD;
|
Err(e) => {
|
||||||
}
|
|
||||||
Ok(Err(e)) => {
|
|
||||||
warn!("MQTT ping failed: {:?}", e);
|
warn!("MQTT ping failed: {:?}", e);
|
||||||
return Err(());
|
return Err(());
|
||||||
}
|
}
|
||||||
Err(_) => {
|
|
||||||
warn!("MQTT ping timed out, restarting session");
|
|
||||||
return Err(());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Main MQTT embassy task
|
|
||||||
#[embassy_executor::task]
|
#[embassy_executor::task]
|
||||||
pub async fn mqtt_task(stack: Stack<'static>) {
|
pub async fn mqtt_task(stack: Stack<'static>) {
|
||||||
info!("MQTT task starting...");
|
info!("MQTT task starting...");
|
||||||
|
|||||||
Reference in New Issue
Block a user