From 70c37c344bee918fbf74440cb7ba6245dfa0822b Mon Sep 17 00:00:00 2001 From: Priec Date: Thu, 8 Jan 2026 15:59:44 +0100 Subject: [PATCH] working mqtt now --- flake.nix | 29 ++++--- mqtt/Makefile | 3 + mqtt/build.rs | 11 ++- mqtt/src/mqtt/client.rs | 173 +++++++++++++++++----------------------- 4 files changed, 98 insertions(+), 118 deletions(-) diff --git a/flake.nix b/flake.nix index 569ebe6..9b76a2f 100644 --- a/flake.nix +++ b/flake.nix @@ -28,12 +28,12 @@ ]; shellHook = '' - echo ">>> ESP32 DevShell (esp-hal only)" + echo ">>> ESP32 DevShell (esp-hal only)" - if [ ! -d "$HOME/.espup" ]; then - echo "Running espup install..." - espup install - fi + if [ ! -d "$HOME/.espup" ]; then + echo "Running espup install..." + espup install + fi # Ensure the real rust-analyzer from Nix is used (not rustup's shim) export PATH=${pkgs.rust-analyzer}/bin:$PATH @@ -42,16 +42,25 @@ shellHook = '' # (do NOT set RUSTUP_TOOLCHAIN=esp globally; that breaks rust-analyzer) export PATH=$HOME/.rustup/toolchains/esp/bin:$PATH - # Add GCC/binutils path for xtensa-esp32-elf-gcc - export PATH=$HOME/.rustup/toolchains/esp/xtensa-esp-elf/esp-14.2.0_20240906/xtensa-esp-elf/bin:$PATH + # Add GCC/binutils path for xtensa-esp32-elf-gcc + export PATH=$HOME/.rustup/toolchains/esp/xtensa-esp-elf/esp-14.2.0_20240906/xtensa-esp-elf/bin:$PATH # Helpers that force the ESP toolchain explicitly when needed alias cargo-esp="RUSTUP_TOOLCHAIN=esp cargo" alias rustc-esp="RUSTUP_TOOLCHAIN=esp rustc" - echo "Xtensa Rust toolchain ready." - rustc --version - which xtensa-esp32-elf-gcc || echo "⚠️ xtensa-esp32-elf-gcc not found in PATH" + echo "Xtensa Rust toolchain ready." + rustc --version + which xtensa-esp32-elf-gcc || echo "⚠️ xtensa-esp32-elf-gcc not found in PATH" + + echo "" + echo "MQTT broker test (run manually in two terminals):" + echo " Terminal 1:" + echo " mosquitto_sub -h mqtt.farmeris.sk -p 1883 -t esp32/topic -i subEsp -v" + echo "" + echo " Terminal 2:" + echo " mosquitto_pub -h mqtt.farmeris.sk -p 1883 -t esp32/topic -m \"AAAAA\" -i pubEsp" + echo "" ''; }; }; diff --git a/mqtt/Makefile b/mqtt/Makefile index 2845b25..d4b9f29 100644 --- a/mqtt/Makefile +++ b/mqtt/Makefile @@ -7,3 +7,6 @@ build: flash: cargo espflash flash --release --monitor + +info: + espflash board-info diff --git a/mqtt/build.rs b/mqtt/build.rs index 8005431..7b470c4 100644 --- a/mqtt/build.rs +++ b/mqtt/build.rs @@ -1,7 +1,8 @@ fn main() { - // load .env and pass SSID/PASSWORD to compiler - if let Ok(dotenv_path) = dotenvy::dotenv() { - // Only rebuild if .env changes + // Explicitly load .env from the project root, even when called from other dirs + let project_root = std::env::var("CARGO_MANIFEST_DIR").unwrap(); + let dotenv_path = std::path::Path::new(&project_root).join(".env"); + if dotenvy::from_path(dotenv_path.clone()).is_ok() { println!("cargo:rerun-if-changed={}", dotenv_path.display()); } @@ -12,8 +13,6 @@ fn main() { if let Ok(password) = std::env::var("PASSWORD") { println!("cargo:rustc-env=PASSWORD={}", password); } - - // Export BROKER_IP and PORT as string envs also (optional, for debugging) if let Ok(ip) = std::env::var("BROKER_IP") { println!("cargo:rustc-env=BROKER_IP={}", ip); } @@ -22,7 +21,7 @@ fn main() { } linker_be_nice(); - // make sure linkall.x is the last linker script + println!("cargo:rustc-link-arg=-Tlinkall.x"); } diff --git a/mqtt/src/mqtt/client.rs b/mqtt/src/mqtt/client.rs index 93940a5..bda6be9 100644 --- a/mqtt/src/mqtt/client.rs +++ b/mqtt/src/mqtt/client.rs @@ -38,6 +38,14 @@ static MQTT_RX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024] // Tie TcpSocket lifetime to session type Client<'a, 'net> = MqttClient<'a, TcpSocket<'net>, 8, CountingRng>; +// ===================== Improved Section Starts Here ===================== // + +#[derive(Clone)] +pub struct IncomingMsg { + pub topic: HString, + pub payload: HVec, +} + #[derive(Clone)] struct PublishMsg { topic: HString, @@ -47,38 +55,21 @@ struct PublishMsg { } #[derive(Clone)] -enum MqttCommand { +enum Command { Publish(PublishMsg), Subscribe(HString), } -static CMD_CHAN: Channel = Channel::new(); - -#[derive(Clone)] -pub struct IncomingMsg { - pub topic: HString, - pub payload: HVec, -} - +static CMD_CHAN: Channel = Channel::new(); static EVT_CHAN: Channel = Channel::new(); -// Public API for main -pub async fn mqtt_publish( - topic: &str, - payload: &[u8], - qos: QualityOfService, - retain: bool, -) { - let mut t: HString = HString::new(); - let _ = t.push_str(&topic[..core::cmp::min(topic.len(), TOPIC_MAX)]); - let mut p: HVec = HVec::new(); - let take = core::cmp::min(payload.len(), PAYLOAD_MAX); - let _ = p.extend_from_slice(&payload[..take]); +// --- Public API --- +pub async fn mqtt_publish(topic: &str, payload: &[u8], qos: QualityOfService, retain: bool) { CMD_CHAN - .send(MqttCommand::Publish(PublishMsg { - topic: t, - payload: p, + .send(Command::Publish(PublishMsg { + topic: truncate_str::(topic), + payload: truncate_payload(payload), qos, retain, })) @@ -86,20 +77,30 @@ pub async fn mqtt_publish( } pub async fn mqtt_subscribe(topic: &str) { - let mut t: HString = HString::new(); - let _ = t.push_str(&topic[..core::cmp::min(topic.len(), TOPIC_MAX)]); - CMD_CHAN.send(MqttCommand::Subscribe(t)).await; + CMD_CHAN.send(Command::Subscribe(truncate_str::(topic))).await; } -// Receiver for incoming MQTT messages pub fn mqtt_events( ) -> Receiver<'static, CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> { EVT_CHAN.receiver() } +// --- Helper functions for memory-safe truncation --- +fn truncate_str(s: &str) -> HString { + let mut h = HString::new(); + let _ = h.push_str(&s[..s.len().min(N)]); + h +} + +fn truncate_payload(data: &[u8]) -> HVec { + let mut v = HVec::new(); + let _ = v.extend_from_slice(&data[..data.len().min(PAYLOAD_MAX)]); + v +} + +// --- MQTT configuration and client setup --- fn build_client_config() -> ClientConfig<'static, 8, CountingRng> { - let rng = CountingRng(0); - let mut cfg: ClientConfig<'static, 8, _> = ClientConfig::new(MqttVersion::MQTTv5, rng); + let mut cfg = ClientConfig::new(MqttVersion::MQTTv5, CountingRng(0)); cfg.keep_alive = KEEPALIVE_SECS as u16; cfg.add_client_id("esp32-client"); cfg @@ -110,12 +111,13 @@ fn build_client<'a, 'net>( mqtt_tx: &'a mut [u8], mqtt_rx: &'a mut [u8], ) -> Client<'a, 'net> { - let cfg = build_client_config(); let mqtt_tx_len = mqtt_tx.len(); let mqtt_rx_len = mqtt_rx.len(); - MqttClient::new(socket, mqtt_tx, mqtt_tx_len, mqtt_rx, mqtt_rx_len, cfg) + MqttClient::new(socket, mqtt_tx, mqtt_tx_len, mqtt_rx, mqtt_rx_len, build_client_config()) } +// --- Connection lifecycle and main session loop --- + async fn connect_tcp<'net>(socket: &mut TcpSocket<'net>) -> Result<(), ()> { match socket.connect(mqtt_broker_endpoint()).await { Ok(_) => { @@ -133,84 +135,51 @@ async fn connect_mqtt(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> { client.connect_to_broker().await } -// While connected, handle publishes, subscribes, ping, and incoming messages -async fn connected_loop(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> { - // Subscribe to a default topic on connect (optional). You can remove this - // if you always subscribe from main via mqtt_subscribe(). +async fn run_loop(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> { let default_topic = "esp32/topic"; match client.subscribe_to_topic(default_topic).await { Ok(_) => info!("Subscribed to '{}'", default_topic), - Err(e) => { - info!("Default subscribe failed: {:?}", e); - // Not fatal: continue - } - } + Err(e) => info!("Default subscribe failed: {:?}", e), + }; loop { - // Nested select so we react to incoming publishes ASAP, while still - // sending keepalive pings on schedule and servicing commands. - let in_or_ping = async { - select(client.receive_message(), Timer::after(PING_PERIOD)).await - }; + let net_or_ping = select(client.receive_message(), Timer::after(PING_PERIOD)); - match select(CMD_CHAN.receive(), in_or_ping).await { - // Command from main (publish/subscribe) - Either::First(cmd) => match cmd { - MqttCommand::Publish(m) => { - if let Err(e) = client - .send_message( - m.topic.as_str(), - &m.payload, - m.qos, - m.retain, - ) - .await - { - return Err(e); - } - } - MqttCommand::Subscribe(topic) => { - match client.subscribe_to_topic(topic.as_str()).await { - Ok(_) => info!("Subscribed to '{}'", topic.as_str()), - Err(e) => { - info!( - "Subscribe failed for '{}': {:?}", - topic.as_str(), - e - ); - return Err(e); - } - } - } - }, - // Either an incoming publish or the ping timer fired - Either::Second(e) => match e { - // Got a PUBLISH from broker - Either::First(Ok((topic, msg))) => { - let mut t: HString = HString::new(); - let _ = t.push_str( - &topic[..core::cmp::min(topic.len(), TOPIC_MAX)], - ); - let mut p: HVec = HVec::new(); - let take = core::cmp::min(msg.len(), PAYLOAD_MAX); - let _ = p.extend_from_slice(&msg[..take]); - EVT_CHAN - .send(IncomingMsg { topic: t, payload: p }) - .await; - } - Either::First(Err(e)) => { - info!("MQTT receive error (reconnect): {:?}", e); - return Err(e); - } - Either::Second(_) => { - client.send_ping().await?; - } - }, + match select(CMD_CHAN.receive(), net_or_ping).await { + Either::First(cmd) => handle_command(client, cmd).await?, + Either::Second(Either::First(result)) => handle_incoming(result).await?, + Either::Second(Either::Second(_)) => client.send_ping().await?, } } } -// Full session: TCP connect -> MQTT connect -> connected loop +async fn handle_command(client: &mut Client<'_, '_>, cmd: Command) -> Result<(), ReasonCode> { + match cmd { + Command::Publish(msg) => { + client + .send_message(msg.topic.as_str(), &msg.payload, msg.qos, msg.retain) + .await + } + Command::Subscribe(topic) => { + client.subscribe_to_topic(topic.as_str()).await?; + info!("Subscribed to '{}'", topic); + Ok(()) + } + } +} + +async fn handle_incoming(result: Result<(&str, &[u8]), ReasonCode>) -> Result<(), ReasonCode> { + let (topic, payload) = result?; + EVT_CHAN + .send(IncomingMsg { + topic: truncate_str::(topic), + payload: truncate_payload(payload), + }) + .await; + Ok(()) +} + +// --- Session and reconnect control --- async fn run_one_session( stack: Stack<'static>, tcp_rx: &mut [u8], @@ -232,14 +201,14 @@ async fn run_one_session( } } - connected_loop(&mut client).await.map_err(|_| ()) + run_loop(&mut client).await.map_err(|_| ()) } +// --- Main MQTT embassy task --- #[embassy_executor::task] pub async fn mqtt_task(stack: Stack<'static>) { info!("MQTT task starting..."); - // Take static buffers once and reuse across reconnects let tcp_rx = TCP_RX_BUFFER.take(); let tcp_tx = TCP_TX_BUFFER.take(); let mqtt_tx = MQTT_TX_BUF.take();