diff --git a/flake.nix b/flake.nix index 9b76a2f..0615a6b 100644 --- a/flake.nix +++ b/flake.nix @@ -53,6 +53,7 @@ shellHook = '' rustc --version which xtensa-esp32-elf-gcc || echo "⚠️ xtensa-esp32-elf-gcc not found in PATH" + echo "cargo espflash save-image --release --chip esp32 test.bin" echo "" echo "MQTT broker test (run manually in two terminals):" echo " Terminal 1:" diff --git a/mqtt_display/src/mpu/api.rs b/mqtt_display/src/mpu/api.rs index e8b452e..857764b 100644 --- a/mqtt_display/src/mpu/api.rs +++ b/mqtt_display/src/mpu/api.rs @@ -1,43 +1,17 @@ // src/mpu/api.rs //! Public API for the MPU6050 feature. -//! -//! Other parts of the system use this module to receive IMU readings. -//! The actual sampling happens in `task.rs`. - use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::channel::{Channel, Receiver, Sender}; - use crate::contracts::ImuReading; -/// Queue size for IMU readings. -/// Small because we only care about recent data. const QUEUE_SIZE: usize = 4; -/// Channel for publishing IMU readings. -/// The task writes to this; main reads from it. -pub(crate) static IMU_CHANNEL: Channel = - Channel::new(); +pub(crate) static IMU_CHANNEL: Channel = Channel::new(); -/// Get a receiver for IMU readings. -/// -/// Call this from main to receive sensor data. -/// The MPU task pushes readings at its configured rate (~20Hz). -/// -/// # Example -/// ```ignore -/// let imu_rx = mpu::api::events(); -/// loop { -/// let reading = imu_rx.receive().await; -/// // Process reading... -/// } -/// ``` pub fn events() -> Receiver<'static, CriticalSectionRawMutex, ImuReading, QUEUE_SIZE> { IMU_CHANNEL.receiver() } -/// Get a sender for IMU readings (internal use). -/// -/// Used by the task to publish readings. pub(crate) fn sender() -> Sender<'static, CriticalSectionRawMutex, ImuReading, QUEUE_SIZE> { IMU_CHANNEL.sender() } diff --git a/mqtt_display/src/mpu/driver.rs b/mqtt_display/src/mpu/driver.rs index 95ad31a..c129156 100644 --- a/mqtt_display/src/mpu/driver.rs +++ b/mqtt_display/src/mpu/driver.rs @@ -1,8 +1,5 @@ // src/mpu/driver.rs -//! Minimal MPU6050 driver using raw register I/O. -//! -//! This avoids dependency issues with existing crates that require -//! blocking delay traits incompatible with async Embassy. +//! MPU6050 driver using raw register I/O. use embedded_hal::i2c::I2c; diff --git a/mqtt_display/src/mqtt/client.rs b/mqtt_display/src/mqtt/client.rs index d8c8234..31541c7 100644 --- a/mqtt_display/src/mqtt/client.rs +++ b/mqtt_display/src/mqtt/client.rs @@ -2,18 +2,17 @@ use embassy_futures::select::{select, Either}; use embassy_net::{tcp::TcpSocket, Stack}; use embassy_time::{Duration, Timer}; -use log::info; use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::reason_codes::ReasonCode; use rust_mqtt::utils::rng_generator::CountingRng; -use static_cell::ConstStaticCell; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::channel::{Channel, Receiver}; - -use heapless::{String as HString, Vec as HVec}; +use heapless::{String, Vec}; +use static_cell::ConstStaticCell; +use log::info; use crate::mqtt::config::mqtt_broker_endpoint; @@ -21,7 +20,7 @@ const RECONNECT_DELAY_SECS: u64 = 5; const KEEPALIVE_SECS: u64 = 60; const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2); -// Limits for small, static buffers (no heap) +// Limits for static buffers pub const TOPIC_MAX: usize = 128; pub const PAYLOAD_MAX: usize = 512; const COMMAND_QUEUE: usize = 8; @@ -31,7 +30,7 @@ const EVENT_QUEUE: usize = 8; static TCP_RX_BUFFER: ConstStaticCell<[u8; 2048]> = ConstStaticCell::new([0; 2048]); static TCP_TX_BUFFER: ConstStaticCell<[u8; 2048]> = ConstStaticCell::new([0; 2048]); -// MQTT client buffers (separate from the TcpSocket's buffers) +// MQTT client buffers (separate from the TcpSocket buffers) static MQTT_TX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]); static MQTT_RX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]); @@ -40,14 +39,14 @@ type Client<'a, 'net> = MqttClient<'a, TcpSocket<'net>, 8, CountingRng>; #[derive(Clone)] pub struct IncomingMsg { - pub topic: HString, - pub payload: HVec, + pub topic: String, + pub payload: Vec, } #[derive(Clone)] struct PublishMsg { - topic: HString, - payload: HVec, + topic: String, + payload: Vec, qos: QualityOfService, retain: bool, } @@ -55,7 +54,7 @@ struct PublishMsg { #[derive(Clone)] enum Command { Publish(PublishMsg), - Subscribe(HString), + Subscribe(String), } static CMD_CHAN: Channel = Channel::new(); @@ -69,8 +68,7 @@ pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, re payload: truncate_payload(payload), qos, retain, - })) - .await; + })).await; } pub async fn mqtt_subscribe(topic: &str) { @@ -83,73 +81,18 @@ pub fn mqtt_events( } // Helper functions for memory-safe truncation -fn truncate_str(s: &str) -> HString { - let mut h = HString::new(); +fn truncate_str(s: &str) -> String { + let mut h = String::new(); let _ = h.push_str(&s[..s.len().min(N)]); h } -fn truncate_payload(data: &[u8]) -> HVec { - let mut v = HVec::new(); +fn truncate_payload(data: &[u8]) -> Vec { + let mut v = Vec::new(); let _ = v.extend_from_slice(&data[..data.len().min(PAYLOAD_MAX)]); v } -// MQTT configuration and client setup -fn build_client_config() -> ClientConfig<'static, 8, CountingRng> { - let mut cfg = ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0)); - cfg.keep_alive = KEEPALIVE_SECS as u16; - cfg.add_client_id("esp32-client"); - cfg -} - -fn build_client<'a, 'net>( - socket: TcpSocket<'net>, - mqtt_tx: &'a mut [u8], - mqtt_rx: &'a mut [u8], -) -> Client<'a, 'net> { - let mqtt_tx_len = mqtt_tx.len(); - let mqtt_rx_len = mqtt_rx.len(); - MqttClient::new(socket, mqtt_tx, mqtt_tx_len, mqtt_rx, mqtt_rx_len, build_client_config()) -} - -// Connection lifecycle and main session loop - -async fn connect_tcp<'net>(socket: &mut TcpSocket<'net>) -> Result<(), ()> { - match socket.connect(mqtt_broker_endpoint()).await { - Ok(_) => { - info!("Connected TCP to MQTT broker"); - Ok(()) - } - Err(e) => { - info!("TCP connect failed: {:?}", e); - Err(()) - } - } -} - -async fn connect_mqtt(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> { - client.connect_to_broker().await -} - -async fn run_loop(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> { - let default_topic = "esp32/topic"; - match client.subscribe_to_topic(default_topic).await { - Ok(_) => info!("Subscribed to '{}'", default_topic), - Err(e) => info!("Default subscribe failed: {:?}", e), - }; - - loop { - let net_or_ping = select(client.receive_message(), Timer::after(PING_PERIOD)); - - match select(CMD_CHAN.receive(), net_or_ping).await { - Either::First(cmd) => handle_command(client, cmd).await?, - Either::Second(Either::First(result)) => handle_incoming(result).await?, - Either::Second(Either::Second(_)) => client.send_ping().await?, - } - } -} - async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> { match cmd { Command::Publish(msg) => { @@ -171,8 +114,7 @@ async fn handle_incoming(result: Result<(&str, &[u8]), ReasonCode>) -> Result<() .send(IncomingMsg { topic: truncate_str::(topic), payload: truncate_payload(payload), - }) - .await; + }).await; Ok(()) } @@ -185,12 +127,21 @@ async fn run_one_session( mqtt_rx: &mut [u8], ) -> Result<(), ()> { let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx); - if connect_tcp(&mut socket).await.is_err() { - return Err(()); + match socket.connect(mqtt_broker_endpoint()).await { + Ok(_) => info!("Connected TCP to MQTT broker"), + Err(e) => { + info!("TCP connect failed: {:#?}", e); + return Err(()); + } } - let mut client = build_client(socket, mqtt_tx, mqtt_rx); - match connect_mqtt(&mut client).await { + // MQTT configuration and client setup + let mut cfg: ClientConfig<8, CountingRng> = ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0)); + cfg.keep_alive = KEEPALIVE_SECS as u16; + cfg.add_client_id("esp32-client"); + + let mut client = MqttClient::new(socket, mqtt_tx, mqtt_tx.len(), mqtt_rx, mqtt_rx.len(), cfg); + match client.connect_to_broker().await { Ok(_) => info!("MQTT CONNACK received"), Err(reason) => { info!("MQTT connect failed: {:?}", reason); @@ -198,7 +149,22 @@ async fn run_one_session( } } - run_loop(&mut client).await.map_err(|_| ()) + // Operational loop + let default_topic = "esp32/topic"; + match client.subscribe_to_topic(default_topic).await { + Ok(_) => info!("Subscribed to '{}'", default_topic), + Err(e) => info!("Default subscribe failed: {:?}", e), + }; + + loop { + let net_or_ping = select(client.receive_message(), Timer::after(PING_PERIOD)); + + match select(CMD_CHAN.receive(), net_or_ping).await { + Either::First(cmd) => handle_command(&mut client, cmd).await.map_err(|_| ())?, + Either::Second(Either::First(result)) => handle_incoming(result).await.map_err(|_| ())?, + Either::Second(Either::Second(_)) => client.send_ping().await.map_err(|_| ())?, + } + } } // Main MQTT embassy task @@ -218,13 +184,9 @@ pub async fn mqtt_task(stack: Stack<'static>) { &mut tcp_tx[..], &mut mqtt_tx[..], &mut mqtt_rx[..], - ) - .await; + ).await; - info!( - "Reconnecting in {}s after session end/failure", - RECONNECT_DELAY_SECS - ); + info!("Reconnecting in {}s after session end/failure", RECONNECT_DELAY_SECS); Timer::after(Duration::from_secs(RECONNECT_DELAY_SECS)).await; } }