Gitea_Event_Bridge/logger.py

105 lines
3.5 KiB
Python

"""Activity Logger - Logs all events to JSON Lines file."""
import json
import uuid
import threading
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
from config import LOG_FILE
class ActivityLogger:
"""Thread-safe activity logger that writes to JSON Lines file."""
def __init__(self, log_file: str = None):
self.log_file = log_file or LOG_FILE
self._lock = threading.Lock()
def _append_line(self, entry: Dict[str, Any]):
"""Append a single JSON line to the file."""
with open(self.log_file, 'a') as f:
f.write(json.dumps(entry) + '\n')
def log_activity(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Log an activity entry to the JSON Lines file.
Args:
event_data: Dictionary containing event information
- timestamp: ISO format timestamp (optional, auto-generated)
- event_type: Type of event (e.g., pull_request, issue)
- action: Action performed (e.g., opened, closed)
- repository: Repository full name
- sender: Actor who triggered the event
- payload: Full webhook payload
- routed_to: List of agents/targets triggered
Returns:
The logged entry with added id and timestamp
"""
with self._lock:
entry = {
"id": str(uuid.uuid4()),
"timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": event_data.get("event_type", ""),
"action": event_data.get("action", ""),
"repository": event_data.get("repository", ""),
"sender": event_data.get("sender", ""),
"payload": event_data.get("payload", {}),
"routed_to": event_data.get("routed_to", [])
}
self._append_line(entry)
return entry
def get_activities(self, limit: int = 50) -> List[Dict[str, Any]]:
"""
Get recent activity entries from the log file.
Args:
limit: Maximum number of entries to return
Returns:
List of activity entries (most recent last)
"""
with self._lock:
if not self.log_file:
return []
activities = []
try:
with open(self.log_file, 'r') as f:
for line in f:
line = line.strip()
if line:
try:
activities.append(json.loads(line))
except json.JSONDecodeError:
continue
except FileNotFoundError:
return []
# Return most recent entries
return activities[-limit:] if len(activities) > limit else activities
def clear(self):
"""Clear all activity logs."""
with self._lock:
try:
open(self.log_file, 'w').close()
except IOError:
pass
# Global logger instance
logger = ActivityLogger()
def log_activity(event_data: Dict[str, Any]) -> Dict[str, Any]:
"""Convenience function to log activity."""
return logger.log_activity(event_data)
def get_activities(limit: int = 50) -> List[Dict[str, Any]]:
"""Convenience function to get activities."""
return logger.get_activities(limit)