2026年1月11日 星期日

[Django 6.0] 整合Django和MQTT

延續上一篇文章:[Django 6.0] 建立第一個視圖(View)應用程式

1.安裝套件
python -m pip install django-tasks paho-mqtt
python -m pip install -U django-tasks

2.修改settings.py
mysite\settings.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
Django settings for mysite project.

Generated by 'django-admin startproject' using Django 6.0.1.

For more information on this file, see
https://docs.djangoproject.com/en/6.0/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/6.0/ref/settings/
"""

from pathlib import Path

# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent


# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/6.0/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-c#x)5h0=87595lwnngn75vm%j-=!bpw84-$!e@rs8^nr4-xms^'

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

ALLOWED_HOSTS = []


# Application definition

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'myapp',
    "django_tasks",
    "django_tasks.backends.database",

]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'mysite.urls'

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

WSGI_APPLICATION = 'mysite.wsgi.application'


# Database
# https://docs.djangoproject.com/en/6.0/ref/settings/#databases

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

TASKS = {
    "default": {
        "BACKEND": "django_tasks.backends.database.DatabaseBackend",
        # 其他可選設定依 django-tasks 版本而定;先用預設即可
    }
}

# Password validation
# https://docs.djangoproject.com/en/6.0/ref/settings/#auth-password-validators

AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]


# Internationalization
# https://docs.djangoproject.com/en/6.0/topics/i18n/

LANGUAGE_CODE = 'en-us'

TIME_ZONE = 'UTC'

USE_I18N = True

USE_TZ = True


# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/6.0/howto/static-files/

STATIC_URL = 'static/'

3.建立task.py
myapp\task.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import json
import paho.mqtt.client as mqtt
from django_tasks import task

@task()
def mqtt_publish(topic, payload):
    client = mqtt.Client()
    client.connect("mqttgo.io", 1883, 60)
    client.publish(topic, json.dumps(payload))
    client.disconnect()

@task()
def handle_incoming_mqtt(topic: str, payload: dict):
    print("MQTT IN:", topic, payload)

4.建立mqtt_subscribe.py
myapp\management\commands\mqtt_subscribe.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import json
import os
from django.core.management.base import BaseCommand
import paho.mqtt.client as mqtt
from myapp.tasks import handle_incoming_mqtt

MQTT_HOST = os.getenv("MQTT_HOST", "mqttgo.io")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
MQTT_TOPIC = os.getenv("MQTT_TOPIC", "demo/#")

class Command(BaseCommand):
    help = "Subscribe mqttgo.io topics and enqueue processing tasks"

    def handle(self, *args, **options):
        client = mqtt.Client()

        def on_connect(c, userdata, flags, rc):
            self.stdout.write(self.style.SUCCESS(f"Connected rc={rc}, subscribe {MQTT_TOPIC}"))
            c.subscribe(MQTT_TOPIC, qos=0)

        def on_message(c, userdata, msg):
            try:
                payload = json.loads(msg.payload.decode("utf-8"))
            except Exception:
                payload = {"raw": msg.payload.decode("utf-8", errors="replace")}

            # 收到就 enqueue,交給 db_worker 做後續(入庫/告警/分析)
            handle_incoming_mqtt.enqueue(msg.topic, payload)

        client.on_connect = on_connect
        client.on_message = on_message

        client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
        client.loop_forever()

5.修改views.py
myapp\views.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from django.http import JsonResponse
from .tasks import mqtt_publish
from django.http import HttpResponse


def index(request):
    return HttpResponse("您好! 歡迎光臨雲林縣口湖鄉水井村。")

def test_mqtt(request):
    mqtt_publish.enqueue(
        "test/hello",
        {"msg": "hello from django task"}
    )
    return JsonResponse({"status": "queued"})

6.修改urls.py

1
2
3
4
5
6
7
from django.urls import path
from . import views

urlpatterns = [
    path("", views.index, name="index"),
    path("test-mqtt/", views.test_mqtt),
]

8.啟動MQTT Go
網址:https://mqttgo.io
連線參數:
    • Host:broker.mqttgo.io

    • Port:8084

    • Path:/mqtt

  • 按下Connect鍵

  • 訂閱Subscribe:

    • Topic:test/hello

  • Publish:

    • Topic:test/hello

    • Payload:

      {"msg": "hello from mqttgo web"}
  • 測試是否有收到

    9.啟動server
    同時啟動2個cmd,其中一個輸入下列命令
    python manage.py migrate
    python manage.py runserver



    另一個cmd輸入:
    python manage.py db_worker



    再開啟瀏覽器,輸入
    http://127.0.0.1:8000/myapp/test-mqtt/




    10.驗證 MQTT 是否真的送出

    回到 mqttgo.io Web Client(第一步那個):

    Topic:test/hello

    你應該會看到:

    {"msg": "hello from django task"}

    沒有留言:

    張貼留言