diff --git a/final/src/mqtt/client.rs b/final/src/mqtt/client.rs index bcf5640..1b62d52 100644 --- a/final/src/mqtt/client.rs +++ b/final/src/mqtt/client.rs @@ -1,4 +1,4 @@ -use embassy_net::{tcp::TcpSocket, Stack}; +use embassy_net::{tcp::TcpSocket, Ipv4Address, Stack}; use embassy_time::{Duration, Timer}; use log::info; @@ -8,19 +8,18 @@ use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::utils::rng_generator::CountingRng; #[embassy_executor::task] -pub async fn mqtt_task(stack: &'static Stack<'static>) { +pub async fn mqtt_task(stack: Stack<'static>) { info!("MQTT task starting..."); - // TCP socket buffers - static mut RX_BUFFER: [u8; 2048] = [0; 2048]; - static mut TX_BUFFER: [u8; 2048] = [0; 2048]; + // TCP socket buffers (for embassy-net TcpSocket) + static mut TCP_RX_BUFFER: [u8; 2048] = [0; 2048]; + static mut TCP_TX_BUFFER: [u8; 2048] = [0; 2048]; - // Fix #1: Dereference stack + // embassy-net 0.7: pass Stack by value (not &Stack) let mut socket = - TcpSocket::new(*stack, unsafe { &mut RX_BUFFER }, unsafe { &mut TX_BUFFER }); + TcpSocket::new(stack, unsafe { &mut TCP_RX_BUFFER }, unsafe { &mut TCP_TX_BUFFER }); - // Connect to broker - use embassy_net::Ipv4Address; + // Adjust broker IP/port as needed let broker_ip = Ipv4Address::new(192, 168, 1, 100); let broker_port = 1883; @@ -28,61 +27,61 @@ pub async fn mqtt_task(stack: &'static Stack<'static>) { info!("TCP connect failed: {:?}", e); return; } - info!("Connected to TCP broker"); + info!("Connected TCP to MQTT broker"); - // MQTT client needs TWO separate buffers! - static mut MQTT_WRITE_BUF: [u8; 1024] = [0; 1024]; - static mut MQTT_RECV_BUF: [u8; 1024] = [0; 1024]; + // MQTT client buffers (separate from the TcpSocket’s buffers) + static mut MQTT_TX_BUF: [u8; 1024] = [0; 1024]; + static mut MQTT_RX_BUF: [u8; 1024] = [0; 1024]; - // Fix #2: CountingRng initialization - let rng = CountingRng(12345); - - // Fix #3: ClientConfig takes MqttVersion and rng - let mut client_config = ClientConfig::new(MqttVersion::MQTTv5, rng); - client_config.add_client_id("esp32-client"); - client_config.keep_alive = 60; + // rust-mqtt 0.3.0 RNG + let rng = CountingRng(0); - // Fix #4: MqttClient::new takes TWO buffers (write + receive) - let mut client: MqttClient<_, 16, _> = MqttClient::new( + // Build client config + let mut cfg: ClientConfig<'static, 8, _> = ClientConfig::new(MqttVersion::MQTTv5, rng); + cfg.keep_alive = 60; + cfg.add_client_id("esp32-client"); + // Optional: + // cfg.add_username("user"); + // cfg.add_password("pass"); + + // Create client + let mut client: MqttClient<'static, _, 8, _> = MqttClient::new( socket, - unsafe { &mut MQTT_WRITE_BUF }, // write buffer - 1024, // write buffer length - unsafe { &mut MQTT_RECV_BUF }, // receive buffer - 1024, // receive buffer length - client_config, + unsafe { &mut MQTT_TX_BUF }, + unsafe { MQTT_TX_BUF.len() }, + unsafe { &mut MQTT_RX_BUF }, + unsafe { MQTT_RX_BUF.len() }, + cfg, ); - // Connect to MQTT broker - match client.connect_to_broker().await { - Ok(_) => info!("MQTT CONNECT successful"), - Err(e) => { - info!("MQTT connect failed: {:?}", e); - return; - } + // CONNECT + if let Err(reason) = client.connect_to_broker().await { + info!("MQTT connect failed: {:?}", reason); + return; + } + info!("MQTT CONNACK received"); + + // Publish a message (QoS0) + if let Err(reason) = client + .send_message( + "esp32/topic", + b"hello from esp32", + QualityOfService::QoS0, + false, + ) + .await + { + info!("MQTT publish failed: {:?}", reason); + } else { + info!("MQTT PUBLISH sent"); } - // Publish a message using send_message() - match client.send_message( - "esp32/topic", - b"hello from esp32", - QualityOfService::QoS0, - false // retain - ).await { - Ok(_) => info!("MQTT message published"), - Err(e) => info!("MQTT publish error: {:?}", e), - } - - // Main loop - receive messages + // Keep the session alive; send PING and await PINGRESP every 30s loop { - match client.receive_message().await { - Ok((topic, payload)) => { - info!("Received on {}: {:?}", topic, payload); - } - Err(e) => { - info!("MQTT error: {:?}", e); - break; - } + if let Err(reason) = client.send_ping().await { + info!("MQTT ping failed: {:?}", reason); + break; } - Timer::after(Duration::from_millis(100)).await; + Timer::after(Duration::from_secs(30)).await; } }