diff --git a/final/Cargo.lock b/final/Cargo.lock index 18b2742..be927e2 100644 --- a/final/Cargo.lock +++ b/final/Cargo.lock @@ -336,7 +336,7 @@ dependencies = [ "embassy-time 0.5.0", "embedded-io-async", "embedded-nal-async", - "heapless", + "heapless 0.8.0", "log", "managed", "smoltcp", @@ -359,7 +359,7 @@ dependencies = [ "embedded-io-async", "futures-sink", "futures-util", - "heapless", + "heapless 0.8.0", ] [[package]] @@ -373,7 +373,7 @@ dependencies = [ "embedded-io-async", "futures-core", "futures-sink", - "heapless", + "heapless 0.8.0", ] [[package]] @@ -425,7 +425,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc55c748d16908a65b166d09ce976575fb8852cf60ccd06174092b41064d8f83" dependencies = [ "embassy-executor", - "heapless", + "heapless 0.8.0", ] [[package]] @@ -561,7 +561,7 @@ dependencies = [ "cfg-if", "esp-config", "esp-println", - "heapless", + "heapless 0.8.0", "semihosting", ] @@ -942,6 +942,16 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "heapless" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1edcd5a338e64688fbdcb7531a846cfd3476a54784dcb918a0844682bc7ada5" +dependencies = [ + "hash32", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.5.0" @@ -1185,7 +1195,9 @@ dependencies = [ "defmt-rtt", "dotenvy", "embassy-executor", + "embassy-futures", "embassy-net", + "embassy-sync 0.7.2", "embassy-time 0.5.0", "embedded-io", "embedded-io-async", @@ -1196,6 +1208,7 @@ dependencies = [ "esp-hal-embassy", "esp-println", "esp-wifi", + "heapless 0.9.1", "log", "rust-mqtt", "smoltcp", @@ -1279,7 +1292,7 @@ dependencies = [ "defmt 0.3.100", "embedded-io", "embedded-io-async", - "heapless", + "heapless 0.8.0", "rand_core 0.6.4", ] @@ -1347,7 +1360,7 @@ dependencies = [ "bitflags 1.3.2", "byteorder", "cfg-if", - "heapless", + "heapless 0.8.0", "log", "managed", ] diff --git a/final/Cargo.toml b/final/Cargo.toml index cc136ce..3cb610e 100644 --- a/final/Cargo.toml +++ b/final/Cargo.toml @@ -68,6 +68,9 @@ smoltcp = { version = "0.12.0", default-features = false, features = [ static_cell = "2.1.1" rust-mqtt = { version = "0.3.0", default-features = false, features = ["no_std"] } defmt-rtt = "1.0.0" +embassy-futures = "0.1.2" +embassy-sync = "0.7.2" +heapless = "0.9.1" [build-dependencies] dotenvy = "0.15.7" diff --git a/final/src/bin/main.rs b/final/src/bin/main.rs index c523ae0..df163a5 100644 --- a/final/src/bin/main.rs +++ b/final/src/bin/main.rs @@ -17,7 +17,8 @@ use esp_wifi::{ wifi::{ClientConfiguration, Configuration, WifiController, WifiDevice, WifiEvent, WifiState}, }; use log::info; -use projekt_final::mqtt::client::mqtt_task; +use rust_mqtt::packet::v5::publish_packet::QualityOfService; +use projekt_final::mqtt::client::{mqtt_task, mqtt_publish}; use defmt_rtt as _; extern crate alloc; @@ -95,7 +96,17 @@ async fn main(spawner: Spawner) -> ! { info!("MQTT task started"); loop { - Timer::after(Duration::from_secs(60)).await; + // TODO example + mqtt_publish( + "esp32/topic", + b"hello from main", + QualityOfService::QoS1, + false, + ) + .await; + + // Avoid spamming, just an example cadence + Timer::after(Duration::from_secs(5)).await; } } diff --git a/final/src/mqtt/client.rs b/final/src/mqtt/client.rs index eb49db6..12f3f99 100644 --- a/final/src/mqtt/client.rs +++ b/final/src/mqtt/client.rs @@ -1,4 +1,5 @@ // src/mqtt/client.rs +use embassy_futures::select::{select, Either}; use embassy_net::{tcp::TcpSocket, Stack}; use embassy_time::{Duration, Timer}; use log::info; @@ -8,10 +9,20 @@ use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::reason_codes::ReasonCode; use rust_mqtt::utils::rng_generator::CountingRng; use static_cell::ConstStaticCell; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::channel::Channel; +use heapless::{String as HString, Vec as HVec}; use crate::mqtt::config::mqtt_broker_endpoint; const RECONNECT_DELAY_SECS: u64 = 5; +const KEEPALIVE_SECS: u64 = 60; +const PING_PERIOD: Duration = Duration::from_secs(KEEPALIVE_SECS / 2); + +// Small owned buffers for data from main (no heap, no lifetimes) +pub const TOPIC_MAX: usize = 128; +pub const PAYLOAD_MAX: usize = 512; +const PUBLISH_QUEUE: usize = 4; // TCP socket buffers (for embassy-net TcpSocket) static TCP_RX_BUFFER: ConstStaticCell<[u8; 2048]> = ConstStaticCell::new([0; 2048]); @@ -21,16 +32,47 @@ static TCP_TX_BUFFER: ConstStaticCell<[u8; 2048]> = ConstStaticCell::new([0; 204 static MQTT_TX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]); static MQTT_RX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]); -// Type alias for clarity in helper functions // NOTE: Tie the TcpSocket lifetime to the session (not 'static), and -// the MQTT buffers lifetime to 'a. This resolves the E0521 error. -// The const 8 is the MAX_PROPERTIES generic used by rust-mqtt config. +// the MQTT buffers lifetime to 'a. type Client<'a, 'net> = MqttClient<'a, TcpSocket<'net>, 8, CountingRng>; +#[derive(Clone)] +struct PublishMsg { + topic: HString, + payload: HVec, + qos: QualityOfService, + retain: bool, +} + +static PUB_CHAN: Channel = + Channel::new(); + +// Public API for main: enqueue a publish +pub async fn mqtt_publish( + topic: &str, + payload: &[u8], + qos: QualityOfService, + retain: bool, +) { + let mut t: HString = HString::new(); + let _ = t.push_str(&topic[..core::cmp::min(topic.len(), TOPIC_MAX)]); + let mut p: HVec = HVec::new(); + let take = core::cmp::min(payload.len(), PAYLOAD_MAX); + let _ = p.extend_from_slice(&payload[..take]); + PUB_CHAN + .send(PublishMsg { + topic: t, + payload: p, + qos, + retain, + }) + .await; +} + fn build_client_config() -> ClientConfig<'static, 8, CountingRng> { let rng = CountingRng(0); let mut cfg: ClientConfig<'static, 8, _> = ClientConfig::new(MqttVersion::MQTTv5, rng); - cfg.keep_alive = 60; + cfg.keep_alive = KEEPALIVE_SECS as u16; cfg.add_client_id("esp32-client"); cfg } @@ -63,24 +105,30 @@ async fn connect_mqtt(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> { client.connect_to_broker().await } -async fn publish_once(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> { - client - .send_message( - "esp32/topic", - b"hello from esp32", - QualityOfService::QoS1, - false, - ) - .await -} - -async fn ping_loop(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> { +// While connected, either send queued publishes or ping periodically +async fn connected_loop(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> { loop { - if let Err(reason) = client.send_ping().await { - info!("MQTT ping failed: {:?}", reason); - return Err(reason); + match select(PUB_CHAN.receive(), Timer::after(PING_PERIOD)).await { + // Got a publish request from main + Either::First(cmd) => { + // QoS1 may read PUBACK, keep all socket I/O here + let res = client + .send_message( + cmd.topic.as_str(), + &cmd.payload, + cmd.qos, + cmd.retain, + ) + .await; + if let Err(e) = res { + return Err(e); + } + } + // Time to ping + Either::Second(_) => { + client.send_ping().await?; + } } - Timer::after(Duration::from_secs(30)).await; } } @@ -108,14 +156,10 @@ async fn run_one_session( } } - // Optional demo publish (same behavior as before) - match publish_once(&mut client).await { - Ok(_) => info!("MQTT PUBLISH sent"), - Err(reason) => info!("MQTT publish failed: {:?}", reason), - } - - // Keepalive. Any error ends the session; outer loop will reconnect. - ping_loop(&mut client).await.map_err(|_| ()) + // While connected, process publishes and ping. Any error triggers reconnect. + connected_loop(&mut client) + .await + .map_err(|_| ()) } #[embassy_executor::task]