บทนำ

ในยุคที่เทคโนโลยี IoT (Internet of Things) เข้ามามีบทบาทสำคัญในชีวิตประจำวัน การพัฒนาอุปกรณ์ที่สามารถเชื่อมต่อกับอินเทอร์เน็ตและทำงานร่วมกับแพลตฟอร์มออนไลน์ได้อย่างมีประสิทธิภาพจึงกลายเป็นสิ่งจำเป็น ESP32-S3 Cam เป็นหนึ่งในโมดูลที่ได้รับความนิยมอย่างมากในวงการพัฒนาอุปกรณ์ IoT โดยเฉพาะสำหรับงานที่เกี่ยวข้องกับการประมวลผลภาพและวิดีโอ ESP32-S3 Cam มาพร้อมกับความสามารถในการประมวลผลภาพผ่านกล้อง OV2640 และรองรับการเชื่อมต่อ Wi-Fi และ Bluetooth ซึ่งเหมาะสำหรับการสร้างระบบตรวจจับหรือตรวจสอบแบบเรียลไทม์

หนึ่งในแอปพลิเคชันที่น่าสนใจของการใช้งาน ESP32-S3 Cam คือการส่งข้อมูลรูปภาพไปยังแพลตฟอร์มโซเชียลมีเดียหรือแอปพลิเคชันแชท เช่น Telegram ซึ่งเป็นแอปพลิเคชันที่มี API รองรับการพัฒนาบอทและการส่งข้อมูลต่าง ๆ รวมถึงไฟล์ภาพ การใช้งาน ESP32-S3 Cam กับ Telegram ช่วยให้เราสามารถสร้างระบบแจ้งเตือนหรือตรวจสอบระยะไกลได้อย่างสะดวกและรวดเร็ว เช่น การตรวจสอบความปลอดภัยภายในบ้าน การจับภาพเหตุการณ์สำคัญ หรือแม้แต่การใช้งานในโครงการเกษตรกรรมอัจฉริยะ

ประโยชน์ของการใช้ ESP32-S3 Cam สำหรับการรับส่งข้อมูลรูปภาพกับ Telegram

  1. การตรวจสอบระยะไกลแบบเรียลไทม์
    ESP32-S3 Cam สามารถจับภาพและส่งข้อมูลไปยัง Telegram ได้อย่างรวดเร็ว ทำให้ผู้ใช้งานสามารถตรวจสอบสถานการณ์ที่เกิดขึ้นในสถานที่ห่างไกลผ่านสมาร์ทโฟนหรือคอมพิวเตอร์ได้ทันที

  2. ประหยัดต้นทุนและพื้นที่
    ESP32-S3 Cam มีขนาดเล็กและราคาถูกเมื่อเทียบกับกล้องวงจรปิดแบบดั้งเดิม แต่ยังคงมอบประสิทธิภาพที่เพียงพอสำหรับการใช้งานในระดับบ้านหรือโครงการขนาดเล็ก

  3. การแจ้งเตือนเหตุการณ์สำคัญ
    ด้วยการเขียนโปรแกรมควบคุม ESP32-S3 Cam ให้จับภาพเมื่อตรวจพบการเคลื่อนไหวหรือเหตุการณ์เฉพาะเจาะจง แล้วส่งภาพไปยัง Telegram จะช่วยให้ผู้ใช้งานได้รับการแจ้งเตือนทันทีเมื่อมีสิ่งผิดปกติเกิดขึ้น

  4. การใช้งานร่วมกับ AI สำหรับการประมวลผลภาพ
    ESP32-S3 มีทรัพยากรที่เหมาะสมสำหรับการประมวลผลภาพเบื้องต้น เช่น การตรวจจับใบหน้าหรือวัตถุ ซึ่งสามารถส่งผลลัพธ์หรือภาพที่ประมวลผลแล้วไปยัง Telegram เพื่อใช้ในการตัดสินใจหรือตรวจสอบ

  5. การสร้างระบบอัตโนมัติ
    การเชื่อมต่อระหว่าง ESP32-S3 Cam และ Telegram สามารถนำไปพัฒนาเป็นระบบอัตโนมัติ เช่น การเปิด-ปิดอุปกรณ์อื่น ๆ ผ่านคำสั่งที่ส่งจาก Telegram หรือการส่งภาพเมื่อมีการทำงานบางอย่างเกิดขึ้น

  6. ความสะดวกและความยืดหยุ่น
    Telegram เป็นแพลตฟอร์มที่ใช้งานง่ายและรองรับการส่งข้อความ ไฟล์ภาพ และวิดีโอได้หลากหลายรูปแบบ ทำให้การพัฒนาโครงการด้วย ESP32-S3 Cam มีความยืดหยุ่นสูงและสามารถปรับแต่งได้ตามความต้องการของผู้ใช้งาน

  7. การพัฒนาโครงการในอนาคต
    การใช้งาน ESP32-S3 Cam ร่วมกับ Telegram ไม่เพียงแต่ตอบโจทย์การใช้งานในปัจจุบันเท่านั้น แต่ยังสามารถขยายไปสู่โครงการที่ซับซ้อนขึ้นในอนาคต เช่น การสร้างระบบ Smart Home, การตรวจสอบความปลอดภัยในโรงงาน หรือการใช้งานในงานวิจัยทางวิทยาศาสตร์

การใช้ ESP32-S3 Cam สำหรับการรับส่งข้อมูลรูปภาพ

เมื่อผู้เรียนมีประสบการณ์เบื้องต้นเกี่ยวกับการใช้ ESP32, MicroPython และ Telegram ในการควบคุมอุปกรณ์ IoT และการส่งข้อมูลพื้นฐาน เช่น ข้อความหรือค่าเซ็นเซอร์แล้ว ก็ถึงเวลาที่จะก้าวไปสู่โครงการที่ซับซ้อนมากขึ้น เช่น การใช้ ESP32-S3 Cam สำหรับการจับภาพและรับส่งข้อมูลรูปภาพผ่าน Telegram ในส่วนนี้ เราจะพูดถึงการทำงานของ ESP32-S3 Cam และแนวทางในการพัฒนาโครงการเพื่อให้สามารถส่งรูปภาพไปยัง Telegram ได้อย่างมีประสิทธิภาพ


1. แนะนำ ESP32-S3 Cam

ESP32-S3 Cam เป็นโมดูลที่ออกแบบมาสำหรับงานประมวลผลภาพ โดยมีคุณสมบัติดังนี้:

  • กล้อง OV2640 : รองรับการจับภาพความละเอียดสูงสุดที่ 2 ล้านพิกเซล (UXGA)
  • Wi-Fi และ Bluetooth : รองรับการเชื่อมต่อไร้สายเพื่อส่งข้อมูลไปยังแพลตฟอร์มออนไลน์
  • ทรัพยากรที่เหมาะสม : มี RAM และ Flash Memory เพียงพอสำหรับการประมวลผลภาพและการทำงานแบบ IoT
  • ขนาดเล็กและประหยัดพลังงาน : เหมาะสำหรับโครงการที่ต้องการความกะทัดรัดและความยืดหยุ่น

2. ขั้นตอนการใช้งาน ESP32-S3 Cam

2.1 ติดตั้งเฟิร์มแวร์ MicroPython

ESP32-S3 Cam ไม่มีเฟิร์มแวร์ MicroPython มาจากโรงงาน ดังนั้นจำเป็นต้องแฟลชเฟิร์มแวร์ MicroPython ลงบนโมดูล:

  1. ดาวน์โหลดเฟิร์มแวร์ MicroPython สำหรับ ESP32-S3 จาก MicroPython Official Website
  2. ใช้ esptool.py เพื่อล้างเฟิร์มแวร์เดิมและแฟลชเฟิร์มแวร์ใหม่:
    esptool.py --port <COM_PORT> erase_flash
    esptool.py --port <COM_PORT> write_flash 0x1000 <path_to_firmware.bin>

2.2 เชื่อมต่อ ESP32-S3 Cam กับคอมพิวเตอร์

  • ใช้สาย USB เพื่อเชื่อมต่อ ESP32-S3 Cam กับคอมพิวเตอร์
  • เปิด Thonny IDE และเลือกพอร์ตที่ถูกต้องในเมนู "Port"

3. โค้ดพื้นฐานสำหรับการจับภาพ

ESP32-S3 Cam สามารถใช้งานร่วมกับไลบรารี camera เพื่อจับภาพและบันทึกเป็นไฟล์ภาพ ตัวอย่างโค้ดพื้นฐานสำหรับการจับภาพ:

import camera
import time
# ตั้งค่ากล้อง
camera.init(0, format=camera.JPEG)
# จับภาพ
img = camera.capture()
# บันทึกภาพลงในไฟล์
with open("image.jpg", "wb") as f:
f.write(img)
print("Image captured and saved as image.jpg")
# ปิดกล้อง
camera.deinit()

4. การส่งรูปภาพไปยัง Telegram

เมื่อได้ภาพจาก ESP32-S3 Cam แล้ว ขั้นตอนต่อไปคือการส่งภาพไปยัง Telegram โดยใช้ API ของ Telegram Bot ตัวอย่างโค้ดสำหรับการส่งภาพ:

import urequests
import camera
# ตั้งค่า Telegram Bot
TELEGRAM_BOT_TOKEN = "your_bot_token"
CHAT_ID = "your_chat_id"
# ตั้งค่ากล้อง
camera.init(0, format=camera.JPEG)
# จับภาพ
img = camera.capture()
# ส่งภาพไปยัง Telegram
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
files = {"photo": img}
data = {"chat_id": CHAT_ID}
response = urequests.post(url, data=data, files=files)
# ตรวจสอบผลลัพธ์
if response.status_code == 200:
print("Image sent successfully")
else:
print("Failed to send image")
# ปิดกล้อง
camera.deinit()

5. แนวทางการพัฒนา Project

5.1 การจับภาพเมื่อตรวจพบเหตุการณ์

  • ใช้เซ็นเซอร์ PIR (Passive Infrared Sensor) เพื่อตรวจจับการเคลื่อนไหว
  • เมื่อตรวจพบการเคลื่อนไหว ให้ ESP32-S3 Cam จับภาพและส่งไปยัง Telegram

5.2 การประมวลผลภาพเบื้องต้น

  • ใช้ไลบรารี AI หรือ Machine Learning บน ESP32-S3 Cam เพื่อประมวลผลภาพ เช่น การตรวจจับใบหน้าหรือวัตถุ
  • ส่งผลลัพธ์ของการประมวลผล (เช่น แจ้งเตือนว่าพบคน) ไปยัง Telegram

5.3 การสร้างระบบตรวจสอบระยะไกล

  • พัฒนาแอปพลิเคชันที่ผู้ใช้สามารถส่งคำสั่งผ่าน Telegram เพื่อให้ ESP32-S3 Cam จับภาพ และ ส่งกลับมา
  • ใช้คำสั่ง /capture ใน Telegram เพื่อควบคุมการจับภาพ

6. ข้อควรระวัง

  • การจัดการหน่วยความจำ : ESP32-S3 Cam มี RAM และ Flash Memory จำกัด ดังนั้นควรหลีกเลี่ยงการประมวลผลภาพที่ซับซ้อนเกินไป
  • การจัดการพลังงาน : หากใช้งานนอกสถานที่ ควรพิจารณาแหล่งพลังงานที่เหมาะสม เช่น แบตเตอรี่ LiPo
  • ความปลอดภัย : ตรวจสอบความปลอดภัยของข้อมูลที่ส่งผ่าน Telegram โดยเฉพาะหากใช้งานในโครงการที่เกี่ยวข้องกับความเป็นส่วนตัว

7. สรุป

ESP32-S3 Cam เป็นโมดูลที่ทรงพลังสำหรับการจับภาพและส่งข้อมูลรูปภาพผ่านแพลตฟอร์มออนไลน์ เช่น Telegram การเริ่มต้นพัฒนาโครงการด้วย ESP32-S3 Cam จะช่วยให้ผู้เรียนสามารถสร้างระบบตรวจสอบระยะไกลที่ตอบโจทย์การใช้งานได้อย่างมีประสิทธิภาพ ไม่ว่าจะเป็นการตรวจสอบความปลอดภัย การจับภาพเหตุการณ์สำคัญ หรือการใช้งานในโครงการเกษตรกรรมอัจฉริยะ การฝึกฝนและทดลองกับโค้ดพื้นฐานจะช่วยให้ผู้เรียนเข้าใจการทำงานของ ESP32-S3 Cam และนำไปปรับใช้ในโครงการที่ซับซ้อนมากขึ้นได้ในอนาคต

🌍 ระบบแจ้งเตือนแผ่นดินไหวผ่าน Telegram ด้วย Circuit Python

Bot ส่วนตัว https://t.me/DisasterwarningBot

กลุ่มแจ้งเตือน https://t.me/disasterThaiAleart

✨ ประโยชน์ของการใช้ Telegram สำหรับแจ้งเตือน

🚀 ความเร็วสูง

Telegram ส่งข้อความแบบ Real-time รับการแจ้งเตือนทันทีที่เกิดแผ่นดินไหว ไม่มีดีเลย์

📱 รับการแจ้งเตือนทุกที่

เปิดแจ้งเตือนบนมือถือได้ทั้ง iOS และ Android แม้ไม่ได้เปิดแอปก็รับการแจ้งเตือนได้ (Push Notification)

👥 แชร์ข้อมูลแบบกลุ่ม

ส่งการแจ้งเตือนไปยังกลุ่มหรือช่อง Telegram ได้ไม่จำกัดผู้รับ สามารถเพิ่มสมาชิกในองค์กรให้รับข้อมูลพร้อมกัน

🔗 รองรับลิงก์และ Rich Media

แทรกรูปภาพ, ลิงก์ไปยัง USGS, หรือแมปตำแหน่งแผ่นดินไหวได้ในข้อความเดียว

📨 ตัวอย่างการแจ้งเตือนใน Telegram

⚠️ EMERGENCY: Earthquake Alert ⚠️

📍 Location: 50 km NE of Chiang Rai, Thailand

📏 Magnitude: 5.8 (Richter Scale)

🔻 Depth: 12.3 km

⏰ Time: 2024-06-21 08:45:22 (UTC+7)

🔗 View on USGS Website

* ระบบจะส่งข้อความรูปแบบนี้ไปยังกลุ่ม Telegram ทันทีที่ตรวจพบแผ่นดินไหวขนาด 5.0+

# -*- coding: utf-8 -*-
# Circuit Python For ESP32_S3
import wifi
import socketpool
import ssl
import adafruit_requests
import time
import json

# ==================== ตั้งค่าการเชื่อมต่อ ====================
# 1. ตั้งค่า WiFi
SSID = " Your wifi ID"
PASSWORD = "Your wifi Password"

# 2. ตั้งค่า Telegram Bot
BOT_TOKEN = "Your telegram Token"  # เช่น "1234567890:ABCdefGhIJKlmNoPQRsTUVwxyZ"
CHAT_ID = "Your Chat ID"  # ID ของกลุ่ม/ช่อง (ต้องมีเครื่องหมาย -)

# ==================== ฟังก์ชันหลัก ====================
def connect_wifi():
    """เชื่อมต่อ WiFi"""
    print("🔌 กำลังเชื่อมต่อ WiFi...")
    wifi.radio.connect(SSID, PASSWORD)
    print("✅ เชื่อมต่อ WiFi สำเร็จ! IP:", wifi.radio.ipv4_address)

def format_time(ms):
    """แปลงเวลาจากมิลลิวินาทีเป็นรูปแบบอ่านง่าย"""
    t = time.localtime(ms // 1000)
    return f"{t.tm_year}-{t.tm_mon:02d}-{t.tm_mday:02d} {t.tm_hour:02d}:{t.tm_min:02d}:{t.tm_sec:02d} (UTC+7)"

def send_telegram(message):
    """ส่งข้อความไปยัง Telegram"""
    url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": CHAT_ID,
        "text": message,
        "parse_mode": "Markdown"  # ใช้ Markdown เพื่อจัดรูปแบบ
    }
    try:
        response = requests.post(url, json=payload)
        print(f"📤 ส่งข้อความสำเร็จ (Status: {response.status_code})")
        response.close()
    except Exception as e:
        print(f"❌ เกิดข้อผิดพลาดขณะส่งข้อความ: {e}")

def check_earthquake():
    """ตรวจสอบแผ่นดินไหวจาก USGS API"""
    global last_quake_id
    
    # ตั้งค่า API (ตรวจสอบแผ่นดินไหวขนาด 5.0 ขึ้นไป ล่าสุด 1 เหตุการณ์)
    url = "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&minmagnitude=5.0&limit=1"
    
    try:
        print("🔍 กำลังตรวจสอบแผ่นดินไหว...")
        response = requests.get(url)
        data = response.json()
        response.close()

        if data.get("features"):
            quake = data["features"][0]
            quake_id = quake["id"]  # ID ของเหตุการณ์
            
            # เช็คหากเป็นเหตุการณ์เดิม
            if quake_id == last_quake_id:
                print("⏭️ ไม่มีแผ่นดินไหวใหม่")
                return
            
            last_quake_id = quake_id  # อัปเดต ID ล่าสุด
            
            # ดึงข้อมูลแผ่นดินไหว
            props = quake["properties"]
            place = props.get("place", "ไม่ทราบตำแหน่ง")
            mag = props.get("mag", "N/A")
            time_ms = props.get("time", 0)
            depth = quake["geometry"]["coordinates"][2]  # ความลึก (km)
            
            # สร้างข้อความแจ้งเตือน (รูปแบบ Markdown)
            alert_msg = f"""
⚠️ **แจ้งเตือนแผ่นดินไหว** ⚠️
📍 **ตำแหน่ง**: {place}
📏 **ขนาด**: {mag} ริกเตอร์
🔻 **ความลึก**: {depth} กม.
⏰ **เวลา**: {format_time(time_ms)}
[ดูรายละเอียดเพิ่มเติม](https://earthquake.usgs.gov/earthquakes/eventpage/{quake_id})
            """
            send_telegram(alert_msg.strip())
        else:
            print("🌍 ไม่พบข้อมูลแผ่นดินไหวล่าสุด")
            
    except Exception as e:
        print(f"❌ เกิดข้อผิดพลาดขณะตรวจสอบ: {e}")

# ==================== การทำงานหลัก ====================
# เชื่อมต่อ WiFi
connect_wifi()

# สร้าง Session สำหรับการเชื่อมต่อ
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())

# ตัวแปรเก็บ ID ของแผ่นดินไหวล่าสุด
last_quake_id = None

# แจ้งเตือนเมื่อระบบเริ่มทำงาน
send_telegram("🟢 **ระบบตรวจสอบแผ่นดินไหวเริ่มทำงานแล้ว**")

# วนลูปตรวจสอบทุก 10 นาที
while True:
    check_earthquake()
    print(f"⏳ พัก 10 นาทีก่อนตรวจสอบอีกครั้ง...\n")
    time.sleep(600)  # 10 นาที = 600 วินาที

💡 ทำไมต้องใช้ Telegram แจ้งเตือนแผ่นดินไหว?

เพราะ Telegram มีระบบแจ้งเตือนที่เสถียร รองรับการใช้งานแบบกลุ่มฟรี และสามารถส่งข้อมูลได้รวดเร็วแม้ในสภาวะเครือข่ายไม่穩定 เหมาะสำหรับระบบแจ้งเตือนภัยพิบัติที่ต้องพึ่งพาความเร็ว!

Non-GUI (Command-Line Interface) และ GUI (Graphical User Interface) โดยทั้งสองส่วนนี้ใช้คลาส UARTHandler เป็นแกนกลางในการจัดการการสื่อสารผ่านพอร์ตอนุกรม (UART) อย่างไรก็ตาม การใช้งานในสองรูปแบบนี้มีความแตกต่างกันในแง่ของการออกแบบและการทำงานของโปรแกรม มาดูรายละเอียดและวิเคราะห์แต่ละประเด็นที่คุณสงสัยกันครับ


1. การใช้งาน self ในคลาส UARTHandler

บทบาทของ self

  • Attributes :

    • self.port, self.baudrate, self.buffer, self.data_queue, และอื่น ๆ เป็น attributes ของคลาส UARTHandler
    • Attributes เหล่านี้ถูกกำหนดในคอนสตรักเตอร์ (__init__) และสามารถเรียกใช้งานได้ใน methods อื่น ๆ เช่น _read_loop, send_data, get_received_line, และ close
  • Methods :

    • การใช้ self.method_name() เช่น self.send_data(data) ช่วยให้ methods ภายในคลาสสามารถเข้าถึง attributes และ methods อื่น ๆ ได้
    • การใช้ self ยังช่วยให้ threads (เช่น _read_loop) และ callbacks สามารถทำงานร่วมกับ attributes ของคลาสได้อย่างปลอดภัย

ตัวอย่างที่สำคัญ :

python
def _read_loop(self):
while not self.stop_thread.is_set():
if self.uart.is_open and self.uart.in_waiting:
data = self.uart.read(self.uart.in_waiting).decode('utf-8', errors='ignore')
self.buffer += data
  • self.uart คือ attribute ที่ถูกกำหนดใน __init__ และถูกใช้งานใน _read_loop
  • self.buffer เป็นตัวแปรที่เก็บข้อมูลที่อ่านมาจาก UART และสามารถถูกเรียกใช้งานใน methods อื่น ๆ

2. ความแตกต่างระหว่าง GUI และ Non-GUI

Non-GUI (serial_monitor.py)

  • การทำงาน :

    • โปรแกรมทำงานบน command-line interface (CLI) โดยไม่มีหน้าจอกราฟิก
    • ใช้ input() เพื่อรับข้อมูลจากผู้ใช้ และแสดงผลทาง terminal
    • การรับข้อมูลจาก UART ทำผ่าน thread (receive_loop) เพื่อให้สามารถรับข้อมูลแบบ real-time โดยไม่บล็อกการทำงานของ main loop
  • การใช้ UARTHandler :

    • UARTHandler ถูกสร้างเป็นออบเจ็กต์เพื่อจัดการการสื่อสาร UART
    • Thread (receive_loop) ใช้ method get_received_line จาก UARTHandler เพื่อรับข้อมูลจาก queue (data_queue) และแสดงผลใน terminal

GUI (serial_monitor_gui.py)

  • การทำงาน :

    • โปรแกรมแสดงผลผ่าน graphical user interface (GUI) โดยใช้ไลบรารี ttkbootstrap
    • มี widget เช่น dropdown menu, text box, และ button เพื่อให้ผู้ใช้สามารถเลือกพอร์ต, baud rate, ส่งข้อมูล, และดูข้อความที่รับ-ส่งได้
    • การรับข้อมูลจาก UART ทำผ่าน thread (receive_loop) เช่นเดียวกับ Non-GUI แต่ข้อมูลจะถูกแสดงผลใน widget (output_box) แทนที่จะเป็น terminal
  • การใช้ UARTHandler :

    • UARTHandler ถูกสร้างเป็นออบเจ็กต์เมื่อผู้ใช้กดปุ่ม "Connect"
    • Thread (receive_loop) ใช้ method get_received_line จาก UARTHandler เพื่อรับข้อมูลจาก queue และแสดงผลใน output_box

3. ประเด็นที่กล่าวถึง

(1) การใช้ with เพื่อเปิด/ปิดพอร์ตอย่างปลอดภัย

  • Non-GUI :

    • ใน serial_monitor.py การปิดพอร์ตทำผ่าน method close() ของ UARTHandler
    • การใช้ with อาจไม่จำเป็นในกรณีนี้ เพราะโปรแกรมทำงานแบบ CLI และมีการจัดการทรัพยากรผ่าน try-finally หรือ finally block
  • GUI :

    • ใน serial_monitor_gui.py การปิดพอร์ตทำผ่าน method disconnect() ซึ่งเรียก close() ของ UARTHandler
    • การใช้ with อาจเหมาะสำหรับการจัดการทรัพยากรในบางกรณี เช่น เมื่อต้องการเปิด/ปิดพอร์ตเฉพาะในขอบเขตบางส่วนของโปรแกรม

(2) การจัดการ Logging

  • Non-GUI :

    • Logging ถูกแสดงผลใน terminal และบันทึกลงไฟล์ uart_debug.log
    • การใช้ logging.basicConfig กำหนด format และ handler สำหรับ logging
  • GUI :

    • Logging ถูกแสดงผลใน widget (output_box) และบันทึกลงไฟล์ uart_debug.log
    • การแสดงผลใน GUI ทำผ่าน method append_output ซึ่งอัปเดตข้อความใน output_box

(3) การใช้ Queue

  • Non-GUI :

    • data_queue ถูกใช้เพื่อรับข้อมูลจาก thread (_read_loop) และส่งข้อมูลกลับไปยัง main loop
    • Main loop ใช้ get_received_line เพื่อรับข้อมูลจาก queue และแสดงผลใน terminal
  • GUI :

    • data_queue ถูกใช้ในลักษณะเดียวกัน แต่ข้อมูลจะถูกแสดงผลใน output_box แทนที่จะเป็น terminal

(4) การรองรับ ASCII และ Hex

  • Non-GUI :

    • Method send_data ตรวจสอบว่าข้อมูลที่ส่งเป็น ASCII หรือ Hex (0x...)
    • หากเป็น Hex จะแปลงข้อมูลด้วย bytes.fromhex ก่อนส่งออก
  • GUI :

    • การรองรับ ASCII และ Hex ทำในลักษณะเดียวกัน โดยใช้ method send_data ของ UARTHandler

(5) การใช้งานในโปรเจกต์ใหญ่

  • Modular Design :
    • UARTHandler ถูกออกแบบให้เป็นโมดูลแยกต่างหาก เพื่อให้สามารถนำกลับมาใช้ใหม่ได้ในโปรเจกต์อื่น
    • การใช้งานใน Non-GUI และ GUI แสดงให้เห็นถึงความยืดหยุ่นของคลาสนี้

สรุปความแตกต่างระหว่าง GUI และ Non-GUI

Feature
Non-GUI
GUI
Interface
Command-line interface (CLI)
Graphical user interface (GUI)
Input/Output
Terminal input/output
Widgets (dropdown, text box, button)
Thread Management
Thread (receive_loop)
Thread (receive_loop)
Logging
Terminal + File
Widget (output_box) + File
Queue Usage
data_queue
data_queue
Port Management
Manual (close())
Manual (connect/disconnect)

Non GUI

# serial_monitor.py

import threading
import time
from uart_handler import UARTHandler

def receive_loop(uart: UARTHandler):
    """
    รับข้อมูลจาก UART แบบ Real-time แล้วแสดงออกหน้าจอ
    """
    try:
        while True:
            line = uart.get_received_line(timeout=0.1)
            if line:
                print(f"\033[92m<<< {line}\033[0m")  # สีเขียวสำหรับข้อความขาเข้า
    except Exception as e:
        print(f"Receive thread error: {e}")

def main():
    port = 'COM11'         # เปลี่ยนตามพอร์ตของคุณ
    baudrate = 115200
    buffer_size = 2048

    try:
        uart = UARTHandler(port, baudrate, buffer_size)
    except Exception as e:
        print(f"Failed to initialize UART: {e}")
        return

    print(f"=== Serial Monitor (baud: {baudrate}) ===")
    print("Type and press Enter to send. Press Ctrl+C to exit.\n")

    # Start receiving thread
    receiver = threading.Thread(target=receive_loop, args=(uart,), daemon=True)
    receiver.start()

    try:
        while True:
            user_input = input(">>> ")  # สีขาวสำหรับข้อความขาออก
            if user_input.lower() == "exit":
                break
            uart.send_data(user_input)
    except KeyboardInterrupt:
        print("\nExiting...")
    finally:
        uart.close()

if __name__ == "__main__":
    main()

Python GUI

# serial_monitor_gui.py

import ttkbootstrap as ttk
from ttkbootstrap.constants import *
import serial.tools.list_ports
import threading
from uart_handler import UARTHandler

class SerialMonitorApp:
    def __init__(self, root):
        self.root = root
        self.root.title("Python Serial Monitor")

        self.uart = None
        self.receiver_thread = None
        self.running = False

        self.port_var = ttk.StringVar()
        self.baud_var = ttk.IntVar(value=115200)
        self.input_var = ttk.StringVar()

        self.setup_widgets()

    def setup_widgets(self):
        frame_top = ttk.Frame(self.root, padding=10)
        frame_top.pack(fill=X)

        # Serial port dropdown
        ttk.Label(frame_top, text="Port:").pack(side=LEFT, padx=(0, 5))
        self.port_combo = ttk.Combobox(frame_top, textvariable=self.port_var, width=15)
        self.port_combo.pack(side=LEFT, padx=5)
        self.refresh_ports()

        # Baud rate
        ttk.Label(frame_top, text="Baudrate:").pack(side=LEFT, padx=(10, 5))
        self.baud_entry = ttk.Entry(frame_top, textvariable=self.baud_var, width=10)
        self.baud_entry.pack(side=LEFT, padx=5)

        # Connect button
        self.connect_btn = ttk.Button(frame_top, text="Connect", command=self.toggle_connection, bootstyle=SUCCESS)
        self.connect_btn.pack(side=LEFT, padx=10)

        # Output box
        self.output_box = ttk.ScrolledText(self.root, height=20, wrap='word')
        self.output_box.pack(fill=BOTH, expand=True, padx=10, pady=(5, 0))
        self.output_box.configure(state='disabled')

        # Send box
        frame_bottom = ttk.Frame(self.root, padding=10)
        frame_bottom.pack(fill=X)

        self.input_entry = ttk.Entry(frame_bottom, textvariable=self.input_var)
        self.input_entry.pack(side=LEFT, fill=X, expand=True, padx=(0, 5))
        self.input_entry.bind("<Return>", self.send_data)

        self.send_btn = ttk.Button(frame_bottom, text="Send", command=self.send_data)
        self.send_btn.pack(side=LEFT)

    def refresh_ports(self):
        ports = serial.tools.list_ports.comports()
        port_list = [port.device for port in ports]
        self.port_combo['values'] = port_list
        if port_list:
            self.port_combo.current(0)

    def toggle_connection(self):
        if self.uart:
            self.disconnect()
        else:
            self.connect()

    def connect(self):
        port = self.port_var.get()
        baud = self.baud_var.get()
        if not port:
            self.show_message("No COM port selected!", "error")
            return
        try:
            self.uart = UARTHandler(port, baud)
            self.running = True
            self.connect_btn.config(text="Disconnect", bootstyle=DANGER)
            self.start_receiver()
        except Exception as e:
            self.show_message(f"Connection failed: {e}", "error")

    def disconnect(self):
        self.running = False
        if self.uart:
            self.uart.close()
            self.uart = None
        self.connect_btn.config(text="Connect", bootstyle=SUCCESS)
        self.show_message("Disconnected", "info")

    def start_receiver(self):
        self.receiver_thread = threading.Thread(target=self.receive_loop, daemon=True)
        self.receiver_thread.start()

    def receive_loop(self):
        while self.running and self.uart:
            line = self.uart.get_received_line(timeout=0.1)
            if line:
                self.append_output(f"[RX] {line}")
        
    def send_data(self, event=None):
        msg = self.input_var.get().strip()
        if self.uart and msg:
            self.uart.send_data(msg)
            self.append_output(f"[TX] {msg}")
            self.input_var.set("")

    def append_output(self, text):
        self.output_box.configure(state='normal')
        self.output_box.insert('end', text + '\n')
        self.output_box.see('end')
        self.output_box.configure(state='disabled')

    def show_message(self, text, level="info"):
        if level == "error":
            ttk.messagebox.showerror("Error", text)
        else:
            ttk.messagebox.showinfo("Info", text)

def main():
    app = ttk.Window(themename="cyborg", title="Serial Monitor", size=(600, 500))
    SerialMonitorApp(app)
    app.mainloop()

if __name__ == "__main__":
    main()

บทนำ การแจ้งเตือนเหตุแผ่นดินไหว

แผ่นดินไหวเป็นภัยธรรมชาติที่เกิดขึ้นได้ทุกเมื่อและส่งผลกระทบอย่างรุนแรงต่อชีวิต และ ทรัพย์สิน การรับรู้ข้อมูลแผ่นดินไหวอย่างรวดเร็วจึงมีความสำคัญอย่างยิ่ง ในบทความนี้ ขอแนะนำ แอปพลิเคชันตรวจสอบการแจ้งเตือนเหตุแผ่นดินไหว ที่พัฒนาขึ้นด้วยภาษา Python โดยใช้ API จาก USGS (United States Geological Survey) ซึ่งเป็นแหล่งข้อมูลแผ่นดินไหวที่น่าเชื่อถือและอัปเดตแบบเรียลไทม์ 

Telegram Bot https://t.me/DisasterwarningBot

หรือ กลุ่ม https://t.me/disasterThaiAleart

วัตถุประสงค์ของแอปพลิเคชัน

  • ดึงข้อมูลแผ่นดินไหวล่าสุดจาก USGS Earthquake API
  • กรองข้อมูลตามช่วงเวลา, ขนาดแผ่นดินไหว (แมกนิจูด), และจำนวนเหตุการณ์ที่ต้องการ
  • แสดงผลข้อมูลในรูปแบบที่อ่านง่าย เพื่อให้ผู้ใช้สามารถรับทราบข้อมูลแผ่นดินไหวได้อย่างรวดเร็ว

เทคโนโลยีและเครื่องมือที่ใช้

  • ภาษา Python – ใช้สำหรับเขียนโค้ดหลักของแอปพลิเคชัน
  • Requests Library – สำหรับส่ง HTTP Request ไปยัง USGS API
  • USGS Earthquake API – ให้ข้อมูลแผ่นดินไหวในรูปแบบ GeoJSON
  • Datetime Module – สำหรับจัดการและจัดรูปแบบเวลา

ขั้นตอนการทำงาน

1. การเชื่อมต่อกับ USGS API

import requests
from datetime import datetime

def fetch_earthquake_data(start_time=None, end_time=None, min_magnitude=None, max_magnitude=None, limit=10):
    base_url = "https://earthquake.usgs.gov/fdsnws/event/1/query"
    params = {
        "format": "geojson",
        "starttime": start_time,
        "endtime": end_time,
        "minmagnitude": min_magnitude,
        "maxmagnitude": max_magnitude,
        "limit": limit
    }
    response = requests.get(base_url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: Unable to fetch data. Status code: {response.status_code}")
        return None

2. การแสดงผลข้อมูลแผ่นดินไหว

def display_earthquake_info(data):
    if not data or "features" not in data:
        print("No earthquake data available.")
        return

    print("Earthquake Information:")
    print("-" * 40)
    for feature in data["features"]:
        properties = feature.get("properties", {})
        geometry = feature.get("geometry", {})
        
        place = properties.get("place", "N/A")
        magnitude = properties.get("mag", "N/A")
        time = properties.get("time", "N/A")
        coordinates = geometry.get("coordinates", [None, None, None])

        if time and isinstance(time, int):
            time_formatted = datetime.utcfromtimestamp(time / 1000).strftime('%Y-%m-%d %H:%M:%S')
        else:
            time_formatted = "N/A"

        print(f"Place: {place}")
        print(f"Magnitude: {magnitude}")
        print(f"Time: {time_formatted}")
        print(f"Coordinates: Latitude={coordinates[1]}, Longitude={coordinates[0]}, Depth={coordinates[2]} km")
        print("-" * 40)

3. ตัวอย่างการใช้งาน

if __name__ == "__main__":
    start_time = "2025-03-28T00:00:00"
    end_time = "2025-03-28T20:59:59"
    min_magnitude = 5.0  # แสดงเฉพาะแผ่นดินไหวขนาด 5.0 ขึ้นไป
    limit = 5  # แสดงผลล่าสุด 5 เหตุการณ์

    earthquake_data = fetch_earthquake_data(
        start_time=start_time,
        end_time=end_time,
        min_magnitude=min_magnitude,
        limit=limit
    )

    if earthquake_data:
        display_earthquake_info(earthquake_data)

ประโยชน์ของแอปพลิเคชัน

  • รับข้อมูลแบบ Real-time – สามารถตรวจสอบแผ่นดินไหวล่าสุดได้ทันที
  • ปรับแต่งการค้นหาได้ – สามารถกำหนดช่วงเวลาและขนาดแผ่นดินไหวที่ต้องการ
  • ใช้งานง่าย – แสดงผลข้อมูลในรูปแบบที่เข้าใจง่าย

แนวทางการพัฒนาในอนาคต

  • เพิ่มการแจ้งเตือนแบบ Real-time (ผ่าน Email, LINE, หรือ SMS)
  • แสดงผลบนแผนที่ โดยใช้ไลบรารีเช่น Folium หรือ Google Maps API
  • พัฒนาเป็นเว็บแอปหรือโมบายแอป เพื่อให้เข้าถึงได้สะดวกยิ่งขึ้น

สรุป

แอปพลิเคชันตรวจสอบการแจ้งเตือนเหตุแผ่นดินไหวนี้เป็นตัวอย่างที่ดีของการนำ API ข้อมูลเปิด (Open Data API) มาใช้ประโยชน์ โดยใช้ Python ซึ่งเป็นภาษาที่มีความยืดหยุ่นสูง หากคุณสนใจสามารถนำโค้ดนี้ไปพัฒนาต่อยอดได้ตามต้องการ เพื่อเพิ่มประสิทธิภาพในการรับมือกับภัยแผ่นดินไหวได้ดียิ่งขึ้น

Micropython ESP32

import urequests
import utime
import ujson

def get_current_utc_time():
    """
    Get current UTC time in ISO format (YYYY-MM-DDTHH:MM:SS) using utime
    """
    now = utime.gmtime()
    return "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}".format(
        now[0], now[1], now[2], now[3], now[4], now[5]
    )

def format_timestamp(timestamp_ms):
    """
    Format UNIX timestamp in milliseconds to readable string
    """
    if not timestamp_ms:
        return "N/A"
    timestamp_sec = timestamp_ms // 1000
    t = utime.localtime(timestamp_sec)
    return "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(
        t[0], t[1], t[2], t[3], t[4], t[5]
    )

def fetch_earthquake_data(start_time=None, end_time=None, min_magnitude=None, max_magnitude=None, limit=10):
    """
    Fetch earthquake data from USGS API
    """
    base_url = "https://earthquake.usgs.gov/fdsnws/event/1/query"
    params = {
        "format": "geojson",
        "limit": limit
    }
    
    if start_time:
        params["starttime"] = start_time
    if end_time:
        params["endtime"] = end_time
    if min_magnitude:
        params["minmagnitude"] = min_magnitude
    if max_magnitude:
        params["maxmagnitude"] = max_magnitude
    
    try:
        response = urequests.get(base_url, params=params)
        if response.status_code == 200:
            data = ujson.loads(response.text)
            response.close()
            return data
        else:
            print("Error:", response.status_code)
            response.close()
            return None
    except Exception as e:
        print("Request failed:", e)
        return None

def display_earthquake_info(data):
    """
    Display earthquake information
    """
    if not data or "features" not in data or not data["features"]:
        print("No earthquake data available")
        return
    
    print("\nEarthquake Information:")
    print("-" * 40)
    for feature in data["features"]:
        props = feature.get("properties", {})
        geom = feature.get("geometry", {})
        coords = geom.get("coordinates", [])
        
        print("ID:", feature.get("id", "N/A"))
        print("Place:", props.get("place", "N/A"))
        print("Magnitude:", props.get("mag", "N/A"))
        print("Time:", format_timestamp(props.get("time")))
        if len(coords) >= 2:
            print("Location: {:.2f}°N, {:.2f}°E".format(coords[1], coords[0]))
        if len(coords) >= 3:
            print("Depth: {:.1f} km".format(coords[2]))
        print("-" * 40)

# Main loop
print("Starting earthquake monitor...")
while True:
    print("\nChecking for earthquakes...")
    data = fetch_earthquake_data(
        start_time=get_current_utc_time(),
        min_magnitude=5.0,
        limit=5
    )
    
    if data:
        display_earthquake_info(data)
    else:
        print("Failed to fetch data")
    
    print("Waiting 30 seconds...")
    utime.sleep(30)

ของแถม . สำหรับท่านที่สนใจและใช้ Arduino

#include <WiFi.h>
#include <WiFiClientSecure.h>
#include <ArduinoJson.h>

// WiFi Credentials
const char* ssid = "YOUR_WIFI_SSID";
const char* password = "YOUR_WIFI_PASSWORD";

// USGS API URL
const char* usgsApiUrl = "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&limit=1&minmagnitude=5";

void setup() {
  // Initialize Serial Monitor
  Serial.begin(115200);

  // Connect to WiFi
  WiFi.begin(ssid, password);
  Serial.println("Connecting to WiFi...");
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("\nWiFi connected");
}

void loop() {
  // Fetch Earthquake Data
  WiFiClientSecure client;
  client.setInsecure(); // Disable SSL certificate verification (not recommended for production)
  if (!client.connect("earthquake.usgs.gov", 443)) {
    Serial.println("Connection to API failed");
    return;
  }

  // Send GET Request
  client.print(String("GET ") + usgsApiUrl + " HTTP/1.1\r\n" +
               "Host: earthquake.usgs.gov\r\n" +
               "Connection: close\r\n\r\n");

  // Read Response
  String payload = "";
  while (client.connected() || client.available()) {
    if (client.available()) {
      String line = client.readStringUntil('\n');
      payload += line;
    }
  }
  client.stop();

  // Parse JSON
  DynamicJsonDocument doc(1024);
  DeserializationError error = deserializeJson(doc, payload);
  if (error) {
    Serial.println("JSON parsing failed");
    return;
  }

  // Extract Earthquake Data
  JsonObject properties = doc["features"][0]["properties"];
  const char* place = properties["place"];
  float magnitude = properties["mag"];
  long time = properties["time"];

  // Convert Time
  time_t epochTime = time / 1000;
  struct tm* timeInfo = gmtime(&epochTime);
  char timeString[20];
  strftime(timeString, sizeof(timeString), "%Y-%m-%d %H:%M:%S", timeInfo);

  // Display on Serial Monitor
  Serial.println("Earthquake Alert!");
  Serial.println("----------------");
  Serial.print("Place: ");
  Serial.println(place);
  Serial.print("Magnitude: ");
  Serial.println(magnitude);
  Serial.print("Time: ");
  Serial.println(timeString);
  Serial.println("");

  // Wait before next check
  delay(60000); // Check every 60 seconds
}


การสร้าง และ กำหนดค่า Telegram Bot

Telegram Bot เป็นส่วนสำคัญที่ช่วยให้คุณสามารถโต้ตอบกับสมาชิกในกลุ่มได้ ขั้นตอนการสร้างและกำหนดค่ามีดังนี้:

สร้าง Telegram Bot:

  • พูดคุยกับ @BotFather บน Telegram
  • ใช้คำสั่ง /newbot เพื่อสร้างบอทใหม่ และบันทึก Token ที่ได้รับไว้

เปิดโหมด Group Privacy:

หากต้องการให้บอทสามารถอ่านข้อความทั่วไปในกลุ่ม (ไม่จำกัดเฉพาะคำสั่ง) ให้ปิดโหมด Privacy:

  • ส่งคำสั่ง /setprivacy ไปยัง BotFather
  • เลือกบอทของคุณ และตั้งค่าเป็น Disable

เพิ่มบอทเข้ากลุ่ม:

  • เชิญบอทเข้ากลุ่มตามขั้นตอนปกติของ Telegram

เชื่อมโยง Joomla กับ Telegram Bot

การเชื่อมโยง Joomla กับ Telegram Bot ต้องอาศัยการเขียนโค้ดเพื่อให้บอทสามารถโต้ตอบกับสมาชิกในกลุ่มได้ โดยใช้ Telegram Bot API ในการรับ-ส่งข้อมูล

ขั้นตอนการเชื่อมโยง:

สร้าง Webhook สำหรับ Telegram Bot:

  • Webhook คือ URL ที่ Telegram จะส่งข้อมูล (Update) เมื่อมีการโต้ตอบในกลุ่ม
  • สร้างไฟล์ PHP เช่น telegram-bot.php บนเว็บไซต์ของคุณ

ตั้งค่า Webhook:

curl -F "url=https://yourdomain.com/telegram-bot.php" https://api.telegram.org/bot<YourBotToken>/setWebhook

ตัวอย่างโค้ด PHP รับ-ส่งข้อความ:

<?php
// รับข้อมูลจาก Telegram
$content = file_get_contents("php://input");
$update = json_decode($content, true);

// ตรวจสอบข้อความ
if (isset($update['message'])) {
  $chat_id = $update['message']['chat']['id'];
  $text = $update['message']['text'];

  if ($text == "/start") {
    $reply = "สวัสดีครับ! ผมคือบอทของคุณ";
  } else {
    $reply = "คุณพิมพ์: " . $text;
  }

  // ส่งข้อความกลับไปยัง Telegram
  $url = "https://api.telegram.org/bot<YourBotToken>/sendMessage?chat_id=$chat_id&text=" . urlencode($reply);
  file_get_contents($url);
}
?>

ตัวอย่างโค้ด Python รับ-ส่งข้อความ:

import requests

TOKEN = '<YourBotToken>'
BASE_URL = f'https://api.telegram.org/bot{TOKEN}'

def send_message(chat_id, text):
    url = f'{BASE_URL}/sendMessage'
    payload = {'chat_id': chat_id, 'text': text}
    requests.post(url, json=payload)

def get_updates(offset=None):
    url = f'{BASE_URL}/getUpdates'
    response = requests.get(url)
    return response.json()

# ตัวอย่างเรียกใช้
if __name__ == '__main__':
    updates = get_updates()
    for update in updates['result']:
        chat_id = update['message']['chat']['id']
        text = update['message']['text']
        send_message(chat_id, f'คุณพิมพ์: {text}')

ทดสอบการทำงาน:

  • ส่งข้อความในกลุ่ม Telegram และตรวจสอบว่าบอทตอบกลับถูกต้องหรือไม่
  • หากพบปัญหา ให้ตรวจสอบ Log ของ Server หรือ Debug โค้ด

ข้อควรระวัง

  • ความปลอดภัย: อย่าเปิดเผย Token ของบอท ใช้ HTTPS สำหรับ Webhook
  • ประสิทธิภาพ: ทดสอบการตอบกลับข้อความเมื่อกลุ่มมีสมาชิกจำนวนมาก

สรุป

การเชื่อมต่อกับ Telegram Bot ต้องอาศัยการผสมผสานระหว่างการจัดการเนื้อหา และ การพัฒนาโปรแกรม หากคุณมีพื้นฐานด้านการเขียนโค้ด จะช่วยให้การเชื่อมโยง Telegram Bot เป็นไปอย่างราบรื่น

หากคุณมีคำถามเพิ่มเติมเกี่ยวกับการเขียนโค้ด หรือ การตั้งค่า สามารถสอบถามมาได้เลยครับ! 😊