
Streaming Analyze (WebSocket)

For real-time content moderation on streaming text data, DynamoGuard also supports a WebSocket-based streaming analyze endpoint. This is particularly useful for monitoring AI model responses as they are generated, enabling immediate policy enforcement and content filtering.

WebSocket Connection

Endpoint: ws://your-domain/v1/moderation/stream/analyze (use wss:// when the deployment is served over TLS, as in the example implementation)

Authentication

Authentication is handled via an auth event that must be sent immediately after establishing the WebSocket connection.
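The example client later on this page wraps every event in a JSON envelope with an event name and a data payload. A minimal sketch of building and parsing that envelope follows, assuming the same shape applies to all client and server events; make_event and parse_event are hypothetical helper names, not part of any DynamoGuard SDK.

```python
import json

def make_event(event: str, data: dict) -> str:
    """Serialize a client event into the {"event", "data"} envelope."""
    return json.dumps({"event": event, "data": data})

def parse_event(raw: str) -> tuple:
    """Split a server message into (event name, data payload)."""
    message = json.loads(raw)
    return message.get("event"), message.get("data", {})

# The auth event is the first message sent after the socket opens.
auth_message = make_event("auth", {"token": "your-api-key"})
print(auth_message)

# A successful authentication is acknowledged with a client-info event.
print(parse_event('{"event": "client-info", "data": {"authenticated": true}}'))
```

Keeping serialization in one place makes it harder to send a payload without its event name.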

Client Events (Events You Send)

1. auth - Authenticate Connection

Purpose: Authenticates the WebSocket connection using your API key.

Data Structure:

{
  "token": "string (your-api-key)"
}

Required Fields:

  • token: Your DynamoAI API key

Server Response:

  • Acknowledgment: client-info event (indicates successful authentication)
  • Error: error event if authentication fails

2. start - Initialize Streaming Session

Purpose: Establishes a new streaming session for content analysis.

Data Structure:

{
  "messages": [
    {
      "role": "user",
      "content": "string"
    }
  ],
  "policyIds": ["string (24-character hex)"],
  "modelId": "string (24-character hex)",
  "responseTokenBufferLength": "number"
}

Required Fields:

  • messages: Array with a single message object containing the input prompt
  • policyIds: Array of policy IDs to apply for moderation
  • modelId: Model ID to associate with the session
  • responseTokenBufferLength: Number of tokens to buffer before applying policies
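Since the server rejects a start event that fails validation, it can help to check the payload on the client first. The sketch below builds the data payload and checks the 24-character hex format stated above. The function name build_start_payload is hypothetical, and the rules that at least one policy ID is required and that the buffer length is a positive integer are assumptions, not documented constraints.

```python
import re

# 24 hexadecimal characters, as stated for policyIds and modelId.
HEX_ID = re.compile(r"[0-9a-fA-F]{24}")

def build_start_payload(prompt, policy_ids, model_id, buffer_length):
    """Build the data payload for a start event, raising ValueError on bad input."""
    if not policy_ids:
        # Assumption: an empty policy list is not useful for moderation.
        raise ValueError("policyIds must contain at least one policy ID")
    for value in [*policy_ids, model_id]:
        if not HEX_ID.fullmatch(value):
            raise ValueError(f"not a 24-character hex ID: {value!r}")
    if not isinstance(buffer_length, int) or buffer_length <= 0:
        # Assumption: the buffer length is a positive integer token count.
        raise ValueError("responseTokenBufferLength must be a positive integer")
    return {
        "messages": [{"role": "user", "content": prompt}],
        "policyIds": list(policy_ids),
        "modelId": model_id,
        "responseTokenBufferLength": buffer_length,
    }

payload = build_start_payload(
    "Summarize this report.",
    ["507f1f77bcf86cd799439011"],
    "507f1f77bcf86cd799439012",
    50,
)
print(payload["policyIds"])
```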

Server Response:

  • Acknowledgment: session_start event with { started: true }
  • Error: WsException if session already exists or validation fails

3. analyze - Stream Text for Analysis

Purpose: Sends text chunks for real-time analysis against configured policies.

Data Structure:

{
  "text": "string"
}

Required Fields:

  • text: The text chunk to analyze

Server Response:

  • Conditional Analysis Result: analyze_result event (only when buffered text exceeds responseTokenBufferLength)
  • Error: WsException if session not found

4. end - Terminate Streaming Session

Purpose: Ends the current streaming session and logs final results.

Data Structure:

{}

Server Response:

  • Acknowledgment: session_end event with session summary
  • Error: WsException if session not found

Server Events (Events You Receive)

1. client-info - Authentication Confirmation

Triggered: When an auth event is successfully processed.

Data Structure:

{
  "authenticated": true
}

2. session_start - Session Initialization Confirmation

Triggered: When a start event is successfully processed.

Data Structure:

{
  "started": true
}

3. analyze_result - Policy Analysis Results

Triggered: When buffered text exceeds the responseTokenBufferLength threshold and policies are applied.

Data Structure:

{
  "text": "string",
  "finalAction": "BLOCK|WARN|REDACT|SANITIZE|NONE",
  "policyResults": [
    {
      "policyId": "string",
      "workerRequestId": "string",
      "outputs": "object",
      "action": "BLOCK|WARN|REDACT|SANITIZE|NONE",
      "violated": "boolean"
    }
  ],
  "numTokens": "number",
  "warning": "string (optional)"
}

Fields:

  • text: The analyzed text chunk
  • finalAction: Overall action determined by all policies
  • policyResults: Individual results for each applied policy
  • numTokens: Number of tokens analyzed in this chunk
  • warning: Token limit warning (if applicable)

4. session_end - Session Termination Confirmation

Triggered: When an end event is successfully processed.

Data Structure:

{
  "ended": true,
  "numTokens": "number",
  "warning": "string (optional)"
}

Fields:

  • ended: Confirmation that session ended
  • numTokens: Total tokens analyzed in the session
  • warning: Token limit warning (if applicable)

5. session_reset - Session Reset Notification

Triggered: When a new WebSocket connection is established (existing sessions are cleared).

Data Structure:

{
  "reset": true
}

6. error - Error Notification

Triggered: When an error occurs during processing.

Data Structure:

{
  "message": "string",
  "code": "string (optional)"
}

Error Handling

WebSocket Exceptions

The server may send error events for various error conditions:

  1. Authentication Errors:

    • Invalid API key
    • Missing authentication
  2. Session Errors:

    • StreamingSessionError: Session not found or already exists
    • TokenLimitError: Input exceeds token limits
  3. Validation Errors:

    • Invalid message format
    • Missing required fields
    • Invalid policy IDs
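One way to surface these conditions in client code is to turn an error event into a Python exception as soon as it is parsed. This is a minimal sketch: ModerationStreamError and raise_on_error are hypothetical names, and the use of "StreamingSessionError" as a value of the optional code field is an assumption, since the page does not say where the error name appears.

```python
class ModerationStreamError(Exception):
    """Raised by the client when the server sends an error event."""

    def __init__(self, message, code=None):
        super().__init__(message)
        self.code = code  # optional, mirrors the error event's "code" field

def raise_on_error(event, data):
    """Raise for error events; return every other event unchanged."""
    if event == "error":
        raise ModerationStreamError(
            data.get("message", "unknown error"), data.get("code")
        )
    return event, data

try:
    # Assumption: the error name is carried in the "code" field.
    raise_on_error(
        "error", {"message": "Session not found", "code": "StreamingSessionError"}
    )
except ModerationStreamError as exc:
    print(f"{exc.code}: {exc}")
```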

Usage Flow

  1. Connect to the WebSocket endpoint
  2. Send auth event with your API key
  3. Receive client-info confirmation
  4. Send start event to initialize session
  5. Receive session_start confirmation
  6. Send multiple analyze events with text chunks
  7. Receive analyze_result events when buffering threshold is met
  8. Send end event to terminate session
  9. Receive session_end confirmation with final summary
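The ordering in the steps above can be sketched as a small client-side state tracker that refuses to send an event out of order. FlowTracker is a hypothetical helper. For brevity it advances on send rather than on the server's confirmation, and it assumes that a new session may be started on the same connection after end, which this page does not confirm.

```python
# Allowed client events per state, following the usage flow.
TRANSITIONS = {
    ("connected", "auth"): "authenticated",
    ("authenticated", "start"): "streaming",
    ("streaming", "analyze"): "streaming",
    # Assumption: after "end" the connection can start another session.
    ("streaming", "end"): "authenticated",
}

class FlowTracker:
    """Tracks which client event may be sent next."""

    def __init__(self):
        self.state = "connected"

    def send(self, event):
        """Record that `event` is being sent; raise if it is out of order."""
        key = (self.state, event)
        if key not in TRANSITIONS:
            raise RuntimeError(f"cannot send {event!r} while {self.state}")
        self.state = TRANSITIONS[key]
        return self.state

tracker = FlowTracker()
for event in ["auth", "start", "analyze", "analyze", "end"]:
    print(event, "->", tracker.send(event))
```

A production client would more likely advance the state when client-info, session_start, and session_end arrive, so that a rejected event does not move the tracker forward.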

Important Notes

Authentication
  • Authentication must be performed immediately after WebSocket connection
  • The auth event must be sent before any other operations
  • Connection will be rejected if authentication fails
Token Buffering
  • Text is buffered until it reaches responseTokenBufferLength tokens
  • Policies are only applied when the buffer threshold is exceeded
  • This reduces API calls while maintaining real-time analysis
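To make the buffering behavior concrete, here is a client-side illustration of threshold buffering. It is not the server's implementation: TokenBuffer is a hypothetical class, whitespace splitting stands in for the real tokenizer, and clearing the buffer after each flush is an assumption.

```python
class TokenBuffer:
    """Illustrates buffering text until a token threshold is exceeded."""

    def __init__(self, threshold: int):
        self.threshold = threshold
        self.tokens = []

    def add(self, text: str):
        """Add a chunk; return the buffered text once the threshold is exceeded."""
        # Assumption: whitespace-separated words approximate tokens.
        self.tokens.extend(text.split())
        if len(self.tokens) > self.threshold:
            flushed = " ".join(self.tokens)
            self.tokens = []  # assumption: the buffer restarts after a flush
            return flushed
        return None  # below the threshold, nothing is analyzed yet

buffer = TokenBuffer(threshold=3)
for chunk in ["Dear customer,", "your report", "is ready", "to view."]:
    print(repr(chunk), "->", buffer.add(chunk))
```

With a threshold of 3, only every second chunk in this run triggers a flush, which mirrors how several analyze events can pass before one analyze_result arrives.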
Token Limits
  • Input messages are validated against policy token limits
  • Only the first N tokens (as per policy limits) are analyzed
  • Warnings are provided when token limits are exceeded
Session Management
  • Each WebSocket connection can have only one active session
  • Sessions are automatically cleaned up on connection close
  • New connections reset any existing sessions
Policy Actions
  • BLOCK: Content violates policy, should be blocked
  • WARN: Content may be problematic, warning recommended
  • REDACT: Sensitive content should be redacted
  • SANITIZE: Content should be cleaned/sanitized
  • NONE: No policy violations detected
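A client has to decide what to display for each action. The sketch below parses an analyze_result payload into a dataclass and maps finalAction to an output decision. AnalyzeResult and handle_result are hypothetical names, and the display choices (a placeholder for BLOCK, a suffix for WARN, withholding text for REDACT, SANITIZE, and unrecognized actions) are conservative client-side assumptions rather than documented behavior.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AnalyzeResult:
    text: str
    final_action: str
    num_tokens: int
    policy_results: list = field(default_factory=list)
    warning: Optional[str] = None

    @classmethod
    def from_data(cls, data: dict) -> "AnalyzeResult":
        """Build a result from the data payload of an analyze_result event."""
        return cls(
            text=data["text"],
            final_action=data["finalAction"],
            num_tokens=data["numTokens"],
            policy_results=data.get("policyResults", []),
            warning=data.get("warning"),
        )

def handle_result(result: AnalyzeResult) -> tuple:
    """Return (text to display, whether to keep streaming)."""
    action = result.final_action
    if action == "NONE":
        return result.text, True
    if action == "WARN":
        return f"{result.text} [WARNING]", True
    if action == "BLOCK":
        return "[BLOCKED]", False
    # REDACT, SANITIZE, or an unrecognized action: withhold the text.
    return f"[WITHHELD: {action}]", True

result = AnalyzeResult.from_data(
    {"text": "Hello there.", "finalAction": "NONE", "policyResults": [], "numTokens": 3}
)
print(handle_result(result))
```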

Example Implementation

Basic WebSocket Client

import json
import threading
import time

import websocket  # provided by the websocket-client package
from openai import OpenAI


class AnalyzeStreamingClient:
    def __init__(self, uri: str, api_key: str):
        self.uri = uri
        self.api_key = api_key
        self.ws = None
        self.connected = False
        self.authenticated = False
        self.session_active = False
        self.latest_analysis_result = None
        self.lock = threading.Lock()

    def on_message(self, ws, message):
        """Handle incoming WebSocket messages"""
        try:
            response = json.loads(message)
            event_type = response.get("event")
            data = response.get("data", {})

            if event_type == "client-info":
                self.authenticated = True
                print("Authentication successful")
            elif event_type == "session_start":
                self.session_active = True
                print("Session started:", data)
            elif event_type == "analyze_result":
                with self.lock:
                    self.latest_analysis_result = data
            elif event_type == "session_end":
                self.session_active = False
                print("Session ended:", data)
            elif event_type == "error":
                print("Error:", data)

        except json.JSONDecodeError:
            print("Failed to parse message:", message)
        except Exception as e:
            print("Error handling message:", e)

    def on_error(self, ws, error):
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        print("WebSocket connection closed")
        self.connected = False
        self.authenticated = False
        self.session_active = False

    def on_open(self, ws):
        print("WebSocket connection opened")
        self.connected = True
        # Authenticate immediately after connection
        self.authenticate()

    def authenticate(self):
        """Authenticate with the API key"""
        self.ws.send(json.dumps({
            "event": "auth",
            "data": {"token": self.api_key}
        }))

    def connect(self, timeout: float = 10.0):
        """Connect to the WebSocket endpoint and wait until authenticated"""
        self.ws = websocket.WebSocketApp(
            self.uri,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        # Run WebSocket in a separate thread
        wst = threading.Thread(target=self.ws.run_forever)
        wst.daemon = True
        wst.start()

        # Wait for connection and authentication, giving up after `timeout`
        # seconds so a failed connection or rejected key does not hang forever
        deadline = time.monotonic() + timeout
        while not self.connected or not self.authenticated:
            if time.monotonic() > deadline:
                raise TimeoutError("Timed out waiting for connection and authentication")
            time.sleep(0.1)

    def start_session(self, input_prompt: str, policy_ids: list,
                      model_id: str, response_token_buffer_length: int = 20):
        """Start a new streaming session"""
        start_data = {
            "messages": [{"role": "user", "content": input_prompt}],
            "policyIds": policy_ids,
            "modelId": model_id,
            "responseTokenBufferLength": response_token_buffer_length
        }

        self.ws.send(json.dumps({
            "event": "start",
            "data": start_data
        }))

    def analyze_text(self, text: str):
        """Send text chunk for analysis"""
        self.ws.send(json.dumps({
            "event": "analyze",
            "data": {"text": text}
        }))

    def get_latest_analysis_result(self):
        """Get the latest analysis result, or None if no new result has arrived"""
        with self.lock:
            result = self.latest_analysis_result
            self.latest_analysis_result = None  # Clear after reading
            return result

    def end_session(self):
        """End the current streaming session"""
        self.ws.send(json.dumps({
            "event": "end",
            "data": {}
        }))

    def close(self):
        """Close the WebSocket connection"""
        if self.ws:
            self.ws.close()

# Example: Real-time content moderation with a streaming chat completion
def guarded_chat(prompt: str, wait_for_analysis: bool = True):
    """
    Example showing real-time content moderation of a streamed model response

    Args:
        prompt: User input prompt
        wait_for_analysis: If True, print only text returned in analysis results.
            If False, print chunks immediately as they come.
    """

    # Configuration
    DYNAMO_WS_ENDPOINT = "wss://your-domain/v1/moderation/stream/analyze"
    DYNAMO_API_KEY = "your-dynamo-api-key"
    OPENAI_API_KEY = "your-openai-api-key"  # key for the provider behind base_url below
    POLICY_IDS = ["507f1f77bcf86cd799439011"]
    MODEL_ID = "507f1f77bcf86cd799439012"

    # Initialize clients
    dynamo_client = AnalyzeStreamingClient(DYNAMO_WS_ENDPOINT, DYNAMO_API_KEY)
    # The OpenAI SDK is pointed at an OpenAI-compatible endpoint (Mistral here)
    openai_client = OpenAI(api_key=OPENAI_API_KEY, base_url="https://api.mistral.ai/v1")

    try:
        # Connect to DynamoGuard
        dynamo_client.connect()
        print("Connected to DynamoGuard")

        # Start DynamoGuard session
        dynamo_client.start_session(
            input_prompt=prompt,
            policy_ids=POLICY_IDS,
            model_id=MODEL_ID,
            response_token_buffer_length=50
        )

        # Get streaming response from the model provider
        response = openai_client.chat.completions.create(
            model="mistral-large-latest",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            temperature=0.7
        )

        print("AI Response (with real-time moderation):")
        print("-" * 50)

        for chunk in response:
            choice = chunk.choices[0]
            content = choice.delta.content or ""
            end_of_stream = bool(choice.finish_reason)
            # Skip empty deltas and the final chunk
            if content and not end_of_stream:
                dynamo_client.analyze_text(content)
            if wait_for_analysis:
                # Wait for analysis mode: only print text that came back in an
                # analysis result. The check below polls without blocking, so
                # results that arrive after the stream ends are not printed.
                analysis_result = dynamo_client.get_latest_analysis_result()

                if analysis_result:
                    final_action = analysis_result.get("finalAction")
                    text_to_yield = analysis_result.get("text", "")

                    if final_action == "BLOCK":
                        print(f"{text_to_yield}[BLOCKED]")
                        break
                    elif final_action == "NONE":
                        print(text_to_yield, end="")
                    # Other actions (WARN, REDACT, SANITIZE) are not printed here
            else:
                # Don't wait for analysis mode: check if analysis is available, but don't wait
                analysis_result = dynamo_client.get_latest_analysis_result()

                if analysis_result:
                    final_action = analysis_result.get("finalAction")
                    if final_action == "BLOCK":
                        print(f"{content}[BLOCKED]")
                        break
                    elif final_action == "NONE":
                        print(content, end="")
                else:
                    # No analysis result yet, yield the chunk anyway
                    print(content, end="")

        print("\n" + "-" * 50)

    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Clean up; only send end if the socket is still connected
        if dynamo_client.connected:
            dynamo_client.end_session()
        dynamo_client.close()

# Example usage with different modes
def main():
    prompt = "Write a detailed email about a credit report, start generic then add fake names, phone numbers, addresses, etc."

    # Mode 1: Wait for analysis before printing (safer, shows only moderated content)
    print("=== Mode 1: Wait for analysis before printing ===")
    guarded_chat(prompt, wait_for_analysis=True)

    print("\n\n=== Mode 2: Print chunks immediately ===")
    # Mode 2: Print immediately without waiting for analysis (faster, but may show unmoderated content)
    guarded_chat(prompt, wait_for_analysis=False)


# Run the example
if __name__ == "__main__":
    main()

Key Features

  • Token Buffering: Text is buffered until it reaches responseTokenBufferLength tokens, reducing API calls while maintaining real-time analysis
  • Real-time Analysis: Policies are applied as text streams in, enabling immediate content filtering
  • Session Management: Each WebSocket connection can manage one active session with automatic cleanup
  • Comprehensive Error Handling: Built-in error handling for various failure scenarios
  • Flexible Output Modes: Choose between immediate output or waiting for moderated results