Coverage for gws-app/gws/lib/osx/__init__.py: 24%

206 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Utilities for os/shell scripting""" 

2 

3from typing import Optional 

4 

5import hashlib 

6import os 

7import re 

8import signal 

9import subprocess 

10import shutil 

11import shlex 

12import time 

13 

14import psutil 

15 

16import gws 

17 

18 

class Error(gws.Error):
    """Base error raised by the helpers in this module."""

21 

22 

class TimeoutError(Error):
    """Raised by ``run`` when a command does not finish within its timeout.

    Note that inside this module the name shadows the builtin ``TimeoutError``.
    """

25 

26 

27_Path = str | bytes 

28 

29 

def getenv(key: str, default: str = None) -> Optional[str]:
    """Look up an environment variable.

    Args:
        key: Name of the environment variable.
        default: Value to return when the variable is not set.

    Returns:
        The value of the variable, or ``default`` if the variable is not set.
    """
    return os.environ.get(key, default)

41 

42 

def run_nowait(cmd: str | list, **kwargs) -> subprocess.Popen:
    """Start a process and return immediately, without waiting for it to finish.

    Args:
        cmd: The command to start, passed to ``subprocess.Popen`` as is.
        kwargs: Extra arguments for ``subprocess.Popen``. They override the
            defaults used here (no stream redirection, ``shell=False``).

    Returns:
        The ``subprocess.Popen`` object of the started process.
    """

    popen_args = dict(stdin=None, stdout=None, stderr=None, shell=False)
    popen_args |= kwargs
    return subprocess.Popen(cmd, **popen_args)

63 

64 

def run(cmd: str | list, input: str = None, echo: bool = False, strict: bool = True, timeout: float = None, **kwargs) -> str:
    """Run an external command and wait for it to finish.

    Args:
        cmd: Command to run. A string is split into arguments with ``shlex.split``.
        input: Input data to send to the command's stdin.
        echo: Echo the output instead of capturing it.
        strict: Raise an error on a non-zero exit code.
        timeout: Timeout in seconds.
        kwargs: Arguments to pass to ``subprocess.Popen``. They override the defaults used here.

    Returns:
        The command output (stdout and stderr combined) as a string. Empty if ``echo`` is set.

    Raises:
        ``TimeoutError``: If the command did not finish within ``timeout``.
        ``Error``: If the command could not be run, or exited with a non-zero code and ``strict`` is set.
    """

    args = {
        'stdin': subprocess.PIPE if input else None,
        'stdout': None if echo else subprocess.PIPE,
        'stderr': subprocess.STDOUT,
        'shell': False,
    }
    args.update(kwargs)

    if isinstance(cmd, str):
        cmd = shlex.split(cmd)

    # Pipes are binary unless the caller enabled text mode via kwargs,
    # and a binary pipe rejects ``str`` input, so encode it first.
    text_mode = any(args.get(k) for k in ('text', 'universal_newlines', 'encoding', 'errors'))
    if isinstance(input, str) and not text_mode:
        input = _to_bytes(input)

    gws.log.debug(f'RUN: {cmd=}')

    try:
        p = subprocess.Popen(cmd, **args)
    except Exception as exc:
        raise Error('run: failure', repr(cmd)) from exc

    try:
        out, _ = p.communicate(input, timeout)
    except subprocess.TimeoutExpired as exc:
        # ``communicate`` does not stop the child on timeout: kill it and
        # reap it, otherwise it keeps running after we have given up on it.
        p.kill()
        p.communicate()
        raise TimeoutError('run: command timed out', repr(cmd)) from exc
    except Exception as exc:
        raise Error('run: failure', repr(cmd)) from exc

    rc = p.returncode

    if rc:
        gws.log.debug(f'RUN_FAILED: {cmd=} {rc=} {out=}')

    if rc and strict:
        raise Error('run: non-zero exit', repr(cmd))

    return _to_str(out or '')

109 

110 

def unlink(path: _Path) -> bool:
    """Delete a file, if it exists.

    Args:
        path: Filepath.

    Returns:
        ``True`` if the file was removed or ``path`` is not an existing file,
        ``False`` if the removal failed.
    """
    try:
        if os.path.isfile(path):
            os.unlink(path)
    except OSError as exc:
        gws.log.debug(f'OSError: unlink: {exc}')
        return False
    return True

124 

125 

def rename(src: _Path, dst: _Path) -> bool:
    """Move or rename ``src`` to ``dst`` using ``os.replace``.

    Args:
        src: Path to source.
        dst: Destination.

    Returns:
        ``True`` on success, ``False`` if the operation failed.
    """

    try:
        os.replace(src, dst)
    except OSError as exc:
        gws.log.warning(f'OSError: rename: {exc}')
        return False
    return True

140 

141 

def chown(path: _Path, user: int = None, group: int = None) -> bool:
    """Change the owner and group of a given path.

    Args:
        path: Filepath.
        user: UID, defaults to ``gws.c.UID``.
        group: GID, defaults to ``gws.c.GID``.

    Returns:
        ``True`` on success, ``False`` if the operation failed.
    """
    # Compare against None explicitly: 0 (root) is a valid UID/GID
    # and must not silently fall back to the default.
    uid = gws.c.UID if user is None else user
    gid = gws.c.GID if group is None else group

    try:
        os.chown(path, uid, gid)
        return True
    except OSError as exc:
        gws.log.warning(f'OSError: chown: {exc}')
        return False

156 

157 

def mkdir(path: _Path, mode: int = 0o755, user: int = None, group: int = None) -> bool:
    """Create a (possibly nested) directory and set its owner.

    An already existing directory is not an error.

    Args:
        path: Path to a directory.
        mode: Directory creation mode.
        user: Directory user (defaults to gws.c.UID)
        group: Directory group (defaults to gws.c.GID)

    Returns:
        ``True`` if the directory exists and its owner was set, ``False`` otherwise.
    """

    try:
        os.makedirs(path, mode, exist_ok=True)
    except OSError as exc:
        gws.log.warning(f'OSError: mkdir: {exc}')
        return False

    # ``chown`` handles and reports its own OSError.
    return chown(path, user, group)

174 

175 

def rmdir(path: _Path) -> bool:
    """Remove a directory together with everything it contains.

    Args:
        path: Path to a directory. Can be non-empty

    Returns:
        ``True`` on success, ``False`` if the removal failed.
    """

    try:
        shutil.rmtree(path)
    except OSError as exc:
        gws.log.warning(f'OSError: rmdir: {exc}')
        return False
    return True

189 

190 

def file_mtime(path: _Path) -> float:
    """Return the modification time of a path.

    Args:
        path: File-/directory-path.

    Returns:
        The modification time in seconds since the epoch, or ``-1`` if the path cannot be accessed.
    """
    try:
        stat_result = os.stat(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_mtime: {exc}')
        return -1
    return stat_result.st_mtime

205 

206 

def file_age(path: _Path) -> int:
    """Return the number of seconds elapsed since a path was last modified.

    Args:
        path: Filepath.

    Returns:
        Whole seconds since the last modification, or ``-1`` if the path cannot be accessed.
    """
    now = time.time()
    try:
        modified = os.stat(path).st_mtime
    except OSError as exc:
        gws.log.debug(f'OSError: file_age: {exc}')
        return -1
    return int(now - modified)

221 

222 

def file_size(path: _Path) -> int:
    """Return the size of a file.

    Args:
        path: Filepath.

    Returns:
        The size in bytes, or ``-1`` if the path cannot be accessed.
    """
    try:
        stat_result = os.stat(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_size: {exc}')
        return -1
    return stat_result.st_size

237 

238 

def file_checksum(path: _Path) -> str:
    """Return the SHA-256 checksum of a file.

    Args:
        path: Filepath.

    Returns:
        The hexadecimal SHA-256 digest of the file content,
        or an empty string if the file cannot be read.
    """
    try:
        digest = hashlib.sha256()
        with open(path, 'rb') as fp:
            # Hash in fixed-size chunks so that a large file
            # is never held in memory as a whole.
            while chunk := fp.read(65536):
                digest.update(chunk)
        return digest.hexdigest()
    except OSError as exc:
        gws.log.debug(f'OSError: file_checksum: {exc}')
        return ''

254 

255 

def kill_pid(pid: int, sig_name='TERM') -> bool:
    """Send a signal to a process.

    Args:
        pid: Process ID.
        sig_name: Name of an attribute of the ``signal`` module,
            with or without the ``SIG`` prefix (e.g. ``TERM`` or ``SIGTERM``).

    Returns:
        ``True`` if the signal was sent or the process does not exist,
        ``False`` if sending the signal failed.
    """
    sig = getattr(signal, sig_name, None)
    if not sig:
        # Not found under the given name: retry with the ``SIG`` prefix.
        sig = getattr(signal, 'SIG' + sig_name)

    try:
        psutil.Process(pid).send_signal(sig)
    except psutil.NoSuchProcess:
        return True
    except psutil.Error as e:
        gws.log.warning(f'send_signal failed, pid={pid!r}, {e}')
        return False
    return True

275 

276 

def running_pids() -> dict[int, str]:
    """Returns the current pids and the corresponding process' name.

    Returns:
        A dict mapping each process ID to the process name.
    """
    d = {}
    for p in psutil.process_iter():
        try:
            d[p.pid] = p.name()
        except psutil.NoSuchProcess:
            # The process exited between being listed and being queried:
            # it is no longer running, so leave it out.
            continue
    return d

283 

284 

def process_rss_size(unit: str = 'm') -> float:
    """Return the Resident Set Size of the current process.

    Args:
        unit: ``m`` | ``k`` | ``g``

    Returns:
        The Resident Set Size divided by 1e3 (``k``), 1e6 (``m``) or 1e9 (``g``).
        For any other unit, the value reported by ``psutil`` is returned unchanged.
    """
    rss = psutil.Process().memory_info().rss
    for code, divisor in (('k', 1e3), ('m', 1e6), ('g', 1e9)):
        if unit == code:
            return rss / divisor
    return rss

302 

303 

def find_files(dirname: _Path, pattern=None, ext=None, deep: bool = True):
    """Finds files in a given directory.

    Entries whose name starts with a dot are skipped, directories included.

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the full path of each file.
        ext: Extension or list of extensions to match. Only used if no ``pattern`` is given.
            Extensions are inserted into a regular expression as is.
        deep: If true then searches through all subdirectories for files,
            otherwise it returns the files only in the given directory.

    Returns:
        A generator of file paths.
    """
    if not pattern and ext:
        if isinstance(ext, (list, tuple)):
            ext = '|'.join(ext)
        pattern = '\\.(' + ext + ')$'

    # The context manager closes the directory handle as soon as the generator
    # is exhausted or abandoned, instead of leaving it to garbage collection.
    with os.scandir(dirname) as entries:
        de: os.DirEntry
        for de in entries:
            if de.name.startswith('.'):
                continue

            if de.is_dir() and deep:
                yield from find_files(de.path, pattern)
                continue

            if de.is_file() and (pattern is None or re.search(pattern, de.path)):
                yield de.path

333 

334 

def find_directories(dirname: _Path, pattern=None, deep: bool = True):
    """Finds all directories in a given directory.

    Directories whose name starts with a dot are skipped.

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the full path of each directory.
        deep: If true then searches through all subdirectories for directories,
            otherwise it returns the directories only in the given directory.

    Returns:
        A generator of directory paths.
    """
    # The context manager closes the directory handle as soon as the generator
    # is exhausted or abandoned, instead of leaving it to garbage collection.
    with os.scandir(dirname) as entries:
        de: os.DirEntry
        for de in entries:
            if de.name.startswith('.'):
                continue

            if not de.is_dir():
                continue

            if pattern is None or re.search(pattern, de.path):
                yield de.path

            # Subdirectories are searched even if the directory itself did not match.
            if deep:
                yield from find_directories(de.path, pattern)

360 

361 

def parse_path(path: _Path) -> dict[str, str]:
    """Split a path into its directory, file name, base name and extension.

    The extension is everything after the first dot of the file name.
    A file name that starts with a dot has no extension.

    Args:
        path: Path.

    Returns:
        A dict(dirname,filename,name,extension).
    """

    dirname, filename = os.path.split(_to_str(path))

    if filename.startswith('.'):
        name, extension = filename, ''
    else:
        name, _, extension = filename.partition('.')

    return {
        'dirname': dirname,
        'filename': filename,
        'name': name,
        'extension': extension,
    }

390 

391 

def file_name(path: _Path) -> str:
    """Return the last component of a path.

    Args:
        path: Filepath.

    Returns:
        The filename, i.e. everything after the last path separator.
    """

    return os.path.basename(_to_str(path))

404 

405 

def is_abs_path(path: _Path) -> bool:
    """Check whether a path is absolute.

    Args:
        path: A path.

    Returns:
        ``True`` if ``os.path.isabs`` considers the path absolute.
    """
    return os.path.isabs(path)

408 

409 

def abs_path(path: _Path, base: _Path) -> str:
    """Make a relative path absolute with respect to a base directory or file path.

    Args:
        path: A path. If it is already absolute, it is returned as is.
        base: A path to the base. If it is an existing file, its directory is used.

    Raises:
        ``ValueError``: If ``path`` is relative and ``base`` is empty.

    Returns:
        The absolute path.
    """

    str_path = _to_str(path)
    if os.path.isabs(str_path):
        return str_path

    if not base:
        raise ValueError('cannot compute abspath without a base')

    base_dir = os.path.dirname(base) if os.path.isfile(base) else base
    joined = os.path.join(_to_str(base_dir), str_path)
    return os.path.abspath(joined)

436 

437 

def abs_web_path(path: str, basedir: str) -> Optional[str]:
    """Return an absolute path in a base dir and ensure the path is correct.

    Directory names may only contain letters, digits, underscores and dashes.
    The file name may additionally have dot-separated lowercase alphanumeric suffixes.
    This also rules out ``.`` and ``..`` components.

    Args:
        path: Path to absolutize.
        basedir: Path to base directory.

    Returns:
        The path joined to the base directory, or ``None`` if the path is empty,
        contains invalid names or does not point to an existing file.
    """

    _dir_re = r'^[A-Za-z0-9_-]+$'
    _fil_re = r'^[A-Za-z0-9_-]+(\.[a-z0-9]+)*$'

    gws.log.debug(f'abs_web_path: trying {path!r} in {basedir!r}')

    stripped = (s.strip() for s in path.split('/'))
    dirs = [s for s in stripped if s]

    # An empty or slash-only path has no file name at all:
    # reject it instead of failing on ``pop`` from an empty list.
    if not dirs:
        gws.log.warning(f'abs_web_path: empty path={path!r}')
        return None

    fname = dirs.pop()

    if not all(re.match(_dir_re, p) for p in dirs):
        gws.log.warning(f'abs_web_path: invalid dirname in path={path!r}')
        return None

    if not re.match(_fil_re, fname):
        gws.log.warning(f'abs_web_path: invalid filename in path={path!r}')
        return None

    p = '/'.join([basedir, *dirs, fname])

    if not os.path.isfile(p):
        gws.log.warning(f'abs_web_path: not a file path={path!r}')
        return None

    return p

480 

481 

def rel_path(path: _Path, base: _Path) -> str:
    """Relativize an absolute path with respect to a base directory or file path.

    Args:
        path: Path to relativize.
        base: Path to base directory. If it is an existing file, its directory is used.

    Returns:
        Relativized path with respect to base directory.
    """

    if os.path.isfile(base):
        base = os.path.dirname(base)

    return os.path.relpath(_to_str(path), _to_str(base))

500 

501 

def _to_str(p: _Path) -> str:
    """Return ``p`` as ``str``, decoding anything else as UTF-8."""
    if isinstance(p, str):
        return p
    return p.decode('utf8')

504 

505 

def _to_bytes(p: _Path) -> bytes:
    """Return ``p`` as ``bytes``, encoding anything else as UTF-8."""
    if isinstance(p, bytes):
        return p
    return p.encode('utf8')