big improvement, not theere yet
This commit is contained in:
@@ -15,6 +15,7 @@ use embassy_sync::signal::Signal;
|
|||||||
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
||||||
use embassy_time::{Duration, Timer};
|
use embassy_time::{Duration, Timer};
|
||||||
use projekt_final::bus::I2cInner;
|
use projekt_final::bus::I2cInner;
|
||||||
|
use projekt_final::mqtt::client::mqtt_set_imu;
|
||||||
|
|
||||||
use esp_alloc as _;
|
use esp_alloc as _;
|
||||||
use esp_backtrace as _;
|
use esp_backtrace as _;
|
||||||
@@ -179,14 +180,7 @@ async fn main(spawner: Spawner) -> ! {
|
|||||||
"{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}",
|
"{{\"ax\":{:.2},\"ay\":{:.2},\"az\":{:.2},\"t\":{:.1}}}",
|
||||||
reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c
|
reading.accel_g[0], reading.accel_g[1], reading.accel_g[2], reading.temp_c
|
||||||
);
|
);
|
||||||
// CRITICAL FIX: Use non-blocking publish for sensor data
|
mqtt_set_imu(payload.as_bytes());
|
||||||
// If MQTT channel is full, we drop this reading rather than blocking
|
|
||||||
if !mqtt_try_publish("esp32/imu", payload.as_bytes(), QualityOfService::QoS0, false) {
|
|
||||||
mqtt_publish_drops += 1;
|
|
||||||
if mqtt_publish_drops % 10 == 1 {
|
|
||||||
log::warn!("MQTT publish dropped (total: {})", mqtt_publish_drops);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Either3::Third(_) => {
|
Either3::Third(_) => {
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
// src/mqtt/client.rs
|
// src/mqtt/client.rs
|
||||||
use embassy_futures::select::{select, select3, Either, Either3};
|
|
||||||
|
use embassy_futures::select::{select, Either};
|
||||||
use embassy_net::{tcp::TcpSocket, Stack};
|
use embassy_net::{tcp::TcpSocket, Stack};
|
||||||
use embassy_time::{Duration, Timer};
|
use embassy_time::{Duration, Timer, with_timeout};
|
||||||
use embassy_time::with_timeout;
|
|
||||||
use rust_mqtt::client::client::MqttClient;
|
use rust_mqtt::client::client::MqttClient;
|
||||||
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
|
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
|
||||||
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
|
use rust_mqtt::packet::v5::publish_packet::QualityOfService;
|
||||||
@@ -10,11 +10,11 @@ use rust_mqtt::packet::v5::reason_codes::ReasonCode;
|
|||||||
use rust_mqtt::utils::rng_generator::CountingRng;
|
use rust_mqtt::utils::rng_generator::CountingRng;
|
||||||
|
|
||||||
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
||||||
|
use embassy_sync::mutex::Mutex;
|
||||||
use embassy_sync::channel::{Channel, Receiver};
|
use embassy_sync::channel::{Channel, Receiver};
|
||||||
use embassy_sync::signal::Signal;
|
|
||||||
use heapless::{String, Vec};
|
use heapless::{String, Vec};
|
||||||
use static_cell::ConstStaticCell;
|
use static_cell::ConstStaticCell;
|
||||||
use log::info;
|
use log::{info, warn};
|
||||||
|
|
||||||
use crate::mqtt::config::mqtt_broker_endpoint;
|
use crate::mqtt::config::mqtt_broker_endpoint;
|
||||||
|
|
||||||
@@ -59,14 +59,17 @@ enum Command {
|
|||||||
Subscribe(String<TOPIC_MAX>),
|
Subscribe(String<TOPIC_MAX>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Standard command/info channels
|
||||||
static CMD_CHAN: Channel<CriticalSectionRawMutex, Command, COMMAND_QUEUE> = Channel::new();
|
static CMD_CHAN: Channel<CriticalSectionRawMutex, Command, COMMAND_QUEUE> = Channel::new();
|
||||||
static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> = Channel::new();
|
static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> = Channel::new();
|
||||||
|
|
||||||
/// Signal for latest IMU telemetry - overwrites instead of queuing
|
/// Shared latest IMU payload (non-blocking, latest-value semantics)
|
||||||
/// This ensures we always send the FRESHEST data, never queue up stale readings
|
static IMU_LATEST: Mutex<CriticalSectionRawMutex, Option<Vec<u8, PAYLOAD_MAX>>> =
|
||||||
static IMU_TELEMETRY: Signal<CriticalSectionRawMutex, Vec<u8, PAYLOAD_MAX>> = Signal::new();
|
Mutex::new(None);
|
||||||
|
|
||||||
// Public API - BLOCKING version (waits for channel space)
|
/// ---------- Public API ----------
|
||||||
|
|
||||||
|
/// Blocking publish for command/response messages.
|
||||||
pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) {
|
pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) {
|
||||||
CMD_CHAN
|
CMD_CHAN
|
||||||
.send(Command::Publish(PublishMsg {
|
.send(Command::Publish(PublishMsg {
|
||||||
@@ -74,11 +77,11 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re
|
|||||||
payload: truncate_payload(payload),
|
payload: truncate_payload(payload),
|
||||||
qos,
|
qos,
|
||||||
retain,
|
retain,
|
||||||
})).await;
|
}))
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// NON-BLOCKING publish - returns false if channel is full
|
/// Non-blocking publish for other traffic (fire-and-forget)
|
||||||
/// Use this for high-frequency sensor data to avoid backpressure deadlock
|
|
||||||
pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool {
|
pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) -> bool {
|
||||||
CMD_CHAN
|
CMD_CHAN
|
||||||
.try_send(Command::Publish(PublishMsg {
|
.try_send(Command::Publish(PublishMsg {
|
||||||
@@ -90,15 +93,19 @@ pub fn mqtt_try_publish(topic: &str, payload: &[u8], qos: QualityOfService, reta
|
|||||||
.is_ok()
|
.is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set the latest IMU telemetry to publish.
|
/// Set latest IMU telemetry payload (non-blocking, overwrites previous)
|
||||||
/// This OVERWRITES any pending data - we always send the freshest reading.
|
pub fn mqtt_set_imu(payload: &[u8]) {
|
||||||
/// Non-blocking, fire-and-forget.
|
if let Ok(mut guard) = IMU_LATEST.try_lock() {
|
||||||
pub fn mqtt_set_imu_telemetry(payload: &[u8]) {
|
let mut buf: Vec<u8, PAYLOAD_MAX> = Vec::new();
|
||||||
IMU_TELEMETRY.signal(truncate_payload(payload));
|
let _ = buf.extend_from_slice(&payload[..payload.len().min(PAYLOAD_MAX)]);
|
||||||
|
*guard = Some(buf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn mqtt_subscribe(topic: &str) {
|
pub async fn mqtt_subscribe(topic: &str) {
|
||||||
CMD_CHAN.send(Command::Subscribe(truncate_str::<TOPIC_MAX>(topic))).await;
|
CMD_CHAN
|
||||||
|
.send(Command::Subscribe(truncate_str::<TOPIC_MAX>(topic)))
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn mqtt_events(
|
pub fn mqtt_events(
|
||||||
@@ -106,7 +113,8 @@ pub fn mqtt_events(
|
|||||||
EVT_CHAN.receiver()
|
EVT_CHAN.receiver()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Helper functions for memory-safe truncation
|
/// ---------- Internals ----------
|
||||||
|
|
||||||
fn truncate_str<const N: usize>(s: &str) -> String<N> {
|
fn truncate_str<const N: usize>(s: &str) -> String<N> {
|
||||||
let mut h = String::new();
|
let mut h = String::new();
|
||||||
let _ = h.push_str(&s[..s.len().min(N)]);
|
let _ = h.push_str(&s[..s.len().min(N)]);
|
||||||
@@ -124,11 +132,13 @@ async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(),
|
|||||||
Command::Publish(msg) => {
|
Command::Publish(msg) => {
|
||||||
match with_timeout(
|
match with_timeout(
|
||||||
Duration::from_secs(5),
|
Duration::from_secs(5),
|
||||||
client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain)
|
client.send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain),
|
||||||
).await {
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(result) => result,
|
Ok(result) => result,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
log::warn!("MQTT send timed out, forcing reconnect");
|
warn!("MQTT send timed out, forcing reconnect");
|
||||||
Err(ReasonCode::UnspecifiedError)
|
Err(ReasonCode::UnspecifiedError)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -136,15 +146,17 @@ async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(),
|
|||||||
Command::Subscribe(topic) => {
|
Command::Subscribe(topic) => {
|
||||||
match with_timeout(
|
match with_timeout(
|
||||||
Duration::from_secs(5),
|
Duration::from_secs(5),
|
||||||
client.subscribe_to_topic(topic.as_str())
|
client.subscribe_to_topic(topic.as_str()),
|
||||||
).await {
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
result?;
|
result?;
|
||||||
info!("Subscribed to '{}'", topic);
|
info!("Subscribed to '{}'", topic);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
log::warn!("MQTT subscribe timed out");
|
warn!("MQTT subscribe timed out");
|
||||||
Err(ReasonCode::UnspecifiedError)
|
Err(ReasonCode::UnspecifiedError)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -158,11 +170,13 @@ async fn handle_incoming(result: Result<(&str, &[u8]), ReasonCode>) -> Result<()
|
|||||||
.send(IncomingMsg {
|
.send(IncomingMsg {
|
||||||
topic: truncate_str::<TOPIC_MAX>(topic),
|
topic: truncate_str::<TOPIC_MAX>(topic),
|
||||||
payload: truncate_payload(payload),
|
payload: truncate_payload(payload),
|
||||||
}).await;
|
})
|
||||||
|
.await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Session and reconnect control
|
/// ---------- Session Management ----------
|
||||||
|
|
||||||
async fn run_one_session(
|
async fn run_one_session(
|
||||||
stack: Stack<'static>,
|
stack: Stack<'static>,
|
||||||
tcp_rx: &mut [u8],
|
tcp_rx: &mut [u8],
|
||||||
@@ -181,11 +195,14 @@ async fn run_one_session(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// MQTT configuration and client setup
|
// MQTT configuration and client setup
|
||||||
let mut cfg: ClientConfig<8, CountingRng> = ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0));
|
let mut cfg: ClientConfig<8, CountingRng> =
|
||||||
|
ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0));
|
||||||
cfg.keep_alive = KEEPALIVE_SECS as u16;
|
cfg.keep_alive = KEEPALIVE_SECS as u16;
|
||||||
cfg.add_client_id("esp32-client");
|
cfg.add_client_id("esp32-client");
|
||||||
|
|
||||||
let mut client = MqttClient::new(socket, mqtt_tx, mqtt_tx.len(), mqtt_rx, mqtt_rx.len(), cfg);
|
let mut client =
|
||||||
|
MqttClient::new(socket, mqtt_tx, mqtt_tx.len(), mqtt_rx, mqtt_rx.len(), cfg);
|
||||||
|
|
||||||
match client.connect_to_broker().await {
|
match client.connect_to_broker().await {
|
||||||
Ok(_) => info!("MQTT CONNACK received"),
|
Ok(_) => info!("MQTT CONNACK received"),
|
||||||
Err(reason) => {
|
Err(reason) => {
|
||||||
@@ -194,36 +211,59 @@ async fn run_one_session(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Operational loop
|
let mut ping_countdown: u32 = (KEEPALIVE_SECS * 10) as u32; // ticks until ping (100 ms each)
|
||||||
let mut ping_countdown: u32 = (KEEPALIVE_SECS * 10) as u32; // ticks until ping (at 100ms per tick)
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
info!("MQTT loop tick");
|
// 1. Send latest IMU payload if available
|
||||||
// Drain all pending commands
|
if let Ok(mut guard) = IMU_LATEST.try_lock() {
|
||||||
let mut processed = 0;
|
if let Some(payload) = guard.take() {
|
||||||
while let Ok(cmd) = CMD_CHAN.try_receive() {
|
drop(guard);
|
||||||
info!("MQTT processing command");
|
|
||||||
handle_command(&mut client, cmd).await.map_err(|_| ())?;
|
log::info!("MQTT IMU TX start ({} bytes)", payload.len());
|
||||||
processed += 1;
|
|
||||||
}
|
// Limit send to max 2 seconds to catch network stalls
|
||||||
if processed > 0 {
|
let send_res = with_timeout(
|
||||||
info!("MQTT processed {} commands", processed);
|
Duration::from_secs(2),
|
||||||
|
client.send_message("esp32/imu", &payload, QualityOfService::QoS0, false),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match send_res {
|
||||||
|
Ok(Ok(_)) => {
|
||||||
|
log::info!("MQTT IMU TX ok");
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
log::warn!("MQTT IMU TX failed: {:?}", e);
|
||||||
|
return Err(());
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
log::warn!("MQTT IMU TX timed out, restarting session");
|
||||||
|
return Err(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match select(
|
// 2. Process any queued control commands
|
||||||
client.receive_message(),
|
while let Ok(cmd) = CMD_CHAN.try_receive() {
|
||||||
Timer::after(Duration::from_millis(100)),
|
handle_command(&mut client, cmd).await.map_err(|_| ())?;
|
||||||
).await {
|
}
|
||||||
Either::First(result) => {
|
|
||||||
info!("MQTT received message from broker");
|
// 3. Check for incoming messages with timeout
|
||||||
handle_incoming(result).await.map_err(|_| ())?;
|
match with_timeout(Duration::from_millis(100), client.receive_message()).await {
|
||||||
|
Ok(Ok((topic, payload))) => {
|
||||||
|
// Handle incoming message
|
||||||
|
let _ = handle_incoming(Ok((topic, payload))).await;
|
||||||
}
|
}
|
||||||
Either::Second(_) => {
|
Ok(Err(e)) => {
|
||||||
// Check if ping needed
|
log::warn!("MQTT receive error: {:?}", e);
|
||||||
|
return Err(());
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
// Timeout -> no incoming message this period, check ping
|
||||||
ping_countdown = ping_countdown.saturating_sub(1);
|
ping_countdown = ping_countdown.saturating_sub(1);
|
||||||
if ping_countdown == 0 {
|
if ping_countdown == 0 {
|
||||||
info!("MQTT sending ping");
|
let _ = client.send_ping().await;
|
||||||
client.send_ping().await.map_err(|_| ())?;
|
|
||||||
ping_countdown = (KEEPALIVE_SECS * 10) as u32;
|
ping_countdown = (KEEPALIVE_SECS * 10) as u32;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -231,7 +271,7 @@ async fn run_one_session(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Main MQTT embassy task
|
/// Main MQTT embassy task
|
||||||
#[embassy_executor::task]
|
#[embassy_executor::task]
|
||||||
pub async fn mqtt_task(stack: Stack<'static>) {
|
pub async fn mqtt_task(stack: Stack<'static>) {
|
||||||
info!("MQTT task starting...");
|
info!("MQTT task starting...");
|
||||||
@@ -248,9 +288,13 @@ pub async fn mqtt_task(stack: Stack<'static>) {
|
|||||||
&mut tcp_tx[..],
|
&mut tcp_tx[..],
|
||||||
&mut mqtt_tx[..],
|
&mut mqtt_tx[..],
|
||||||
&mut mqtt_rx[..],
|
&mut mqtt_rx[..],
|
||||||
).await;
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
info!("Reconnecting in {}s after session end/failure", RECONNECT_DELAY_SECS);
|
info!(
|
||||||
|
"Reconnecting in {}s after session end/failure",
|
||||||
|
RECONNECT_DELAY_SECS
|
||||||
|
);
|
||||||
Timer::after(Duration::from_secs(RECONNECT_DELAY_SECS)).await;
|
Timer::after(Duration::from_secs(RECONNECT_DELAY_SECS)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user