From a8f489b2cb0791e122fc81d4d1755ded381ccb74 Mon Sep 17 00:00:00 2001 From: mdbk Date: Tue, 25 Feb 2025 17:48:19 -0400 Subject: [PATCH] chore: put ci files in right spot --- .github/workflows/run_tests.yml | 60 +++++ code_style_checker.py | 366 ++++++++++++++++++++++++++ github_ci_plugin.py | 445 ++++++++++++++++++++++++++++++++ requirements.txt | 6 + 4 files changed, 877 insertions(+) create mode 100644 .github/workflows/run_tests.yml create mode 100644 code_style_checker.py create mode 100755 github_ci_plugin.py create mode 100644 requirements.txt diff --git a/.github/workflows/run_tests.yml b/.github/workflows/run_tests.yml new file mode 100644 index 0000000..69d4c24 --- /dev/null +++ b/.github/workflows/run_tests.yml @@ -0,0 +1,60 @@ +name: CI Checks on PR + +on: + pull_request: + types: [opened, synchronize, reopened] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install requests + # Add any other dependencies your project needs + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + + - name: Run CI Plugin + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + python github_ci_plugin.py \ + --token "$GITHUB_TOKEN" \ + --repo "${{ github.repository }}" \ + --pr "${{ github.event.pull_request.number }}" \ + --test-command "python -m unittest discover" + + code-style: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install requests flake8 black isort ruff + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + + - name: Run Code Style Checker + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + python code_style_checker.py \ + --token "$GITHUB_TOKEN" \ + --repo "${{ github.repository }}" \ + --pr "${{ github.event.pull_request.number }}" diff --git a/code_style_checker.py b/code_style_checker.py new file mode 100644 index 0000000..c7dcbf8 --- /dev/null +++ b/code_style_checker.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 +""" +GitHub CI Plugin for enforcing code style standards. +This script checks Python code against style guidelines using tools like flake8, black, and isort. +""" + +import os +import sys +import argparse +import requests +import json +import logging +import subprocess +from pathlib import Path +from typing import Dict, List, Optional, Any, Tuple + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger('code_style_checker') + +class CodeStyleChecker: + """ + A plugin to check code style on GitHub pull requests and report results. + """ + + def __init__(self, token: str, repo: str, pr_number: Optional[int] = None): + """ + Initialize the Code Style Checker. 
+ + Args: + token: GitHub API token with appropriate permissions + repo: Repository in format 'owner/repo' + pr_number: Pull request number (optional) + """ + self.token = token + self.repo = repo + self.pr_number = pr_number + self.api_url = f"https://api.github.com/repos/{repo}" + self.headers = { + 'Authorization': f'token {token}', + 'Accept': 'application/vnd.github.v3+json' + } + + def get_pull_request(self, pr_number: int) -> Dict[str, Any]: + """Get details for a specific pull request.""" + url = f"{self.api_url}/pulls/{pr_number}" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return response.json() + + def get_changed_files(self, pr_number: int) -> List[str]: + """Get list of files changed in the pull request.""" + url = f"{self.api_url}/pulls/{pr_number}/files" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + files = [] + for file_data in response.json(): + filename = file_data["filename"] + if filename.endswith(".py"): # Only check Python files + files.append(filename) + + return files + + def create_check_run(self, head_sha: str, name: str) -> int: + """Create a new check run for the given commit.""" + url = f"{self.api_url}/check-runs" + data = { + "name": name, + "head_sha": head_sha, + "status": "in_progress", + "started_at": self._get_current_time() + } + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + return response.json()["id"] + + def update_check_run(self, check_run_id: int, conclusion: str, output: Dict[str, Any]) -> None: + """Update an existing check run with results.""" + url = f"{self.api_url}/check-runs/{check_run_id}" + data = { + "status": "completed", + "conclusion": conclusion, + "completed_at": self._get_current_time(), + "output": output + } + response = requests.patch(url, headers=self.headers, json=data) + response.raise_for_status() + + def add_comment(self, pr_number: int, body: str) -> None: + """Add a comment to a pull request.""" + url = f"{self.api_url}/issues/{pr_number}/comments" + data = {"body": body} + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + def run_flake8(self, files: List[str]) -> Dict[str, Any]: + """Run flake8 on the specified files.""" + if not files: + return {"success": True, "output": "No Python files to check", "violations": []} + + try: + cmd = ["flake8"] + files + result = subprocess.run(cmd, capture_output=True, text=True) + + violations = [] + if result.stdout: + for line in result.stdout.strip().split("\n"): + if line: + parts = line.split(":", 3) + if len(parts) >= 4: + violations.append({ + "file": parts[0], + "line": int(parts[1]), + "column": int(parts[2]), + "message": parts[3].strip() + }) + + return { + "success": result.returncode == 0, + "output": result.stdout, + "violations": violations + } + except Exception as e: + logger.error(f"Error running flake8: {str(e)}") + return { + "success": False, + "output": str(e), + "violations": [] + } + + def run_ruff(self, files: List[str]) -> Dict[str, Any]: + """Run ruff on the specified files.""" + if not files: + return {"success": True, "output": "No Python files to check", "violations": []} + + try: + cmd = ["ruff", "check"] + files + result = subprocess.run(cmd, capture_output=True, text=True) + + violations = [] + if result.stdout: + for line in result.stdout.strip().split("\n"): + if line and ":" in line: + # Ruff output format: file.py:line:column: error code message + parts = line.split(":", 3) 
+                        if len(parts) >= 4:
+                            file_path = parts[0]
+                            line_num = int(parts[1])
+                            col_num = int(parts[2])
+                            message = parts[3].strip()
+                            violations.append({
+                                "file": file_path,
+                                "line": line_num,
+                                "column": col_num,
+                                "message": message
+                            })
+
+            return {
+                "success": result.returncode == 0,
+                "output": result.stdout,
+                "violations": violations
+            }
+        except Exception as e:
+            logger.error(f"Error running ruff: {str(e)}")
+            return {
+                "success": False,
+                "output": str(e),
+                "violations": []
+            }
+
+    def run_black(self, files: List[str]) -> Dict[str, Any]:
+        """Run black in check mode on the specified files."""
+        if not files:
+            return {"success": True, "output": "No Python files to check", "violations": []}
+
+        try:
+            cmd = ["black", "--check"] + files
+            result = subprocess.run(cmd, capture_output=True, text=True)
+
+            violations = []
+            # black --check reports "would reformat <file>" on stderr, not stdout
+            if result.stderr:
+                for line in result.stderr.strip().split("\n"):
+                    if "would reformat" in line:
+                        file_path = line.split("would reformat", 1)[1].strip()
+                        violations.append({
+                            "file": file_path,
+                            "message": "File needs reformatting with black"
+                        })
+
+            return {
+                "success": result.returncode == 0,
+                "output": result.stdout + "\n" + result.stderr,
+                "violations": violations
+            }
+        except Exception as e:
+            logger.error(f"Error running black: {str(e)}")
+            return {
+                "success": False,
+                "output": str(e),
+                "violations": []
+            }
+
+    def run_isort(self, files: List[str]) -> Dict[str, Any]:
+        """Run isort in check mode on the specified files."""
+        if not files:
+            return {"success": True, "output": "No Python files to check", "violations": []}
+
+        try:
+            cmd = ["isort", "--check-only"] + files
+            result = subprocess.run(cmd, capture_output=True, text=True)
+
+            violations = []
+            # isort --check-only reports lines like
+            # "ERROR: /path/file.py Imports are incorrectly sorted and/or formatted."
+            # and, depending on version, may write them to stdout or stderr.
+            combined = (result.stdout or "") + "\n" + (result.stderr or "")
+            for line in combined.strip().split("\n"):
+                if "ERROR:" in line and "incorrectly sorted" in line:
+                    file_path = line.split("ERROR:", 1)[1].strip().split(" ", 1)[0]
+                    violations.append({
+                        "file": file_path,
+                        "message": "Imports need sorting with isort"
+                    })
+
+            return {
+                "success": result.returncode == 0,
+                "output": result.stdout + "\n" + result.stderr,
+                "violations": violations
+            }
+        except Exception as e:
+            logger.error(f"Error running isort: {str(e)}")
+            return {
+                "success": False,
+                "output": str(e),
+                "violations": []
+            }
+
+    def check_code_style(self, pr_number: int) -> Dict[str, Any]:
+        """
+        Check code style for files in a pull request.
+ + Args: + pr_number: Pull request number + + Returns: + Dictionary with check results + """ + pr_data = self.get_pull_request(pr_number) + head_sha = pr_data["head"]["sha"] + + logger.info(f"Checking code style for PR #{pr_number}, commit {head_sha}") + + # Create a check run + check_run_id = self.create_check_run(head_sha, "Code Style Check") + + # Get changed files + files = self.get_changed_files(pr_number) + logger.info(f"Found {len(files)} Python files to check") + + # Run style checks + flake8_results = self.run_flake8(files) + black_results = self.run_black(files) + isort_results = self.run_isort(files) + ruff_results = self.run_ruff(files) + + # Combine results + all_violations = ( + flake8_results["violations"] + + black_results["violations"] + + isort_results["violations"] + + ruff_results["violations"] + ) + + success = ( + flake8_results["success"] and + black_results["success"] and + isort_results["success"] and + ruff_results["success"] + ) + + # Prepare output for GitHub + conclusion = "success" if success else "failure" + + summary = f"Code Style Check: {'Passed' if success else 'Failed'}\n\n" + summary += f"- Flake8: {'Passed' if flake8_results['success'] else 'Failed'}\n" + summary += f"- Black: {'Passed' if black_results['success'] else 'Failed'}\n" + summary += f"- isort: {'Passed' if isort_results['success'] else 'Failed'}\n" + summary += f"- Ruff: {'Passed' if ruff_results['success'] else 'Failed'}\n\n" + summary += f"Total violations: {len(all_violations)}" + + # Prepare detailed output + details = "## Code Style Violations\n\n" + + if all_violations: + for violation in all_violations: + if "line" in violation and "column" in violation: + details += f"- **{violation['file']}:{violation['line']}:{violation['column']}**: {violation['message']}\n" + else: + details += f"- **{violation['file']}**: {violation['message']}\n" + else: + details += "No violations found! 🎉\n" + + # Add tool outputs + details += "\n
<details>\n<summary>Flake8 Output</summary>\n\n```\n"
+        details += flake8_results["output"] or "No output"
+        details += "\n```\n</details>\n\n"
+
+        details += "<details>\n<summary>Black Output</summary>\n\n```\n"
+        details += black_results["output"] or "No output"
+        details += "\n```\n</details>\n\n"
+
+        details += "<details>\n<summary>isort Output</summary>\n\n```\n"
+        details += isort_results["output"] or "No output"
+        details += "\n```\n</details>\n\n"
+
+        details += "<details>\n<summary>Ruff Output</summary>\n\n```\n"
+        details += ruff_results["output"] or "No output"
+        details += "\n```\n</details>
\n" + + output = { + "title": "Code Style Check Results", + "summary": summary, + "text": details + } + + # Update check run with results + self.update_check_run(check_run_id, conclusion, output) + + # Add a comment to the PR + self.add_comment(pr_number, details) + + return { + "success": success, + "violations": all_violations, + "flake8": flake8_results, + "black": black_results, + "isort": isort_results, + "ruff": ruff_results + } + + def _get_current_time(self) -> str: + """Get current time in ISO 8601 format.""" + from datetime import datetime + return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + + +def main(): + """Main entry point for the Code Style Checker.""" + parser = argparse.ArgumentParser(description='GitHub CI Plugin for checking code style') + parser.add_argument('--token', required=True, help='GitHub API token') + parser.add_argument('--repo', required=True, help='Repository in format owner/repo') + parser.add_argument('--pr', required=True, type=int, help='Pull request number to process') + + args = parser.parse_args() + + checker = CodeStyleChecker(args.token, args.repo, args.pr) + results = checker.check_code_style(args.pr) + + # Exit with appropriate status code + sys.exit(0 if results["success"] else 1) + + +if __name__ == "__main__": + main() diff --git a/github_ci_plugin.py b/github_ci_plugin.py new file mode 100755 index 0000000..7fe17d4 --- /dev/null +++ b/github_ci_plugin.py @@ -0,0 +1,445 @@ +#!/usr/bin/env python3 +""" +GitHub CI Plugin for running tests on every pull request. +This script interacts with GitHub's API to manage test runs on PRs. +""" + +import os +import sys +import argparse +import requests +import json +import logging +import datetime +import tempfile +import re +from pathlib import Path +from typing import Dict, List, Optional, Any, Tuple + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger('github_ci_plugin') + +class GitHubCIPlugin: + """ + A plugin to run tests on GitHub pull requests and report results. + """ + + def __init__(self, token: str, repo: str, pr_number: Optional[int] = None): + """ + Initialize the GitHub CI Plugin. 
+ + Args: + token: GitHub API token with appropriate permissions + repo: Repository in format 'owner/repo' + pr_number: Pull request number (optional) + """ + self.token = token + self.repo = repo + self.pr_number = pr_number + self.api_url = f"https://api.github.com/repos/{repo}" + self.headers = { + 'Authorization': f'token {token}', + 'Accept': 'application/vnd.github.v3+json' + } + + def get_pull_request(self, pr_number: int) -> Dict[str, Any]: + """Get details for a specific pull request.""" + url = f"{self.api_url}/pulls/{pr_number}" + response = requests.get(url, headers=self.headers) + response.raise_for_status() + return response.json() + + def create_check_run(self, head_sha: str, name: str) -> int: + """Create a new check run for the given commit.""" + url = f"{self.api_url}/check-runs" + data = { + "name": name, + "head_sha": head_sha, + "status": "in_progress", + "started_at": self._get_current_time() + } + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + return response.json()["id"] + + def update_check_run(self, check_run_id: int, conclusion: str, output: Dict[str, Any]) -> None: + """Update an existing check run with results.""" + url = f"{self.api_url}/check-runs/{check_run_id}" + data = { + "status": "completed", + "conclusion": conclusion, + "completed_at": self._get_current_time(), + "output": output + } + response = requests.patch(url, headers=self.headers, json=data) + response.raise_for_status() + + def add_comment(self, pr_number: int, body: str) -> None: + """Add a comment to a pull request.""" + url = f"{self.api_url}/issues/{pr_number}/comments" + data = {"body": body} + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + def run_tests(self, test_command: str) -> Dict[str, Any]: + """ + Run the specified test command and return results. + + Args: + test_command: Command to run tests + + Returns: + Dictionary with test results including success status and output + """ + import subprocess + import re + + logger.info(f"Running test command: {test_command}") + start_time = datetime.datetime.now() + + try: + result = subprocess.run( + test_command, + shell=True, + capture_output=True, + text=True + ) + end_time = datetime.datetime.now() + duration = (end_time - start_time).total_seconds() + success = result.returncode == 0 + + # Parse test output to extract more detailed information + test_details = self._parse_test_output(result.stdout, result.stderr) + + return { + "success": success, + "exit_code": result.returncode, + "stdout": result.stdout, + "stderr": result.stderr, + "duration": duration, + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + "test_details": test_details + } + except Exception as e: + logger.error(f"Error running tests: {str(e)}") + end_time = datetime.datetime.now() + duration = (end_time - start_time).total_seconds() + + return { + "success": False, + "exit_code": -1, + "stdout": "", + "stderr": str(e), + "duration": duration, + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + "test_details": { + "total": 0, + "passed": 0, + "failed": 0, + "skipped": 0, + "errors": 1, + "failures": [] + } + } + + def _parse_test_output(self, stdout: str, stderr: str) -> Dict[str, Any]: + """ + Parse test output to extract structured information. + + This method attempts to parse unittest, pytest, or other common test formats. + It can be extended to support more test frameworks. 
+
+        Returns:
+            Dictionary with test statistics and details
+        """
+        # Default values
+        test_details = {
+            "total": 0,
+            "passed": 0,
+            "failed": 0,
+            "skipped": 0,
+            "errors": 0,
+            "failures": []
+        }
+
+        combined_output = stdout + "\n" + stderr
+
+        # Try to parse unittest output
+        unittest_pattern = r"Ran (\d+) tests? in .*\n\n(OK|FAILED)"
+        unittest_match = re.search(unittest_pattern, combined_output)
+        if unittest_match:
+            test_details["total"] = int(unittest_match.group(1))
+            if unittest_match.group(2) == "OK":
+                test_details["passed"] = test_details["total"]
+            else:
+                # Try to extract failures and errors
+                failures_pattern = r"failures=(\d+)"
+                errors_pattern = r"errors=(\d+)"
+
+                failures_match = re.search(failures_pattern, combined_output)
+                if failures_match:
+                    test_details["failed"] = int(failures_match.group(1))
+
+                errors_match = re.search(errors_pattern, combined_output)
+                if errors_match:
+                    test_details["errors"] = int(errors_match.group(1))
+
+                test_details["passed"] = test_details["total"] - test_details["failed"] - test_details["errors"]
+
+        # Try to parse pytest output. Recent pytest prints a summary line such
+        # as "==== 1 failed, 2 passed in 0.12s ====" that lists only the
+        # nonzero categories, in varying order, so match each count separately
+        # rather than with a single fixed pattern.
+        pytest_summary = re.search(r"^=+ (.*) in [\d.]+s.* =+$", combined_output, re.MULTILINE)
+        if pytest_summary:
+            summary_line = pytest_summary.group(1)
+            for token, field in (("passed", "passed"), ("skipped", "skipped"),
+                                 ("failed", "failed"), ("error", "errors")):
+                count_match = re.search(rf"(\d+) {token}", summary_line)
+                if count_match:
+                    test_details[field] = int(count_match.group(1))
+            test_details["total"] = (test_details["passed"] + test_details["skipped"]
+                                     + test_details["failed"] + test_details["errors"])
+
+        # Extract failure details
+        failure_blocks = re.finditer(r"(ERROR|FAIL): (test\w+).*?\n(.*?)(?=\n\n|$)", combined_output, re.DOTALL)
+        for match in failure_blocks:
+            test_details["failures"].append({
+                "type": match.group(1),
+                "test_name": match.group(2),
+                "details": match.group(3).strip()
+            })
+
+        return test_details
+
+    def generate_html_report(self, test_results: Dict[str, Any], pr_data: Dict[str, Any]) -> Tuple[str, str]:
+        """
+        Generate an HTML report from test results.
+
+        Args:
+            test_results: The test results dictionary
+            pr_data: Pull request data
+
+        Returns:
+            Tuple of (file_path, html_content)
+        """
+        # Create a temporary directory for the report
+        report_dir = Path(tempfile.mkdtemp(prefix="github_ci_report_"))
+        report_file = report_dir / "test_report.html"
+
+        # Format test details
+        test_details = test_results.get("test_details", {})
+        total = test_details.get("total", 0)
+        passed = test_details.get("passed", 0)
+        failed = test_details.get("failed", 0)
+        skipped = test_details.get("skipped", 0)
+        errors = test_details.get("errors", 0)
+
+        # Calculate pass rate
+        pass_rate = 0
+        if total > 0:
+            pass_rate = (passed / total) * 100
+
+        # Generate HTML content
+        html_content = f"""<!DOCTYPE html>
+<html>
+<head>
+    <meta charset="utf-8">
+    <title>Test Report for PR #{pr_data["number"]}</title>
+</head>
+<body>
+    <h1>Test Report</h1>
+    <p><strong>Pull Request:</strong> #{pr_data["number"]} - {pr_data["title"]}</p>
+    <p><strong>Commit:</strong> {pr_data["head"]["sha"]}</p>
+    <p><strong>Run at:</strong> {test_results.get("start_time", "Unknown")}</p>
+    <p><strong>Duration:</strong> {test_results.get("duration", 0):.2f} seconds</p>
+
+    <h2>Summary ({pass_rate:.1f}% pass rate)</h2>
+    <ul>
+        <li><strong>Total:</strong> {total}</li>
+        <li><strong>Passed:</strong> {passed}</li>
+        <li><strong>Failed:</strong> {failed + errors}</li>
+        <li><strong>Skipped:</strong> {skipped}</li>
+    </ul>
+
+    <h2>Failures and Errors</h2>
+"""
+
+        # Add failure details
+        failures = test_details.get("failures", [])
+        if failures:
+            for failure in failures:
+                html_content += f"""    <div>
+        <h3>{failure.get("type", "Error")}: {failure.get("test_name", "Unknown Test")}</h3>
+        <pre>{failure.get("details", "No details available")}</pre>
+    </div>
+"""
+        else:
+            html_content += "    <p>No failures or errors detected.</p>\n"
+
+        html_content += """
+    <h2>Test Output</h2>
+
+    <h3>Standard Output</h3>
+    <pre>""" + test_results.get("stdout", "No output") + """</pre>
+
+    <h3>Standard Error</h3>
+    <pre>""" + test_results.get("stderr", "No errors") + """</pre>
+</body>
+</html>
+ + + """ + + # Write the HTML content to the file + report_file.write_text(html_content) + logger.info(f"Generated HTML report at {report_file}") + + return str(report_file), html_content + + def process_pull_request(self, pr_number: int, test_command: str) -> None: + """Process a single pull request by running tests and reporting results.""" + pr_data = self.get_pull_request(pr_number) + head_sha = pr_data["head"]["sha"] + + logger.info(f"Processing PR #{pr_number}, commit {head_sha}") + + # Create a check run + check_run_id = self.create_check_run(head_sha, "Test Suite") + + # Run tests + test_results = self.run_tests(test_command) + + # Generate HTML report + report_path, html_content = self.generate_html_report(test_results, pr_data) + + # Prepare test summary + test_details = test_results.get("test_details", {}) + total = test_details.get("total", 0) + passed = test_details.get("passed", 0) + failed = test_details.get("failed", 0) + errors = test_details.get("errors", 0) + skipped = test_details.get("skipped", 0) + + # Prepare output for GitHub + conclusion = "success" if test_results["success"] else "failure" + summary = f"Tests: {passed} passed, {failed} failed, {errors} errors, {skipped} skipped" + if total > 0: + summary += f" (Pass rate: {(passed/total)*100:.1f}%)" + + summary += f"\nRun duration: {test_results.get('duration', 0):.2f} seconds" + + output = { + "title": "Test Results", + "summary": summary, + "text": f"```\n{test_results['stdout']}\n{test_results['stderr']}\n```" + } + + # Update check run with results + self.update_check_run(check_run_id, conclusion, output) + + # Add a comment to the PR with more detailed information + comment = f"## Test Results\n\n" + comment += f"**Status**: {'✅ Passed' if test_results['success'] else '❌ Failed'}\n\n" + comment += f"**Summary**:\n" + comment += f"- Total: {total}\n" + comment += f"- Passed: {passed}\n" + comment += f"- Failed: {failed}\n" + comment += f"- Errors: {errors}\n" + comment += f"- Skipped: {skipped}\n" + comment += f"- Duration: {test_results.get('duration', 0):.2f} seconds\n\n" + + # Add failure details if any + failures = test_details.get("failures", []) + if failures: + comment += "**Failures**:\n\n" + for failure in failures: + comment += f"- **{failure.get('type', 'Error')}**: {failure.get('test_name', 'Unknown Test')}\n" + comment += f" ```\n {failure.get('details', 'No details available')}\n ```\n\n" + + # Add link to the report if we were to upload it somewhere + comment += f"**Detailed report**: A full HTML report has been generated.\n\n" + + # Add a collapsible section with the full output + comment += "
<details>\n"
+        comment += "<summary>Click to see full test output</summary>\n\n"
+        comment += f"```\n{test_results['stdout']}\n{test_results['stderr']}\n```\n"
+        comment += "</details>
\n"
+
+        self.add_comment(pr_number, comment)
+
+        # Log the report location
+        logger.info(f"Test report generated at: {report_path}")
+
+    def _get_current_time(self) -> str:
+        """Get current time in ISO 8601 format."""
+        from datetime import datetime
+        return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
+
+
+def main():
+    """Main entry point for the GitHub CI Plugin."""
+    parser = argparse.ArgumentParser(description='GitHub CI Plugin for running tests on PRs')
+    parser.add_argument('--token', required=True, help='GitHub API token')
+    parser.add_argument('--repo', required=True, help='Repository in format owner/repo')
+    parser.add_argument('--pr', required=True, type=int, help='Pull request number to process')
+    parser.add_argument('--test-command', required=True, help='Command to run tests')
+    parser.add_argument('--report-dir', help='Directory to save the HTML report (default: auto-generated temp dir)')
+
+    args = parser.parse_args()
+
+    plugin = GitHubCIPlugin(args.token, args.repo, args.pr)
+    logger.info(f"Processing pull request #{args.pr}")
+    plugin.process_pull_request(args.pr, args.test_command)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..fa10d87
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,6 @@
+requests>=2.28.0
+jinja2>=3.0.0
+flake8>=6.0.0
+black>=23.0.0
+isort>=5.12.0
+ruff>=0.1.5
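
Usage note: both scripts can also be exercised by hand against an open pull
request. A sketch (the repository slug "owner/repo" and PR number 42 are
placeholders; the token must be allowed to create check runs and PR comments,
which the workflow-provided GITHUB_TOKEN is, while a plain personal access
token generally cannot create check runs):

    export GITHUB_TOKEN="<token>"
    python github_ci_plugin.py --token "$GITHUB_TOKEN" --repo owner/repo \
        --pr 42 --test-command "python -m unittest discover"
    python code_style_checker.py --token "$GITHUB_TOKEN" --repo owner/repo --pr 42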