# forgejo-release-drafter-action/release_drafter.py
#
# 474 lines
# 18 KiB
# Python

import os
import re
from dataclasses import dataclass, field
from datetime import datetime
from datetime import timezone
from typing import Dict, List, Optional, Any
import requests
import semver
from requests.adapters import HTTPAdapter
from urllib3 import Retry
@dataclass
class LabelConfig:
    """Describe how one pull-request label is presented in the release notes."""

    # Sort key for the label's section; lower values are rendered first.
    priority: int
    # Section heading used in the generated release notes.
    title: str
    # When True, pull requests lacking this label are reported as errors.
    required: bool = False
    # Human-readable explanation of the label.
    description: str = ""
    # Alternative label names that map to the same section.
    aliases: List[str] = field(default_factory=list)
class ConfigurationError(Exception):
    """Custom exception for configuration related errors."""
class ReleaseManager:
    """Maintain a draft release on a Forgejo repository.

    The manager looks up the latest published release, collects the pull
    requests merged since then, derives the next semantic version from their
    labels, renders release notes and creates or updates a draft release.
    Configuration is read from the ``INPUT_ENDPOINT``, ``INPUT_REPO`` and
    ``INPUT_TOKEN`` environment variables.
    """

    # Conventional-commit prefixes that mark a pull request as a dependency update.
    _DEPENDENCY_PREFIXES = ("chore(deps)", "fix(deps)")
    # Captures "package" or "@scope/package" following "update [dependency]".
    _DEPENDENCY_NAME_RE = re.compile(
        r"update (?:dependency )?(@?[a-zA-Z0-9-]+(?:/[a-zA-Z0-9-]+)?)"
    )
    # Keywords recognised in a pull request body as "<keyword> #<issue>".
    _ISSUE_KEYWORDS = ("Closes", "Fixes", "Resolves")
    # Page size requested when listing pull requests.
    _PULLS_PAGE_SIZE = 50

    def __init__(self) -> None:
        """Read the configuration and prepare an authenticated HTTP session.

        Raises:
            ConfigurationError: If a required environment variable is missing
                or the label configuration is inconsistent.
        """
        self.env_vars = self.validate_environment()
        # Tolerate a trailing slash so the URL joins below never yield "//".
        self.api_url: str = self.env_vars["FORGEJO_API_URL"].rstrip("/")
        self.repo: str = self.env_vars["REPO"]
        self.token: str = self.env_vars["FORGEJO_TOKEN"]
        self.headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
        self.label_config = self.initialize_label_config()
        self.session = requests.Session()
        retries = Retry(
            total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retries)
        # Mount for both schemes so plain-http endpoints get the same retries.
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)
        self.session.headers.update(self.headers)

    @staticmethod
    def validate_environment() -> Dict[str, str]:
        """Validate all required environment variables are present.

        Returns:
            Mapping of internal setting name to the value read from the
            corresponding ``INPUT_*`` environment variable.

        Raises:
            ConfigurationError: If any value is unset or empty. The message
                lists the internal setting names, not the ``INPUT_*`` names.
        """
        required_vars = {
            "FORGEJO_API_URL": os.getenv("INPUT_ENDPOINT"),
            "REPO": os.getenv("INPUT_REPO"),
            "FORGEJO_TOKEN": os.getenv("INPUT_TOKEN"),
        }
        missing_vars = [name for name, value in required_vars.items() if not value]
        if missing_vars:
            raise ConfigurationError(
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )
        return required_vars

    @staticmethod
    def initialize_label_config() -> "Dict[str, LabelConfig]":
        """Initialize and validate label configuration.

        Returns:
            Mapping of label name to its section configuration.

        Raises:
            ConfigurationError: If two labels share the same priority.
        """
        label_config = {
            "breaking-change": LabelConfig(
                priority=0,
                title="⚠️ Breaking Changes",
                description="Changes that break backward compatibility",
            ),
            "security": LabelConfig(
                priority=1,
                title="🔒 Security Updates",
                description="Security related changes",
            ),
            "feature": LabelConfig(
                priority=2,
                title="✨ New Features",
                description="New features and functionality",
            ),
            "enhancement": LabelConfig(
                priority=3,
                title="🚀 Enhancements",
                description="Improvements to existing features",
            ),
            "bug": LabelConfig(
                priority=4,
                title="🐛 Bug Fixes",
                description="Bug fixes",
                aliases=["fix"],
            ),
            "performance": LabelConfig(
                priority=5,
                title="⚡ Performance Improvements",
                description="Performance optimizations",
            ),
            "documentation": LabelConfig(
                priority=6,
                title="📚 Documentation",
                description="Documentation updates",
            ),
            "maintenance": LabelConfig(
                priority=7,
                title="🛠️ Maintenance",
                description="Code maintenance and refactoring",
            ),
            "dependency": LabelConfig(
                priority=8, title="📦 Dependencies", description="Dependency updates"
            ),
            "test": LabelConfig(
                priority=9, title="🧪 Testing", description="Test-related changes"
            ),
        }
        # Priorities double as the section order, so they must be unique.
        priorities = [config.priority for config in label_config.values()]
        if len(priorities) != len(set(priorities)):
            raise ConfigurationError(
                "Duplicate priority values found in label configuration"
            )
        return label_config

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 timestamp into a timezone-aware datetime."""
        # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is None:
            # Naive and aware datetimes cannot be compared; assume UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @classmethod
    def _merged_at(cls, pull_request: Dict[str, Any]) -> datetime:
        """Return the merge time of a pull request, or the minimum datetime if absent."""
        merged_at = pull_request.get("merged_at")
        if not merged_at:
            return datetime.min.replace(tzinfo=timezone.utc)
        return cls._parse_timestamp(merged_at)

    @staticmethod
    def _label_names(pull_request: Dict[str, Any]) -> set:
        """Return the set of label names attached to a pull request."""
        return {label["name"] for label in pull_request.get("labels") or []}

    @classmethod
    def _extract_dependency_name(cls, title: str) -> Optional[str]:
        """Return the lower-cased package name of a dependency-update title, else None."""
        if not title.startswith(cls._DEPENDENCY_PREFIXES):
            return None
        match = cls._DEPENDENCY_NAME_RE.search(title.lower())
        return match.group(1) if match else None

    def log_error(
        self, message: str, response: "Optional[requests.Response]" = None
    ) -> None:
        """Log errors with optional response details.

        Args:
            message: Description of what failed.
            response: HTTP response whose status code and body are printed.
        """
        print(f"ERROR: {message}")
        # A requests.Response is falsy for 4xx/5xx statuses, so a plain
        # truthiness test would hide exactly the responses worth logging.
        if response is not None:
            print(f"Response: {response.status_code}, {response.text}")

    def get_latest_release(self) -> Optional[str]:
        """Get the tag name of the most recently published non-draft release.

        Returns:
            The tag name, or None when the request fails or no published
            release exists.
        """
        response = self.session.get(f"{self.api_url}/repos/{self.repo}/releases")
        if response.status_code != 200:
            self.log_error("Failed to fetch releases", response)
            return None
        published = [
            release
            for release in response.json() or []
            if not release.get("draft", False) and release.get("published_at")
        ]
        if not published:
            # Only drafts (or nothing at all) exist: there is no previous release.
            return None
        latest_release = max(
            published,
            key=lambda release: self._parse_timestamp(release["published_at"]),
        )
        return latest_release.get("tag_name")

    def validate_pull_request_labels(self, pull_request: Dict[str, Any]) -> None:
        """Validate that pull requests have required labels.

        Missing labels are reported through ``log_error``; nothing is raised.
        """
        required_labels = {
            name for name, config in self.label_config.items() if config.required
        }
        missing_required = required_labels - self._label_names(pull_request)
        if missing_required:
            self.log_error(
                f"Pull request #{pull_request['number']} missing required labels: "
                f"{', '.join(sorted(missing_required))}"
            )

    def determine_release_type(self, pulls: List[Dict[str, Any]]) -> str:
        """Determine if this should be a major, minor, or patch release based on PR labels.

        Returns:
            "major" if any pull request is labelled ``breaking-change``,
            "minor" if any is labelled ``feature``, otherwise "patch".
        """
        seen_labels: set = set()
        for pull_request in pulls:
            self.validate_pull_request_labels(pull_request)
            seen_labels |= self._label_names(pull_request)
        if "breaking-change" in seen_labels:
            return "major"
        if "feature" in seen_labels:
            return "minor"
        return "patch"

    def generate_next_tag(
        self, latest_tag: Optional[str], pulls: List[Dict[str, Any]]
    ) -> Optional[str]:
        """Generate the next version tag based on semantic versioning.

        Args:
            latest_tag: Tag of the previous release, or None for a first release.
            pulls: Pull requests whose labels decide which part is bumped.

        Returns:
            "v0.1.0" when there is no previous tag, the bumped tag otherwise,
            or None when ``latest_tag`` is not a valid semantic version.
        """
        if not latest_tag:
            return "v0.1.0"
        try:
            # Strip only the leading "v"; a blanket replace would also eat
            # letters inside pre-release identifiers such as "-dev".
            version = semver.VersionInfo.parse(latest_tag.removeprefix("v"))
        except ValueError:
            self.log_error(f"Invalid semantic version: {latest_tag}")
            return None
        release_type = self.determine_release_type(pulls)
        if release_type == "major":
            next_version = version.bump_major()
        elif release_type == "minor":
            next_version = version.bump_minor()
        else:
            next_version = version.bump_patch()
        return f"v{next_version}"

    def _fetch_all_merged_pull_requests(self) -> List[Dict[str, Any]]:
        """List every closed pull request that was merged, following pagination.

        Returns an empty list if any page fails, since a partial list would
        produce misleading release notes.
        """
        # NOTE(review): pull requests are not filtered by base branch here — confirm
        # whether only those targeting "main" should be included.
        merged: List[Dict[str, Any]] = []
        seen_numbers: set = set()
        page = 1
        while True:
            response = self.session.get(
                f"{self.api_url}/repos/{self.repo}/pulls",
                params={
                    "state": "closed",
                    "page": page,
                    "limit": self._PULLS_PAGE_SIZE,
                },
            )
            if response.status_code != 200:
                self.log_error("Failed to fetch closed pull requests", response)
                return []
            batch = response.json() or []
            # Stop on an empty page, and also when a page brings nothing new,
            # which guards against a server that ignores the page parameter.
            if not {pr.get("number") for pr in batch} - seen_numbers:
                break
            for pull_request in batch:
                number = pull_request.get("number")
                if number in seen_numbers:
                    continue
                seen_numbers.add(number)
                if pull_request.get("merged"):
                    merged.append(pull_request)
            page += 1
        return merged

    def get_closed_pull_requests_since_last_release(
        self, latest_tag: Optional[str]
    ) -> List[Dict[str, Any]]:
        """Fetch all closed pull requests since the last release.

        Args:
            latest_tag: Tag of the previous release. When falsy, every merged
                pull request of the repository is returned instead.

        Returns:
            One entry per pull request, or an empty list when a required
            request fails.
        """
        if not latest_tag:
            print(
                "No previous release or draft found. Fetching all closed pull requests."
            )
            return self._fetch_all_merged_pull_requests()
        # get commit sha from latest release tag
        response = self.session.get(
            f"{self.api_url}/repos/{self.repo}/commits/{latest_tag}/status"
        )
        if response.status_code != 200:
            self.log_error("Failed to get latest tag commit sha", response)
            return []
        latest_tag_sha = response.json().get("sha")
        # get commits between latest release tag and current main state
        response = self.session.get(
            f"{self.api_url}/repos/{self.repo}/compare/{latest_tag_sha}..main"
        )
        if response.status_code != 200:
            self.log_error("Failed to get commits between latest tag and main", response)
            return []
        commits_since_last_release = response.json().get("commits") or []
        # get pull request of each commit
        pulls_since_last_release: List[Dict[str, Any]] = []
        seen_numbers: set = set()
        for commit in commits_since_last_release:
            sha = commit.get("sha")
            response = self.session.get(
                f"{self.api_url}/repos/{self.repo}/commits/{sha}/pull"
            )
            if response.status_code != 200:
                print("Failed to get pull request of commit", sha, response)
                continue
            pull_request = response.json()
            number = pull_request.get("number")
            # Several commits can belong to one pull request; list it once.
            if number is not None:
                if number in seen_numbers:
                    continue
                seen_numbers.add(number)
            pulls_since_last_release.append(pull_request)
        return pulls_since_last_release

    def _latest_dependency_updates(
        self, pulls: List[Dict[str, Any]]
    ) -> Dict[str, Dict[str, Any]]:
        """Map each updated dependency to its most recently merged pull request."""
        latest: Dict[str, Dict[str, Any]] = {}
        for pull_request in pulls:
            dep_name = self._extract_dependency_name(pull_request["title"])
            if not dep_name:
                continue
            current = latest.get(dep_name)
            if current is None or self._merged_at(pull_request) > self._merged_at(
                current
            ):
                latest[dep_name] = pull_request
        return latest

    @classmethod
    def _format_entry(cls, pull_request: Dict[str, Any]) -> str:
        """Render one pull request as a markdown bullet for the release notes."""
        title = pull_request["title"]
        # Extract scope from PR title if it follows conventional commits
        scope = ""
        if "(" in title and "):" in title:
            scope = title[title.index("(") + 1 : title.index(")")]
            title = title[title.index("):") + 2 :].strip()
        # Add reference to linked issues if any
        body = pull_request.get("body") or ""
        linked_issues: List[str] = []
        for keyword in cls._ISSUE_KEYWORDS:
            linked_issues.extend(re.findall(rf"{keyword} #(\d+)", body))
        # Drop repeats while keeping first-seen order.
        linked_issues = list(dict.fromkeys(linked_issues))
        entry = f"- **{scope}:** {title}" if scope else f"- {title}"
        entry += f" (#{pull_request['number']})"
        if linked_issues:
            entry += f" (Fixes: #{', #'.join(linked_issues)})"
        entry += f" by @{pull_request['user']['login']}"
        return entry

    def generate_release_notes(self, pulls: List[Dict[str, Any]]) -> str:
        """Generate formatted release notes from pull requests.

        Pull requests are grouped under the highest-priority configured label
        they carry (aliases included); unlabelled ones go to "Other Changes".
        For dependency updates only the most recently merged pull request per
        package is listed.

        Returns:
            Markdown release notes, or a fixed message when ``pulls`` is empty.
        """
        if not pulls:
            return "No pull requests found for this release."
        label_groups: Dict[str, List[str]] = {label: [] for label in self.label_config}
        label_groups["other"] = []
        dependency_updates = self._latest_dependency_updates(pulls)
        # The priority order is the same for every pull request; compute it once.
        labels_by_priority = sorted(
            self.label_config.items(), key=lambda item: item[1].priority
        )
        # Authors are tracked directly; parsing them back out of the rendered
        # entries breaks as soon as a title itself contains "by @".
        contributors: set = set()
        for pull_request in pulls:
            # Skip dependency updates that aren't the latest for their package
            dep_name = self._extract_dependency_name(pull_request["title"])
            if (
                dep_name
                and dependency_updates[dep_name]["number"] != pull_request["number"]
            ):
                continue
            # Find the highest priority label
            pr_labels = self._label_names(pull_request)
            assigned_label = "other"
            for label, config in labels_by_priority:
                if pr_labels & {label, *config.aliases}:
                    assigned_label = label
                    break
            label_groups[assigned_label].append(self._format_entry(pull_request))
            contributors.add(pull_request["user"]["login"])
        # Build release notes with friendly titles and emojis
        release_notes = ["# Release Notes\n"]
        # Add summary section
        total_prs = sum(len(entries) for entries in label_groups.values())
        release_notes.extend(
            [
                "📊 **Release Statistics**",
                f"- Total Pull Requests: {total_prs}",
                f"- Contributors: {len(contributors)}",
                "",
            ]
        )
        # Add each section in priority order
        for label, config in labels_by_priority:
            if label_groups[label]:
                release_notes.append(f"## {config.title}")
                release_notes.extend(sorted(label_groups[label]))
                release_notes.append("")
        # Add other changes last
        if label_groups["other"]:
            release_notes.append("## 📋 Other Changes")
            release_notes.extend(sorted(label_groups["other"]))
            release_notes.append("")
        return "\n".join(release_notes).strip()

    def add_version_comparison(
        self, release_notes: str, previous_tag: str, new_tag: str
    ) -> str:
        """Add version comparison section to release notes.

        The link targets the web UI, obtained by removing "/api/v1" from the
        configured API URL.
        """
        url_base = self.api_url.replace("/api/v1", "")
        compare_range = f"{previous_tag}...{new_tag}"
        comparison_section = (
            "\n\n"
            "## 📝 Version Compare\n"
            f"Compare with previous version: [{compare_range}]"
            f"({url_base}/{self.repo}/compare/{compare_range})\n"
        )
        return release_notes + comparison_section

    def get_existing_draft_release(self) -> Optional[Dict[str, Any]]:
        """Check if a draft release already exists.

        Returns:
            The first draft in the release listing, or None when there is
            none or the request fails.
        """
        response = self.session.get(f"{self.api_url}/repos/{self.repo}/releases")
        if response.status_code != 200:
            self.log_error("Failed to fetch releases", response)
            return None
        for release in response.json() or []:
            if release.get("draft", False):
                print(f"Draft release found: {release['tag_name']}")
                return release
        return None

    def create_or_update_release(
        self, new_tag: str, release_notes: str, existing_draft: Optional[Dict[str, Any]]
    ) -> Optional[str]:
        """Create a new draft release or update existing one.

        Args:
            new_tag: Tag used when a new draft is created.
            release_notes: Markdown body of the release.
            existing_draft: Draft to update; its tag name is kept. None
                creates a new draft instead.

        Returns:
            The ``html_url`` of the release, or None when the request failed.
        """
        tag = existing_draft.get("tag_name") if existing_draft else new_tag
        release_data = {
            "body": release_notes,
            "hide_archive_links": True,
            "draft": True,
            "tag_name": tag,
            "name": f"Release {tag}",
        }
        release_id = f"/{existing_draft['id']}" if existing_draft else ""
        method = "PATCH" if existing_draft else "POST"
        url = f"{self.api_url}/repos/{self.repo}/releases{release_id}"
        response = self.session.request(method, url, json=release_data)
        if response.status_code not in (200, 201):
            self.log_error(f"Failed to create/update draft release {tag}", response)
            return None
        print(
            f"Draft release {tag} created/updated successfully, using '{method}' method!"
        )
        return response.json().get("html_url")

    def main(self) -> None:
        """Main execution flow for release management.

        Any exception is printed and re-raised so the calling job fails.
        """
        try:
            # Get the latest release information
            latest_tag = self.get_latest_release()
            # Get closed PRs since last release
            pulls = self.get_closed_pull_requests_since_last_release(latest_tag)
            if not pulls:
                print("No new pull requests found. Skipping release creation.")
                return
            # Generate next version tag
            new_tag = self.generate_next_tag(latest_tag, pulls)
            if not new_tag:
                print("Failed to generate next version tag.")
                return
            # Generate release notes
            release_notes = self.generate_release_notes(pulls)
            # An existing draft keeps its tag, so compare against that one.
            existing_draft = self.get_existing_draft_release()
            comparison_tag = existing_draft["tag_name"] if existing_draft else new_tag
            # Add version comparison if there's a previous release
            if latest_tag:
                release_notes = self.add_version_comparison(
                    release_notes, latest_tag, comparison_tag
                )
            url = self.create_or_update_release(
                comparison_tag, release_notes, existing_draft
            )
            # Publish the URL only when there is one and an output file is set;
            # otherwise "release-url=None" would be written or a KeyError raised.
            output_path = os.getenv("GITHUB_OUTPUT")
            if url and output_path:
                with open(output_path, "a", encoding="utf-8") as gh_out:
                    gh_out.write(f"release-url={url}\n")
        except Exception as e:
            print(f"An error occurred: {e}")
            raise
# Script entry point: build the manager from the environment and run it once.
if __name__ == "__main__":
    manager = ReleaseManager()
    manager.main()