Coverage for gws-app/gws/lib/zipx/__init__.py: 0%
89 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Zipfile wrappers."""
3import io
4import os
5import shutil
6import zipfile
8import gws
11class Error(gws.Error):
12 pass
15def zip_to_path(path: str, *sources, flat: bool = False) -> int:
16 """Create a zip archive in a file.
18 Args:
19 path: Path to the zip archive.
20 sources: Paths or dicts to zip.
21 flat: If ``True`` base names are being kept in zip archive,
22 else whole paths are being kept in zip archive. Default is ``False``
24 Returns:
25 The amount of files in the zip archive.
26 """
28 return _zip(path, sources, flat)
31def zip_to_bytes(*sources, flat: bool = False) -> bytes:
32 """Create a zip archive in memory.
34 Args:
35 sources: Paths or dicts to zip.
36 flat: If ``True`` only base names will be returned,
37 else the whole paths will be returned. Default is ``False``.
39 Returns:
40 The names of the file paths encoded in bytes.
41 """
43 with io.BytesIO() as fp:
44 cnt = _zip(fp, sources, flat)
45 return fp.getvalue() if cnt else b''
48def unzip_path(path: str, target_dir: str, flat: bool = False) -> int:
49 """Unpack a zip archive into a directory.
51 Args:
52 path: Path to the zip archive.
53 target_dir: Path to the target directory.
54 flat: If ``True`` omit path and consider only base name of files in the zip archive,
55 else complete paths are considered of files in the zip archive. Default is ``False``.
57 Returns:
58 The amount of unzipped files.
59 """
61 return _unzip(path, target_dir, None, flat)
64def unzip_bytes(source: bytes, target_dir: str, flat: bool = False) -> int:
65 """Unpack a zip archive in memory into a directory.
67 Args:
68 source: Path to the zip archive.
69 target_dir: Path to the target directory.
70 flat: If ``True`` omit path and consider only base name of files in the zip archive,
71 else complete paths are considered of files in the zip archive. Default is ``False``.
73 Returns:
74 The amount of unzipped files.
75 """
77 with io.BytesIO(source) as fp:
78 return _unzip(fp, target_dir, None, flat)
81def unzip_path_to_dict(path: str, flat: bool = False) -> dict:
82 """Unpack a zip archive into a dict.
84 Args:
85 path: Path to the zip archive.
86 flat: If ``True`` then the dictionary contains the base names of the unzipped files,
87 else it contains the whole path. Default is ``False``.
89 Returns:
90 A dictionary containing all the file paths or base names.
91 """
93 dct = {}
94 _unzip(path, None, dct, flat)
95 return dct
98def unzip_bytes_to_dict(source: bytes, flat: bool = False) -> dict:
99 """Unpack a zip archive in memory into a dict.
101 Args:
102 source: Path to zip archive.
103 flat: If ``True`` then the dictionary contains the base names of the unzipped files,
104 else it contains the whole path. Default is ``False``.
106 Returns:
107 A dictionary containing all the file paths or base names.
108 """
110 with io.BytesIO(source) as fp:
111 dct = {}
112 _unzip(fp, None, dct, flat)
113 return dct
116##
118def _zip(target, sources, flat):
119 files = []
120 dct = {}
122 for src in sources:
124 if isinstance(src, dict):
125 for name, content in src.items():
126 dct[os.path.basename(name) if flat else name] = content
127 continue
129 if os.path.isdir(src):
130 for p in _scan_dir(src):
131 if flat:
132 files.append([p, os.path.basename(p)])
133 else:
134 files.append([p, os.path.relpath(p, src)])
135 continue
137 if os.path.isfile(src):
138 if flat:
139 files.append([src, os.path.basename(src)])
140 else:
141 files.append([src, src])
142 continue
144 raise Error(f'zip: invalid argument: {src!r}')
146 if not files and not dct:
147 return 0
149 cnt = 0
151 with zipfile.ZipFile(target, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
152 for filename, arcname in files:
153 zf.write(filename, arcname)
154 cnt += 1
155 for name, content in dct.items():
156 zf.writestr(name, content)
157 cnt += 1
159 return cnt
162def _unzip(source, target_dir, target_dict, flat):
163 cnt = 0
165 with zipfile.ZipFile(source, 'r') as zf:
166 for zi in zf.infolist():
167 if zi.is_dir():
168 continue
170 path = zi.filename
171 base = os.path.basename(path)
173 if path.startswith(('/', '.')) or '..' in path or not base:
174 gws.log.warning(f'unzip: invalid file name: {path!r}')
175 continue
177 if target_dict is not None:
178 with zf.open(path) as src:
179 target_dict[base if flat else path] = src.read()
180 continue
182 if flat:
183 dst = os.path.join(target_dir, base)
184 else:
185 dst = os.path.join(target_dir, *path.split('/'))
186 os.makedirs(os.path.dirname(dst), exist_ok=True)
188 with zf.open(path) as src, open(dst, 'wb') as fp:
189 shutil.copyfileobj(src, fp)
190 cnt += 1
192 return cnt
195def _scan_dir(source_dir):
196 paths = []
198 for de in os.scandir(source_dir):
199 if de.is_file():
200 paths.append(de.path)
201 elif de.is_dir():
202 paths.extend(_scan_dir(de.path))
204 return paths