2025年1月1日 星期三

IoT MQTT Panel來察看micro:bit和電腦通訊情形


實作上圖的控制面板

前兩篇文章:micro:bit和電腦的串列通訊程式micro:bit和電腦的串列通訊程式加上MQTT

MQTT Broker:broker.MQTTGO.io

IoT MQTT Panel:AndriodiOS

IoT MQTT Panel的連線設定:

新增一個溫度面板:


三個按鈕面板:







micro:bit和電腦的串列通訊程式加上MQTT

 前一篇文章:micro:bit和電腦的串列通訊程式


電腦執行的Python程式:


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
import serial
import random
import paho.mqtt.client as mqtt
import time

# MQTT 設定
BROKER = "broker.mqttgo.io"
PORT = 1883
TOPIC_PUBLISH = "temperature"
TOPIC_SUBSCRIBE = "messages"
MQTT_CLIENT_ID = "microbit_client"
USERNAME = ""  # 如果需要帳號,填入使用者名稱
PASSWORD = ""  # 如果需要密碼,填入密碼
message = ""

def on_connect(client, userdata, flags, rc):
    print(f"MQTT 連線狀態: {rc}")
    if rc == 0:
        print("成功連線到 MQTT Broker")
        client.subscribe(TOPIC_SUBSCRIBE)
    else:
        print("無法連線到 MQTT Broker")

def on_message(client, userdata, msg):
    global message
    print(f"接收到來自 {msg.topic} 的訊息: {msg.payload.decode('utf-8')}")
    # 處理收到的訊息
    payload = msg.payload.decode('utf-8')
    if payload in ["heart", "OK", "Error"]:
        message = payload
        print(f"處理訂閱訊息: {payload}")
    else:
        print("收到未知訊息")

def main():
    # 配置串列埠參數
    global message
    port = 'COM4'  # 替換為您的串列埠名稱
    baudrate = 115200  # 串列埠速率

    # 初始化 MQTT 客戶端
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, MQTT_CLIENT_ID)
    client.username_pw_set(USERNAME, PASSWORD)
    client.on_connect = on_connect
    client.on_message = on_message

    # 嘗試連線到 MQTT Broker
    try:
        client.connect(BROKER, PORT, 60)
        client.loop_start()

        # 開啟串列埠
        ser = serial.Serial(port, baudrate, timeout=1)
        print(f"已開啟串列埠: {ser.port}, 波特率: {ser.baudrate}")

        buffer = ""  # 初始化資料緩衝區

        while True:
            # 處理串列埠的輸入
            if ser.in_waiting > 0:
                data = ser.read(ser.in_waiting).decode('latin-1', errors='ignore').strip()
                buffer += data
                print(f"目前緩衝區資料: {buffer}")

                while '|' in buffer:
                    line, buffer = buffer.split('|', 1)  # 提取一行資料
                    line = line.strip()
                    print(f"處理資料: {line}")

                    if line.startswith("T="):
                        try:
                            # 發佈溫度值到 MQTT Broker
                            temperature = int(line.split("=")[1])
                            print(f"當前溫度: {temperature} 度")
                            client.publish(TOPIC_PUBLISH, temperature)
                        except ValueError:
                            print("無法解析溫度值")

            # 模擬隨機傳送訊息給 micro:bit
            #message = random.choice(["heart", "OK", "Error"])
            ser.write((message + "\n").encode('utf-8'))
            print(f"已傳送: {message}")

            time.sleep(1)

    except serial.SerialException as e:
        print(f"串列埠錯誤: {e}")

    except KeyboardInterrupt:
        print("程序中止")

    finally:
        # 清理資源
        if 'ser' in locals() and ser.is_open:
            ser.close()
            print("串列埠已關閉")

        client.loop_stop()
        client.disconnect()
        print("已斷開 MQTT 連線")

if __name__ == "__main__":
    main()

執行結果:
目前緩衝區資料: T=2|
處理資料: T=2
當前溫度: 2 度
已傳送: OK
已傳送: OK
目前緩衝區資料: T=21|
處理資料: T=21
當前溫度: 21 度
已傳送: OK
目前緩衝區資料: T=21
已傳送: OK
目前緩衝區資料: T=21T=21|
處理資料: T=21T=21
無法解析溫度值
已傳送: OK
目前緩衝區資料: T=21|
處理資料: T=21
當前溫度: 21 度
已傳送: OK

連上MQTTGO的執行結果如最上圖

micro:bit和電腦的串列通訊程式


功能:把micro:bit的溫度值經由串列埠傳給電腦,電腦端隨機產生字串傳給micro:bit,改變micro:bit的圖案。

micro:bit 積木程式:



micro:bit端的Python程式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
MyString = ""
serial.redirect(SerialPin.USB_TX, SerialPin.USB_RX, BaudRate.BAUD_RATE115200)

def on_forever():
    global MyString
    MyString = convert_to_text(input.temperature())
    serial.write_string("T=" + MyString + "|")
    MyString = convert_to_text(input.light_level())
    if MyString == "heart":
        basic.show_icon(IconNames.HEART)
    elif MyString == "OK":
        basic.show_icon(IconNames.YES)
    elif MyString == "Error":
        basic.show_icon(IconNames.NO)
    basic.pause(500)
basic.forever(on_forever)

電腦端:
先裝serial套件
pip install pyserial


注意,要觀察當micro:bit連線插到電腦USB埠時,會用那一個通訊埠來和電腦溝通。
Python程式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import serial
import random


def main():
    # 配置串列埠參數
    port = 'COM4'  # 替換為您的串列埠名稱(例如:'/dev/ttyUSB0' 或 'COM3')
    baudrate = 115200  # 串列埠速率
    messages = ["heart", "OK", "Error"]

    try:
        # 開啟串列埠
        ser = serial.Serial(port, baudrate, timeout=1)
        print(f"已開啟串列埠: {ser.port}, 波特率: {ser.baudrate}")

        buffer = ""  # 初始化資料緩衝區

        while True:
            if ser.in_waiting > 0:
                # 嘗試用指定編碼解碼資料
                data = ser.read(ser.in_waiting).decode('latin-1', errors='ignore').strip()
                buffer += data  # 將新資料串接到緩衝區
                print(f"目前緩衝區資料: {buffer.strip()}")

                # 嘗試處理緩衝區中的每一行資料
                while '|' in buffer:
                    line, buffer = buffer.split('|', 1)  # 提取第一行,更新緩衝區
                    line = line.strip()  # 去除行首尾空白
                    print(f"處理資料: {line}")

                    # 檢查是否為溫度資料
                    if line.startswith("T="):
                        try:
                            # 提取溫度值
                            temperature = int(line.split("=")[1])
                            print(f"當前溫度: {temperature} 度")
                        except ValueError:
                            print("無法解析溫度值")

                            
                    # 偵測退出指令
                    if line.lower() == "exit":
                        print("收到 'exit',結束程序")
                        return
                message = random.choice(messages)
                ser.write((message + "\n").encode('utf-8'))
                print(f"已傳送: {message}")

    except serial.SerialException as e:
        print(f"串列埠錯誤: {e}")

    finally:
        # 關閉串列埠
        if 'ser' in locals() and ser.is_open:
            ser.close()
            print("串列埠已關閉")

if __name__ == "__main__":
    main()

執行結果:
目前緩衝區資料: T=21|
處理資料: T=21
當前溫度: 21 度
已傳送: Error
目前緩衝區資料: T=21|
處理資料: T=21
當前溫度: 21 度
已傳送: OK
目前緩衝區資料: T=21|
處理資料: T=21
當前溫度: 21 度
已傳送: heart