"""A FastAPI application to receive and process inbound messages via a webhook."""
import base64
import binascii
import json
import logging
import urllib.parse  # Import for URL decoding
from typing import Literal, Optional

import uvicorn
from fastapi import FastAPI, Query, HTTPException, Request
from pydantic import BaseModel, Field, ValidationError
|
|
|
|
# Root logging setup: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the request handler below.
logger = logging.getLogger(__name__)
|
|
|
|
# Application object that the route decorator below registers the endpoint on.
# The title, description and version strings are passed straight to FastAPI.
_APP_METADATA = {
    "title": "Inbound Webhook for Messages",
    "description": "A FastAPI application to receive and process inbound messages via a webhook.",
    "version": "1.0.0",
}

app = FastAPI(**_APP_METADATA)
|
|
|
|
# Closed set of values accepted in the 'Type' field of an inbound message.
_MessageType = Literal[
    "TEXT",
    "LOCATION",
    "DOCUMENT",
    "VIDEO",
    "AUDIO",
    "REPLY",
    "IMAGE",
    "CONTACTS",
]


class InboundMessage(BaseModel):
    """Schema of one inbound message delivered to the webhook.

    Every field is required except ``DisplayName``. Field names mirror the
    keys of the incoming JSON object, hence the PascalCase spelling.
    """

    MessageId: str = Field(..., description="The unique ID of the message.")
    From: str = Field(..., description="The phone number of the message sender.")
    To: str = Field(..., description="The phone number of the message receiver.")
    Timestamp: int = Field(
        ...,
        description="A Unix timestamp in milliseconds when the message was sent.",
    )
    # Optional because the payloads actually received were seen to omit it.
    DisplayName: Optional[str] = Field(
        default=None,
        description="The display name of the message sender. Optional if not provided by source.",
    )
    Type: _MessageType = Field(
        ...,
        description="The type of media resources included in the message.",
    )
    Message: str = Field(
        ...,
        description="The content of the message. Its interpretation depends on the 'Type' field.",
    )
    Name: str = Field(..., description="The name of the end user.")
|
|
|
|
def _decode_response_param(response: str):
    """Decode the raw 'response' query value into parsed JSON data.

    Applies, in order: URL-unquoting, Base64 decoding, UTF-8 decoding and
    JSON parsing, logging the intermediate result of each step.

    Args:
        response: The 'response' query parameter as received by the handler.

    Returns:
        Whatever ``json.loads`` produces for the decoded text (object, list, ...).

    Raises:
        binascii.Error: The value is not valid Base64.
        UnicodeDecodeError: The decoded bytes are not valid UTF-8.
        json.JSONDecodeError: The decoded text is not valid JSON.
    """
    # 1. URL-decode first so sequences such as %3D become '=' before Base64 decoding.
    #    unquote() never raises: malformed escapes are left in place or replaced.
    url_decoded_string = urllib.parse.unquote(response)
    logger.info("URL-decoded string: %s", url_decoded_string)

    # Query-string parsing turns a literal '+' into a space, and b64decode would
    # silently drop that space and corrupt the data. A space is never valid
    # Base64, so mapping it back to '+' is safe.
    base64_text = url_decoded_string.replace(" ", "+")

    # 2. Base64 decode, then interpret the bytes as UTF-8 text.
    decoded_string = base64.b64decode(base64_text).decode("utf-8")
    logger.info("Base64 decoded string: %s", decoded_string)

    # 3. Parse the decoded text as JSON.
    return json.loads(decoded_string)


def _describe_payload_error(error: Exception) -> str:
    """Log a payload decoding/validation failure and return the HTTP 400 detail text."""
    if isinstance(error, binascii.Error):
        logger.error("Base64 decoding error: %s", error)
        return f"Invalid Base64 encoding in URL-decoded 'response' parameter: {error}"
    if isinstance(error, UnicodeDecodeError):
        logger.error("UTF-8 decoding error: %s", error)
        return f"Decoded 'response' data is not valid UTF-8: {error}"
    if isinstance(error, json.JSONDecodeError):
        logger.error("JSON decoding error: %s", error)
        return f"Invalid JSON format in decoded 'response' data: {error}"
    # Only remaining type passed by the caller: pydantic's ValidationError.
    logger.error("Pydantic validation error: %s", error.errors())
    return f"Message data validation failed: {error.errors()}"


@app.get("/inbound-webhook")
async def receive_inbound_message(
    request: Request,
    response: str
):
    """Receive one inbound message from the webhook and validate it.

    The message is expected in the required 'response' query parameter as a
    URL-encoded, Base64-encoded JSON string. The JSON may be a single message
    object or a list, in which case only the first element is processed.

    Args:
        request: The incoming request; used only to log the client host.
        response: The encoded message payload.

    Returns:
        A dict with ``status``, ``message`` and ``received_data`` (the
        validated message as a dict).

    Raises:
        HTTPException: 400 when the payload cannot be decoded, is an empty
            list, is not a JSON object, or fails model validation; 500 for
            any other unexpected error.
    """
    # request.client can be None (e.g. some test clients / proxies).
    client_host = request.client.host if request.client else "unknown"
    logger.info("Received request from: %s", client_host)
    # Log only the first 100 characters of the raw value for brevity.
    logger.info("Raw 'response' query parameter (first 100 chars): %s...", response[:100])

    try:
        payload_data = _decode_response_param(response)
        logger.info("Parsed JSON payload: %s", payload_data)

        # The webhook may send an array containing a single message object.
        if isinstance(payload_data, list):
            if not payload_data:
                raise HTTPException(status_code=400, detail="Empty list received in 'response' parameter.")
            # Only the first message in the list is handled for this use case.
            message_data = payload_data[0]
            logger.info("Extracted single message from list: %s", message_data)
        else:
            message_data = payload_data
            logger.info("Payload is a single object: %s", message_data)

        # Guard before ** unpacking: a JSON string/number/etc. is a client error,
        # not a server fault.
        if not isinstance(message_data, dict):
            raise HTTPException(
                status_code=400,
                detail="Message data in 'response' parameter must be a JSON object.",
            )

        # 4. Validate against the model; raises ValidationError on mismatch.
        inbound_message = InboundMessage(**message_data)
        logger.info("Successfully validated inbound message: %s", inbound_message.dict())

        # 5. Process the inbound message (custom business logic goes here).
        logger.info(
            "Processing message from '%s' (DisplayName: %s) of type '%s': '%s'",
            inbound_message.From,
            inbound_message.DisplayName,
            inbound_message.Type,
            inbound_message.Message,
        )

        # Example: specific handling based on message type.
        if inbound_message.Type == "TEXT":
            logger.info("Text message content: %s", inbound_message.Message)
            # You might want to respond to the text message here.
        elif inbound_message.Type == "IMAGE":
            logger.info(
                "Image message received. Message field might contain image URL/ID: %s",
                inbound_message.Message,
            )
            # Handle image specific logic, e.g., download image, analyze content.
        # Add more specific handling for other message Types as needed (LOCATION, DOCUMENT, etc.)

        return {
            "status": "success",
            "message": "Inbound message received and processed successfully.",
            "received_data": inbound_message.dict(),  # Echoed back for verification/debugging
        }

    except HTTPException:
        # Deliberate HTTP errors raised above must not be rewrapped as 500s.
        raise
    except (binascii.Error, UnicodeDecodeError, json.JSONDecodeError, ValidationError) as e:
        raise HTTPException(status_code=400, detail=_describe_payload_error(e)) from e
    except Exception as e:
        # Catch any other unexpected errors and log the traceback.
        logger.exception("An unexpected error occurred: %s", e)
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
|
|
|
|
# To run this application, use Uvicorn from your terminal:
#     uvicorn main:app --host 0.0.0.0 --port 8000