From 0eab3bbaba0d58b06a3c25b072c950ca349c1ba5 Mon Sep 17 00:00:00 2001 From: Priec Date: Fri, 3 Oct 2025 22:32:40 +0200 Subject: [PATCH] compiled rust mqtt --- final/src/bin/main.rs | 1 - final/src/lib.rs | 2 ++ final/src/mqtt/client.rs | 77 +++++++++++++++++++++++++++++----------- 3 files changed, 58 insertions(+), 22 deletions(-) diff --git a/final/src/bin/main.rs b/final/src/bin/main.rs index e0b7506..53504aa 100644 --- a/final/src/bin/main.rs +++ b/final/src/bin/main.rs @@ -17,7 +17,6 @@ use esp_wifi::{ wifi::{ClientConfiguration, Configuration, WifiController, WifiDevice, WifiEvent, WifiState}, }; use log::info; -mod mqtt; extern crate alloc; diff --git a/final/src/lib.rs b/final/src/lib.rs index 0c9ac1a..cfc857e 100644 --- a/final/src/lib.rs +++ b/final/src/lib.rs @@ -1 +1,3 @@ #![no_std] + +pub mod mqtt; diff --git a/final/src/mqtt/client.rs b/final/src/mqtt/client.rs index 7b47831..bcf5640 100644 --- a/final/src/mqtt/client.rs +++ b/final/src/mqtt/client.rs @@ -1,21 +1,25 @@ -use embassy_time::{Duration, Timer}; use embassy_net::{tcp::TcpSocket, Stack}; -use esp_wifi::wifi::WifiDevice; -use rust_mqtt::client::client::MqttClient; -use rust_mqtt::packet::v5::{connect::ConnectPacket, publish::PublishPacket}; -use rust_mqtt::utils::rng_generator::CountingRng32; +use embassy_time::{Duration, Timer}; use log::info; +use rust_mqtt::client::client::MqttClient; +use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; +use rust_mqtt::packet::v5::publish_packet::QualityOfService; +use rust_mqtt::utils::rng_generator::CountingRng; + #[embassy_executor::task] -pub async fn mqtt_task(stack: &'static Stack>) { +pub async fn mqtt_task(stack: &'static Stack<'static>) { info!("MQTT task starting..."); + // TCP socket buffers static mut RX_BUFFER: [u8; 2048] = [0; 2048]; static mut TX_BUFFER: [u8; 2048] = [0; 2048]; - let mut socket = TcpSocket::new(stack, unsafe { &mut RX_BUFFER }, unsafe { &mut TX_BUFFER }); + // Fix #1: Dereference stack + let mut socket = + TcpSocket::new(*stack, unsafe { &mut RX_BUFFER }, unsafe { &mut TX_BUFFER }); - // TODO: replace with your broker IP + // Connect to broker use embassy_net::Ipv4Address; let broker_ip = Ipv4Address::new(192, 168, 1, 100); let broker_port = 1883; @@ -24,30 +28,61 @@ pub async fn mqtt_task(stack: &'static Stack>) { info!("TCP connect failed: {:?}", e); return; } - info!("Connected to MQTT broker"); + info!("Connected to TCP broker"); - let rng = CountingRng32::new(); - let mut client: MqttClient<_, _, 1024, 1024> = MqttClient::new(socket, rng); + // MQTT client needs TWO separate buffers! + static mut MQTT_WRITE_BUF: [u8; 1024] = [0; 1024]; + static mut MQTT_RECV_BUF: [u8; 1024] = [0; 1024]; - let connect = ConnectPacket::new("esp32-client"); - if client.connect(connect).await.is_ok() { - info!("MQTT CONNECT sent"); + // Fix #2: CountingRng initialization + let rng = CountingRng(12345); + + // Fix #3: ClientConfig takes MqttVersion and rng + let mut client_config = ClientConfig::new(MqttVersion::MQTTv5, rng); + client_config.add_client_id("esp32-client"); + client_config.keep_alive = 60; + + // Fix #4: MqttClient::new takes TWO buffers (write + receive) + let mut client: MqttClient<_, 16, _> = MqttClient::new( + socket, + unsafe { &mut MQTT_WRITE_BUF }, // write buffer + 1024, // write buffer length + unsafe { &mut MQTT_RECV_BUF }, // receive buffer + 1024, // receive buffer length + client_config, + ); + + // Connect to MQTT broker + match client.connect_to_broker().await { + Ok(_) => info!("MQTT CONNECT successful"), + Err(e) => { + info!("MQTT connect failed: {:?}", e); + return; + } } - let publish = PublishPacket::new("esp32/topic", b"hello from esp32", 0); - if client.publish(publish).await.is_ok() { - info!("MQTT PUBLISH sent"); + // Publish a message using send_message() + match client.send_message( + "esp32/topic", + b"hello from esp32", + QualityOfService::QoS0, + false // retain + ).await { + Ok(_) => info!("MQTT message published"), + Err(e) => info!("MQTT publish error: {:?}", e), } - // Keep polling + // Main loop - receive messages loop { - match client.poll().await { - Ok(_) => {} + match client.receive_message().await { + Ok((topic, payload)) => { + info!("Received on {}: {:?}", topic, payload); + } Err(e) => { info!("MQTT error: {:?}", e); break; } } - Timer::after(Duration::from_secs(5)).await; + Timer::after(Duration::from_millis(100)).await; } }