Coverage for gws-app/gws/base/storage/core.py: 0%
70 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Storage object."""
3from typing import Optional
5import gws
6import gws.lib.jsonx
9class Verb(gws.Enum):
10 read = 'read'
11 write = 'write'
12 list = 'list'
13 delete = 'delete'
16class State(gws.Data):
17 names: list[str]
18 canRead: bool
19 canWrite: bool
20 canCreate: bool
21 canDelete: bool
24class Request(gws.Request):
25 verb: Verb
26 entryName: Optional[str]
27 entryData: Optional[dict]
30class Response(gws.Response):
31 data: Optional[dict]
32 state: State
35class Config(gws.ConfigWithAccess):
36 """Storage configuration"""
38 providerUid: Optional[str]
39 """storage provider uid"""
41 categoryName: Optional[str]
42 """category name"""
45class Props(gws.Props):
46 state: State
49class Object(gws.Node):
50 storageProvider: gws.StorageProvider
51 categoryName: str
53 def configure(self):
54 self.configure_provider()
55 self.categoryName = self.cfg('categoryName')
57 def configure_provider(self):
58 self.storageProvider = self.root.app.storageMgr.find_provider(self.cfg('providerUid'))
59 if not self.storageProvider:
60 raise gws.Error(f'storage provider not found')
61 return True
63 def props(self, user):
64 return gws.Props(
65 state=self.get_state_for(user),
66 )
68 def get_state_for(self, user):
69 return State(
70 names=self.storageProvider.list_names(self.categoryName) if user.can_read(self) else [],
71 canRead=user.can_read(self),
72 canWrite=user.can_write(self),
73 canDelete=user.can_delete(self),
74 canCreate=user.can_create(self),
75 )
77 def handle_request(self, req: gws.WebRequester, p: Request) -> Response:
78 state = self.get_state_for(req.user)
79 data = None
81 if p.verb == Verb.list:
82 if not state.canRead:
83 raise gws.ForbiddenError()
84 pass
86 if p.verb == Verb.read:
87 if not state.canRead or not p.entryName:
88 raise gws.ForbiddenError()
89 rec = self.storageProvider.read(self.categoryName, p.entryName)
90 if rec:
91 data = gws.lib.jsonx.from_string(rec.data)
93 if p.verb == Verb.write:
94 if not p.entryName or not p.entryData:
95 raise gws.ForbiddenError()
96 if p.entryName in state.names and not state.canWrite:
97 raise gws.ForbiddenError()
98 if p.entryName not in state.names and not state.canCreate:
99 raise gws.ForbiddenError()
101 d = gws.lib.jsonx.to_string(p.entryData)
102 self.storageProvider.write(self.categoryName, p.entryName, d, req.user.uid)
104 if p.verb == Verb.delete:
105 if not state.canDelete or not p.entryName:
106 raise gws.ForbiddenError()
107 self.storageProvider.delete(self.categoryName, p.entryName)
109 return Response(data=data, state=self.get_state_for(req.user))