Coverage for gws-app/gws/base/printer/action.py: 0%

60 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Provides the printing API.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.base.action 

7import gws.base.web 

8import gws.config 

9import gws.lib.job 

10import gws.lib.jsonx 

11import gws.lib.mime 

12import gws.lib.style 

13 

14 

15from . import manager 

16 

17gws.ext.new.action('printer') 

18 

19 

20class Config(gws.base.action.Config): 

21 pass 

22 

23 

24class Props(gws.base.action.Props): 

25 pass 

26 

27 

28class JobRequest(gws.Request): 

29 jobUid: str 

30 

31 

32class CliParams(gws.CliParams): 

33 project: Optional[str] 

34 """project uid""" 

35 request: str 

36 """path to request.json""" 

37 output: str 

38 """output path""" 

39 

40 

41class Object(gws.base.action.Object): 

42 

43 @gws.ext.command.api('printerStart') 

44 def start_print(self, req: gws.WebRequester, p: gws.PrintRequest) -> gws.PrintJobResponse: 

45 """Start a background print job""" 

46 

47 job = self.root.app.printerMgr.start_job(p, req.user) 

48 return self.root.app.printerMgr.status(job) 

49 

50 @gws.ext.command.api('printerStatus') 

51 def get_status(self, req: gws.WebRequester, p: JobRequest) -> gws.PrintJobResponse: 

52 """Query the print job status""" 

53 

54 job = self.root.app.printerMgr.get_job(p.jobUid, req.user) 

55 if not job: 

56 raise gws.NotFoundError(f'{p.jobUid=} not found') 

57 return self.root.app.printerMgr.status(job) 

58 

59 @gws.ext.command.api('printerCancel') 

60 def cancel(self, req: gws.WebRequester, p: JobRequest) -> gws.PrintJobResponse: 

61 """Cancel a print job""" 

62 

63 job = self.root.app.printerMgr.get_job(p.jobUid, req.user) 

64 if not job: 

65 raise gws.NotFoundError(f'{p.jobUid=} not found') 

66 self.root.app.printerMgr.cancel_job(job) 

67 return self.root.app.printerMgr.status(job) 

68 

69 @gws.ext.command.get('printerResult') 

70 def get_result(self, req: gws.WebRequester, p: JobRequest) -> gws.ContentResponse: 

71 """Get the result of a print job as a byte stream""" 

72 

73 job = self.root.app.printerMgr.get_job(p.jobUid, req.user) 

74 if not job: 

75 raise gws.NotFoundError(f'printerResult {p.jobUid=} not found') 

76 if job.state != gws.JobState.complete: 

77 raise gws.NotFoundError(f'printerResult {p.jobUid=} wrong {job.state=}') 

78 

79 res_path = self.root.app.printerMgr.result_path(job) 

80 if not res_path: 

81 raise gws.NotFoundError(f'printerResult {p.jobUid=} no res_path') 

82 

83 return gws.ContentResponse(contentPath=res_path) 

84 

85 @gws.ext.command.cli('printerPrint') 

86 def print(self, p: CliParams): 

87 """Print using the specified params""" 

88 

89 root = gws.config.load() 

90 request = root.specs.read( 

91 gws.lib.jsonx.from_path(p.request), 

92 'gws.PrintRequest', 

93 p.request, 

94 ) 

95 

96 res_path = root.app.printerMgr.run_job(request, root.app.authMgr.systemUser) 

97 res = gws.u.read_file_b(res_path) 

98 gws.u.write_file_b(p.output, res)