mkvirtualenv channels-agsi-mqtt
2. pip install django
django-admin startproject mysite
4. 安裝 channels_redis
pip install channels_redis
"""
ASGI entry point for the ``mysite`` project.

The ASGI callable is published as the module-level name ``application``;
the default channel layer is published as ``channel_layer``.

Background on this kind of file:
https://docs.djangoproject.com/en/3.0/howto/deployment/asgi/
"""

import os

import django
from channels.routing import get_default_application
from channels.layers import get_channel_layer

# Select the settings module (unless the environment already names one) and
# initialise Django before the application object is looked up.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
django.setup()

# ASGI application object.
application = get_default_application()

# Default channel layer.
channel_layer = get_channel_layer()
"""Package initialiser that loads the project's Celery application."""
from __future__ import absolute_import, unicode_literals

# Importing the Celery app here guarantees it is loaded whenever Django
# starts, so that ``shared_task`` binds to this app.
from .celery import app as celery_app

__all__ = ("celery_app",)
"""Celery application for ``mysite`` with one periodic task."""
from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# Imported only for its import-time side effects.
# NOTE(review): presumably this is the module shown as ``client.py``, which
# creates and connects an MQTT client when imported -- confirm.
import mqtt_app.client

# Default Django settings module for the ``celery`` command-line program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
print("celery")

app = Celery("mysite", broker="redis://localhost:6379")

# Read Celery options from Django settings; only keys prefixed with
# ``CELERY_`` are considered.
app.config_from_object("django.conf:settings", namespace="CELERY")

# One beat entry: call ``mqtt_app.tasks.handles`` with a fixed message.
# A plain number as ``schedule`` is an interval in seconds.
_every_three_seconds = {
    "task": "mqtt_app.tasks.handles",
    "schedule": 3,
    "args": ("南投好山好水",),
}
app.conf.beat_schedule = {"msg-every-3-seconds": _every_three_seconds}

# Discover ``tasks`` modules in every registered Django app config.
app.autodiscover_tasks()
# client.py
"""MQTT subscriber that prints every message received on ``nkut/test``.

Importing this module creates the client, starts its network loop in a
background thread and connects to the public HiveMQ broker.
"""
import paho.mqtt.client as mqtt


def on_message(client, userdata, msg):
    """Print the topic and raw payload of each received message."""
    print(msg.topic, msg.payload)


def on_connect(client, userdata, flags, rc):
    """Subscribe to ``nkut/test`` once the broker accepts the connection.

    ``rc`` is the CONNACK result code; 0 means the connection was accepted.
    Any other value means the broker refused it, in which case subscribing
    would be pointless, so the failure is reported and nothing else happens.
    Subscribing here (rather than once after ``connect``) also restores the
    subscription after an automatic reconnect.
    """
    if rc != 0:
        print("connect failed, rc =", rc)
        return
    print("connect")
    client.subscribe("nkut/test")


# The client ID is passed by keyword: in paho-mqtt 2.x the first positional
# parameter of ``Client`` is ``callback_api_version``, not the client ID.
client = mqtt.Client(client_id='admin')
client.on_message = on_message
client.on_connect = on_connect

# loop_start() runs the network loop in a background thread, so importing
# this module does not block the importer. loop_forever() would block and is
# only appropriate when this file is run as a standalone script.
client.loop_start()
client.connect('broker.hivemq.com', 1883, 60)
"""Celery tasks that publish messages to an MQTT broker."""
from __future__ import absolute_import, unicode_literals

from celery.schedules import crontab
from celery import shared_task
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
import paho.mqtt.publish as publish

channel_layer = get_channel_layer()


@shared_task(bind=True)
def handles(self, msg):
    """Log ``msg`` and publish it once to the MQTT broker.

    Args:
        self: The bound Celery task instance (``bind=True``); unused here.
        msg: Payload to publish on the ``tw/rocksaying`` topic.
    """
    print(f'Celery task say: "{msg}"')

    host = "broker.hivemq.com"
    topic = "tw/rocksaying"

    # publish.single() connects, publishes one message and disconnects.
    # If the broker requires credentials or a specific client ID, pass them
    # as extra keyword arguments, e.g.
    #   auth={'username': "...", 'password': "..."}, client_id="..."
    # NOTE(review): the subscriber shown in client.py listens on "nkut/test",
    # so it will not receive messages published here -- confirm the topics
    # are meant to differ.
    publish.single(topic, payload=msg, qos=1, hostname=host)
celery -A mysite worker --pool=solo -l info
celery -A mysite beat -l info
