"""Event Router - Routes Gitea webhook events to appropriate actions.""" import hmac import hashlib import re from typing import Dict, Any, List, Optional, Tuple from flask import request from config import ( GITEA_WEBHOOK_SECRET, AUTO_TRIGGER_PIPELINE, AUTO_TRIGGER_REVIEW, GITEA_EVENT_TYPES ) from logger import logger class EventRouter: """Routes Gitea webhook events to appropriate actions.""" # Label-based triggers LABEL_TRIGGERS = { "start-pipeline": "pipeline", "needs-decision": "decision", } # Event types that should trigger lead review PR_REVIEW_TRIGGERS = ["opened", "synchronize", "reopened"] # Comment commands COMMENT_COMMANDS = { "/pipeline start": "pipeline", "/pipeline": "pipeline", "/review": "review", } def __init__(self): pass def verify_signature(self, payload: bytes, signature: str) -> bool: """ Verify the Gitea webhook signature. Args: payload: Raw request payload signature: Signature from X-Gitea-Signature header Returns: True if signature is valid, False otherwise """ if not GITEA_WEBHOOK_SECRET: return True # No secret configured, skip verification if not signature: return False expected = hmac.new( GITEA_WEBHOOK_SECRET.encode('utf-8'), payload, hashlib.sha256 ).hexdigest() return hmac.compare_digest(f"sha256={expected}", signature) def parse_webhook(self, payload: Dict[str, Any], event_type: str) -> Dict[str, Any]: """ Parse webhook payload and extract relevant information. Args: payload: Raw webhook payload event_type: Gitea event type header Returns: Parsed event data """ result = { "event_type": event_type, "action": payload.get("action", ""), "repository": "", "sender": "", "issue": None, "pr": None, "label": None, "comment": None, "review": None, "changes": {}, "payload": payload } # Extract repository info if "repository" in payload: repo = payload["repository"] result["repository"] = repo.get("full_name", "") # Extract sender/actor if "sender" in payload: result["sender"] = payload["sender"].get("login", "") # Handle issue events if "issue" in payload: issue = payload["issue"] result["issue"] = issue.get("number") # Handle pull request events if "pull_request" in payload: pr = payload["pull_request"] result["pr"] = pr.get("number") # Get PR labels if available if "labels" in pr: result["labels"] = [label.get("name", "") for label in pr["labels"]] # Handle label events (issue.label, pull_request.label) if "label" in payload: result["label"] = payload["label"].get("name", "") # Handle comment events if "comment" in payload: comment = payload["comment"] result["comment"] = comment.get("body", "") # Handle review events if "review" in payload: review = payload["review"] result["review"] = { "type": review.get("type", ""), "state": review.get("state", ""), "body": review.get("body", "") } # Handle changes (for edited events) if "changes" in payload: result["changes"] = payload.get("changes", {}) return result def detect_label_triggers(self, parsed: Dict[str, Any]) -> List[Dict[str, str]]: """ Detect triggers based on labels added to issues/PRs. Triggers: - start-pipeline label → trigger pipeline - needs-decision label → trigger decision """ triggers = [] event_type = parsed["event_type"] action = parsed["action"] label = parsed.get("label", "") # Check if this is a label addition event if action == "created" and label: if label in self.LABEL_TRIGGERS: trigger_type = self.LABEL_TRIGGERS[label] triggers.append({ "type": trigger_type, "agent": "master", "trigger": f"trigger_{trigger_type}", "reason": f"label:{label}" }) # Also check labels array for PRs (for pull_request.label events) if event_type == "pull_request.label" and action == "created" and label: if label in self.LABEL_TRIGGERS: trigger_type = self.LABEL_TRIGGERS[label] triggers.append({ "type": trigger_type, "agent": "master", "trigger": f"trigger_{trigger_type}", "reason": f"label:{label}" }) return triggers def detect_comment_triggers(self, parsed: Dict[str, Any]) -> List[Dict[str, str]]: """ Detect triggers based on comment commands. Triggers: - /pipeline start → trigger pipeline """ triggers = [] event_type = parsed["event_type"] action = parsed["action"] comment = parsed.get("comment", "") if event_type == "comment" and action in ["created", "edited"]: comment_lower = comment.strip().lower() if comment_lower == "/pipeline start": triggers.append({ "type": "pipeline", "agent": "master", "trigger": "trigger_pipeline", "reason": "comment:/pipeline start" }) elif comment_lower == "/pipeline": triggers.append({ "type": "pipeline", "agent": "master", "trigger": "trigger_pipeline", "reason": "comment:/pipeline" }) return triggers def detect_pr_review_triggers(self, parsed: Dict[str, Any]) -> List[Dict[str, str]]: """ Detect triggers based on PR events. Triggers: - PR opened/synchronized → trigger lead review """ triggers = [] event_type = parsed["event_type"] action = parsed["action"] if event_type == "pull_request": if action in self.PR_REVIEW_TRIGGERS: if AUTO_TRIGGER_REVIEW: triggers.append({ "type": "review", "agent": "lead", "trigger": "trigger_lead_review", "reason": f"pr:{action}" }) # Also handle pull_request.review events if event_type == "pull_request.review": review = parsed.get("review", {}) review_state = review.get("state", "") if review else "" # Could trigger based on approval/rejection if review_state == "approved": triggers.append({ "type": "review_approved", "agent": "lead", "trigger": "review_approved", "reason": "pr:approved" }) elif review_state == "rejected": triggers.append({ "type": "review_rejected", "agent": "lead", "trigger": "review_rejected", "reason": "pr:rejected" }) return triggers def route_event(self, payload: Dict[str, Any]) -> Tuple[Dict[str, Any], List[Dict[str, str]]]: """ Route the webhook event and determine triggers. Args: payload: Webhook payload from Gitea Returns: Tuple of (parsed_event, triggers_list) """ # Get event type from header event_type = request.headers.get("X-Gitea-Event", "push") if not event_type: event_type = "push" # Handle X-Gitea-Event-Type header for more specific events event_type_header = request.headers.get("X-Gitea-Event-Type") if event_type_header: event_type = event_type_header # Normalize event type event_type = event_type.lower().replace("-", "_") # Parse the webhook parsed = self.parse_webhook(payload, event_type) # Detect all triggers triggers = [] # Check auto-trigger pipeline setting if AUTO_TRIGGER_PIPELINE: triggers.extend(self.detect_label_triggers(parsed)) triggers.extend(self.detect_comment_triggers(parsed)) triggers.extend(self.detect_pr_review_triggers(parsed)) # Log the activity log_entry = logger.log_activity({ "event_type": parsed["event_type"], "action": parsed["action"], "repository": parsed["repository"], "sender": parsed["sender"], "payload": parsed["payload"], "routed_to": [t["agent"] for t in triggers] }) return parsed, triggers # Global router instance router = EventRouter() def route_event(payload: Dict[str, Any]) -> List[Dict[str, str]]: """ Convenience function to route an event. Args: payload: Webhook payload Returns: List of trigger dictionaries """ parsed, triggers = router.route_event(payload) return triggers