mirror of
https://github.com/ivuorinen/ghaw-auditor.git
synced 2026-01-26 11:24:00 +00:00
271 lines
9.3 KiB
Python
271 lines
9.3 KiB
Python
"""CLI interface for GitHub Actions & Workflows Auditor."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import typer
|
|
from rich.console import Console
|
|
from rich.logging import RichHandler
|
|
|
|
from ghaw_auditor import __version__
|
|
from ghaw_auditor.analyzer import Analyzer
|
|
from ghaw_auditor.differ import Differ
|
|
from ghaw_auditor.factory import AuditServiceFactory
|
|
from ghaw_auditor.models import Policy
|
|
from ghaw_auditor.parser import Parser
|
|
from ghaw_auditor.policy import PolicyValidator
|
|
from ghaw_auditor.renderer import Renderer
|
|
from ghaw_auditor.scanner import Scanner
|
|
from ghaw_auditor.services import DiffService, ScanResult
|
|
|
|
app = typer.Typer(
|
|
name="ghaw-auditor",
|
|
help="GitHub Actions & Workflows Auditor - analyze and audit GitHub Actions ecosystem",
|
|
)
|
|
console = Console()
|
|
|
|
|
|
def setup_logging(verbose: bool = False, quiet: bool = False, log_json: bool = False) -> None:
|
|
"""Configure logging."""
|
|
if quiet:
|
|
level = logging.ERROR
|
|
elif verbose:
|
|
level = logging.DEBUG
|
|
else:
|
|
level = logging.INFO
|
|
|
|
if log_json:
|
|
logging.basicConfig(level=level, format="%(message)s")
|
|
else:
|
|
logging.basicConfig(
|
|
level=level, format="%(message)s", handlers=[RichHandler(console=console, rich_tracebacks=True)]
|
|
)
|
|
|
|
|
|
def _render_reports(
|
|
renderer: Renderer,
|
|
result: ScanResult,
|
|
format_type: str,
|
|
) -> None:
|
|
"""Render reports based on format type."""
|
|
console.print("[cyan]Generating reports...[/cyan]")
|
|
if format_type in ("json", "all"):
|
|
renderer.render_json(result.workflows, result.actions, result.violations)
|
|
if format_type in ("md", "all"):
|
|
renderer.render_markdown(result.workflows, result.actions, result.violations, result.analysis)
|
|
|
|
|
|
def _handle_diff_mode(
|
|
result: ScanResult,
|
|
baseline: Path,
|
|
output: Path,
|
|
) -> None:
|
|
"""Handle diff mode comparison."""
|
|
console.print("[cyan]Running diff...[/cyan]")
|
|
diff_service = DiffService(Differ(baseline))
|
|
try:
|
|
workflow_diffs, action_diffs = diff_service.compare(result.workflows, result.actions)
|
|
|
|
diff_dir = output / "diff"
|
|
diff_dir.mkdir(exist_ok=True)
|
|
diff_service.differ.render_diff_markdown(workflow_diffs, action_diffs, diff_dir / "report.diff.md")
|
|
console.print(f"[green]Diff report written to {diff_dir / 'report.diff.md'}[/green]")
|
|
except FileNotFoundError as e:
|
|
logger = logging.getLogger(__name__)
|
|
logger.error(f"Baseline not found: {e}")
|
|
|
|
|
|
def _write_baseline(result: ScanResult, baseline_path: Path, commit_sha: str | None = None) -> None:
|
|
"""Write baseline snapshot."""
|
|
differ = Differ(baseline_path)
|
|
differ.save_baseline(result.workflows, result.actions, commit_sha)
|
|
console.print(f"[green]Baseline saved to {baseline_path}[/green]")
|
|
|
|
|
|
def _enforce_policy(violations: list[dict[str, Any]]) -> None:
|
|
"""Enforce policy and exit if errors found."""
|
|
error_violations = [v for v in violations if v.get("severity") == "error"]
|
|
if error_violations:
|
|
console.print(f"[red]Policy enforcement failed: {len(error_violations)} errors[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def scan(
|
|
repo: str = typer.Option(".", help="Repository path or URL"),
|
|
token: str | None = typer.Option(None, help="GitHub token", envvar="GITHUB_TOKEN"),
|
|
output: Path = typer.Option(".ghaw-auditor", help="Output directory"),
|
|
format_type: str = typer.Option("all", help="Output format: json, md, or all"),
|
|
cache_dir: Path | None = typer.Option(None, help="Cache directory"),
|
|
offline: bool = typer.Option(False, help="Offline mode (no API calls)"),
|
|
concurrency: int = typer.Option(4, help="Concurrency for API calls"),
|
|
enforce: bool = typer.Option(False, help="Enforce policy (exit non-zero on violations)"),
|
|
policy_file: Path | None = typer.Option(None, help="Policy file path"),
|
|
diff: bool = typer.Option(False, help="Run in diff mode"),
|
|
baseline: Path | None = typer.Option(None, help="Baseline path for diff"),
|
|
write_baseline: bool = typer.Option(False, help="Write baseline after scan"),
|
|
verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
|
|
quiet: bool = typer.Option(False, "--quiet", "-q", help="Quiet output"),
|
|
log_json: bool = typer.Option(False, help="JSON logging"),
|
|
) -> None:
|
|
"""Scan repository for GitHub Actions and workflows."""
|
|
setup_logging(verbose, quiet, log_json)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
# Validate repository path
|
|
repo_path = Path(repo).resolve()
|
|
if not repo_path.exists():
|
|
console.print(f"[red]Repository not found: {repo_path}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
# Load policy if specified
|
|
policy = None
|
|
if policy_file and policy_file.exists():
|
|
# TODO: Load policy from YAML file
|
|
policy = Policy()
|
|
|
|
# Create service via factory
|
|
service = AuditServiceFactory.create(
|
|
repo_path=repo_path,
|
|
token=token,
|
|
offline=offline,
|
|
cache_dir=cache_dir,
|
|
concurrency=concurrency,
|
|
policy=policy,
|
|
)
|
|
|
|
# Execute scan
|
|
console.print("[cyan]Scanning repository...[/cyan]")
|
|
result = service.scan(offline=offline)
|
|
|
|
# Display summary
|
|
console.print(f"Found {result.workflow_count} workflows and {result.action_count} actions")
|
|
console.print(f"Found {result.unique_action_count} unique action references")
|
|
|
|
if result.violations:
|
|
console.print(f"Found {len(result.violations)} policy violations")
|
|
|
|
# Render reports
|
|
renderer = Renderer(output)
|
|
_render_reports(renderer, result, format_type)
|
|
|
|
# Handle diff mode
|
|
if diff and baseline:
|
|
_handle_diff_mode(result, baseline, output)
|
|
|
|
# Write baseline
|
|
if write_baseline:
|
|
baseline_path = baseline or (output / "baseline")
|
|
_write_baseline(result, baseline_path)
|
|
|
|
console.print(f"[green]✓ Audit complete! Reports in {output}[/green]")
|
|
|
|
# Enforce policy
|
|
if enforce and result.violations:
|
|
_enforce_policy(result.violations)
|
|
|
|
except Exception as e:
|
|
logger.exception(f"Scan failed: {e}")
|
|
raise typer.Exit(2) from None
|
|
|
|
|
|
@app.command()
|
|
def inventory(
|
|
repo: str = typer.Option(".", help="Repository path"),
|
|
verbose: bool = typer.Option(False, "--verbose", "-v"),
|
|
) -> None:
|
|
"""Print deduplicated action inventory."""
|
|
setup_logging(verbose)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
repo_path = Path(repo).resolve()
|
|
scanner = Scanner(repo_path)
|
|
parser = Parser(repo_path)
|
|
analyzer = Analyzer()
|
|
|
|
workflow_files = scanner.find_workflows()
|
|
all_actions = []
|
|
|
|
for wf_file in workflow_files:
|
|
try:
|
|
workflow = parser.parse_workflow(wf_file)
|
|
all_actions.extend(workflow.actions_used)
|
|
except Exception as e:
|
|
logger.error(f"Failed to parse {wf_file}: {e}")
|
|
if verbose:
|
|
logger.exception(e)
|
|
|
|
unique_actions = analyzer.deduplicate_actions(all_actions)
|
|
|
|
console.print(f"\n[cyan]Unique Actions: {len(unique_actions)}[/cyan]\n")
|
|
for key, _action in sorted(unique_actions.items()):
|
|
console.print(f" • {key}")
|
|
|
|
|
|
@app.command()
|
|
def validate(
|
|
repo: str = typer.Option(".", help="Repository path"),
|
|
policy_file: Path | None = typer.Option(None, help="Policy file"),
|
|
enforce: bool = typer.Option(False, help="Exit non-zero on violations"),
|
|
verbose: bool = typer.Option(False, "--verbose", "-v"),
|
|
) -> None:
|
|
"""Validate workflows against policy."""
|
|
setup_logging(verbose)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
repo_path = Path(repo).resolve()
|
|
scanner = Scanner(repo_path)
|
|
parser = Parser(repo_path)
|
|
|
|
workflow_files = scanner.find_workflows()
|
|
workflows = {}
|
|
all_actions = []
|
|
|
|
for wf_file in workflow_files:
|
|
try:
|
|
workflow = parser.parse_workflow(wf_file)
|
|
rel_path = str(wf_file.relative_to(repo_path))
|
|
workflows[rel_path] = workflow
|
|
all_actions.extend(workflow.actions_used)
|
|
except Exception as e:
|
|
logger.error(f"Failed to parse {wf_file}: {e}")
|
|
if verbose:
|
|
logger.exception(e)
|
|
|
|
# Load or use default policy
|
|
policy = Policy()
|
|
if policy_file and policy_file.exists():
|
|
# TODO: Parse YAML policy file here
|
|
pass
|
|
|
|
validator = PolicyValidator(policy)
|
|
violations = validator.validate(workflows, all_actions)
|
|
|
|
if violations:
|
|
console.print(f"\n[yellow]Found {len(violations)} policy violations:[/yellow]\n")
|
|
for v in violations:
|
|
severity = v.get("severity", "warning").upper()
|
|
color = "red" if severity == "ERROR" else "yellow"
|
|
console.print(f"[{color}]{severity}[/{color}] {v['workflow']}: {v['message']}")
|
|
|
|
if enforce:
|
|
errors = [v for v in violations if v.get("severity") == "error"]
|
|
if errors:
|
|
raise typer.Exit(1)
|
|
else:
|
|
console.print("[green]✓ No policy violations found[/green]")
|
|
|
|
|
|
@app.command()
|
|
def version() -> None:
|
|
"""Show version information."""
|
|
console.print(f"ghaw-auditor version {__version__}")
|
|
|
|
|
|
if __name__ == "__main__": # pragma: no cover
|
|
app()
|