Compare commits

...

10 Commits

9 changed files with 1027 additions and 0 deletions

23
.env.example Normal file
View File

@ -0,0 +1,23 @@
# Gitea Event Bridge Configuration
# Server Configuration
HOST=0.0.0.0
PORT=5000
DEBUG=false
# OpenCode Server URL (for triggering pipelines)
OPENCODE_URL=http://localhost:8080
# Activity Log File (JSON Lines format)
LOG_FILE=activity.json
# Gitea Webhook Secret (optional)
# Generate a secret in Gitea webhook settings and set it here
# The secret is used to verify webhook signature
GITEA_WEBHOOK_SECRET=
# Auto-Trigger Configuration
# Automatically trigger pipeline when labels/commands detected
AUTO_TRIGGER_PIPELINE=true
# Automatically trigger lead review on PR opened/synchronized
AUTO_TRIGGER_REVIEW=true

28
Dockerfile Normal file
View File

@ -0,0 +1,28 @@
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Create non-root user
RUN useradd --create-home --shell /bin/bash appuser && \
chown -R appuser:appuser /app
# Copy application files
COPY --chown=appuser:appuser . .
# Switch to non-root user
USER appuser
# Expose port
EXPOSE 5000
# Run with Gunicorn (4 workers)
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120", "app:app"]

195
README.md
View File

@ -0,0 +1,195 @@
# Gitea Event Bridge Service
A Flask-based webhook event bridge service that receives Gitea webhooks, routes them based on trigger conditions, and broadcasts events via Server-Sent Events (SSE).
## Features
- **Webhook Reception**: Accept Gitea webhooks via POST endpoint
- **Event Routing**: Automatically detect and route events based on:
- Labels (`start-pipeline`, `needs-decision`)
- Comment commands (`/pipeline start`)
- PR events (opened, synchronized)
- **SSE Broadcasting**: Real-time event streaming to connected clients
- **Activity Logging**: JSON Lines format for audit trail
- **Webhook Verification**: HMAC signature validation (optional)
## Supported Gitea Event Types
- `issue` (opened, closed, reopened, edited)
- `issue.label` (added, removed)
- `pull_request` (opened, closed, reopened, synchronize, edited)
- `pull_request.label` (added, removed)
- `pull_request.review` (approved, rejected, commented)
- `comment` (created, edited, deleted)
- `repository` (created, deleted, archived)
- `organization` (member_added, member_removed)
## Quick Start
### Using Docker Compose
```bash
# Clone and navigate to the directory
cd gitea-event-bridge
# Copy environment file
cp .env.example .env
# Edit .env with your configuration
# Set GITEA_WEBHOOK_SECRET if using signature verification
# Start the service
docker-compose up -d
# Check health
curl http://localhost:5000/health
```
### Manual Setup
```bash
# Install dependencies
pip install -r requirements.txt
# Copy and configure environment
cp .env.example .env
# Run the application
python app.py
```
## Configuration
| Environment Variable | Default | Description |
|---------------------|---------|-------------|
| `HOST` | `0.0.0.0` | Server bind address |
| `PORT` | `5000` | Server port |
| `DEBUG` | `false` | Enable debug mode |
| `OPENCODE_URL` | `http://localhost:8080` | OpenCode server URL |
| `LOG_FILE` | `activity.json` | Activity log file path |
| `GITEA_WEBHOOK_SECRET` | (empty) | Secret for signature verification |
| `AUTO_TRIGGER_PIPELINE` | `true` | Enable automatic pipeline triggers |
| `AUTO_TRIGGER_REVIEW` | `true` | Enable automatic review triggers |
## API Endpoints
### POST /webhook/gitea
Receive Gitea webhooks.
**Headers:**
- `X-Gitea-Event`: Event type
- `X-Gitea-Signature`: HMAC signature (if secret configured)
**Response:**
```json
{
"success": true,
"event_type": "pull_request",
"action": "opened",
"repository": "owner/repo",
"sender": "username",
"triggers": [
{
"type": "review",
"agent": "lead",
"trigger": "trigger_lead_review",
"reason": "pr:opened"
}
]
}
```
### GET /events
SSE stream endpoint for real-time events.
**Response:**
```
event: gitea_event
data: {"event_type": "pull_request", "action": "opened", ...}
```
### GET /health
Health check endpoint.
**Response:**
```json
{
"status": "healthy",
"sse_connected_clients": 0
}
```
### GET /activity
Get recent activity log entries.
**Query Parameters:**
- `limit`: Maximum entries to return (default: 50, max: 200)
**Response:**
```json
{
"count": 2,
"activities": [
{
"id": "uuid",
"timestamp": "2026-04-08T12:00:00Z",
"event_type": "pull_request",
"action": "opened",
"repository": "owner/repo",
"sender": "username",
"payload": {...},
"routed_to": ["lead"]
}
]
}
```
## Trigger Conditions
### Label Triggers
- `start-pipeline` label → Trigger master pipeline
- `needs-decision` label → Trigger master decision
### Comment Commands
- `/pipeline start` → Trigger pipeline
- `/pipeline` → Trigger pipeline (shorthand)
### PR Events
- PR opened → Trigger lead review
- PR synchronize → Trigger lead review
## Gitea Webhook Setup
1. In Gitea, go to Repository Settings → Webhooks
2. Add new webhook:
- **URL**: `http://<your-server>/webhook/gitea`
- **Secret**: Set to match `GITEA_WEBHOOK_SECRET` (optional)
- **Events**: Select events to send
3. Test the webhook
## Example curl Commands
```bash
# Test health endpoint
curl http://localhost:5000/health
# Test SSE stream
curl -N http://localhost:5000/events
# Get activity log
curl http://localhost:5000/activity?limit=10
# Simulate webhook (if no secret configured)
curl -X POST http://localhost:5000/webhook/gitea \
-H "Content-Type: application/json" \
-H "X-Gitea-Event: pull_request" \
-d '{"action": "opened", "repository": {"full_name": "owner/repo"}, "sender": {"login": "user"}, "pull_request": {"number": 1}}'
```
## License
MIT

281
app.py Normal file
View File

@ -0,0 +1,281 @@
"""Gitea Event Bridge - Flask application."""
import json
import uuid
import logging
from flask import Flask, request, jsonify, Response, stream_with_context
from flask_cors import CORS
from typing import Generator, Dict, Any
from config import (
HOST,
PORT,
DEBUG,
GITEA_EVENT_TYPES,
SSE_RECONNECT_TIME,
SSE_HEARTBEAT_INTERVAL
)
from logger import logger as activity_logger, get_activities
from router import router
# Configure logging
logging.basicConfig(
level=logging.DEBUG if DEBUG else logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Create Flask app
app = Flask(__name__)
CORS(app)
# SSE clients storage
class SSEClient:
"""Represents a connected SSE client."""
def __init__(self, client_id: str, queue: 'Queue'):
self.client_id = client_id
self.queue = queue
class SSEClientManager:
"""Manages SSE client connections."""
def __init__(self):
self._clients: Dict[str, SSEClient] = {}
self._lock = __import__('threading').Lock()
def add_client(self, client_id: str, queue) -> SSEClient:
"""Add a new SSE client."""
with self._lock:
client = SSEClient(client_id, queue)
self._clients[client_id] = client
logger.info(f"SSE client connected: {client_id}")
return client
def remove_client(self, client_id: str):
"""Remove an SSE client."""
with self._lock:
if client_id in self._clients:
del self._clients[client_id]
logger.info(f"SSE client disconnected: {client_id}")
def broadcast(self, event_type: str, data: Dict[str, Any]):
"""Broadcast message to all connected clients."""
message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
with self._lock:
disconnected = []
for client_id, client in self._clients.items():
try:
client.queue.put(message)
except Exception as e:
logger.error(f"Error sending to client {client_id}: {e}")
disconnected.append(client_id)
# Clean up disconnected clients
for client_id in disconnected:
del self._clients[client_id]
def get_client_count(self) -> int:
"""Get number of connected clients."""
with self._lock:
return len(self._clients)
# Global SSE manager
sse_manager = SSEClientManager()
@app.route('/webhook/gitea', methods=['POST'])
def webhook_gitea():
"""
Receive Gitea webhooks.
Validates signature, parses payload, routes event, logs activity,
and broadcasts via SSE.
Headers:
X-Gitea-Event: Event type
X-Gitea-Event-Type: More specific event type
X-Gitea-Signature: HMAC signature (if secret configured)
"""
try:
# Get raw payload for signature verification
raw_payload = request.get_data()
# Get event type from headers
event_type = request.headers.get("X-Gitea-Event", "push")
signature = request.headers.get("X-Gitea-Signature", "")
# Verify signature if configured
if not router.verify_signature(raw_payload, signature):
logger.warning("Invalid webhook signature")
return jsonify({"error": "Invalid signature"}), 401
# Parse JSON payload
try:
payload = request.get_json()
except Exception as e:
logger.error(f"Failed to parse JSON payload: {e}")
return jsonify({"error": "Invalid JSON payload"}), 400
if not payload:
return jsonify({"error": "Empty payload"}), 400
# Log event type
logger.info(f"Received Gitea webhook: {event_type}")
# Route the event and get triggers
parsed, triggers = router.route_event(payload)
# Build response
response_data = {
"success": True,
"event_type": parsed["event_type"],
"action": parsed["action"],
"repository": parsed["repository"],
"sender": parsed["sender"],
"triggers": triggers
}
# Broadcast to SSE clients
broadcast_data = {
"event_type": parsed["event_type"],
"action": parsed["action"],
"repository": parsed["repository"],
"sender": parsed["sender"],
"issue": parsed.get("issue"),
"pr": parsed.get("pr"),
"label": parsed.get("label"),
"triggers": triggers,
"timestamp": parsed.get("timestamp", "")
}
sse_manager.broadcast("gitea_event", broadcast_data)
logger.info(f"Webhook processed successfully. Triggers: {triggers}")
return jsonify(response_data), 200
except Exception as e:
logger.error(f"Error processing webhook: {e}", exc_info=True)
return jsonify({"error": str(e)}), 500
@app.route('/events', methods=['GET'])
def events():
"""
SSE stream endpoint for real-time event updates.
Returns:
Streaming response with SSE format:
event: gitea_event
data: {"event_type": "pull_request", "action": "opened", ...}
"""
import queue
import threading
from time import sleep
client_id = str(uuid.uuid4())
event_queue = queue.Queue()
# Add client to manager
sse_manager.add_client(client_id, event_queue)
def generate() -> Generator[str, None, None]:
"""Generate SSE events."""
try:
# Send initial connection event
yield f"event: gitea_event\ndata: {json.dumps({'status': 'connected', 'client_id': client_id})}\n\n"
while True:
try:
# Get message from queue with timeout
message = event_queue.get(timeout=SSE_HEARTBEAT_INTERVAL)
yield message
except queue.Empty:
# Send heartbeat to keep connection alive
yield f": heartbeat\n\n"
except GeneratorExit:
pass
finally:
sse_manager.remove_client(client_id)
return Response(
stream_with_context(generate()),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
'Connection': 'keep-alive',
}
)
@app.route('/health', methods=['GET'])
def health():
"""
Health check endpoint.
Returns:
JSON status indicating service health
"""
return jsonify({
"status": "healthy",
"sse_connected_clients": sse_manager.get_client_count()
}), 200
@app.route('/activity', methods=['GET'])
def activity():
"""
Get recent activity log entries.
Query Parameters:
limit: Maximum number of entries to return (default: 50)
Returns:
JSON array of activity entries
"""
limit = request.args.get('limit', 50, type=int)
limit = min(limit, 200) # Cap at 200
activities = get_activities(limit)
return jsonify({
"count": len(activities),
"activities": activities
}), 200
@app.route('/', methods=['GET'])
def index():
"""Root endpoint with service information."""
return jsonify({
"service": "Gitea Event Bridge",
"version": "1.0.0",
"endpoints": {
"POST /webhook/gitea": "Receive Gitea webhooks",
"GET /events": "SSE event stream",
"GET /health": "Health check",
"GET /activity": "Get activity log"
},
"supported_events": GITEA_EVENT_TYPES
}), 200
@app.errorhandler(404)
def not_found(error):
"""Handle 404 errors."""
return jsonify({"error": "Not found"}), 404
@app.errorhandler(500)
def internal_error(error):
"""Handle 500 errors."""
logger.error(f"Internal server error: {error}")
return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
app.run(host=HOST, port=PORT, debug=DEBUG)

45
config.py Normal file
View File

@ -0,0 +1,45 @@
"""Configuration settings for Gitea Event Bridge."""
import os
from dotenv import load_dotenv
load_dotenv()
# OpenCode Server Configuration
OPENCODE_URL = os.getenv("OPENCODE_URL", "http://localhost:8080")
# Flask Configuration
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "5000"))
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
# Activity Log Configuration (JSON Lines format)
LOG_FILE = os.getenv("LOG_FILE", "activity.json")
# Ensure LOG_FILE is absolute path if set via environment
if LOG_FILE and not LOG_FILE.startswith('/'):
import os
LOG_FILE = os.path.abspath(LOG_FILE)
# Gitea Webhook Configuration
GITEA_WEBHOOK_SECRET = os.getenv("GITEA_WEBHOOK_SECRET", "")
# Auto-Trigger Configuration
AUTO_TRIGGER_PIPELINE = os.getenv("AUTO_TRIGGER_PIPELINE", "true").lower() == "true"
AUTO_TRIGGER_REVIEW = os.getenv("AUTO_TRIGGER_REVIEW", "true").lower() == "true"
# Supported Gitea Events
GITEA_EVENT_TYPES = [
"issue",
"issue.label",
"pull_request",
"pull_request.label",
"pull_request.review",
"comment",
"repository",
"organization"
]
# SSE Configuration
SSE_RECONNECT_TIME = 5
SSE_HEARTBEAT_INTERVAL = 30

34
docker-compose.yml Normal file
View File

@ -0,0 +1,34 @@
version: '3.8'
services:
gitea-bridge:
build:
context: .
dockerfile: Dockerfile
container_name: gitea-event-bridge
ports:
- "5000:5000"
volumes:
- ./logs:/app/logs
environment:
- HOST=0.0.0.0
- PORT=5000
- DEBUG=false
- OPENCODE_URL=http://host.docker.internal:8080
- LOG_FILE=/app/logs/activity.json
- GITEA_WEBHOOK_SECRET=${GITEA_WEBHOOK_SECRET:-}
- AUTO_TRIGGER_PIPELINE=true
- AUTO_TRIGGER_REVIEW=true
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
networks:
- gitea-network
networks:
gitea-network:
driver: bridge

105
logger.py Normal file
View File

@ -0,0 +1,105 @@
"""Activity Logger - Logs all events to JSON Lines file."""
import json
import uuid
import threading
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
from config import LOG_FILE
class ActivityLogger:
"""Thread-safe activity logger that writes to JSON Lines file."""
def __init__(self, log_file: str = None):
self.log_file = log_file or LOG_FILE
self._lock = threading.Lock()
def _append_line(self, entry: Dict[str, Any]):
"""Append a single JSON line to the file."""
with open(self.log_file, 'a') as f:
f.write(json.dumps(entry) + '\n')
def log_activity(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Log an activity entry to the JSON Lines file.
Args:
event_data: Dictionary containing event information
- timestamp: ISO format timestamp (optional, auto-generated)
- event_type: Type of event (e.g., pull_request, issue)
- action: Action performed (e.g., opened, closed)
- repository: Repository full name
- sender: Actor who triggered the event
- payload: Full webhook payload
- routed_to: List of agents/targets triggered
Returns:
The logged entry with added id and timestamp
"""
with self._lock:
entry = {
"id": str(uuid.uuid4()),
"timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": event_data.get("event_type", ""),
"action": event_data.get("action", ""),
"repository": event_data.get("repository", ""),
"sender": event_data.get("sender", ""),
"payload": event_data.get("payload", {}),
"routed_to": event_data.get("routed_to", [])
}
self._append_line(entry)
return entry
def get_activities(self, limit: int = 50) -> List[Dict[str, Any]]:
"""
Get recent activity entries from the log file.
Args:
limit: Maximum number of entries to return
Returns:
List of activity entries (most recent last)
"""
with self._lock:
if not self.log_file:
return []
activities = []
try:
with open(self.log_file, 'r') as f:
for line in f:
line = line.strip()
if line:
try:
activities.append(json.loads(line))
except json.JSONDecodeError:
continue
except FileNotFoundError:
return []
# Return most recent entries
return activities[-limit:] if len(activities) > limit else activities
def clear(self):
"""Clear all activity logs."""
with self._lock:
try:
open(self.log_file, 'w').close()
except IOError:
pass
# Global logger instance
logger = ActivityLogger()
def log_activity(event_data: Dict[str, Any]) -> Dict[str, Any]:
"""Convenience function to log activity."""
return logger.log_activity(event_data)
def get_activities(limit: int = 50) -> List[Dict[str, Any]]:
"""Convenience function to get activities."""
return logger.get_activities(limit)

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
flask>=2.3.0
flask-cors>=4.0.0
python-dotenv>=1.0.0
gunicorn>=21.0.0

312
router.py Normal file
View File

@ -0,0 +1,312 @@
"""Event Router - Routes Gitea webhook events to appropriate actions."""
import hmac
import hashlib
import re
from typing import Dict, Any, List, Optional, Tuple
from flask import request
from config import (
GITEA_WEBHOOK_SECRET,
AUTO_TRIGGER_PIPELINE,
AUTO_TRIGGER_REVIEW,
GITEA_EVENT_TYPES
)
from logger import logger
class EventRouter:
"""Routes Gitea webhook events to appropriate actions."""
# Label-based triggers
LABEL_TRIGGERS = {
"start-pipeline": "pipeline",
"needs-decision": "decision",
}
# Event types that should trigger lead review
PR_REVIEW_TRIGGERS = ["opened", "synchronize", "reopened"]
# Comment commands
COMMENT_COMMANDS = {
"/pipeline start": "pipeline",
"/pipeline": "pipeline",
"/review": "review",
}
def __init__(self):
pass
def verify_signature(self, payload: bytes, signature: str) -> bool:
"""
Verify the Gitea webhook signature.
Args:
payload: Raw request payload
signature: Signature from X-Gitea-Signature header
Returns:
True if signature is valid, False otherwise
"""
if not GITEA_WEBHOOK_SECRET:
return True # No secret configured, skip verification
if not signature:
return False
expected = hmac.new(
GITEA_WEBHOOK_SECRET.encode('utf-8'),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
def parse_webhook(self, payload: Dict[str, Any], event_type: str) -> Dict[str, Any]:
"""
Parse webhook payload and extract relevant information.
Args:
payload: Raw webhook payload
event_type: Gitea event type header
Returns:
Parsed event data
"""
result = {
"event_type": event_type,
"action": payload.get("action", ""),
"repository": "",
"sender": "",
"issue": None,
"pr": None,
"label": None,
"comment": None,
"review": None,
"changes": {},
"payload": payload
}
# Extract repository info
if "repository" in payload:
repo = payload["repository"]
result["repository"] = repo.get("full_name", "")
# Extract sender/actor
if "sender" in payload:
result["sender"] = payload["sender"].get("login", "")
# Handle issue events
if "issue" in payload:
issue = payload["issue"]
result["issue"] = issue.get("number")
# Handle pull request events
if "pull_request" in payload:
pr = payload["pull_request"]
result["pr"] = pr.get("number")
# Get PR labels if available
if "labels" in pr:
result["labels"] = [label.get("name", "") for label in pr["labels"]]
# Handle label events (issue.label, pull_request.label)
if "label" in payload:
result["label"] = payload["label"].get("name", "")
# Handle comment events
if "comment" in payload:
comment = payload["comment"]
result["comment"] = comment.get("body", "")
# Handle review events
if "review" in payload:
review = payload["review"]
result["review"] = {
"type": review.get("type", ""),
"state": review.get("state", ""),
"body": review.get("body", "")
}
# Handle changes (for edited events)
if "changes" in payload:
result["changes"] = payload.get("changes", {})
return result
def detect_label_triggers(self, parsed: Dict[str, Any]) -> List[Dict[str, str]]:
"""
Detect triggers based on labels added to issues/PRs.
Triggers:
- start-pipeline label trigger pipeline
- needs-decision label trigger decision
"""
triggers = []
event_type = parsed["event_type"]
action = parsed["action"]
label = parsed.get("label", "")
# Check if this is a label addition event
if action == "created" and label:
if label in self.LABEL_TRIGGERS:
trigger_type = self.LABEL_TRIGGERS[label]
triggers.append({
"type": trigger_type,
"agent": "master",
"trigger": f"trigger_{trigger_type}",
"reason": f"label:{label}"
})
# Also check labels array for PRs (for pull_request.label events)
if event_type == "pull_request.label" and action == "created" and label:
if label in self.LABEL_TRIGGERS:
trigger_type = self.LABEL_TRIGGERS[label]
triggers.append({
"type": trigger_type,
"agent": "master",
"trigger": f"trigger_{trigger_type}",
"reason": f"label:{label}"
})
return triggers
def detect_comment_triggers(self, parsed: Dict[str, Any]) -> List[Dict[str, str]]:
"""
Detect triggers based on comment commands.
Triggers:
- /pipeline start trigger pipeline
"""
triggers = []
event_type = parsed["event_type"]
action = parsed["action"]
comment = parsed.get("comment", "")
if event_type == "comment" and action in ["created", "edited"]:
comment_lower = comment.strip().lower()
if comment_lower == "/pipeline start":
triggers.append({
"type": "pipeline",
"agent": "master",
"trigger": "trigger_pipeline",
"reason": "comment:/pipeline start"
})
elif comment_lower == "/pipeline":
triggers.append({
"type": "pipeline",
"agent": "master",
"trigger": "trigger_pipeline",
"reason": "comment:/pipeline"
})
return triggers
def detect_pr_review_triggers(self, parsed: Dict[str, Any]) -> List[Dict[str, str]]:
"""
Detect triggers based on PR events.
Triggers:
- PR opened/synchronized trigger lead review
"""
triggers = []
event_type = parsed["event_type"]
action = parsed["action"]
if event_type == "pull_request":
if action in self.PR_REVIEW_TRIGGERS:
if AUTO_TRIGGER_REVIEW:
triggers.append({
"type": "review",
"agent": "lead",
"trigger": "trigger_lead_review",
"reason": f"pr:{action}"
})
# Also handle pull_request.review events
if event_type == "pull_request.review":
review = parsed.get("review", {})
review_state = review.get("state", "") if review else ""
# Could trigger based on approval/rejection
if review_state == "approved":
triggers.append({
"type": "review_approved",
"agent": "lead",
"trigger": "review_approved",
"reason": "pr:approved"
})
elif review_state == "rejected":
triggers.append({
"type": "review_rejected",
"agent": "lead",
"trigger": "review_rejected",
"reason": "pr:rejected"
})
return triggers
def route_event(self, payload: Dict[str, Any]) -> Tuple[Dict[str, Any], List[Dict[str, str]]]:
"""
Route the webhook event and determine triggers.
Args:
payload: Webhook payload from Gitea
Returns:
Tuple of (parsed_event, triggers_list)
"""
# Get event type from header
event_type = request.headers.get("X-Gitea-Event", "push")
if not event_type:
event_type = "push"
# Handle X-Gitea-Event-Type header for more specific events
event_type_header = request.headers.get("X-Gitea-Event-Type")
if event_type_header:
event_type = event_type_header
# Normalize event type
event_type = event_type.lower().replace("-", "_")
# Parse the webhook
parsed = self.parse_webhook(payload, event_type)
# Detect all triggers
triggers = []
# Check auto-trigger pipeline setting
if AUTO_TRIGGER_PIPELINE:
triggers.extend(self.detect_label_triggers(parsed))
triggers.extend(self.detect_comment_triggers(parsed))
triggers.extend(self.detect_pr_review_triggers(parsed))
# Log the activity
log_entry = logger.log_activity({
"event_type": parsed["event_type"],
"action": parsed["action"],
"repository": parsed["repository"],
"sender": parsed["sender"],
"payload": parsed["payload"],
"routed_to": [t["agent"] for t in triggers]
})
return parsed, triggers
# Global router instance
router = EventRouter()
def route_event(payload: Dict[str, Any]) -> List[Dict[str, str]]:
"""
Convenience function to route an event.
Args:
payload: Webhook payload
Returns:
List of trigger dictionaries
"""
parsed, triggers = router.route_event(payload)
return triggers