按照說明,修改兩個地方,一個是broker位址,另一個是專案名稱。
chasgimqtt -H broker.hivemq.com -p 1883 --topic=some_topic:2 mysite.asgi:channel_layer
執行結果:
由於我們設定的虛擬環境是channels-asgi-mqtt,因此要修訂\Envs\channels-asgi-mqtt\Lib\site-packages\chasgimqtt中的程式碼(第187-191行,也就是註冊signal handler的部分),修訂後的完整程式如下:
"""Bridge between an MQTT broker and a Django Channels channel layer.

Messages received from the broker are forwarded to a Channels channel, and
messages received on that channel with the configured "publish" type are
published back to the broker.
"""
import os
import asyncio
import functools
import logging
import time
import signal
import json

import paho.mqtt.client as mqtt

logger = logging.getLogger(__name__)


async def mqtt_send(future, channel_layer, channel, event):
    """Send ``event`` to ``channel`` and store the result on ``future``."""
    result = await channel_layer.send(channel, event)
    future.set_result(result)


async def mqtt_group_send(future, channel_layer, group, event):
    """Send ``event`` to every channel in ``group``; result goes to ``future``."""
    result = await channel_layer.group_send(group, event)
    future.set_result(result)


# Groups only
async def mqtt_group_add(future, channel_layer, group):
    """Add this layer's channel to ``group``, creating the channel if needed."""
    # getattr: a layer that never had ``channel_name`` assigned must not fail
    # with AttributeError before a channel can be created for it.
    channel_layer.channel_name = (
        getattr(channel_layer, "channel_name", None)
        or await channel_layer.new_channel()
    )
    result = await channel_layer.group_add(group, channel_layer.channel_name)
    future.set_result(result)


# Groups only
async def mqtt_group_discard(future, channel_layer, group):
    """Remove this layer's channel from ``group``; result goes to ``future``."""
    result = await channel_layer.group_discard(group, channel_layer.channel_name)
    future.set_result(result)


class Server(object):
    """Runs an MQTT client alongside a Channels layer on one asyncio loop.

    Args:
        channel: channel layer object used for ``send`` / ``receive``.
        host: MQTT broker host name.
        port: MQTT broker port.
        username: optional broker user name; credentials are only set when
            this is truthy.
        password: optional broker password.
        client_id: optional MQTT client id.
        topics_subscription: list of ``(topic, qos)`` tuples; defaults to
            ``[("#", 2)]``.
        mqtt_channel_name: channel used in both directions (default ``"mqtt"``).
        mqtt_channel_sub: message ``type`` used for broker -> Channels
            messages (default ``"mqtt.sub"``).
        mqtt_channel_pub: message ``type`` accepted for Channels -> broker
            messages (default ``"mqtt.pub"``).
    """

    # Number of reconnection attempts made after an unexpected disconnect.
    RECONNECT_ATTEMPTS = 3

    def __init__(self, channel, host, port, username=None, password=None,
                 client_id=None, topics_subscription=None, mqtt_channel_name=None,
                 mqtt_channel_sub=None, mqtt_channel_pub=None):
        self.channel = channel
        self.host = host
        self.port = port
        self.client_id = client_id
        # NOTE(review): paho-mqtt >= 2.0 is believed to require a
        # ``callback_api_version`` argument here - confirm against the
        # installed paho-mqtt version.
        self.client = mqtt.Client(client_id=self.client_id, userdata={
            "server": self,
            "channel": self.channel,
            "host": self.host,
            "port": self.port,
        })
        self.username = username
        self.password = password
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message

        self.topics_subscription = topics_subscription or [("#", 2)]
        assert isinstance(self.topics_subscription, list), "Topic subscription must be a list with (topic, qos)"

        self.mqtt_channel_name = mqtt_channel_name or "mqtt"
        self.mqtt_channel_pub = mqtt_channel_pub or "mqtt.pub"
        self.mqtt_channel_sub = mqtt_channel_sub or "mqtt.sub"

        # Defined up front so _on_disconnect can never hit an AttributeError
        # if it fires before run() has been called.
        self.stop = False
        self.loop = None

    def _on_connect(self, client, userdata, flags, rc):
        """Subscribe to the configured topics once the broker accepts us."""
        logger.info("Connected with status {}".format(rc))
        client.subscribe(self.topics_subscription)

    def _on_disconnect(self, client, userdata, rc):
        """Try to reconnect unless the server is shutting down.

        Raises:
            Exception: whatever ``client.reconnect()`` raised on the final
                attempt, so a permanently lost broker is not silently ignored.
        """
        logger.info("Disconnected")
        if self.stop:
            return

        for attempt in range(1, self.RECONNECT_ATTEMPTS + 1):
            logger.info("Trying to reconnect")
            try:
                client.reconnect()
            except Exception as e:
                # The original compared ``i < j`` inside ``range(j)``, which is
                # always true, so the last failure was swallowed. Re-raise it.
                if attempt == self.RECONNECT_ATTEMPTS:
                    raise
                logger.warning(e)
                time.sleep(1)
            else:
                logger.info("Reconnected")
                break

    def _mqtt_send_got_result(self, future):
        """Done-callback: log the result of forwarding a message."""
        logger.debug("Sending message to MQTT channel, with result\r\n%s", future.result())

    def _on_message(self, client, userdata, message):
        """Forward a message received from the broker to the Channels layer."""
        logger.debug("Received message from topic {}".format(message.topic))
        payload = message.payload.decode("utf-8")

        try:
            payload = json.loads(payload)
        except ValueError:
            # Not JSON (json.JSONDecodeError is a ValueError): forward the
            # decoded text unchanged.
            logger.debug("Payload is nos a JSON Serializable\r\n%s", payload)

        msg = {
            "topic": message.topic,
            "payload": payload,
            "qos": message.qos,
            "host": userdata["host"],
            "port": userdata["port"],
        }

        try:
            future = asyncio.Future()
            asyncio.ensure_future(
                mqtt_send(
                    future,
                    self.channel,
                    self.mqtt_channel_name,
                    {
                        "type": self.mqtt_channel_sub,
                        "text": msg
                    })
            )
            future.add_done_callback(self._mqtt_send_got_result)
        except Exception as e:
            logger.error("Cannot send message {}".format(msg))
            logger.exception(e)

    async def client_pool_start(self):
        """Connect to the broker and pump the MQTT network loop forever."""
        # Loop that receives MQTT messages
        if self.username:
            self.client.username_pw_set(username=self.username, password=self.password)

        self.client.connect(self.host, self.port)

        logger.info("Starting loop")

        while True:
            self.client.loop(0.1)
            await asyncio.sleep(0.1)

    def _mqtt_receive(self, msg):
        """Publish a message taken from the Channels layer to the broker.

        Only messages whose ``type`` equals ``self.mqtt_channel_pub`` are
        published; anything else is ignored. ``msg['text']`` may be a dict or
        a JSON string with ``topic``, ``payload`` and an optional ``qos``
        (default 2).
        """
        # Only messages of the type associated with mqtt_channel_pub matter
        if msg['type'] == self.mqtt_channel_pub:
            payload = msg['text']

            if not isinstance(payload, dict):
                payload = json.loads(payload)

            logger.info("Recibe un menssage con payload: %s", msg)
            self.client.publish(
                payload['topic'],
                payload['payload'],
                qos=payload.get('qos', 2),
                retain=False)

    async def client_pool_message(self):
        """Receive messages from the Channels layer and hand them to MQTT."""
        logger.info("Loop de recepción de messages")

        while True:
            logger.info("Espera recibir un message desde el channel %s", self.mqtt_channel_name)
            result = await self.channel.receive(self.mqtt_channel_name)
            self._mqtt_receive(result)
            await asyncio.sleep(0.1)

    def stop_server(self, signum):
        """Signal handler: cancel every task on the loop and stop the loop."""
        logger.info("Received signal {}, terminating".format(signum))
        self.stop = True
        # asyncio.Task.all_tasks() was removed in Python 3.9; fall back to it
        # only on interpreters that predate asyncio.all_tasks().
        all_tasks = getattr(asyncio, "all_tasks", None) or asyncio.Task.all_tasks
        for task in all_tasks(self.loop):
            task.cancel()

        self.loop.stop()

    def run(self):
        """Start both worker coroutines and block until the loop stops."""
        self.stop = False
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions no longer create a loop implicitly.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        self.loop = loop

        # loop.add_signal_handler() is not implemented on Windows event loops;
        # there Ctrl+C still ends run_forever() through KeyboardInterrupt.
        for signame in ('SIGINT', 'SIGTERM'):
            try:
                loop.add_signal_handler(
                    getattr(signal, signame),
                    functools.partial(self.stop_server, signame)
                )
            except NotImplementedError:
                logger.info("Signal handlers are not supported by this event loop")
                break

        print("Event loop running forever, press Ctrl+C to interrupt.")
        print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())

        # Scheduling the tasks on the loop is enough; the original additionally
        # called asyncio.wait(tasks) without awaiting it, which did nothing.
        self._tasks = asyncio.gather(
            loop.create_task(self.client_pool_start()),
            loop.create_task(self.client_pool_message()),
        )

        try:
            loop.run_forever()
        finally:
            # From here on a disconnect is intentional: do not try to reconnect.
            self.stop = True
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.close()

        self.client.disconnect()
修改完成後再執行一次,就可以正常工作:
chasgimqtt -H broker.hivemq.com -p 1883 --topic=some_topic:2 mysite.asgi:channel_layer
執行結果:
另外要再執行下列命令。
manage.py runworker mqtt
並利用MQTTBox來進行測試,測試結果如上圖。
接下來,在mqtt_app/tasks.py程式中增加一小段程式,理由是我們發現原程式無法送出MQTT訊息。
from __future__ import absolute_import, unicode_literals

from celery.schedules import crontab
from celery import shared_task
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
import paho.mqtt.publish as publish

channel_layer = get_channel_layer()

# Broker and topic used by the test task.
MQTT_HOST = "broker.hivemq.com"
MQTT_TOPIC = "some_topic"


@shared_task(bind=True)
def mqtt_test(self, msg):
    """Send ``msg`` to the MQTT broker through two independent routes.

    1. Directly with ``paho.mqtt.publish.single`` (payload is ``msg`` + "123"
       so the two routes can be told apart on the subscriber side).
    2. Through the Channels layer, on channel ``'mqtt'``.

    Args:
        msg: text to publish.
    """
    print(f"Celery task say: \"{msg}\"")

    # Publish a message then disconnect. If the broker requires credentials or
    # a client id, publish.single() also accepts
    # auth={'username': ..., 'password': ...} and client_id=... arguments.
    publish.single(MQTT_TOPIC, payload=msg + "123", qos=1, hostname=MQTT_HOST)

    # The bridge only forwards a message when its type equals the server's
    # mqtt_channel_pub value, which defaults to "mqtt.pub"; the previous value
    # 'mqtt_pub' never matched that default, so nothing was published.
    # NOTE(review): if the bridge is started with a custom publish type, use
    # that value here instead.
    async_to_sync(channel_layer.send)('mqtt', {
        'type': 'mqtt.pub',
        'text': {
            'topic': MQTT_TOPIC,
            'payload': f"{msg} - {self.request.id}"
        }
    })
測試時,我們另外開啟3個cmd視窗,分別執行下列指令:
redis-server
celery -A mysite worker --pool=solo -l info
celery -A mysite beat -l info
我們故意在程式中替訊息加上"123",可以發現只有加上"123"的"hello"訊息可以送到MQTTBox。
沒有留言:
張貼留言