forked from Hithomelabs/Gitea_Event_Bridge
281 lines
8.2 KiB
Python
281 lines
8.2 KiB
Python
"""Gitea Event Bridge - Flask application."""
|
|
|
|
import json
|
|
import uuid
|
|
import logging
|
|
from flask import Flask, request, jsonify, Response, stream_with_context
|
|
from flask_cors import CORS
|
|
from typing import Generator, Dict, Any
|
|
|
|
from config import (
|
|
HOST,
|
|
PORT,
|
|
DEBUG,
|
|
GITEA_EVENT_TYPES,
|
|
SSE_RECONNECT_TIME,
|
|
SSE_HEARTBEAT_INTERVAL
|
|
)
|
|
from logger import logger as activity_logger, get_activities
|
|
from router import router
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG if DEBUG else logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Create Flask app
|
|
app = Flask(__name__)
|
|
CORS(app)
|
|
|
|
# SSE clients storage
|
|
class SSEClient:
|
|
"""Represents a connected SSE client."""
|
|
def __init__(self, client_id: str, queue: 'Queue'):
|
|
self.client_id = client_id
|
|
self.queue = queue
|
|
|
|
|
|
class SSEClientManager:
|
|
"""Manages SSE client connections."""
|
|
|
|
def __init__(self):
|
|
self._clients: Dict[str, SSEClient] = {}
|
|
self._lock = __import__('threading').Lock()
|
|
|
|
def add_client(self, client_id: str, queue) -> SSEClient:
|
|
"""Add a new SSE client."""
|
|
with self._lock:
|
|
client = SSEClient(client_id, queue)
|
|
self._clients[client_id] = client
|
|
logger.info(f"SSE client connected: {client_id}")
|
|
return client
|
|
|
|
def remove_client(self, client_id: str):
|
|
"""Remove an SSE client."""
|
|
with self._lock:
|
|
if client_id in self._clients:
|
|
del self._clients[client_id]
|
|
logger.info(f"SSE client disconnected: {client_id}")
|
|
|
|
def broadcast(self, event_type: str, data: Dict[str, Any]):
|
|
"""Broadcast message to all connected clients."""
|
|
message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
|
|
|
|
with self._lock:
|
|
disconnected = []
|
|
for client_id, client in self._clients.items():
|
|
try:
|
|
client.queue.put(message)
|
|
except Exception as e:
|
|
logger.error(f"Error sending to client {client_id}: {e}")
|
|
disconnected.append(client_id)
|
|
|
|
# Clean up disconnected clients
|
|
for client_id in disconnected:
|
|
del self._clients[client_id]
|
|
|
|
def get_client_count(self) -> int:
|
|
"""Get number of connected clients."""
|
|
with self._lock:
|
|
return len(self._clients)
|
|
|
|
|
|
# Global SSE manager
|
|
sse_manager = SSEClientManager()
|
|
|
|
|
|
@app.route('/webhook/gitea', methods=['POST'])
|
|
def webhook_gitea():
|
|
"""
|
|
Receive Gitea webhooks.
|
|
|
|
Validates signature, parses payload, routes event, logs activity,
|
|
and broadcasts via SSE.
|
|
|
|
Headers:
|
|
X-Gitea-Event: Event type
|
|
X-Gitea-Event-Type: More specific event type
|
|
X-Gitea-Signature: HMAC signature (if secret configured)
|
|
"""
|
|
try:
|
|
# Get raw payload for signature verification
|
|
raw_payload = request.get_data()
|
|
|
|
# Get event type from headers
|
|
event_type = request.headers.get("X-Gitea-Event", "push")
|
|
signature = request.headers.get("X-Gitea-Signature", "")
|
|
|
|
# Verify signature if configured
|
|
if not router.verify_signature(raw_payload, signature):
|
|
logger.warning("Invalid webhook signature")
|
|
return jsonify({"error": "Invalid signature"}), 401
|
|
|
|
# Parse JSON payload
|
|
try:
|
|
payload = request.get_json()
|
|
except Exception as e:
|
|
logger.error(f"Failed to parse JSON payload: {e}")
|
|
return jsonify({"error": "Invalid JSON payload"}), 400
|
|
|
|
if not payload:
|
|
return jsonify({"error": "Empty payload"}), 400
|
|
|
|
# Log event type
|
|
logger.info(f"Received Gitea webhook: {event_type}")
|
|
|
|
# Route the event and get triggers
|
|
parsed, triggers = router.route_event(payload)
|
|
|
|
# Build response
|
|
response_data = {
|
|
"success": True,
|
|
"event_type": parsed["event_type"],
|
|
"action": parsed["action"],
|
|
"repository": parsed["repository"],
|
|
"sender": parsed["sender"],
|
|
"triggers": triggers
|
|
}
|
|
|
|
# Broadcast to SSE clients
|
|
broadcast_data = {
|
|
"event_type": parsed["event_type"],
|
|
"action": parsed["action"],
|
|
"repository": parsed["repository"],
|
|
"sender": parsed["sender"],
|
|
"issue": parsed.get("issue"),
|
|
"pr": parsed.get("pr"),
|
|
"label": parsed.get("label"),
|
|
"triggers": triggers,
|
|
"timestamp": parsed.get("timestamp", "")
|
|
}
|
|
sse_manager.broadcast("gitea_event", broadcast_data)
|
|
|
|
logger.info(f"Webhook processed successfully. Triggers: {triggers}")
|
|
|
|
return jsonify(response_data), 200
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing webhook: {e}", exc_info=True)
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
@app.route('/events', methods=['GET'])
|
|
def events():
|
|
"""
|
|
SSE stream endpoint for real-time event updates.
|
|
|
|
Returns:
|
|
Streaming response with SSE format:
|
|
event: gitea_event
|
|
data: {"event_type": "pull_request", "action": "opened", ...}
|
|
"""
|
|
import queue
|
|
import threading
|
|
from time import sleep
|
|
|
|
client_id = str(uuid.uuid4())
|
|
event_queue = queue.Queue()
|
|
|
|
# Add client to manager
|
|
sse_manager.add_client(client_id, event_queue)
|
|
|
|
def generate() -> Generator[str, None, None]:
|
|
"""Generate SSE events."""
|
|
try:
|
|
# Send initial connection event
|
|
yield f"event: gitea_event\ndata: {json.dumps({'status': 'connected', 'client_id': client_id})}\n\n"
|
|
|
|
while True:
|
|
try:
|
|
# Get message from queue with timeout
|
|
message = event_queue.get(timeout=SSE_HEARTBEAT_INTERVAL)
|
|
yield message
|
|
except queue.Empty:
|
|
# Send heartbeat to keep connection alive
|
|
yield f": heartbeat\n\n"
|
|
|
|
except GeneratorExit:
|
|
pass
|
|
finally:
|
|
sse_manager.remove_client(client_id)
|
|
|
|
return Response(
|
|
stream_with_context(generate()),
|
|
mimetype='text/event-stream',
|
|
headers={
|
|
'Cache-Control': 'no-cache',
|
|
'X-Accel-Buffering': 'no',
|
|
'Connection': 'keep-alive',
|
|
}
|
|
)
|
|
|
|
|
|
@app.route('/health', methods=['GET'])
|
|
def health():
|
|
"""
|
|
Health check endpoint.
|
|
|
|
Returns:
|
|
JSON status indicating service health
|
|
"""
|
|
return jsonify({
|
|
"status": "healthy",
|
|
"sse_connected_clients": sse_manager.get_client_count()
|
|
}), 200
|
|
|
|
|
|
@app.route('/activity', methods=['GET'])
|
|
def activity():
|
|
"""
|
|
Get recent activity log entries.
|
|
|
|
Query Parameters:
|
|
limit: Maximum number of entries to return (default: 50)
|
|
|
|
Returns:
|
|
JSON array of activity entries
|
|
"""
|
|
limit = request.args.get('limit', 50, type=int)
|
|
limit = min(limit, 200) # Cap at 200
|
|
|
|
activities = get_activities(limit)
|
|
|
|
return jsonify({
|
|
"count": len(activities),
|
|
"activities": activities
|
|
}), 200
|
|
|
|
|
|
@app.route('/', methods=['GET'])
|
|
def index():
|
|
"""Root endpoint with service information."""
|
|
return jsonify({
|
|
"service": "Gitea Event Bridge",
|
|
"version": "1.0.0",
|
|
"endpoints": {
|
|
"POST /webhook/gitea": "Receive Gitea webhooks",
|
|
"GET /events": "SSE event stream",
|
|
"GET /health": "Health check",
|
|
"GET /activity": "Get activity log"
|
|
},
|
|
"supported_events": GITEA_EVENT_TYPES
|
|
}), 200
|
|
|
|
|
|
@app.errorhandler(404)
|
|
def not_found(error):
|
|
"""Handle 404 errors."""
|
|
return jsonify({"error": "Not found"}), 404
|
|
|
|
|
|
@app.errorhandler(500)
|
|
def internal_error(error):
|
|
"""Handle 500 errors."""
|
|
logger.error(f"Internal server error: {error}")
|
|
return jsonify({"error": "Internal server error"}), 500
|
|
|
|
|
|
if __name__ == '__main__':
|
|
app.run(host=HOST, port=PORT, debug=DEBUG) |