mirror of
https://github.com/ivuorinen/ghaw-auditor.git
synced 2026-02-07 07:46:40 +00:00
165 lines
6.5 KiB
Python
165 lines
6.5 KiB
Python
"""Action resolver for GitHub actions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from pathlib import Path
|
|
|
|
from ghaw_auditor.cache import Cache
|
|
from ghaw_auditor.github_client import GitHubClient
|
|
from ghaw_auditor.models import ActionManifest, ActionRef, ActionType
|
|
from ghaw_auditor.parser import Parser
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Resolver:
|
|
"""Resolves action references and fetches manifests."""
|
|
|
|
def __init__(
|
|
self,
|
|
github_client: GitHubClient,
|
|
cache: Cache,
|
|
repo_path: Path,
|
|
concurrency: int = 4,
|
|
) -> None:
|
|
"""Initialize resolver."""
|
|
self.github_client = github_client
|
|
self.cache = cache
|
|
self.parser = Parser(repo_path)
|
|
self.repo_path = repo_path
|
|
self.concurrency = concurrency
|
|
|
|
def resolve_actions(self, actions: list[ActionRef]) -> dict[str, ActionManifest]:
|
|
"""Resolve multiple action references in parallel."""
|
|
resolved: dict[str, ActionManifest] = {}
|
|
|
|
with ThreadPoolExecutor(max_workers=self.concurrency) as executor:
|
|
futures = {executor.submit(self._resolve_action, action): action for action in actions}
|
|
|
|
for future in as_completed(futures):
|
|
action = futures[future]
|
|
try:
|
|
key, manifest = future.result()
|
|
if key and manifest:
|
|
resolved[key] = manifest
|
|
except Exception as e:
|
|
logger.error(f"Failed to resolve {action.canonical_key()}: {e}")
|
|
|
|
return resolved
|
|
|
|
def _resolve_action(self, action: ActionRef) -> tuple[str, ActionManifest | None]:
|
|
"""Resolve a single action reference."""
|
|
if action.type == ActionType.LOCAL:
|
|
return self._resolve_local_action(action)
|
|
elif action.type == ActionType.GITHUB:
|
|
return self._resolve_github_action(action)
|
|
elif action.type == ActionType.DOCKER:
|
|
# Docker actions don't have manifests to parse
|
|
return action.canonical_key(), None
|
|
return "", None
|
|
|
|
def _resolve_local_action(self, action: ActionRef) -> tuple[str, ActionManifest | None]:
|
|
"""Resolve a local action."""
|
|
if not action.path:
|
|
return "", None
|
|
|
|
# Remove leading ./ prefix only
|
|
clean_path = action.path[2:] if action.path.startswith("./") else action.path
|
|
action_path = self.repo_path / clean_path
|
|
|
|
# If action_path is a directory, look for action.yml/yaml inside
|
|
# If it's a file path, look in parent directory
|
|
if action_path.is_dir():
|
|
for name in ("action.yml", "action.yaml"):
|
|
manifest_path = action_path / name
|
|
if manifest_path.exists():
|
|
try:
|
|
manifest = self.parser.parse_action(manifest_path)
|
|
return action.canonical_key(), manifest
|
|
except Exception as e:
|
|
logger.error(f"Failed to parse local action {manifest_path}: {e}")
|
|
continue
|
|
else:
|
|
# Try as parent directory
|
|
parent = action_path.parent if action_path.name.startswith("action.") else action_path
|
|
for name in ("action.yml", "action.yaml"):
|
|
manifest_path = parent / name
|
|
if manifest_path.exists():
|
|
try:
|
|
manifest = self.parser.parse_action(manifest_path)
|
|
return action.canonical_key(), manifest
|
|
except Exception as e:
|
|
logger.error(f"Failed to parse local action {manifest_path}: {e}")
|
|
continue
|
|
|
|
logger.warning(f"Local action manifest not found: {action_path}")
|
|
return "", None
|
|
|
|
def _resolve_github_action(self, action: ActionRef) -> tuple[str, ActionManifest | None]:
|
|
"""Resolve a GitHub action."""
|
|
if not action.owner or not action.repo or not action.ref:
|
|
return "", None
|
|
|
|
# Resolve ref to SHA
|
|
cache_key = self.cache.make_key("ref", action.owner, action.repo, action.ref)
|
|
sha = self.cache.get(cache_key)
|
|
|
|
if not sha:
|
|
try:
|
|
sha = self.github_client.get_ref_sha(action.owner, action.repo, action.ref)
|
|
self.cache.set(cache_key, sha)
|
|
except Exception as e:
|
|
logger.error(f"Failed to resolve ref {action.owner}/{action.repo}@{action.ref}: {e}")
|
|
return "", None
|
|
|
|
action.resolved_sha = sha
|
|
|
|
# Fetch action manifest
|
|
manifest_path = action.path if action.path and action.path != "action.yml" else ""
|
|
manifest_key = self.cache.make_key("manifest", action.owner, action.repo, sha, manifest_path)
|
|
manifest_content = self.cache.get(manifest_key)
|
|
|
|
if not manifest_content:
|
|
# Try action.yml first, then action.yaml
|
|
base_path = f"{manifest_path}/" if manifest_path else ""
|
|
for name in ("action.yml", "action.yaml"):
|
|
file_path = f"{base_path}{name}"
|
|
try:
|
|
manifest_content = self.github_client.get_file_content(action.owner, action.repo, file_path, sha)
|
|
self.cache.set(manifest_key, manifest_content)
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
if not manifest_content:
|
|
# Only log warning if both extensions failed
|
|
if manifest_path:
|
|
logger.error(
|
|
f"Action manifest not found: {action.owner}/{action.repo}/{manifest_path} "
|
|
f"(tried action.yml and action.yaml)"
|
|
)
|
|
else:
|
|
logger.error(
|
|
f"Action manifest not found: {action.owner}/{action.repo} (tried action.yml and action.yaml)"
|
|
)
|
|
return action.canonical_key(), None
|
|
|
|
# Parse manifest
|
|
try:
|
|
# Write to temp file and parse
|
|
import tempfile
|
|
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", delete=False) as f:
|
|
f.write(manifest_content)
|
|
temp_path = Path(f.name)
|
|
|
|
manifest = self.parser.parse_action(temp_path)
|
|
temp_path.unlink()
|
|
|
|
return action.canonical_key(), manifest
|
|
except Exception as e:
|
|
logger.error(f"Failed to parse manifest for {action.canonical_key()}: {e}")
|
|
return "", None
|