Streaming Analyze (WebSocket)
For real-time content moderation on streaming text data, DynamoGuard also supports a WebSocket-based streaming analyze endpoint. This is particularly useful for monitoring AI model responses as they are generated, enabling immediate policy enforcement and content filtering.
WebSocket Connection
Endpoint: ws://your-domain/v1/moderation/stream/analyze (use wss:// for TLS-encrypted connections, as the example implementation does)
Authentication
Authentication is handled via an auth event that must be sent immediately after establishing the WebSocket connection.
Client Events (Events You Send)
1. auth - Authenticate Connection
Purpose: Authenticates the WebSocket connection using your API key.
Data Structure:
{
"token": "string (your-api-key)"
}
Required Fields:
token: Your DynamoAI API key
Server Response:
- ✅ Acknowledgment: client-info event (indicates successful authentication)
- ❌ Error: error event if authentication fails
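The example client later in this section wraps every client event in a JSON envelope of the form {"event": ..., "data": ...}. Assuming that envelope, a minimal sketch of building the auth frame looks like this. make_event and make_auth_event are illustrative helpers, not part of any DynamoGuard SDK.

```python
import json


def make_event(event: str, data: dict) -> str:
    """Serialize a client event using the {"event", "data"} envelope (assumed format)."""
    return json.dumps({"event": event, "data": data})


def make_auth_event(api_key: str) -> str:
    """Build the auth frame; it must be the first frame sent after connecting."""
    if not api_key:
        raise ValueError("api_key must be a non-empty string")
    return make_event("auth", {"token": api_key})


frame = make_auth_event("your-api-key")
print(frame)  # {"event": "auth", "data": {"token": "your-api-key"}}
```

Keeping the envelope in one helper means the start, analyze and end frames can reuse it unchanged.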
2. start - Initialize Streaming Session
Purpose: Establishes a new streaming session for content analysis.
Data Structure:
{
"messages": [
{
"role": "user",
"content": "string"
}
],
"policyIds": ["string (24-character hex)"],
"modelId": "string (24-character hex)",
"responseTokenBufferLength": "number"
}
Required Fields:
- messages: Array with a single message object containing the input prompt
- policyIds: Array of policy IDs to apply for moderation
- modelId: Model ID to associate with the session
- responseTokenBufferLength: Number of tokens to buffer before applying policies
Server Response:
- ✅ Acknowledgment: session_start event with { started: true }
- ❌ Error: WsException if a session already exists or validation fails
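Because policyIds and modelId are documented as 24-character hex strings, a client can catch malformed IDs before the server rejects the start event. The sketch below is a hypothetical pre-check (build_start_data is not part of any SDK); the non-empty policyIds and positive buffer length checks are assumptions, and the server remains the authority on validation.

```python
import re

# Documented ID format: 24 hexadecimal characters
HEX_ID = re.compile(r"[0-9a-fA-F]{24}")


def build_start_data(prompt: str, policy_ids: list, model_id: str,
                     buffer_length: int = 20) -> dict:
    """Build the data object for a start event after basic format checks."""
    if not policy_ids:
        # Assumption: a session without policies is not useful
        raise ValueError("policyIds must contain at least one policy ID")
    for value in [*policy_ids, model_id]:
        if not HEX_ID.fullmatch(value):
            raise ValueError(f"not a 24-character hex ID: {value!r}")
    if buffer_length <= 0:
        # Assumption: the buffer length must be a positive token count
        raise ValueError("responseTokenBufferLength must be positive")
    return {
        "messages": [{"role": "user", "content": prompt}],  # single input message
        "policyIds": policy_ids,
        "modelId": model_id,
        "responseTokenBufferLength": buffer_length,
    }


data = build_start_data("Hello", ["507f1f77bcf86cd799439011"],
                        "507f1f77bcf86cd799439012", buffer_length=50)
```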
3. analyze - Stream Text for Analysis
Purpose: Sends text chunks for real-time analysis against configured policies.
Data Structure:
{
"text": "string"
}
Required Fields:
text: The text chunk to analyze
Server Response:
- ✅ Conditional Analysis Result: analyze_result event (sent only when the buffered text exceeds responseTokenBufferLength)
- ❌ Error: WsException if no session is found
4. end - Terminate Streaming Session
Purpose: Ends the current streaming session and logs final results.
Data Structure:
{}
Server Response:
- ✅ Acknowledgment: session_end event with a session summary
- ❌ Error: WsException if no session is found
Server Events (Events You Receive)
1. client-info - Authentication Confirmation
Triggered: When an auth event is successfully processed.
Data Structure:
{
"authenticated": true
}
2. session_start - Session Initialization Confirmation
Triggered: When a start event is successfully processed.
Data Structure:
{
"started": true
}
3. analyze_result - Policy Analysis Results
Triggered: When buffered text exceeds the responseTokenBufferLength threshold and policies are applied.
Data Structure:
{
"text": "string",
"finalAction": "BLOCK|WARN|REDACT|SANITIZE|NONE",
"policyResults": [
{
"policyId": "string",
"workerRequestId": "string",
"outputs": "object",
"action": "BLOCK|WARN|REDACT|SANITIZE|NONE",
"violated": "boolean"
}
],
"numTokens": "number",
"warning": "string (optional)"
}
Fields:
- text: The analyzed text chunk
- finalAction: Overall action determined by all policies
- policyResults: Individual results for each applied policy
- numTokens: Number of tokens analyzed in this chunk
- warning: Token limit warning (if applicable)
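Parsing the analyze_result payload into typed objects makes the fields above easier to work with. This is a sketch using the field names from the data structure; the AnalyzeResult and PolicyResult classes and the violated_policy_ids helper are illustrative, not a published client library.

```python
from dataclasses import dataclass, field
from typing import List, Optional

VALID_ACTIONS = {"BLOCK", "WARN", "REDACT", "SANITIZE", "NONE"}


@dataclass
class PolicyResult:
    policy_id: str
    action: str
    violated: bool
    worker_request_id: str = ""
    outputs: dict = field(default_factory=dict)


@dataclass
class AnalyzeResult:
    text: str
    final_action: str
    num_tokens: int
    policy_results: List[PolicyResult]
    warning: Optional[str] = None

    @classmethod
    def from_dict(cls, data: dict) -> "AnalyzeResult":
        """Build an AnalyzeResult from the data object of an analyze_result event."""
        final_action = data.get("finalAction")
        if final_action not in VALID_ACTIONS:
            raise ValueError(f"unexpected finalAction: {final_action!r}")
        results = [
            PolicyResult(
                policy_id=r["policyId"],
                action=r["action"],
                violated=bool(r["violated"]),
                worker_request_id=r.get("workerRequestId", ""),
                outputs=r.get("outputs") or {},
            )
            for r in data.get("policyResults", [])
        ]
        return cls(
            text=data.get("text", ""),
            final_action=final_action,
            num_tokens=int(data.get("numTokens", 0)),
            policy_results=results,
            warning=data.get("warning"),  # optional: only set for token limit warnings
        )

    def violated_policy_ids(self) -> List[str]:
        """IDs of the policies whose result reports violated == true."""
        return [r.policy_id for r in self.policy_results if r.violated]
```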
4. session_end - Session Termination Confirmation
Triggered: When an end event is successfully processed.
Data Structure:
{
"ended": true,
"numTokens": "number",
"warning": "string (optional)"
}
Fields:
- ended: Confirmation that the session ended
- numTokens: Total tokens analyzed in the session
- warning: Token limit warning (if applicable)
5. session_reset - Session Reset Notification
Triggered: When a new WebSocket connection is established (existing sessions are cleared).
Data Structure:
{
"reset": true
}
6. error - Error Notification
Triggered: When an error occurs during processing.
Data Structure:
{
"message": "string",
"code": "string (optional)"
}
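The example client reads an "event" name and a "data" object from every server frame. Assuming that shape for all six server events, a small dispatch table is one way to route them; dispatch is a hypothetical helper, and unknown event names are ignored rather than treated as errors.

```python
import json
from typing import Callable, Dict


def dispatch(frame: str, handlers: Dict[str, Callable[[dict], None]]) -> str:
    """Parse one server frame and call the handler registered for its event.

    Returns the event name; events without a handler are silently ignored.
    """
    message = json.loads(frame)
    event = message.get("event", "")
    handler = handlers.get(event)
    if handler is not None:
        handler(message.get("data", {}))
    return event


seen = []
handlers = {
    "client-info": lambda d: seen.append(("auth", d.get("authenticated"))),
    "session_reset": lambda d: seen.append(("reset", d.get("reset"))),
    # "code" is optional in error events, so .get() returns None when absent
    "error": lambda d: seen.append(("error", d.get("message"), d.get("code"))),
}
dispatch('{"event": "client-info", "data": {"authenticated": true}}', handlers)
dispatch('{"event": "error", "data": {"message": "Invalid API key"}}', handlers)
print(seen)  # [('auth', True), ('error', 'Invalid API key', None)]
```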
Error Handling
WebSocket Exceptions
The server may send error events for various error conditions:
- Authentication Errors:
  - Invalid API key
  - Missing authentication
- Session Errors:
  - StreamingSessionError: Session not found or already exists
  - TokenLimitError: Input exceeds token limits
- Validation Errors:
  - Invalid message format
  - Missing required fields
  - Invalid policy IDs
Usage Flow
- Connect to the WebSocket endpoint
- Send an auth event with your API key
- Receive the client-info confirmation
- Send a start event to initialize the session
- Receive the session_start confirmation
- Send multiple analyze events with text chunks
- Receive analyze_result events when the buffering threshold is met
- Send an end event to terminate the session
- Receive the session_end confirmation with the final summary
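The flow above is effectively a small state machine. The sketch below tracks it on the client so out-of-order frames are caught locally; StreamFlow and FlowError are hypothetical names. It is stricter than the example client (which sends analyze frames without waiting for session_start), and rejecting a second auth is an assumption, not documented server behavior.

```python
class FlowError(Exception):
    """Raised when a client event would break the documented usage flow."""


class StreamFlow:
    """Tracks auth -> start -> analyze* -> end, updated from server acknowledgments."""

    def __init__(self):
        self.authenticated = False
        self.session_active = False

    def check_send(self, event: str) -> None:
        """Raise FlowError if sending `event` now would violate the flow."""
        if event == "auth":
            if self.authenticated:
                # Assumption: re-authenticating is treated as a client bug
                raise FlowError("auth already succeeded on this connection")
        elif not self.authenticated:
            raise FlowError(f"{event!r} sent before authentication")
        elif event == "start" and self.session_active:
            raise FlowError("a session is already active on this connection")
        elif event in ("analyze", "end") and not self.session_active:
            raise FlowError(f"{event!r} sent without an active session")

    def on_server_event(self, event: str) -> None:
        """Update state when an acknowledgment or reset arrives."""
        if event == "client-info":
            self.authenticated = True
        elif event == "session_start":
            self.session_active = True
        elif event in ("session_end", "session_reset"):
            self.session_active = False
```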
Important Notes
Authentication
- Authentication must be performed immediately after the WebSocket connection is established
- The auth event must be sent before any other operations
- The connection will be rejected if authentication fails
Token Buffering
- Text is buffered until it reaches responseTokenBufferLength tokens
- Policies are only applied when the buffer threshold is exceeded
- This reduces API calls while maintaining real-time analysis
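To see when analyze_result events can be expected, the buffering behavior can be modeled locally. This is a sketch under two assumptions: tokens are approximated by whitespace-separated words (the server's tokenizer will count differently), and a flush happens when the count strictly exceeds the threshold, following the "exceeded" wording above. TokenBuffer is a hypothetical class.

```python
from typing import List, Optional


class TokenBuffer:
    """Client-side model of responseTokenBufferLength buffering (approximate tokens)."""

    def __init__(self, threshold: int):
        self.threshold = threshold
        self.chunks: List[str] = []
        self.tokens = 0

    def add(self, text: str) -> Optional[str]:
        """Buffer a chunk; return the buffered text once the threshold is exceeded."""
        self.chunks.append(text)
        self.tokens += len(text.split())  # whitespace approximation of token count
        if self.tokens > self.threshold:
            flushed = "".join(self.chunks)
            self.chunks, self.tokens = [], 0
            return flushed
        return None


buf = TokenBuffer(threshold=3)
flushed = []
for chunk in ["The quick", " brown fox", " jumps over", " the dog"]:
    out = buf.add(chunk)
    if out is not None:
        flushed.append(out)  # text that would be analyzed against policies
print(flushed)  # ['The quick brown fox', ' jumps over the dog']
```

A larger threshold means fewer policy evaluations but a longer delay before each chunk of text is cleared.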
Token Limits
- Input messages are validated against policy token limits
- Only the first N tokens (as per policy limits) are analyzed
- Warnings are provided when token limits are exceeded
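The truncation rule can be illustrated with a short sketch that keeps only the first N tokens and reports a warning when input is cut. Token counting here is a whitespace approximation, and both apply_token_limit and its warning text are hypothetical; the server's tokenizer and warning message will differ.

```python
from typing import Optional, Tuple


def apply_token_limit(text: str, limit: int) -> Tuple[str, Optional[str]]:
    """Keep only the first `limit` whitespace tokens and report truncation."""
    tokens = text.split()
    if len(tokens) <= limit:
        return text, None
    # Hypothetical warning text; the actual server message will differ
    warning = f"Input truncated to {limit} of {len(tokens)} tokens"
    return " ".join(tokens[:limit]), warning


print(apply_token_limit("one two three four", 2))
# ('one two', 'Input truncated to 2 of 4 tokens')
```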
Session Management
- Each WebSocket connection can have only one active session
- Sessions are automatically cleaned up on connection close
- New connections reset any existing sessions
Policy Actions
- BLOCK: Content violates policy and should be blocked
- WARN: Content may be problematic; a warning is recommended
- REDACT: Sensitive content should be redacted
- SANITIZE: Content should be cleaned/sanitized
- NONE: No policy violations detected
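A client still has to decide what to do with each finalAction. The mapping below is one conservative choice, not behavior defined by DynamoGuard: PolicyAction and client_decision are hypothetical, and withholding REDACT and SANITIZE text assumes the application applies its own redaction or cleanup before display.

```python
from enum import Enum


class PolicyAction(str, Enum):
    BLOCK = "BLOCK"
    WARN = "WARN"
    REDACT = "REDACT"
    SANITIZE = "SANITIZE"
    NONE = "NONE"


def client_decision(action: str) -> str:
    """Map a finalAction string to a client-side decision (illustrative policy)."""
    act = PolicyAction(action)  # raises ValueError for unknown action strings
    if act is PolicyAction.BLOCK:
        return "stop"  # end the stream and show nothing further
    if act is PolicyAction.WARN:
        return "show_with_warning"
    if act in (PolicyAction.REDACT, PolicyAction.SANITIZE):
        return "withhold"  # hide until the app applies its own redaction/cleanup
    return "show"
```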
Example Implementation
Basic WebSocket Client
```python
import json
import queue
import threading
import time

import websocket  # websocket-client package
from openai import OpenAI


class AnalyzeStreamingClient:
    def __init__(self, uri: str, api_key: str):
        self.uri = uri
        self.api_key = api_key
        self.ws = None
        self.connected = False
        self.authenticated = False
        self.session_active = False
        # Thread-safe FIFO of analyze_result payloads, so a result that
        # arrives before the previous one was read is not overwritten
        self.analysis_results = queue.Queue()

    def on_message(self, ws, message):
        """Handle incoming WebSocket messages"""
        try:
            response = json.loads(message)
            event_type = response.get("event")
            data = response.get("data", {})

            if event_type == "client-info":
                self.authenticated = True
                print("Authentication successful")
            elif event_type == "session_start":
                self.session_active = True
                print("Session started:", data)
            elif event_type == "analyze_result":
                self.analysis_results.put(data)
            elif event_type == "session_end":
                self.session_active = False
                print("Session ended:", data)
            elif event_type == "session_reset":
                self.session_active = False
                print("Session reset:", data)
            elif event_type == "error":
                print("Error:", data)
        except json.JSONDecodeError:
            print("Failed to parse message:", message)
        except Exception as e:
            print("Error handling message:", e)

    def on_error(self, ws, error):
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        print("WebSocket connection closed")
        self.connected = False
        self.authenticated = False
        self.session_active = False

    def on_open(self, ws):
        print("WebSocket connection opened")
        self.connected = True
        # Authenticate immediately after connection
        self.authenticate()

    def authenticate(self):
        """Authenticate with the API key"""
        self.ws.send(json.dumps({
            "event": "auth",
            "data": {"token": self.api_key}
        }))

    def connect(self, timeout: float = 10.0):
        """Connect to the WebSocket endpoint and wait for authentication"""
        self.ws = websocket.WebSocketApp(
            self.uri,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        # Run WebSocket in a separate thread
        wst = threading.Thread(target=self.ws.run_forever)
        wst.daemon = True
        wst.start()

        # Wait for connection and authentication, giving up after `timeout`
        # seconds instead of spinning forever if the connection or auth fails
        deadline = time.monotonic() + timeout
        while not (self.connected and self.authenticated):
            if time.monotonic() > deadline:
                raise TimeoutError("Timed out waiting for DynamoGuard authentication")
            time.sleep(0.1)

    def start_session(self, input_prompt: str, policy_ids: list,
                      model_id: str, response_token_buffer_length: int = 20):
        """Start a new streaming session"""
        start_data = {
            "messages": [{"role": "user", "content": input_prompt}],
            "policyIds": policy_ids,
            "modelId": model_id,
            "responseTokenBufferLength": response_token_buffer_length
        }
        self.ws.send(json.dumps({
            "event": "start",
            "data": start_data
        }))

    def analyze_text(self, text: str):
        """Send text chunk for analysis"""
        self.ws.send(json.dumps({
            "event": "analyze",
            "data": {"text": text}
        }))

    def get_next_analysis_result(self):
        """Return the oldest unread analysis result, or None if none has arrived"""
        try:
            return self.analysis_results.get_nowait()
        except queue.Empty:
            return None

    def end_session(self):
        """End the current streaming session"""
        self.ws.send(json.dumps({
            "event": "end",
            "data": {}
        }))

    def close(self):
        """Close the WebSocket connection"""
        if self.ws:
            self.ws.close()


# Example: Real-time content moderation of a streamed chat completion
def guarded_chat(prompt: str, wait_for_analysis: bool = True):
    """
    Example showing real-time content moderation of a streaming LLM response

    Args:
        prompt: User input prompt
        wait_for_analysis: If True, print only text returned in analysis results.
                           If False, print chunks immediately as they arrive.
    """
    # Configuration
    DYNAMO_WS_ENDPOINT = "wss://your-domain/v1/moderation/stream/analyze"
    DYNAMO_API_KEY = "your-dynamo-api-key"
    OPENAI_API_KEY = "your-openai-api-key"
    POLICY_IDS = ["507f1f77bcf86cd799439011"]
    MODEL_ID = "507f1f77bcf86cd799439012"

    # Initialize clients.
    # The OpenAI SDK is pointed at Mistral's OpenAI-compatible endpoint here, so
    # OPENAI_API_KEY must hold a Mistral key. To call OpenAI directly, remove
    # base_url and use an OpenAI model name below.
    dynamo_client = AnalyzeStreamingClient(DYNAMO_WS_ENDPOINT, DYNAMO_API_KEY)
    openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url="https://api.mistral.ai/v1")

    try:
        # Connect to DynamoGuard (raises TimeoutError if authentication fails)
        dynamo_client.connect()
        print("Connected to DynamoGuard")

        # Start DynamoGuard session
        dynamo_client.start_session(
            input_prompt=prompt,
            policy_ids=POLICY_IDS,
            model_id=MODEL_ID,
            response_token_buffer_length=50
        )

        # Get streaming response from the model
        response = openai_client.chat.completions.create(
            model="mistral-large-latest",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            temperature=0.7
        )

        print("AI Response (with real-time moderation):")
        print("-" * 50)

        blocked = False
        for chunk in response:
            if not chunk.choices:
                continue  # some chunks (e.g. usage-only chunks) carry no choices
            content = chunk.choices[0].delta.content or ""

            # Forward every non-empty chunk, including text on the final chunk
            if content:
                dynamo_client.analyze_text(content)

            # Results arrive asynchronously on the WebSocket thread; handle
            # every result queued so far
            while (analysis_result := dynamo_client.get_next_analysis_result()) is not None:
                final_action = analysis_result.get("finalAction")
                if final_action == "BLOCK":
                    blocked = True
                    break
                if wait_for_analysis and final_action == "NONE":
                    # Wait-for-analysis mode prints only text that passed all
                    # policies; WARN, REDACT and SANITIZE text is not printed here
                    print(analysis_result.get("text", ""), end="")

            if blocked:
                # Do not echo the blocked text itself
                print("\n[BLOCKED]")
                break
            if not wait_for_analysis:
                # Immediate mode: print the chunk before its analysis returns
                print(content, end="")

        # In wait-for-analysis mode, text that has not produced an
        # analyze_result when the stream ends (e.g. a trailing remainder
        # smaller than responseTokenBufferLength) is not printed by this example
        print("\n" + "-" * 50)

    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Clean up: only send `end` if the connection is still open
        if dynamo_client.connected:
            dynamo_client.end_session()
        dynamo_client.close()


# Example usage with different modes
def main():
    prompt = "Write a detailed email about a credit report, start generic then add fake names, phone numbers, addresses, etc."

    # Mode 1: Wait for analysis before printing (safer, shows only moderated content)
    print("=== Mode 1: Wait for analysis before printing ===")
    guarded_chat(prompt, wait_for_analysis=True)

    # Mode 2: Print immediately without waiting for analysis (faster, but may show unmoderated content)
    print("\n\n=== Mode 2: Print chunks immediately ===")
    guarded_chat(prompt, wait_for_analysis=False)


# Run the example
if __name__ == "__main__":
    main()
```
Key Features
- Token Buffering: Text is buffered until it reaches responseTokenBufferLength tokens, reducing API calls while maintaining real-time analysis
- Real-time Analysis: Policies are applied as text streams in, enabling immediate content filtering
- Session Management: Each WebSocket connection can manage one active session with automatic cleanup
- Comprehensive Error Handling: Built-in error handling for various failure scenarios
- Flexible Output Modes: Choose between immediate output or waiting for moderated results