From b729d3f23d2bb0e0377d683db4c1c44d70bcb388 Mon Sep 17 00:00:00 2001 From: Priec Date: Sun, 18 Jan 2026 23:16:06 +0100 Subject: [PATCH] sending messages to the tui chat --- mqtt_display/src/bin/main.rs | 10 ++++------ mqtt_display/src/display/api.rs | 2 -- mqtt_display/src/mqtt/client.rs | 14 ++++++-------- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/mqtt_display/src/bin/main.rs b/mqtt_display/src/bin/main.rs index b4b73af..3de57d1 100644 --- a/mqtt_display/src/bin/main.rs +++ b/mqtt_display/src/bin/main.rs @@ -136,8 +136,8 @@ async fn main(spawner: Spawner) -> ! { spawner.spawn(mpu::task::mpu_task(mpu_i2c)).expect("spawn mpu_task"); display::api::set_status("Booting...").await; - mqtt_subscribe("esp32/topic").await; - mqtt_publish("esp32/topic", b"online", QualityOfService::QoS1, false).await; + mqtt_subscribe("esp32/imu").await; + mqtt_publish("esp32/imu", b"online", QualityOfService::QoS1, false).await; display::api::set_status("Running").await; display::api::set_mqtt_status(true, 0).await; @@ -171,8 +171,7 @@ async fn main(spawner: Spawner) -> ! { } imu_reading_count += 1; - // CRITICAL FIX: show_imu is now non-blocking (uses try_send internally) - // This prevents display channel backpressure from blocking the main loop + // Show_imu is now non-blocking (uses try_send internally) display::api::show_imu(reading); if imu_reading_count % MQTT_PUBLISH_DIVIDER == 0 { @@ -228,7 +227,6 @@ async fn core1_network_task( // Signal core 0 that network is ready NETWORK_READY.signal(()); - // Start MQTT on this core (it needs the stack) spawner.spawn(mqtt_task(stack)).ok(); } @@ -237,7 +235,7 @@ async fn handle_mqtt_message(msg: IncomingMsg) { match txt { "clear" => { display::api::clear().await; } "status" => { mqtt_publish("esp32/status", b"running", QualityOfService::QoS1, false).await; } - _ => {} + _ => { display::api::add_chat_message(txt).await; } } } } diff --git a/mqtt_display/src/display/api.rs b/mqtt_display/src/display/api.rs index d303805..e870f45 100644 --- a/mqtt_display/src/display/api.rs +++ b/mqtt_display/src/display/api.rs @@ -27,8 +27,6 @@ pub(crate) fn receiver() -> Receiver<'static, CriticalSectionRawMutex, DisplayCo DISPLAY_CHANNEL.receiver() } -// ───────────────────────────────────────────────────────────────────────────── - /// Show IMU data - NON-BLOCKING to prevent backpressure deadlock pub fn show_imu(reading: ImuReading) { // CRITICAL FIX: Use try_send instead of send().await diff --git a/mqtt_display/src/mqtt/client.rs b/mqtt_display/src/mqtt/client.rs index df39601..5436526 100644 --- a/mqtt_display/src/mqtt/client.rs +++ b/mqtt_display/src/mqtt/client.rs @@ -59,7 +59,7 @@ enum Command { Subscribe(String), } -// Standard command/info channels +// Command/info channels static CMD_CHAN: Channel = Channel::new(); static EVT_CHAN: Channel = Channel::new(); @@ -67,7 +67,7 @@ static EVT_CHAN: Channel = Ch static IMU_LATEST: Mutex>> = Mutex::new(None); -/// ---------- Public API ---------- +/// Public API /// Blocking publish for command/response messages. pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) { @@ -113,7 +113,7 @@ pub fn mqtt_events( EVT_CHAN.receiver() } -/// ---------- Internals ---------- +/// Internals fn truncate_str(s: &str) -> String { let mut h = String::new(); @@ -175,8 +175,6 @@ async fn handle_incoming(result: Result<(&str, &[u8]), ReasonCode>) -> Result<() Ok(()) } -/// ---------- Session Management ---------- - async fn run_one_session( stack: Stack<'static>, tcp_rx: &mut [u8], @@ -214,7 +212,7 @@ async fn run_one_session( let mut ping_countdown: u32 = (KEEPALIVE_SECS * 10) as u32; // ticks until ping (100 ms each) loop { - // 1. Send latest IMU payload if available + // Send latest IMU payload if available if let Ok(mut guard) = IMU_LATEST.try_lock() { if let Some(payload) = guard.take() { drop(guard); @@ -244,12 +242,12 @@ async fn run_one_session( } } - // 2. Process any queued control commands + // Process any queued control commands while let Ok(cmd) = CMD_CHAN.try_receive() { handle_command(&mut client, cmd).await.map_err(|_| ())?; } - // 3. Check for incoming messages with timeout + // Check for incoming messages with timeout match with_timeout(Duration::from_millis(100), client.receive_message()).await { Ok(Ok((topic, payload))) => { // Handle incoming message