Coverage for gws-app/gws/base/storage/core.py: 0%

70 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Storage object.""" 

2 

3from typing import Optional 

4 

5import gws 

6import gws.lib.jsonx 

7 

8 

9class Verb(gws.Enum): 

10 read = 'read' 

11 write = 'write' 

12 list = 'list' 

13 delete = 'delete' 

14 

15 

16class State(gws.Data): 

17 names: list[str] 

18 canRead: bool 

19 canWrite: bool 

20 canCreate: bool 

21 canDelete: bool 

22 

23 

24class Request(gws.Request): 

25 verb: Verb 

26 entryName: Optional[str] 

27 entryData: Optional[dict] 

28 

29 

30class Response(gws.Response): 

31 data: Optional[dict] 

32 state: State 

33 

34 

35class Config(gws.ConfigWithAccess): 

36 """Storage configuration""" 

37 

38 providerUid: Optional[str] 

39 """storage provider uid""" 

40 

41 categoryName: Optional[str] 

42 """category name""" 

43 

44 

45class Props(gws.Props): 

46 state: State 

47 

48 

49class Object(gws.Node): 

50 storageProvider: gws.StorageProvider 

51 categoryName: str 

52 

53 def configure(self): 

54 self.configure_provider() 

55 self.categoryName = self.cfg('categoryName') 

56 

57 def configure_provider(self): 

58 self.storageProvider = self.root.app.storageMgr.find_provider(self.cfg('providerUid')) 

59 if not self.storageProvider: 

60 raise gws.Error(f'storage provider not found') 

61 return True 

62 

63 def props(self, user): 

64 return gws.Props( 

65 state=self.get_state_for(user), 

66 ) 

67 

68 def get_state_for(self, user): 

69 return State( 

70 names=self.storageProvider.list_names(self.categoryName) if user.can_read(self) else [], 

71 canRead=user.can_read(self), 

72 canWrite=user.can_write(self), 

73 canDelete=user.can_delete(self), 

74 canCreate=user.can_create(self), 

75 ) 

76 

77 def handle_request(self, req: gws.WebRequester, p: Request) -> Response: 

78 state = self.get_state_for(req.user) 

79 data = None 

80 

81 if p.verb == Verb.list: 

82 if not state.canRead: 

83 raise gws.ForbiddenError() 

84 pass 

85 

86 if p.verb == Verb.read: 

87 if not state.canRead or not p.entryName: 

88 raise gws.ForbiddenError() 

89 rec = self.storageProvider.read(self.categoryName, p.entryName) 

90 if rec: 

91 data = gws.lib.jsonx.from_string(rec.data) 

92 

93 if p.verb == Verb.write: 

94 if not p.entryName or not p.entryData: 

95 raise gws.ForbiddenError() 

96 if p.entryName in state.names and not state.canWrite: 

97 raise gws.ForbiddenError() 

98 if p.entryName not in state.names and not state.canCreate: 

99 raise gws.ForbiddenError() 

100 

101 d = gws.lib.jsonx.to_string(p.entryData) 

102 self.storageProvider.write(self.categoryName, p.entryName, d, req.user.uid) 

103 

104 if p.verb == Verb.delete: 

105 if not state.canDelete or not p.entryName: 

106 raise gws.ForbiddenError() 

107 self.storageProvider.delete(self.categoryName, p.entryName) 

108 

109 return Response(data=data, state=self.get_state_for(req.user))