Coverage for gws-app/gws/base/action/manager.py: 14%

65 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1import gws 

2import gws.spec 

3 

4 

def prepare_cli_action(
    root: gws.Root,
    command_category: gws.CommandCategory,
    command_name: str,
    params: dict,
    read_options=None,
):
    """Resolve a CLI command to a callable and its parsed request.

    Args:
        root: Root object whose ``specs`` are used for the lookup.
        command_category: Category of the command.
        command_name: Name of the command.
        params: Raw parameters, read against the descriptor's ``tArg``.
        read_options: Passed to ``specs.read`` as ``options``.

    Returns:
        A tuple ``(fn, request)``: the attribute named by the descriptor's
        ``methodName`` on a fresh instance of the owner class, and the
        request produced by ``specs.read``.

    Raises:
        gws.NotFoundError: No descriptor exists for the command.
        gws.BadRequestError: ``specs.read`` raised ``gws.spec.ReadError``.
    """
    specs = root.specs

    descriptor = specs.command_descriptor(command_category, command_name)
    if not descriptor:
        raise gws.NotFoundError(f'{command_category}.{command_name} not found')

    try:
        request = specs.read(params, descriptor.tArg, options=read_options)
    except gws.spec.ReadError as exc:
        raise gws.BadRequestError(f'read error: {exc!r}') from exc

    # The owner class is instantiated without arguments.
    owner_cls = specs.get_class(descriptor.tOwner)
    return getattr(owner_cls(), descriptor.methodName), request

26 

27 

class Object(gws.ActionManager):
    """Looks up actions configured on projects and on the application."""

    def actions_for_project(self, project, user):
        """Return the actions ``user`` can use in ``project``.

        Usable project actions come first. An application action is added
        only when no usable project action has the same ``extType`` and the
        user can use it.

        Returns:
            A list of actions, at most one per ``extType``.
        """
        usable = {a.extType: a for a in project.actions if user.can_use(a)}

        for candidate in self.root.app.actions:
            if candidate.extType in usable:
                # A project action of this type takes precedence.
                continue
            if user.can_use(candidate):
                usable[candidate.extType] = candidate

        return [*usable.values()]

    def prepare_action(self, command_category, command_name, params, user, read_options=None):
        """Resolve a command to a callable on a configured action and its parsed request.

        If the parsed request has a ``projectUid``, the action is looked up in
        that project first; otherwise, or when the project has no matching
        action, it is looked up in the application.

        Returns:
            A tuple ``(fn, request)``: the attribute named by the descriptor's
            ``methodName`` on the action found, and the request produced by
            ``specs.read``.

        Raises:
            gws.NotFoundError: The command descriptor, the project, or a
                matching action was not found.
            gws.BadRequestError: ``specs.read`` raised ``gws.spec.ReadError``.
            gws.ForbiddenError: The user cannot use the project or the
                matching action.
        """
        specs = self.root.specs

        descriptor = specs.command_descriptor(command_category, command_name)
        if not descriptor:
            raise gws.NotFoundError(f'{command_category}.{command_name} not found')

        try:
            request = specs.read(params, descriptor.tArg, options=read_options)
        except gws.spec.ReadError as exc:
            raise gws.BadRequestError(f'read error: {exc!r}') from exc

        found = None

        project_uid = request.get('projectUid')
        if project_uid:
            project = self.root.app.project(project_uid)
            if not project:
                raise gws.NotFoundError(f'project not found: {project_uid!r}')
            if not user.can_use(project):
                raise gws.ForbiddenError(f'project forbidden: {project_uid!r}')
            found = self._find_by_ext_name(project, descriptor.owner.extName, user)

        # Fall back to the application-level actions.
        found = found or self._find_by_ext_name(self.root.app, descriptor.owner.extName, user)
        if not found:
            raise gws.NotFoundError(f'action {descriptor.owner.extName!r}: not found')

        return getattr(found, descriptor.methodName), request

    def find_action(self, project, ext_type, user):
        """Find an action by ``extType``, looking in ``project`` (if given) before the application.

        Returns:
            The matching action, or ``None`` if neither has one.

        Raises:
            gws.ForbiddenError: A matching action exists but the user cannot use it.
        """
        in_project = self._find_by_ext_type(project, ext_type, user) if project else None
        return in_project or self._find_by_ext_type(self.root.app, ext_type, user)

    # @TODO build indexes for this

    def _find_by_ext_name(self, obj, ext_name: str, user: gws.User):
        """Return the first action of ``obj`` whose ``extName`` equals ``ext_name``.

        Returns ``None`` when there is no such action.

        Raises:
            gws.ForbiddenError: The user cannot use the matching action.
        """
        match = next((a for a in obj.actions if a.extName == ext_name), None)
        if match is None:
            return None
        if user.can_use(match):
            return match
        raise gws.ForbiddenError(f'action {ext_name!r}: forbidden in {obj!r}')

    def _find_by_ext_type(self, obj, ext_type: str, user: gws.User):
        """Return the first action of ``obj`` whose ``extType`` equals ``ext_type``.

        Returns ``None`` when there is no such action.

        Raises:
            gws.ForbiddenError: The user cannot use the matching action.
        """
        match = next((a for a in obj.actions if a.extType == ext_type), None)
        if match is None:
            return None
        if user.can_use(match):
            return match
        raise gws.ForbiddenError(f'action {ext_type!r}: forbidden in {obj!r}')

92 if not user.can_use(a): 

93 raise gws.ForbiddenError(f'action {ext_type!r}: forbidden in {obj!r}') 

94 return a