Coverage for gws-app/gws/base/printer/manager.py: 0%

48 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Printer manager.""" 

2 

3import gws 

4import gws.lib.job 

5import gws.server.spool 

6 

7 

8from . import core, worker 

9 

10 

11class Object(gws.PrinterManager): 

12 def printers_for_project(self, project, user): 

13 ls = [p for p in project.printers if user.can_use(p)] 

14 ls.extend(p for p in self.root.app.printers if user.can_use(p)) 

15 if ls: 

16 return ls 

17 return [self.root.app.defaultPrinter] 

18 

19 def start_job(self, request, user): 

20 request_path = gws.u.serialize_to_path(request, gws.u.printtemp('print.pickle')) 

21 

22 job = gws.lib.job.create( 

23 self.root, 

24 user=user, 

25 payload=dict( 

26 requestPath=request_path, 

27 projectUid=request.projectUid 

28 ), 

29 worker=worker.__name__ + '.worker') 

30 

31 if gws.server.spool.is_active(): 

32 gws.server.spool.add(job) 

33 else: 

34 gws.lib.job.run(self.root, job.uid) 

35 return gws.lib.job.get(self.root, job.uid) 

36 

37 def get_job(self, uid, user): 

38 job = gws.lib.job.get(self.root, uid) 

39 if not job: 

40 gws.log.error(f'JOB {uid}: not found') 

41 return 

42 if job.user.uid != user.uid: 

43 gws.log.error(f'JOB {uid}: wrong user (job={job.user.uid!r} user={user.uid!r})') 

44 return 

45 return job 

46 

47 def run_job(self, request, user): 

48 w = worker.Object(self.root, '', request, user) 

49 return w.run() 

50 

51 def cancel_job(self, job): 

52 job.cancel() 

53 

54 def result_path(self, job): 

55 return job.payload.get('resultPath') 

56 

57 def status(self, job) -> gws.PrintJobResponse: 

58 payload = job.payload 

59 

60 def _progress(): 

61 if job.state == gws.JobState.complete: 

62 return 100 

63 if job.state != gws.JobState.running: 

64 return 0 

65 num_steps = payload.get('numSteps', 0) 

66 if not num_steps: 

67 return 0 

68 step = payload.get('step', 0) 

69 return int(min(100.0, step * 100.0 / num_steps)) 

70 

71 _url_path_suffix = '/gws.pdf' 

72 

73 return gws.PrintJobResponse( 

74 jobUid=job.uid, 

75 state=job.state, 

76 progress=_progress(), 

77 stepType=payload.get('stepType', ''), 

78 stepName=payload.get('stepName', ''), 

79 url=gws.u.action_url_path('printerResult', jobUid=job.uid, projectUid=payload.get('projectUid')) + _url_path_suffix 

80 )