Add Gitea Event Bridge application files
This commit is contained in:
parent
7e24fc26a3
commit
caa9b01f67
105
logger.py
Normal file
105
logger.py
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
"""Activity Logger - Logs all events to JSON Lines file."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
import threading
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import List, Dict, Any, Optional
|
||||||
|
from config import LOG_FILE
|
||||||
|
|
||||||
|
|
||||||
|
class ActivityLogger:
|
||||||
|
"""Thread-safe activity logger that writes to JSON Lines file."""
|
||||||
|
|
||||||
|
def __init__(self, log_file: str = None):
|
||||||
|
self.log_file = log_file or LOG_FILE
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def _append_line(self, entry: Dict[str, Any]):
|
||||||
|
"""Append a single JSON line to the file."""
|
||||||
|
with open(self.log_file, 'a') as f:
|
||||||
|
f.write(json.dumps(entry) + '\n')
|
||||||
|
|
||||||
|
def log_activity(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Log an activity entry to the JSON Lines file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_data: Dictionary containing event information
|
||||||
|
- timestamp: ISO format timestamp (optional, auto-generated)
|
||||||
|
- event_type: Type of event (e.g., pull_request, issue)
|
||||||
|
- action: Action performed (e.g., opened, closed)
|
||||||
|
- repository: Repository full name
|
||||||
|
- sender: Actor who triggered the event
|
||||||
|
- payload: Full webhook payload
|
||||||
|
- routed_to: List of agents/targets triggered
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The logged entry with added id and timestamp
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
entry = {
|
||||||
|
"id": str(uuid.uuid4()),
|
||||||
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"event_type": event_data.get("event_type", ""),
|
||||||
|
"action": event_data.get("action", ""),
|
||||||
|
"repository": event_data.get("repository", ""),
|
||||||
|
"sender": event_data.get("sender", ""),
|
||||||
|
"payload": event_data.get("payload", {}),
|
||||||
|
"routed_to": event_data.get("routed_to", [])
|
||||||
|
}
|
||||||
|
|
||||||
|
self._append_line(entry)
|
||||||
|
return entry
|
||||||
|
|
||||||
|
def get_activities(self, limit: int = 50) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Get recent activity entries from the log file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
limit: Maximum number of entries to return
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of activity entries (most recent last)
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
if not self.log_file:
|
||||||
|
return []
|
||||||
|
|
||||||
|
activities = []
|
||||||
|
try:
|
||||||
|
with open(self.log_file, 'r') as f:
|
||||||
|
for line in f:
|
||||||
|
line = line.strip()
|
||||||
|
if line:
|
||||||
|
try:
|
||||||
|
activities.append(json.loads(line))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
continue
|
||||||
|
except FileNotFoundError:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# Return most recent entries
|
||||||
|
return activities[-limit:] if len(activities) > limit else activities
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
"""Clear all activity logs."""
|
||||||
|
with self._lock:
|
||||||
|
try:
|
||||||
|
open(self.log_file, 'w').close()
|
||||||
|
except IOError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Global logger instance
|
||||||
|
logger = ActivityLogger()
|
||||||
|
|
||||||
|
|
||||||
|
def log_activity(event_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
"""Convenience function to log activity."""
|
||||||
|
return logger.log_activity(event_data)
|
||||||
|
|
||||||
|
|
||||||
|
def get_activities(limit: int = 50) -> List[Dict[str, Any]]:
|
||||||
|
"""Convenience function to get activities."""
|
||||||
|
return logger.get_activities(limit)
|
||||||
Loading…
Reference in New Issue
Block a user