split function to multiple functions in mqtt client

This commit is contained in:
Priec
2025-10-05 12:24:38 +02:00
parent 871c9bddd1
commit 3e0801674f

View File

@@ -5,6 +5,7 @@ use log::info;
use rust_mqtt::client::client::MqttClient; use rust_mqtt::client::client::MqttClient;
use rust_mqtt::client::client_config::{ClientConfig, MqttVersion}; use rust_mqtt::client::client_config::{ClientConfig, MqttVersion};
use rust_mqtt::packet::v5::publish_packet::QualityOfService; use rust_mqtt::packet::v5::publish_packet::QualityOfService;
use rust_mqtt::packet::v5::reason_codes::ReasonCode;
use rust_mqtt::utils::rng_generator::CountingRng; use rust_mqtt::utils::rng_generator::CountingRng;
use static_cell::ConstStaticCell; use static_cell::ConstStaticCell;
@@ -18,49 +19,48 @@ static TCP_TX_BUFFER: ConstStaticCell<[u8; 2048]> = ConstStaticCell::new([0; 204
static MQTT_TX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]); static MQTT_TX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]);
static MQTT_RX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]); static MQTT_RX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]);
#[embassy_executor::task] // Type alias for clarity in helper functions
pub async fn mqtt_task(stack: Stack<'static>) { // TODO number 8 is number of allowed subs
info!("MQTT task starting..."); type Client<'a> = MqttClient<'a, TcpSocket<'static>, 8, CountingRng>;
let tcp_rx = TCP_RX_BUFFER.take(); fn build_client_config() -> ClientConfig<'static, 8, CountingRng> {
let tcp_tx = TCP_TX_BUFFER.take();
let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx);
match socket.connect(mqtt_broker_endpoint()).await {
Ok(_) => info!("Connected TCP to MQTT broker"),
Err(e) => {
info!("TCP connect failed: {:?}", e);
return;
}
}
let mqtt_tx = MQTT_TX_BUF.take();
let mqtt_rx = MQTT_RX_BUF.take();
// Config
let rng = CountingRng(0); let rng = CountingRng(0);
let mut cfg: ClientConfig<'static, 8, _> = ClientConfig::new(MqttVersion::MQTTv5, rng); let mut cfg: ClientConfig<'static, 8, _> = ClientConfig::new(MqttVersion::MQTTv5, rng);
cfg.keep_alive = 60; cfg.keep_alive = 60;
cfg.add_client_id("esp32-client"); cfg.add_client_id("esp32-client");
// cfg.add_username("user"); cfg
// cfg.add_password("pass"); }
fn build_client<'a>(
socket: TcpSocket<'static>,
mqtt_tx: &'a mut [u8],
mqtt_rx: &'a mut [u8],
) -> Client<'a> {
let cfg = build_client_config();
let mqtt_tx_len = mqtt_tx.len(); let mqtt_tx_len = mqtt_tx.len();
let mqtt_rx_len = mqtt_rx.len(); let mqtt_rx_len = mqtt_rx.len();
MqttClient::new(socket, mqtt_tx, mqtt_tx_len, mqtt_rx, mqtt_rx_len, cfg)
}
let mut client = MqttClient::new(socket, mqtt_tx, mqtt_tx_len, mqtt_rx, mqtt_rx_len, cfg); async fn connect_tcp(socket: &mut TcpSocket<'static>) -> Result<(), ()> {
match socket.connect(mqtt_broker_endpoint()).await {
// Connect Ok(_) => {
match client.connect_to_broker().await { info!("Connected TCP to MQTT broker");
Ok(_) => info!("MQTT CONNACK received"), Ok(())
Err(reason) => { }
info!("MQTT connect failed: {:?}", reason); Err(e) => {
return; info!("TCP connect failed: {:?}", e);
Err(())
} }
} }
}
// Publish simple message async fn connect_mqtt(client: &mut Client<'_>) -> Result<(), ReasonCode> {
match client client.connect_to_broker().await
}
async fn publish_once(client: &mut Client<'_>) -> Result<(), ReasonCode> {
client
.send_message( .send_message(
"esp32/topic", "esp32/topic",
b"hello from esp32", b"hello from esp32",
@@ -68,16 +68,50 @@ pub async fn mqtt_task(stack: Stack<'static>) {
false, false,
) )
.await .await
{ }
Ok(_) => info!("MQTT PUBLISH sent"),
Err(reason) => info!("MQTT publish failed: {:?}", reason),
}
async fn ping_loop(client: &mut Client<'_>) -> Result<(), ReasonCode> {
loop { loop {
if let Err(reason) = client.send_ping().await { if let Err(reason) = client.send_ping().await {
info!("MQTT ping failed: {:?}", reason); info!("MQTT ping failed: {:?}", reason);
break; return Err(reason);
} }
Timer::after(Duration::from_secs(30)).await; Timer::after(Duration::from_secs(30)).await;
} }
} }
#[embassy_executor::task]
pub async fn mqtt_task(stack: Stack<'static>) {
info!("MQTT task starting...");
// Take static buffers once
let tcp_rx = TCP_RX_BUFFER.take();
let tcp_tx = TCP_TX_BUFFER.take();
let mqtt_tx = MQTT_TX_BUF.take();
let mqtt_rx = MQTT_RX_BUF.take();
// Build socket and connect TCP
let mut socket = TcpSocket::new(stack, tcp_rx, tcp_tx);
if connect_tcp(&mut socket).await.is_err() {
return;
}
// Build client and connect MQTT
let mut client = build_client(socket, mqtt_tx, mqtt_rx);
match connect_mqtt(&mut client).await {
Ok(_) => info!("MQTT CONNACK received"),
Err(reason) => {
info!("MQTT connect failed: {:?}", reason);
return;
}
}
// Publish single message
match publish_once(&mut client).await {
Ok(_) => info!("MQTT PUBLISH sent"),
Err(reason) => info!("MQTT publish failed: {:?}", reason),
}
// Keep the connection with pings
let _ = ping_loop(&mut client).await;
}