"""Gitea Event Bridge - Flask application.""" import json import uuid import logging from flask import Flask, request, jsonify, Response, stream_with_context from flask_cors import CORS from typing import Generator, Dict, Any from config import ( HOST, PORT, DEBUG, GITEA_EVENT_TYPES, SSE_RECONNECT_TIME, SSE_HEARTBEAT_INTERVAL ) from logger import logger as activity_logger, get_activities from router import router # Configure logging logging.basicConfig( level=logging.DEBUG if DEBUG else logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Create Flask app app = Flask(__name__) CORS(app) # SSE clients storage class SSEClient: """Represents a connected SSE client.""" def __init__(self, client_id: str, queue: 'Queue'): self.client_id = client_id self.queue = queue class SSEClientManager: """Manages SSE client connections.""" def __init__(self): self._clients: Dict[str, SSEClient] = {} self._lock = __import__('threading').Lock() def add_client(self, client_id: str, queue) -> SSEClient: """Add a new SSE client.""" with self._lock: client = SSEClient(client_id, queue) self._clients[client_id] = client logger.info(f"SSE client connected: {client_id}") return client def remove_client(self, client_id: str): """Remove an SSE client.""" with self._lock: if client_id in self._clients: del self._clients[client_id] logger.info(f"SSE client disconnected: {client_id}") def broadcast(self, event_type: str, data: Dict[str, Any]): """Broadcast message to all connected clients.""" message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n" with self._lock: disconnected = [] for client_id, client in self._clients.items(): try: client.queue.put(message) except Exception as e: logger.error(f"Error sending to client {client_id}: {e}") disconnected.append(client_id) # Clean up disconnected clients for client_id in disconnected: del self._clients[client_id] def get_client_count(self) -> int: """Get number of connected clients.""" with self._lock: return len(self._clients) # Global SSE manager sse_manager = SSEClientManager() @app.route('/webhook/gitea', methods=['POST']) def webhook_gitea(): """ Receive Gitea webhooks. Validates signature, parses payload, routes event, logs activity, and broadcasts via SSE. Headers: X-Gitea-Event: Event type X-Gitea-Event-Type: More specific event type X-Gitea-Signature: HMAC signature (if secret configured) """ try: # Get raw payload for signature verification raw_payload = request.get_data() # Get event type from headers event_type = request.headers.get("X-Gitea-Event", "push") signature = request.headers.get("X-Gitea-Signature", "") # Verify signature if configured if not router.verify_signature(raw_payload, signature): logger.warning("Invalid webhook signature") return jsonify({"error": "Invalid signature"}), 401 # Parse JSON payload try: payload = request.get_json() except Exception as e: logger.error(f"Failed to parse JSON payload: {e}") return jsonify({"error": "Invalid JSON payload"}), 400 if not payload: return jsonify({"error": "Empty payload"}), 400 # Log event type logger.info(f"Received Gitea webhook: {event_type}") # Route the event and get triggers parsed, triggers = router.route_event(payload) # Build response response_data = { "success": True, "event_type": parsed["event_type"], "action": parsed["action"], "repository": parsed["repository"], "sender": parsed["sender"], "triggers": triggers } # Broadcast to SSE clients broadcast_data = { "event_type": parsed["event_type"], "action": parsed["action"], "repository": parsed["repository"], "sender": parsed["sender"], "issue": parsed.get("issue"), "pr": parsed.get("pr"), "label": parsed.get("label"), "triggers": triggers, "timestamp": parsed.get("timestamp", "") } sse_manager.broadcast("gitea_event", broadcast_data) logger.info(f"Webhook processed successfully. Triggers: {triggers}") return jsonify(response_data), 200 except Exception as e: logger.error(f"Error processing webhook: {e}", exc_info=True) return jsonify({"error": str(e)}), 500 @app.route('/events', methods=['GET']) def events(): """ SSE stream endpoint for real-time event updates. Returns: Streaming response with SSE format: event: gitea_event data: {"event_type": "pull_request", "action": "opened", ...} """ import queue import threading from time import sleep client_id = str(uuid.uuid4()) event_queue = queue.Queue() # Add client to manager sse_manager.add_client(client_id, event_queue) def generate() -> Generator[str, None, None]: """Generate SSE events.""" try: # Send initial connection event yield f"event: gitea_event\ndata: {json.dumps({'status': 'connected', 'client_id': client_id})}\n\n" while True: try: # Get message from queue with timeout message = event_queue.get(timeout=SSE_HEARTBEAT_INTERVAL) yield message except queue.Empty: # Send heartbeat to keep connection alive yield f": heartbeat\n\n" except GeneratorExit: pass finally: sse_manager.remove_client(client_id) return Response( stream_with_context(generate()), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', 'Connection': 'keep-alive', } ) @app.route('/health', methods=['GET']) def health(): """ Health check endpoint. Returns: JSON status indicating service health """ return jsonify({ "status": "healthy", "sse_connected_clients": sse_manager.get_client_count() }), 200 @app.route('/activity', methods=['GET']) def activity(): """ Get recent activity log entries. Query Parameters: limit: Maximum number of entries to return (default: 50) Returns: JSON array of activity entries """ limit = request.args.get('limit', 50, type=int) limit = min(limit, 200) # Cap at 200 activities = get_activities(limit) return jsonify({ "count": len(activities), "activities": activities }), 200 @app.route('/', methods=['GET']) def index(): """Root endpoint with service information.""" return jsonify({ "service": "Gitea Event Bridge", "version": "1.0.0", "endpoints": { "POST /webhook/gitea": "Receive Gitea webhooks", "GET /events": "SSE event stream", "GET /health": "Health check", "GET /activity": "Get activity log" }, "supported_events": GITEA_EVENT_TYPES }), 200 @app.errorhandler(404) def not_found(error): """Handle 404 errors.""" return jsonify({"error": "Not found"}), 404 @app.errorhandler(500) def internal_error(error): """Handle 500 errors.""" logger.error(f"Internal server error: {error}") return jsonify({"error": "Internal server error"}), 500 if __name__ == '__main__': app.run(host=HOST, port=PORT, debug=DEBUG)