diff --git a/release_drafter.py b/release_drafter.py new file mode 100644 index 0000000..19d9a36 --- /dev/null +++ b/release_drafter.py @@ -0,0 +1,481 @@ +import os +import re +import argparse +from dataclasses import dataclass, field +from datetime import datetime +from datetime import timezone +from typing import Dict, List, Optional, Any + +import requests +import semver +from requests.adapters import HTTPAdapter +from urllib3 import Retry + + + +@dataclass +class LabelConfig: + priority: int + title: str + required: bool = False + description: str = "" + aliases: List[str] = field(default_factory=list) + + +class ConfigurationError(Exception): + """Custom exception for configuration related errors.""" + + pass + + +class ReleaseManager: + def __init__(self) -> None: + self.env_vars = self.validate_environment() + self.api_url: str = self.env_vars["FORGEJO_API_URL"] + self.repo: str = self.env_vars["REPO"] + self.token: str = self.env_vars["FORGEJO_TOKEN"] + self.headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"} + self.label_config = self.initialize_label_config() + self.session = requests.Session() + retries = Retry( + total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504] + ) + self.session.mount("https://", HTTPAdapter(max_retries=retries)) + self.session.headers.update(self.headers) + + @staticmethod + def validate_environment() -> Dict[str, str]: + """Validate all required environment variables are present.""" + parser = argparse.ArgumentParser() + parser.add_argument("--token", required=True) + parser.add_argument("--repo", required=True) + parser.add_argument("--endpoint", required=True) + args = parser.parse_args() + + required_vars = { + "FORGEJO_API_URL": args.endpoint, + "REPO": args.repo, + "FORGEJO_TOKEN": args.token, + } + + missing_vars = [var for var, value in required_vars.items() if not value] + + if missing_vars: + raise ConfigurationError( + f"Missing required environment variables: {', '.join(missing_vars)}" + ) + + return required_vars + + @staticmethod + def initialize_label_config() -> Dict[str, LabelConfig]: + """Initialize and validate label configuration.""" + label_config = { + "breaking-change": LabelConfig( + priority=0, + title="โš ๏ธ Breaking Changes", + description="Changes that break backward compatibility", + ), + "security": LabelConfig( + priority=1, + title="๐Ÿ”’ Security Updates", + description="Security related changes", + ), + "feature": LabelConfig( + priority=2, + title="โœจ New Features", + description="New features and functionality", + ), + "enhancement": LabelConfig( + priority=3, + title="๐Ÿš€ Enhancements", + description="Improvements to existing features", + ), + "bug": LabelConfig( + priority=4, + title="๐Ÿ› Bug Fixes", + description="Bug fixes", + aliases=["fix"], + ), + "performance": LabelConfig( + priority=5, + title="โšก Performance Improvements", + description="Performance optimizations", + ), + "documentation": LabelConfig( + priority=6, + title="๐Ÿ“š Documentation", + description="Documentation updates", + ), + "maintenance": LabelConfig( + priority=7, + title="๐Ÿ› ๏ธ Maintenance", + description="Code maintenance and refactoring", + ), + "dependency": LabelConfig( + priority=8, title="๐Ÿ“ฆ Dependencies", description="Dependency updates" + ), + "test": LabelConfig( + priority=9, title="๐Ÿงช Testing", description="Test-related changes" + ), + } + + # Validate label configuration + priorities = [config.priority for config in label_config.values()] + if len(priorities) != len(set(priorities)): + raise ConfigurationError( + "Duplicate priority values found in label configuration" + ) + + return label_config + + def log_error( + self, message: str, response: Optional[requests.Response] = None + ) -> None: + """Log errors with optional response details.""" + print(f"ERROR: {message}") + if response: + print(f"Response: {response.status_code}, {response.text}") + + def get_latest_release(self) -> str | None: + """Get the latest release information.""" + response = self.session.get(f"{self.api_url}/repos/{self.repo}/releases") + if response.status_code == 200 and len(response.json()) > 0: + latest_release = None + latest_release_date = datetime.min.replace(tzinfo=timezone.utc) + releases = response.json() + non_draft_releases = [ + release for release in releases if not release.get("draft", False) + ] + for release in non_draft_releases: + published_date = datetime.fromisoformat(release["published_at"]) + if published_date > latest_release_date: + latest_release_date = datetime.fromisoformat( + release["published_at"] + ) + latest_release = release + return latest_release.get("tag_name") + else: + self.log_error("Failed to fetch releases", response) + return None + + def validate_pull_request_labels(self, pull_request: Dict[str, Any]) -> None: + """Validate that pull requests have required labels.""" + pr_labels = {label["name"] for label in pull_request["labels"]} + required_labels = { + name for name, config in self.label_config.items() if config.required + } + + missing_required = required_labels - pr_labels + if missing_required: + self.log_error( + f"Pull request #{pull_request['number']} missing required labels: " + f"{', '.join(missing_required)}" + ) + + def determine_release_type(self, pulls: List[Dict[str, Any]]) -> str: + """Determine if this should be a major, minor, or patch release based on PR labels.""" + for pr in pulls: + self.validate_pull_request_labels(pr) + + has_breaking = any( + "breaking-change" in [l["name"] for l in pr["labels"]] for pr in pulls + ) + has_feature = any( + "feature" in [l["name"] for l in pr["labels"]] for pr in pulls + ) + + if has_breaking: + return "major" + elif has_feature: + return "minor" + return "patch" + + def generate_next_tag( + self, latest_tag: Optional[str], pulls: List[Dict[str, Any]] + ) -> Optional[str]: + """Generate the next version tag based on semantic versioning.""" + if not latest_tag: + return "v0.1.0" + + try: + version = semver.VersionInfo.parse(latest_tag.replace("v", "")) + release_type = self.determine_release_type(pulls) + + if release_type == "major": + next_version = version.bump_major() + elif release_type == "minor": + next_version = version.bump_minor() + else: + next_version = version.bump_patch() + + return f"v{next_version}" + except ValueError: + self.log_error(f"Invalid semantic version: {latest_tag}") + return None + + def get_closed_pull_requests_since_last_release( + self, latest_tag: str + ) -> List[Dict[str, Any]]: + """Fetch all closed pull requests since the last release.""" + if not latest_tag: + print( + "No previous release or draft found. Fetching all closed pull requests." + ) + + # get commit sha from latest release tag + response = self.session.get( + f"{self.api_url}/repos/{self.repo}/commits/{latest_tag}/status" + ) + if response.status_code != 200: + self.log_error("Failed to get latest tag commit sha", response) + return [] + latest_tag_sha = response.json().get("sha") + + # get commits between latest release tag and current main state + response = self.session.get( + f"{self.api_url}/repos/{self.repo}/compare/{latest_tag_sha}..main" + ) + if response.status_code != 200: + self.log_error("Failed to get commits between latest tag and main", response) + return [] + commits_since_last_release = response.json().get("commits") + + # get pull request of each commit + pulls_since_last_release = [] + for commit in commits_since_last_release: + response = self.session.get( + f"{self.api_url}/repos/{self.repo}/commits/{commit.get('sha')}/pull" + ) + if response.status_code != 200: + print( + "Failed to get pull request of commit", commit.get("sha"), response + ) + continue + pulls_since_last_release.append(response.json()) + + return pulls_since_last_release + + def generate_release_notes(self, pulls: List[Dict[str, Any]]) -> str: + """Generate formatted release notes from pull requests.""" + if not pulls: + return "No pull requests found for this release." + + label_groups = {label: [] for label in self.label_config} + + # Track latest dependency updates + dependency_updates = {} + + def extract_dependency_name(title: str) -> Optional[str]: + """Extract dependency name from conventional commit title.""" + if not title.startswith(("chore(deps)", "fix(deps)")): + return None + + # Match package names like "@scope/package" or "package" + match = re.search( + r"update (?:dependency )?(@?[a-zA-Z0-9-]+(?:/[a-zA-Z0-9-]+)?)", + title.lower(), + ) + return match.group(1) if match else None + + def is_dependency_update(title: str) -> bool: + """Check if PR is a dependency update.""" + return title.startswith(("chore(deps)", "fix(deps)")) + + # First pass to collect latest dependency updates + for pr in pulls: + title = pr["title"] + if not is_dependency_update(title): + continue + dep_name = extract_dependency_name(title) + if not dep_name: + continue + if dep_name not in dependency_updates: + dependency_updates[dep_name] = pr + # Only keep the most recent update for each dependency + time_stamp_format = "%Y-%m-%dT%H:%M:%S%z" + pr_merged_at = datetime.strptime(pr["merged_at"], time_stamp_format) + dep_merged_at = datetime.strptime( + dependency_updates[dep_name]["merged_at"], time_stamp_format + ) + if pr_merged_at > dep_merged_at: + dependency_updates[dep_name] = pr + + # Reset and process all PRs + label_groups["other"] = [] + + for pr in pulls: + title = pr["title"] + author = pr["user"]["login"] + number = pr["number"] + pr_labels = {label["name"] for label in pr["labels"]} + + # Skip dependency updates that aren't the latest for their package + if is_dependency_update(title): + dep_name = extract_dependency_name(title) + if dep_name and dependency_updates[dep_name]["number"] != pr["number"]: + continue + + # Find the highest priority label + assigned_label = "other" + key_prio_tuples = [ + ([key] + config.aliases, config.priority) + for key, config in self.label_config.items() + ] + key_prio_tuples.sort(key=lambda x: x[1]) + for keys, prio in key_prio_tuples: + if set(keys) & pr_labels: + assigned_label = keys[0] + break + + # Extract scope from PR title if it follows conventional commits + scope = "" + if "(" in title and "):" in title: + scope = title[title.index("(") + 1 : title.index(")")] + title = title[title.index("):") + 2 :].strip() + + # Add reference to linked issues if any + linked_issues: List[str] = [] + if "body" in pr and pr["body"]: + for keyword in ["Closes", "Fixes", "Resolves"]: + if f"{keyword} #" in pr["body"]: + issues = re.findall(f"{keyword} #(\\d+)", pr["body"]) + linked_issues.extend(issues) + + # Format the PR entry + entry = f"- {title}" + if scope: + entry = f"- **{scope}:** {title}" + entry += f" (#{number})" + if linked_issues: + entry += f" (Fixes: #{', #'.join(linked_issues)})" + entry += f" by @{author}" + + label_groups[assigned_label].append(entry) + + # Build release notes with friendly titles and emojis + release_notes = ["# Release Notes\n"] + + # Add summary section + total_prs = sum(len(prs) for prs in label_groups.values()) + total_contributors = len( + {pr.split("by @")[1] for group in label_groups.values() for pr in group} + ) + release_notes.extend( + [ + f"๐Ÿ“Š **Release Statistics**", + f"- Total Pull Requests: {total_prs}", + f"- Contributors: {total_contributors}", + "", + ] + ) + + # Add each section in priority order + for label, config in sorted( + self.label_config.items(), key=lambda x: x[1].priority + ): + if label_groups[label]: + release_notes.append(f"## {config.title}") + release_notes.extend(sorted(label_groups[label])) + release_notes.append("") + + # Add other changes last + if label_groups["other"]: + release_notes.append("## ๐Ÿ“‹ Other Changes") + release_notes.extend(sorted(label_groups["other"])) + release_notes.append("") + + return "\n".join(release_notes).strip() + + def add_version_comparison( + self, release_notes: str, previous_tag: str, new_tag: str + ) -> str: + url_base = self.api_url.replace("/api/v1", "") + """Add version comparison section to release notes.""" + comparison_section = f"""\n +## ๐Ÿ“ Version Compare +Compare with previous version: [{previous_tag}...{new_tag}]({url_base}/{self.repo}/compare/{previous_tag}...{new_tag}) +""" + return release_notes + comparison_section + + def get_existing_draft_release(self) -> Optional[Dict[str, Any]]: + """Check if a draft release already exists.""" + response = self.session.get(f"{self.api_url}/repos/{self.repo}/releases") + if response.status_code != 200: + self.log_error("Failed to fetch releases", response) + return None + draft_releases = [ + release for release in response.json() if release.get("draft", False) + ] + if draft_releases: + release = draft_releases[0] + print(f"Draft release found: {release['tag_name']}") + return release + + def create_or_update_release( + self, new_tag: str, release_notes: str, existing_draft: Optional[Dict[str, Any]] + ) -> None: + """Create a new draft release or update existing one.""" + tag = existing_draft.get("tag_name") if existing_draft else new_tag + release_data = { + "body": release_notes, + "hide_archive_links": True, + "draft": True, + "tag_name": tag, + "name": f"Release {tag}", + } + release_id = f"/{existing_draft['id']}" if existing_draft else "" + method = "PATCH" if existing_draft else "POST" + url = f"{self.api_url}/repos/{self.repo}/releases{release_id}" + response = self.session.request(method, url, json=release_data) + if response.status_code == 201 or response.status_code == 200: + print( + f"Draft release {tag} created/updated successfully, using '{method}' method!" + ) + return response.json()['html_url'] + else: + self.log_error(f"Failed to create/update draft release {tag}", response) + + def main(self) -> None: + """Main execution flow for release management.""" + try: + # Get the latest release information + latest_tag = self.get_latest_release() + + # Get closed PRs since last release + pulls = self.get_closed_pull_requests_since_last_release(latest_tag) + + if not pulls: + print("No new pull requests found. Skipping release creation.") + return + + # Generate next version tag + new_tag = self.generate_next_tag(latest_tag, pulls) + if not new_tag: + print("Failed to generate next version tag.") + return + + # Generate release notes + release_notes = self.generate_release_notes(pulls) + + # Add version comparison if there's a previous release + existing_draft = self.get_existing_draft_release() + comparison_tag = new_tag + if existing_draft: + comparison_tag = existing_draft["tag_name"] + + if latest_tag: + release_notes = self.add_version_comparison( + release_notes, latest_tag, comparison_tag + ) + url = self.create_or_update_release(comparison_tag, release_notes, existing_draft) + print(f"::set-output name=release-url::{url}") + + except Exception as e: + print(f"An error occurred: {str(e)}") + raise + + +if __name__ == "__main__": + manager = ReleaseManager() + manager.main()