Coverage for gws-app/gws/lib/osx/__init__.py: 24%
206 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Utilities for os/shell scripting"""
3from typing import Optional
5import hashlib
6import os
7import re
8import signal
9import subprocess
10import shutil
11import shlex
12import time
14import psutil
16import gws
class Error(gws.Error):
    """Base error raised by the os/shell helpers in this module."""
# NOTE: inside this module the name shadows the builtin ``TimeoutError``.
class TimeoutError(Error):
    """Raised by ``run`` when a command does not finish within its timeout."""
# Type of the path arguments accepted by the helpers in this module: text or bytes.
_Path = str | bytes
def getenv(key: str, default: str = None) -> Optional[str]:
    """Look up an environment variable.

    Args:
        key: Name of the environment variable.
        default: Value to return when the variable is not set.

    Returns:
        The value of the variable, or ``default`` if the variable is not set.
    """
    return os.environ.get(key, default)
43def run_nowait(cmd: str | list, **kwargs) -> subprocess.Popen:
44 """Run a process and return immediately.
46 Args:
47 cmd: A process to run.
48 kwargs:
50 Returns:
51 The output of the command.
52 """
54 args = {
55 'stdin': None,
56 'stdout': None,
57 'stderr': None,
58 'shell': False,
59 }
60 args.update(kwargs)
62 return subprocess.Popen(cmd, **args)
def run(cmd: str | list, input: str = None, echo: bool = False, strict: bool = True, timeout: float = None, **kwargs) -> str:
    """Run an external command and wait for it to finish.

    Args:
        cmd: Command to run. A string is split into arguments with ``shlex.split``.
        input: Data to send to the command's stdin.
        echo: Echo the output instead of capturing it.
        strict: Raise an error on a non-zero exit code.
        timeout: Timeout in seconds.
        kwargs: Arguments to pass to ``subprocess.Popen``; they override the defaults set here.

    Returns:
        The command output (stderr is merged into stdout by default),
        or an empty string if nothing was captured.

    Raises:
        TimeoutError: If the command does not finish within ``timeout``.
        Error: If the command cannot be started, the communication fails,
            or the exit code is non-zero and ``strict`` is set.
    """

    args = {
        'stdin': subprocess.PIPE if input else None,
        'stdout': None if echo else subprocess.PIPE,
        'stderr': subprocess.STDOUT,
        'shell': False,
    }
    args.update(kwargs)

    if isinstance(cmd, str):
        cmd = shlex.split(cmd)

    # Unless one of these options is set, the pipes are binary and
    # ``communicate`` only accepts bytes: encode a text input ourselves.
    text_mode = any(args.get(k) for k in ('text', 'universal_newlines', 'encoding', 'errors'))
    if isinstance(input, str) and not text_mode:
        input = input.encode('utf8')

    gws.log.debug(f'RUN: {cmd=}')

    try:
        p = subprocess.Popen(cmd, **args)
    except Exception as exc:
        raise Error('run: failure', repr(cmd)) from exc

    try:
        out, _ = p.communicate(input, timeout)
    except subprocess.TimeoutExpired as exc:
        # communicate() does not stop the child when the timeout expires:
        # kill it and reap it so that no stray process is left behind.
        p.kill()
        p.communicate()
        raise TimeoutError('run: command timed out', repr(cmd)) from exc
    except Exception as exc:
        raise Error('run: failure', repr(cmd)) from exc

    rc = p.returncode

    if rc:
        gws.log.debug(f'RUN_FAILED: {cmd=} {rc=} {out=}')
        if strict:
            raise Error('run: non-zero exit', repr(cmd))

    return _to_str(out or '')
def unlink(path: _Path) -> bool:
    """Delete a file, if it exists.

    Args:
        path: Filepath.

    Returns:
        ``False`` if the file exists but could not be deleted, ``True`` otherwise
        (including the case when ``path`` is not a regular file).
    """
    if not os.path.isfile(path):
        # not a regular file: nothing to delete
        return True

    try:
        os.unlink(path)
    except OSError as exc:
        gws.log.debug(f'OSError: unlink: {exc}')
        return False

    return True
def rename(src: _Path, dst: _Path) -> bool:
    """Move the source path to the destination using ``os.replace``.

    Args:
        src: Path to source.
        dst: Destination.

    Returns:
        ``True`` on success, ``False`` if the operation failed with an ``OSError``.
    """

    try:
        os.replace(src, dst)
    except OSError as exc:
        gws.log.warning(f'OSError: rename: {exc}')
        moved = False
    else:
        moved = True
    return moved
def chown(path: _Path, user: int = None, group: int = None) -> bool:
    """Change the owner (UID) and group (GID) of a path.

    Args:
        path: Filepath.
        user: UID; ``None`` means ``gws.c.UID``.
        group: GID; ``None`` means ``gws.c.GID``.

    Returns:
        ``True`` on success, ``False`` if the operation failed with an ``OSError``.
    """
    # Compare against None rather than truthiness: 0 is a valid id (root)
    # and must not be silently replaced by the default.
    uid = gws.c.UID if user is None else user
    gid = gws.c.GID if group is None else group

    try:
        os.chown(path, uid, gid)
    except OSError as exc:
        gws.log.warning(f'OSError: chown: {exc}')
        return False

    return True
def mkdir(path: _Path, mode: int = 0o755, user: int = None, group: int = None) -> bool:
    """Create a (possibly nested) directory and set its owner.

    An already existing directory is not an error.

    Args:
        path: Path to a directory.
        mode: Directory creation mode.
        user: Directory user (defaults to gws.c.UID)
        group: Directory group (defaults to gws.c.GID)

    Returns:
        The result of ``chown`` for the directory, or ``False`` if an ``OSError`` occurred.
    """

    try:
        os.makedirs(path, mode, exist_ok=True)
        ok = chown(path, user, group)
    except OSError as exc:
        gws.log.warning(f'OSError: mkdir: {exc}')
        ok = False
    return ok
def rmdir(path: _Path) -> bool:
    """Remove a directory together with everything inside it.

    Args:
        path: Path to a directory. Can be non-empty

    Returns:
        ``True`` on success, ``False`` if the removal failed with an ``OSError``.
    """

    try:
        shutil.rmtree(path)
    except OSError as exc:
        gws.log.warning(f'OSError: rmdir: {exc}')
        return False

    return True
def file_mtime(path: _Path) -> float:
    """Return the modification time of a path.

    Args:
        path: File-/directory-path.

    Returns:
        The modification time in seconds since the epoch, or ``-1`` if the path cannot be accessed.
    """
    try:
        mtime = os.path.getmtime(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_mtime: {exc}')
        return -1
    return mtime
def file_age(path: _Path) -> int:
    """Return the number of seconds elapsed since a path was last modified.

    Args:
        path: Filepath.

    Returns:
        Whole seconds since the last modification, or ``-1`` if the path cannot be accessed.
    """
    now = time.time()
    try:
        modified = os.stat(path).st_mtime
    except OSError as exc:
        gws.log.debug(f'OSError: file_age: {exc}')
        return -1
    return int(now - modified)
def file_size(path: _Path) -> int:
    """Return the size of a file.

    Args:
        path: Filepath.

    Returns:
        The size in bytes, or ``-1`` if the path cannot be accessed.
    """
    try:
        size = os.path.getsize(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_size: {exc}')
        return -1
    return size
def file_checksum(path: _Path) -> str:
    """Return the SHA-256 checksum of a file.

    The file is hashed in chunks, so that large files are not loaded into memory as a whole.

    Args:
        path: Filepath.

    Returns:
        The hex digest of the file content, or an empty string if the file cannot be read.
    """
    digest = hashlib.sha256()
    try:
        with open(path, 'rb') as fp:
            while chunk := fp.read(65536):
                digest.update(chunk)
    except OSError as exc:
        gws.log.debug(f'OSError: file_checksum: {exc}')
        return ''
    return digest.hexdigest()
def kill_pid(pid: int, sig_name='TERM') -> bool:
    """Send a signal to a process.

    Args:
        pid: Process ID.
        sig_name: Name of an attribute of the ``signal`` module, with or without
            the ``SIG`` prefix (e.g. ``TERM`` or ``SIGTERM``).

    Returns:
        ``True`` if the signal was sent or the process does not exist,
        ``False`` if sending the signal failed.
    """
    sig = getattr(signal, sig_name, None)
    if not sig:
        # try the prefixed name, e.g. 'TERM' -> 'SIGTERM'
        sig = getattr(signal, 'SIG' + sig_name)

    try:
        psutil.Process(pid).send_signal(sig)
    except psutil.NoSuchProcess:
        # the process is already gone: counts as success
        return True
    except psutil.Error as e:
        gws.log.warning(f'send_signal failed, pid={pid!r}, {e}')
        return False

    return True
def running_pids() -> dict[int, str]:
    """Return the pids of the running processes and the corresponding process names.

    Processes whose name cannot be read are skipped.

    Returns:
        A dict ``{pid: process name}``.
    """
    pids = {}
    for p in psutil.process_iter():
        try:
            pids[p.pid] = p.name()
        except psutil.Error:
            # the process has exited in the meantime or is not accessible:
            # do not let a single process break the whole listing
            continue
    return pids
def process_rss_size(unit: str = 'm') -> float:
    """Return the Resident Set Size of the current process.

    Args:
        unit: ``m`` | ``k`` | ``g``

    Returns:
        The Resident Set Size divided by 1e3 (``k``), 1e6 (``m``) or 1e9 (``g``);
        for any other unit the size is returned unscaled.
    """
    rss = psutil.Process().memory_info().rss
    divisors = {'k': 1e3, 'm': 1e6, 'g': 1e9}
    if unit in divisors:
        return rss / divisors[unit]
    return rss
def find_files(dirname: _Path, pattern=None, ext=None, deep: bool = True):
    """Find files in a given directory.

    Entries whose name starts with a dot (files and directories) are skipped.

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the full path of each file.
        ext: Extension or a list of extensions to match; only used when no ``pattern`` is given.
        deep: If true then searches through all subdirectories for files,
            otherwise it returns the files only in the given directory.

    Returns:
        A generator of file paths.
    """
    if not pattern and ext:
        if isinstance(ext, (list, tuple)):
            ext = '|'.join(ext)
        pattern = '\\.(' + ext + ')$'

    # The context manager closes the directory handle even if the caller
    # stops consuming the generator before it is exhausted.
    with os.scandir(dirname) as entries:
        de: os.DirEntry
        for de in entries:
            if de.name.startswith('.'):
                continue

            if de.is_dir() and deep:
                # ``ext`` is already folded into ``pattern`` at this point
                yield from find_files(de.path, pattern)
                continue

            if de.is_file() and (pattern is None or re.search(pattern, de.path)):
                yield de.path
def find_directories(dirname: _Path, pattern=None, deep: bool = True):
    """Find directories in a given directory.

    Directories whose name starts with a dot are skipped, together with their content.

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the full path of each directory.
        deep: If true then searches through all subdirectories for directories,
            otherwise it returns the directories only in the given directory.

    Returns:
        A generator of directory paths.
    """
    # The context manager closes the directory handle even if the caller
    # stops consuming the generator before it is exhausted.
    with os.scandir(dirname) as entries:
        de: os.DirEntry
        for de in entries:
            if de.name.startswith('.'):
                continue

            if not de.is_dir():
                continue

            if pattern is None or re.search(pattern, de.path):
                yield de.path

            # descend even if the directory itself did not match the pattern
            if deep:
                yield from find_directories(de.path, pattern)
def parse_path(path: _Path) -> dict[str, str]:
    """Parse a path into a dict(dirname,filename,name,extension).

    Args:
        path: Path.

    Returns:
        A dict(dirname,filename,name,extension). For a filename that starts with a dot,
        ``name`` is the whole filename and ``extension`` is empty; otherwise
        the filename is split at its first dot.
    """

    dirname, filename = os.path.split(_to_str(path))

    if filename.startswith('.'):
        name, extension = filename, ''
    else:
        name, _, extension = filename.partition('.')

    return {
        'dirname': dirname,
        'filename': filename,
        'name': name,
        'extension': extension,
    }
def file_name(path: _Path) -> str:
    """Return the last component of a path.

    Args:
        path: Filepath.

    Returns:
        The filename, i.e. everything after the last path separator.
    """

    return os.path.basename(_to_str(path))
def is_abs_path(path: _Path) -> bool:
    """Check whether a path is absolute.

    Args:
        path: A path.

    Returns:
        The result of ``os.path.isabs`` for the path.
    """
    absolute = os.path.isabs(path)
    return absolute
def abs_path(path: _Path, base: _Path) -> str:
    """Absolutize a relative path with respect to a base directory or file path.

    An absolute ``path`` is returned as is. If ``base`` is an existing file,
    its directory is used as the base.

    Args:
        path: A path.
        base: A path to the base.

    Raises:
        ``ValueError``: If ``path`` is relative and base is empty

    Returns:
        The absolute path.
    """

    target = _to_str(path)
    if os.path.isabs(target):
        return target

    if not base:
        raise ValueError('cannot compute abspath without a base')

    base_dir = os.path.dirname(base) if os.path.isfile(base) else base
    joined = os.path.join(_to_str(base_dir), target)
    return os.path.abspath(joined)
def abs_web_path(path: str, basedir: str) -> Optional[str]:
    """Return an absolute path in a base dir and ensure the path is correct.

    The path is split at slashes; empty components are dropped and every
    remaining component is validated before the result is assembled.

    Args:
        path: Path to absolutize.
        basedir: Path to base directory.

    Returns:
        Absolute path with respect to base directory, or ``None`` if the path
        is empty, contains an invalid directory or file name, or does not point
        to an existing file.
    """

    # Directory names cannot contain dots and file names cannot start
    # with a dot, which rules out '..' and hidden files.
    _dir_re = r'^[A-Za-z0-9_-]+$'
    _fil_re = r'^[A-Za-z0-9_-]+(\.[a-z0-9]+)*$'

    gws.log.debug(f'abs_web_path: trying {path!r} in {basedir!r}')

    parts = [s for s in (c.strip() for c in path.split('/')) if s]
    if not parts:
        # '' or a slash-only path: there is no file name to validate
        gws.log.warning(f'abs_web_path: empty path={path!r}')
        return None

    *dirs, fname = parts

    if not all(re.match(_dir_re, p) for p in dirs):
        gws.log.warning(f'abs_web_path: invalid dirname in path={path!r}')
        return None

    if not re.match(_fil_re, fname):
        gws.log.warning(f'abs_web_path: invalid filename in path={path!r}')
        return None

    p = '/'.join([basedir, *dirs, fname])

    if not os.path.isfile(p):
        gws.log.warning(f'abs_web_path: not a file path={path!r}')
        return None

    return p
def rel_path(path: _Path, base: _Path) -> str:
    """Relativize an absolute path with respect to a base directory or file path.

    If ``base`` is an existing file, its directory is used as the base.

    Args:
        path: Path to relativize.
        base: Path to base directory.

    Returns:
        Relativized path with respect to base directory.
    """

    if os.path.isfile(base):
        base = os.path.dirname(base)

    return os.path.relpath(_to_str(path), _to_str(base))
def _to_str(p: _Path) -> str:
    """Return a path as ``str``, decoding anything that is not a ``str`` as UTF-8."""
    if isinstance(p, str):
        return p
    return p.decode('utf8')
def _to_bytes(p: _Path) -> bytes:
    """Return a path as ``bytes``, encoding anything that is not ``bytes`` as UTF-8."""
    if isinstance(p, bytes):
        return p
    return p.encode('utf8')