Coverage for gws-app/gws/lib/job/__init__.py: 0%
85 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1from typing import Optional
3import importlib
5import gws
6import gws.lib.jsonx
7import gws.lib.sqlitex
10class Error(gws.Error):
11 pass
14class PrematureTermination(Exception):
15 pass
18_DB_PATH = gws.c.PRINT_DIR + '/jobs81.sqlite'
20_TABLE = 'job'
22_INIT_DDL = f'''
23 CREATE TABLE IF NOT EXISTS {_TABLE} (
24 uid TEXT NOT NULL PRIMARY KEY,
25 user_uid TEXT NOT NULL,
26 str_user TEXT NOT NULL,
27 worker TEXT NOT NULL,
28 payload TEXT NOT NULL,
29 state TEXT NOT NULL,
30 error TEXT NOT NULL,
31 created INTEGER NOT NULL,
32 updated INTEGER NOT NULL
33 )
34'''
37def create(root: gws.Root, user: gws.User, worker: str, payload: dict = None) -> 'Object':
38 uid = gws.u.random_string(64)
39 gws.log.debug(f'JOB {uid}: creating: {worker=} {user.uid=}')
41 _db().insert(_TABLE, dict(
42 uid=uid,
43 user_uid=user.uid,
44 str_user=root.app.authMgr.serialize_user(user),
45 worker=worker,
46 payload=gws.lib.jsonx.to_string(payload or {}),
47 state=gws.JobState.open,
48 error='',
49 created=gws.u.stime(),
50 updated=gws.u.stime()
51 ))
53 job = get(root, uid)
54 if not job:
55 raise gws.Error(f'error creating job {uid=}')
56 return job
59def run(root: gws.Root, uid):
60 job = get(root, uid)
61 if not job:
62 raise gws.Error(f'invalid job {uid=}')
63 job.run()
66def get(root: gws.Root, uid) -> Optional['Object']:
67 rs = _db().select(f'SELECT * FROM {_TABLE} WHERE uid=:uid', uid=uid)
68 if rs:
69 return Object(root, rs[0])
72def remove(uid):
73 _db().execute(f'DELETE FROM {_TABLE} WHERE uid=:uid', uid=uid)
76##
78class Object(gws.Job):
79 worker: str
81 def __init__(self, root: gws.Root, rec):
82 self.root = root
84 self.error = rec['error']
85 self.payload = gws.lib.jsonx.from_string(rec['payload'])
86 self.state = rec['state']
87 self.uid = rec['uid']
88 self.user = self._get_user(rec)
89 self.worker = rec['worker']
91 def _get_user(self, rec) -> gws.User:
92 auth = self.root.app.authMgr
93 if rec.get('str_user'):
94 user = auth.unserialize_user(rec.get('str_user'))
95 if user:
96 return user
97 return auth.guestUser
99 def run(self):
100 if self.state != gws.JobState.open:
101 gws.log.error(f'JOB {self.uid}: invalid state for run={self.state!r}')
102 return
104 # @TODO lock
105 self.update(state=gws.JobState.running)
107 try:
108 mod_name, _, fn_name = self.worker.rpartition('.')
109 mod = importlib.import_module(mod_name)
110 fn = getattr(mod, fn_name)
111 fn(self.root, self)
112 except PrematureTermination as exc:
113 gws.log.error(f'JOB {self.uid}: PrematureTermination: {exc.args[0]!r}')
114 self.update(state=gws.JobState.error)
115 except Exception as exc:
116 gws.log.error(f'JOB {self.uid}: FAILED')
117 gws.log.exception()
118 self.update(state=gws.JobState.error, error=repr(exc))
120 def update(self, payload=None, state=None, error=None):
121 rec = {
122 'updated': gws.u.stime(),
123 }
125 if payload is not None:
126 rec['payload'] = gws.lib.jsonx.to_string(payload)
127 if state:
128 rec['state'] = state
129 if error:
130 rec['error'] = error
132 vals = ','.join(f'{k}=:{k}' for k in rec)
134 _db().execute(f'UPDATE {_TABLE} SET {vals} WHERE uid=:uid', uid=self.uid, **rec)
136 gws.log.debug(f'JOB {self.uid}: update: {rec=}')
138 def cancel(self):
139 self.update(state=gws.JobState.cancel)
141 def remove(self):
142 _db().execute(f'DELETE FROM {_TABLE} WHERE uid=:uid', uid=self.uid)
145##
148_sqlitex: Optional[gws.lib.sqlitex.Object] = None
151def _db() -> gws.lib.sqlitex.Object:
152 global _sqlitex
154 if _sqlitex is None:
155 _sqlitex = gws.lib.sqlitex.Object(_DB_PATH, _INIT_DDL)
156 return _sqlitex