WABA/main.py
2025-07-10 09:27:23 +08:00

130 lines
6.3 KiB
Python

from fastapi import FastAPI, Query, HTTPException, Request
from pydantic import BaseModel, Field, ValidationError
from typing import Literal, Optional
import base64
import json
import uvicorn
import logging
import urllib.parse # Import for URL decoding
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = FastAPI(
title="Inbound Webhook for Messages",
description="A FastAPI application to receive and process inbound messages via a webhook.",
version="1.0.0"
)
# Define the Pydantic model for the inbound message payload
class InboundMessage(BaseModel):
"""
Represents the structure of an inbound message received by the webhook.
"""
MessageId: str = Field(..., description="The unique ID of the message.")
From: str = Field(..., description="The phone number of the message sender.")
To: str = Field(..., description="The phone number of the message receiver.")
Timestamp: int = Field(..., description="A Unix timestamp in milliseconds when the message was sent.")
# Made DisplayName Optional as it seems to be missing in your actual incoming payload
DisplayName: Optional[str] = Field(None, description="The display name of the message sender. Optional if not provided by source.")
Type: Literal[
"TEXT",
"LOCATION",
"DOCUMENT",
"VIDEO",
"AUDIO",
"REPLY",
"IMAGE",
"CONTACTS"
] = Field(..., description="The type of media resources included in the message.")
Message: str = Field(..., description="The content of the message. Its interpretation depends on the 'Type' field.")
Name: str = Field(..., description="The name of the end user.")
@app.get("/inbound-webhook")
async def receive_inbound_message(
request: Request,
response: str
):
"""
Receives inbound messages from the webhook.
The message payload is expected to be a URL-encoded, then Base64 encoded JSON string
passed as a 'response' query parameter.
"""
logger.info(f"Received request from: {request.client.host}")
# Log the first 100 characters of the raw 'response' for brevity and security
logger.info(f"Raw 'response' query parameter (first 100 chars): {response[:100]}...")
try:
# 1. URL-decode the 'response' parameter first
# This handles characters like %3D, ensuring base64.b64decode gets the correct string.
url_decoded_string = urllib.parse.unquote(response)
logger.info(f"URL-decoded string: {url_decoded_string}")
# 2. Base64 decode the URL-decoded string
decoded_bytes = base64.b64decode(url_decoded_string)
decoded_string = decoded_bytes.decode('utf-8')
logger.info(f"Base64 decoded string: {decoded_string}")
# 3. Parse the decoded string as JSON
payload_data = json.loads(decoded_string)
logger.info(f"Parsed JSON payload: {payload_data}")
# Check if the payload is a list and extract the first item
# This handles cases where the webhook sends an array containing a single message object.
if isinstance(payload_data, list):
if not payload_data:
raise HTTPException(status_code=400, detail="Empty list received in 'response' parameter.")
# Assume we only care about the first message in the list for this use case
message_data = payload_data[0]
logger.info(f"Extracted single message from list: {message_data}")
else:
message_data = payload_data
logger.info(f"Payload is a single object: {message_data}")
# 4. Validate the JSON against the Pydantic model
# This will automatically raise a ValidationError if the data doesn't match the model.
inbound_message = InboundMessage(**message_data)
logger.info(f"Successfully validated inbound message: {inbound_message.dict()}")
# 5. Process the inbound message
# This is where your custom business logic goes.
logger.info(f"Processing message from '{inbound_message.From}' (DisplayName: {inbound_message.DisplayName}) of type '{inbound_message.Type}': '{inbound_message.Message}'")
# Example: Specific handling based on message type
if inbound_message.Type == "TEXT":
logger.info(f"Text message content: {inbound_message.Message}")
# You might want to respond to the text message here
elif inbound_message.Type == "IMAGE":
logger.info(f"Image message received. Message field might contain image URL/ID: {inbound_message.Message}")
# Handle image specific logic, e.g., download image, analyze content
# Add more specific handling for other message Types as needed (LOCATION, DOCUMENT, etc.)
return {
"status": "success",
"message": "Inbound message received and processed successfully.",
"received_data": inbound_message.dict() # Return the parsed data for verification/debugging
}
except (urllib.parse.DecompressionError, base64.binascii.Error, json.JSONDecodeError, ValidationError) as e:
# Catch specific errors for better client feedback
if isinstance(e, urllib.parse.DecompressionError):
logger.error(f"URL decoding error: {e}")
detail = f"Invalid URL encoding in 'response' parameter: {e}"
elif isinstance(e, base64.binascii.Error):
logger.error(f"Base64 decoding error: {e}")
detail = f"Invalid Base64 encoding in URL-decoded 'response' parameter: {e}"
elif isinstance(e, json.JSONDecodeError):
logger.error(f"JSON decoding error: {e}")
detail = f"Invalid JSON format in decoded 'response' data: {e}"
elif isinstance(e, ValidationError):
logger.error(f"Pydantic validation error: {e.errors()}")
detail = f"Message data validation failed: {e.errors()}"
raise HTTPException(status_code=400, detail=detail)
except Exception as e:
# Catch any other unexpected errors
logger.error(f"An unexpected error occurred: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Internal server error: {e}")
# To run this application, use Uvicorn from your terminal:
# uvicorn main:app --host 0.0.0.0 --port 8000