# Forgejo draft-release automation: collects merged pull requests since the
# last release, derives the next semver tag, and creates/updates a draft release.
import os
|
|
import re
|
|
import argparse
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from datetime import timezone
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
import requests
|
|
import semver
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3 import Retry
|
|
|
|
|
|
|
|
@dataclass
class LabelConfig:
    """Configuration for one changelog label/category.

    Field order matters: callers may construct instances positionally.
    """

    # Sort order of this category's section in the release notes (lower = earlier).
    priority: int
    # Human-friendly section heading, may include an emoji.
    title: str
    # Whether every pull request must carry this label (enforced by
    # validate_pull_request_labels; no config in this file sets it to True).
    required: bool = False
    # Short human-readable explanation of the category.
    description: str = ""
    # Alternative label names treated as equivalent to the canonical key.
    aliases: List[str] = field(default_factory=list)
|
|
|
|
|
|
class ConfigurationError(Exception):
    """Raised when required configuration values are missing or inconsistent."""
|
|
|
|
|
|
class ReleaseManager:
    """Automates draft-release creation for a Forgejo repository."""

    def __init__(self) -> None:
        """Read CLI-provided configuration and prepare an authenticated HTTP session."""
        self.env_vars = self.validate_environment()
        self.api_url: str = self.env_vars["FORGEJO_API_URL"]
        self.repo: str = self.env_vars["REPO"]
        self.token: str = self.env_vars["FORGEJO_TOKEN"]
        self.headers: Dict[str, str] = {"Authorization": f"Bearer {self.token}"}
        self.label_config = self.initialize_label_config()

        # Session with automatic retries for transient server-side failures.
        retry_policy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
        )
        session = requests.Session()
        session.mount("https://", HTTPAdapter(max_retries=retry_policy))
        session.headers.update(self.headers)
        self.session = session
|
|
|
|
@staticmethod
|
|
def validate_environment() -> Dict[str, str]:
|
|
"""Validate all required environment variables are present."""
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--token", required=True)
|
|
parser.add_argument("--repo", required=True)
|
|
parser.add_argument("--endpoint", required=True)
|
|
args = parser.parse_args()
|
|
|
|
required_vars = {
|
|
"FORGEJO_API_URL": args.endpoint,
|
|
"REPO": args.repo,
|
|
"FORGEJO_TOKEN": args.token,
|
|
}
|
|
|
|
missing_vars = [var for var, value in required_vars.items() if not value]
|
|
|
|
if missing_vars:
|
|
raise ConfigurationError(
|
|
f"Missing required environment variables: {', '.join(missing_vars)}"
|
|
)
|
|
|
|
return required_vars
|
|
|
|
    @staticmethod
    def initialize_label_config() -> Dict[str, LabelConfig]:
        """Initialize and validate label configuration.

        Returns:
            Mapping from canonical label name to its LabelConfig. Priorities
            must be unique because they define the section order of the
            generated release notes.

        Raises:
            ConfigurationError: if two labels share the same priority value.
        """
        label_config = {
            "breaking-change": LabelConfig(
                priority=0,
                title="⚠️ Breaking Changes",
                description="Changes that break backward compatibility",
            ),
            "security": LabelConfig(
                priority=1,
                title="🔒 Security Updates",
                description="Security related changes",
            ),
            "feature": LabelConfig(
                priority=2,
                title="✨ New Features",
                description="New features and functionality",
            ),
            "enhancement": LabelConfig(
                priority=3,
                title="🚀 Enhancements",
                description="Improvements to existing features",
            ),
            "bug": LabelConfig(
                priority=4,
                title="🐛 Bug Fixes",
                description="Bug fixes",
                aliases=["fix"],
            ),
            "performance": LabelConfig(
                priority=5,
                title="⚡ Performance Improvements",
                description="Performance optimizations",
            ),
            "documentation": LabelConfig(
                priority=6,
                title="📚 Documentation",
                description="Documentation updates",
            ),
            "maintenance": LabelConfig(
                priority=7,
                title="🛠️ Maintenance",
                description="Code maintenance and refactoring",
            ),
            "dependency": LabelConfig(
                priority=8, title="📦 Dependencies", description="Dependency updates"
            ),
            "test": LabelConfig(
                priority=9, title="🧪 Testing", description="Test-related changes"
            ),
        }

        # Validate label configuration: duplicate priorities would make the
        # release-notes section ordering ambiguous.
        priorities = [config.priority for config in label_config.values()]
        if len(priorities) != len(set(priorities)):
            raise ConfigurationError(
                "Duplicate priority values found in label configuration"
            )

        return label_config
|
|
|
|
def log_error(
|
|
self, message: str, response: Optional[requests.Response] = None
|
|
) -> None:
|
|
"""Log errors with optional response details."""
|
|
print(f"ERROR: {message}")
|
|
if response:
|
|
print(f"Response: {response.status_code}, {response.text}")
|
|
|
|
def get_latest_release(self) -> str | None:
|
|
"""Get the latest release information."""
|
|
response = self.session.get(f"{self.api_url}/repos/{self.repo}/releases")
|
|
if response.status_code == 200 and len(response.json()) > 0:
|
|
latest_release = None
|
|
latest_release_date = datetime.min.replace(tzinfo=timezone.utc)
|
|
releases = response.json()
|
|
non_draft_releases = [
|
|
release for release in releases if not release.get("draft", False)
|
|
]
|
|
for release in non_draft_releases:
|
|
published_date = datetime.fromisoformat(release["published_at"])
|
|
if published_date > latest_release_date:
|
|
latest_release_date = datetime.fromisoformat(
|
|
release["published_at"]
|
|
)
|
|
latest_release = release
|
|
return latest_release.get("tag_name")
|
|
else:
|
|
self.log_error("Failed to fetch releases", response)
|
|
return None
|
|
|
|
def validate_pull_request_labels(self, pull_request: Dict[str, Any]) -> None:
|
|
"""Validate that pull requests have required labels."""
|
|
pr_labels = {label["name"] for label in pull_request["labels"]}
|
|
required_labels = {
|
|
name for name, config in self.label_config.items() if config.required
|
|
}
|
|
|
|
missing_required = required_labels - pr_labels
|
|
if missing_required:
|
|
self.log_error(
|
|
f"Pull request #{pull_request['number']} missing required labels: "
|
|
f"{', '.join(missing_required)}"
|
|
)
|
|
|
|
def determine_release_type(self, pulls: List[Dict[str, Any]]) -> str:
|
|
"""Determine if this should be a major, minor, or patch release based on PR labels."""
|
|
for pr in pulls:
|
|
self.validate_pull_request_labels(pr)
|
|
|
|
has_breaking = any(
|
|
"breaking-change" in [l["name"] for l in pr["labels"]] for pr in pulls
|
|
)
|
|
has_feature = any(
|
|
"feature" in [l["name"] for l in pr["labels"]] for pr in pulls
|
|
)
|
|
|
|
if has_breaking:
|
|
return "major"
|
|
elif has_feature:
|
|
return "minor"
|
|
return "patch"
|
|
|
|
def generate_next_tag(
|
|
self, latest_tag: Optional[str], pulls: List[Dict[str, Any]]
|
|
) -> Optional[str]:
|
|
"""Generate the next version tag based on semantic versioning."""
|
|
if not latest_tag:
|
|
return "v0.1.0"
|
|
|
|
try:
|
|
version = semver.VersionInfo.parse(latest_tag.replace("v", ""))
|
|
release_type = self.determine_release_type(pulls)
|
|
|
|
if release_type == "major":
|
|
next_version = version.bump_major()
|
|
elif release_type == "minor":
|
|
next_version = version.bump_minor()
|
|
else:
|
|
next_version = version.bump_patch()
|
|
|
|
return f"v{next_version}"
|
|
except ValueError:
|
|
self.log_error(f"Invalid semantic version: {latest_tag}")
|
|
return None
|
|
|
|
def get_closed_pull_requests_since_last_release(
|
|
self, latest_tag: str
|
|
) -> List[Dict[str, Any]]:
|
|
"""Fetch all closed pull requests since the last release."""
|
|
if not latest_tag:
|
|
print(
|
|
"No previous release or draft found. Fetching all closed pull requests."
|
|
)
|
|
|
|
# get commit sha from latest release tag
|
|
response = self.session.get(
|
|
f"{self.api_url}/repos/{self.repo}/commits/{latest_tag}/status"
|
|
)
|
|
if response.status_code != 200:
|
|
self.log_error("Failed to get latest tag commit sha", response)
|
|
return []
|
|
latest_tag_sha = response.json().get("sha")
|
|
|
|
# get commits between latest release tag and current main state
|
|
response = self.session.get(
|
|
f"{self.api_url}/repos/{self.repo}/compare/{latest_tag_sha}..main"
|
|
)
|
|
if response.status_code != 200:
|
|
self.log_error("Failed to get commits between latest tag and main", response)
|
|
return []
|
|
commits_since_last_release = response.json().get("commits")
|
|
|
|
# get pull request of each commit
|
|
pulls_since_last_release = []
|
|
for commit in commits_since_last_release:
|
|
response = self.session.get(
|
|
f"{self.api_url}/repos/{self.repo}/commits/{commit.get('sha')}/pull"
|
|
)
|
|
if response.status_code != 200:
|
|
print(
|
|
"Failed to get pull request of commit", commit.get("sha"), response
|
|
)
|
|
continue
|
|
pulls_since_last_release.append(response.json())
|
|
|
|
return pulls_since_last_release
|
|
|
|
    def generate_release_notes(self, pulls: List[Dict[str, Any]]) -> str:
        """Generate formatted release notes from pull requests.

        Groups PR entries by their highest-priority label, deduplicates
        dependency-bot updates so only the newest update per package is
        listed, and prepends summary statistics.

        Args:
            pulls: Pull request payloads (expects "title", "number", "labels",
                "user", "merged_at" and optionally "body").

        Returns:
            Markdown release notes, or a placeholder string when pulls is empty.
        """
        if not pulls:
            return "No pull requests found for this release."

        # One entry bucket per configured label; "other" is added further down.
        label_groups = {label: [] for label in self.label_config}

        # Track latest dependency updates (package name -> newest PR payload).
        dependency_updates: Dict[str, Dict[str, Any]] = {}

        def extract_dependency_name(title: str) -> Optional[str]:
            """Extract dependency name from conventional commit title."""
            if not title.startswith(("chore(deps)", "fix(deps)")):
                return None

            # Match package names like "@scope/package" or "package"
            match = re.search(
                r"update (?:dependency )?(@?[a-zA-Z0-9-]+(?:/[a-zA-Z0-9-]+)?)",
                title.lower(),
            )
            return match.group(1) if match else None

        def is_dependency_update(title: str) -> bool:
            """Check if PR is a dependency update."""
            return title.startswith(("chore(deps)", "fix(deps)"))

        # First pass to collect latest dependency updates
        for pr in pulls:
            title = pr["title"]
            if not is_dependency_update(title):
                continue
            dep_name = extract_dependency_name(title)
            if not dep_name:
                continue
            if dep_name not in dependency_updates:
                dependency_updates[dep_name] = pr
            # Only keep the most recent update for each dependency.
            # NOTE(review): assumes "merged_at" is always a timestamp string —
            # a closed-but-unmerged PR (merged_at None) would crash strptime;
            # confirm upstream only yields merged PRs.
            time_stamp_format = "%Y-%m-%dT%H:%M:%S%z"
            pr_merged_at = datetime.strptime(pr["merged_at"], time_stamp_format)
            dep_merged_at = datetime.strptime(
                dependency_updates[dep_name]["merged_at"], time_stamp_format
            )
            if pr_merged_at > dep_merged_at:
                dependency_updates[dep_name] = pr

        # Catch-all bucket for PRs carrying none of the configured labels.
        label_groups["other"] = []

        for pr in pulls:
            title = pr["title"]
            author = pr["user"]["login"]
            number = pr["number"]
            pr_labels = {label["name"] for label in pr["labels"]}

            # Skip dependency updates that aren't the latest for their package
            if is_dependency_update(title):
                dep_name = extract_dependency_name(title)
                if dep_name and dependency_updates[dep_name]["number"] != pr["number"]:
                    continue

            # Find the highest priority label (canonical key or any alias).
            assigned_label = "other"
            key_prio_tuples = [
                ([key] + config.aliases, config.priority)
                for key, config in self.label_config.items()
            ]
            key_prio_tuples.sort(key=lambda x: x[1])
            for keys, prio in key_prio_tuples:
                if set(keys) & pr_labels:
                    # keys[0] is the canonical label name; aliases map onto it.
                    assigned_label = keys[0]
                    break

            # Extract scope from PR title if it follows conventional commits
            # (e.g. "feat(api): ..." -> scope "api", title "...").
            scope = ""
            if "(" in title and "):" in title:
                scope = title[title.index("(") + 1 : title.index(")")]
                title = title[title.index("):") + 2 :].strip()

            # Add reference to linked issues if any
            linked_issues: List[str] = []
            if "body" in pr and pr["body"]:
                for keyword in ["Closes", "Fixes", "Resolves"]:
                    if f"{keyword} #" in pr["body"]:
                        issues = re.findall(f"{keyword} #(\\d+)", pr["body"])
                        linked_issues.extend(issues)

            # Format the PR entry. The trailing "by @<author>" suffix is relied
            # on below to count contributors — keep the formats in sync.
            entry = f"- {title}"
            if scope:
                entry = f"- **{scope}:** {title}"
            entry += f" (#{number})"
            if linked_issues:
                entry += f" (Fixes: #{', #'.join(linked_issues)})"
            entry += f" by @{author}"

            label_groups[assigned_label].append(entry)

        # Build release notes with friendly titles and emojis
        release_notes = ["# Release Notes\n"]

        # Add summary section. Note: counts listed entries, so deduplicated
        # dependency updates are excluded from the totals.
        total_prs = sum(len(prs) for prs in label_groups.values())
        total_contributors = len(
            {pr.split("by @")[1] for group in label_groups.values() for pr in group}
        )
        release_notes.extend(
            [
                f"📊 **Release Statistics**",
                f"- Total Pull Requests: {total_prs}",
                f"- Contributors: {total_contributors}",
                "",
            ]
        )

        # Add each section in priority order
        for label, config in sorted(
            self.label_config.items(), key=lambda x: x[1].priority
        ):
            if label_groups[label]:
                release_notes.append(f"## {config.title}")
                release_notes.extend(sorted(label_groups[label]))
                release_notes.append("")

        # Add other changes last
        if label_groups["other"]:
            release_notes.append("## 📋 Other Changes")
            release_notes.extend(sorted(label_groups["other"]))
            release_notes.append("")

        return "\n".join(release_notes).strip()
|
|
|
|
    def add_version_comparison(
        self, release_notes: str, previous_tag: str, new_tag: str
    ) -> str:
        """Append a version-comparison section linking the two tags.

        Args:
            release_notes: Markdown notes to append to.
            previous_tag: Tag of the previous release.
            new_tag: Tag of the release being prepared.

        Returns:
            The release notes with a "Version Compare" section appended.
        """
        # Derive the web UI base URL from the API URL by dropping the
        # "/api/v1" suffix.
        url_base = self.api_url.replace("/api/v1", "")
        comparison_section = f"""\n
## 📝 Version Compare
Compare with previous version: [{previous_tag}...{new_tag}]({url_base}/{self.repo}/compare/{previous_tag}...{new_tag})
"""
        return release_notes + comparison_section
|
|
|
|
def get_existing_draft_release(self) -> Optional[Dict[str, Any]]:
|
|
"""Check if a draft release already exists."""
|
|
response = self.session.get(f"{self.api_url}/repos/{self.repo}/releases")
|
|
if response.status_code != 200:
|
|
self.log_error("Failed to fetch releases", response)
|
|
return None
|
|
draft_releases = [
|
|
release for release in response.json() if release.get("draft", False)
|
|
]
|
|
if draft_releases:
|
|
release = draft_releases[0]
|
|
print(f"Draft release found: {release['tag_name']}")
|
|
return release
|
|
|
|
def create_or_update_release(
|
|
self, new_tag: str, release_notes: str, existing_draft: Optional[Dict[str, Any]]
|
|
) -> None:
|
|
"""Create a new draft release or update existing one."""
|
|
tag = existing_draft.get("tag_name") if existing_draft else new_tag
|
|
release_data = {
|
|
"body": release_notes,
|
|
"hide_archive_links": True,
|
|
"draft": True,
|
|
"tag_name": tag,
|
|
"name": f"Release {tag}",
|
|
}
|
|
release_id = f"/{existing_draft['id']}" if existing_draft else ""
|
|
method = "PATCH" if existing_draft else "POST"
|
|
url = f"{self.api_url}/repos/{self.repo}/releases{release_id}"
|
|
response = self.session.request(method, url, json=release_data)
|
|
if response.status_code == 201 or response.status_code == 200:
|
|
print(
|
|
f"Draft release {tag} created/updated successfully, using '{method}' method!"
|
|
)
|
|
return response.json()['html_url']
|
|
else:
|
|
self.log_error(f"Failed to create/update draft release {tag}", response)
|
|
|
|
def main(self) -> None:
|
|
"""Main execution flow for release management."""
|
|
try:
|
|
# Get the latest release information
|
|
latest_tag = self.get_latest_release()
|
|
|
|
# Get closed PRs since last release
|
|
pulls = self.get_closed_pull_requests_since_last_release(latest_tag)
|
|
|
|
if not pulls:
|
|
print("No new pull requests found. Skipping release creation.")
|
|
return
|
|
|
|
# Generate next version tag
|
|
new_tag = self.generate_next_tag(latest_tag, pulls)
|
|
if not new_tag:
|
|
print("Failed to generate next version tag.")
|
|
return
|
|
|
|
# Generate release notes
|
|
release_notes = self.generate_release_notes(pulls)
|
|
|
|
# Add version comparison if there's a previous release
|
|
existing_draft = self.get_existing_draft_release()
|
|
comparison_tag = new_tag
|
|
if existing_draft:
|
|
comparison_tag = existing_draft["tag_name"]
|
|
|
|
if latest_tag:
|
|
release_notes = self.add_version_comparison(
|
|
release_notes, latest_tag, comparison_tag
|
|
)
|
|
url = self.create_or_update_release(comparison_tag, release_notes, existing_draft)
|
|
print(f"::set-output name=release-url::{url}")
|
|
|
|
except Exception as e:
|
|
print(f"An error occurred: {str(e)}")
|
|
raise
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: ReleaseManager's constructor parses the required
    # --token/--repo/--endpoint CLI flags, then main() runs the workflow.
    manager = ReleaseManager()
    manager.main()
|