2020年1月23日 星期四

採用Python、Django、Channels、MQTT、Celery等技術打造網站背景執行的服務


問題:
在生活中,我們經常地使用網站查資料,因此大多數的網站設計都比較偏向於前端設計,提供很美的人機介面,然而網站需要背景執行來和物聯網設備連接。


1.建構虛擬環境
mkvirtualenv channels-agsi-mqtt

2. pip install django

3.建立專案
django-admin startproject mysite

4. 安裝 channels_redis
pip install channels_redis

mysite.asgi.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
"""
ASGI config for mysite project.

It exposes the ASGI callable as a module-level variable named ``application``.

For more information on this file, see
https://docs.djangoproject.com/en/3.0/howto/deployment/asgi/
"""

import os
import django
from channels.routing import get_default_application
from channels.layers import get_channel_layer

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
django.setup()

# Application
application = get_default_application()

# Layers
channel_layer = get_channel_layer()

mysite/__init__.py
1
2
3
4
5
6
7
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

mysite/celery.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
import mqtt_app.client

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

print("celery")
app = Celery('mysite', broker='redis://localhost:6379')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.beat_schedule = {
    'msg-every-3-seconds': {
        'task': 'mqtt_app.tasks.handles',
        'schedule': 3,
        'args': ('南投好山好水', )
    },
}

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

mqtt_app/client.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# client.py
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print(msg.topic,msg.payload)
    
def on_connect(client, userdata, flags, rc):
    print("connect")
    client.subscribe("nkut/test")

client = mqtt.Client('admin')

client.on_message = on_message
client.on_connect = on_connect

client.loop_start()
client.connect('broker.hivemq.com', 1883, 60)

#client.loop_forever()

mqtt_app/tasks.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from __future__ import absolute_import, unicode_literals

from celery.schedules import crontab
from celery import shared_task

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
import paho.mqtt.publish as publish

channel_layer = get_channel_layer()

@shared_task(bind=True)
def handles(self, msg):
    print(f"Celery task say: \"{msg}\"")
    # publish a message then disconnect.
    host = "broker.hivemq.com"
    topic = "tw/rocksaying"

    # If broker asks user/password.
    auth = {'username': "", 'password': ""}

    # If broker asks client ID.
    client_id = ""

    publish.single(topic, payload=msg, qos=1, hostname=host)

測試
redis-server

celery -A mysite worker --pool=solo -l info
celery -A mysite beat -l info








參考資料:

Django Channels
Using Celery and MQTT to launch long running tasks with feedback to the user

Asynchronous Tasks With Django and Celery

Celery User Guide

How to use celery in mqtt


沒有留言:

張貼留言