import os import re from dataclasses import dataclass, field from datetime import datetime from datetime import timezone from typing import Dict, List, Optional, Any import requests import semver from requests.adapters import HTTPAdapter from urllib3 import Retry @dataclass class LabelConfig: priority: int title: str required: bool = False description: str = "" aliases: List[str] = field(default_factory=list) class ConfigurationError(Exception): """Custom exception for configuration related errors.""" pass class ReleaseManager: def __init__(self) -> None: self.env_vars = self.validate_environment() self.api_url: str = self.env_vars["FORGEJO_API_URL"] self.repo: str = self.env_vars["REPO"] self.token: str = self.env_vars["FORGEJO_TOKEN"] self.headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"} self.label_config = self.initialize_label_config() self.session = requests.Session() retries = Retry( total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504] ) self.session.mount("https://", HTTPAdapter(max_retries=retries)) self.session.headers.update(self.headers) @staticmethod def validate_environment() -> Dict[str, str]: """Validate all required environment variables are present.""" required_vars = { "FORGEJO_API_URL": os.getenv("INPUT_ENDPOINT"), "REPO": os.getenv("INPUT_REPO"), "FORGEJO_TOKEN": os.getenv("INPUT_TOKEN"), } missing_vars = [var for var, value in required_vars.items() if not value] if missing_vars: raise ConfigurationError( f"Missing required environment variables: {', '.join(missing_vars)}" ) return required_vars @staticmethod def initialize_label_config() -> Dict[str, LabelConfig]: """Initialize and validate label configuration.""" label_config = { "breaking-change": LabelConfig( priority=0, title="โš ๏ธ Breaking Changes", description="Changes that break backward compatibility", ), "security": LabelConfig( priority=1, title="๐Ÿ”’ Security Updates", description="Security related changes", ), "feature": LabelConfig( priority=2, title="โœจ New Features", description="New features and functionality", ), "enhancement": LabelConfig( priority=3, title="๐Ÿš€ Enhancements", description="Improvements to existing features", ), "bug": LabelConfig( priority=4, title="๐Ÿ› Bug Fixes", description="Bug fixes", aliases=["fix"], ), "performance": LabelConfig( priority=5, title="โšก Performance Improvements", description="Performance optimizations", ), "documentation": LabelConfig( priority=6, title="๐Ÿ“š Documentation", description="Documentation updates", ), "maintenance": LabelConfig( priority=7, title="๐Ÿ› ๏ธ Maintenance", description="Code maintenance and refactoring", ), "dependency": LabelConfig( priority=8, title="๐Ÿ“ฆ Dependencies", description="Dependency updates" ), "test": LabelConfig( priority=9, title="๐Ÿงช Testing", description="Test-related changes" ), } # Validate label configuration priorities = [config.priority for config in label_config.values()] if len(priorities) != len(set(priorities)): raise ConfigurationError( "Duplicate priority values found in label configuration" ) return label_config def log_error( self, message: str, response: Optional[requests.Response] = None ) -> None: """Log errors with optional response details.""" print(f"ERROR: {message}") if response: print(f"Response: {response.status_code}, {response.text}") def get_latest_release(self) -> str | None: """Get the latest release information.""" response = self.session.get(f"{self.api_url}/repos/{self.repo}/releases") if response.status_code == 200 and len(response.json()) > 0: latest_release = None latest_release_date = datetime.min.replace(tzinfo=timezone.utc) releases = response.json() non_draft_releases = [ release for release in releases if not release.get("draft", False) ] for release in non_draft_releases: published_date = datetime.fromisoformat(release["published_at"]) if published_date > latest_release_date: latest_release_date = datetime.fromisoformat( release["published_at"] ) latest_release = release return latest_release.get("tag_name") else: self.log_error("Failed to fetch releases", response) return None def validate_pull_request_labels(self, pull_request: Dict[str, Any]) -> None: """Validate that pull requests have required labels.""" pr_labels = {label["name"] for label in pull_request["labels"]} required_labels = { name for name, config in self.label_config.items() if config.required } missing_required = required_labels - pr_labels if missing_required: self.log_error( f"Pull request #{pull_request['number']} missing required labels: " f"{', '.join(missing_required)}" ) def determine_release_type(self, pulls: List[Dict[str, Any]]) -> str: """Determine if this should be a major, minor, or patch release based on PR labels.""" for pr in pulls: self.validate_pull_request_labels(pr) has_breaking = any( "breaking-change" in [l["name"] for l in pr["labels"]] for pr in pulls ) has_feature = any( "feature" in [l["name"] for l in pr["labels"]] for pr in pulls ) if has_breaking: return "major" elif has_feature: return "minor" return "patch" def generate_next_tag( self, latest_tag: Optional[str], pulls: List[Dict[str, Any]] ) -> Optional[str]: """Generate the next version tag based on semantic versioning.""" if not latest_tag: return "v0.1.0" try: version = semver.VersionInfo.parse(latest_tag.replace("v", "")) release_type = self.determine_release_type(pulls) if release_type == "major": next_version = version.bump_major() elif release_type == "minor": next_version = version.bump_minor() else: next_version = version.bump_patch() return f"v{next_version}" except ValueError: self.log_error(f"Invalid semantic version: {latest_tag}") return None def get_closed_pull_requests_since_last_release( self, latest_tag: str ) -> List[Dict[str, Any]]: """Fetch all closed pull requests since the last release.""" if not latest_tag: print( "No previous release or draft found. Fetching all closed pull requests." ) # get commit sha from latest release tag response = self.session.get( f"{self.api_url}/repos/{self.repo}/commits/{latest_tag}/status" ) if response.status_code != 200: self.log_error("Failed to get latest tag commit sha", response) return [] latest_tag_sha = response.json().get("sha") # get commits between latest release tag and current main state response = self.session.get( f"{self.api_url}/repos/{self.repo}/compare/{latest_tag_sha}..main" ) if response.status_code != 200: self.log_error("Failed to get commits between latest tag and main", response) return [] commits_since_last_release = response.json().get("commits") # get pull request of each commit pulls_since_last_release = [] for commit in commits_since_last_release: response = self.session.get( f"{self.api_url}/repos/{self.repo}/commits/{commit.get('sha')}/pull" ) if response.status_code != 200: print( "Failed to get pull request of commit", commit.get("sha"), response ) continue pulls_since_last_release.append(response.json()) return pulls_since_last_release def generate_release_notes(self, pulls: List[Dict[str, Any]]) -> str: """Generate formatted release notes from pull requests.""" if not pulls: return "No pull requests found for this release." label_groups = {label: [] for label in self.label_config} # Track latest dependency updates dependency_updates = {} def extract_dependency_name(title: str) -> Optional[str]: """Extract dependency name from conventional commit title.""" if not title.startswith(("chore(deps)", "fix(deps)")): return None # Match package names like "@scope/package" or "package" match = re.search( r"update (?:dependency )?(@?[a-zA-Z0-9-]+(?:/[a-zA-Z0-9-]+)?)", title.lower(), ) return match.group(1) if match else None def is_dependency_update(title: str) -> bool: """Check if PR is a dependency update.""" return title.startswith(("chore(deps)", "fix(deps)")) # First pass to collect latest dependency updates for pr in pulls: title = pr["title"] if not is_dependency_update(title): continue dep_name = extract_dependency_name(title) if not dep_name: continue if dep_name not in dependency_updates: dependency_updates[dep_name] = pr # Only keep the most recent update for each dependency time_stamp_format = "%Y-%m-%dT%H:%M:%S%z" pr_merged_at = datetime.strptime(pr["merged_at"], time_stamp_format) dep_merged_at = datetime.strptime( dependency_updates[dep_name]["merged_at"], time_stamp_format ) if pr_merged_at > dep_merged_at: dependency_updates[dep_name] = pr # Reset and process all PRs label_groups["other"] = [] for pr in pulls: title = pr["title"] author = pr["user"]["login"] number = pr["number"] pr_labels = {label["name"] for label in pr["labels"]} # Skip dependency updates that aren't the latest for their package if is_dependency_update(title): dep_name = extract_dependency_name(title) if dep_name and dependency_updates[dep_name]["number"] != pr["number"]: continue # Find the highest priority label assigned_label = "other" key_prio_tuples = [ ([key] + config.aliases, config.priority) for key, config in self.label_config.items() ] key_prio_tuples.sort(key=lambda x: x[1]) for keys, prio in key_prio_tuples: if set(keys) & pr_labels: assigned_label = keys[0] break # Extract scope from PR title if it follows conventional commits scope = "" if "(" in title and "):" in title: scope = title[title.index("(") + 1 : title.index(")")] title = title[title.index("):") + 2 :].strip() # Add reference to linked issues if any linked_issues: List[str] = [] if "body" in pr and pr["body"]: for keyword in ["Closes", "Fixes", "Resolves"]: if f"{keyword} #" in pr["body"]: issues = re.findall(f"{keyword} #(\\d+)", pr["body"]) linked_issues.extend(issues) # Format the PR entry entry = f"- {title}" if scope: entry = f"- **{scope}:** {title}" entry += f" (#{number})" if linked_issues: entry += f" (Fixes: #{', #'.join(linked_issues)})" entry += f" by @{author}" label_groups[assigned_label].append(entry) # Build release notes with friendly titles and emojis release_notes = ["# Release Notes\n"] # Add summary section total_prs = sum(len(prs) for prs in label_groups.values()) total_contributors = len( {pr.split("by @")[1] for group in label_groups.values() for pr in group} ) release_notes.extend( [ f"๐Ÿ“Š **Release Statistics**", f"- Total Pull Requests: {total_prs}", f"- Contributors: {total_contributors}", "", ] ) # Add each section in priority order for label, config in sorted( self.label_config.items(), key=lambda x: x[1].priority ): if label_groups[label]: release_notes.append(f"## {config.title}") release_notes.extend(sorted(label_groups[label])) release_notes.append("") # Add other changes last if label_groups["other"]: release_notes.append("## ๐Ÿ“‹ Other Changes") release_notes.extend(sorted(label_groups["other"])) release_notes.append("") return "\n".join(release_notes).strip() def add_version_comparison( self, release_notes: str, previous_tag: str, new_tag: str ) -> str: url_base = self.api_url.replace("/api/v1", "") """Add version comparison section to release notes.""" comparison_section = f"""\n ## ๐Ÿ“ Version Compare Compare with previous version: [{previous_tag}...{new_tag}]({url_base}/{self.repo}/compare/{previous_tag}...{new_tag}) """ return release_notes + comparison_section def get_existing_draft_release(self) -> Optional[Dict[str, Any]]: """Check if a draft release already exists.""" response = self.session.get(f"{self.api_url}/repos/{self.repo}/releases") if response.status_code != 200: self.log_error("Failed to fetch releases", response) return None draft_releases = [ release for release in response.json() if release.get("draft", False) ] if draft_releases: release = draft_releases[0] print(f"Draft release found: {release['tag_name']}") return release def create_or_update_release( self, new_tag: str, release_notes: str, existing_draft: Optional[Dict[str, Any]] ) -> None: """Create a new draft release or update existing one.""" tag = existing_draft.get("tag_name") if existing_draft else new_tag release_data = { "body": release_notes, "hide_archive_links": True, "draft": True, "tag_name": tag, "name": f"Release {tag}", } release_id = f"/{existing_draft['id']}" if existing_draft else "" method = "PATCH" if existing_draft else "POST" url = f"{self.api_url}/repos/{self.repo}/releases{release_id}" response = self.session.request(method, url, json=release_data) if response.status_code == 201 or response.status_code == 200: print( f"Draft release {tag} created/updated successfully, using '{method}' method!" ) return response.json()['html_url'] else: self.log_error(f"Failed to create/update draft release {tag}", response) def main(self) -> None: """Main execution flow for release management.""" try: # Get the latest release information latest_tag = self.get_latest_release() # Get closed PRs since last release pulls = self.get_closed_pull_requests_since_last_release(latest_tag) if not pulls: print("No new pull requests found. Skipping release creation.") return # Generate next version tag new_tag = self.generate_next_tag(latest_tag, pulls) if not new_tag: print("Failed to generate next version tag.") return # Generate release notes release_notes = self.generate_release_notes(pulls) # Add version comparison if there's a previous release existing_draft = self.get_existing_draft_release() comparison_tag = new_tag if existing_draft: comparison_tag = existing_draft["tag_name"] if latest_tag: release_notes = self.add_version_comparison( release_notes, latest_tag, comparison_tag ) url = self.create_or_update_release(comparison_tag, release_notes, existing_draft) with open(os.environ["GITHUB_OUTPUT"], "a") as gh_out: gh_out.write(f"release-url={url}\n") except Exception as e: print(f"An error occurred: {str(e)}") raise if __name__ == "__main__": manager = ReleaseManager() manager.main()