Coverage for gws-app/gws/lib/cli/__init__.py: 39%

149 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Utilities for CLI commands.""" 

2 

3import re 

4import os 

5import sys 

6import subprocess 

7import time 

8import math 

9 

10SCRIPT_NAME = '' 

11 

12_COLOR = { 

13 'black': '\x1b[30m', 

14 'red': '\x1b[31m', 

15 'green': '\x1b[32m', 

16 'yellow': '\x1b[33m', 

17 'blue': '\x1b[34m', 

18 'magenta': '\x1b[35m', 

19 'cyan': '\x1b[36m', 

20 'white': '\x1b[37m', 

21 'reset': '\x1b[0m', 

22} 

23 

24 

25def cprint(clr, msg): 

26 if SCRIPT_NAME: 

27 msg = '[' + SCRIPT_NAME + '] ' + msg 

28 if clr and sys.stdout.isatty(): 

29 msg = _COLOR[clr] + msg + _COLOR['reset'] 

30 sys.stdout.write(msg + '\n') 

31 sys.stdout.flush() 

32 

33 

34def error(msg): 

35 cprint('red', msg) 

36 

37 

38def fatal(msg): 

39 cprint('red', msg) 

40 sys.exit(1) 

41 

42 

43def warning(msg): 

44 cprint('yellow', msg) 

45 

46 

47def info(msg): 

48 cprint('cyan', msg) 

49 

50 

51## 

52 

53def run(cmd): 

54 if isinstance(cmd, list): 

55 cmd = ' '.join(cmd) 

56 cmd = re.sub(r'\s+', ' ', cmd.strip()) 

57 info(f'> {cmd}') 

58 res = subprocess.run(cmd, shell=True, capture_output=False) 

59 if res.returncode: 

60 fatal(f'COMMAND FAILED, code {res.returncode}') 

61 

62 

63def exec(cmd): 

64 try: 

65 return ( 

66 subprocess 

67 .run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) 

68 .stdout.decode('utf8').strip() 

69 ) 

70 except Exception as exc: 

71 return f'> {cmd} FAILED: {exc}' 

72 

73 

74def find_dirs(dirname): 

75 if not os.path.isdir(dirname): 

76 return 

77 

78 de: os.DirEntry 

79 for de in os.scandir(dirname): 

80 if de.name.startswith('.'): 

81 continue 

82 if de.is_dir(): 

83 yield de.path 

84 

85 

86def find_files(dirname, pattern=None, deep=True): 

87 if not os.path.isdir(dirname): 

88 return 

89 

90 de: os.DirEntry 

91 for de in os.scandir(dirname): 

92 if de.name.startswith('.'): 

93 continue 

94 if de.is_dir() and deep: 

95 yield from find_files(de.path, pattern) 

96 continue 

97 if de.is_file() and (pattern is None or re.search(pattern, de.path)): 

98 yield de.path 

99 

100 

101def read_file(path): 

102 with open(path, 'rt', encoding='utf8') as fp: 

103 return fp.read().strip() 

104 

105 

106def write_file(path, text): 

107 with open(path, 'wt', encoding='utf8') as fp: 

108 fp.write(text) 

109 

110 

111def parse_args(argv): 

112 args = {} 

113 opt = None 

114 n = 0 

115 

116 for a in argv: 

117 if a == '-': 

118 args['_rest'] = [] 

119 elif '_rest' in args: 

120 args['_rest'].append(a) 

121 elif a.startswith('--'): 

122 opt = a[2:] 

123 args[opt] = True 

124 elif a.startswith('-'): 

125 opt = a[1:] 

126 args[opt] = True 

127 elif opt: 

128 args[opt] = a 

129 opt = None 

130 else: 

131 args[n] = a 

132 n += 1 

133 

134 return args 

135 

136 

137def main(name, main_fn, usage): 

138 global SCRIPT_NAME 

139 

140 SCRIPT_NAME = name 

141 

142 args = parse_args(sys.argv) 

143 if not args or 'h' in args or 'help' in args: 

144 print('\n' + usage.strip() + '\n') 

145 sys.exit(0) 

146 

147 sys.exit(main_fn(args)) 

148 

149 

150def text_table(data, header=None, delim=' | '): 

151 """Format a list of dicts as a text-mode table.""" 

152 

153 data = list(data) 

154 

155 if not data: 

156 return '' 

157 

158 is_dict = isinstance(data[0], dict) 

159 

160 print_header = header is not None 

161 if header is None or header == 'auto': 

162 header = data[0].keys() if is_dict else list(range(len(data[0]))) 

163 

164 widths = [len(h) if print_header else 1 for h in header] 

165 

166 def get(d, h): 

167 if is_dict: 

168 return d.get(h, '') 

169 try: 

170 return d[h] 

171 except IndexError: 

172 return '' 

173 

174 for d in data: 

175 widths = [ 

176 max(a, b) 

177 for a, b in zip( 

178 widths, 

179 [len(str(get(d, h))) for h in header] 

180 ) 

181 ] 

182 

183 def field(n, v): 

184 if isinstance(v, (int, float)): 

185 return str(v).rjust(widths[n]) 

186 return str(v).ljust(widths[n]) 

187 

188 rows = [] 

189 

190 if print_header: 

191 hdr = delim.join(field(n, h) for n, h in enumerate(header)) 

192 rows.append(hdr) 

193 rows.append('-' * len(hdr)) 

194 

195 for d in data: 

196 rows.append(delim.join(field(n, get(d, h)) for n, h in enumerate(header))) 

197 

198 return '\n'.join(rows) 

199 

200 

201class ProgressIndicator: 

202 def __init__(self, title, total=0, resolution=10): 

203 self.resolution = resolution 

204 self.title = title 

205 self.total = total 

206 self.progress = 0 

207 self.lastd = 0 

208 self.starttime = 0 

209 

210 def __enter__(self): 

211 self.log(f'START ({self.total})' if self.total else 'START') 

212 self.starttime = time.time() 

213 return self 

214 

215 def __exit__(self, exc_type, exc_val, exc_tb): 

216 if not exc_type: 

217 ts = time.time() - self.starttime 

218 self.log(f'END ({ts:.2f} sec)') 

219 

220 def update(self, add=1): 

221 if not self.total: 

222 return 

223 self.progress += add 

224 p = math.floor(self.progress * 100.0 / self.total) 

225 if p > 100: 

226 p = 100 

227 d = round(p / self.resolution) * self.resolution 

228 if d > self.lastd: 

229 self.log(f'{d}%') 

230 self.lastd = d 

231 

232 def log(self, s): 

233 info(f'{self.title}: {s}')