debugged MPU working mqtt is dropping
This commit is contained in:
@@ -43,7 +43,7 @@ use projekt_final::{
|
|||||||
bus,
|
bus,
|
||||||
display,
|
display,
|
||||||
mpu,
|
mpu,
|
||||||
mqtt::client::{mqtt_events, mqtt_publish, mqtt_subscribe, mqtt_task, IncomingMsg},
|
mqtt::client::{mqtt_events, mqtt_try_publish, mqtt_publish, mqtt_subscribe, mqtt_task, IncomingMsg},
|
||||||
};
|
};
|
||||||
|
|
||||||
extern crate alloc;
|
extern crate alloc;
|
||||||
@@ -145,6 +145,7 @@ async fn main(spawner: Spawner) -> ! {
|
|||||||
let imu_rx = mpu::api::events();
|
let imu_rx = mpu::api::events();
|
||||||
let mut imu_reading_count: u32 = 0;
|
let mut imu_reading_count: u32 = 0;
|
||||||
let mut mqtt_msg_count: u32 = 0;
|
let mut mqtt_msg_count: u32 = 0;
|
||||||
|
let mut mqtt_publish_drops: u32 = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match select3(
|
match select3(
|
||||||
@@ -168,19 +169,30 @@ async fn main(spawner: Spawner) -> ! {
|
|||||||
log::info!("IMU drained {} stale readings before display", drained);
|
log::info!("IMU drained {} stale readings before display", drained);
|
||||||
}
|
}
|
||||||
imu_reading_count += 1;
|
imu_reading_count += 1;
|
||||||
display::api::show_imu(reading).await;
|
|
||||||
|
// CRITICAL FIX: show_imu is now non-blocking (uses try_send internally)
|
||||||
|
// This prevents display channel backpressure from blocking the main loop
|
||||||
|
display::api::show_imu(reading);
|
||||||
|
|
||||||
if imu_reading_count % MQTT_PUBLISH_DIVIDER == 0 {
|
if imu_reading_count % MQTT_PUBLISH_DIVIDER == 0 {
|
||||||
let payload = format!(
|
let payload = format!(
|
||||||
"{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}",
|
"{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}",
|
||||||
reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c
|
reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c
|
||||||
);
|
);
|
||||||
mqtt_publish("esp32/imu", payload.as_bytes(), QualityOfService::QoS0, false).await;
|
// CRITICAL FIX: Use non-blocking publish for sensor data
|
||||||
|
// If MQTT channel is full, we drop this reading rather than blocking
|
||||||
|
if !mqtt_try_publish("esp32/imu", payload.as_bytes(), QualityOfService::QoS0, false) {
|
||||||
|
mqtt_publish_drops += 1;
|
||||||
|
if mqtt_publish_drops % 10 == 1 {
|
||||||
|
log::warn!("MQTT publish dropped (total: {})", mqtt_publish_drops);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Either3::Third(_) => {
|
Either3::Third(_) => {
|
||||||
crate::mpu::api::IMU_CHANNEL.clear();
|
crate::mpu::api::IMU_CHANNEL.clear();
|
||||||
info!("IMU heartbeat: force-cleared queue, {} readings total", imu_reading_count);
|
info!("IMU heartbeat: force-cleared queue, {} readings total, {} mqtt drops",
|
||||||
// info!("Heartbeat: {} IMU readings", imu_reading_count);
|
imu_reading_count, mqtt_publish_drops);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,8 +29,11 @@ pub(crate) fn receiver() -> Receiver<'static, CriticalSectionRawMutex, DisplayCo
|
|||||||
|
|
||||||
// ─────────────────────────────────────────────────────────────────────────────
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
pub async fn show_imu(reading: ImuReading) {
|
/// Show IMU data - NON-BLOCKING to prevent backpressure deadlock
|
||||||
send(DisplayCommand::SetImu(reading)).await;
|
pub fn show_imu(reading: ImuReading) {
|
||||||
|
// CRITICAL FIX: Use try_send instead of send().await
|
||||||
|
// If display is slow, we drop the reading rather than blocking the main loop
|
||||||
|
let _ = try_send(DisplayCommand::SetImu(reading));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn set_status(text: &str) {
|
pub async fn set_status(text: &str) {
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
// src/mqtt/client.rs
|
// src/mqtt/client.rs
|
||||||
use embassy_futures::select::{select, Either};
|
use embassy_futures::select::{select, select3, Either, Either3};
|
||||||
use embassy_net::{tcp::TcpSocket, Stack};
|
use embassy_net::{tcp::TcpSocket, Stack};
|
||||||
use embassy_time::{Duration, Timer};
|
use embassy_time::{Duration, Timer};
|
||||||
|
use embassy_time::with_timeout;
|
||||||
use rust_mqtt::client::client::MqttClient;
|
use rust_mqtt::client::client::MqttClient;
|
||||||
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
|
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
|
||||||
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
|
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
|
||||||
@@ -10,6 +11,7 @@ use rust_mqtt::utils::rng_generator::CountingRng;
|
|||||||
|
|
||||||
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
||||||
use embassy_sync::channel::{Channel, Receiver};
|
use embassy_sync::channel::{Channel, Receiver};
|
||||||
|
use embassy_sync::signal::Signal;
|
||||||
use heapless::{String, Vec};
|
use heapless::{String, Vec};
|
||||||
use static_cell::ConstStaticCell;
|
use static_cell::ConstStaticCell;
|
||||||
use log::info;
|
use log::info;
|
||||||
@@ -60,7 +62,11 @@ enum Command {
|
|||||||
static CMD_CHAN: Channel<CriticalSectionRawMutex, Command, COMMAND_QUEUE> = Channel::new();
|
static CMD_CHAN: Channel<CriticalSectionRawMutex, Command, COMMAND_QUEUE> = Channel::new();
|
||||||
static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> = Channel::new();
|
static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> = Channel::new();
|
||||||
|
|
||||||
// Public API
|
/// Signal for latest IMU telemetry - overwrites instead of queuing
|
||||||
|
/// This ensures we always send the FRESHEST data, never queue up stale readings
|
||||||
|
static IMU_TELEMETRY: Signal<CriticalSectionRawMutex, Vec<u8, PAYLOAD_MAX>> = Signal::new();
|
||||||
|
|
||||||
|
// Public API - BLOCKING version (waits for channel space)
|
||||||
pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) {
|
pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) {
|
||||||
CMD_CHAN
|
CMD_CHAN
|
||||||
.send(Command::Publish(PublishMsg {
|
.send(Command::Publish(PublishMsg {
|
||||||
@@ -71,6 +77,26 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re
|
|||||||
})).await;
|
})).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// NON-BLOCKING publish - returns false if channel is full
|
||||||
|
/// Use this for high-frequency sensor data to avoid backpressure deadlock
|
||||||
|
pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool {
|
||||||
|
CMD_CHAN
|
||||||
|
.try_send(Command::Publish(PublishMsg {
|
||||||
|
topic: truncate_str::<TOPIC_MAX>(topic),
|
||||||
|
payload: truncate_payload(payload),
|
||||||
|
qos,
|
||||||
|
retain,
|
||||||
|
}))
|
||||||
|
.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the latest IMU telemetry to publish.
|
||||||
|
/// This OVERWRITES any pending data - we always send the freshest reading.
|
||||||
|
/// Non-blocking, fire-and-forget.
|
||||||
|
pub fn mqtt_set_imu_telemetry(payload: &[u8]) {
|
||||||
|
IMU_TELEMETRY.signal(truncate_payload(payload));
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn mqtt_subscribe(topic: &str) {
|
pub async fn mqtt_subscribe(topic: &str) {
|
||||||
CMD_CHAN.send(Command::Subscribe(truncate_str::<TOPIC_MAX>(topic))).await;
|
CMD_CHAN.send(Command::Subscribe(truncate_str::<TOPIC_MAX>(topic))).await;
|
||||||
}
|
}
|
||||||
@@ -96,15 +122,33 @@ fn truncate_payload(data: &[u8]) -> Vec<u8, PAYLOAD_MAX> {
|
|||||||
async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> {
|
async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> {
|
||||||
match cmd {
|
match cmd {
|
||||||
Command::Publish(msg) => {
|
Command::Publish(msg) => {
|
||||||
client
|
match with_timeout(
|
||||||
.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain)
|
Duration::from_secs(5),
|
||||||
.await
|
client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain)
|
||||||
|
).await {
|
||||||
|
Ok(result) => result,
|
||||||
|
Err(_) => {
|
||||||
|
log::warn!("MQTT send timed out, forcing reconnect");
|
||||||
|
Err(ReasonCode::UnspecifiedError)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Command::Subscribe(topic) => {
|
Command::Subscribe(topic) => {
|
||||||
client.subscribe_to_topic(topic.as_str()).await?;
|
match with_timeout(
|
||||||
|
Duration::from_secs(5),
|
||||||
|
client.subscribe_to_topic(topic.as_str())
|
||||||
|
).await {
|
||||||
|
Ok(result) => {
|
||||||
|
result?;
|
||||||
info!("Subscribed to '{}'", topic);
|
info!("Subscribed to '{}'", topic);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Err(_) => {
|
||||||
|
log::warn!("MQTT subscribe timed out");
|
||||||
|
Err(ReasonCode::UnspecifiedError)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,6 +171,7 @@ async fn run_one_session(
|
|||||||
mqtt_rx: &mut [u8],
|
mqtt_rx: &mut [u8],
|
||||||
) -> Result<(), ()> {
|
) -> Result<(), ()> {
|
||||||
let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx);
|
let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx);
|
||||||
|
socket.set_timeout(Some(Duration::from_secs(20)));
|
||||||
match socket.connect(mqtt_broker_endpoint()).await {
|
match socket.connect(mqtt_broker_endpoint()).await {
|
||||||
Ok(_) => info!("Connected TCP to MQTT broker"),
|
Ok(_) => info!("Connected TCP to MQTT broker"),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -150,13 +195,38 @@ async fn run_one_session(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Operational loop
|
// Operational loop
|
||||||
loop {
|
let mut ping_countdown: u32 = (KEEPALIVE_SECS * 10) as u32; // ticks until ping (at 100ms per tick)
|
||||||
let net_or_ping = select(client.receive_message(), Timer::after(PING_PERIOD));
|
|
||||||
|
|
||||||
match select(CMD_CHAN.receive(), net_or_ping).await {
|
loop {
|
||||||
Either::First(cmd) => handle_command(&mut client, cmd).await.map_err(|_| ())?,
|
info!("MQTT loop tick");
|
||||||
Either::Second(Either::First(result)) => handle_incoming(result).await.map_err(|_| ())?,
|
// Drain all pending commands
|
||||||
Either::Second(Either::Second(_)) => client.send_ping().await.map_err(|_| ())?,
|
let mut processed = 0;
|
||||||
|
while let Ok(cmd) = CMD_CHAN.try_receive() {
|
||||||
|
info!("MQTT processing command");
|
||||||
|
handle_command(&mut client, cmd).await.map_err(|_| ())?;
|
||||||
|
processed += 1;
|
||||||
|
}
|
||||||
|
if processed > 0 {
|
||||||
|
info!("MQTT processed {} commands", processed);
|
||||||
|
}
|
||||||
|
|
||||||
|
match select(
|
||||||
|
client.receive_message(),
|
||||||
|
Timer::after(Duration::from_millis(100)),
|
||||||
|
).await {
|
||||||
|
Either::First(result) => {
|
||||||
|
info!("MQTT received message from broker");
|
||||||
|
handle_incoming(result).await.map_err(|_| ())?;
|
||||||
|
}
|
||||||
|
Either::Second(_) => {
|
||||||
|
// Check if ping needed
|
||||||
|
ping_countdown = ping_countdown.saturating_sub(1);
|
||||||
|
if ping_countdown == 0 {
|
||||||
|
info!("MQTT sending ping");
|
||||||
|
client.send_ping().await.map_err(|_| ())?;
|
||||||
|
ping_countdown = (KEEPALIVE_SECS * 10) as u32;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user