Coverage for gws-app/gws/lib/upload/__init__.py: 100%
0 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1# """Manage chunked uploads."""
2#
3# import os
4# import shutil
5#
6# import gws
7# import gws.lib.jsonx
8# import gws.lib.osx
9# import gws.base.web.error
10#
11#
12# class Error(gws.Error):
13# pass
14#
15#
16# class UploadRecord(gws.Data):
17# uid: str
18# name: str
19# path: str
20#
21#
22# class UploadChunkParams(gws.Request):
23# uid: str
24# name: str
25# totalSize: int
26# content: bytes
27# chunkNumber: int
28# chunkCount: int
29#
30#
31# class UploadChunkResponse(gws.Response):
32# uid: str
33#
34#
35# _UPLOAD_DIR = gws.c.TMP_DIR + '/uploads'
36#
37#
38# def upload_chunk(p: UploadChunkParams) -> UploadChunkResponse:
39# try:
40# uid = save_chunk(
41# uid=p.uid,
42# name=p.name,
43# content=p.content,
44# total_size=p.totalSize,
45# chunk_number=p.chunkNumber,
46# chunk_count=p.chunkCount
47# )
48# return UploadChunkResponse(uid=uid)
49# except Error as e:
50# gws.log.error(e)
51# raise gws.base.web.error.BadRequest()
52#
53#
54# def save_chunk(uid: str, name: str, content: bytes, total_size: int, chunk_number: int, chunk_count: int) -> str:
55# dir = gws.u.ensure_dir(_UPLOAD_DIR)
56#
57# if chunk_number == 1:
58# uid = gws.u.random_string(64)
59# status = gws.Data(
60# name=name,
61# total_size=total_size,
62# chunk_count=chunk_count,
63# )
64# gws.lib.jsonx.to_path(f'{dir}/{uid}.json', status)
65# elif not uid.isalnum():
66# raise Error(f'upload {uid!r}: invalid uid')
67# else:
68# try:
69# status = gws.Data(gws.lib.jsonx.from_path(f'{dir}/{uid}.json'))
70# except gws.lib.jsonx.Error:
71# status = None
72#
73# if not status:
74# raise Error(f'upload {uid!r}: invalid status')
75#
76# if chunk_number < 1 or chunk_number > status.chunk_count:
77# raise Error(f'upload {uid!r}: invalid chunk number')
78#
79# gws.u.write_file_b(f'{dir}/{uid}.{chunk_number}', content)
80# return uid
81#
82#
83# def get(uid: str) -> UploadRecord:
84# dir = gws.u.ensure_dir(_UPLOAD_DIR)
85#
86# try:
87# status = gws.Data(gws.lib.jsonx.from_path(f'{dir}/{uid}.json'))
88# except gws.lib.jsonx.Error:
89# status = None
90#
91# if not status:
92# raise Error(f'upload {uid!r}: not found')
93#
94# path = f'{dir}/{uid}.all'
95#
96# if os.path.isfile(path):
97# return UploadRecord(uid=uid, path=path, name=status.name)
98#
99# # @TODO this should use a system lock
100#
101# chunks = [f'{dir}/{uid}.{n}' for n in range(1, status.chunk_count + 1)]
102#
103# if not all(os.path.isfile(c) for c in chunks):
104# raise Error(f'upload {uid!r}: incomplete')
105#
106# tmp_path = path + '.' + gws.u.random_string(6)
107#
108# with open(tmp_path, 'wb') as fp_all:
109# for c in chunks:
110# try:
111# with open(c, 'rb') as fp:
112# shutil.copyfileobj(fp, fp_all)
113# except (OSError, IOError) as e:
114# raise Error(f'upload {uid!r}: read error') from e
115#
116# if gws.lib.osx.file_size(tmp_path) != status.total_size:
117# raise Error(f'upload {uid!r}: invalid file size')
118#
119# try:
120# os.rename(tmp_path, path)
121# except OSError:
122# raise Error(f'upload {uid!r}: move error') from e
123#
124# for c in chunks:
125# gws.lib.osx.unlink(c)
126#
127# return UploadRecord(uid=uid, path=path, name=status.name)
128#
129#
130# def delete(uid: str):
131# dir = gws.u.ensure_dir(_UPLOAD_DIR)
132#
133# for p in gws.lib.osx.find_files(dir):
134# if p.startswith(uid + '.'):
135# gws.lib.osx.unlink(p)