MQTT消息分发服务之开源项目mqttwarn

mqttwarn是一个基于Python的MQTT消息分发服务,其简介是:”This program subscribes to any number of MQTT topics (which may include wildcards) and publishes received payloads to one or more notification services”,翻译过来就是:这个程序可以监听任意个MQTT topic(包括通配符),然后将消息实体发布到一个或多个通知服务。来看一下这个项目的架构图:

https://raw.githubusercontent.com/jpmens/mqttwarn/master/assets/mqttwarn.png

可以看到,mqttwarn订阅了消息,当从MQTT broker取到消息后,分发到各个服务中进行下一步处理,这些服务以可插拔的形式提供,具体可参看项目的配置文件。在我们的项目中,右边的服务插件是MySQL,即我们会将订阅到的消息解析后存放到MySQL数据库中,代码目录结构如下图:

├── common.py
├── constant.py
├── database
│     ├── handlers.py
│     ├── __init__.py
│     └── models.py
├── mqttwarn.py
├── etc
│     ├── mqttsync.default
│     ├── mqttsync.ini
│     ├── mqttsync.logrotate
│     └── mqttsync.service
├── functions.py
├── log.py
├── services
│     ├── my.py

│     ├── plugin.py
└── util.py

根目录下的mqttwarn.py作为整个项目的主文件,也就是通过执行“python mqttwarn.py”命令来启动程序,流程如下:

  1. 程序启动后,执行connect()方法,该方法首先从etc/mqttsync.ini加载各个插件的配置并加载services目录下的各个插件模块,如services/my.py,接着初始化数据库,启动以processor方法为入口的多个线程,多线程个数从配置文件读取;
  2. 设置好on_message,on_connect,on_disconnect回调函数,连接到MQTT Broker;
  3. 如果配置文件中有[cron]节,还会启动PeriodicThread类来达到定时调用某个方法的功能,类似于定时器;
  4. 当连接上MQTT Broker后,on_connect响应函数会被调用,这时候从配置文件拿到需要监听的TOPIC,调用subscribe方法逐个监听;
  5. 当接收到消息时,进入on_message方法,拿到消息的TOPIC后,先查找配置文件中是否有匹配的TOPIC,有的话则调用 send_to_targets(section, topic, payload)进行消息分发,send_to_targets进行一系列处理后,生成一个Job对象并添加到全局字典M_QUEUE中;
  6. 以processor为入口的多线程不断从M_QUEUE中取出Job,通过以下两行代码:

module = service_plugins[service][‘module’]

notified = module.plugin(srv, st)

调用到了具体某个service的plugin方法,如services/my.py里面的代码,如下:

MY_CLIENT_CALLBACKS = {
    MY_ENTRY_TYPE: _entry_cb,
    MY_EXIT_TYPE: _exit_cb,
    MY_CARD_TYPE: _card_cb
}

def plugin(srv, item):
    srv.logging.debug("*** MODULE=%s: service=%s, target=%s", __file__, item.service, item.target)

    service = MYService(srv, item)
    if service.is_from_client():
        service.initialize(True, MY_CLIENT_CALLBACKS)
        notified = service.handle_client()
    else:
        notified = True

    if not notified:
        srv.logging.warn("SERVICE=%s, RETURN FALSE.", item.service)

    return notified

可以看到,首先生成一个MYService对象,然后传入MY_CLIENT_CALLBACKS进行初始化。再往上看代码,可以看到这样的定义:

MY_ENTRY_TYPE  =  ‘MYCARCOME’
MY_EXIT_TYPE      =  ‘MYCARGOOUT’
MY_CARD_TYPE    =  ‘MYFAXING’

具体到我们的项目中,可以知道,MYCARCOME是数据同步系统把数据传递过来时,指定了数据是属于哪个MySQL数据表,即MYCARCOME是表名。于是处理逻辑就是,当收到表MYCARCOME的数据时,调用_entry_cb函数进行处理。具体是怎么调用到的,可以再看上面的代码,在服务初始化后,马上调用了handle_client方法,这个方法在services/plugin.py里面的CommonService类中定义,MYService继承自CommonService类。来看下这个方法的实现:

   def handle_client(self):
         notified = True

         json_list = self._client_loads()
         for data in json_list:
             #self.item.data = data
             table_list = data.keys()
             for table_name in table_list:
                 if table_name in self.client_handlers.keys():
                    notified = self.client_handlers[table_name](self.srv, self.item, data)
         return notified

上面代码中,client_handlers成员变量是在MYService调用initialize方法时,由传入的MY_CLIENT_CALLBACKS初始化的。到这里,我们可以大致知道了,如果同步过来的数据对应的表名为MYCARCOME时,_entry_cb函数就这样被调用了,那再来看下这个函数的关键代码,如下:

    parking_info = ParkingInfo(park_id, parking_id, car_no, card_type)
    parking_info.set_entry(entry_time, entry_code)
    notified = handle_entry(srv, item, parking_info)

ParkingInfo在common.py中定义,这里先从MQTT消息的Payload取出一些信息,用于生成一个ParkingInfo对象然后传递给handle_entry函数,后者又做了些什么呢?

handle_entry函数在database/handlers.py中定义,真正进行数据库操作的功能都在handlers.py中实现。handlers.py中import了诸如CurrentParking类等,这些类大都在database/modles.py中定义,并且继承自peewee.Model类,其中peewee是一个Python ORM库。那么调用CurrentParking类的方法,就可以真正的操作数据库了。

发表评论

电子邮件地址不会被公开。