กลไกการสื่อสาร MQTT เป็นการสื่อสารโดยใช้วิธีการแบบใด

MQTT ย่อมาจาก Message Queuing Telemetry Transport เป็นโปรโตคอลสำหรับใช้ในสื่อสารข้อมูลระหว่าง Machine to Machine (M2M) ถูกคิดค้นขึ้นในปี ค.ศ. 1999 โดย Dr Andy Stanford-Clark จาก IBM และ Arlen Nipper จาก Arcom (now Eurotech) ออกแบบมาเพื่อใช้สื่อสารในระบบเครือข่ายที่มีทรัพยากรค่อนข้างจำกัด ใช้งานแบนด์วิธต่ำ สามารถ publish-subscribe ข้อมูลระหว่าง Device เพื่อสื่อสารกันระหว่างอุปกรณ์ และถ้ามองในด้านที่เกี่ยวกับ Internet of Things จะสามารถประยุกต์ให้อุปกรณ์ต่างๆเชื่อมต่อกันผ่านเครือข่ายของอินเทอร์เน็ตได้ ทำให้เราสามารถสร้างสรรค์โครงงานที่เกี่ยวกับการติดตามอุปกรณ์ เช่น มอนิเตอร์อุปกรณ์ผ่านอินเทอร์เน็ต ควบคุมอุปกรณ์ผ่านอินเทอร์เน็ต เป็นต้น

กลไกการสื่อสาร MQTT เป็นการสื่อสารโดยใช้วิธีการแบบใด

MQTT ประกอบด้วย 2 ส่วน คือ

– MQTT Client เป็นส่วน publish ข้อมูลต่างๆ ขึ้นไปยัง MQTT Broker และสามารถ Subscribe ข้อมูลต่างๆจาก MQTT Broker ผ่านทาง TCP/IP Protocol ถ้ามองในมุมมองของ Internet of Things (IoT) อุปกรณ์จำพวกนี้จะเป็น Device ที่สามารถเชื่อมต่อกับระบบเครือข่ายได้ เช่น บอร์ด Arduino Uno Wifi 2, Arduino MKR Wifi 1010, บอร์ด ESP32, บอร์ด ESP8266, บอร์ด Raspberry Pi, เว็ปไซต์, สมาร์ทโฟน

– MQTT Broker หรือ MQTT Server เป็นจุดศูนย์กลางในการรับส่งข้อความระหว่างไคลเอนต์ วิธีการกำหนดเส้นทาง (Routing) กระทำผ่าน Topic โดยไคลเอนต์ Subscribe ใน Topic ที่ตนต้องการ จากนั้นโบรกเกอร์จะส่งข้อความทั้งหมดที่ถูก Publish ใน Topic นั้นๆ ไปให้ ดังนั้นไคลเอนต์จึงสื่อสารกันได้โดยไม่จำเป็นต้องรู้จักกัน ช่วยลดความเกี่ยวพันระหว่างผู้สร้างข้อมูลและผู้ใช้ข้อมูล ส่งผลให้การขยายตัวของเครือข่ายทำได้ง่าย นอกจากนี้หน้าที่ที่สำคัญอีกประการของโบรกเกอร์คือการรักษาความปลอดภัยของไคลเอนต์ (Authorization, Authentication) ซึ่งในส่วนนี้สามารถขยายเพิ่มเติม หรือนำไปเชื่อมกับกลไกความปลอดภัยของระบบหลังบ้านที่มีอยู่แล้วได้ ช่วยให้นำโบรกเกอร์เข้าไปใช้งานเป็นส่วนหนึ่งของระบบอื่นๆ ได้ ส่วน Authorization ของ NETPIE ซึ่งจะได้กล่าวถึงต่อไปในหัวข้อที่ 1.4 ก็ถือเป็นตัวอย่างหนึ่งของการขยายเพิ่มเติมการรักษาความปลอดภัยของโบรกเกอร์ใน MQTT ปัจจุบันมีโบรกเกอร์ MQTT ที่เปิดให้ดาวน์โหลดไปใช้หรือดัดแปลงอยู่หลายรายได้แก่ Mosquitto, RabbitMQ, Erlang, VerneMQ เป็นต้น หรือใช้ Single Board Computer เช่นบอร์ด Raspberry Pi, LattePanda, Beagle Bone, nanoPi, อื่นๆ

เมื่อเปรียบเทียบ MQTT กับ HTTP (REST) ที่มีสถาปัตยกรรมแบบ Request/Response จะพบว่า MQTT มีความได้เปรียบที่โบรกเกอร์สามารถผลัก (Push) ข้อความไปยังไคลเอนต์ได้ตามเหตุการณ์ (Event-driven) ในขณะที่เมื่อใช้ HTTP ฝั่งไคลเอนต์ต้องคอยโพลข้อมูลเป็นระยะๆ และต้องตั้งค่าคาบเวลาการโพลไว้ก่อนล่วงหน้า โดยแต่ละครั้งต้องมีการสร้างการเชื่อมต่อขึ้นใหม่และอาจจะไม่มีข้อมูลใหม่ใดๆ ให้อัพเดท ดังนั้นหากต้องการให้ระบบทำงานแบบ Real Time หรือใกล้เคียง ย่อมหมายถึงต้องตั้งคาบเวลาการโพลให้สั้น และความสิ้นเปลืองของการใช้ช่องสัญญาณที่ไม่จำเป็นที่ตามมา นี่จึงเป็นอีกเหตุผลสำคัญที่ทำให้ MQTT ได้รับความนิยมเหนือ REST สำหรับการใช้งานแบบ M2M นอกเหนือจากการมีน้ำหนักเบา

MQTT Topics

MQTT Topic เป็น UTF-8 String ในลักษณะเดียวกับ File Path คือสามารถจัดเป็นลำดับชั้นได้ด้วยการขั้นด้วย “/” ตัวอย่างเช่น myhome/floor-one/room-c/temperature ไคลเอนต์สามารถเลือก Publish หรือ Subscribe เฉพาะ Topic หรือ Subscribe หลาย Topic พร้อมๆ กันโดยใช้ Single-Level Wildcard (+) เช่น myhome/floor-one/+/temperature หมายถึงการขอเขียนหรือรับข้อความ temperature จากทุกๆ ห้องของ myhome/floor-one หรือ Multi-Level Wildcard (#) เช่น myhome/floor-one/# หมายถึงการขอเขียนหรือรับข้อความทั้งหมดที่มี Topic ขึ้นต้นด้วย myhome/floor-one เป็นต้น

เราสามารถกำหนด Topic อย่างไรก็ได้ โดยมีข้อยกเว้นการขึ้นต้น Topic ด้วยเครื่องหมาย “$” ซึ่งจะจำกัดไว้สำหรับการเก็บสถิติภายในของตัวโบรกเกอร์เท่านั้น ดังนั้นไคลเอนต์จะไม่สามารถ Publish หรือ Subscribe ไปยัง Topic เหล่านี้ได้ โดยทั่วไป Topic เหล่านี้จะขึ้นต้นด้วย $SYS

MQTT Quality of Service (QoS)

ไคลเอนต์จะเป็นผู้กำหนดระดับของบริการส่งและรับข้อความหรือ QoS ที่ตนต้องการในแต่ละ Topic ในแพ็กเกต PUBLISH หรือ SUBSCRIBE และโบรกเกอร์จะตอบสนองด้วย QoS ระดับเดียวกันสำหรับ Topic นั้นๆ

เวลาเราเดินไปซื้อของเล่น หรือเครื่องใช้ไฟฟ้าในปัจุบัน เราเริ่มเห็นกันบ้างแล้วละว่า มันเริ่มมีคำว่า Smart นั่น Smart นี่เต็มไปหมด (พูดเรื่องนี้แล้วคันปาก มัน Smart ยังไงฟร๊ะ !!) ซึ่งจริง ๆ ก็คือ มันทำให้อุปกรณ์มันต่อ Internet ได้ หรือที่เราเรียกว่า IoT (Internet of Things) นั่นเอง

เวลาเราไปหาเรื่องของ IoT เราอาจจะเห็นคนพูดประมาณว่า เป็นยุคที่ของทุกอย่างจะต่ออินเตอร์เน็ตได้ เครื่องมันจะคุยกันเองมากขึ้นนั่นนี่ ทำให้นำมาสู่คำถามที่ว่า เอ๊ะ แล้วมันคุยกันยังไงละ นี่ละคือเรื่องของเราในวันนี้คือ MQTT ซึ่งเป็น Protocol ที่อุปกรณ์ IoT ต่าง ๆ ใช้เป็น Protocol ในการคุยกัน

ก่อนจะ MQTT เครื่องคุยกันยังไงได้บ้าง ?

เราได้กล่าวไปแล้วว่า MQTT เอาไว้ใช้ในการติดต่อกันระหว่างเครื่องกับเครื่อง หรือเราเรียกว่า Machine-to-Machine (M2M) ทีนี้ ถ้าเราไม่ใช้ MQTT เราจะทำยังไงดีละ อื้ม... นั่นสินะ ยังไงดีนะ

กลไกการสื่อสาร MQTT เป็นการสื่อสารโดยใช้วิธีการแบบใด

ถ้าเราเชื่อมต่อผ่านสายละ ก็เป็นไปได้นะ จริง ๆ แล้วเราก็ยังใช้อะไรแบบนี้กันอยู่ อย่างเช่นการเชื่อมต่อผ่าน Serial เด็ก ๆ อาจจะ งง อะไรอะพี่ Serial ไม่รู้จักเลย มันเป็นวิธีหนึ่งในการส่งข้อมูลกัน โดยที่เราจะค่อย ๆ ส่งทีละ Bit เลย ทำให้ ถ้าเราต้องการจะส่งข้อมูลพร้อม ๆ กันสัก 10-Bits นั่นแปลว่า เราจะต้องใช้สายทั้งหมด 10 เส้นไปเลย ตัวอย่างของพวก Serial ถ้าเอาใกล้ ๆ เลยคือถ้าเราไปเล่นพวก Board ต่าง ๆ อย่าง ESP32 หรือ 8266 เวลาเราเสียบเข้าไปในเครื่อง มันก็เป็นการเชื่อมต่อผ่าน Serial เหมือนกัน ทำให้เราสามารถเห็นสิ่งที่เครื่องกำลังทำงานได้นั่นเอง แต่โหยยย มันก็ต้องใช้สายอะไรเยอะแยะมาก ๆ เลยนะ ไหวอ่อ เบเบ้

หรือเราอาจจะใช้ Protocol บนพวก Computer Network อย่างพวก HTTP (Hyper Text Transport Protocol) ก็ได้ ลักษณะของ HTTP เราคิดง่าย ๆ เลย คือ เป็นลักษณะของ Request/Response กล่าวคือ Client ทำการส่ง Request ไปขออะไรบางอย่างจาก Server และ Server ตอบกลับมาเป็น Response คืนให้กับ Client นั่นเอง

ต้องบอกก่อนว่า HTTP มันถูกคิดมานานมาก ๆ แล้ว ก่อนที่เราจะมีคำว่า IoT ซะอีก ดังนั้น ตอนที่เขาคิดมา เขาก็ไม่ได้คิดเผื่อแน่ ๆ ละ พอมาเจอกับ IoT มันเลยมีปัญหาอยู่นิดหน่อย 2 เรื่องคือ น้ำหนัก และ ลักษณะของการส่ง

กลไกการสื่อสาร MQTT เป็นการสื่อสารโดยใช้วิธีการแบบใด

อย่างแรกคือ นำ้หนัก ถ้าเราลองเข้าไปดูดี ๆ เวลาสื่อสารผ่าน HTTP มันไม่ได้มีแค่ข้อมูลที่เราต้องการ แต่มันมีพวก Header ต่าง ๆ ที่ค่อยบอก Status เช่น 200 และ 400 ไหนจะพวก User-agent อะไรอีกเยอะแยะมากมาย ซึ่งต้องเข้าใจว่าพวก IoT Device ส่วนใหญ่ มันมีขนาดเล็ก ทำให้มีพลังในการประมวลผลต่ำ ทำให้บางที การต้องมา Process Header เอง อาจจะเป็นเรื่องที่โหดเกินไป ทำให้กินพลังงานมากกว่าที่ควรจะเป็นแน่ ๆ

และอีกปัญหาคือ ลักษณะการส่ง อย่างที่บอกว่า มันต้องใช้ทั้ง Request และ Response ส่งไปมาระหว่างอุปกรณ์ นึกภาพว่า ถ้าเรามี Board ที่วัดอุณหภูมิ และ ความชื้นแล้วเราต้องส่งค่าทุก 1 วินาที ถ้าเราใช้ HTTP เราก็ต้องยิง Request บอกไปว่า เราจะขอเก็บข้อมูลนี้นะ  Server มันต้องส่ง Response เป็นสถานะกลับไป ระหว่างวัน IoT Device มันก็ต้องรองั้นเหรอ มันก็ไม่ใช่ทางที่มีประสิทธิภาพเท่าไหร่นั่นเอง ยังไม่นับว่า ถ้าโลกมันมี IoT เต็มไปหมดจริง ๆ Server อ้วกแน่นอน เช่นในบ้านมี IoT สัก 150 ตัว ทุก ๆ ตัวส่งข้อมูลทุกวินาที นั่นแปลว่า Server ต้องรับข้อมูลจัดการให้เรียบร้อย และ ตอบ Response กลับไป 150 ครั้งต่อวินาทีเลย ถือว่าโหดอยู่นะ แล้วถ้าเอาไปใช้ในโรงงานจริง ๆ อาจจะต้องกดเป็น 1,000 หรือมากกว่านั้นอีก Server เอาไม่อยู่แน่นอนกับ HTTP

พอความซับซ้อนของ Protocol มันเยอะ นอกจากจะใช้เวลาในการ Process เยอะแล้ว อีกเรื่องที่สำคัญไม่แพ้กันคือ การใช้พลังงาน อย่างที่บอกว่า อนาคต เราอาจจะมี IoT เต็มบ้านไปหมด เราก็ไม่อยากให้บ้านเราจ่ายค่าไฟแพงขึ้นแบบรัว ๆ เพราะการใช้ IoT แน่ ๆ ดังนั้น การลดความซับซ้อนของ Protocol ก็ช่วยให้ CPU ทำงานน้อยลง และ ทำให้เราประหยัดไฟมากขึ้นนั่นเอง

MQTT คืออะไร ?

MQTT หรือ Message Queuing Telemetry Transport เป็น Protocol หนึ่งในการส่งข้อมูลที่เบามาก ๆ มันใช้ Model ที่เรียกว่า Publisher/Subscriber โดยที่มันออกแบบมาเฉพาะกับการใช้งานกับพวก IoT เลย เพราะมีโครงสร้างที่ไม่ซับซ้อน และ มี Header ขนาดเล็กมาก ๆ

ทำให้ตอนนี้เราจะมีตัวละครอยู่ 2 ตัวด้วยกัน คือ Publisher ที่เป็นคนส่งข้อมูลออกไป อาจจะเป็น Board ที่ต่อกับ Sensor ต่าง ๆ เช่น DHT22 สำหรับ อุณหภูมิ และ ความชื้น และ Subscriber ที่จะเป็นตัวรับข้อมูล แต่เอ๊ะ มันก็ยังไม่ได้แก้ปัญหาการเชื่อมต่อแบบ 1-1 นิน่า ใช่แล้ว

เพราะในการทำงาน เรายังขาดตัวละครอีกตัวคือ Broker มาทำหน้าที่เป็นเหมือนตัวกลางเชื่อมระหว่าง Publisher และ Subscriber เข้าด้วยกัน โดยที่ทุกคน ๆ จะต้องเชื่อมต่อกับ Broker แต่แน่นอนว่า เราไม่ได้ใช้ Broker ตัวนึงต่อเรื่อง ๆ เดียวแน่นอน เช่น ส่งแค่ อุณหภูมิอย่างเดียว แล้วถ้าเราต้องการส่งอย่างอื่นเราต้องเปิด Broker ใหม่ ก็ไม่น่าจะดีเท่าไหร่

ทำให้วิธีที่ใช้ในการจัดการกับเรื่องนี้คือ การใส่ Topic ลงไป เพื่อเป็นการบอกหัวเรื่องว่า ข้อมูลนี้คือ เรื่องอะไรนั่นเอง โดยที่ Topic ทั่ว ๆ ไป เราจะใช้เป็น 2 ระดับด้วยกันคือ Device/Topic ตัวอย่างเช่น kitchen/humidity ก็จะเป็นการสื่อถึงอุณหภูมิของห้องครัวก็ได้ หรืออาจจะใช้เป็น kitchen/light/switch เพื่อรับคำสั่งเปิดหรือปิดนั่นเอง

ดังนั้นเวลา Subscriber จะเชื่อมต่อไปที่ Broker เพื่อรับข้อมูลนั้น เราจะเรียกว่าการ Subscribe โดยที่เราจะต้องบอกด้วยว่า เราจะให้มัน Subscribe ที่ Topic ไหน เพื่อบอกให้ Broker รู้ว่า ถ้าได้ข้อมูลจาก Topic นั้น ๆ มา ให้ส่งให้ Subscriber เครื่องนี้ได้เลย

กลับกัน เวลา Publisher จะส่งข้อมูลออกมาให้ Broker เราก็ต้องบอก Topic ด้วยเช่นกัน เพื่อให้ Broker รู้ว่า เรื่องที่ส่งเข้ามามันคืออะไร จะได้ส่งต่อให้ Subscriber ถูกนั่นเอง การทำแบบนี้ ทำให้ IoT ที่ Publish ข้อมูลเข้ามา ไม่ต้องทำหน้าที่ในการส่งข้อมูลให้หลาย ๆ เครื่อง แต่เรากระจายงานนี้ให้กับ Broker แทนนั่นเอง

มาลองเล่น MQTT อย่างง่ายกัน

จากหัวข้อก่อนหน้า เรารู้ตัวละครในการใช้งาน MQTT กันแล้ว ตอนนี้เราจะมาลอง Implement ตัวละครเหล่านี้กัน เริ่มจากการติดตั้ง Broker กันก่อน ซึ่งมันจะมีหลายตัวให้เราเลือกใช้เยอะมาก ตัวที่เราจะให้ลองทำก็คือ Mosquitto ซึ่งเราสามารถติดตั้งลงไปในเครื่องของเราเลยก็ได้ หรืออาจจะติดตั้งลงใน Board ต่าง ๆ เช่น Raspberry Pi ก็ได้

docker run -it -p 1883:1883 -p 9001:9001 -v mosquitto.conf:/mosquitto/config/mosquitto.conf -v /mosquitto/data -v /mosquitto/log eclipse-mosquitto

วิธีการติดตั้ง เราอาจจะติดตั้งลงไปในเครื่องของเราได้ตรง ๆ เลย ถ้าใช้พวก Linux เราสามารถติดตั้งผ่าน APT ได้ตรง ๆ เลย แล้วเข้าไปตั้งค่า หรือ ถ้าเราง่าย เราแนะนำให้ไปใช้เป็น Docker Container น่าจะง่ายกว่า ให้เราติดตั้ง Docker และรันคำสั่งด้านบนได้เลย ก็จะเป็นการสร้าง Mosquitto Container ขึ้นมาพร้อมใช้งานได้เลย อ่านเพิ่มเติมได้ใน Docker Hub หรือ เราใช้ Unraid อยู่ มันมีคนทำ Mosquitto แล้วเพิ่มส่วนในการ Customisation สำหรับ Unraid ได้เลย ดูใน cmccambridge/mosquitto-unraid

pip install paho-mqtt

หลังจากนั้น เราจะต้องมาสร้างตัว Subscriber และ Publisher กัน ซึ่งปกติแล้ว มันก็จะมีโปรแกรมสำเร็จรูปไว้ให้เราทดสอบได้ทั้ง 2 ฝั่ง แต่วันนี้เราจะไม่พูดถึงเรื่องนั้น แต่เราจะมาเขียนเองกันเลยดีกว่า โดยใช้ Python แต่ก่อนอื่น เราจะต้องติดตั้ง Library สำหรับการใช้งาน MQTT กันก่อน โดยผ่านคำสั่งด้านบนได้เลย หลังจากเราติดตั้งแล้ว เรามาค่อย ๆ ลองเขียนทีละส่วนกันดีกว่า

import time 
import paho.mqtt.client as mqtt

# Creating client
client = mqtt.Client()

# Connecting to Broker
client.connect('YOUR_BROKER_ADDRESS', BROKER_PORT)

# Looping Forever
packet_count = 0

while True :
    packet_count += 1
    print('Sending message', packet_count)
    client.publish('computer/switch', 'ON')
    time.sleep(1)
publisher.py

ก่อนอื่นเลย เราขอเริ่มจากการเขียน Publisher ขึ้นมาก่อน จาก Code ด้านบนเลย เราจะเห็นได้เลยว่ามันสั้นมาก ๆ สิ่งที่มันทำหลัก ๆ คือ การเชื่อมต่อไปที่ Broker และทำการ Publish ค่าออกไปทุก 1 วินาทีด้วยกัน เราลองมาดูทีละส่วนกันดีกว่า

import paho.mqtt.client as mqtt

client = mqtt.Client()

อย่างแรกที่เราทำเลยคือ การ Import Client จาก Paho ที่เราพึ่งติดตั้งลงไปนั่นเอง อย่าลืม Client เราต้องใช้ C ตัวใหญ่นะ

client.connect('YOUR_BROKER_ADDRESS', BROKER_PORT)

จากนั้น เราจะต้องทำการบอกให้ Client ที่เราสร้างขึ้นมาเชื่อมต่อไปที่ MQTT Broker ของเราผ่านคำสั่ง connect() โดยสิ่งที่เราต้องบอกมันคือ ที่อยู่ของ Broker ของเราว่าอยู่ที่ไหน ถ้าอยู่ในวง LAN เดียวกันเราก็สามารถบอกเป็น IP Address ได้เลย ส่วน Port ก็จะเป็น Port ที่เราตั้งไว้สำหรับ Broker โดย Default เราจะใช้เป็น 1883

while True :
    packet_count += 1
    print('Sending message', packet_count)
    client.publish('computer/switch', 'ON')
    time.sleep(1)

หลังจากเราเชื่อมต่อเข้าไปได้แล้ว เราจะมาเริ่มส่งค่ากัน ในตัวอย่างนี้ เพื่อให้ง่าย เราจะให้มันวนส่งค่าไปเรื่อย ๆ แบบไม่มีเงื่อนไขเลย เราเลย เอาทั้งหมดไว้ใน While Loop แล้วทำให้มันเป็น Infinity Loop ไปเลย รันไปเรื่อย ๆ จนกว่าเราจะกด Ctrl+C เพื่อหยุดโปรแกรม ภายใน Loop เราจะเขียน packet_count กับ Print ออกมาในบรรทัดต่อไปว่า เราส่งออกไปแล้วกี่รอบ คำสั่งสำคัญเลยคือ client.publish() คือส่วนที่เราใช้ Publish ค่าออกไป โดยที่เราจะต้องกรอก Topic ในตัวอย่างนี้คือ computer/switch และ ค่าที่จะส่งไปคือคำว่า ON ตรง ๆ เลย แต่ถ้าเราทำแค่นี้ เราก็จะมีปัญหาแน่นอน

นึกสภาพว่า เครื่องมันจะส่งรัว ๆ เลย โดยที่ไม่สนอะไรเลย สุดท้าย ถ้าเราเอาแบบนี้ไปใช้งานเข้า Broker แน่นอน มันตายแน่ ดังนั้น เราจะต้องทำการจำกัดการส่งต่อเวลาให้มันน้อยลงหน่อย เราเลยเรียก time.sleep() ขึ้นมาหลังจากส่งเสร็จ ก็คือ เราให้มันหยุดไป 1 วินาที แล้วค่อยเริ่มส่งต่อ ทำให้มันจะค่อย ๆ ส่งทุก 1 วินาทีนั่นเอง

import time
import paho.mqtt.client as mqtt

# Callback Functions
def on_message (client, userdata, message) :
    raw_message = str(message.payload.decode("utf-8"))
    topic = message.topic
    
    print(int(time.time()), 'Received', topic, raw_message)

def on_subscribe (client, obj, mid, granted_qos) :
    print("Subscribe Succeed")

def on_connect (client, userdata, flags, rc) :
    print("Broker is connected")
    client.subscribe('computer/switch')    
    
# Creating client
client = mqtt.Client()

# Configuring Callbacks
client.on_message=on_message 
client.on_connect = on_connect
client.on_subscribe = on_subscribe

# Connecting to Broker
client.connect('YOUR_BROKER_ADDRESS', BROKER_PORT)

client.loop_forever()
subscriber.py

มาดูที่ฝั่งของ Subscriber กันบ้าง สิ่งที่เราต้องการคือ เราต้องการให้โปรแกรมของเราเข้าไปเชื่อมต่อกับ Broker แล้วทำการ Subscribe เข้ากับ Topic ที่ชื่อว่า computer/switch ก็คือตัวเดียวกับที่เรากำหนดลงไปใน Publisher นั่นเอง สุดท้าย พอมีค่าส่งมา เราก็จะให้มันแสดงบนหน้าจอว่า มีข้อมูลเข้ามาแล้วนะ แค่นั้นเลย แต่ Code อาจจะดูซับซ้อนกว่าฝั่ง Publisher หน่อย แต่เราจะค่อย ๆ มาดูกัน

# Creating client
client = mqtt.Client()

# Connecting to Broker
client.connect('YOUR_BROKER_ADDRESS', BROKER_PORT)

เริ่มจากการสร้าง Client และเชื่อมต่อไปที่ Broker ส่วนนี้น่าจะไม่มีปัญหาอะไร เพราะเราทำเหมือนตอนที่เราทำกับ Publisher ได้เลย ก๊อป ๆ มาได้เลย แต่อันที่น่าจะ งง คือส่วนของ Callbacks

# Callback Functions
def on_message (client, userdata, message) :
    raw_message = str(message.payload.decode("utf-8"))
    topic = message.topic
    
    print(int(time.time()), 'Received', topic, raw_message)

def on_subscribe (client, obj, mid, granted_qos) :
    print("Subscribe Succeed")

def on_connect (client, userdata, flags, rc) :
    print("Broker is connected")
    client.subscribe('computer/switch') 
    
# Configuring Callbacks
client.on_message=on_message 
client.on_connect = on_connect
client.on_subscribe = on_subscribe

ส่วนนี้ จะเป็นส่วนของ Callback Function ที่ Paho มันจะเรียกให้ทำงาน เมื่อเจอเหตุการณ์อะไรบางอย่าง โดยที่เราเอามาให้ดู จะมี 3 Function ด้วยกันคือ on_message จะถูกเรียกเมื่อมี Message ที่เรา Subscribe ถูกส่งเข้ามา อันถัดไปคือ on_subscribe คือจะทำงานเมื่อเราทำการ Subscribe Topics

โดยที่แต่ละ Function เราจะต้องทำการสร้างโดยใช้ Signature ที่มันกำหนดไว้เท่านั้น ถ้าไม่อยากเข้าไปอ่าน Document ก็ลอก Pattern ด้านบนไปแก้ไขต่อได้เลย ส่วนด้านล่าง หลังจากเราสร้าง Function แล้ว เราจะต้องมาบอก Client ที่เราสร้างว่า แต่ละ Callback เราจะผูกกับ Function ไหนดี

def on_subscribe (client, obj, mid, granted_qos) :
    print("Subscribe Succeed")

เอาส่วนของ on_subscribe ง่ายที่สุดแล้วคือ ไม่ว่าอะไรก็ตาม เราจะให้มันแสดงบอกว่า การ Subscribe สำเร็จบนหน้าจอไปเลย จริง ๆ มันสามารถเช็คได้จาก granted_qos ได้มันจะคืนกลับมาเป็น Array ลองไปอ่านเพิ่มเติมใน Document ได้

pip install paho-mqtt
0

อันถัดไป เพิ่มขึ้นมาอีกบรรทัดนึง คือ on_connect เราบอกมันว่า ถ้าเชื่อมต่อกับ Broker แล้วให้มันพิมพ์ขึ้นบนหน้าจอว่า ต่อกับ Broker แล้ว และทำการ Subscribe Topic ที่เราต้องการไปเลย แต่จริง ๆ แล้ว subscribe() เราสามารถเรียกข้างนอกก็ได้เหมือนกันนะ ไม่จำเป็นต้องเรียกใน Callback ก็ได้ แต่เราเอาง่าย เราว่า เชื่อมต่อแล้ว เรียกไปเลยละกัน จะได้ง่าย ๆ

pip install paho-mqtt
1

และสุดท้ายคือ on_message จริง ๆ แล้วเราทำง่าย ๆ เลยคือ เราต้องการให้มันแสดงผลออกทางหน้าจอว่า มันมี Topic และ ข้อความอะไรเข้ามาบ้าง เราก็ต้อง Extract Topic และ Message ออกมา เริ่มจาก Message ก่อน เราสามารถดึงจาก Attribute payload ได้เลยจาก message แต่เราจะต้องทำการ decode เป็นลักษณะที่เราอ่านก่อน ซึ่งปกติเราก็จะใช้เป็น UTF-8 ก็บอกมันไป ส่วน Topic ก็ง่าย ๆ เลย ดึงจาก Attribute ชื่อ Topic ได้เลย สุดท้าย เราก็สั่งให้มันแสดงออกบนหน้าจอ ส่วน time.time() ข้างหน้าเป็นคำสั่งไว้ขอเวลาปัจจุบันขึ้นมา เราเอามาแสดงด้วย จะได้รู้ว่าบรรทัดนี้ที่มันขึ้นมา มันขึ้นมาตอนไหนแค่นั้นเลย ไม่มีอะไร

pip install paho-mqtt
2

ทีนี้สิ่งที่เราจะต้องทำคือ เราจะต้องรอ Message รอ ๆ ไปจนกว่ามันจะมาแหละ ดังนั้น เราจะต้องบอกให้มันรอ ซึ่งเราไม่ต้องเขียนเองเลย Paho มันทำมาให้เราแล้ว ผ่านคำสั่ง loop_forever() หรือก็คือ ให้มันรอตลอดไปนั่นเอง มันจะหยุดเมื่อเรากด Ctrl+C เท่านั้นเลย

ลองมารันกันดีกว่า เราก็เปิดขึ้นมาทั้ง 2 ไฟล์เลย เราจะเห็นว่า ค่ามันจะวิ่งจาก Publisher ไปที่ Subscriber เลย จริง ๆ ก็คือแค่นั้นเลย ค่ามันก็จะวิ่งจาก Publisher ไปที่ Broker และไปที่ Subscriber ถ้าเราลองรัน Subscriber หลาย ๆ อัน ก็จะเป็นการจำลองว่า เรามี หลาย ๆ Client ที่รอค่าไป Process นั่นเอง

สรุป

MQTT เป็น Protocol ที่มีน้ำหนักเบา เหมาะสำหรับการนำไปใช้กับอุปกรณ์ขนาดเล็ก และประหยัดพลังงานสูง ๆ อย่าง IoT ขนาดเล็กได้ หรือยาวไปถึงเครื่องจักรขนาดใหญ่ได้เลย ด้วยลักษณะของ Protocol ทำให้การทำงานของเรายืดหยุ่นมาก ๆ เราอาจจะมี IoT หลาย ๆ เครื่อง Publish ค่าจาก Sensor เข้ามา และใช้หลาย ๆ Subscriber เพื่อรับค่าไปทำอะไรต่อก็ได้ อาจจะเอาไปต่อกับ Database อย่าง InfluxDB ที่เป็น Time-Series Database แล้วเอา Grafana มาทำ Visualisation หรืออาจจะยิงเข้าไปใน Python Server ที่เราอาจจะให้มันวิ่งเข้า Machine Learning Model เพื่อ Predict ค่าอะไรบางอย่างแบบ Real-Time แล้วอาจจะมีการ Publish ค่ากลับไปเพื่อเป็น Action ได้อีก สารพัดประโยชน์มาก ๆ