Coverage for gws-app/gws/lib/upload/__init__.py: 100%

0 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1# """Manage chunked uploads.""" 

2# 

3# import os 

4# import shutil 

5# 

6# import gws 

7# import gws.lib.jsonx 

8# import gws.lib.osx 

9# import gws.base.web.error 

10# 

11# 

12# class Error(gws.Error): 

13# pass 

14# 

15# 

16# class UploadRecord(gws.Data): 

17# uid: str 

18# name: str 

19# path: str 

20# 

21# 

22# class UploadChunkParams(gws.Request): 

23# uid: str 

24# name: str 

25# totalSize: int 

26# content: bytes 

27# chunkNumber: int 

28# chunkCount: int 

29# 

30# 

31# class UploadChunkResponse(gws.Response): 

32# uid: str 

33# 

34# 

35# _UPLOAD_DIR = gws.c.TMP_DIR + '/uploads' 

36# 

37# 

38# def upload_chunk(p: UploadChunkParams) -> UploadChunkResponse: 

39# try: 

40# uid = save_chunk( 

41# uid=p.uid, 

42# name=p.name, 

43# content=p.content, 

44# total_size=p.totalSize, 

45# chunk_number=p.chunkNumber, 

46# chunk_count=p.chunkCount 

47# ) 

48# return UploadChunkResponse(uid=uid) 

49# except Error as e: 

50# gws.log.error(e) 

51# raise gws.base.web.error.BadRequest() 

52# 

53# 

54# def save_chunk(uid: str, name: str, content: bytes, total_size: int, chunk_number: int, chunk_count: int) -> str: 

55# dir = gws.u.ensure_dir(_UPLOAD_DIR) 

56# 

57# if chunk_number == 1: 

58# uid = gws.u.random_string(64) 

59# status = gws.Data( 

60# name=name, 

61# total_size=total_size, 

62# chunk_count=chunk_count, 

63# ) 

64# gws.lib.jsonx.to_path(f'{dir}/{uid}.json', status) 

65# elif not uid.isalnum(): 

66# raise Error(f'upload {uid!r}: invalid uid') 

67# else: 

68# try: 

69# status = gws.Data(gws.lib.jsonx.from_path(f'{dir}/{uid}.json')) 

70# except gws.lib.jsonx.Error: 

71# status = None 

72# 

73# if not status: 

74# raise Error(f'upload {uid!r}: invalid status') 

75# 

76# if chunk_number < 1 or chunk_number > status.chunk_count: 

77# raise Error(f'upload {uid!r}: invalid chunk number') 

78# 

79# gws.u.write_file_b(f'{dir}/{uid}.{chunk_number}', content) 

80# return uid 

81# 

82# 

83# def get(uid: str) -> UploadRecord: 

84# dir = gws.u.ensure_dir(_UPLOAD_DIR) 

85# 

86# try: 

87# status = gws.Data(gws.lib.jsonx.from_path(f'{dir}/{uid}.json')) 

88# except gws.lib.jsonx.Error: 

89# status = None 

90# 

91# if not status: 

92# raise Error(f'upload {uid!r}: not found') 

93# 

94# path = f'{dir}/{uid}.all' 

95# 

96# if os.path.isfile(path): 

97# return UploadRecord(uid=uid, path=path, name=status.name) 

98# 

99# # @TODO this should use a system lock 

100# 

101# chunks = [f'{dir}/{uid}.{n}' for n in range(1, status.chunk_count + 1)] 

102# 

103# if not all(os.path.isfile(c) for c in chunks): 

104# raise Error(f'upload {uid!r}: incomplete') 

105# 

106# tmp_path = path + '.' + gws.u.random_string(6) 

107# 

108# with open(tmp_path, 'wb') as fp_all: 

109# for c in chunks: 

110# try: 

111# with open(c, 'rb') as fp: 

112# shutil.copyfileobj(fp, fp_all) 

113# except (OSError, IOError) as e: 

114# raise Error(f'upload {uid!r}: read error') from e 

115# 

116# if gws.lib.osx.file_size(tmp_path) != status.total_size: 

117# raise Error(f'upload {uid!r}: invalid file size') 

118# 

119# try: 

120# os.rename(tmp_path, path) 

121# except OSError: 

122# raise Error(f'upload {uid!r}: move error') from e 

123# 

124# for c in chunks: 

125# gws.lib.osx.unlink(c) 

126# 

127# return UploadRecord(uid=uid, path=path, name=status.name) 

128# 

129# 

130# def delete(uid: str): 

131# dir = gws.u.ensure_dir(_UPLOAD_DIR) 

132# 

133# for p in gws.lib.osx.find_files(dir): 

134# if p.startswith(uid + '.'): 

135# gws.lib.osx.unlink(p)