main #2

Merged
hitanshu merged 9 commits from hitanshu/Gitea_Event_Bridge:main into main 2026-04-07 21:52:51 +00:00
Showing only changes of commit 4d7314fcf9 - Show all commits

281
app.py Normal file
View File

@ -0,0 +1,281 @@
"""Gitea Event Bridge - Flask application."""
import json
import uuid
import logging
from flask import Flask, request, jsonify, Response, stream_with_context
from flask_cors import CORS
from typing import Generator, Dict, Any
from config import (
HOST,
PORT,
DEBUG,
GITEA_EVENT_TYPES,
SSE_RECONNECT_TIME,
SSE_HEARTBEAT_INTERVAL
)
from logger import logger as activity_logger, get_activities
from router import router
# Configure logging
logging.basicConfig(
level=logging.DEBUG if DEBUG else logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Create Flask app
app = Flask(__name__)
CORS(app)
# SSE clients storage
class SSEClient:
"""Represents a connected SSE client."""
def __init__(self, client_id: str, queue: 'Queue'):
self.client_id = client_id
self.queue = queue
class SSEClientManager:
"""Manages SSE client connections."""
def __init__(self):
self._clients: Dict[str, SSEClient] = {}
self._lock = __import__('threading').Lock()
def add_client(self, client_id: str, queue) -> SSEClient:
"""Add a new SSE client."""
with self._lock:
client = SSEClient(client_id, queue)
self._clients[client_id] = client
logger.info(f"SSE client connected: {client_id}")
return client
def remove_client(self, client_id: str):
"""Remove an SSE client."""
with self._lock:
if client_id in self._clients:
del self._clients[client_id]
logger.info(f"SSE client disconnected: {client_id}")
def broadcast(self, event_type: str, data: Dict[str, Any]):
"""Broadcast message to all connected clients."""
message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
with self._lock:
disconnected = []
for client_id, client in self._clients.items():
try:
client.queue.put(message)
except Exception as e:
logger.error(f"Error sending to client {client_id}: {e}")
disconnected.append(client_id)
# Clean up disconnected clients
for client_id in disconnected:
del self._clients[client_id]
def get_client_count(self) -> int:
"""Get number of connected clients."""
with self._lock:
return len(self._clients)
# Global SSE manager
sse_manager = SSEClientManager()
@app.route('/webhook/gitea', methods=['POST'])
def webhook_gitea():
"""
Receive Gitea webhooks.
Validates signature, parses payload, routes event, logs activity,
and broadcasts via SSE.
Headers:
X-Gitea-Event: Event type
X-Gitea-Event-Type: More specific event type
X-Gitea-Signature: HMAC signature (if secret configured)
"""
try:
# Get raw payload for signature verification
raw_payload = request.get_data()
# Get event type from headers
event_type = request.headers.get("X-Gitea-Event", "push")
signature = request.headers.get("X-Gitea-Signature", "")
# Verify signature if configured
if not router.verify_signature(raw_payload, signature):
logger.warning("Invalid webhook signature")
return jsonify({"error": "Invalid signature"}), 401
# Parse JSON payload
try:
payload = request.get_json()
except Exception as e:
logger.error(f"Failed to parse JSON payload: {e}")
return jsonify({"error": "Invalid JSON payload"}), 400
if not payload:
return jsonify({"error": "Empty payload"}), 400
# Log event type
logger.info(f"Received Gitea webhook: {event_type}")
# Route the event and get triggers
parsed, triggers = router.route_event(payload)
# Build response
response_data = {
"success": True,
"event_type": parsed["event_type"],
"action": parsed["action"],
"repository": parsed["repository"],
"sender": parsed["sender"],
"triggers": triggers
}
# Broadcast to SSE clients
broadcast_data = {
"event_type": parsed["event_type"],
"action": parsed["action"],
"repository": parsed["repository"],
"sender": parsed["sender"],
"issue": parsed.get("issue"),
"pr": parsed.get("pr"),
"label": parsed.get("label"),
"triggers": triggers,
"timestamp": parsed.get("timestamp", "")
}
sse_manager.broadcast("gitea_event", broadcast_data)
logger.info(f"Webhook processed successfully. Triggers: {triggers}")
return jsonify(response_data), 200
except Exception as e:
logger.error(f"Error processing webhook: {e}", exc_info=True)
return jsonify({"error": str(e)}), 500
@app.route('/events', methods=['GET'])
def events():
"""
SSE stream endpoint for real-time event updates.
Returns:
Streaming response with SSE format:
event: gitea_event
data: {"event_type": "pull_request", "action": "opened", ...}
"""
import queue
import threading
from time import sleep
client_id = str(uuid.uuid4())
event_queue = queue.Queue()
# Add client to manager
sse_manager.add_client(client_id, event_queue)
def generate() -> Generator[str, None, None]:
"""Generate SSE events."""
try:
# Send initial connection event
yield f"event: gitea_event\ndata: {json.dumps({'status': 'connected', 'client_id': client_id})}\n\n"
while True:
try:
# Get message from queue with timeout
message = event_queue.get(timeout=SSE_HEARTBEAT_INTERVAL)
yield message
except queue.Empty:
# Send heartbeat to keep connection alive
yield f": heartbeat\n\n"
except GeneratorExit:
pass
finally:
sse_manager.remove_client(client_id)
return Response(
stream_with_context(generate()),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
'Connection': 'keep-alive',
}
)
@app.route('/health', methods=['GET'])
def health():
"""
Health check endpoint.
Returns:
JSON status indicating service health
"""
return jsonify({
"status": "healthy",
"sse_connected_clients": sse_manager.get_client_count()
}), 200
@app.route('/activity', methods=['GET'])
def activity():
"""
Get recent activity log entries.
Query Parameters:
limit: Maximum number of entries to return (default: 50)
Returns:
JSON array of activity entries
"""
limit = request.args.get('limit', 50, type=int)
limit = min(limit, 200) # Cap at 200
activities = get_activities(limit)
return jsonify({
"count": len(activities),
"activities": activities
}), 200
@app.route('/', methods=['GET'])
def index():
"""Root endpoint with service information."""
return jsonify({
"service": "Gitea Event Bridge",
"version": "1.0.0",
"endpoints": {
"POST /webhook/gitea": "Receive Gitea webhooks",
"GET /events": "SSE event stream",
"GET /health": "Health check",
"GET /activity": "Get activity log"
},
"supported_events": GITEA_EVENT_TYPES
}), 200
@app.errorhandler(404)
def not_found(error):
"""Handle 404 errors."""
return jsonify({"error": "Not found"}), 404
@app.errorhandler(500)
def internal_error(error):
"""Handle 500 errors."""
logger.error(f"Internal server error: {error}")
return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
app.run(host=HOST, port=PORT, debug=DEBUG)