"""Activity Logger - Logs all events to JSON Lines file.""" import json import uuid import threading from datetime import datetime, timezone from typing import List, Dict, Any, Optional from config import LOG_FILE class ActivityLogger: """Thread-safe activity logger that writes to JSON Lines file.""" def __init__(self, log_file: str = None): self.log_file = log_file or LOG_FILE self._lock = threading.Lock() def _append_line(self, entry: Dict[str, Any]): """Append a single JSON line to the file.""" with open(self.log_file, 'a') as f: f.write(json.dumps(entry) + '\n') def log_activity(self, event_data: Dict[str, Any]) -> Dict[str, Any]: """ Log an activity entry to the JSON Lines file. Args: event_data: Dictionary containing event information - timestamp: ISO format timestamp (optional, auto-generated) - event_type: Type of event (e.g., pull_request, issue) - action: Action performed (e.g., opened, closed) - repository: Repository full name - sender: Actor who triggered the event - payload: Full webhook payload - routed_to: List of agents/targets triggered Returns: The logged entry with added id and timestamp """ with self._lock: entry = { "id": str(uuid.uuid4()), "timestamp": datetime.now(timezone.utc).isoformat(), "event_type": event_data.get("event_type", ""), "action": event_data.get("action", ""), "repository": event_data.get("repository", ""), "sender": event_data.get("sender", ""), "payload": event_data.get("payload", {}), "routed_to": event_data.get("routed_to", []) } self._append_line(entry) return entry def get_activities(self, limit: int = 50) -> List[Dict[str, Any]]: """ Get recent activity entries from the log file. Args: limit: Maximum number of entries to return Returns: List of activity entries (most recent last) """ with self._lock: if not self.log_file: return [] activities = [] try: with open(self.log_file, 'r') as f: for line in f: line = line.strip() if line: try: activities.append(json.loads(line)) except json.JSONDecodeError: continue except FileNotFoundError: return [] # Return most recent entries return activities[-limit:] if len(activities) > limit else activities def clear(self): """Clear all activity logs.""" with self._lock: try: open(self.log_file, 'w').close() except IOError: pass # Global logger instance logger = ActivityLogger() def log_activity(event_data: Dict[str, Any]) -> Dict[str, Any]: """Convenience function to log activity.""" return logger.log_activity(event_data) def get_activities(limit: int = 50) -> List[Dict[str, Any]]: """Convenience function to get activities.""" return logger.get_activities(limit)