Coverage for gws-app/gws/base/printer/manager.py: 0%
48 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Printer manager."""
3import gws
4import gws.lib.job
5import gws.server.spool
8from . import core, worker
11class Object(gws.PrinterManager):
12 def printers_for_project(self, project, user):
13 ls = [p for p in project.printers if user.can_use(p)]
14 ls.extend(p for p in self.root.app.printers if user.can_use(p))
15 if ls:
16 return ls
17 return [self.root.app.defaultPrinter]
19 def start_job(self, request, user):
20 request_path = gws.u.serialize_to_path(request, gws.u.printtemp('print.pickle'))
22 job = gws.lib.job.create(
23 self.root,
24 user=user,
25 payload=dict(
26 requestPath=request_path,
27 projectUid=request.projectUid
28 ),
29 worker=worker.__name__ + '.worker')
31 if gws.server.spool.is_active():
32 gws.server.spool.add(job)
33 else:
34 gws.lib.job.run(self.root, job.uid)
35 return gws.lib.job.get(self.root, job.uid)
37 def get_job(self, uid, user):
38 job = gws.lib.job.get(self.root, uid)
39 if not job:
40 gws.log.error(f'JOB {uid}: not found')
41 return
42 if job.user.uid != user.uid:
43 gws.log.error(f'JOB {uid}: wrong user (job={job.user.uid!r} user={user.uid!r})')
44 return
45 return job
47 def run_job(self, request, user):
48 w = worker.Object(self.root, '', request, user)
49 return w.run()
51 def cancel_job(self, job):
52 job.cancel()
54 def result_path(self, job):
55 return job.payload.get('resultPath')
57 def status(self, job) -> gws.PrintJobResponse:
58 payload = job.payload
60 def _progress():
61 if job.state == gws.JobState.complete:
62 return 100
63 if job.state != gws.JobState.running:
64 return 0
65 num_steps = payload.get('numSteps', 0)
66 if not num_steps:
67 return 0
68 step = payload.get('step', 0)
69 return int(min(100.0, step * 100.0 / num_steps))
71 _url_path_suffix = '/gws.pdf'
73 return gws.PrintJobResponse(
74 jobUid=job.uid,
75 state=job.state,
76 progress=_progress(),
77 stepType=payload.get('stepType', ''),
78 stepName=payload.get('stepName', ''),
79 url=gws.u.action_url_path('printerResult', jobUid=job.uid, projectUid=payload.get('projectUid')) + _url_path_suffix
80 )