Files
2025-10-19 09:52:13 +03:00

271 lines
9.3 KiB
Python

"""CLI interface for GitHub Actions & Workflows Auditor."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
import typer
from rich.console import Console
from rich.logging import RichHandler
from ghaw_auditor import __version__
from ghaw_auditor.analyzer import Analyzer
from ghaw_auditor.differ import Differ
from ghaw_auditor.factory import AuditServiceFactory
from ghaw_auditor.models import Policy
from ghaw_auditor.parser import Parser
from ghaw_auditor.policy import PolicyValidator
from ghaw_auditor.renderer import Renderer
from ghaw_auditor.scanner import Scanner
from ghaw_auditor.services import DiffService, ScanResult
app = typer.Typer(
name="ghaw-auditor",
help="GitHub Actions & Workflows Auditor - analyze and audit GitHub Actions ecosystem",
)
console = Console()
def setup_logging(verbose: bool = False, quiet: bool = False, log_json: bool = False) -> None:
"""Configure logging."""
if quiet:
level = logging.ERROR
elif verbose:
level = logging.DEBUG
else:
level = logging.INFO
if log_json:
logging.basicConfig(level=level, format="%(message)s")
else:
logging.basicConfig(
level=level, format="%(message)s", handlers=[RichHandler(console=console, rich_tracebacks=True)]
)
def _render_reports(
renderer: Renderer,
result: ScanResult,
format_type: str,
) -> None:
"""Render reports based on format type."""
console.print("[cyan]Generating reports...[/cyan]")
if format_type in ("json", "all"):
renderer.render_json(result.workflows, result.actions, result.violations)
if format_type in ("md", "all"):
renderer.render_markdown(result.workflows, result.actions, result.violations, result.analysis)
def _handle_diff_mode(
result: ScanResult,
baseline: Path,
output: Path,
) -> None:
"""Handle diff mode comparison."""
console.print("[cyan]Running diff...[/cyan]")
diff_service = DiffService(Differ(baseline))
try:
workflow_diffs, action_diffs = diff_service.compare(result.workflows, result.actions)
diff_dir = output / "diff"
diff_dir.mkdir(exist_ok=True)
diff_service.differ.render_diff_markdown(workflow_diffs, action_diffs, diff_dir / "report.diff.md")
console.print(f"[green]Diff report written to {diff_dir / 'report.diff.md'}[/green]")
except FileNotFoundError as e:
logger = logging.getLogger(__name__)
logger.error(f"Baseline not found: {e}")
def _write_baseline(result: ScanResult, baseline_path: Path, commit_sha: str | None = None) -> None:
"""Write baseline snapshot."""
differ = Differ(baseline_path)
differ.save_baseline(result.workflows, result.actions, commit_sha)
console.print(f"[green]Baseline saved to {baseline_path}[/green]")
def _enforce_policy(violations: list[dict[str, Any]]) -> None:
"""Enforce policy and exit if errors found."""
error_violations = [v for v in violations if v.get("severity") == "error"]
if error_violations:
console.print(f"[red]Policy enforcement failed: {len(error_violations)} errors[/red]")
raise typer.Exit(1)
@app.command()
def scan(
repo: str = typer.Option(".", help="Repository path or URL"),
token: str | None = typer.Option(None, help="GitHub token", envvar="GITHUB_TOKEN"),
output: Path = typer.Option(".ghaw-auditor", help="Output directory"),
format_type: str = typer.Option("all", help="Output format: json, md, or all"),
cache_dir: Path | None = typer.Option(None, help="Cache directory"),
offline: bool = typer.Option(False, help="Offline mode (no API calls)"),
concurrency: int = typer.Option(4, help="Concurrency for API calls"),
enforce: bool = typer.Option(False, help="Enforce policy (exit non-zero on violations)"),
policy_file: Path | None = typer.Option(None, help="Policy file path"),
diff: bool = typer.Option(False, help="Run in diff mode"),
baseline: Path | None = typer.Option(None, help="Baseline path for diff"),
write_baseline: bool = typer.Option(False, help="Write baseline after scan"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
quiet: bool = typer.Option(False, "--quiet", "-q", help="Quiet output"),
log_json: bool = typer.Option(False, help="JSON logging"),
) -> None:
"""Scan repository for GitHub Actions and workflows."""
setup_logging(verbose, quiet, log_json)
logger = logging.getLogger(__name__)
try:
# Validate repository path
repo_path = Path(repo).resolve()
if not repo_path.exists():
console.print(f"[red]Repository not found: {repo_path}[/red]")
raise typer.Exit(1)
# Load policy if specified
policy = None
if policy_file and policy_file.exists():
# TODO: Load policy from YAML file
policy = Policy()
# Create service via factory
service = AuditServiceFactory.create(
repo_path=repo_path,
token=token,
offline=offline,
cache_dir=cache_dir,
concurrency=concurrency,
policy=policy,
)
# Execute scan
console.print("[cyan]Scanning repository...[/cyan]")
result = service.scan(offline=offline)
# Display summary
console.print(f"Found {result.workflow_count} workflows and {result.action_count} actions")
console.print(f"Found {result.unique_action_count} unique action references")
if result.violations:
console.print(f"Found {len(result.violations)} policy violations")
# Render reports
renderer = Renderer(output)
_render_reports(renderer, result, format_type)
# Handle diff mode
if diff and baseline:
_handle_diff_mode(result, baseline, output)
# Write baseline
if write_baseline:
baseline_path = baseline or (output / "baseline")
_write_baseline(result, baseline_path)
console.print(f"[green]✓ Audit complete! Reports in {output}[/green]")
# Enforce policy
if enforce and result.violations:
_enforce_policy(result.violations)
except Exception as e:
logger.exception(f"Scan failed: {e}")
raise typer.Exit(2) from None
@app.command()
def inventory(
repo: str = typer.Option(".", help="Repository path"),
verbose: bool = typer.Option(False, "--verbose", "-v"),
) -> None:
"""Print deduplicated action inventory."""
setup_logging(verbose)
logger = logging.getLogger(__name__)
repo_path = Path(repo).resolve()
scanner = Scanner(repo_path)
parser = Parser(repo_path)
analyzer = Analyzer()
workflow_files = scanner.find_workflows()
all_actions = []
for wf_file in workflow_files:
try:
workflow = parser.parse_workflow(wf_file)
all_actions.extend(workflow.actions_used)
except Exception as e:
logger.error(f"Failed to parse {wf_file}: {e}")
if verbose:
logger.exception(e)
unique_actions = analyzer.deduplicate_actions(all_actions)
console.print(f"\n[cyan]Unique Actions: {len(unique_actions)}[/cyan]\n")
for key, _action in sorted(unique_actions.items()):
console.print(f"{key}")
@app.command()
def validate(
repo: str = typer.Option(".", help="Repository path"),
policy_file: Path | None = typer.Option(None, help="Policy file"),
enforce: bool = typer.Option(False, help="Exit non-zero on violations"),
verbose: bool = typer.Option(False, "--verbose", "-v"),
) -> None:
"""Validate workflows against policy."""
setup_logging(verbose)
logger = logging.getLogger(__name__)
repo_path = Path(repo).resolve()
scanner = Scanner(repo_path)
parser = Parser(repo_path)
workflow_files = scanner.find_workflows()
workflows = {}
all_actions = []
for wf_file in workflow_files:
try:
workflow = parser.parse_workflow(wf_file)
rel_path = str(wf_file.relative_to(repo_path))
workflows[rel_path] = workflow
all_actions.extend(workflow.actions_used)
except Exception as e:
logger.error(f"Failed to parse {wf_file}: {e}")
if verbose:
logger.exception(e)
# Load or use default policy
policy = Policy()
if policy_file and policy_file.exists():
# TODO: Parse YAML policy file here
pass
validator = PolicyValidator(policy)
violations = validator.validate(workflows, all_actions)
if violations:
console.print(f"\n[yellow]Found {len(violations)} policy violations:[/yellow]\n")
for v in violations:
severity = v.get("severity", "warning").upper()
color = "red" if severity == "ERROR" else "yellow"
console.print(f"[{color}]{severity}[/{color}] {v['workflow']}: {v['message']}")
if enforce:
errors = [v for v in violations if v.get("severity") == "error"]
if errors:
raise typer.Exit(1)
else:
console.print("[green]✓ No policy violations found[/green]")
@app.command()
def version() -> None:
"""Show version information."""
console.print(f"ghaw-auditor version {__version__}")
if __name__ == "__main__": # pragma: no cover
app()