Files
ghaw-auditor/ghaw_auditor/resolver.py
2025-10-19 09:52:13 +03:00

165 lines
6.5 KiB
Python

"""Action resolver for GitHub actions."""
from __future__ import annotations
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from ghaw_auditor.cache import Cache
from ghaw_auditor.github_client import GitHubClient
from ghaw_auditor.models import ActionManifest, ActionRef, ActionType
from ghaw_auditor.parser import Parser
logger = logging.getLogger(__name__)
class Resolver:
"""Resolves action references and fetches manifests."""
def __init__(
self,
github_client: GitHubClient,
cache: Cache,
repo_path: Path,
concurrency: int = 4,
) -> None:
"""Initialize resolver."""
self.github_client = github_client
self.cache = cache
self.parser = Parser(repo_path)
self.repo_path = repo_path
self.concurrency = concurrency
def resolve_actions(self, actions: list[ActionRef]) -> dict[str, ActionManifest]:
"""Resolve multiple action references in parallel."""
resolved: dict[str, ActionManifest] = {}
with ThreadPoolExecutor(max_workers=self.concurrency) as executor:
futures = {executor.submit(self._resolve_action, action): action for action in actions}
for future in as_completed(futures):
action = futures[future]
try:
key, manifest = future.result()
if key and manifest:
resolved[key] = manifest
except Exception as e:
logger.error(f"Failed to resolve {action.canonical_key()}: {e}")
return resolved
def _resolve_action(self, action: ActionRef) -> tuple[str, ActionManifest | None]:
"""Resolve a single action reference."""
if action.type == ActionType.LOCAL:
return self._resolve_local_action(action)
elif action.type == ActionType.GITHUB:
return self._resolve_github_action(action)
elif action.type == ActionType.DOCKER:
# Docker actions don't have manifests to parse
return action.canonical_key(), None
return "", None
def _resolve_local_action(self, action: ActionRef) -> tuple[str, ActionManifest | None]:
"""Resolve a local action."""
if not action.path:
return "", None
# Remove leading ./ prefix only
clean_path = action.path[2:] if action.path.startswith("./") else action.path
action_path = self.repo_path / clean_path
# If action_path is a directory, look for action.yml/yaml inside
# If it's a file path, look in parent directory
if action_path.is_dir():
for name in ("action.yml", "action.yaml"):
manifest_path = action_path / name
if manifest_path.exists():
try:
manifest = self.parser.parse_action(manifest_path)
return action.canonical_key(), manifest
except Exception as e:
logger.error(f"Failed to parse local action {manifest_path}: {e}")
continue
else:
# Try as parent directory
parent = action_path.parent if action_path.name.startswith("action.") else action_path
for name in ("action.yml", "action.yaml"):
manifest_path = parent / name
if manifest_path.exists():
try:
manifest = self.parser.parse_action(manifest_path)
return action.canonical_key(), manifest
except Exception as e:
logger.error(f"Failed to parse local action {manifest_path}: {e}")
continue
logger.warning(f"Local action manifest not found: {action_path}")
return "", None
def _resolve_github_action(self, action: ActionRef) -> tuple[str, ActionManifest | None]:
"""Resolve a GitHub action."""
if not action.owner or not action.repo or not action.ref:
return "", None
# Resolve ref to SHA
cache_key = self.cache.make_key("ref", action.owner, action.repo, action.ref)
sha = self.cache.get(cache_key)
if not sha:
try:
sha = self.github_client.get_ref_sha(action.owner, action.repo, action.ref)
self.cache.set(cache_key, sha)
except Exception as e:
logger.error(f"Failed to resolve ref {action.owner}/{action.repo}@{action.ref}: {e}")
return "", None
action.resolved_sha = sha
# Fetch action manifest
manifest_path = action.path if action.path and action.path != "action.yml" else ""
manifest_key = self.cache.make_key("manifest", action.owner, action.repo, sha, manifest_path)
manifest_content = self.cache.get(manifest_key)
if not manifest_content:
# Try action.yml first, then action.yaml
base_path = f"{manifest_path}/" if manifest_path else ""
for name in ("action.yml", "action.yaml"):
file_path = f"{base_path}{name}"
try:
manifest_content = self.github_client.get_file_content(action.owner, action.repo, file_path, sha)
self.cache.set(manifest_key, manifest_content)
break
except Exception:
continue
if not manifest_content:
# Only log warning if both extensions failed
if manifest_path:
logger.error(
f"Action manifest not found: {action.owner}/{action.repo}/{manifest_path} "
f"(tried action.yml and action.yaml)"
)
else:
logger.error(
f"Action manifest not found: {action.owner}/{action.repo} (tried action.yml and action.yaml)"
)
return action.canonical_key(), None
# Parse manifest
try:
# Write to temp file and parse
import tempfile
with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", delete=False) as f:
f.write(manifest_content)
temp_path = Path(f.name)
manifest = self.parser.parse_action(temp_path)
temp_path.unlink()
return action.canonical_key(), manifest
except Exception as e:
logger.error(f"Failed to parse manifest for {action.canonical_key()}: {e}")
return "", None