130 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			130 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from fastapi import FastAPI, Query, HTTPException, Request
 | |
| from pydantic import BaseModel, Field, ValidationError
 | |
| from typing import Literal, Optional
 | |
| import base64
 | |
| import json
 | |
| import uvicorn
 | |
| import logging
 | |
| import urllib.parse  # Import for URL decoding
 | |
| 
 | |
# Module-wide logging: INFO and above, each record prefixed with a timestamp
# and its severity level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# The ASGI application object; title/description/version are the metadata
# passed to FastAPI for this service.
app = FastAPI(
    version="1.0.0",
    title="Inbound Webhook for Messages",
    description="A FastAPI application to receive and process inbound messages via a webhook.",
)
 | |
| 
 | |
| # Define the Pydantic model for the inbound message payload
 | |
class InboundMessage(BaseModel):
    """
    Schema of a single inbound message delivered to the webhook.

    Every field is required except ``DisplayName``, which defaults to ``None``
    when the sender does not supply it. ``Type`` is restricted to a fixed set
    of message kinds; how ``Message`` should be read depends on that kind.
    """

    MessageId: str = Field(
        ...,
        description="The unique ID of the message.",
    )
    From: str = Field(
        ...,
        description="The phone number of the message sender.",
    )
    To: str = Field(
        ...,
        description="The phone number of the message receiver.",
    )
    Timestamp: int = Field(
        ...,
        description="A Unix timestamp in milliseconds when the message was sent.",
    )
    # Optional: observed to be absent from real incoming payloads.
    DisplayName: Optional[str] = Field(
        None,
        description="The display name of the message sender. Optional if not provided by source.",
    )
    Type: Literal[
        "TEXT", "LOCATION", "DOCUMENT", "VIDEO",
        "AUDIO", "REPLY", "IMAGE", "CONTACTS",
    ] = Field(
        ...,
        description="The type of media resources included in the message.",
    )
    Message: str = Field(
        ...,
        description="The content of the message. Its interpretation depends on the 'Type' field.",
    )
    Name: str = Field(
        ...,
        description="The name of the end user.",
    )
 | |
| 
 | |
@app.get("/inbound-webhook")
async def receive_inbound_message(
    request: Request,
    response: str
):
    """
    Receive one inbound message delivered through the webhook.

    The payload arrives in the ``response`` query parameter as a URL-encoded,
    Base64-encoded JSON document. The JSON may be a single message object or a
    list of them; for a list only the first element is processed.

    Args:
        request: The incoming request; used only to log the caller's host.
        response: The encoded payload taken from the query string.

    Returns:
        A dict with ``status``, ``message`` and ``received_data`` (the
        validated message as a plain dict).

    Raises:
        HTTPException: 400 when the payload is not valid Base64, UTF-8 or
            JSON, is an empty list, is not a JSON object, or fails
            ``InboundMessage`` validation; 500 for any other unexpected error.
    """
    # Imported locally so this handler does not depend on ``base64`` happening
    # to expose ``binascii`` as an attribute.
    import binascii

    # ``request.client`` can be None (e.g. some test clients / proxies).
    client_host = request.client.host if request.client else "unknown"
    logger.info(f"Received request from: {client_host}")
    # Log the first 100 characters of the raw 'response' for brevity and security
    logger.info(f"Raw 'response' query parameter (first 100 chars): {response[:100]}...")

    try:
        # 1. URL-decode the 'response' parameter first (handles e.g. %3D).
        # ``unquote`` never raises for str input, so there is no URL-decoding
        # error branch below. Query-string parsing turns an unescaped '+' into
        # a space; Base64 never contains spaces, so map them back to '+'
        # instead of letting b64decode silently drop them.
        url_decoded_string = urllib.parse.unquote(response).replace(" ", "+")
        # NOTE(review): this and the following logs emit the full payload,
        # unlike the truncated log above — confirm that is intended.
        logger.info(f"URL-decoded string: {url_decoded_string}")

        # 2. Base64 decode the URL-decoded string
        decoded_bytes = base64.b64decode(url_decoded_string)
        decoded_string = decoded_bytes.decode('utf-8')
        logger.info(f"Base64 decoded string: {decoded_string}")

        # 3. Parse the decoded string as JSON
        payload_data = json.loads(decoded_string)
        logger.info(f"Parsed JSON payload: {payload_data}")

        # The webhook may send an array containing a single message object;
        # only the first element is handled.
        if isinstance(payload_data, list):
            if not payload_data:
                raise HTTPException(status_code=400, detail="Empty list received in 'response' parameter.")
            message_data = payload_data[0]
            logger.info(f"Extracted single message from list: {message_data}")
        else:
            message_data = payload_data
            logger.info(f"Payload is a single object: {message_data}")

        # ``**message_data`` below requires a mapping; anything else is a
        # malformed client payload, not a server fault.
        if not isinstance(message_data, dict):
            raise HTTPException(status_code=400, detail="Message data must be a JSON object.")

        # 4. Validate the JSON against the Pydantic model
        inbound_message = InboundMessage(**message_data)
        logger.info(f"Successfully validated inbound message: {inbound_message.dict()}")

        # 5. Process the inbound message
        # This is where your custom business logic goes.
        logger.info(f"Processing message from '{inbound_message.From}' (DisplayName: {inbound_message.DisplayName}) of type '{inbound_message.Type}': '{inbound_message.Message}'")

        # Example: Specific handling based on message type
        if inbound_message.Type == "TEXT":
            logger.info(f"Text message content: {inbound_message.Message}")
            # You might want to respond to the text message here
        elif inbound_message.Type == "IMAGE":
            logger.info(f"Image message received. Message field might contain image URL/ID: {inbound_message.Message}")
            # Handle image specific logic, e.g., download image, analyze content
        # Add more specific handling for other message Types as needed (LOCATION, DOCUMENT, etc.)

        return {
            "status": "success",
            "message": "Inbound message received and processed successfully.",
            "received_data": inbound_message.dict()  # Return the parsed data for verification/debugging
        }

    except HTTPException:
        # Deliberate 4xx responses raised above must not be rewritten as 500
        # by the catch-all handler at the bottom.
        raise
    except binascii.Error as e:
        logger.error(f"Base64 decoding error: {e}")
        raise HTTPException(
            status_code=400,
            detail=f"Invalid Base64 encoding in URL-decoded 'response' parameter: {e}",
        ) from e
    except UnicodeDecodeError as e:
        logger.error(f"UTF-8 decoding error: {e}")
        raise HTTPException(
            status_code=400,
            detail=f"Decoded 'response' data is not valid UTF-8: {e}",
        ) from e
    except json.JSONDecodeError as e:
        logger.error(f"JSON decoding error: {e}")
        raise HTTPException(
            status_code=400,
            detail=f"Invalid JSON format in decoded 'response' data: {e}",
        ) from e
    except ValidationError as e:
        logger.error(f"Pydantic validation error: {e.errors()}")
        raise HTTPException(
            status_code=400,
            detail=f"Message data validation failed: {e.errors()}",
        ) from e
    except Exception as e:
        # Catch any other unexpected errors
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
 | |
| 
 | |
| # To run this application, use Uvicorn from your terminal:
 | |
| # uvicorn main:app --host 0.0.0.0 --port 8000 |