🌍 ระบบแจ้งเตือนแผ่นดินไหวผ่าน Telegram ด้วย Circuit Python

Bot ส่วนตัว https://t.me/DisasterwarningBot

กลุ่มแจ้งเตือน https://t.me/disasterThaiAleart

✨ ประโยชน์ของการใช้ Telegram สำหรับแจ้งเตือน

🚀 ความเร็วสูง

Telegram ส่งข้อความแบบ Real-time รับการแจ้งเตือนทันทีที่เกิดแผ่นดินไหว ไม่มีดีเลย์

📱 รับการแจ้งเตือนทุกที่

เปิดแจ้งเตือนบนมือถือได้ทั้ง iOS และ Android แม้ไม่ได้เปิดแอปก็รับการแจ้งเตือนได้ (Push Notification)

👥 แชร์ข้อมูลแบบกลุ่ม

ส่งการแจ้งเตือนไปยังกลุ่มหรือช่อง Telegram ได้ไม่จำกัดผู้รับ สามารถเพิ่มสมาชิกในองค์กรให้รับข้อมูลพร้อมกัน

🔗 รองรับลิงก์และ Rich Media

แทรกรูปภาพ, ลิงก์ไปยัง USGS, หรือแมปตำแหน่งแผ่นดินไหวได้ในข้อความเดียว

📨 ตัวอย่างการแจ้งเตือนใน Telegram

⚠️ EMERGENCY: Earthquake Alert ⚠️

📍 Location: 50 km NE of Chiang Rai, Thailand

📏 Magnitude: 5.8 (Richter Scale)

🔻 Depth: 12.3 km

⏰ Time: 2024-06-21 08:45:22 (UTC+7)

🔗 View on USGS Website

* ระบบจะส่งข้อความรูปแบบนี้ไปยังกลุ่ม Telegram ทันทีที่ตรวจพบแผ่นดินไหวขนาด 5.0+

# -*- coding: utf-8 -*-
# Circuit Python For ESP32_S3
import wifi
import socketpool
import ssl
import adafruit_requests
import time
import json

# ==================== ตั้งค่าการเชื่อมต่อ ====================
# 1. ตั้งค่า WiFi
SSID = " Your wifi ID"
PASSWORD = "Your wifi Password"

# 2. ตั้งค่า Telegram Bot
BOT_TOKEN = "Your telegram Token"  # เช่น "1234567890:ABCdefGhIJKlmNoPQRsTUVwxyZ"
CHAT_ID = "Your Chat ID"  # ID ของกลุ่ม/ช่อง (ต้องมีเครื่องหมาย -)

# ==================== ฟังก์ชันหลัก ====================
def connect_wifi():
    """เชื่อมต่อ WiFi"""
    print("🔌 กำลังเชื่อมต่อ WiFi...")
    wifi.radio.connect(SSID, PASSWORD)
    print("✅ เชื่อมต่อ WiFi สำเร็จ! IP:", wifi.radio.ipv4_address)

def format_time(ms):
    """แปลงเวลาจากมิลลิวินาทีเป็นรูปแบบอ่านง่าย"""
    t = time.localtime(ms // 1000)
    return f"{t.tm_year}-{t.tm_mon:02d}-{t.tm_mday:02d} {t.tm_hour:02d}:{t.tm_min:02d}:{t.tm_sec:02d} (UTC+7)"

def send_telegram(message):
    """ส่งข้อความไปยัง Telegram"""
    url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": CHAT_ID,
        "text": message,
        "parse_mode": "Markdown"  # ใช้ Markdown เพื่อจัดรูปแบบ
    }
    try:
        response = requests.post(url, json=payload)
        print(f"📤 ส่งข้อความสำเร็จ (Status: {response.status_code})")
        response.close()
    except Exception as e:
        print(f"❌ เกิดข้อผิดพลาดขณะส่งข้อความ: {e}")

def check_earthquake():
    """ตรวจสอบแผ่นดินไหวจาก USGS API"""
    global last_quake_id
    
    # ตั้งค่า API (ตรวจสอบแผ่นดินไหวขนาด 5.0 ขึ้นไป ล่าสุด 1 เหตุการณ์)
    url = "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&minmagnitude=5.0&limit=1"
    
    try:
        print("🔍 กำลังตรวจสอบแผ่นดินไหว...")
        response = requests.get(url)
        data = response.json()
        response.close()

        if data.get("features"):
            quake = data["features"][0]
            quake_id = quake["id"]  # ID ของเหตุการณ์
            
            # เช็คหากเป็นเหตุการณ์เดิม
            if quake_id == last_quake_id:
                print("⏭️ ไม่มีแผ่นดินไหวใหม่")
                return
            
            last_quake_id = quake_id  # อัปเดต ID ล่าสุด
            
            # ดึงข้อมูลแผ่นดินไหว
            props = quake["properties"]
            place = props.get("place", "ไม่ทราบตำแหน่ง")
            mag = props.get("mag", "N/A")
            time_ms = props.get("time", 0)
            depth = quake["geometry"]["coordinates"][2]  # ความลึก (km)
            
            # สร้างข้อความแจ้งเตือน (รูปแบบ Markdown)
            alert_msg = f"""
⚠️ **แจ้งเตือนแผ่นดินไหว** ⚠️
📍 **ตำแหน่ง**: {place}
📏 **ขนาด**: {mag} ริกเตอร์
🔻 **ความลึก**: {depth} กม.
⏰ **เวลา**: {format_time(time_ms)}
[ดูรายละเอียดเพิ่มเติม](https://earthquake.usgs.gov/earthquakes/eventpage/{quake_id})
            """
            send_telegram(alert_msg.strip())
        else:
            print("🌍 ไม่พบข้อมูลแผ่นดินไหวล่าสุด")
            
    except Exception as e:
        print(f"❌ เกิดข้อผิดพลาดขณะตรวจสอบ: {e}")

# ==================== การทำงานหลัก ====================
# เชื่อมต่อ WiFi
connect_wifi()

# สร้าง Session สำหรับการเชื่อมต่อ
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())

# ตัวแปรเก็บ ID ของแผ่นดินไหวล่าสุด
last_quake_id = None

# แจ้งเตือนเมื่อระบบเริ่มทำงาน
send_telegram("🟢 **ระบบตรวจสอบแผ่นดินไหวเริ่มทำงานแล้ว**")

# วนลูปตรวจสอบทุก 10 นาที
while True:
    check_earthquake()
    print(f"⏳ พัก 10 นาทีก่อนตรวจสอบอีกครั้ง...\n")
    time.sleep(600)  # 10 นาที = 600 วินาที

💡 ทำไมต้องใช้ Telegram แจ้งเตือนแผ่นดินไหว?

เพราะ Telegram มีระบบแจ้งเตือนที่เสถียร รองรับการใช้งานแบบกลุ่มฟรี และสามารถส่งข้อมูลได้รวดเร็วแม้ในสภาวะเครือข่ายไม่穩定 เหมาะสำหรับระบบแจ้งเตือนภัยพิบัติที่ต้องพึ่งพาความเร็ว!