Coverage for gws-app/gws/lib/vendor/dog/util.py: 0%

148 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1import os 

2import random 

3import re 

4import subprocess 

5import sys 

6import time 

7 

8 

9class Data: 

10 def __init__(self, **kwargs): 

11 vars(self).update(kwargs) 

12 

13 def get(self, k, default=None): 

14 return vars(self).get(k, default) 

15 

16 def __getattr__(self, item): 

17 return None 

18 

19 

20def to_data(arg): 

21 if isinstance(arg, Data): 

22 return Data(**vars(arg)) 

23 return Data(**arg) 

24 

25 

26def parse_args(argv): 

27 args = {} 

28 opt = None 

29 n = 0 

30 

31 for a in argv: 

32 if a.startswith('--'): 

33 opt = a[2:] 

34 args[opt] = True 

35 elif a.startswith('-'): 

36 opt = a[1:] 

37 args[opt] = True 

38 elif opt: 

39 args[opt] = a 

40 opt = None 

41 else: 

42 args[n] = a 

43 n += 1 

44 

45 return args 

46 

47 

48def read_file(path: str) -> str: 

49 with open(path, 'rt', encoding='utf8') as fp: 

50 return fp.read() 

51 

52 

53def read_file_b(path: str) -> bytes: 

54 with open(path, 'rb') as fp: 

55 return fp.read() 

56 

57 

58def write_file(path: str, s: str): 

59 os.makedirs(os.path.dirname(path), exist_ok=True) 

60 with open(path, 'wt', encoding='utf8') as fp: 

61 fp.write(s) 

62 

63 

64def write_file_b(path: str, s: bytes): 

65 os.makedirs(os.path.dirname(path), exist_ok=True) 

66 with open(path, 'wb') as fp: 

67 fp.write(s) 

68 

69 

70def abspath(path, base): 

71 if os.path.isabs(path): 

72 return path 

73 path = os.path.join(os.path.dirname(base), path) 

74 return os.path.abspath(path) 

75 

76 

77def normpath(path): 

78 res = [] 

79 if path.startswith('/'): 

80 res.append('') 

81 

82 for p in path.split('/'): 

83 p = p.strip() 

84 if not p or p == '.': 

85 continue 

86 if p == '..': 

87 if not res: 

88 return '' 

89 res.pop() 

90 continue 

91 if p.startswith('.'): 

92 return '' 

93 res.append(p) 

94 

95 return '/'.join(res) 

96 

97 

98def random_string(length): 

99 a = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' 

100 r = random.SystemRandom() 

101 return ''.join(r.choice(a) for _ in range(length)) 

102 

103 

104_UID_DE_TRANS = { 

105 ord('ä'): 'ae', 

106 ord('ö'): 'oe', 

107 ord('ü'): 'ue', 

108 ord('ß'): 'ss', 

109} 

110 

111 

112def to_uid(x) -> str: 

113 """Convert a value to an uid (alphanumeric string).""" 

114 

115 if not x: 

116 return '' 

117 x = str(x).lower().strip().translate(_UID_DE_TRANS) 

118 x = re.sub(r'[^a-z0-9]+', '-', x) 

119 return x.strip('-') 

120 

121 

122def flatten(ls): 

123 res = [] 

124 for x in ls: 

125 if isinstance(x, list): 

126 res.extend(flatten(x)) 

127 else: 

128 res.append(x) 

129 return res 

130 

131 

132def run(cmd, pipe=False): 

133 """Run a command, return a tuple (ok, output)""" 

134 

135 log.debug('run ' + repr(' '.join(cmd))) 

136 

137 args = { 

138 'stdout': subprocess.PIPE if pipe else None, 

139 'stderr': subprocess.STDOUT if pipe else None, 

140 'stdin': None, 

141 'shell': False, 

142 } 

143 

144 try: 

145 p = subprocess.Popen(cmd, **args) 

146 out, _ = p.communicate(None) 

147 rc = p.returncode 

148 if isinstance(out, bytes): 

149 out = out.decode('utf8') 

150 except Exception as exc: 

151 log.error(f'run failed: {cmd!r} {exc!r}') 

152 return False, repr(exc) 

153 

154 if rc > 0: 

155 log.error(f'run failed: {cmd!r} {out!r}') 

156 return False, out 

157 

158 return True, out 

159 

160 

161color = { 

162 'none': '', 

163 'black': '\u001b[30m', 

164 'red': '\u001b[31m', 

165 'green': '\u001b[32m', 

166 'yellow': '\u001b[33m', 

167 'blue': '\u001b[34m', 

168 'magenta': '\u001b[35m', 

169 'cyan': '\u001b[36m', 

170 'white': '\u001b[37m', 

171 'reset': '\u001b[0m', 

172} 

173 

174log_colors = { 

175 'ERROR': 'red', 

176 'WARNING': 'yellow', 

177 'INFO': 'green', 

178 'DEBUG': 'cyan', 

179} 

180 

181 

182class _Logger: 

183 level = 'INFO' 

184 

185 def set_level(self, level): 

186 self.level = level 

187 

188 def log(self, level, *args): 

189 levels = 'ERROR', 'WARNING', 'INFO', 'DEBUG' 

190 if levels.index(level) <= levels.index(self.level): 

191 cin = cout = '' 

192 if sys.stdout.isatty(): 

193 cin = color[log_colors[level]] 

194 cout = color['reset'] 

195 a = ' '.join(str(a) for a in args) 

196 sys.stdout.write(f'{cin}[dog] {level}: {a}{cout}\n') 

197 sys.stdout.flush() 

198 

199 def error(self, *args): 

200 self.log('ERROR', *args) 

201 

202 def warning(self, *args): 

203 self.log('WARNING', *args) 

204 

205 def info(self, *args): 

206 self.log('INFO', *args) 

207 

208 def debug(self, *args): 

209 self.log('DEBUG', *args) 

210 

211 

212log = _Logger() 

213 

214 

215def bold(c): 

216 return c.replace('m', ';1m') 

217 

218 

219def cprint(clr, msg): 

220 print(color[clr] + msg + color['reset']) 

221 

222 

223_TIME_STACK = [] 

224 

225 

226def time_start(label): 

227 _TIME_STACK.append((time.time(), label)) 

228 

229 

230def time_end(): 

231 if _TIME_STACK: 

232 t2 = time.time() 

233 t1, label = _TIME_STACK.pop() 

234 log.debug(f'{label} in {t2 - t1:.2f}s') 

235 

236 

237def base36(n): 

238 abc = '0123456789abcdefghijklmnopqrstuvwxyz' 

239 enc = '' 

240 while n: 

241 n, i = divmod(n, 36) 

242 enc = abc[i] + enc 

243 return enc