Streaming Analyze (WebSocket)

For real-time content moderation on streaming text data, DynamoGuard also supports a WebSocket-based streaming analyze endpoint. This is particularly useful for monitoring AI model responses as they are generated, enabling immediate policy enforcement and content filtering.

WebSocket Connection

Endpoint: ws://your-domain/v1/moderation/stream/analyze

Authentication

Authentication is handled via an auth event that must be sent immediately after establishing the WebSocket connection.
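
The example client later in this document wraps every event in a JSON object with "event" and "data" keys. As a minimal sketch of that envelope, the auth message could be built like this; the helper name `build_event` is hypothetical and the token is a placeholder.

```python
import json


def build_event(event: str, data: dict) -> str:
    """Serialize an event in the {"event": ..., "data": ...} envelope."""
    return json.dumps({"event": event, "data": data})


# The auth event must be the first message sent on a new connection.
auth_message = build_event("auth", {"token": "<dynamoai-api-token>"})
print(auth_message)
```

The same helper can serialize the start, analyze, and end events, since they differ only in the event name and the data payload.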

Client Events (Events You Send)

1. auth - Authenticate Connection

Purpose: Authenticates the WebSocket connection using your API key.

Data Structure:

{
"token": "string (your-api-key)"
}

Required Fields:

  • token: Your DynamoAI API key

Server Response:

  • Acknowledgment: client-info event (indicates successful authentication)
  • Error: error event if authentication fails

2. start - Initialize Streaming Session

Purpose: Establishes a new streaming session for content analysis.

Data Structure:

{
"messages": [
{
"role": "user",
"content": "string"
}
],
"policyIds": ["string (24-character hex)"],
"modelId": "string (24-character hex)"
}

Required Fields:

  • messages: Array with a single message object containing the input prompt
  • policyIds: Array of policy IDs to apply for moderation
  • modelId: Model ID to associate with the session
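
Since the IDs are documented as 24-character hex strings, a client may want to check them before sending start. The following is a sketch only: `build_start_data` is a hypothetical helper, and accepting upper-case hex digits is an assumption, not documented server behavior.

```python
import re

# 24 hexadecimal characters, as described for policyIds and modelId.
HEX_ID = re.compile(r"[0-9a-fA-F]{24}")


def build_start_data(prompt: str, policy_ids: list[str], model_id: str) -> dict:
    """Build the data payload for a start event, checking ID formats first."""
    for value in [*policy_ids, model_id]:
        if not HEX_ID.fullmatch(value):
            raise ValueError(f"expected a 24-character hex ID, got {value!r}")
    return {
        # A single message object carrying the input prompt.
        "messages": [{"role": "user", "content": prompt}],
        "policyIds": list(policy_ids),
        "modelId": model_id,
    }


start_data = build_start_data(
    "Summarize this report.",
    ["68890c1246e0b250a324af8b"],
    "66f243e15a47dc8bdbb82504",
)
print(start_data["policyIds"])
```

Catching a malformed ID locally gives a clearer error than waiting for the server to reject the start event.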

Server Response:

  • Acknowledgment: session_start event with { started: true }
  • Error: WsException if session already exists or validation fails

3. analyze - Stream Text for Analysis

Purpose: Sends text chunks for real-time analysis against configured policies.

Data Structure:

{
"text": "string"
}

Required Fields:

  • text: The text chunk to analyze

Server Response:

  • Analysis Result: analyze_result event
  • Error: WsException if session not found

4. end - Terminate Streaming Session

Purpose: Ends the current streaming session and logs final results.

Data Structure:

{}

Server Response:

  • Acknowledgment: session_end event with session summary
  • Error: WsException if session not found

Server Events (Events You Receive)

1. client-info - Authentication Confirmation

Triggered: When an auth event is successfully processed.

Data Structure:

{
"authenticated": true
}

2. session_start - Session Initialization Confirmation

Triggered: When a start event is successfully processed.

Data Structure:

{
"started": true
}

3. analyze_result - Policy Analysis Results

Triggered: In response to each analyze event. Moderation runs only once the buffered text reaches the token threshold; until then, the API returns NONE as the policy action.

Data Structure:

{
"text": "string",
"finalAction": "BLOCK|WARN|REDACT|SANITIZE|NONE",
"policyResults": [
{
"policyId": "string",
"workerRequestId": "string",
"outputs": "object",
"action": "BLOCK|WARN|REDACT|SANITIZE|NONE",
"violated": "boolean"
}
],
"numTokens": "number",
"warning": "string (optional)"
}
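
One way to consume this payload is to parse it into typed objects. This is a sketch, not part of the API: the dataclass names and `parse_analyze_result` are hypothetical, and the sample values are made up for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class PolicyResult:
    policy_id: str
    action: str
    violated: bool
    outputs: Any = None


@dataclass
class AnalyzeResult:
    text: str
    final_action: str
    num_tokens: int
    policy_results: list = field(default_factory=list)
    warning: Optional[str] = None  # only present when a token limit applies


def parse_analyze_result(data: dict) -> AnalyzeResult:
    """Convert the data of an analyze_result event into dataclasses."""
    results = [
        PolicyResult(
            policy_id=r["policyId"],
            action=r["action"],
            violated=r["violated"],
            outputs=r.get("outputs"),
        )
        for r in data.get("policyResults", [])
    ]
    return AnalyzeResult(
        text=data["text"],
        final_action=data["finalAction"],
        num_tokens=data["numTokens"],
        policy_results=results,
        warning=data.get("warning"),
    )


# Illustrative payload; the values are invented for this example.
sample = {
    "text": "Call me at 555-0100.",
    "finalAction": "REDACT",
    "policyResults": [
        {"policyId": "68890c1246e0b250a324af8b", "workerRequestId": "req-1",
         "outputs": {}, "action": "REDACT", "violated": True}
    ],
    "numTokens": 7,
}
result = parse_analyze_result(sample)
print(result.final_action, [p.policy_id for p in result.policy_results if p.violated])
```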

Fields:

  • text: The analyzed text chunk
  • finalAction: Overall action determined by all policies
  • policyResults: Individual results for each applied policy
  • numTokens: Number of tokens analyzed in this chunk
  • warning: Token limit warning (if applicable)

4. session_end - Session Termination Confirmation

Triggered: When an end event is successfully processed.

Data Structure:

{
"ended": true,
"numTokens": "number",
"warning": "string (optional)"
}

Fields:

  • ended: Confirmation that session ended
  • numTokens: Total tokens analyzed in the session
  • warning: Token limit warning (if applicable)

5. session_reset - Session Reset Notification

Triggered: When a new WebSocket connection is established (existing sessions are cleared).

Data Structure:

{
"reset": true
}

6. error - Error Notification

Triggered: When an error occurs during processing.

Data Structure:

{
"message": "string",
"code": "string (optional)"
}
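
A client can turn error events into exceptions so that failures are not silently skipped. The sketch below assumes the {"event": ..., "data": ...} envelope used by the example client in this document; `ModerationStreamError` and `raise_on_error` are hypothetical names.

```python
from typing import Optional


class ModerationStreamError(Exception):
    """Raised when the server sends an error event."""

    def __init__(self, message: str, code: Optional[str] = None):
        super().__init__(message)
        self.message = message
        self.code = code  # optional, so it may be None


def raise_on_error(response: dict) -> dict:
    """Return the event's data, or raise if the server sent an error event."""
    if response.get("event") == "error":
        data = response.get("data") or {}
        raise ModerationStreamError(
            data.get("message", "unknown error"), data.get("code")
        )
    return response.get("data", {})


try:
    raise_on_error({"event": "error", "data": {"message": "Invalid API key"}})
except ModerationStreamError as exc:
    print(f"moderation stream failed: {exc.message} (code={exc.code})")
```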

Error Handling

WebSocket Exceptions

The server may send error events for various error conditions:

  1. Authentication Errors:

    • Invalid API key
    • Missing authentication
  2. Session Errors:

    • StreamingSessionError: Session not found or already exists
    • TokenLimitError: Input exceeds token limits
  3. Validation Errors:

    • Invalid message format
    • Missing required fields
    • Invalid policy IDs

Usage Flow

  1. Connect to the WebSocket endpoint
  2. Send auth event with your API key
  3. Receive client-info confirmation
  4. Send start event to initialize session
  5. Receive session_start confirmation
  6. Send analyze events with text chunks; the server replies to each one with an analyze_result event
  7. Send end event to terminate session
  8. Receive session_end confirmation with final summary
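
The ordering above can be enforced on the client side with a small state machine. This is a sketch under two assumptions: that a new session may be started on the same connection after end, and that state advances when an event is sent rather than when the server acknowledges it. `FlowGuard` is a hypothetical name.

```python
# Events a client may send in each state.
ALLOWED = {
    "connected": {"auth"},
    "authenticated": {"start"},
    "in_session": {"analyze", "end"},
}

# State entered after each event is sent.
NEXT_STATE = {
    "auth": "authenticated",
    "start": "in_session",
    "analyze": "in_session",
    "end": "authenticated",  # assumption: the connection can host another session
}


class FlowGuard:
    """Rejects client events that are sent out of order."""

    def __init__(self) -> None:
        self.state = "connected"

    def check(self, event: str) -> None:
        if event not in ALLOWED.get(self.state, set()):
            raise RuntimeError(f"cannot send {event!r} while {self.state}")
        self.state = NEXT_STATE[event]


guard = FlowGuard()
for event in ["auth", "start", "analyze", "analyze", "end"]:
    guard.check(event)
print(guard.state)
```

Calling `check` before each send turns an ordering mistake into a local exception instead of a server-side error.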

Important Notes

Authentication
  • Authentication must be performed immediately after the WebSocket connection is established
  • The auth event must be sent before any other operations
  • Connection will be rejected if authentication fails
Token Buffering
  • Text is buffered until it reaches the threshold (20 tokens by default)
  • Policies are applied only once the buffer reaches that threshold; until then, analyze_result reports NONE
  • This reduces API calls while maintaining real-time analysis
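
To make the buffering behavior concrete, here is a client-side simulation. It is not the server's implementation: whitespace splitting stands in for the real tokenizer, and clearing the buffer after each flush is an assumption. `TokenBuffer` is a hypothetical name.

```python
from typing import Optional


class TokenBuffer:
    """Accumulates chunks and releases them once the threshold is reached."""

    def __init__(self, threshold: int = 20) -> None:
        self.threshold = threshold
        self.pending: list[str] = []

    def add(self, chunk: str) -> Optional[str]:
        # Assumption: whitespace-separated words approximate tokens.
        self.pending.extend(chunk.split())
        if len(self.pending) >= self.threshold:
            text = " ".join(self.pending)
            self.pending.clear()
            return text  # this is the point where policies would run
        return None  # below threshold: nothing is moderated yet


buffer = TokenBuffer(threshold=20)
first = buffer.add(" ".join(["word"] * 19))   # 19 tokens: still buffering
second = buffer.add("word")                   # 20th token: buffer is released
print(first is None, len(second.split()))
```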
Token Limits
  • Input messages are validated against policy token limits
  • Only the first N tokens (as per policy limits) are analyzed
  • Warnings are provided when token limits are exceeded
Session Management
  • Each WebSocket connection can have only one active session
  • Sessions are automatically cleaned up on connection close
  • New connections reset any existing sessions
Policy Actions
  • BLOCK: Content violates policy, should be blocked
  • WARN: Content may be problematic, warning recommended
  • REDACT: Sensitive content should be redacted
  • SANITIZE: Content should be cleaned/sanitized
  • NONE: No policy violations detected
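
How a client reacts to each action is an application decision. The sketch below shows one possible mapping; `handle_chunk` and the placeholder strings are hypothetical, and withholding the chunk on REDACT or SANITIZE is an assumption rather than documented behavior.

```python
from enum import Enum


class PolicyAction(str, Enum):
    BLOCK = "BLOCK"
    WARN = "WARN"
    REDACT = "REDACT"
    SANITIZE = "SANITIZE"
    NONE = "NONE"


def handle_chunk(final_action: str, chunk: str) -> tuple[str, bool]:
    """Return (text_to_emit, keep_streaming) for one analyzed chunk."""
    action = PolicyAction(final_action)  # raises ValueError on unknown values
    if action is PolicyAction.BLOCK:
        return "[BLOCKED]", False  # stop the stream
    if action in (PolicyAction.REDACT, PolicyAction.SANITIZE):
        # Assumption: hide the chunk rather than rewrite it locally.
        return "[REDACTED]", True
    # WARN and NONE pass the text through; a caller may log WARN separately.
    return chunk, True


print(handle_chunk("NONE", "Hello"))
print(handle_chunk("BLOCK", "secret"))
```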

Example Implementation

Basic WebSocket Client

import json
import logging
from typing import Any, Iterator

import certifi
from openai import OpenAI
from websocket import WebSocket, create_connection  # websocket-client package


logger = logging.getLogger("stream_example")
logger.setLevel("INFO")

WS_API_URL = "<dynamoai-url>"
DYNAMO_API_KEY = "<dynamoai-api-token>"
OPENAI_API_KEY = "<openai-key>"

POLICY_IDS = ["68890c1246e0b250a324af8b"]
MODEL_ID = "66f243e15a47dc8bdbb82504"
OPENAI_MODEL = "mistral-large-latest"


def send_ws_msg(ws: WebSocket, event: str, data: dict):
    # Every client event is sent as {"event": ..., "data": ...}.
    msg = json.dumps({"event": event, "data": data})
    logger.debug(msg)
    ws.send(msg)


def recv_ws_msg(ws: WebSocket, target_event: str) -> Any:
    # Read until the expected event arrives; fail fast on an error event.
    while True:
        response = json.loads(ws.recv())
        logger.debug(response)
        if response["event"] == target_event:
            return response["data"]
        if response["event"] == "error":
            raise ValueError(response)


def create_ws_connection() -> WebSocket:
    ws = create_connection(
        f"{WS_API_URL}/v1/moderation/stream/analyze",
        sslopt={"ca_certs": certifi.where()},
    )
    # The auth event must be the first message on the connection.
    logger.debug("Authorizing")
    send_ws_msg(ws, "auth", {"token": DYNAMO_API_KEY})
    recv_ws_msg(ws, "client-info")
    logger.debug("Authorized")
    return ws


def start_session(ws: WebSocket, input_prompt: str):
    send_ws_msg(
        ws,
        "start",
        {
            "messages": [{"role": "user", "content": input_prompt}],
            "policyIds": POLICY_IDS,
            "modelId": MODEL_ID,
        },
    )
    recv_ws_msg(ws, "session_start")


def analyze(ws: WebSocket, output_chunk: str) -> str:
    send_ws_msg(ws, "analyze", {"text": output_chunk})
    data = recv_ws_msg(ws, "analyze_result")
    return data["finalAction"]


def end_session(ws: WebSocket):
    send_ws_msg(ws, "end", {})
    recv_ws_msg(ws, "session_end")


# The OpenAI SDK is pointed at Mistral's OpenAI-compatible endpoint here,
# so OPENAI_API_KEY must be a key for that provider.
client = OpenAI(api_key=OPENAI_API_KEY, base_url="https://api.mistral.ai/v1")


def guarded_chat(ws: WebSocket, prompt: str) -> Iterator[str]:
    start_session(ws, prompt)
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        stream=True,
    )
    for chunk in response:
        if not chunk.choices:
            continue  # some chunks carry no choices
        message = chunk.choices[0].delta.content or ""
        if message:  # skip empty deltas such as the final chunk
            action = analyze(ws, message)
            if action == "BLOCK":
                # The chunk that triggered the block is still emitted here;
                # drop it from the output if it must not be shown.
                yield f"{message}[BLOCKED]"
                break
        yield message
    end_session(ws)


def chat(prompt: str):
    ws = create_ws_connection()
    try:
        for chunk in guarded_chat(ws, prompt):
            print(chunk, end="")
    finally:
        ws.close()  # always release the connection


chat(
    "Write a draft email for a credit score report update. It should start generic and then add details like fake names, phone numbers, addresses, etc."
)

Key Features

  • Token Buffering: Text is buffered until it reaches the default threshold (20 tokens), reducing API calls while maintaining real-time analysis
  • Real-time Analysis: Policies are applied as text streams in, enabling immediate content filtering
  • Session Management: Each WebSocket connection can manage one active session with automatic cleanup
  • Comprehensive Error Handling: Built-in error handling for various failure scenarios