feat(python): add ruff linter and formatter configuration

This commit is contained in:
2026-02-07 15:08:40 +02:00
parent c0665a3161
commit b327a99991
7 changed files with 187 additions and 155 deletions

View File

@@ -8,6 +8,10 @@ indent_style = space
insert_final_newline = true insert_final_newline = true
trim_trailing_whitespace = true trim_trailing_whitespace = true
[*.py]
indent_size = 4
max_line_length = 120
[*.fish] [*.fish]
max_line_length = 120 max_line_length = 120

View File

@@ -60,3 +60,10 @@ repos:
hooks: hooks:
- id: fish_syntax - id: fish_syntax
- id: fish_indent - id: fish_indent
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.6
hooks:
- id: ruff
args: [--fix]
- id: ruff-format

View File

@@ -7,65 +7,67 @@ To be used with a companion fish function like this:
""" """
from __future__ import print_function
import json import json
import os import os
import signal import signal
import subprocess import subprocess
import sys import sys
import traceback
BASH = "bash"
BASH = 'bash'
FISH_READONLY = [ FISH_READONLY = [
'PWD', 'SHLVL', 'history', 'pipestatus', 'status', 'version', "PWD",
'FISH_VERSION', 'fish_pid', 'hostname', '_', 'fish_private_mode' "SHLVL",
"history",
"pipestatus",
"status",
"version",
"FISH_VERSION",
"fish_pid",
"hostname",
"_",
"fish_private_mode",
] ]
IGNORED = [ IGNORED = ["PS1", "XPC_SERVICE_NAME"]
'PS1', 'XPC_SERVICE_NAME'
]
def ignored(name): def ignored(name):
if name == 'PWD': # this is read only, but has special handling if name == "PWD": # this is read only, but has special handling
return False return False
# ignore other read only variables # ignore other read only variables
if name in FISH_READONLY: if name in FISH_READONLY:
return True return True
if name in IGNORED or name.startswith("BASH_FUNC"): if name in IGNORED or name.startswith("BASH_FUNC"):
return True return True
if name.startswith('%'): return name.startswith("%")
return True
return False
def escape(string): def escape(string):
# use json.dumps to reliably escape quotes and backslashes # use json.dumps to reliably escape quotes and backslashes
return json.dumps(string).replace(r'$', r'\$') return json.dumps(string).replace(r"$", r"\$")
def escape_identifier(word): def escape_identifier(word):
return escape(word.replace('?', '\\?')) return escape(word.replace("?", "\\?"))
def comment(string): def comment(string):
return '\n'.join(['# ' + line for line in string.split('\n')]) return "\n".join(["# " + line for line in string.split("\n")])
def gen_script(): def gen_script():
# Use the following instead of /usr/bin/env to read environment so we can # Use the following instead of /usr/bin/env to read environment so we can
# deal with multi-line environment variables (and other odd cases). # deal with multi-line environment variables (and other odd cases).
env_reader = "%s -c 'import os,json; print(json.dumps({k:v for k,v in os.environ.items()}))'" % (sys.executable) env_reader = f"{sys.executable} -c 'import os,json; print(json.dumps({{k:v for k,v in os.environ.items()}}))'"
args = [BASH, '-c', env_reader] args = [BASH, "-c", env_reader]
output = subprocess.check_output(args, universal_newlines=True) output = subprocess.check_output(args, universal_newlines=True)
old_env = output.strip() old_env = output.strip()
pipe_r, pipe_w = os.pipe() pipe_r, pipe_w = os.pipe()
if sys.version_info >= (3, 4): os.set_inheritable(pipe_w, True)
os.set_inheritable(pipe_w, True) command = f"eval $1 && ({env_reader}; alias) >&{pipe_w}"
command = 'eval $1 && ({}; alias) >&{}'.format( args = [BASH, "-c", command, "bass", " ".join(sys.argv[1:])]
env_reader,
pipe_w
)
args = [BASH, '-c', command, 'bass', ' '.join(sys.argv[1:])]
p = subprocess.Popen(args, universal_newlines=True, close_fds=False) p = subprocess.Popen(args, universal_newlines=True, close_fds=False)
os.close(pipe_w) os.close(pipe_w)
with os.fdopen(pipe_r) as f: with os.fdopen(pipe_r) as f:
@@ -73,9 +75,7 @@ def gen_script():
alias_str = f.read() alias_str = f.read()
if p.wait() != 0: if p.wait() != 0:
raise subprocess.CalledProcessError( raise subprocess.CalledProcessError(
returncode=p.returncode, returncode=p.returncode, cmd=" ".join(sys.argv[1:]), output=new_env + alias_str
cmd=' '.join(sys.argv[1:]),
output=new_env + alias_str
) )
new_env = new_env.strip() new_env = new_env.strip()
@@ -89,41 +89,41 @@ def gen_script():
continue continue
v1 = old_env.get(k) v1 = old_env.get(k)
if not v1: if not v1:
script_lines.append(comment('adding %s=%s' % (k, v))) script_lines.append(comment(f"adding {k}={v}"))
elif v1 != v: elif v1 != v:
script_lines.append(comment('updating %s=%s -> %s' % (k, v1, v))) script_lines.append(comment(f"updating {k}={v1} -> {v}"))
# process special variables # process special variables
if k == 'PWD': if k == "PWD":
script_lines.append('cd %s' % escape(v)) script_lines.append(f"cd {escape(v)}")
continue continue
else: else:
continue continue
if k == 'PATH': if k == "PATH": # noqa: SIM108
value = ' '.join([escape(directory) value = " ".join([escape(directory) for directory in v.split(":")])
for directory in v.split(':')])
else: else:
value = escape(v) value = escape(v)
script_lines.append('set -g -x %s %s' % (k, value)) script_lines.append(f"set -g -x {k} {value}")
for var in set(old_env.keys()) - set(new_env.keys()): for var in set(old_env.keys()) - set(new_env.keys()):
script_lines.append(comment('removing %s' % var)) script_lines.append(comment(f"removing {var}"))
script_lines.append('set -e %s' % var) script_lines.append(f"set -e {var}")
script = '\n'.join(script_lines) script = "\n".join(script_lines)
alias_lines = [] alias_lines = []
for line in alias_str.splitlines(): for line in alias_str.splitlines():
_, rest = line.split(None, 1) _, rest = line.split(None, 1)
k, v = rest.split("=", 1) k, v = rest.split("=", 1)
alias_lines.append("alias " + escape_identifier(k) + "=" + v) alias_lines.append("alias " + escape_identifier(k) + "=" + v)
alias = '\n'.join(alias_lines) alias = "\n".join(alias_lines)
return script + '\n' + alias return script + "\n" + alias
script_file = os.fdopen(3, 'w')
script_file = os.fdopen(3, "w")
if not sys.argv[1:]: if not sys.argv[1:]:
print('__bass_usage', file=script_file, end='') print("__bass_usage", file=script_file, end="")
sys.exit(0) sys.exit(0)
try: try:
@@ -131,8 +131,8 @@ try:
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
sys.exit(e.returncode) sys.exit(e.returncode)
except Exception: except Exception:
print('Bass internal error!', file=sys.stderr) print("Bass internal error!", file=sys.stderr)
raise # traceback will output to stderr raise # traceback will output to stderr
except KeyboardInterrupt: except KeyboardInterrupt:
signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL)
os.kill(os.getpid(), signal.SIGINT) os.kill(os.getpid(), signal.SIGINT)

View File

@@ -24,7 +24,7 @@ str_to_operator = {
def vercmp(expr): def vercmp(expr):
"""Version Comparison function.""" """Version Comparison function."""
words = expr.split() words = expr.split()
comparisons = [words[i: i + 3] for i in range(0, len(words) - 2, 2)] comparisons = [words[i : i + 3] for i in range(0, len(words) - 2, 2)]
for left, op_str, right in comparisons: for left, op_str, right in comparisons:
compare_op = str_to_operator[op_str] compare_op = str_to_operator[op_str]
if not compare_op(version.parse(left), version.parse(right)): if not compare_op(version.parse(left), version.parse(right)):
@@ -63,7 +63,7 @@ def test():
except KeyError: except KeyError:
pass pass
else: else:
assert False, "invalid operator did not raise" raise AssertionError("invalid operator did not raise")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*-
# Python script to find the largest files in a git repository. # Python script to find the largest files in a git repository.
# The general method is based on the script in this blog post: # The general method is based on the script in this blog post:
@@ -32,60 +31,59 @@
# vim:tw=120:ts=4:ft=python:norl: # vim:tw=120:ts=4:ft=python:norl:
from subprocess import check_output, Popen, PIPE
import argparse import argparse
import signal import signal
import sys import sys
from subprocess import PIPE, Popen, check_output
sortByOnDiskSize = False sortByOnDiskSize = False
class Blob(object):
sha1 = ''
size = 0
packed_size = 0
path = ''
def __init__(self, line): class Blob:
cols = line.split() sha1 = ""
self.sha1, self.size, self.packed_size = cols[0], int(cols[2]), int(cols[3]) size = 0
packed_size = 0
path = ""
def __repr__(self): def __init__(self, line):
return '{} - {} - {} - {}'.format( cols = line.split()
self.sha1, self.size, self.packed_size, self.path) self.sha1, self.size, self.packed_size = cols[0], int(cols[2]), int(cols[3])
def __lt__(self, other): def __repr__(self):
if (sortByOnDiskSize): return f"{self.sha1} - {self.size} - {self.packed_size} - {self.path}"
return self.size < other.size
else:
return self.packed_size < other.packed_size
def csv_line(self): def __lt__(self, other):
return "{},{},{},{}".format( if sortByOnDiskSize:
self.size/1024, self.packed_size/1024, self.sha1, self.path) return self.size < other.size
else:
return self.packed_size < other.packed_size
def csv_line(self):
return f"{self.size / 1024},{self.packed_size / 1024},{self.sha1},{self.path}"
def main(): def main():
global sortByOnDiskSize global sortByOnDiskSize
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
args = parse_arguments() args = parse_arguments()
sortByOnDiskSize = args.sortByOnDiskSize sortByOnDiskSize = args.sortByOnDiskSize
size_limit = 1024*args.filesExceeding size_limit = 1024 * args.filesExceeding
if args.filesExceeding > 0: if args.filesExceeding > 0:
print("Finding objects larger than {}kB…".format(args.filesExceeding)) print(f"Finding objects larger than {args.filesExceeding}kB…")
else: else:
print("Finding the {} largest objects…".format(args.matchCount)) print(f"Finding the {args.matchCount} largest objects…")
blobs = get_top_blobs(args.matchCount, size_limit) blobs = get_top_blobs(args.matchCount, size_limit)
populate_blob_paths(blobs) populate_blob_paths(blobs)
print_out_blobs(blobs) print_out_blobs(blobs)
def get_top_blobs(count, size_limit): def get_top_blobs(count, size_limit):
"""Get top blobs from git repository """Get top blobs from git repository
Args: Args:
count (int): How many items to return count (int): How many items to return
@@ -93,110 +91,123 @@ def get_top_blobs(count, size_limit):
Returns: Returns:
dict: Dictionary of Blobs dict: Dictionary of Blobs
""" """
sort_column = 4 sort_column = 4
if sortByOnDiskSize: if sortByOnDiskSize:
sort_column = 3 sort_column = 3
verify_pack = "git verify-pack -v `git rev-parse --git-dir`/objects/pack/pack-*.idx | grep blob | sort -k{}nr".format(sort_column) # noqa: E501 verify_pack = (
output = check_output(verify_pack, shell=True).decode('utf-8').strip().split("\n")[:-1] # noqa: E501 f"git verify-pack -v `git rev-parse --git-dir`/objects/pack/pack-*.idx | grep blob | sort -k{sort_column}nr"
)
output = check_output(verify_pack, shell=True).decode("utf-8").strip().split("\n")[:-1]
blobs = {} blobs = {}
# use __lt__ to do the appropriate comparison # use __lt__ to do the appropriate comparison
compare_blob = Blob("a b {} {} c".format(size_limit, size_limit)) compare_blob = Blob(f"a b {size_limit} {size_limit} c")
for obj_line in output: for obj_line in output:
blob = Blob(obj_line) blob = Blob(obj_line)
if size_limit > 0: if size_limit > 0:
if compare_blob < blob: if compare_blob < blob:
blobs[blob.sha1] = blob blobs[blob.sha1] = blob
else: else:
break break
else: else:
blobs[blob.sha1] = blob blobs[blob.sha1] = blob
if len(blobs) == count: if len(blobs) == count:
break break
return blobs return blobs
def populate_blob_paths(blobs): def populate_blob_paths(blobs):
"""Populate blob paths that only have a path """Populate blob paths that only have a path
Args: Args:
blobs (Blob, dict): Dictionary of Blobs blobs (Blob, dict): Dictionary of Blobs
""" """
if len(blobs): if len(blobs):
print("Finding object paths…") print("Finding object paths…")
# Only include revs which have a path. Other revs aren't blobs. # Only include revs which have a path. Other revs aren't blobs.
rev_list = "git rev-list --all --objects | awk '$2 {print}'" rev_list = "git rev-list --all --objects | awk '$2 {print}'"
all_object_lines = check_output(rev_list, shell=True).decode('utf-8').strip().split("\n")[:-1] # noqa: E501 all_object_lines = check_output(rev_list, shell=True).decode("utf-8").strip().split("\n")[:-1]
outstanding_keys = list(blobs.keys()) outstanding_keys = list(blobs.keys())
for line in all_object_lines: for line in all_object_lines:
cols = line.split() cols = line.split()
sha1, path = cols[0], " ".join(cols[1:]) sha1, path = cols[0], " ".join(cols[1:])
if (sha1 in outstanding_keys): if sha1 in outstanding_keys:
outstanding_keys.remove(sha1) outstanding_keys.remove(sha1)
blobs[sha1].path = path blobs[sha1].path = path
# short-circuit the search if we're done # short-circuit the search if we're done
if not len(outstanding_keys): if not len(outstanding_keys):
break break
def print_out_blobs(blobs): def print_out_blobs(blobs):
if len(blobs): if len(blobs):
csv_lines = ["size,pack,hash,path"] csv_lines = ["size,pack,hash,path"]
for blob in sorted(blobs.values(), reverse=True): for blob in sorted(blobs.values(), reverse=True):
csv_lines.append(blob.csv_line()) csv_lines.append(blob.csv_line())
command = ["column", "-t", "-s", ","] command = ["column", "-t", "-s", ","]
p = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) p = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
# Encode the input as bytes # Encode the input as bytes
input_data = ("\n".join(csv_lines) + "\n").encode() input_data = ("\n".join(csv_lines) + "\n").encode()
stdout, _ = p.communicate(input_data) stdout, _ = p.communicate(input_data)
print("\nAll sizes in kB. The pack column is the compressed size of the object inside the pack file.\n") # noqa: E501 print("\nAll sizes in kB. The pack column is the compressed size of the object inside the pack file.\n")
print(stdout.decode("utf-8").rstrip('\n')) print(stdout.decode("utf-8").rstrip("\n"))
else: else:
print("No files found which match those criteria.") print("No files found which match those criteria.")
def parse_arguments(): def parse_arguments():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(description="List the largest files in a git repository")
description='List the largest files in a git repository' parser.add_argument(
) "-c",
parser.add_argument( "--match-count",
'-c', '--match-count', dest='matchCount', type=int, default=10, dest="matchCount",
help='Files to return. Default is 10. Ignored if --files-exceeding is used.' type=int,
) default=10,
parser.add_argument( help="Files to return. Default is 10. Ignored if --files-exceeding is used.",
'--files-exceeding', dest='filesExceeding', type=int, default=0, )
help='The cutoff amount, in KB. Files with a pack size (or physical size, with -p) larger than this will be printed.' # noqa: E501 parser.add_argument(
) "--files-exceeding",
parser.add_argument( dest="filesExceeding",
'-p', '--physical-sort', dest='sortByOnDiskSize', type=int,
action='store_true', default=False, default=0,
help='Sort by the on-disk size. Default is to sort by the pack size.' help=(
) "The cutoff amount, in KB. Files with a pack size"
" (or physical size, with -p) larger than this will be printed."
),
)
parser.add_argument(
"-p",
"--physical-sort",
dest="sortByOnDiskSize",
action="store_true",
default=False,
help="Sort by the on-disk size. Default is to sort by the pack size.",
)
return parser.parse_args() return parser.parse_args()
def signal_handler(signal, frame): def signal_handler(signal, frame):
print('Caught Ctrl-C. Exiting.') print("Caught Ctrl-C. Exiting.")
sys.exit(0) sys.exit(0)
# Default function is main() # Default function is main()
if __name__ == '__main__': if __name__ == "__main__":
main() main()

9
pyproject.toml Normal file
View File

@@ -0,0 +1,9 @@
[tool.ruff]
target-version = "py39"
line-length = 120
[tool.ruff.lint]
select = ["E", "F", "W", "I", "UP", "B", "SIM", "C4"]
[tool.ruff.format]
quote-style = "double"

View File

@@ -18,6 +18,7 @@ fi
tools=( tools=(
ansible # IT automation and configuration management ansible # IT automation and configuration management
openapi-python-client # Generate Python API clients from OpenAPI specs openapi-python-client # Generate Python API clients from OpenAPI specs
ruff # Fast Python linter and formatter
) )
# Library packages — installed into system Python with `uv pip install --system` # Library packages — installed into system Python with `uv pip install --system`