บทนำ
ในยุคที่เทคโนโลยี IoT (Internet of Things) เข้ามามีบทบาทสำคัญในชีวิตประจำวัน การพัฒนาอุปกรณ์ที่สามารถเชื่อมต่อกับอินเทอร์เน็ตและทำงานร่วมกับแพลตฟอร์มออนไลน์ได้อย่างมีประสิทธิภาพจึงกลายเป็นสิ่งจำเป็น ESP32-S3 Cam เป็นหนึ่งในโมดูลที่ได้รับความนิยมอย่างมากในวงการพัฒนาอุปกรณ์ IoT โดยเฉพาะสำหรับงานที่เกี่ยวข้องกับการประมวลผลภาพและวิดีโอ ESP32-S3 Cam มาพร้อมกับความสามารถในการประมวลผลภาพผ่านกล้อง OV2640 และรองรับการเชื่อมต่อ Wi-Fi และ Bluetooth ซึ่งเหมาะสำหรับการสร้างระบบตรวจจับหรือตรวจสอบแบบเรียลไทม์
ประโยชน์ของการใช้ ESP32-S3 Cam สำหรับการรับส่งข้อมูลรูปภาพกับ Telegram
-
การตรวจสอบระยะไกลแบบเรียลไทม์
ESP32-S3 Cam สามารถจับภาพและส่งข้อมูลไปยัง Telegram ได้อย่างรวดเร็ว ทำให้ผู้ใช้งานสามารถตรวจสอบสถานการณ์ที่เกิดขึ้นในสถานที่ห่างไกลผ่านสมาร์ทโฟนหรือคอมพิวเตอร์ได้ทันที -
ประหยัดต้นทุนและพื้นที่
ESP32-S3 Cam มีขนาดเล็กและราคาถูกเมื่อเทียบกับกล้องวงจรปิดแบบดั้งเดิม แต่ยังคงมอบประสิทธิภาพที่เพียงพอสำหรับการใช้งานในระดับบ้านหรือโครงการขนาดเล็ก -
การแจ้งเตือนเหตุการณ์สำคัญ
ด้วยการเขียนโปรแกรมควบคุม ESP32-S3 Cam ให้จับภาพเมื่อตรวจพบการเคลื่อนไหวหรือเหตุการณ์เฉพาะเจาะจง แล้วส่งภาพไปยัง Telegram จะช่วยให้ผู้ใช้งานได้รับการแจ้งเตือนทันทีเมื่อมีสิ่งผิดปกติเกิดขึ้น -
การใช้งานร่วมกับ AI สำหรับการประมวลผลภาพ
ESP32-S3 มีทรัพยากรที่เหมาะสมสำหรับการประมวลผลภาพเบื้องต้น เช่น การตรวจจับใบหน้าหรือวัตถุ ซึ่งสามารถส่งผลลัพธ์หรือภาพที่ประมวลผลแล้วไปยัง Telegram เพื่อใช้ในการตัดสินใจหรือตรวจสอบ -
การสร้างระบบอัตโนมัติ
การเชื่อมต่อระหว่าง ESP32-S3 Cam และ Telegram สามารถนำไปพัฒนาเป็นระบบอัตโนมัติ เช่น การเปิด-ปิดอุปกรณ์อื่น ๆ ผ่านคำสั่งที่ส่งจาก Telegram หรือการส่งภาพเมื่อมีการทำงานบางอย่างเกิดขึ้น -
ความสะดวกและความยืดหยุ่น
Telegram เป็นแพลตฟอร์มที่ใช้งานง่ายและรองรับการส่งข้อความ ไฟล์ภาพ และวิดีโอได้หลากหลายรูปแบบ ทำให้การพัฒนาโครงการด้วย ESP32-S3 Cam มีความยืดหยุ่นสูงและสามารถปรับแต่งได้ตามความต้องการของผู้ใช้งาน -
การพัฒนาโครงการในอนาคต
การใช้งาน ESP32-S3 Cam ร่วมกับ Telegram ไม่เพียงแต่ตอบโจทย์การใช้งานในปัจจุบันเท่านั้น แต่ยังสามารถขยายไปสู่โครงการที่ซับซ้อนขึ้นในอนาคต เช่น การสร้างระบบ Smart Home, การตรวจสอบความปลอดภัยในโรงงาน หรือการใช้งานในงานวิจัยทางวิทยาศาสตร์
การใช้ ESP32-S3 Cam สำหรับการรับส่งข้อมูลรูปภาพ
เมื่อผู้เรียนมีประสบการณ์เบื้องต้นเกี่ยวกับการใช้ ESP32, MicroPython และ Telegram ในการควบคุมอุปกรณ์ IoT และการส่งข้อมูลพื้นฐาน เช่น ข้อความหรือค่าเซ็นเซอร์แล้ว ก็ถึงเวลาที่จะก้าวไปสู่โครงการที่ซับซ้อนมากขึ้น เช่น การใช้ ESP32-S3 Cam สำหรับการจับภาพและรับส่งข้อมูลรูปภาพผ่าน Telegram ในส่วนนี้ เราจะพูดถึงการทำงานของ ESP32-S3 Cam และแนวทางในการพัฒนาโครงการเพื่อให้สามารถส่งรูปภาพไปยัง Telegram ได้อย่างมีประสิทธิภาพ
1. แนะนำ ESP32-S3 Cam
ESP32-S3 Cam เป็นโมดูลที่ออกแบบมาสำหรับงานประมวลผลภาพ โดยมีคุณสมบัติดังนี้:
- กล้อง OV2640 : รองรับการจับภาพความละเอียดสูงสุดที่ 2 ล้านพิกเซล (UXGA)
- Wi-Fi และ Bluetooth : รองรับการเชื่อมต่อไร้สายเพื่อส่งข้อมูลไปยังแพลตฟอร์มออนไลน์
- ทรัพยากรที่เหมาะสม : มี RAM และ Flash Memory เพียงพอสำหรับการประมวลผลภาพและการทำงานแบบ IoT
- ขนาดเล็กและประหยัดพลังงาน : เหมาะสำหรับโครงการที่ต้องการความกะทัดรัดและความยืดหยุ่น

2. ขั้นตอนการใช้งาน ESP32-S3 Cam
2.1 ติดตั้งเฟิร์มแวร์ MicroPython
ESP32-S3 Cam ไม่มีเฟิร์มแวร์ MicroPython มาจากโรงงาน ดังนั้นจำเป็นต้องแฟลชเฟิร์มแวร์ MicroPython ลงบนโมดูล:
- ดาวน์โหลดเฟิร์มแวร์ MicroPython สำหรับ ESP32-S3 จาก MicroPython Official Website
- ใช้
esptool.py
เพื่อล้างเฟิร์มแวร์เดิมและแฟลชเฟิร์มแวร์ใหม่:esptool.py --port <COM_PORT> erase_flashesptool.py --port <COM_PORT> write_flash 0x1000 <path_to_firmware.bin>
2.2 เชื่อมต่อ ESP32-S3 Cam กับคอมพิวเตอร์
- ใช้สาย USB เพื่อเชื่อมต่อ ESP32-S3 Cam กับคอมพิวเตอร์
- เปิด Thonny IDE และเลือกพอร์ตที่ถูกต้องในเมนู "Port"
3. โค้ดพื้นฐานสำหรับการจับภาพ
ESP32-S3 Cam สามารถใช้งานร่วมกับไลบรารี camera
เพื่อจับภาพและบันทึกเป็นไฟล์ภาพ ตัวอย่างโค้ดพื้นฐานสำหรับการจับภาพ:
4. การส่งรูปภาพไปยัง Telegram
เมื่อได้ภาพจาก ESP32-S3 Cam แล้ว ขั้นตอนต่อไปคือการส่งภาพไปยัง Telegram โดยใช้ API ของ Telegram Bot ตัวอย่างโค้ดสำหรับการส่งภาพ:
5. แนวทางการพัฒนา Project
5.1 การจับภาพเมื่อตรวจพบเหตุการณ์
- ใช้เซ็นเซอร์ PIR (Passive Infrared Sensor) เพื่อตรวจจับการเคลื่อนไหว
- เมื่อตรวจพบการเคลื่อนไหว ให้ ESP32-S3 Cam จับภาพและส่งไปยัง Telegram
5.2 การประมวลผลภาพเบื้องต้น
- ใช้ไลบรารี AI หรือ Machine Learning บน ESP32-S3 Cam เพื่อประมวลผลภาพ เช่น การตรวจจับใบหน้าหรือวัตถุ
- ส่งผลลัพธ์ของการประมวลผล (เช่น แจ้งเตือนว่าพบคน) ไปยัง Telegram
5.3 การสร้างระบบตรวจสอบระยะไกล
- พัฒนาแอปพลิเคชันที่ผู้ใช้สามารถส่งคำสั่งผ่าน Telegram เพื่อให้ ESP32-S3 Cam จับภาพ และ ส่งกลับมา
- ใช้คำสั่ง
/capture
ใน Telegram เพื่อควบคุมการจับภาพ
6. ข้อควรระวัง
- การจัดการหน่วยความจำ : ESP32-S3 Cam มี RAM และ Flash Memory จำกัด ดังนั้นควรหลีกเลี่ยงการประมวลผลภาพที่ซับซ้อนเกินไป
- การจัดการพลังงาน : หากใช้งานนอกสถานที่ ควรพิจารณาแหล่งพลังงานที่เหมาะสม เช่น แบตเตอรี่ LiPo
- ความปลอดภัย : ตรวจสอบความปลอดภัยของข้อมูลที่ส่งผ่าน Telegram โดยเฉพาะหากใช้งานในโครงการที่เกี่ยวข้องกับความเป็นส่วนตัว
7. สรุป
ESP32-S3 Cam เป็นโมดูลที่ทรงพลังสำหรับการจับภาพและส่งข้อมูลรูปภาพผ่านแพลตฟอร์มออนไลน์ เช่น Telegram การเริ่มต้นพัฒนาโครงการด้วย ESP32-S3 Cam จะช่วยให้ผู้เรียนสามารถสร้างระบบตรวจสอบระยะไกลที่ตอบโจทย์การใช้งานได้อย่างมีประสิทธิภาพ ไม่ว่าจะเป็นการตรวจสอบความปลอดภัย การจับภาพเหตุการณ์สำคัญ หรือการใช้งานในโครงการเกษตรกรรมอัจฉริยะ การฝึกฝนและทดลองกับโค้ดพื้นฐานจะช่วยให้ผู้เรียนเข้าใจการทำงานของ ESP32-S3 Cam และนำไปปรับใช้ในโครงการที่ซับซ้อนมากขึ้นได้ในอนาคต
🌍 ระบบแจ้งเตือนแผ่นดินไหวผ่าน Telegram ด้วย Circuit Python
Bot ส่วนตัว https://t.me/DisasterwarningBot
กลุ่มแจ้งเตือน https://t.me/disasterThaiAleart
✨ ประโยชน์ของการใช้ Telegram สำหรับแจ้งเตือน
🚀 ความเร็วสูง
Telegram ส่งข้อความแบบ Real-time รับการแจ้งเตือนทันทีที่เกิดแผ่นดินไหว ไม่มีดีเลย์
📱 รับการแจ้งเตือนทุกที่
เปิดแจ้งเตือนบนมือถือได้ทั้ง iOS และ Android แม้ไม่ได้เปิดแอปก็รับการแจ้งเตือนได้ (Push Notification)
👥 แชร์ข้อมูลแบบกลุ่ม
ส่งการแจ้งเตือนไปยังกลุ่มหรือช่อง Telegram ได้ไม่จำกัดผู้รับ สามารถเพิ่มสมาชิกในองค์กรให้รับข้อมูลพร้อมกัน
🔗 รองรับลิงก์และ Rich Media
แทรกรูปภาพ, ลิงก์ไปยัง USGS, หรือแมปตำแหน่งแผ่นดินไหวได้ในข้อความเดียว
📨 ตัวอย่างการแจ้งเตือนใน Telegram
⚠️ EMERGENCY: Earthquake Alert ⚠️
📍 Location: 50 km NE of Chiang Rai, Thailand
📏 Magnitude: 5.8 (Richter Scale)
🔻 Depth: 12.3 km
⏰ Time: 2024-06-21 08:45:22 (UTC+7)
🔗 View on USGS Website* ระบบจะส่งข้อความรูปแบบนี้ไปยังกลุ่ม Telegram ทันทีที่ตรวจพบแผ่นดินไหวขนาด 5.0+
# -*- coding: utf-8 -*-
# Circuit Python For ESP32_S3
import wifi
import socketpool
import ssl
import adafruit_requests
import time
import json
# ==================== ตั้งค่าการเชื่อมต่อ ====================
# 1. ตั้งค่า WiFi
SSID = " Your wifi ID"
PASSWORD = "Your wifi Password"
# 2. ตั้งค่า Telegram Bot
BOT_TOKEN = "Your telegram Token" # เช่น "1234567890:ABCdefGhIJKlmNoPQRsTUVwxyZ"
CHAT_ID = "Your Chat ID" # ID ของกลุ่ม/ช่อง (ต้องมีเครื่องหมาย -)
# ==================== ฟังก์ชันหลัก ====================
def connect_wifi():
"""เชื่อมต่อ WiFi"""
print("🔌 กำลังเชื่อมต่อ WiFi...")
wifi.radio.connect(SSID, PASSWORD)
print("✅ เชื่อมต่อ WiFi สำเร็จ! IP:", wifi.radio.ipv4_address)
def format_time(ms):
"""แปลงเวลาจากมิลลิวินาทีเป็นรูปแบบอ่านง่าย"""
t = time.localtime(ms // 1000)
return f"{t.tm_year}-{t.tm_mon:02d}-{t.tm_mday:02d} {t.tm_hour:02d}:{t.tm_min:02d}:{t.tm_sec:02d} (UTC+7)"
def send_telegram(message):
"""ส่งข้อความไปยัง Telegram"""
url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
payload = {
"chat_id": CHAT_ID,
"text": message,
"parse_mode": "Markdown" # ใช้ Markdown เพื่อจัดรูปแบบ
}
try:
response = requests.post(url, json=payload)
print(f"📤 ส่งข้อความสำเร็จ (Status: {response.status_code})")
response.close()
except Exception as e:
print(f"❌ เกิดข้อผิดพลาดขณะส่งข้อความ: {e}")
def check_earthquake():
"""ตรวจสอบแผ่นดินไหวจาก USGS API"""
global last_quake_id
# ตั้งค่า API (ตรวจสอบแผ่นดินไหวขนาด 5.0 ขึ้นไป ล่าสุด 1 เหตุการณ์)
url = "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&minmagnitude=5.0&limit=1"
try:
print("🔍 กำลังตรวจสอบแผ่นดินไหว...")
response = requests.get(url)
data = response.json()
response.close()
if data.get("features"):
quake = data["features"][0]
quake_id = quake["id"] # ID ของเหตุการณ์
# เช็คหากเป็นเหตุการณ์เดิม
if quake_id == last_quake_id:
print("⏭️ ไม่มีแผ่นดินไหวใหม่")
return
last_quake_id = quake_id # อัปเดต ID ล่าสุด
# ดึงข้อมูลแผ่นดินไหว
props = quake["properties"]
place = props.get("place", "ไม่ทราบตำแหน่ง")
mag = props.get("mag", "N/A")
time_ms = props.get("time", 0)
depth = quake["geometry"]["coordinates"][2] # ความลึก (km)
# สร้างข้อความแจ้งเตือน (รูปแบบ Markdown)
alert_msg = f"""
⚠️ **แจ้งเตือนแผ่นดินไหว** ⚠️
📍 **ตำแหน่ง**: {place}
📏 **ขนาด**: {mag} ริกเตอร์
🔻 **ความลึก**: {depth} กม.
⏰ **เวลา**: {format_time(time_ms)}
[ดูรายละเอียดเพิ่มเติม](https://earthquake.usgs.gov/earthquakes/eventpage/{quake_id})
"""
send_telegram(alert_msg.strip())
else:
print("🌍 ไม่พบข้อมูลแผ่นดินไหวล่าสุด")
except Exception as e:
print(f"❌ เกิดข้อผิดพลาดขณะตรวจสอบ: {e}")
# ==================== การทำงานหลัก ====================
# เชื่อมต่อ WiFi
connect_wifi()
# สร้าง Session สำหรับการเชื่อมต่อ
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
# ตัวแปรเก็บ ID ของแผ่นดินไหวล่าสุด
last_quake_id = None
# แจ้งเตือนเมื่อระบบเริ่มทำงาน
send_telegram("🟢 **ระบบตรวจสอบแผ่นดินไหวเริ่มทำงานแล้ว**")
# วนลูปตรวจสอบทุก 10 นาที
while True:
check_earthquake()
print(f"⏳ พัก 10 นาทีก่อนตรวจสอบอีกครั้ง...\n")
time.sleep(600) # 10 นาที = 600 วินาที
💡 ทำไมต้องใช้ Telegram แจ้งเตือนแผ่นดินไหว?
เพราะ Telegram มีระบบแจ้งเตือนที่เสถียร รองรับการใช้งานแบบกลุ่มฟรี และสามารถส่งข้อมูลได้รวดเร็วแม้ในสภาวะเครือข่ายไม่穩定 เหมาะสำหรับระบบแจ้งเตือนภัยพิบัติที่ต้องพึ่งพาความเร็ว!
Non-GUI (Command-Line Interface) และ GUI (Graphical User Interface) โดยทั้งสองส่วนนี้ใช้คลาส UARTHandler
เป็นแกนกลางในการจัดการการสื่อสารผ่านพอร์ตอนุกรม (UART) อย่างไรก็ตาม การใช้งานในสองรูปแบบนี้มีความแตกต่างกันในแง่ของการออกแบบและการทำงานของโปรแกรม มาดูรายละเอียดและวิเคราะห์แต่ละประเด็นที่คุณสงสัยกันครับ
1. การใช้งาน self
ในคลาส UARTHandler
บทบาทของ self
-
Attributes :
self.port
,self.baudrate
,self.buffer
,self.data_queue
, และอื่น ๆ เป็น attributes ของคลาสUARTHandler
- Attributes เหล่านี้ถูกกำหนดในคอนสตรักเตอร์ (
__init__
) และสามารถเรียกใช้งานได้ใน methods อื่น ๆ เช่น_read_loop
,send_data
,get_received_line
, และclose
-
Methods :
- การใช้
self.method_name()
เช่นself.send_data(data)
ช่วยให้ methods ภายในคลาสสามารถเข้าถึง attributes และ methods อื่น ๆ ได้ - การใช้
self
ยังช่วยให้ threads (เช่น_read_loop
) และ callbacks สามารถทำงานร่วมกับ attributes ของคลาสได้อย่างปลอดภัย
- การใช้
ตัวอย่างที่สำคัญ :
self.uart
คือ attribute ที่ถูกกำหนดใน__init__
และถูกใช้งานใน_read_loop
self.buffer
เป็นตัวแปรที่เก็บข้อมูลที่อ่านมาจาก UART และสามารถถูกเรียกใช้งานใน methods อื่น ๆ
2. ความแตกต่างระหว่าง GUI และ Non-GUI
Non-GUI (serial_monitor.py
)
-
การทำงาน :
- โปรแกรมทำงานบน command-line interface (CLI) โดยไม่มีหน้าจอกราฟิก
- ใช้
input()
เพื่อรับข้อมูลจากผู้ใช้ และแสดงผลทาง terminal - การรับข้อมูลจาก UART ทำผ่าน thread (
receive_loop
) เพื่อให้สามารถรับข้อมูลแบบ real-time โดยไม่บล็อกการทำงานของ main loop
-
การใช้
UARTHandler
:UARTHandler
ถูกสร้างเป็นออบเจ็กต์เพื่อจัดการการสื่อสาร UART- Thread (
receive_loop
) ใช้ methodget_received_line
จากUARTHandler
เพื่อรับข้อมูลจาก queue (data_queue
) และแสดงผลใน terminal
GUI (serial_monitor_gui.py
)
-
การทำงาน :
- โปรแกรมแสดงผลผ่าน graphical user interface (GUI) โดยใช้ไลบรารี
ttkbootstrap
- มี widget เช่น dropdown menu, text box, และ button เพื่อให้ผู้ใช้สามารถเลือกพอร์ต, baud rate, ส่งข้อมูล, และดูข้อความที่รับ-ส่งได้
- การรับข้อมูลจาก UART ทำผ่าน thread (
receive_loop
) เช่นเดียวกับ Non-GUI แต่ข้อมูลจะถูกแสดงผลใน widget (output_box
) แทนที่จะเป็น terminal
- โปรแกรมแสดงผลผ่าน graphical user interface (GUI) โดยใช้ไลบรารี
-
การใช้
UARTHandler
:UARTHandler
ถูกสร้างเป็นออบเจ็กต์เมื่อผู้ใช้กดปุ่ม "Connect"- Thread (
receive_loop
) ใช้ methodget_received_line
จากUARTHandler
เพื่อรับข้อมูลจาก queue และแสดงผลในoutput_box
3. ประเด็นที่กล่าวถึง
(1) การใช้ with
เพื่อเปิด/ปิดพอร์ตอย่างปลอดภัย
-
Non-GUI :
- ใน
serial_monitor.py
การปิดพอร์ตทำผ่าน methodclose()
ของUARTHandler
- การใช้
with
อาจไม่จำเป็นในกรณีนี้ เพราะโปรแกรมทำงานแบบ CLI และมีการจัดการทรัพยากรผ่านtry-finally
หรือfinally
block
- ใน
-
GUI :
- ใน
serial_monitor_gui.py
การปิดพอร์ตทำผ่าน methoddisconnect()
ซึ่งเรียกclose()
ของUARTHandler
- การใช้
with
อาจเหมาะสำหรับการจัดการทรัพยากรในบางกรณี เช่น เมื่อต้องการเปิด/ปิดพอร์ตเฉพาะในขอบเขตบางส่วนของโปรแกรม
- ใน
(2) การจัดการ Logging
-
Non-GUI :
- Logging ถูกแสดงผลใน terminal และบันทึกลงไฟล์
uart_debug.log
- การใช้
logging.basicConfig
กำหนด format และ handler สำหรับ logging
- Logging ถูกแสดงผลใน terminal และบันทึกลงไฟล์
-
GUI :
- Logging ถูกแสดงผลใน widget (
output_box
) และบันทึกลงไฟล์uart_debug.log
- การแสดงผลใน GUI ทำผ่าน method
append_output
ซึ่งอัปเดตข้อความในoutput_box
- Logging ถูกแสดงผลใน widget (
(3) การใช้ Queue
-
Non-GUI :
data_queue
ถูกใช้เพื่อรับข้อมูลจาก thread (_read_loop
) และส่งข้อมูลกลับไปยัง main loop- Main loop ใช้
get_received_line
เพื่อรับข้อมูลจาก queue และแสดงผลใน terminal
-
GUI :
data_queue
ถูกใช้ในลักษณะเดียวกัน แต่ข้อมูลจะถูกแสดงผลในoutput_box
แทนที่จะเป็น terminal
(4) การรองรับ ASCII และ Hex
-
Non-GUI :
- Method
send_data
ตรวจสอบว่าข้อมูลที่ส่งเป็น ASCII หรือ Hex (0x...
) - หากเป็น Hex จะแปลงข้อมูลด้วย
bytes.fromhex
ก่อนส่งออก
- Method
-
GUI :
- การรองรับ ASCII และ Hex ทำในลักษณะเดียวกัน โดยใช้ method
send_data
ของUARTHandler
- การรองรับ ASCII และ Hex ทำในลักษณะเดียวกัน โดยใช้ method
(5) การใช้งานในโปรเจกต์ใหญ่
- Modular Design :
UARTHandler
ถูกออกแบบให้เป็นโมดูลแยกต่างหาก เพื่อให้สามารถนำกลับมาใช้ใหม่ได้ในโปรเจกต์อื่น- การใช้งานใน Non-GUI และ GUI แสดงให้เห็นถึงความยืดหยุ่นของคลาสนี้
สรุปความแตกต่างระหว่าง GUI และ Non-GUI
Non GUI
# serial_monitor.py
import threading
import time
from uart_handler import UARTHandler
def receive_loop(uart: UARTHandler):
"""
รับข้อมูลจาก UART แบบ Real-time แล้วแสดงออกหน้าจอ
"""
try:
while True:
line = uart.get_received_line(timeout=0.1)
if line:
print(f"\033[92m<<< {line}\033[0m") # สีเขียวสำหรับข้อความขาเข้า
except Exception as e:
print(f"Receive thread error: {e}")
def main():
port = 'COM11' # เปลี่ยนตามพอร์ตของคุณ
baudrate = 115200
buffer_size = 2048
try:
uart = UARTHandler(port, baudrate, buffer_size)
except Exception as e:
print(f"Failed to initialize UART: {e}")
return
print(f"=== Serial Monitor (baud: {baudrate}) ===")
print("Type and press Enter to send. Press Ctrl+C to exit.\n")
# Start receiving thread
receiver = threading.Thread(target=receive_loop, args=(uart,), daemon=True)
receiver.start()
try:
while True:
user_input = input(">>> ") # สีขาวสำหรับข้อความขาออก
if user_input.lower() == "exit":
break
uart.send_data(user_input)
except KeyboardInterrupt:
print("\nExiting...")
finally:
uart.close()
if __name__ == "__main__":
main()
Python GUI
# serial_monitor_gui.py
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
import serial.tools.list_ports
import threading
from uart_handler import UARTHandler
class SerialMonitorApp:
def __init__(self, root):
self.root = root
self.root.title("Python Serial Monitor")
self.uart = None
self.receiver_thread = None
self.running = False
self.port_var = ttk.StringVar()
self.baud_var = ttk.IntVar(value=115200)
self.input_var = ttk.StringVar()
self.setup_widgets()
def setup_widgets(self):
frame_top = ttk.Frame(self.root, padding=10)
frame_top.pack(fill=X)
# Serial port dropdown
ttk.Label(frame_top, text="Port:").pack(side=LEFT, padx=(0, 5))
self.port_combo = ttk.Combobox(frame_top, textvariable=self.port_var, width=15)
self.port_combo.pack(side=LEFT, padx=5)
self.refresh_ports()
# Baud rate
ttk.Label(frame_top, text="Baudrate:").pack(side=LEFT, padx=(10, 5))
self.baud_entry = ttk.Entry(frame_top, textvariable=self.baud_var, width=10)
self.baud_entry.pack(side=LEFT, padx=5)
# Connect button
self.connect_btn = ttk.Button(frame_top, text="Connect", command=self.toggle_connection, bootstyle=SUCCESS)
self.connect_btn.pack(side=LEFT, padx=10)
# Output box
self.output_box = ttk.ScrolledText(self.root, height=20, wrap='word')
self.output_box.pack(fill=BOTH, expand=True, padx=10, pady=(5, 0))
self.output_box.configure(state='disabled')
# Send box
frame_bottom = ttk.Frame(self.root, padding=10)
frame_bottom.pack(fill=X)
self.input_entry = ttk.Entry(frame_bottom, textvariable=self.input_var)
self.input_entry.pack(side=LEFT, fill=X, expand=True, padx=(0, 5))
self.input_entry.bind("<Return>", self.send_data)
self.send_btn = ttk.Button(frame_bottom, text="Send", command=self.send_data)
self.send_btn.pack(side=LEFT)
def refresh_ports(self):
ports = serial.tools.list_ports.comports()
port_list = [port.device for port in ports]
self.port_combo['values'] = port_list
if port_list:
self.port_combo.current(0)
def toggle_connection(self):
if self.uart:
self.disconnect()
else:
self.connect()
def connect(self):
port = self.port_var.get()
baud = self.baud_var.get()
if not port:
self.show_message("No COM port selected!", "error")
return
try:
self.uart = UARTHandler(port, baud)
self.running = True
self.connect_btn.config(text="Disconnect", bootstyle=DANGER)
self.start_receiver()
except Exception as e:
self.show_message(f"Connection failed: {e}", "error")
def disconnect(self):
self.running = False
if self.uart:
self.uart.close()
self.uart = None
self.connect_btn.config(text="Connect", bootstyle=SUCCESS)
self.show_message("Disconnected", "info")
def start_receiver(self):
self.receiver_thread = threading.Thread(target=self.receive_loop, daemon=True)
self.receiver_thread.start()
def receive_loop(self):
while self.running and self.uart:
line = self.uart.get_received_line(timeout=0.1)
if line:
self.append_output(f"[RX] {line}")
def send_data(self, event=None):
msg = self.input_var.get().strip()
if self.uart and msg:
self.uart.send_data(msg)
self.append_output(f"[TX] {msg}")
self.input_var.set("")
def append_output(self, text):
self.output_box.configure(state='normal')
self.output_box.insert('end', text + '\n')
self.output_box.see('end')
self.output_box.configure(state='disabled')
def show_message(self, text, level="info"):
if level == "error":
ttk.messagebox.showerror("Error", text)
else:
ttk.messagebox.showinfo("Info", text)
def main():
app = ttk.Window(themename="cyborg", title="Serial Monitor", size=(600, 500))
SerialMonitorApp(app)
app.mainloop()
if __name__ == "__main__":
main()
บทนำ การแจ้งเตือนเหตุแผ่นดินไหว
แผ่นดินไหวเป็นภัยธรรมชาติที่เกิดขึ้นได้ทุกเมื่อและส่งผลกระทบอย่างรุนแรงต่อชีวิต และ ทรัพย์สิน การรับรู้ข้อมูลแผ่นดินไหวอย่างรวดเร็วจึงมีความสำคัญอย่างยิ่ง ในบทความนี้ ขอแนะนำ แอปพลิเคชันตรวจสอบการแจ้งเตือนเหตุแผ่นดินไหว ที่พัฒนาขึ้นด้วยภาษา Python โดยใช้ API จาก USGS (United States Geological Survey) ซึ่งเป็นแหล่งข้อมูลแผ่นดินไหวที่น่าเชื่อถือและอัปเดตแบบเรียลไทม์
Telegram Bot https://t.me/DisasterwarningBot
หรือ กลุ่ม https://t.me/disasterThaiAleart
วัตถุประสงค์ของแอปพลิเคชัน
- ดึงข้อมูลแผ่นดินไหวล่าสุดจาก USGS Earthquake API
- กรองข้อมูลตามช่วงเวลา, ขนาดแผ่นดินไหว (แมกนิจูด), และจำนวนเหตุการณ์ที่ต้องการ
- แสดงผลข้อมูลในรูปแบบที่อ่านง่าย เพื่อให้ผู้ใช้สามารถรับทราบข้อมูลแผ่นดินไหวได้อย่างรวดเร็ว
เทคโนโลยีและเครื่องมือที่ใช้
- ภาษา Python – ใช้สำหรับเขียนโค้ดหลักของแอปพลิเคชัน
- Requests Library – สำหรับส่ง HTTP Request ไปยัง USGS API
- USGS Earthquake API – ให้ข้อมูลแผ่นดินไหวในรูปแบบ GeoJSON
- Datetime Module – สำหรับจัดการและจัดรูปแบบเวลา
ขั้นตอนการทำงาน
1. การเชื่อมต่อกับ USGS API
import requests
from datetime import datetime
def fetch_earthquake_data(start_time=None, end_time=None, min_magnitude=None, max_magnitude=None, limit=10):
base_url = "https://earthquake.usgs.gov/fdsnws/event/1/query"
params = {
"format": "geojson",
"starttime": start_time,
"endtime": end_time,
"minmagnitude": min_magnitude,
"maxmagnitude": max_magnitude,
"limit": limit
}
response = requests.get(base_url, params=params)
if response.status_code == 200:
return response.json()
else:
print(f"Error: Unable to fetch data. Status code: {response.status_code}")
return None
2. การแสดงผลข้อมูลแผ่นดินไหว
def display_earthquake_info(data):
if not data or "features" not in data:
print("No earthquake data available.")
return
print("Earthquake Information:")
print("-" * 40)
for feature in data["features"]:
properties = feature.get("properties", {})
geometry = feature.get("geometry", {})
place = properties.get("place", "N/A")
magnitude = properties.get("mag", "N/A")
time = properties.get("time", "N/A")
coordinates = geometry.get("coordinates", [None, None, None])
if time and isinstance(time, int):
time_formatted = datetime.utcfromtimestamp(time / 1000).strftime('%Y-%m-%d %H:%M:%S')
else:
time_formatted = "N/A"
print(f"Place: {place}")
print(f"Magnitude: {magnitude}")
print(f"Time: {time_formatted}")
print(f"Coordinates: Latitude={coordinates[1]}, Longitude={coordinates[0]}, Depth={coordinates[2]} km")
print("-" * 40)
3. ตัวอย่างการใช้งาน
if __name__ == "__main__":
start_time = "2025-03-28T00:00:00"
end_time = "2025-03-28T20:59:59"
min_magnitude = 5.0 # แสดงเฉพาะแผ่นดินไหวขนาด 5.0 ขึ้นไป
limit = 5 # แสดงผลล่าสุด 5 เหตุการณ์
earthquake_data = fetch_earthquake_data(
start_time=start_time,
end_time=end_time,
min_magnitude=min_magnitude,
limit=limit
)
if earthquake_data:
display_earthquake_info(earthquake_data)
ประโยชน์ของแอปพลิเคชัน
- รับข้อมูลแบบ Real-time – สามารถตรวจสอบแผ่นดินไหวล่าสุดได้ทันที
- ปรับแต่งการค้นหาได้ – สามารถกำหนดช่วงเวลาและขนาดแผ่นดินไหวที่ต้องการ
- ใช้งานง่าย – แสดงผลข้อมูลในรูปแบบที่เข้าใจง่าย
แนวทางการพัฒนาในอนาคต
- เพิ่มการแจ้งเตือนแบบ Real-time (ผ่าน Email, LINE, หรือ SMS)
- แสดงผลบนแผนที่ โดยใช้ไลบรารีเช่น Folium หรือ Google Maps API
- พัฒนาเป็นเว็บแอปหรือโมบายแอป เพื่อให้เข้าถึงได้สะดวกยิ่งขึ้น
สรุป
แอปพลิเคชันตรวจสอบการแจ้งเตือนเหตุแผ่นดินไหวนี้เป็นตัวอย่างที่ดีของการนำ API ข้อมูลเปิด (Open Data API) มาใช้ประโยชน์ โดยใช้ Python ซึ่งเป็นภาษาที่มีความยืดหยุ่นสูง หากคุณสนใจสามารถนำโค้ดนี้ไปพัฒนาต่อยอดได้ตามต้องการ เพื่อเพิ่มประสิทธิภาพในการรับมือกับภัยแผ่นดินไหวได้ดียิ่งขึ้น
Micropython ESP32
import urequests
import utime
import ujson
def get_current_utc_time():
"""
Get current UTC time in ISO format (YYYY-MM-DDTHH:MM:SS) using utime
"""
now = utime.gmtime()
return "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}".format(
now[0], now[1], now[2], now[3], now[4], now[5]
)
def format_timestamp(timestamp_ms):
"""
Format UNIX timestamp in milliseconds to readable string
"""
if not timestamp_ms:
return "N/A"
timestamp_sec = timestamp_ms // 1000
t = utime.localtime(timestamp_sec)
return "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(
t[0], t[1], t[2], t[3], t[4], t[5]
)
def fetch_earthquake_data(start_time=None, end_time=None, min_magnitude=None, max_magnitude=None, limit=10):
"""
Fetch earthquake data from USGS API
"""
base_url = "https://earthquake.usgs.gov/fdsnws/event/1/query"
params = {
"format": "geojson",
"limit": limit
}
if start_time:
params["starttime"] = start_time
if end_time:
params["endtime"] = end_time
if min_magnitude:
params["minmagnitude"] = min_magnitude
if max_magnitude:
params["maxmagnitude"] = max_magnitude
try:
response = urequests.get(base_url, params=params)
if response.status_code == 200:
data = ujson.loads(response.text)
response.close()
return data
else:
print("Error:", response.status_code)
response.close()
return None
except Exception as e:
print("Request failed:", e)
return None
def display_earthquake_info(data):
"""
Display earthquake information
"""
if not data or "features" not in data or not data["features"]:
print("No earthquake data available")
return
print("\nEarthquake Information:")
print("-" * 40)
for feature in data["features"]:
props = feature.get("properties", {})
geom = feature.get("geometry", {})
coords = geom.get("coordinates", [])
print("ID:", feature.get("id", "N/A"))
print("Place:", props.get("place", "N/A"))
print("Magnitude:", props.get("mag", "N/A"))
print("Time:", format_timestamp(props.get("time")))
if len(coords) >= 2:
print("Location: {:.2f}°N, {:.2f}°E".format(coords[1], coords[0]))
if len(coords) >= 3:
print("Depth: {:.1f} km".format(coords[2]))
print("-" * 40)
# Main loop
print("Starting earthquake monitor...")
while True:
print("\nChecking for earthquakes...")
data = fetch_earthquake_data(
start_time=get_current_utc_time(),
min_magnitude=5.0,
limit=5
)
if data:
display_earthquake_info(data)
else:
print("Failed to fetch data")
print("Waiting 30 seconds...")
utime.sleep(30)
ของแถม . สำหรับท่านที่สนใจและใช้ Arduino
#include <WiFi.h>
#include <WiFiClientSecure.h>
#include <ArduinoJson.h>
// WiFi Credentials
const char* ssid = "YOUR_WIFI_SSID";
const char* password = "YOUR_WIFI_PASSWORD";
// USGS API URL
const char* usgsApiUrl = "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&limit=1&minmagnitude=5";
void setup() {
// Initialize Serial Monitor
Serial.begin(115200);
// Connect to WiFi
WiFi.begin(ssid, password);
Serial.println("Connecting to WiFi...");
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("\nWiFi connected");
}
void loop() {
// Fetch Earthquake Data
WiFiClientSecure client;
client.setInsecure(); // Disable SSL certificate verification (not recommended for production)
if (!client.connect("earthquake.usgs.gov", 443)) {
Serial.println("Connection to API failed");
return;
}
// Send GET Request
client.print(String("GET ") + usgsApiUrl + " HTTP/1.1\r\n" +
"Host: earthquake.usgs.gov\r\n" +
"Connection: close\r\n\r\n");
// Read Response
String payload = "";
while (client.connected() || client.available()) {
if (client.available()) {
String line = client.readStringUntil('\n');
payload += line;
}
}
client.stop();
// Parse JSON
DynamicJsonDocument doc(1024);
DeserializationError error = deserializeJson(doc, payload);
if (error) {
Serial.println("JSON parsing failed");
return;
}
// Extract Earthquake Data
JsonObject properties = doc["features"][0]["properties"];
const char* place = properties["place"];
float magnitude = properties["mag"];
long time = properties["time"];
// Convert Time
time_t epochTime = time / 1000;
struct tm* timeInfo = gmtime(&epochTime);
char timeString[20];
strftime(timeString, sizeof(timeString), "%Y-%m-%d %H:%M:%S", timeInfo);
// Display on Serial Monitor
Serial.println("Earthquake Alert!");
Serial.println("----------------");
Serial.print("Place: ");
Serial.println(place);
Serial.print("Magnitude: ");
Serial.println(magnitude);
Serial.print("Time: ");
Serial.println(timeString);
Serial.println("");
// Wait before next check
delay(60000); // Check every 60 seconds
}
การสร้าง และ กำหนดค่า Telegram Bot
Telegram Bot เป็นส่วนสำคัญที่ช่วยให้คุณสามารถโต้ตอบกับสมาชิกในกลุ่มได้ ขั้นตอนการสร้างและกำหนดค่ามีดังนี้:
สร้าง Telegram Bot:
- พูดคุยกับ @BotFather บน Telegram
- ใช้คำสั่ง
/newbot
เพื่อสร้างบอทใหม่ และบันทึก Token ที่ได้รับไว้
เปิดโหมด Group Privacy:
หากต้องการให้บอทสามารถอ่านข้อความทั่วไปในกลุ่ม (ไม่จำกัดเฉพาะคำสั่ง) ให้ปิดโหมด Privacy:
- ส่งคำสั่ง
/setprivacy
ไปยัง BotFather - เลือกบอทของคุณ และตั้งค่าเป็น Disable
เพิ่มบอทเข้ากลุ่ม:
- เชิญบอทเข้ากลุ่มตามขั้นตอนปกติของ Telegram
เชื่อมโยง Joomla กับ Telegram Bot
การเชื่อมโยง Joomla กับ Telegram Bot ต้องอาศัยการเขียนโค้ดเพื่อให้บอทสามารถโต้ตอบกับสมาชิกในกลุ่มได้ โดยใช้ Telegram Bot API ในการรับ-ส่งข้อมูล
ขั้นตอนการเชื่อมโยง:
สร้าง Webhook สำหรับ Telegram Bot:
- Webhook คือ URL ที่ Telegram จะส่งข้อมูล (Update) เมื่อมีการโต้ตอบในกลุ่ม
- สร้างไฟล์ PHP เช่น telegram-bot.php บนเว็บไซต์ของคุณ
ตั้งค่า Webhook:
curl -F "url=https://yourdomain.com/telegram-bot.php" https://api.telegram.org/bot<YourBotToken>/setWebhook
ตัวอย่างโค้ด PHP รับ-ส่งข้อความ:
<?php
// รับข้อมูลจาก Telegram
$content = file_get_contents("php://input");
$update = json_decode($content, true);
// ตรวจสอบข้อความ
if (isset($update['message'])) {
$chat_id = $update['message']['chat']['id'];
$text = $update['message']['text'];
if ($text == "/start") {
$reply = "สวัสดีครับ! ผมคือบอทของคุณ";
} else {
$reply = "คุณพิมพ์: " . $text;
}
// ส่งข้อความกลับไปยัง Telegram
$url = "https://api.telegram.org/bot<YourBotToken>/sendMessage?chat_id=$chat_id&text=" . urlencode($reply);
file_get_contents($url);
}
?>
ตัวอย่างโค้ด Python รับ-ส่งข้อความ:
import requests
TOKEN = '<YourBotToken>'
BASE_URL = f'https://api.telegram.org/bot{TOKEN}'
def send_message(chat_id, text):
url = f'{BASE_URL}/sendMessage'
payload = {'chat_id': chat_id, 'text': text}
requests.post(url, json=payload)
def get_updates(offset=None):
url = f'{BASE_URL}/getUpdates'
response = requests.get(url)
return response.json()
# ตัวอย่างเรียกใช้
if __name__ == '__main__':
updates = get_updates()
for update in updates['result']:
chat_id = update['message']['chat']['id']
text = update['message']['text']
send_message(chat_id, f'คุณพิมพ์: {text}')
ทดสอบการทำงาน:
- ส่งข้อความในกลุ่ม Telegram และตรวจสอบว่าบอทตอบกลับถูกต้องหรือไม่
- หากพบปัญหา ให้ตรวจสอบ Log ของ Server หรือ Debug โค้ด
ข้อควรระวัง
- ความปลอดภัย: อย่าเปิดเผย Token ของบอท ใช้ HTTPS สำหรับ Webhook
- ประสิทธิภาพ: ทดสอบการตอบกลับข้อความเมื่อกลุ่มมีสมาชิกจำนวนมาก
สรุป
การเชื่อมต่อกับ Telegram Bot ต้องอาศัยการผสมผสานระหว่างการจัดการเนื้อหา และ การพัฒนาโปรแกรม หากคุณมีพื้นฐานด้านการเขียนโค้ด จะช่วยให้การเชื่อมโยง Telegram Bot เป็นไปอย่างราบรื่น
หากคุณมีคำถามเพิ่มเติมเกี่ยวกับการเขียนโค้ด หรือ การตั้งค่า สามารถสอบถามมาได้เลยครับ! 😊