Coverage for gws-app/gws/lib/cli/__init__.py: 39%
149 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Utilities for CLI commands."""
3import re
4import os
5import sys
6import subprocess
7import time
8import math
10SCRIPT_NAME = ''
12_COLOR = {
13 'black': '\x1b[30m',
14 'red': '\x1b[31m',
15 'green': '\x1b[32m',
16 'yellow': '\x1b[33m',
17 'blue': '\x1b[34m',
18 'magenta': '\x1b[35m',
19 'cyan': '\x1b[36m',
20 'white': '\x1b[37m',
21 'reset': '\x1b[0m',
22}
25def cprint(clr, msg):
26 if SCRIPT_NAME:
27 msg = '[' + SCRIPT_NAME + '] ' + msg
28 if clr and sys.stdout.isatty():
29 msg = _COLOR[clr] + msg + _COLOR['reset']
30 sys.stdout.write(msg + '\n')
31 sys.stdout.flush()
34def error(msg):
35 cprint('red', msg)
38def fatal(msg):
39 cprint('red', msg)
40 sys.exit(1)
43def warning(msg):
44 cprint('yellow', msg)
47def info(msg):
48 cprint('cyan', msg)
51##
53def run(cmd):
54 if isinstance(cmd, list):
55 cmd = ' '.join(cmd)
56 cmd = re.sub(r'\s+', ' ', cmd.strip())
57 info(f'> {cmd}')
58 res = subprocess.run(cmd, shell=True, capture_output=False)
59 if res.returncode:
60 fatal(f'COMMAND FAILED, code {res.returncode}')
63def exec(cmd):
64 try:
65 return (
66 subprocess
67 .run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
68 .stdout.decode('utf8').strip()
69 )
70 except Exception as exc:
71 return f'> {cmd} FAILED: {exc}'
74def find_dirs(dirname):
75 if not os.path.isdir(dirname):
76 return
78 de: os.DirEntry
79 for de in os.scandir(dirname):
80 if de.name.startswith('.'):
81 continue
82 if de.is_dir():
83 yield de.path
86def find_files(dirname, pattern=None, deep=True):
87 if not os.path.isdir(dirname):
88 return
90 de: os.DirEntry
91 for de in os.scandir(dirname):
92 if de.name.startswith('.'):
93 continue
94 if de.is_dir() and deep:
95 yield from find_files(de.path, pattern)
96 continue
97 if de.is_file() and (pattern is None or re.search(pattern, de.path)):
98 yield de.path
101def read_file(path):
102 with open(path, 'rt', encoding='utf8') as fp:
103 return fp.read().strip()
106def write_file(path, text):
107 with open(path, 'wt', encoding='utf8') as fp:
108 fp.write(text)
111def parse_args(argv):
112 args = {}
113 opt = None
114 n = 0
116 for a in argv:
117 if a == '-':
118 args['_rest'] = []
119 elif '_rest' in args:
120 args['_rest'].append(a)
121 elif a.startswith('--'):
122 opt = a[2:]
123 args[opt] = True
124 elif a.startswith('-'):
125 opt = a[1:]
126 args[opt] = True
127 elif opt:
128 args[opt] = a
129 opt = None
130 else:
131 args[n] = a
132 n += 1
134 return args
137def main(name, main_fn, usage):
138 global SCRIPT_NAME
140 SCRIPT_NAME = name
142 args = parse_args(sys.argv)
143 if not args or 'h' in args or 'help' in args:
144 print('\n' + usage.strip() + '\n')
145 sys.exit(0)
147 sys.exit(main_fn(args))
150def text_table(data, header=None, delim=' | '):
151 """Format a list of dicts as a text-mode table."""
153 data = list(data)
155 if not data:
156 return ''
158 is_dict = isinstance(data[0], dict)
160 print_header = header is not None
161 if header is None or header == 'auto':
162 header = data[0].keys() if is_dict else list(range(len(data[0])))
164 widths = [len(h) if print_header else 1 for h in header]
166 def get(d, h):
167 if is_dict:
168 return d.get(h, '')
169 try:
170 return d[h]
171 except IndexError:
172 return ''
174 for d in data:
175 widths = [
176 max(a, b)
177 for a, b in zip(
178 widths,
179 [len(str(get(d, h))) for h in header]
180 )
181 ]
183 def field(n, v):
184 if isinstance(v, (int, float)):
185 return str(v).rjust(widths[n])
186 return str(v).ljust(widths[n])
188 rows = []
190 if print_header:
191 hdr = delim.join(field(n, h) for n, h in enumerate(header))
192 rows.append(hdr)
193 rows.append('-' * len(hdr))
195 for d in data:
196 rows.append(delim.join(field(n, get(d, h)) for n, h in enumerate(header)))
198 return '\n'.join(rows)
201class ProgressIndicator:
202 def __init__(self, title, total=0, resolution=10):
203 self.resolution = resolution
204 self.title = title
205 self.total = total
206 self.progress = 0
207 self.lastd = 0
208 self.starttime = 0
210 def __enter__(self):
211 self.log(f'START ({self.total})' if self.total else 'START')
212 self.starttime = time.time()
213 return self
215 def __exit__(self, exc_type, exc_val, exc_tb):
216 if not exc_type:
217 ts = time.time() - self.starttime
218 self.log(f'END ({ts:.2f} sec)')
220 def update(self, add=1):
221 if not self.total:
222 return
223 self.progress += add
224 p = math.floor(self.progress * 100.0 / self.total)
225 if p > 100:
226 p = 100
227 d = round(p / self.resolution) * self.resolution
228 if d > self.lastd:
229 self.log(f'{d}%')
230 self.lastd = d
232 def log(self, s):
233 info(f'{self.title}: {s}')