Uniot Core
0.8.1
Loading...
Searching...
No Matches
MQTTKit.h
Go to the documentation of this file.
1/*
2 * This is a part of the Uniot project.
3 * Copyright (C) 2016-2024 Uniot <contact@uniot.io>
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
26
31
36
37#pragma once
38
39#if defined(ESP8266)
40#include "ESP8266WiFi.h"
41#elif defined(ESP32)
42#include "WiFi.h"
43#endif
44
45#include <Bytes.h>
46#include <CBORObject.h>
47#include <COSEMessage.h>
48#include <ClearQueue.h>
49#include <Common.h>
50#include <Date.h>
51#include <EventListener.h>
52#include <NetworkEvents.h>
53#include <NetworkScheduler.h>
54#include <MQTTEvents.h>
55#include <PubSubClient.h>
56#include <TaskScheduler.h>
57
58#include <functional>
59
60#include "CallbackMQTTDevice.h"
61#include "MQTTDevice.h"
62#include "MQTTPath.h"
63
64namespace uniot {
77 typedef std::function<void(CBORObject &)> CBORExtender;
78 friend class MQTTDevice;
79
80 public:
86 MQTTKit(const Credentials &credentials, CBORExtender infoExtender = nullptr)
87 : mpCredentials(&credentials),
88 mPath(credentials),
89 mInfoExtender(infoExtender),
90 mPubSubClient(mWiFiClient),
91 mNetworkConnected(false),
92 mConnectionId(0) {
93 mPubSubClient.setCallback([this](char *topic, uint8_t *payload, unsigned int length) {
94 mDevices.forEach([&](MQTTDevice *device) {
95 if (device->isSubscribed(String(topic))) {
96 if (!length) {
97 device->handle(topic, Bytes());
98 return;
99 }
100
101 Bytes decoded;
102 if (_readCOSEMessage(Bytes(payload, length), decoded)) {
103 device->handle(topic, decoded);
104 } else {
105 UNIOT_LOG_ERROR("Failed to decode message on topic: %s", topic);
106 }
107 }
108 });
109 });
110 _initTasks();
113
114 // mWiFiClient.allowSelfSignedCerts();
115 // mWiFiClient.setInsecure();
116 }
117
126
132 void setServer(const char *domain, uint16_t port) {
133 mPubSubClient.setServer(domain, port);
134 }
135
143 void addDevice(MQTTDevice &device) {
144 if (mDevices.pushUnique(&device)) {
145 device.kit(this);
146 device.topics()->forEach([this](String topic) {
147 mPubSubClient.subscribe(topic.c_str());
148 });
149 }
150 }
151
159 void removeDevice(MQTTDevice &device) {
160 if (mDevices.removeOne(&device)) {
161 device.kit(nullptr);
162 device.topics()->forEach([this](String topic) {
163 mPubSubClient.unsubscribe(topic.c_str());
164 });
165 }
166 }
167
172 const MQTTPath &getPath() {
173 return mPath;
174 }
175
183 mDevices.forEach([this](MQTTDevice *device) {
184 device->unsubscribeFromAll();
185 device->syncSubscriptions();
186 });
187 }
188
194 virtual void pushTo(TaskScheduler &scheduler) override {
195 scheduler.push("mqtt", mTaskMQTT);
196 }
197
202 virtual void attach() override {}
203
214 virtual void onEventReceived(unsigned int topic, int msg) override {
216 switch (msg) {
218 mNetworkConnected = true;
220 break;
227 default:
228 mNetworkConnected = false;
229 mTaskMQTT->detach();
230 break;
231 }
232 return;
233 }
234 if (events::date::Topic::TIME == topic) {
235 switch (msg) {
237 if (!mTaskMQTT->isAttached()) {
238 mTaskMQTT->attach(10);
239 }
240 break;
241 default:
242 break;
243 }
244 return;
245 }
246 }
247
248 protected:
253 PubSubClient *client() {
254 return &mPubSubClient;
255 }
256
257 private:
261 inline void _initTasks() {
262 mTaskMQTT = TaskScheduler::make([this](SchedulerTask &self, short t) {
263 if (!mNetworkConnected) {
264 UNIOT_LOG_DEBUG("MQTT: Network is not connected");
265 return;
266 }
267 if (!mPubSubClient.connected()) {
268 UNIOT_LOG_DEBUG("Attempting MQTT connection #%d...", mConnectionId);
269 Bytes packetExtention;
270 if (mInfoExtender) {
271 CBORObject packet;
272 mInfoExtender(packet);
273 packetExtention = packet.build();
274 }
275
276 CBORObject offlineCBOR(packetExtention);
277 _prepareOfflinePacket(offlineCBOR);
278 auto offlinePacket = _buildCOSEMessage(offlineCBOR.build());
279 auto password = _getUserPassword();
280 if (mPubSubClient.connect(
281 _getClientId().c_str(),
282 _getUserLogin().c_str(),
283 (const char *)password.raw(),
284 password.size(),
285 mPath.buildDevicePath("status").c_str(),
286 0,
287 true,
288 (const char *)offlinePacket.raw(),
289 offlinePacket.size(),
290 true)) {
291 CBORObject onlineCBOR(packetExtention);
292 _prepareOnlinePacket(onlineCBOR);
293 auto onlinePacket = _buildCOSEMessage(onlineCBOR.build());
294 mPubSubClient.publish(
295 mPath.buildDevicePath("status").c_str(),
296 onlinePacket.raw(),
297 onlinePacket.size(),
298 true); // publish an announcement
299 mDevices.forEach([this](MQTTDevice *device) {
300 device->topics()->forEach([this](String topic) {
301 mPubSubClient.subscribe(topic.c_str());
302 });
303 });
305 } else {
307 }
308 }
309 mPubSubClient.loop();
310 });
311 }
312
319 Bytes _buildCOSEMessage(const Bytes &payload, bool sign = false) {
320 COSEMessage obj;
321 obj.setPayload(payload);
322 auto kid = mpCredentials->keyId(); // NOTE: dynamic data must be within the scope of the obj.build() function
323 if (sign) {
324 obj.sign(*mpCredentials);
325 obj.setUnprotectedKid(kid);
326 }
327 return obj.build();
328 }
329
337 bool _readCOSEMessage(const Bytes &message, Bytes &outPayload) {
338 COSEMessage obj(message);
339 if (obj.wasReadSuccessful()) {
340 outPayload = obj.getPayload();
341 return true;
342 }
343 return false;
344 }
345
353 void _prepareOnlinePacket(CBORObject &packet) {
354 packet
355 .put("online", 1)
356 .put("connection_id", mConnectionId++);
357 }
358
366 void _prepareOfflinePacket(CBORObject &packet) {
367 packet
368 .put("online", 0)
369 .put("connection_id", mConnectionId);
370 }
371
376 String _getClientId() {
377 return "device:" + mpCredentials->getDeviceId(); // TODO: owner
378 }
379
384 String _getUserLogin() {
385 return mpCredentials->getPublicKey();
386 }
387
396 Bytes _getUserPassword() {
397 CBORObject password;
398 auto protectedData = password.putMap("protected");
399 protectedData.put("device", mpCredentials->getDeviceId().c_str());
400 protectedData.put("owner", mpCredentials->getOwnerId().c_str());
401 protectedData.put("creator", mpCredentials->getCreatorId().c_str());
402 protectedData.put("timestamp", static_cast<int64_t>(Date::now()));
403 auto unprotectedData = password.putMap("unprotected");
404 unprotectedData.put("alg", "EdDSA");
405
406 auto signature = mpCredentials->sign(protectedData.build());
407 password.put("signature", signature.raw(), signature.size());
408
409 return password.build();
410 }
411
412 const Credentials *mpCredentials;
413
414 MQTTPath mPath;
415 CBORExtender mInfoExtender;
416 PubSubClient mPubSubClient;
417
418 bool mNetworkConnected;
419 int mConnectionId;
420
421 WiFiClient mWiFiClient;
422 // WiFiClientSecure mWiFiClient; /**< Secure TCP client (commented out) */
423 ClearQueue<MQTTDevice *> mDevices;
424 TaskScheduler::TaskPtr mTaskMQTT;
425};
426
427} // namespace uniot
MQTT event definitions for the Uniot event system.
Network event definitions for the Uniot event system.
Complete WiFi network management and configuration system.
Definition Bytes.h:38
void forEach(VoidCallback callback) const
Executes a callback function on each element in the queue.
Definition ClearQueue.h:325
Definition CBORObject.h:40
Manages device identity and cryptographic credentials for Uniot devices.
Definition Credentials.h:61
void forceSync()
Forces immediate NTP time synchronization.
Definition Date.h:139
static time_t now()
Returns the current Unix timestamp.
Definition Date.h:74
void emitEvent(unsigned int topic, int msg)
EventListener * stopListeningToEvent(unsigned int topic)
EventListener * listenToEvent(unsigned int topic)
Interface for connecting components to the TaskScheduler.
Definition ISchedulerConnectionKit.h:35
virtual void syncSubscriptions()=0
Reconstructs subscriptions after reconnection or credential changes.
bool isSubscribed(const String &topic)
Checks if the device is subscribed to a given topic.
Definition MQTTDevice.cpp:126
void unsubscribeFromAll()
Unsubscribes from all subscribed topics.
Definition MQTTDevice.cpp:55
virtual void handle(const String &topic, const Bytes &payload)=0
Handles incoming MQTT messages.
virtual void pushTo(TaskScheduler &scheduler) override
Registers MQTT tasks with the provided scheduler.
Definition MQTTKit.h:194
friend class MQTTDevice
Definition MQTTKit.h:78
~MQTTKit()
Destructor - cleans up event listeners.
Definition MQTTKit.h:121
void setServer(const char *domain, uint16_t port)
Sets the MQTT broker server address and port.
Definition MQTTKit.h:132
void renewSubscriptions()
Renews all device subscriptions.
Definition MQTTKit.h:182
const MQTTPath & getPath()
Gets the MQTT path helper object.
Definition MQTTKit.h:172
virtual void attach() override
Attaches this kit (empty implementation)
Definition MQTTKit.h:202
void addDevice(MQTTDevice &device)
Adds a device to be managed by this MQTT kit.
Definition MQTTKit.h:143
MQTTKit(const Credentials &credentials, CBORExtender infoExtender=nullptr)
Constructs an MQTTKit instance.
Definition MQTTKit.h:86
PubSubClient * client()
Gets access to the underlying PubSubClient.
Definition MQTTKit.h:253
virtual void onEventReceived(unsigned int topic, int msg) override
Handles network and time events.
Definition MQTTKit.h:214
void removeDevice(MQTTDevice &device)
Removes a device from this MQTT kit.
Definition MQTTKit.h:159
Definition MQTTPath.h:37
Definition TaskScheduler.h:67
static Date & getInstance()
Definition Singleton.h:73
Definition TaskScheduler.h:164
EventListener< unsigned int, int, Bytes > CoreEventListener
Type alias for the common EventListener configuration used in the core system.
Definition EventListener.h:100
#define UNIOT_LOG_DEBUG(...)
Log a DEBUG level message. Used for general information about system operation. Only compiled if UNIO...
Definition Logger.h:293
#define UNIOT_LOG_ERROR(...)
Log an ERROR level message. Used for critical errors that may prevent normal operation....
Definition Logger.h:226
SharedPointer< SchedulerTask > TaskPtr
Shared pointer type for scheduler tasks.
Definition TaskScheduler.h:171
static TaskPtr make(SchedulerTask::SchedulerTaskCallback callback)
Static factory method to create a task with a callback.
Definition TaskScheduler.h:193
TaskScheduler & push(const char *name, TaskPtr task)
Add a named task to the scheduler.
Definition TaskScheduler.h:214
@ TIME
Time synchronization and date-related operations.
Definition DateEvents.h:61
@ SYNCED
System time has been successfully synchronized with time source.
Definition DateEvents.h:72
@ CONNECTION
MQTT connection state changes and operations.
Definition MQTTEvents.h:61
@ SUCCESS
MQTT connection established successfully.
Definition MQTTEvents.h:73
@ FAILED
MQTT connection failed or was lost.
Definition MQTTEvents.h:72
@ DISCONNECTING
Currently disconnecting from network.
Definition NetworkEvents.h:88
@ SUCCESS
Network operation completed successfully.
Definition NetworkEvents.h:86
@ ACCESS_POINT
Device is operating in access point mode.
Definition NetworkEvents.h:90
@ AVAILABLE
Configured network is available for connection.
Definition NetworkEvents.h:91
@ DISCONNECTED
Network connection has been lost or terminated.
Definition NetworkEvents.h:89
@ FAILED
Network operation failed (connection, scan, etc.)
Definition NetworkEvents.h:85
@ CONNECTING
Currently attempting to connect to network.
Definition NetworkEvents.h:87
@ CONNECTION
WiFi connection state changes and operations.
Definition NetworkEvents.h:74
Contains all classes and functions related to the Uniot Core.