Monitoring & Observability
Comprehensive monitoring and observability for webMCP applications. Track performance, detect issues, and ensure optimal operation.
Monitoring Capabilities
Comprehensive observability for webMCP applications and infrastructure
Real-Time Monitoring
Monitor webMCP performance and health in real-time
- Live performance metrics
- Real-time error tracking
- System health monitoring
- Resource utilization tracking
Intelligent Alerting
Smart alerts for performance issues and anomalies
- Threshold-based alerts
- Anomaly detection
- Smart notification routing
- Alert escalation policies
Health Checks
Comprehensive health monitoring and diagnostics
- Endpoint health checks
- Dependency monitoring
- Service availability tracking
- Performance diagnostics
Key Monitoring Metrics
Essential metrics for comprehensive webMCP monitoring
Performance Metrics
- Processing Time (ms): Average time to process webMCP requests
- Throughput (req/s): Number of requests processed per second
- Token Reduction Rate (%): Percentage of tokens reduced
- Success Rate (%): Percentage of successful optimizations
System Metrics
- CPU Usage (%): Processor utilization
- Memory Usage (MB): Memory consumption
- Network I/O (Mbps): Network bandwidth utilization
- Disk Usage (%): Storage space utilization
Business Metrics
- Cost per Optimization ($): Average cost per processed request
- Error Rate (%): Percentage of failed requests
- User Satisfaction (score): User satisfaction score
- API Response Time (ms): Average API response time
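As a worked example of how several of the metrics above are derived from raw counters, the sketch below computes success rate, error rate, token reduction rate, and cost per optimization in plain JavaScript; the counter names and values are illustrative, not part of any webMCP API.
JavaScript
// Hypothetical raw counters collected over one reporting window
const counters = {
  requestsTotal: 1200,
  requestsFailed: 18,
  tokensBefore: 4800000, // tokens before optimization
  tokensAfter: 1550000,  // tokens after optimization
  totalCostUsd: 14.40    // total spend for the window
};

// Derived metrics corresponding to the lists above
const successRate = ((counters.requestsTotal - counters.requestsFailed) / counters.requestsTotal) * 100;
const errorRate = (counters.requestsFailed / counters.requestsTotal) * 100;
const tokenReductionRate = ((counters.tokensBefore - counters.tokensAfter) / counters.tokensBefore) * 100;
const costPerOptimization = counters.totalCostUsd / counters.requestsTotal;

console.log({
  successRate: successRate.toFixed(2) + ' %',             // 98.50 %
  errorRate: errorRate.toFixed(2) + ' %',                  // 1.50 %
  tokenReductionRate: tokenReductionRate.toFixed(1) + ' %', // 67.7 %
  costPerOptimization: '$' + costPerOptimization.toFixed(4) // $0.0120
});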
Alerting Strategies
Intelligent alerting approaches for different monitoring scenarios
Threshold-Based Alerts
Traditional threshold monitoring with static limits
Use Case:
Basic monitoring with known performance baselines
Advantages:
- Simple to configure
- Predictable behavior
- Low false positive rate
Configuration:
Set static thresholds for key metrics like response time, error rate, and resource usage
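The sketch below illustrates the idea with a small, library-agnostic evaluator; the metric names and limits are illustrative, and a full AlertManager-based configuration appears in the implementation examples later in this section.
JavaScript
// Static thresholds for key metrics (illustrative values)
const thresholds = {
  webmcp_processing_time_ms: { condition: 'above', limit: 5000 },
  error_rate: { condition: 'above', limit: 0.05 },
  cpu_usage_percent: { condition: 'above', limit: 80 }
};

// Compare a snapshot of current metric values against the static limits
function evaluateThresholds(snapshot) {
  const breaches = [];
  for (const [metric, rule] of Object.entries(thresholds)) {
    const value = snapshot[metric];
    if (value === undefined) continue; // metric not reported in this snapshot
    const breached = rule.condition === 'above' ? value > rule.limit : value < rule.limit;
    if (breached) breaches.push({ metric, value, limit: rule.limit });
  }
  return breaches;
}

console.log(evaluateThresholds({ webmcp_processing_time_ms: 6200, error_rate: 0.01, cpu_usage_percent: 45 }));
// -> [ { metric: 'webmcp_processing_time_ms', value: 6200, limit: 5000 } ]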
Anomaly Detection
ML-powered detection of unusual patterns and behaviors
Use Case:
Complex systems with dynamic performance patterns
Advantages:
- Adapts to usage patterns
- Detects unknown issues
- Reduces manual tuning
Configuration:
Use machine learning models to establish baselines and detect deviations
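One simple baseline approach is a rolling mean with a standard-deviation (z-score) test, sketched below; the advanced alerting example later in this section uses the same idea, and production deployments would typically replace it with more robust statistical or ML models.
JavaScript
// Flag a value as anomalous when it deviates more than `zLimit` standard
// deviations from the mean of a rolling window of recent observations.
function isAnomalous(history, value, zLimit = 2.5) {
  if (history.length < 10) return false; // not enough data for a baseline yet
  const mean = history.reduce((sum, v) => sum + v, 0) / history.length;
  const variance = history.reduce((sum, v) => sum + (v - mean) ** 2, 0) / history.length;
  const stdDev = Math.sqrt(variance);
  if (stdDev === 0) return value !== mean; // flat baseline: any change is unusual
  return Math.abs(value - mean) / stdDev > zLimit;
}

// Token reduction rates (%) that normally hover around 70
const recentRates = [68, 71, 70, 69, 72, 70, 71, 69, 70, 72];
console.log(isAnomalous(recentRates, 71)); // false - within normal variation
console.log(isAnomalous(recentRates, 35)); // true  - sudden drop worth alerting on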
Composite Alerts
Combine multiple metrics for intelligent alerting
Use Case:
Reducing alert fatigue while maintaining coverage
Advantages:
- Lower false positives
- Context-aware alerts
- Actionable notifications
Configuration:
Create rules that consider multiple metrics and system context
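A minimal sketch of a composite rule is shown below: it fires only when every sub-condition holds, so a single noisy metric cannot page anyone by itself. A fuller version with AND/OR/NOT operators appears in the advanced alerting example later in this section.
JavaScript
// Composite rule: high error rate AND high latency at the same time
const compositeRule = {
  name: 'degraded_optimization_pipeline',
  operator: 'AND',
  conditions: [
    { metric: 'error_rate', test: (v) => v > 0.05 },
    { metric: 'avg_latency_ms', test: (v) => v > 2000 }
  ]
};

function evaluateComposite(rule, metrics) {
  const results = rule.conditions.map((c) => c.test(metrics[c.metric]));
  return rule.operator === 'AND' ? results.every(Boolean) : results.some(Boolean);
}

console.log(evaluateComposite(compositeRule, { error_rate: 0.08, avg_latency_ms: 1500 })); // false
console.log(evaluateComposite(compositeRule, { error_rate: 0.08, avg_latency_ms: 2600 })); // true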
Predictive Alerts
Proactive alerts based on trend analysis
Use Case:
Preventing issues before they impact users
Advantages:
- Early warning system
- Prevents outages
- Capacity planning
Configuration:
Analyze trends to predict when thresholds will be breached
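A simple way to approximate this is a least-squares fit over recent samples, extrapolated to estimate when a threshold will be crossed, as in the hedged sketch below; real capacity planning would use longer windows and seasonality-aware models.
JavaScript
// Fit a least-squares line to (t, value) samples and estimate how long
// until the metric crosses `threshold`, in the same time units as `t`.
function timeUntilBreach(samples, threshold) {
  const n = samples.length;
  if (n < 2) return Infinity;
  const meanT = samples.reduce((s, p) => s + p.t, 0) / n;
  const meanV = samples.reduce((s, p) => s + p.value, 0) / n;
  const denom = samples.reduce((s, p) => s + (p.t - meanT) ** 2, 0);
  if (denom === 0) return Infinity;
  const slope = samples.reduce((s, p) => s + (p.t - meanT) * (p.value - meanV), 0) / denom;
  if (slope <= 0) return Infinity; // flat or improving: no breach predicted
  const intercept = meanV - slope * meanT;
  const breachAt = (threshold - intercept) / slope;
  return Math.max(0, breachAt - samples[n - 1].t);
}

// Disk usage (%) sampled once per minute, trending upward
const diskSamples = [
  { t: 0, value: 70 }, { t: 1, value: 72 }, { t: 2, value: 74 },
  { t: 3, value: 76 }, { t: 4, value: 78 }
];
console.log(timeUntilBreach(diskSamples, 90)); // ~6 minutes until the 90 % threshold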
Implementation Examples
Practical examples for implementing monitoring in your applications
Basic Monitoring Setup
Set up comprehensive monitoring for webMCP applications
JavaScript
import { WebMCPProcessor, Monitor, AlertManager } from '@webmcp/core';
class WebMCPMonitor {
constructor(options = {}) {
this.processor = new WebMCPProcessor();
this.monitor = new Monitor({
collectInterval: options.collectInterval || 10000, // 10 seconds
metricsRetention: options.metricsRetention || 86400000, // 24 hours
enableRealTime: options.enableRealTime !== false
});
this.alertManager = new AlertManager({
channels: options.alertChannels || ['console', 'webhook'],
webhookUrl: options.webhookUrl,
escalationPolicy: options.escalationPolicy
});
this.metrics = {
performance: new Map(),
system: new Map(),
business: new Map()
};
this.setupMetrics();
this.setupAlerts();
}
setupMetrics() {
// Performance metrics
this.monitor.gauge('webmcp_processing_time', {
description: 'Time taken to process webMCP requests',
unit: 'milliseconds',
labels: ['model', 'complexity']
});
this.monitor.counter('webmcp_requests_total', {
description: 'Total number of webMCP requests',
labels: ['status', 'model']
});
this.monitor.histogram('webmcp_token_reduction', {
description: 'Token reduction percentage distribution',
buckets: [10, 25, 50, 65, 75, 85, 95, 100]
});
this.monitor.gauge('webmcp_cost_per_request', {
description: 'Cost per webMCP optimization',
unit: 'dollars'
});
// System metrics
this.monitor.gauge('system_cpu_usage', {
description: 'CPU usage percentage',
unit: 'percent'
});
this.monitor.gauge('system_memory_usage', {
description: 'Memory usage in bytes',
unit: 'bytes'
});
this.monitor.gauge('system_active_connections', {
description: 'Number of active connections'
});
}
setupAlerts() {
// Performance alerts
this.alertManager.addRule({
name: 'high_processing_time',
metric: 'webmcp_processing_time',
condition: 'above',
threshold: 5000, // 5 seconds
duration: 300000, // 5 minutes
severity: 'warning',
message: 'WebMCP processing time is above normal levels'
});
this.alertManager.addRule({
name: 'low_success_rate',
metric: 'webmcp_requests_total',
condition: 'rate_below',
threshold: 0.95, // 95% success rate
duration: 600000, // 10 minutes
severity: 'critical',
message: 'WebMCP success rate has dropped below 95%'
});
this.alertManager.addRule({
name: 'high_error_rate',
metric: 'webmcp_requests_total',
condition: 'error_rate_above',
threshold: 0.05, // 5% error rate
duration: 300000, // 5 minutes
severity: 'critical',
message: 'WebMCP error rate is critically high'
});
// System alerts
this.alertManager.addRule({
name: 'high_cpu_usage',
metric: 'system_cpu_usage',
condition: 'above',
threshold: 80,
duration: 900000, // 15 minutes
severity: 'warning',
message: 'System CPU usage is high'
});
this.alertManager.addRule({
name: 'high_memory_usage',
metric: 'system_memory_usage',
condition: 'above',
threshold: 2147483648, // 2GB in bytes
duration: 600000, // 10 minutes
severity: 'critical',
message: 'System memory usage is critical'
});
}
// Monitor webMCP operations
async monitorOperation(operation, context = {}) {
const startTime = Date.now();
const operationId = `op_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
try {
// Start monitoring
this.monitor.increment('webmcp_requests_total', { status: 'started', model: context.model || 'unknown' });
// Execute operation
const result = await operation();
const endTime = Date.now();
const duration = endTime - startTime;
// Record metrics
this.monitor.observe('webmcp_processing_time', duration, {
model: context.model || 'unknown',
complexity: this.assessComplexity(result)
});
this.monitor.increment('webmcp_requests_total', { status: 'success', model: context.model || 'unknown' });
if (result.tokenReduction) {
this.monitor.observe('webmcp_token_reduction', result.tokenReduction);
}
if (result.cost) {
this.monitor.observe('webmcp_cost_per_request', result.cost);
}
// Check for performance issues
if (duration > 10000) { // 10 seconds
this.alertManager.trigger('slow_operation', {
operationId,
duration,
context
});
}
return result;
} catch (error) {
const endTime = Date.now();
const duration = endTime - startTime;
// Record error metrics
this.monitor.increment('webmcp_requests_total', { status: 'error', model: context.model || 'unknown' });
this.monitor.observe('webmcp_processing_time', duration, {
model: context.model || 'unknown',
complexity: 'error'
});
// Trigger error alert
this.alertManager.trigger('operation_error', {
operationId,
error: error.message,
duration,
context
});
throw error;
}
}
// Assess operation complexity
assessComplexity(result) {
const elementCount = result.elements?.length || 0;
const originalTokens = result.originalTokens || 0;
if (elementCount > 20 || originalTokens > 2000) return 'high';
if (elementCount > 10 || originalTokens > 1000) return 'medium';
return 'low';
}
// Get current system metrics
async collectSystemMetrics() {
try {
// CPU usage (simplified - in real implementation, use system libraries)
const cpuUsage = await this.getCPUUsage();
this.monitor.set('system_cpu_usage', cpuUsage);
// Memory usage
const memoryUsage = process.memoryUsage();
this.monitor.set('system_memory_usage', memoryUsage.heapUsed);
// Active connections (example)
const activeConnections = this.getActiveConnections();
this.monitor.set('system_active_connections', activeConnections);
} catch (error) {
console.error('Failed to collect system metrics:', error);
}
}
// Health check endpoint
async healthCheck() {
const health = {
status: 'healthy',
timestamp: new Date().toISOString(),
checks: {}
};
try {
// Check processor health
health.checks.processor = await this.checkProcessorHealth();
// Check system resources
health.checks.system = await this.checkSystemHealth();
// Check external dependencies
health.checks.dependencies = await this.checkDependencies();
// Determine overall status
const allHealthy = Object.values(health.checks).every(check => check.status === 'healthy');
health.status = allHealthy ? 'healthy' : 'degraded';
} catch (error) {
health.status = 'unhealthy';
health.error = error.message;
}
return health;
}
async checkProcessorHealth() {
try {
// Simple test operation
const testHtml = '<form><input name="test"><button>Test</button></form>';
const startTime = Date.now();
await this.processor.parseWebMCP(testHtml);
const duration = Date.now() - startTime;
return {
status: duration < 5000 ? 'healthy' : 'degraded',
responseTime: duration,
lastCheck: new Date().toISOString()
};
} catch (error) {
return {
status: 'unhealthy',
error: error.message,
lastCheck: new Date().toISOString()
};
}
}
async checkSystemHealth() {
const memoryUsage = process.memoryUsage();
const cpuUsage = await this.getCPUUsage();
const memoryHealthy = memoryUsage.heapUsed < 2147483648; // 2GB
const cpuHealthy = cpuUsage < 90;
return {
status: memoryHealthy && cpuHealthy ? 'healthy' : 'degraded',
memory: {
used: memoryUsage.heapUsed,
total: memoryUsage.heapTotal,
healthy: memoryHealthy
},
cpu: {
usage: cpuUsage,
healthy: cpuHealthy
}
};
}
async checkDependencies() {
// Check external API availability
const dependencies = {
openai: { status: 'unknown' },
anthropic: { status: 'unknown' }
};
// In real implementation, ping external services
// This is a simplified example
dependencies.openai.status = 'healthy';
dependencies.anthropic.status = 'healthy';
const allHealthy = Object.values(dependencies).every(dep => dep.status === 'healthy');
return {
status: allHealthy ? 'healthy' : 'degraded',
services: dependencies
};
}
// Utility methods (simplified implementations)
async getCPUUsage() {
// In real implementation, use system monitoring libraries
return Math.random() * 100; // Mock CPU usage
}
getActiveConnections() {
// In real implementation, get actual connection count
return Math.floor(Math.random() * 100); // Mock connection count
}
// Start monitoring
start() {
console.log('Starting WebMCP monitoring...');
// Collect system metrics every 10 seconds
setInterval(() => {
this.collectSystemMetrics();
}, 10000);
// Health check every 30 seconds
setInterval(async () => {
const health = await this.healthCheck();
if (health.status !== 'healthy') {
this.alertManager.trigger('health_check_failed', health);
}
}, 30000);
}
// Get monitoring dashboard data
async getDashboardData() {
return {
metrics: this.monitor.getMetrics(),
alerts: this.alertManager.getActiveAlerts(),
health: await this.healthCheck()
};
}
}
// Usage example
const monitor = new WebMCPMonitor({
alertChannels: ['console', 'webhook'],
webhookUrl: 'https://hooks.slack.com/services/...',
collectInterval: 5000 // 5 seconds
});
// Start monitoring
monitor.start();
// Monitor webMCP operations
const monitoredProcessor = {
async parseWebMCP(html, options = {}) {
return monitor.monitorOperation(async () => {
const processor = new WebMCPProcessor();
return processor.parseWebMCP(html, options);
}, { model: options.targetModel });
}
};
// Health check endpoint (Express.js example)
app.get('/health', async (req, res) => {
const health = await monitor.healthCheck();
const statusCode = health.status === 'healthy' ? 200 :
health.status === 'degraded' ? 200 : 503;
res.status(statusCode).json(health);
});
// Metrics endpoint
app.get('/metrics', async (req, res) => {
const dashboardData = await monitor.getDashboardData();
res.json(dashboardData);
});
Advanced Alerting Configuration
Set up intelligent alerting with escalation policies
JavaScript
class AdvancedAlertManager {
constructor(config) {
this.config = config;
this.rules = new Map();
this.activeAlerts = new Map();
this.escalationPolicies = new Map();
this.channels = new Map();
this.silences = new Map();
this.setupChannels();
this.setupEscalationPolicies();
}
setupChannels() {
// Email channel
this.channels.set('email', {
type: 'email',
send: async (alert, config) => {
// Email implementation (stub); recipients come from the escalation policy step
console.log(`Sending email alert: ${alert.message} to ${(config.recipients || []).join(', ')}`);
}
});
// Slack channel
this.channels.set('slack', {
type: 'slack',
send: async (alert, config) => {
const payload = {
text: `🚨 *${alert.severity.toUpperCase()}*: ${alert.message}`,
channel: config.channel,
attachments: [{
color: alert.severity === 'critical' ? 'danger' : 'warning',
fields: [
{ title: 'Metric', value: alert.metric, short: true },
{ title: 'Value', value: alert.value, short: true },
{ title: 'Threshold', value: alert.threshold, short: true },
{ title: 'Duration', value: alert.duration, short: true }
],
ts: Math.floor(Date.now() / 1000)
}]
};
// Send to Slack webhook
await fetch(config.webhookUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
}
});
// PagerDuty channel
this.channels.set('pagerduty', {
type: 'pagerduty',
send: async (alert, config) => {
const payload = {
routing_key: config.routingKey,
event_action: 'trigger',
dedup_key: alert.id,
payload: {
summary: alert.message,
severity: alert.severity,
source: 'webmcp-monitor',
component: alert.component,
group: alert.group,
class: alert.class,
custom_details: alert.context
}
};
await fetch('https://events.pagerduty.com/v2/enqueue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
}
});
}
setupEscalationPolicies() {
// Standard escalation policy
this.escalationPolicies.set('standard', {
name: 'Standard Escalation',
steps: [
{
delay: 0, // Immediate
channels: [
{ type: 'slack', config: { channel: '#alerts', webhookUrl: process.env.SLACK_WEBHOOK } }
]
},
{
delay: 300000, // 5 minutes
channels: [
{ type: 'email', recipients: ['devops@company.com'] }
]
},
{
delay: 900000, // 15 minutes
channels: [
{ type: 'pagerduty', config: { routingKey: process.env.PAGERDUTY_KEY } }
]
}
]
});
// Critical escalation policy
this.escalationPolicies.set('critical', {
name: 'Critical Escalation',
steps: [
{
delay: 0, // Immediate
channels: [
{ type: 'slack', config: { channel: '#critical-alerts', webhookUrl: process.env.SLACK_WEBHOOK } },
{ type: 'pagerduty', config: { routingKey: process.env.PAGERDUTY_KEY } }
]
},
{
delay: 180000, // 3 minutes
channels: [
{ type: 'email', recipients: ['oncall@company.com', 'cto@company.com'] }
]
}
]
});
}
// Add alerting rule with advanced configuration
addAdvancedRule(rule) {
const ruleId = rule.id || `rule_${Date.now()}`;
const advancedRule = {
...rule,
id: ruleId,
state: 'normal',
lastTriggered: null,
escalationLevel: 0,
suppressionCount: 0,
// Advanced conditions
conditions: rule.conditions || [],
// Composite conditions (multiple metrics)
compositeCondition: rule.compositeCondition,
// Time-based conditions
timeWindows: rule.timeWindows || [],
// Anomaly detection settings
anomalyDetection: rule.anomalyDetection,
// Auto-resolution settings
autoResolve: rule.autoResolve !== false,
resolveThreshold: rule.resolveThreshold,
// Notification settings
escalationPolicy: rule.escalationPolicy || 'standard',
suppressDuration: rule.suppressDuration || 300000, // 5 minutes
maxEscalations: rule.maxEscalations || 3
};
this.rules.set(ruleId, advancedRule);
return ruleId;
}
// Evaluate rules against metrics
async evaluateRules(metrics) {
for (const [ruleId, rule] of this.rules) {
try {
if (this.isRuleSilenced(ruleId)) continue;
const shouldTrigger = await this.evaluateRule(rule, metrics);
const currentState = rule.state;
if (shouldTrigger && currentState === 'normal') {
// Trigger alert
await this.triggerAlert(rule, metrics);
} else if (!shouldTrigger && currentState === 'alerting') {
// Resolve alert
await this.resolveAlert(rule);
}
} catch (error) {
console.error(`Error evaluating rule ${ruleId}:`, error);
}
}
}
async evaluateRule(rule, metrics) {
// Simple threshold evaluation
if (rule.condition && rule.threshold !== undefined) {
const metricValue = metrics.get(rule.metric);
if (metricValue === undefined) return false;
switch (rule.condition) {
case 'above': return metricValue > rule.threshold;
case 'below': return metricValue < rule.threshold;
case 'equals': return metricValue === rule.threshold;
default: return false;
}
}
// Composite condition evaluation
if (rule.compositeCondition) {
return this.evaluateCompositeCondition(rule.compositeCondition, metrics);
}
// Anomaly detection
if (rule.anomalyDetection) {
return this.evaluateAnomalyDetection(rule, metrics);
}
return false;
}
evaluateCompositeCondition(condition, metrics) {
switch (condition.operator) {
case 'AND':
return condition.conditions.every(cond => this.evaluateSingleCondition(cond, metrics));
case 'OR':
return condition.conditions.some(cond => this.evaluateSingleCondition(cond, metrics));
case 'NOT':
return !this.evaluateSingleCondition(condition.condition, metrics);
default:
return false;
}
}
evaluateSingleCondition(condition, metrics) {
const metricValue = metrics.get(condition.metric);
if (metricValue === undefined) return false;
switch (condition.operator) {
case '>': return metricValue > condition.value;
case '<': return metricValue < condition.value;
case '>=': return metricValue >= condition.value;
case '<=': return metricValue <= condition.value;
case '==': return metricValue === condition.value;
case '!=': return metricValue !== condition.value;
default: return false;
}
}
evaluateAnomalyDetection(rule, metrics) {
// Simplified anomaly detection
// In production, use proper statistical methods or ML models
const metricValue = metrics.get(rule.metric);
if (metricValue === undefined) return false;
const historical = rule.historicalData = rule.historicalData || [];
if (historical.length < 10) {
// Not enough data for anomaly detection
historical.push(metricValue);
return false;
}
const mean = historical.reduce((sum, val) => sum + val, 0) / historical.length;
const stdDev = Math.sqrt(historical.reduce((sum, val) => sum + Math.pow(val - mean, 2), 0) / historical.length);
const threshold = rule.anomalyDetection.threshold || 2; // 2 standard deviations
const deviation = Math.abs(metricValue - mean) / stdDev;
// Update historical data (sliding window)
historical.push(metricValue);
if (historical.length > 100) {
historical.shift();
}
return deviation > threshold;
}
async triggerAlert(rule, metrics) {
const alertId = `alert_${rule.id}_${Date.now()}`;
const metricValue = metrics.get(rule.metric);
const alert = {
id: alertId,
ruleId: rule.id,
ruleName: rule.name,
message: rule.message,
severity: rule.severity,
metric: rule.metric,
value: metricValue,
threshold: rule.threshold,
timestamp: Date.now(),
state: 'firing',
escalationLevel: 0,
context: {
...rule.context,
evaluatedAt: new Date().toISOString(),
metrics: Object.fromEntries(metrics)
}
};
// Update rule state
rule.state = 'alerting';
rule.lastTriggered = Date.now();
rule.escalationLevel = 0;
// Store active alert
this.activeAlerts.set(alertId, alert);
// Send notifications
await this.sendNotifications(alert, rule.escalationPolicy);
// Schedule escalation
this.scheduleEscalation(alert, rule);
console.log(`Alert triggered: ${alertId}`);
}
async resolveAlert(rule) {
// Find active alerts for this rule
const activeAlerts = Array.from(this.activeAlerts.values())
.filter(alert => alert.ruleId === rule.id);
for (const alert of activeAlerts) {
alert.state = 'resolved';
alert.resolvedAt = Date.now();
// Send resolution notification
await this.sendResolutionNotification(alert);
// Remove from active alerts
this.activeAlerts.delete(alert.id);
}
// Update rule state
rule.state = 'normal';
rule.escalationLevel = 0;
console.log(`Alert resolved for rule: ${rule.id}`);
}
async sendNotifications(alert, escalationPolicyName) {
const policy = this.escalationPolicies.get(escalationPolicyName);
if (!policy) {
console.error(`Escalation policy not found: ${escalationPolicyName}`);
return;
}
const step = policy.steps[alert.escalationLevel] || policy.steps[0];
for (const channelConfig of step.channels) {
const channel = this.channels.get(channelConfig.type);
if (channel) {
try {
await channel.send(alert, channelConfig.config || channelConfig);
} catch (error) {
console.error(`Failed to send notification via ${channelConfig.type}:`, error);
}
}
}
}
scheduleEscalation(alert, rule) {
const policy = this.escalationPolicies.get(rule.escalationPolicy);
if (!policy || alert.escalationLevel >= policy.steps.length - 1) return;
const nextStep = policy.steps[alert.escalationLevel + 1];
if (!nextStep) return;
setTimeout(async () => {
if (this.activeAlerts.has(alert.id) && alert.state === 'firing') {
alert.escalationLevel++;
await this.sendNotifications(alert, rule.escalationPolicy);
// Schedule next escalation
this.scheduleEscalation(alert, rule);
}
}, nextStep.delay);
}
// Silence alerts for a specific rule or globally
silenceRule(ruleId, duration = 3600000) { // 1 hour default
this.silences.set(ruleId, {
until: Date.now() + duration,
silencedAt: Date.now()
});
}
isRuleSilenced(ruleId) {
const silence = this.silences.get(ruleId);
if (!silence) return false;
if (Date.now() > silence.until) {
this.silences.delete(ruleId);
return false;
}
return true;
}
// Get active alerts
getActiveAlerts() {
return Array.from(this.activeAlerts.values());
}
// Get alert statistics
getAlertStats(timeWindow = 86400000) { // 24 hours
const now = Date.now();
const cutoff = now - timeWindow;
const recentAlerts = Array.from(this.activeAlerts.values())
.filter(alert => alert.timestamp >= cutoff);
return {
total: recentAlerts.length,
critical: recentAlerts.filter(a => a.severity === 'critical').length,
warning: recentAlerts.filter(a => a.severity === 'warning').length,
resolved: recentAlerts.filter(a => a.state === 'resolved').length,
firing: recentAlerts.filter(a => a.state === 'firing').length
};
}
}
// Usage example
const alertManager = new AdvancedAlertManager({
channels: ['slack', 'email', 'pagerduty']
});
// Add complex alerting rules
alertManager.addAdvancedRule({
id: 'high_error_rate_with_latency',
name: 'High Error Rate with High Latency',
message: 'System experiencing high error rate combined with high latency',
severity: 'critical',
escalationPolicy: 'critical',
compositeCondition: {
operator: 'AND',
conditions: [
{ metric: 'error_rate', operator: '>', value: 0.05 },
{ metric: 'avg_latency', operator: '>', value: 2000 }
]
},
autoResolve: true,
resolveThreshold: {
operator: 'AND',
conditions: [
{ metric: 'error_rate', operator: '<', value: 0.01 },
{ metric: 'avg_latency', operator: '<', value: 1000 }
]
}
});
// Add anomaly detection rule
alertManager.addAdvancedRule({
id: 'token_reduction_anomaly',
name: 'Token Reduction Anomaly',
message: 'Unusual token reduction pattern detected',
severity: 'warning',
escalationPolicy: 'standard',
anomalyDetection: {
metric: 'token_reduction_rate',
threshold: 2.5, // 2.5 standard deviations
minSamples: 20
}
});
Python Monitoring Integration
Monitoring setup for Python applications with Prometheus
Python
import time
import logging
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import psutil
import requests
from webmcp_core import WebMCPProcessor
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class WebMCPMonitor:
def __init__(self, port: int = 8000):
self.port = port
self.processor = WebMCPProcessor()
self.is_running = False
# Prometheus metrics
self.setup_metrics()
# Health check data
self.last_health_check = None
self.health_status = 'unknown'
# Performance tracking
self.performance_history = []
self.max_history_size = 1000
def setup_metrics(self):
"""Initialize Prometheus metrics"""
# Request metrics
self.request_counter = Counter(
'webmcp_requests_total',
'Total number of webMCP requests',
['status', 'model', 'complexity']
)
self.request_duration = Histogram(
'webmcp_request_duration_seconds',
'Time spent processing webMCP requests',
['model', 'complexity'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
self.token_reduction = Histogram(
'webmcp_token_reduction_percent',
'Token reduction percentage distribution',
buckets=[10, 25, 50, 65, 75, 85, 95, 100]
)
self.cost_per_request = Histogram(
'webmcp_cost_per_request_dollars',
'Cost per webMCP optimization in dollars',
buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]
)
# System metrics
self.cpu_usage = Gauge('system_cpu_usage_percent', 'CPU usage percentage')
self.memory_usage = Gauge('system_memory_usage_bytes', 'Memory usage in bytes')
self.memory_available = Gauge('system_memory_available_bytes', 'Available memory in bytes')
self.disk_usage = Gauge('system_disk_usage_percent', 'Disk usage percentage')
# Application metrics
self.active_requests = Gauge('webmcp_active_requests', 'Number of active requests')
self.queue_size = Gauge('webmcp_queue_size', 'Number of requests in queue')
self.error_rate = Gauge('webmcp_error_rate', 'Current error rate')
# Health metrics
self.health_status_gauge = Gauge('webmcp_health_status', 'Health status (1=healthy, 0=unhealthy)')
self.last_successful_request = Gauge('webmcp_last_successful_request_timestamp', 'Timestamp of last successful request')
def start_monitoring(self):
"""Start the monitoring server and background tasks"""
# Start Prometheus metrics server
start_http_server(self.port)
logger.info(f"Prometheus metrics server started on port {self.port}")
self.is_running = True
# Start background monitoring threads
threading.Thread(target=self._collect_system_metrics, daemon=True).start()
threading.Thread(target=self._health_check_loop, daemon=True).start()
threading.Thread(target=self._calculate_derived_metrics, daemon=True).start()
logger.info("Monitoring started successfully")
def stop_monitoring(self):
"""Stop monitoring"""
self.is_running = False
logger.info("Monitoring stopped")
def monitor_request(self, func):
"""Decorator to monitor webMCP requests"""
def wrapper(*args, **kwargs):
start_time = time.time()
model = kwargs.get('target_model', 'unknown')
complexity = 'unknown'
# Increment active requests
self.active_requests.inc()
try:
# Execute the function
result = func(*args, **kwargs)
# Calculate metrics
duration = time.time() - start_time
complexity = self._assess_complexity(result)
# Record successful request
self.request_counter.labels(status='success', model=model, complexity=complexity).inc()
self.request_duration.labels(model=model, complexity=complexity).observe(duration)
# Record optimization metrics
if hasattr(result, 'token_reduction'):
self.token_reduction.observe(result.token_reduction)
if hasattr(result, 'cost'):
self.cost_per_request.observe(result.cost)
# Update last successful request timestamp
self.last_successful_request.set(time.time())
# Store performance data
self._store_performance_data({
'timestamp': datetime.now(),
'duration': duration,
'model': model,
'complexity': complexity,
'token_reduction': getattr(result, 'token_reduction', 0),
'cost': getattr(result, 'cost', 0),
'success': True
})
return result
except Exception as e:
# Record failed request
duration = time.time() - start_time
self.request_counter.labels(status='error', model=model, complexity=complexity).inc()
self.request_duration.labels(model=model, complexity=complexity).observe(duration)
# Store error data
self._store_performance_data({
'timestamp': datetime.now(),
'duration': duration,
'model': model,
'complexity': complexity,
'error': str(e),
'success': False
})
logger.error(f"Request failed: {e}")
raise
finally:
# Decrement active requests
self.active_requests.dec()
return wrapper
def _assess_complexity(self, result) -> str:
"""Assess the complexity of a webMCP operation"""
if not hasattr(result, 'elements'):
return 'unknown'
element_count = len(result.elements)
original_tokens = getattr(result, 'original_tokens', 0)
if element_count > 20 or original_tokens > 2000:
return 'high'
elif element_count > 10 or original_tokens > 1000:
return 'medium'
else:
return 'low'
def _store_performance_data(self, data: Dict[str, Any]):
"""Store performance data for analysis"""
self.performance_history.append(data)
# Keep only recent data
if len(self.performance_history) > self.max_history_size:
self.performance_history.pop(0)
def _collect_system_metrics(self):
"""Collect system metrics in background"""
while self.is_running:
try:
# CPU metrics
cpu_percent = psutil.cpu_percent(interval=1)
self.cpu_usage.set(cpu_percent)
# Memory metrics
memory = psutil.virtual_memory()
self.memory_usage.set(memory.used)
self.memory_available.set(memory.available)
# Disk metrics
disk = psutil.disk_usage('/')
disk_percent = (disk.used / disk.total) * 100
self.disk_usage.set(disk_percent)
except Exception as e:
logger.error(f"Failed to collect system metrics: {e}")
time.sleep(10) # Collect every 10 seconds
def _health_check_loop(self):
"""Perform health checks in background"""
while self.is_running:
try:
health_status = self.perform_health_check()
self.last_health_check = datetime.now()
self.health_status = health_status['status']
# Update health metric
health_value = 1 if health_status['status'] == 'healthy' else 0
self.health_status_gauge.set(health_value)
if health_status['status'] != 'healthy':
logger.warning(f"Health check failed: {health_status}")
except Exception as e:
logger.error(f"Health check failed: {e}")
self.health_status_gauge.set(0)
time.sleep(30) # Health check every 30 seconds
def _calculate_derived_metrics(self):
"""Calculate derived metrics like error rates"""
while self.is_running:
try:
# Calculate error rate from recent requests
recent_data = [
d for d in self.performance_history
if d['timestamp'] > datetime.now() - timedelta(minutes=5)
]
if recent_data:
error_count = sum(1 for d in recent_data if not d['success'])
total_count = len(recent_data)
error_rate = (error_count / total_count) * 100
self.error_rate.set(error_rate)
except Exception as e:
logger.error(f"Failed to calculate derived metrics: {e}")
time.sleep(60) # Calculate every minute
def perform_health_check(self) -> Dict[str, Any]:
"""Perform comprehensive health check"""
health = {
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'checks': {}
}
try:
# Test processor functionality
test_html = '<form><input name="test"><button>Test</button></form>'
start_time = time.time()
result = self.processor.parse_webmcp(test_html)
duration = time.time() - start_time
health['checks']['processor'] = {
'status': 'healthy' if duration < 5.0 else 'degraded',
'response_time': duration,
'test_successful': True
}
except Exception as e:
health['checks']['processor'] = {
'status': 'unhealthy',
'error': str(e),
'test_successful': False
}
health['status'] = 'unhealthy'
# Check system resources
try:
memory = psutil.virtual_memory()
cpu_percent = psutil.cpu_percent()
disk = psutil.disk_usage('/')
memory_healthy = memory.percent < 90
cpu_healthy = cpu_percent < 90
disk_healthy = (disk.used / disk.total) < 0.9
health['checks']['system'] = {
'status': 'healthy' if all([memory_healthy, cpu_healthy, disk_healthy]) else 'degraded',
'memory_percent': memory.percent,
'cpu_percent': cpu_percent,
'disk_percent': (disk.used / disk.total) * 100
}
if not all([memory_healthy, cpu_healthy, disk_healthy]):
health['status'] = 'degraded'
except Exception as e:
health['checks']['system'] = {
'status': 'unhealthy',
'error': str(e)
}
health['status'] = 'unhealthy'
return health
def get_performance_summary(self, minutes: int = 60) -> Dict[str, Any]:
"""Get performance summary for the last N minutes"""
cutoff_time = datetime.now() - timedelta(minutes=minutes)
recent_data = [
d for d in self.performance_history
if d['timestamp'] > cutoff_time and d['success']
]
if not recent_data:
return {'error': 'No recent data available'}
durations = [d['duration'] for d in recent_data]
token_reductions = [d['token_reduction'] for d in recent_data if d['token_reduction'] > 0]
costs = [d['cost'] for d in recent_data if d['cost'] > 0]
return {
'time_window_minutes': minutes,
'total_requests': len(recent_data),
'avg_duration': sum(durations) / len(durations),
'min_duration': min(durations),
'max_duration': max(durations),
'avg_token_reduction': sum(token_reductions) / len(token_reductions) if token_reductions else 0,
'avg_cost': sum(costs) / len(costs) if costs else 0,
'requests_per_minute': len(recent_data) / minutes
}
def export_metrics(self, format: str = 'prometheus') -> Any:
"""Export metrics in specified format"""
if format == 'prometheus':
# Return Prometheus format (this is handled by prometheus_client)
return f"Metrics available at http://localhost:{self.port}/metrics"
elif format == 'json':
return {
'health': self.perform_health_check(),
'performance': self.get_performance_summary(),
'system': {
'cpu_usage': psutil.cpu_percent(),
'memory_usage': psutil.virtual_memory().percent,
'disk_usage': (psutil.disk_usage('/').used / psutil.disk_usage('/').total) * 100
}
}
else:
raise ValueError(f"Unsupported format: {format}")
# Usage example
monitor = WebMCPMonitor(port=8000)
# Create monitored processor
class MonitoredWebMCPProcessor:
def __init__(self):
self.processor = WebMCPProcessor()
self.monitor = monitor
@monitor.monitor_request
def parse_webmcp(self, html_content, **kwargs):
return self.processor.parse_webmcp(html_content, **kwargs)
# Start monitoring
monitor.start_monitoring()
# Use monitored processor
monitored_processor = MonitoredWebMCPProcessor()
# Example usage in Flask app
from flask import Flask, jsonify, request
app = Flask(__name__)
@app.route('/health')
def health_check():
health = monitor.perform_health_check()
status_code = 200 if health['status'] == 'healthy' else 503
return jsonify(health), status_code
@app.route('/metrics/summary')
def metrics_summary():
summary = monitor.get_performance_summary(minutes=60)
return jsonify(summary)
@app.route('/process', methods=['POST'])
def process_html():
try:
html_content = request.json.get('html')
result = monitored_processor.parse_webmcp(html_content)
return jsonify(result.to_dict())
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
Popular Monitoring Integrations
Ready-to-use integrations with popular monitoring platforms
Prometheus & Grafana
Complete monitoring stack with metrics collection and visualization
Features:
- Time-series metrics
- Custom dashboards
- Alert manager integration
- Historical analysis
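As a starting point for the Prometheus & Grafana stack, the sketch below exposes webMCP counters from Node.js with the prom-client package so Prometheus can scrape them and Grafana can chart them; the metric names are illustrative and mirror the examples above, and the Prometheus scrape and Grafana dashboard configuration is not shown.
JavaScript
// Minimal sketch: expose webMCP metrics for Prometheus scraping from Node.js
const express = require('express');
const client = require('prom-client');

const register = new client.Registry();
client.collectDefaultMetrics({ register }); // process CPU, memory, event loop lag, etc.

const requestsTotal = new client.Counter({
  name: 'webmcp_requests_total',
  help: 'Total number of webMCP requests',
  labelNames: ['status', 'model'],
  registers: [register]
});

const requestDuration = new client.Histogram({
  name: 'webmcp_request_duration_seconds',
  help: 'Time spent processing webMCP requests',
  buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
  registers: [register]
});

const app = express();

// Record illustrative observations (in practice, instrument real request handlers)
requestsTotal.inc({ status: 'success', model: 'demo' });
requestDuration.observe(1.2);

// Prometheus scrapes this endpoint; Grafana queries Prometheus for dashboards
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});

app.listen(9100, () => console.log('Metrics exposed on :9100/metrics'));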
ELK Stack
Elasticsearch, Logstash, and Kibana for log analysis
Features:
- Centralized logging
- Log aggregation
- Search and analysis
- Real-time dashboards
DataDog
Cloud-based monitoring and analytics platform
Features:
- Infrastructure monitoring
- APM tracing
- Log management
- Synthetic monitoring
New Relic
Full-stack observability platform
Features:
- Application monitoring
- Infrastructure insights
- Browser monitoring
- Mobile monitoring
Monitor with Confidence
Implement comprehensive monitoring to ensure optimal webMCP performance and reliability