polishing only
This commit is contained in:
@@ -38,7 +38,6 @@ static MQTT_RX_BUF: ConstStaticCell<[u8; 1024]> = ConstStaticCell::new([0; 1024]
|
|||||||
// Tie TcpSocket lifetime to session, MQTT buffers to 'a (fixes E0521).
|
// Tie TcpSocket lifetime to session, MQTT buffers to 'a (fixes E0521).
|
||||||
type Client<'a, 'net> = MqttClient<'a, TcpSocket<'net>, 8, CountingRng>;
|
type Client<'a, 'net> = MqttClient<'a, TcpSocket<'net>, 8, CountingRng>;
|
||||||
|
|
||||||
// ===== Commands from main to MQTT task =====
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct PublishMsg {
|
struct PublishMsg {
|
||||||
topic: HString<TOPIC_MAX>,
|
topic: HString<TOPIC_MAX>,
|
||||||
@@ -53,22 +52,17 @@ enum MqttCommand {
|
|||||||
Subscribe(HString<TOPIC_MAX>),
|
Subscribe(HString<TOPIC_MAX>),
|
||||||
}
|
}
|
||||||
|
|
||||||
static CMD_CHAN: Channel<CriticalSectionRawMutex, MqttCommand, COMMAND_QUEUE> =
|
static CMD_CHAN: Channel<CriticalSectionRawMutex, MqttCommand, COMMAND_QUEUE> = Channel::new();
|
||||||
Channel::new();
|
|
||||||
|
|
||||||
// ===== Events (messages) from MQTT task to main =====
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct IncomingMsg {
|
pub struct IncomingMsg {
|
||||||
pub topic: HString<TOPIC_MAX>,
|
pub topic: HString<TOPIC_MAX>,
|
||||||
pub payload: HVec<u8, PAYLOAD_MAX>,
|
pub payload: HVec<u8, PAYLOAD_MAX>,
|
||||||
}
|
}
|
||||||
|
|
||||||
static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> =
|
static EVT_CHAN: Channel<CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> = Channel::new();
|
||||||
Channel::new();
|
|
||||||
|
|
||||||
// === Public API for main ===
|
// Public API for main
|
||||||
|
|
||||||
// Enqueue a publish (non-alloc, copies into small heapless buffers)
|
|
||||||
pub async fn mqtt_publish(
|
pub async fn mqtt_publish(
|
||||||
topic: &str,
|
topic: &str,
|
||||||
payload: &[u8],
|
payload: &[u8],
|
||||||
@@ -91,14 +85,13 @@ pub async fn mqtt_publish(
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue a subscribe request
|
|
||||||
pub async fn mqtt_subscribe(topic: &str) {
|
pub async fn mqtt_subscribe(topic: &str) {
|
||||||
let mut t: HString<TOPIC_MAX> = HString::new();
|
let mut t: HString<TOPIC_MAX> = HString::new();
|
||||||
let _ = t.push_str(&topic[..core::cmp::min(topic.len(), TOPIC_MAX)]);
|
let _ = t.push_str(&topic[..core::cmp::min(topic.len(), TOPIC_MAX)]);
|
||||||
CMD_CHAN.send(MqttCommand::Subscribe(t)).await;
|
CMD_CHAN.send(MqttCommand::Subscribe(t)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Receiver for incoming MQTT messages (use in main)
|
// Receiver for incoming MQTT messages
|
||||||
pub fn mqtt_events(
|
pub fn mqtt_events(
|
||||||
) -> Receiver<'static, CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> {
|
) -> Receiver<'static, CriticalSectionRawMutex, IncomingMsg, EVENT_QUEUE> {
|
||||||
EVT_CHAN.receiver()
|
EVT_CHAN.receiver()
|
||||||
@@ -205,12 +198,10 @@ async fn connected_loop(client: &mut Client<'_, '_>) -> Result<(), ReasonCode> {
|
|||||||
.send(IncomingMsg { topic: t, payload: p })
|
.send(IncomingMsg { topic: t, payload: p })
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
// Error receiving => reconnect
|
|
||||||
Either::First(Err(e)) => {
|
Either::First(Err(e)) => {
|
||||||
info!("MQTT receive error (reconnect): {:?}", e);
|
info!("MQTT receive error (reconnect): {:?}", e);
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
// Ping timer fired
|
|
||||||
Either::Second(_) => {
|
Either::Second(_) => {
|
||||||
client.send_ping().await?;
|
client.send_ping().await?;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user