diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..f435729 --- /dev/null +++ b/logger.py @@ -0,0 +1,105 @@ +"""Activity Logger - Logs all events to JSON Lines file.""" + +import json +import uuid +import threading +from datetime import datetime, timezone +from typing import List, Dict, Any, Optional +from config import LOG_FILE + + +class ActivityLogger: + """Thread-safe activity logger that writes to JSON Lines file.""" + + def __init__(self, log_file: str = None): + self.log_file = log_file or LOG_FILE + self._lock = threading.Lock() + + def _append_line(self, entry: Dict[str, Any]): + """Append a single JSON line to the file.""" + with open(self.log_file, 'a') as f: + f.write(json.dumps(entry) + '\n') + + def log_activity(self, event_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Log an activity entry to the JSON Lines file. + + Args: + event_data: Dictionary containing event information + - timestamp: ISO format timestamp (optional, auto-generated) + - event_type: Type of event (e.g., pull_request, issue) + - action: Action performed (e.g., opened, closed) + - repository: Repository full name + - sender: Actor who triggered the event + - payload: Full webhook payload + - routed_to: List of agents/targets triggered + + Returns: + The logged entry with added id and timestamp + """ + with self._lock: + entry = { + "id": str(uuid.uuid4()), + "timestamp": datetime.now(timezone.utc).isoformat(), + "event_type": event_data.get("event_type", ""), + "action": event_data.get("action", ""), + "repository": event_data.get("repository", ""), + "sender": event_data.get("sender", ""), + "payload": event_data.get("payload", {}), + "routed_to": event_data.get("routed_to", []) + } + + self._append_line(entry) + return entry + + def get_activities(self, limit: int = 50) -> List[Dict[str, Any]]: + """ + Get recent activity entries from the log file. + + Args: + limit: Maximum number of entries to return + + Returns: + List of activity entries (most recent last) + """ + with self._lock: + if not self.log_file: + return [] + + activities = [] + try: + with open(self.log_file, 'r') as f: + for line in f: + line = line.strip() + if line: + try: + activities.append(json.loads(line)) + except json.JSONDecodeError: + continue + except FileNotFoundError: + return [] + + # Return most recent entries + return activities[-limit:] if len(activities) > limit else activities + + def clear(self): + """Clear all activity logs.""" + with self._lock: + try: + open(self.log_file, 'w').close() + except IOError: + pass + + +# Global logger instance +logger = ActivityLogger() + + +def log_activity(event_data: Dict[str, Any]) -> Dict[str, Any]: + """Convenience function to log activity.""" + return logger.log_activity(event_data) + + +def get_activities(limit: int = 50) -> List[Dict[str, Any]]: + """Convenience function to get activities.""" + return logger.get_activities(limit) \ No newline at end of file