Error Handling
Proper error handling is crucial for building robust applications with the Synthreo Builder API. This guide covers different types of errors you may encounter, how to handle them gracefully, and best practices for debugging and recovery.
Types of Errors
1. HTTP Status Code Errors
These are standard HTTP errors returned by the API server before your request is processed.
Authentication Errors (401 Unauthorized)
Cause: Invalid or expired JWT token, incorrect credentials.
```json
{
  "error": "Invalid credentials"
}
```
Common scenarios:
- JWT token has expired (24-hour lifetime)
- Incorrect email, password, or user ID
- Missing or malformed Authorization header
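Because tokens have a fixed 24-hour lifetime, expiry can also be checked proactively before each call instead of waiting for a 401. A minimal sketch, assuming the token is a standard JWT whose payload carries an `exp` claim (this decodes the payload only and does not verify the signature):

```python
import base64
import json
import time

def token_expires_soon(jwt_token: str, leeway_seconds: int = 300) -> bool:
    """Return True if the JWT's exp claim is within leeway_seconds of now.

    Assumes a standard three-part JWT; an unparseable token is treated as
    expired so the caller re-authenticates.
    """
    try:
        payload_b64 = jwt_token.split('.')[1]
        # Restore the base64 padding that JWT encoding strips
        payload_b64 += '=' * (-len(payload_b64) % 4)
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
        return payload['exp'] - time.time() < leeway_seconds
    except (IndexError, KeyError, ValueError):
        return True
```

Call `token_expires_soon(token)` before each request and re-authenticate when it returns `True`; the reactive 401 handler below remains the safety net.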
Handling:
```python
import requests

def handle_auth_error(response):
    if response.status_code == 401:
        print("Authentication failed. Please check your credentials.")
        # Re-authenticate and return a fresh token
        new_token = authenticate()
        return new_token
    return None

try:
    response = requests.get(url, headers=headers)
    if response.status_code == 401:
        new_token = handle_auth_error(response)
        if new_token:
            headers['Authorization'] = f'Bearer {new_token}'
            response = requests.get(url, headers=headers)  # Retry
except requests.exceptions.RequestException as e:
    # Covers connection errors and timeouts; 401s don't raise by themselves
    print(f"Request failed: {e}")
```
Bad Request Errors (400)
Cause: Malformed request payload or missing required fields.
```json
{
  "error": "The request body is malformed or missing required fields"
}
```
Common scenarios:
- Invalid JSON in request body
- Missing required parameters (Action, UserSays)
- Incorrect data types (string instead of integer)
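Many 400s can be caught client-side by validating the payload before sending it. A minimal sketch using the required field names from the scenarios above (the expectation that both fields are strings is an assumption; check your diagram's actual contract):

```python
def validate_payload(payload: dict) -> list:
    """Return a list of validation problems; an empty list means the payload looks sendable."""
    problems = []
    # Action and UserSays are the required fields named above;
    # the string type check is an assumption, adjust to your diagram's contract
    for field in ('Action', 'UserSays'):
        if field not in payload:
            problems.append(f"Missing required field: {field}")
        elif not isinstance(payload[field], str):
            problems.append(
                f"Field {field} must be a string, got {type(payload[field]).__name__}"
            )
    return problems
```

Running this before the POST turns an opaque server-side 400 into an actionable local message.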
Not Found Errors (404)
Cause: Invalid cognitive diagram ID or endpoint URL.
```json
{
  "error": "Cognitive diagram not found"
}
```
Rate Limiting (429)
Cause: Too many requests in a short time period.
Handling:
```javascript
async function makeRequestWithRetry(url, options, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      const response = await fetch(url, options);
      if (response.status === 429) {
        // Retry-After is a header string; fall back to exponential backoff
        const retryAfter = Number(response.headers.get('Retry-After')) || (2 ** i);
        console.log(`Rate limited. Retrying after ${retryAfter} seconds...`);
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        continue;
      }
      return response;
    } catch (error) {
      if (i === maxRetries - 1) throw error;
    }
  }
}
```
Server Errors (500-599)
Cause: Internal server issues, temporary outages, or system maintenance.
Handling:
```python
import random
import time

import requests

def exponential_backoff_retry(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except requests.exceptions.HTTPError as e:
            if 500 <= e.response.status_code < 600:
                if attempt < max_retries - 1:
                    # Exponential backoff with jitter to avoid thundering herds
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"Server error. Retrying in {delay:.2f} seconds...")
                    time.sleep(delay)
                    continue
            raise
    raise Exception("Max retries exceeded")
```
2. Asynchronous Job Status Errors
Job Status Codes
When polling job status, different HTTP status codes indicate different states:
| Status Code | Meaning | Action |
|---|---|---|
| 202 Accepted | Job is still running | Continue polling |
| 200 OK | Job completed successfully | Process results |
| 400 Bad Request | Invalid job ID | Check job ID format |
| 404 Not Found | Job not found or expired | Job may have been cleaned up |
| 500 Internal Server Error | Job failed due to system error | Check error logs, possibly retry |
Job Polling Best Practices
```python
import time

import requests

class JobError(Exception):
    """Base class for job-related errors"""
    pass

class JobNotFoundError(JobError):
    pass

class JobTimeoutError(JobError):
    pass

class JobStatusError(JobError):
    pass

class JobPollingError(JobError):
    pass

def poll_job_with_error_handling(client, job_id, max_attempts=120, interval=30):
    """Poll job status with comprehensive error handling"""
    attempts = 0
    consecutive_errors = 0
    max_consecutive_errors = 3
    while attempts < max_attempts:
        try:
            response = client.get_job_status(job_id)
            consecutive_errors = 0  # Reset error counter on success
            if response.status_code == 202:
                print(f"Job {job_id} still running (attempt {attempts + 1})")
                time.sleep(interval)
                attempts += 1
                continue
            elif response.status_code == 200:
                print("Job completed successfully")
                return response.json()
            elif response.status_code == 404:
                raise JobNotFoundError(f"Job {job_id} not found or expired")
            else:
                raise JobStatusError(f"Unexpected status: {response.status_code}")
        except requests.exceptions.RequestException as e:
            consecutive_errors += 1
            print(f"Network error polling job: {e}")
            if consecutive_errors >= max_consecutive_errors:
                raise JobPollingError("Too many consecutive network errors")
            # Exponential backoff for network errors, capped at 5 minutes
            error_delay = min(interval * (2 ** consecutive_errors), 300)
            time.sleep(error_delay)
            attempts += 1
    raise JobTimeoutError(f"Job {job_id} timed out after {max_attempts} attempts")
```
3. Cognitive Diagram Execution Errors
Even when the HTTP request succeeds (200 OK), the cognitive diagram execution itself may encounter errors. These are returned in the `errorData` field of the response.
Understanding errorData
Important: The `errorData` field can contain both actual errors and informational messages. Not all content in `errorData` indicates a failure.
```json
{
  "result": "OK",
  "outputData": "Task completed",
  "errorData": "[{\"message\":\"Processing completed\",\"type\":\"INFO\"}]"
}
```
Common Execution Errors
```json
{
  "result": "OK",
  "outputData": "",
  "errorData": "[{\"message\":\"General error: variable not populated\",\"node_name\":\"Azure OpenAI\",\"node_id\":\"abc-123\",\"type\":\"ERROR\"}]"
}
```
Parsing and Handling Execution Errors
```python
import json

def parse_execution_response(api_response):
    """Parse cognitive diagram execution response with error handling"""
    try:
        # Check for successful output first
        if api_response.get('outputData'):
            output_data = api_response['outputData']
            # Try to parse as JSON
            try:
                parsed_output = json.loads(output_data)
                if isinstance(parsed_output, list) and parsed_output:
                    return {"success": True, "data": parsed_output[0]}
                elif isinstance(parsed_output, dict):
                    return {"success": True, "data": parsed_output}
                else:
                    return {"success": True, "data": str(parsed_output)}
            except json.JSONDecodeError:
                # Return raw string if not JSON
                return {"success": True, "data": output_data}
        # Check errorData for actual errors
        if api_response.get('errorData') and api_response['errorData'] != "[]":
            try:
                error_list = json.loads(api_response['errorData'])
                errors = []
                warnings = []
                info = []
                for item in error_list:
                    error_type = item.get('type', 'UNKNOWN').upper()
                    message = item.get('message', 'No message provided')
                    node_name = item.get('node_name', 'Unknown node')
                    error_info = {
                        'message': message,
                        'node_name': node_name,
                        'type': error_type
                    }
                    if error_type == 'ERROR':
                        errors.append(error_info)
                    elif error_type == 'WARNING':
                        warnings.append(error_info)
                    else:
                        info.append(error_info)
                if errors:
                    return {
                        "success": False,
                        "errors": errors,
                        "warnings": warnings,
                        "info": info
                    }
                else:
                    # Only warnings/info, treat as success
                    return {
                        "success": True,
                        "data": "Operation completed with warnings",
                        "warnings": warnings,
                        "info": info
                    }
            except json.JSONDecodeError:
                return {
                    "success": False,
                    "errors": [{"message": f"Failed to parse error data: {api_response['errorData']}"}]
                }
        # No output and no errors
        return {"success": False, "errors": [{"message": "No response generated"}]}
    except Exception as e:
        return {"success": False, "errors": [{"message": f"Response parsing failed: {str(e)}"}]}

# Usage example
def execute_with_error_handling(client, diagram_id, message):
    try:
        response = client.execute_diagram(diagram_id, message)
        result = parse_execution_response(response)
        if result['success']:
            print("Execution successful!")
            print(f"Result: {result['data']}")
            if result.get('warnings'):
                print("Warnings:")
                for warning in result['warnings']:
                    print(f"  - {warning['node_name']}: {warning['message']}")
        else:
            print("Execution failed!")
            for error in result['errors']:
                print(f"Error in {error.get('node_name', 'unknown')}: {error['message']}")
        return result
    except Exception as e:
        print(f"Request failed: {e}")
        return {"success": False, "errors": [{"message": str(e)}]}
```
4. Training State Errors
Invalid State Transitions
When triggering agent training, monitor the `stateId` field to ensure proper state transitions:
```python
import time

import requests

class TrainingError(Exception):
    pass

class TrainingTimeoutError(TrainingError):
    pass

def monitor_training_with_error_handling(client, diagram_id, timeout_minutes=60):
    """Monitor training with comprehensive state error handling"""
    start_time = time.time()
    timeout_seconds = timeout_minutes * 60
    last_state = None
    state_change_count = 0
    max_state_changes = 10  # Prevent infinite state loops
    while time.time() - start_time < timeout_seconds:
        try:
            response = client.get_agent_status(diagram_id)
            agent_data = response.json()
            current_state = agent_data.get('stateId')
            if current_state != last_state:
                state_change_count += 1
                if state_change_count > max_state_changes:
                    raise TrainingError("Too many state changes, possible system instability")
                print(f"State changed from {last_state} to {current_state}")
                last_state = current_state
            if current_state == 6:  # Training
                print("Agent is training...")
                time.sleep(60)
            elif current_state == 2:  # Idle/Ready
                print("Training completed successfully!")
                return {"success": True, "final_state": current_state}
            elif current_state in [3, 4, 5]:  # Error states
                error_msg = get_state_error_message(current_state)
                raise TrainingError(f"Training failed with state {current_state}: {error_msg}")
            else:
                print(f"Unexpected state {current_state}, continuing to monitor...")
                time.sleep(30)
        except requests.exceptions.RequestException as e:
            print(f"Network error checking training status: {e}")
            time.sleep(30)  # Continue monitoring despite network errors
    raise TrainingTimeoutError(f"Training timed out after {timeout_minutes} minutes")

def get_state_error_message(state_id):
    """Get human-readable error message for state IDs"""
    error_messages = {
        3: "Training failed due to data issues",
        4: "Training failed due to configuration error",
        5: "Training failed due to system error"
    }
    return error_messages.get(state_id, f"Unknown error state: {state_id}")
```
5. Network and Connection Errors
Connection Timeouts
```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_robust_session():
    """Create a requests session with a robust retry strategy"""
    session = requests.Session()
    # Define retry strategy (urllib3 >= 1.26 uses allowed_methods;
    # older releases call the same parameter method_whitelist)
    retry_strategy = Retry(
        total=3,  # Total number of retries
        backoff_factor=1,  # Delay between retries: 1, 2, 4 seconds
        status_forcelist=[429, 500, 502, 503, 504],  # HTTP status codes to retry
        allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PATCH"]
    )
    # Mount adapter with retry strategy
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

# Usage: requests has no session-level timeout setting, so pass one
# per call as (connection timeout, read timeout)
session = create_robust_session()
try:
    response = session.post(url, json=payload, headers=headers, timeout=(10, 300))
    response.raise_for_status()
except requests.exceptions.Timeout:
    print("Request timed out")
except requests.exceptions.ConnectionError:
    print("Connection error occurred")
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
```
DNS and SSL Errors
```python
import socket
import ssl
from urllib.parse import urlparse

def diagnose_connection_error(url):
    """Diagnose common connection issues"""
    try:
        # Test DNS resolution
        hostname = urlparse(url).hostname
        socket.gethostbyname(hostname)
        print(f"DNS resolution successful for {hostname}")
        # Test SSL connection (if HTTPS)
        if url.startswith('https'):
            context = ssl.create_default_context()
            with socket.create_connection((hostname, 443), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    print(f"SSL connection successful to {hostname}")
    except socket.gaierror as e:
        print(f"DNS resolution failed: {e}")
    except ssl.SSLError as e:
        print(f"SSL error: {e}")
    except socket.timeout:
        print("Connection timed out")
    except Exception as e:
        print(f"Connection test failed: {e}")
```
Comprehensive Error Handling Strategy
Complete Error Handling Class
```python
import logging
import time
from enum import Enum
from typing import Any, Dict

import requests

# JobError, JobTimeoutError, and TrainingError are defined in the sections above

class ErrorSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class SynthreoErrorHandler:
    def __init__(self, log_level=logging.INFO):
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(log_level)
        # Error counters for monitoring
        self.error_counts = {
            'auth_errors': 0,
            'network_errors': 0,
            'job_failures': 0,
            'execution_errors': 0
        }

    def handle_api_error(self, error: Exception, context: str = "") -> Dict[str, Any]:
        """Central error handling method"""
        error_info = {
            'timestamp': time.time(),
            'context': context,
            'error_type': type(error).__name__,
            'message': str(error),
            'severity': ErrorSeverity.MEDIUM
        }
        if isinstance(error, requests.exceptions.HTTPError):
            status_code = error.response.status_code
            error_info.update(self._handle_http_error(status_code, error))
        elif isinstance(error, requests.exceptions.Timeout):
            error_info.update(self._handle_timeout_error(error))
        elif isinstance(error, requests.exceptions.ConnectionError):
            error_info.update(self._handle_connection_error(error))
        elif isinstance(error, (JobError, TrainingError)):
            error_info.update(self._handle_job_error(error))
        else:
            error_info.update(self._handle_unknown_error(error))
        # Log the error
        self._log_error(error_info)
        # Update error counters
        self._update_error_counts(error_info)
        return error_info

    def _handle_http_error(self, status_code: int, error: Exception) -> Dict[str, Any]:
        severity_map = {
            400: ErrorSeverity.MEDIUM,
            401: ErrorSeverity.HIGH,
            403: ErrorSeverity.HIGH,
            404: ErrorSeverity.MEDIUM,
            429: ErrorSeverity.LOW,
            500: ErrorSeverity.HIGH,
            502: ErrorSeverity.HIGH,
            503: ErrorSeverity.MEDIUM
        }
        retry_map = {
            429: True,  # Rate limit
            500: True,  # Internal server error
            502: True,  # Bad gateway
            503: True,  # Service unavailable
        }
        return {
            'status_code': status_code,
            'severity': severity_map.get(status_code, ErrorSeverity.MEDIUM),
            'should_retry': retry_map.get(status_code, False),
            'retry_delay': self._calculate_retry_delay(status_code)
        }

    def _handle_timeout_error(self, error: Exception) -> Dict[str, Any]:
        self.error_counts['network_errors'] += 1
        return {
            'severity': ErrorSeverity.MEDIUM,
            'should_retry': True,
            'retry_delay': 30
        }

    def _handle_connection_error(self, error: Exception) -> Dict[str, Any]:
        self.error_counts['network_errors'] += 1
        return {
            'severity': ErrorSeverity.HIGH,
            'should_retry': True,
            'retry_delay': 60
        }

    def _handle_job_error(self, error: Exception) -> Dict[str, Any]:
        self.error_counts['job_failures'] += 1
        severity = ErrorSeverity.CRITICAL if isinstance(error, JobTimeoutError) else ErrorSeverity.HIGH
        return {
            'severity': severity,
            'should_retry': False
        }

    def _handle_unknown_error(self, error: Exception) -> Dict[str, Any]:
        return {
            'severity': ErrorSeverity.MEDIUM,
            'should_retry': False
        }

    def _calculate_retry_delay(self, status_code: int) -> int:
        delay_map = {
            429: 60,  # Rate limit - wait longer
            500: 30,  # Server error
            502: 15,  # Bad gateway
            503: 45   # Service unavailable
        }
        return delay_map.get(status_code, 30)

    def _log_error(self, error_info: Dict[str, Any]):
        level_map = {
            ErrorSeverity.LOW: logging.INFO,
            ErrorSeverity.MEDIUM: logging.WARNING,
            ErrorSeverity.HIGH: logging.ERROR,
            ErrorSeverity.CRITICAL: logging.CRITICAL
        }
        level = level_map[error_info['severity']]
        message = f"[{error_info['context']}] {error_info['error_type']}: {error_info['message']}"
        self.logger.log(level, message)

    def _update_error_counts(self, error_info: Dict[str, Any]):
        # Network errors are already counted in their handlers above
        if error_info.get('status_code') == 401:
            self.error_counts['auth_errors'] += 1

    def get_error_summary(self) -> Dict[str, Any]:
        """Get summary of all errors encountered"""
        return {
            'error_counts': self.error_counts.copy(),
            'total_errors': sum(self.error_counts.values())
        }

    def should_circuit_break(self, error_type: str, threshold: int = 5) -> bool:
        """Determine if circuit breaker should activate"""
        return self.error_counts.get(error_type, 0) >= threshold

# Usage Example
error_handler = SynthreoErrorHandler()

def robust_api_call(client, operation, *args, **kwargs):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return operation(*args, **kwargs)
        except Exception as e:
            error_info = error_handler.handle_api_error(e, f"Attempt {attempt + 1}")
            # Check if we should retry
            if attempt < max_retries - 1 and error_info.get('should_retry', False):
                delay = error_info.get('retry_delay', 30)
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
                continue
            # Final attempt failed or shouldn't retry
            raise
    raise Exception("All retry attempts failed")
```
Monitoring and Alerting
Error Rate Monitoring
```python
def monitor_error_rates(error_handler: SynthreoErrorHandler, alert_threshold: float = 0.1):
    """Monitor error rates and trigger alerts"""
    summary = error_handler.get_error_summary()
    total_errors = summary['total_errors']
    # Calculate error rate (track total requests separately in your application)
    total_requests = 100  # Example placeholder
    error_rate = total_errors / total_requests if total_requests > 0 else 0
    if error_rate > alert_threshold:
        send_alert(f"High error rate detected: {error_rate:.2%}")
    # Check for specific error patterns
    if summary['error_counts']['auth_errors'] > 5:
        send_alert("Multiple authentication failures - check credentials")
    if summary['error_counts']['network_errors'] > 10:
        send_alert("Network connectivity issues detected")

def send_alert(message: str):
    """Send alert (implement your preferred alerting mechanism)"""
    print(f"ALERT: {message}")
    # Implement: send email, Slack notification, logging to a monitoring system, etc.
```
Best Practices Summary
- Implement comprehensive error handling for all API interactions
- Use exponential backoff for retry strategies
- Monitor error patterns and rates for early detection of issues
- Parse errorData carefully - it contains both errors and informational messages
- Set appropriate timeouts for different operation types
- Log errors with context for easier debugging
- Implement circuit breakers for cascading failure prevention
- Handle token expiration gracefully with automatic re-authentication
- Validate responses before processing to catch malformed data early
- Use structured error handling with proper exception hierarchies
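The circuit-breaker recommendation above can be sketched as a small standalone helper. This is an illustrative time-based variant (thresholds and cooldowns are arbitrary defaults); `SynthreoErrorHandler.should_circuit_break` shows a simpler count-based check:

```python
import time

class SimpleCircuitBreaker:
    """Open after `threshold` consecutive failures; allow a probe after `reset_seconds`."""

    def __init__(self, threshold: int = 5, reset_seconds: float = 60.0):
        self.threshold = threshold
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.opened_at = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: permit a probe request once the cooldown has elapsed
        return time.time() - self.opened_at >= self.reset_seconds

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.time()
```

Wrap each API call in `if breaker.allow_request(): ...` and report the outcome with `record_success()`/`record_failure()`; while the breaker is open, callers fail fast instead of hammering a struggling service.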