From 4d7314fcf93cba7d15ff364446d6c73f19ad9247 Mon Sep 17 00:00:00 2001 From: hitanshu Date: Tue, 7 Apr 2026 21:45:59 +0000 Subject: [PATCH] Add Gitea Event Bridge application files --- app.py | 281 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100644 app.py diff --git a/app.py b/app.py new file mode 100644 index 0000000..23d2572 --- /dev/null +++ b/app.py @@ -0,0 +1,281 @@ +"""Gitea Event Bridge - Flask application.""" + +import json +import uuid +import logging +from flask import Flask, request, jsonify, Response, stream_with_context +from flask_cors import CORS +from typing import Generator, Dict, Any + +from config import ( + HOST, + PORT, + DEBUG, + GITEA_EVENT_TYPES, + SSE_RECONNECT_TIME, + SSE_HEARTBEAT_INTERVAL +) +from logger import logger as activity_logger, get_activities +from router import router + +# Configure logging +logging.basicConfig( + level=logging.DEBUG if DEBUG else logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Create Flask app +app = Flask(__name__) +CORS(app) + +# SSE clients storage +class SSEClient: + """Represents a connected SSE client.""" + def __init__(self, client_id: str, queue: 'Queue'): + self.client_id = client_id + self.queue = queue + + +class SSEClientManager: + """Manages SSE client connections.""" + + def __init__(self): + self._clients: Dict[str, SSEClient] = {} + self._lock = __import__('threading').Lock() + + def add_client(self, client_id: str, queue) -> SSEClient: + """Add a new SSE client.""" + with self._lock: + client = SSEClient(client_id, queue) + self._clients[client_id] = client + logger.info(f"SSE client connected: {client_id}") + return client + + def remove_client(self, client_id: str): + """Remove an SSE client.""" + with self._lock: + if client_id in self._clients: + del self._clients[client_id] + logger.info(f"SSE client disconnected: {client_id}") + + def broadcast(self, event_type: str, data: Dict[str, Any]): + """Broadcast message to all connected clients.""" + message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n" + + with self._lock: + disconnected = [] + for client_id, client in self._clients.items(): + try: + client.queue.put(message) + except Exception as e: + logger.error(f"Error sending to client {client_id}: {e}") + disconnected.append(client_id) + + # Clean up disconnected clients + for client_id in disconnected: + del self._clients[client_id] + + def get_client_count(self) -> int: + """Get number of connected clients.""" + with self._lock: + return len(self._clients) + + +# Global SSE manager +sse_manager = SSEClientManager() + + +@app.route('/webhook/gitea', methods=['POST']) +def webhook_gitea(): + """ + Receive Gitea webhooks. + + Validates signature, parses payload, routes event, logs activity, + and broadcasts via SSE. + + Headers: + X-Gitea-Event: Event type + X-Gitea-Event-Type: More specific event type + X-Gitea-Signature: HMAC signature (if secret configured) + """ + try: + # Get raw payload for signature verification + raw_payload = request.get_data() + + # Get event type from headers + event_type = request.headers.get("X-Gitea-Event", "push") + signature = request.headers.get("X-Gitea-Signature", "") + + # Verify signature if configured + if not router.verify_signature(raw_payload, signature): + logger.warning("Invalid webhook signature") + return jsonify({"error": "Invalid signature"}), 401 + + # Parse JSON payload + try: + payload = request.get_json() + except Exception as e: + logger.error(f"Failed to parse JSON payload: {e}") + return jsonify({"error": "Invalid JSON payload"}), 400 + + if not payload: + return jsonify({"error": "Empty payload"}), 400 + + # Log event type + logger.info(f"Received Gitea webhook: {event_type}") + + # Route the event and get triggers + parsed, triggers = router.route_event(payload) + + # Build response + response_data = { + "success": True, + "event_type": parsed["event_type"], + "action": parsed["action"], + "repository": parsed["repository"], + "sender": parsed["sender"], + "triggers": triggers + } + + # Broadcast to SSE clients + broadcast_data = { + "event_type": parsed["event_type"], + "action": parsed["action"], + "repository": parsed["repository"], + "sender": parsed["sender"], + "issue": parsed.get("issue"), + "pr": parsed.get("pr"), + "label": parsed.get("label"), + "triggers": triggers, + "timestamp": parsed.get("timestamp", "") + } + sse_manager.broadcast("gitea_event", broadcast_data) + + logger.info(f"Webhook processed successfully. Triggers: {triggers}") + + return jsonify(response_data), 200 + + except Exception as e: + logger.error(f"Error processing webhook: {e}", exc_info=True) + return jsonify({"error": str(e)}), 500 + + +@app.route('/events', methods=['GET']) +def events(): + """ + SSE stream endpoint for real-time event updates. + + Returns: + Streaming response with SSE format: + event: gitea_event + data: {"event_type": "pull_request", "action": "opened", ...} + """ + import queue + import threading + from time import sleep + + client_id = str(uuid.uuid4()) + event_queue = queue.Queue() + + # Add client to manager + sse_manager.add_client(client_id, event_queue) + + def generate() -> Generator[str, None, None]: + """Generate SSE events.""" + try: + # Send initial connection event + yield f"event: gitea_event\ndata: {json.dumps({'status': 'connected', 'client_id': client_id})}\n\n" + + while True: + try: + # Get message from queue with timeout + message = event_queue.get(timeout=SSE_HEARTBEAT_INTERVAL) + yield message + except queue.Empty: + # Send heartbeat to keep connection alive + yield f": heartbeat\n\n" + + except GeneratorExit: + pass + finally: + sse_manager.remove_client(client_id) + + return Response( + stream_with_context(generate()), + mimetype='text/event-stream', + headers={ + 'Cache-Control': 'no-cache', + 'X-Accel-Buffering': 'no', + 'Connection': 'keep-alive', + } + ) + + +@app.route('/health', methods=['GET']) +def health(): + """ + Health check endpoint. + + Returns: + JSON status indicating service health + """ + return jsonify({ + "status": "healthy", + "sse_connected_clients": sse_manager.get_client_count() + }), 200 + + +@app.route('/activity', methods=['GET']) +def activity(): + """ + Get recent activity log entries. + + Query Parameters: + limit: Maximum number of entries to return (default: 50) + + Returns: + JSON array of activity entries + """ + limit = request.args.get('limit', 50, type=int) + limit = min(limit, 200) # Cap at 200 + + activities = get_activities(limit) + + return jsonify({ + "count": len(activities), + "activities": activities + }), 200 + + +@app.route('/', methods=['GET']) +def index(): + """Root endpoint with service information.""" + return jsonify({ + "service": "Gitea Event Bridge", + "version": "1.0.0", + "endpoints": { + "POST /webhook/gitea": "Receive Gitea webhooks", + "GET /events": "SSE event stream", + "GET /health": "Health check", + "GET /activity": "Get activity log" + }, + "supported_events": GITEA_EVENT_TYPES + }), 200 + + +@app.errorhandler(404) +def not_found(error): + """Handle 404 errors.""" + return jsonify({"error": "Not found"}), 404 + + +@app.errorhandler(500) +def internal_error(error): + """Handle 500 errors.""" + logger.error(f"Internal server error: {error}") + return jsonify({"error": "Internal server error"}), 500 + + +if __name__ == '__main__': + app.run(host=HOST, port=PORT, debug=DEBUG) \ No newline at end of file