使用mongodb、Kafka保存mqtt消息

news/2024/9/24 18:02:57

一、引言

随着物联网技术的迅猛发展,大量的设备和传感器产生了海量的数据。本文利用了 MQTT、Kafka 和 MongoDB 各自的优点,满足实时数据处理和大规模数据存储的需求。
如图:
0

二、总结

优点:

1. 可靠和解耦:

Kafka的复制机制和持久化存储确保了数据在传输过程中的可靠性,即使某个节点发生故障,也不会导致数据丢失,将数据生产者和消费者解耦,各模块可以独立扩展和优化,减少了相互影响。
2. 高可用和灵活性:

MongoDB的复制集和分片机制提供了数据的高可用性和容错能力,保证了数据存储的可靠性和灵活性。

缺点:

1. 复杂度高:

包含多个组件(MQTT、Kafka、MongoDB)配置、部署和维护、各组件之间的协调和集成也增加了实现的复杂性。
2. 延迟:

数据从设备上传到最终存储在MongoDB之间经过多个处理环节,每个环节都可能增加一些延迟。
3. 一致性:

数据在Kafka和MongoDB之间传递时可能需要额外的处理机制来确保一致性。

三、实现

准备工作

使用docker-compose.yml创建Kafka服务和MongoDB,简易代码如下:
version: '3.8'networks:app-tier:driver: bridgeservices:kafka:image: 'bitnami/kafka:latest'networks:- app-tierports:- "9092:9092"environment:- KAFKA_CFG_NODE_ID=0- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLERvolumes:- kafka-data:/bitnami/kafkamongodb:image: 'mongo:latest'networks:- app-tiercontainer_name: mongodbports:- "27017:27017"volumes:- mongo-data:/data/dbvolumes:kafka-data:driver: localmongo-data:driver: local
View Code

实现步骤

1. 设备数据上传:
服务端代码
var mqttFactory = new MqttFactory();var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpointPort(1883)//监听的端口
                .WithDefaultEndpoint().WithoutEncryptedEndpoint()// 不启用tls.WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(10 * 1000))//10秒超时.WithPersistentSessions(true)//启用session.WithConnectionBacklog(1000)//积累的最大连接请求数
                .Build();using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions)){AddMqttEvents(mqttServer);await mqttServer.StartAsync();Console.WriteLine("Press Enter Ctrl+C to exit.");Console.ReadLine();Console.CancelKeyPress += async (sender, e) =>{e.Cancel = true; // 防止进程直接终止await mqttServer.StopAsync();Environment.Exit(0);};}private static void AddMqttEvents(MqttServer mqttServer){MqttServerEvents mqttEvents = new MqttServerEvents();mqttServer.ClientConnectedAsync += mqttEvents.Server_ClientConnectedAsync;mqttServer.StartedAsync += mqttEvents.Server_StartedAsync;mqttServer.StoppedAsync += mqttEvents.Server_StoppedAsync;mqttServer.ClientSubscribedTopicAsync += mqttEvents.Server_ClientSubscribedTopicAsync;mqttServer.ClientUnsubscribedTopicAsync += mqttEvents.Server_ClientUnsubscribedTopicAsync;mqttServer.ValidatingConnectionAsync += mqttEvents.Server_ValidatingConnectionAsync;mqttServer.ClientDisconnectedAsync += mqttEvents.Server_ClientDisconnectedAsync;mqttServer.InterceptingInboundPacketAsync += mqttEvents.Server_InterceptingInboundPacketAsync;mqttServer.InterceptingOutboundPacketAsync += mqttEvents.Server_InterceptingOutboundPacketAsync;mqttServer.InterceptingPublishAsync += mqttEvents.Server_InterceptingPublishAsync;mqttServer.ApplicationMessageNotConsumedAsync += mqttEvents.Server_ApplicationMessageNotConsumedAsync;mqttServer.ClientAcknowledgedPublishPacketAsync += mqttEvents.Server_ClientAcknowledgedPublishPacketAsync;}
View Code

客户端代码

 var mqttFactory = new MqttFactory();var mqttClient = mqttFactory.CreateMqttClient();var mqttOptions = new MqttClientOptionsBuilder().WithClientId("MqttServiceClient").WithTcpServer("127.0.0.1", 1883).Build();mqttClient.ConnectedAsync+=(e =>{Console.WriteLine("MQTT连接成功");return Task.CompletedTask;});mqttClient.DisconnectedAsync+=(e =>{Console.WriteLine("MQTT连接断开");return Task.CompletedTask;});await mqttClient.ConnectAsync(mqttOptions, CancellationToken.None);//发送消息
MqttApplicationMessage applicationMessage = new MqttApplicationMessage{Topic = "mqtttest",PayloadSegment = new ArraySegment<byte>(System.Text.Encoding.UTF8.GetBytes(input))};var res = await mqttClient.PublishAsync(applicationMessage);
View Code
2. Kafka消息处理:

生产者代码

        var config = new ProducerConfig{BootstrapServers = "localhost:9092"};using var producer = new ProducerBuilder<string, string>(config).Build();try{var message = new Message<string, string>{Key = e.ClientId,Value = JsonConvert.SerializeObject(e.Packet)};var deliveryResult = await producer.ProduceAsync("mqttMsg-topic", message);Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");}catch (ProduceException<string, string> ke){Console.WriteLine($"Delivery failed: {ke.Error.Reason}");}    
View Code

消费者代码

var config = new ConsumerConfig{GroupId = "my-consumer-group",BootstrapServers = "127.0.0.1:9092",AutoOffsetReset = AutoOffsetReset.Earliest};using var consumer = new ConsumerBuilder<string, string>(config).Build();consumer.Subscribe("mqttMsg-topic");
//消费消息并保存到mongodbvar client = new MongoClient("mongodb://127.0.0.1:27017");var collection = client.GetDatabase("mqtttest").GetCollection<BsonDocument>($"history_{DateTime.UtcNow.Year}_{DateTime.UtcNow.Month}");while (true){try{var consumeResult = consumer.Consume(cancellationToken.Token);Console.WriteLine($"收到Kafka消息 '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");var document = new BsonDocument{{ "clientId", consumeResult.Message.Key },{ "JsonData", MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonDocument>(consumeResult.Message.Value) },//不同设备上报数据格式不一定一样{ "created", DateTime.UtcNow }};await collection.InsertOneAsync(document);}catch (ConsumeException e){Console.WriteLine($"处理Kafka消息异常: {e.Error.Reason}");}}
View Code

源码地址:https://github.com/jclown/MqttPersistence

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/46398.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

【C#】WPF 类库项目 无法创建 “资源字典” 文件

解决办法 打开项目工程文件 ( project.csproj) 在 标签添加 下面红色的三句话<Deterministic>true</Deterministic><ProjectTypeGuids>{60dc8134-eba5-43b8-bcc9-bb4bc16c2548};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids> <…

视频通话源码,使用线程池的两大要点分析

视频通话源码,使用线程池的两大要点分析:1、实现动态调整线程池参数2、对线程池运行情况进行监控一,线程池可调整的参数1、核心线程数2、超时时间3、最大线程数4、拒绝策略 而队列BlockingQueue因为是final类型,所以没有对外修改入口。但可以通过重写LinkedBlockingQueue并…

1v1直播源码,保证请求时序的两种常用方法

1v1直播源码,保证请求时序的两种常用方法 在1v1直播源码中经常遇到请求输入查找场景,防抖与截流很好处理了频繁输入问题,但是不能解决最先发起请求结果后返回,覆盖了最后一次的搜索结果,导致搜索结果不正确。我总结一下自己常用的两种方法。一、使用时间戳来过滤返回结果…

WebView2UI - 在WPF之中使用WebView2的一些经验总结

项目地址:https://github.com/skyw18/skyw18-WebView2UI webview简介与生命周期:WPF 应用中的 WebView2 入门 - Microsoft Edge Developer documentation | Microsoft Learn 具体代码可以参考微软官方示例文档 WPF 示例应用 - Microsoft Edge Developer documentation | Micr…

AbpVnext系列三 添加种子项目

一、src下面增加DbMigrations类库,注意是要.Net Framework 类型的类库,不能是.Net Standard 的。 二、添加类库后为项目添加如上三个项目 appsetting.json 配置信息{"ConnectionStrings": {"AidenAdmin": "Server=127.0.0.1;port=3306;Database=…

6. 在WEB中应用MyBatis(使用MVC架构模式)

学习目标:掌握mybatis在web应用中怎么用 mybatis三大对象的作用域和生命周期 ThreadLocal原理及使用 巩固MVC架构模式 为学习MyBatis的接口代理机制做准备实现功能:银行账户转账 使用技术:HTML + Servlet + Mybatis1. 需求描述 ​​ 2. 数据库表的设计和准备数据 创建数据库…

数字园区规划

数字园区规划 | 数字经济产业园规划、数字孪生产业园区规划设计 2024-03-10 19:00书生产业规划内容导读: 【一】数字产业园区发展环境 【二】数字产业园区市场现状 【三】数字产业园区建设需求 【四】数字园区建设赋能手段 【五】数字园区规划建设建议当下,数字经济已上升为我…

Fortran哈希函数库的使用

哈希表hash table,类似于python中的字典,可以实现基于字符串的索引。即根据输入的数据(整数,浮点数,字符串等),对应到唯一的数据。这个特性对于气象编程中的根据站点信息检索数据十分有用。由于Fortran标准库中没有功能的实现,需要自己编写函数。github已经有大神编写好…