1.安裝套件
python -m pip install django-tasks paho-mqtt
python -m pip install -U django-tasks
2.修改settings.py
mysite\settings.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 | """
Django settings for mysite project.
Generated by 'django-admin startproject' using Django 6.0.1.
For more information on this file, see
https://docs.djangoproject.com/en/6.0/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/6.0/ref/settings/
"""
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
# BASE_DIR is the directory two levels above this file's resolved path.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/6.0/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
# NOTE(review): this key is hard-coded in the file; replace it (e.g. load it
# from the environment) before any non-development use.
SECRET_KEY = 'django-insecure-c#x)5h0=87595lwnngn75vm%j-=!bpw84-$!e@rs8^nr4-xms^'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
# No hosts are listed explicitly.
# NOTE(review): presumably this must be filled in once DEBUG is turned off --
# confirm against the deployment checklist linked above.
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
    # Django contrib apps.
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    # Project app.
    "myapp",
    # django-tasks and its database backend app.
    "django_tasks",
    "django_tasks.backends.database",
]
# Middleware classes, listed in the order they are configured.
MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    "django.contrib.sessions.middleware.SessionMiddleware",
    "django.middleware.common.CommonMiddleware",
    "django.middleware.csrf.CsrfViewMiddleware",
    "django.contrib.auth.middleware.AuthenticationMiddleware",
    "django.contrib.messages.middleware.MessageMiddleware",
    "django.middleware.clickjacking.XFrameOptionsMiddleware",
]
# Dotted path of the project's root URL configuration module.
ROOT_URLCONF = "mysite.urls"

# A single Django-template engine that looks inside each app's template
# directory (APP_DIRS) and has no extra project-level directories (DIRS).
TEMPLATES = [
    {
        "BACKEND": "django.template.backends.django.DjangoTemplates",
        "DIRS": [],
        "APP_DIRS": True,
        "OPTIONS": {
            "context_processors": [
                "django.template.context_processors.request",
                "django.contrib.auth.context_processors.auth",
                "django.contrib.messages.context_processors.messages",
            ],
        },
    },
]

# Dotted path of the WSGI application object.
WSGI_APPLICATION = "mysite.wsgi.application"
# Database
# https://docs.djangoproject.com/en/6.0/ref/settings/#databases
# One SQLite database stored as db.sqlite3 directly under BASE_DIR.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.sqlite3",
        "NAME": BASE_DIR / "db.sqlite3",
    },
}
# django-tasks configuration: the default queue uses the database backend.
TASKS = {
    "default": {
        "BACKEND": "django_tasks.backends.database.DatabaseBackend",
        # Other optional settings depend on the django-tasks version;
        # the defaults are fine to start with.
    },
}
# Password validation
# https://docs.djangoproject.com/en/6.0/ref/settings/#auth-password-validators
# Each entry names one validator class by its dotted path.
AUTH_PASSWORD_VALIDATORS = [
    {"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator"},
    {"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator"},
    {"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator"},
    {"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator"},
]
# Internationalization
# https://docs.djangoproject.com/en/6.0/topics/i18n/
LANGUAGE_CODE = "en-us"
TIME_ZONE = "UTC"
USE_I18N = True
USE_TZ = True

# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/6.0/howto/static-files/
STATIC_URL = "static/"
|
3.建立tasks.py
myapp\tasks.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | import json
import paho.mqtt.client as mqtt
from django_tasks import task
@task()
def mqtt_publish(topic, payload):
    """Publish ``payload`` to ``topic`` on the mqttgo.io broker as JSON.

    Args:
        topic: MQTT topic to publish to.
        payload: JSON-serialisable object; it is encoded with ``json.dumps``.

    Raises:
        ValueError, RuntimeError: raised by ``wait_for_publish`` when the
            message could not be queued or sent, so the task is recorded as
            failed instead of silently dropping the message.
    """
    # paho-mqtt 2.x expects an explicit callback API version; omitting it is
    # an error on 2.0 and deprecated on 2.1.
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.connect("mqttgo.io", 1883, 60)
    # Run the network loop in the background so the handshake and the
    # outgoing packet are actually processed before we disconnect.
    client.loop_start()
    try:
        info = client.publish(topic, json.dumps(payload))
        info.wait_for_publish(timeout=10)
    finally:
        client.disconnect()
        client.loop_stop()
@task()
def handle_incoming_mqtt(topic: str, payload: dict) -> None:
    """Print a received MQTT message (topic and decoded payload) to stdout."""
    fields = ("MQTT IN:", topic, payload)
    print(*fields)
|
4.建立mqtt_subscribe.py
myapp\management\commands\mqtt_subscribe.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | import json
import os
from django.core.management.base import BaseCommand
import paho.mqtt.client as mqtt
from myapp.tasks import handle_incoming_mqtt
# Broker connection settings. Each one is read from the environment variable
# of the same name; the second argument is the fallback used when it is unset.
MQTT_HOST = os.getenv("MQTT_HOST", "mqttgo.io")
# os.getenv returns a string, so the port is converted to an int here.
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
# Topic filter handed to subscribe() once the client has connected.
MQTT_TOPIC = os.getenv("MQTT_TOPIC", "demo/#")
class Command(BaseCommand):
    """Management command that subscribes to MQTT and enqueues a task per message."""

    help = "Subscribe mqttgo.io topics and enqueue processing tasks"

    def handle(self, *args, **options):
        """Connect to the broker, subscribe to MQTT_TOPIC and block forever.

        Every received message is decoded and handed to the
        ``handle_incoming_mqtt`` task; the command itself does no processing.
        Press Ctrl+C to disconnect and exit.
        """
        # paho-mqtt 2.x expects an explicit callback API version; omitting it
        # is an error on 2.0 and deprecated on 2.1.
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

        def on_connect(c, userdata, flags, reason_code, properties):
            # Version-2 callbacks receive a ReasonCode instead of a bare int.
            if reason_code.is_failure:
                self.stderr.write(self.style.ERROR(f"Connect failed rc={reason_code}"))
                return
            self.stdout.write(
                self.style.SUCCESS(f"Connected rc={reason_code}, subscribe {MQTT_TOPIC}")
            )
            # Subscribing here means the subscription is renewed on reconnect.
            c.subscribe(MQTT_TOPIC, qos=0)

        def on_message(c, userdata, msg):
            try:
                payload = json.loads(msg.payload.decode("utf-8"))
            except ValueError:
                # Covers both invalid UTF-8 (UnicodeDecodeError) and invalid
                # JSON (json.JSONDecodeError); keep the raw text instead.
                payload = {"raw": msg.payload.decode("utf-8", errors="replace")}
            # Enqueue immediately and let db_worker do the follow-up work
            # (storage / alerting / analysis).
            handle_incoming_mqtt.enqueue(msg.topic, payload)

        client.on_connect = on_connect
        client.on_message = on_message
        client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
        try:
            client.loop_forever()
        except KeyboardInterrupt:
            # Exit cleanly on Ctrl+C instead of printing a traceback.
            client.disconnect()
|
5.修改views.py
myapp\views.py
from django.http import HttpResponse, JsonResponse

from .tasks import mqtt_publish


def index(request):
    """Return the plain-text greeting shown on the site's index page."""
    return HttpResponse("您好! 歡迎光臨雲林縣口湖鄉水井村。")


def test_mqtt(request):
    """Enqueue a test MQTT publish task and report that it was queued.

    The message is not sent by this view; it is only placed on the task
    queue and returned to the caller as ``{"status": "queued"}``.
    """
    mqtt_publish.enqueue(
        "test/hello",
        {"msg": "hello from django task"},
    )
    return JsonResponse({"status": "queued"})
6.修改urls.py
myapp\urls.py
1 2 3 4 5 6 7 | from django.urls import path
from . import views
# URL routes of this app; both routes are named so they can be reversed.
urlpatterns = [
    path("", views.index, name="index"),
    path("test-mqtt/", views.test_mqtt, name="test_mqtt"),
]
|
8.啟動MQTT Go
網址:https://mqttgo.io
連線參數:
- Host: broker.mqttgo.io
- Port: 8084
- Path: /mqtt
按下Connect鍵
訂閱Subscribe:
- Topic: test/hello
Publish:
- Topic: test/hello
- Payload:
9.啟動server
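同時啟動2個cmd,其中一個輸入下列命令(套用資料庫遷移並啟動開發伺服器)
python manage.py migrate
python manage.py runserver
另一個輸入下列命令(啟動 django-tasks 的 db_worker,負責執行已排入佇列的任務;沒有它,mqtt_publish 不會真的被執行)
python manage.py db_worker
若要接收 MQTT 訊息,可再開一個cmd執行第4步建立的訂閱命令
python manage.py mqtt_subscribe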
10.驗證 MQTT 是否真的送出
回到 mqttgo.io Web Client(第一步那個):
Topic:test/hello
你應該會看到:
{"msg": "hello from django task"}
沒有留言:
張貼留言