From 04c5a02ae4e5b8265cc28fa6fe7862e2d7852b4a Mon Sep 17 00:00:00 2001 From: Filipriec Date: Tue, 21 Apr 2026 18:33:03 +0200 Subject: [PATCH] working mqtt with protobuf --- protos/src/main.rs | 72 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 66 insertions(+), 6 deletions(-) diff --git a/protos/src/main.rs b/protos/src/main.rs index 956b13e..4e3f819 100644 --- a/protos/src/main.rs +++ b/protos/src/main.rs @@ -1,5 +1,12 @@ // src/main.rs +use micropb::MessageEncode; +use micropb::PbEncoder; +use micropb::{DecodeError, MessageDecode, PbDecoder, PbRead}; +use rumqttc::{Client, MqttOptions, QoS}; +use std::thread; +use std::time::{Duration, Instant}; + mod transport { #![allow(clippy::all)] #![allow(nonstandard_style, dead_code, unused_imports, unused_parens)] @@ -9,11 +16,64 @@ mod transport { use crate::transport::ahoj_::Example; fn main() { - let msg = Example { - f_int32: 42, - f_bool: true, - ..Default::default() - }; + // let mut mqttoptions = MqttOptions::new("client-123", "mqtt.farmeris.sk", 1883); + let client_id = format!("client-{}", Instant::now().elapsed().as_micros()); + let mut mqttoptions = MqttOptions::new(client_id, "185.55.240.128", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); - println!("{:?}", msg); + let (mut client, mut connection) = Client::new(mqttoptions, 10); + + thread::spawn(move || { + for notification in connection.iter() { + match notification { + Ok(rumqttc::Event::Incoming(rumqttc::Packet::Publish(p))) => { + let mut decoder = PbDecoder::new(p.payload.as_ref()); + let mut message = Example::default(); + if let Ok(_) = message.decode(&mut decoder, p.payload.len()) { + println!("{:?}", message); + } + } + Ok(event) => println!("Other Event: {:?}", event), + Err(e) => { + eprintln!("MQTT Connection notice: {:?}", e); + thread::sleep(Duration::from_secs(1)); + } + } + } + }); + + client.subscribe("test/topic", QoS::AtMostOnce).unwrap(); + + let mut last_pub = Instant::now(); + let mut i = 0; + + loop { + if i < 10 && last_pub.elapsed() >= Duration::from_millis(500) { + let msg = Example { + f_int32: i, + f_bool: true, + f_fixed32: (i * 10) as u32, + f_sfixed32: i * 10, + ..Default::default() + }; + + let mut output = heapless::Vec::::new(); + let mut encoder = PbEncoder::new(&mut output); + // msg.encode(&mut encoder).unwrap(); + + if msg.encode(&mut encoder).is_ok() { + println!("Odosielam Protobuf správu {}", i); + if let Err(e) = + client.publish("test/topic", QoS::AtLeastOnce, false, output.to_vec()) + { + eprintln!("Chyba pri publikovaní: {:?}", e); + } + } + + i += 1; + last_pub = Instant::now(); + } + + std::thread::sleep(Duration::from_millis(10)); + } }