better way to do it
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
use embassy_net::{tcp::TcpSocket, Stack};
|
||||
use embassy_net::{tcp::TcpSocket, Ipv4Address, Stack};
|
||||
use embassy_time::{Duration, Timer};
|
||||
use log::info;
|
||||
|
||||
@@ -8,19 +8,18 @@ use rust_mqtt::packet::v5::publish_packet::QualityOfService;
|
||||
use rust_mqtt::utils::rng_generator::CountingRng;
|
||||
|
||||
#[embassy_executor::task]
|
||||
pub async fn mqtt_task(stack: &'static Stack<'static>) {
|
||||
pub async fn mqtt_task(stack: Stack<'static>) {
|
||||
info!("MQTT task starting...");
|
||||
|
||||
// TCP socket buffers
|
||||
static mut RX_BUFFER: [u8; 2048] = [0; 2048];
|
||||
static mut TX_BUFFER: [u8; 2048] = [0; 2048];
|
||||
// TCP socket buffers (for embassy-net TcpSocket)
|
||||
static mut TCP_RX_BUFFER: [u8; 2048] = [0; 2048];
|
||||
static mut TCP_TX_BUFFER: [u8; 2048] = [0; 2048];
|
||||
|
||||
// Fix #1: Dereference stack
|
||||
// embassy-net 0.7: pass Stack by value (not &Stack)
|
||||
let mut socket =
|
||||
TcpSocket::new(*stack, unsafe { &mut RX_BUFFER }, unsafe { &mut TX_BUFFER });
|
||||
TcpSocket::new(stack, unsafe { &mut TCP_RX_BUFFER }, unsafe { &mut TCP_TX_BUFFER });
|
||||
|
||||
// Connect to broker
|
||||
use embassy_net::Ipv4Address;
|
||||
// Adjust broker IP/port as needed
|
||||
let broker_ip = Ipv4Address::new(192, 168, 1, 100);
|
||||
let broker_port = 1883;
|
||||
|
||||
@@ -28,61 +27,61 @@ pub async fn mqtt_task(stack: &'static Stack<'static>) {
|
||||
info!("TCP connect failed: {:?}", e);
|
||||
return;
|
||||
}
|
||||
info!("Connected to TCP broker");
|
||||
info!("Connected TCP to MQTT broker");
|
||||
|
||||
// MQTT client needs TWO separate buffers!
|
||||
static mut MQTT_WRITE_BUF: [u8; 1024] = [0; 1024];
|
||||
static mut MQTT_RECV_BUF: [u8; 1024] = [0; 1024];
|
||||
// MQTT client buffers (separate from the TcpSocket’s buffers)
|
||||
static mut MQTT_TX_BUF: [u8; 1024] = [0; 1024];
|
||||
static mut MQTT_RX_BUF: [u8; 1024] = [0; 1024];
|
||||
|
||||
// Fix #2: CountingRng initialization
|
||||
let rng = CountingRng(12345);
|
||||
// rust-mqtt 0.3.0 RNG
|
||||
let rng = CountingRng(0);
|
||||
|
||||
// Fix #3: ClientConfig takes MqttVersion and rng
|
||||
let mut client_config = ClientConfig::new(MqttVersion::MQTTv5, rng);
|
||||
client_config.add_client_id("esp32-client");
|
||||
client_config.keep_alive = 60;
|
||||
// Build client config
|
||||
let mut cfg: ClientConfig<'static, 8, _> = ClientConfig::new(MqttVersion::MQTTv5, rng);
|
||||
cfg.keep_alive = 60;
|
||||
cfg.add_client_id("esp32-client");
|
||||
// Optional:
|
||||
// cfg.add_username("user");
|
||||
// cfg.add_password("pass");
|
||||
|
||||
// Fix #4: MqttClient::new takes TWO buffers (write + receive)
|
||||
let mut client: MqttClient<_, 16, _> = MqttClient::new(
|
||||
// Create client
|
||||
let mut client: MqttClient<'static, _, 8, _> = MqttClient::new(
|
||||
socket,
|
||||
unsafe { &mut MQTT_WRITE_BUF }, // write buffer
|
||||
1024, // write buffer length
|
||||
unsafe { &mut MQTT_RECV_BUF }, // receive buffer
|
||||
1024, // receive buffer length
|
||||
client_config,
|
||||
unsafe { &mut MQTT_TX_BUF },
|
||||
unsafe { MQTT_TX_BUF.len() },
|
||||
unsafe { &mut MQTT_RX_BUF },
|
||||
unsafe { MQTT_RX_BUF.len() },
|
||||
cfg,
|
||||
);
|
||||
|
||||
// Connect to MQTT broker
|
||||
match client.connect_to_broker().await {
|
||||
Ok(_) => info!("MQTT CONNECT successful"),
|
||||
Err(e) => {
|
||||
info!("MQTT connect failed: {:?}", e);
|
||||
// CONNECT
|
||||
if let Err(reason) = client.connect_to_broker().await {
|
||||
info!("MQTT connect failed: {:?}", reason);
|
||||
return;
|
||||
}
|
||||
}
|
||||
info!("MQTT CONNACK received");
|
||||
|
||||
// Publish a message using send_message()
|
||||
match client.send_message(
|
||||
// Publish a message (QoS0)
|
||||
if let Err(reason) = client
|
||||
.send_message(
|
||||
"esp32/topic",
|
||||
b"hello from esp32",
|
||||
QualityOfService::QoS0,
|
||||
false // retain
|
||||
).await {
|
||||
Ok(_) => info!("MQTT message published"),
|
||||
Err(e) => info!("MQTT publish error: {:?}", e),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
{
|
||||
info!("MQTT publish failed: {:?}", reason);
|
||||
} else {
|
||||
info!("MQTT PUBLISH sent");
|
||||
}
|
||||
|
||||
// Main loop - receive messages
|
||||
// Keep the session alive; send PING and await PINGRESP every 30s
|
||||
loop {
|
||||
match client.receive_message().await {
|
||||
Ok((topic, payload)) => {
|
||||
info!("Received on {}: {:?}", topic, payload);
|
||||
}
|
||||
Err(e) => {
|
||||
info!("MQTT error: {:?}", e);
|
||||
if let Err(reason) = client.send_ping().await {
|
||||
info!("MQTT ping failed: {:?}", reason);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Timer::after(Duration::from_millis(100)).await;
|
||||
Timer::after(Duration::from_secs(30)).await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user