Coverage for gws-app/gws/lib/job/__init__.py: 0%

85 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1from typing import Optional 

2 

3import importlib 

4 

5import gws 

6import gws.lib.jsonx 

7import gws.lib.sqlitex 

8 

9 

10class Error(gws.Error): 

11 pass 

12 

13 

14class PrematureTermination(Exception): 

15 pass 

16 

17 

18_DB_PATH = gws.c.PRINT_DIR + '/jobs81.sqlite' 

19 

20_TABLE = 'job' 

21 

22_INIT_DDL = f''' 

23 CREATE TABLE IF NOT EXISTS {_TABLE} ( 

24 uid TEXT NOT NULL PRIMARY KEY, 

25 user_uid TEXT NOT NULL, 

26 str_user TEXT NOT NULL, 

27 worker TEXT NOT NULL, 

28 payload TEXT NOT NULL, 

29 state TEXT NOT NULL, 

30 error TEXT NOT NULL, 

31 created INTEGER NOT NULL, 

32 updated INTEGER NOT NULL 

33 )  

34''' 

35 

36 

37def create(root: gws.Root, user: gws.User, worker: str, payload: dict = None) -> 'Object': 

38 uid = gws.u.random_string(64) 

39 gws.log.debug(f'JOB {uid}: creating: {worker=} {user.uid=}') 

40 

41 _db().insert(_TABLE, dict( 

42 uid=uid, 

43 user_uid=user.uid, 

44 str_user=root.app.authMgr.serialize_user(user), 

45 worker=worker, 

46 payload=gws.lib.jsonx.to_string(payload or {}), 

47 state=gws.JobState.open, 

48 error='', 

49 created=gws.u.stime(), 

50 updated=gws.u.stime() 

51 )) 

52 

53 job = get(root, uid) 

54 if not job: 

55 raise gws.Error(f'error creating job {uid=}') 

56 return job 

57 

58 

59def run(root: gws.Root, uid): 

60 job = get(root, uid) 

61 if not job: 

62 raise gws.Error(f'invalid job {uid=}') 

63 job.run() 

64 

65 

66def get(root: gws.Root, uid) -> Optional['Object']: 

67 rs = _db().select(f'SELECT * FROM {_TABLE} WHERE uid=:uid', uid=uid) 

68 if rs: 

69 return Object(root, rs[0]) 

70 

71 

72def remove(uid): 

73 _db().execute(f'DELETE FROM {_TABLE} WHERE uid=:uid', uid=uid) 

74 

75 

76## 

77 

78class Object(gws.Job): 

79 worker: str 

80 

81 def __init__(self, root: gws.Root, rec): 

82 self.root = root 

83 

84 self.error = rec['error'] 

85 self.payload = gws.lib.jsonx.from_string(rec['payload']) 

86 self.state = rec['state'] 

87 self.uid = rec['uid'] 

88 self.user = self._get_user(rec) 

89 self.worker = rec['worker'] 

90 

91 def _get_user(self, rec) -> gws.User: 

92 auth = self.root.app.authMgr 

93 if rec.get('str_user'): 

94 user = auth.unserialize_user(rec.get('str_user')) 

95 if user: 

96 return user 

97 return auth.guestUser 

98 

99 def run(self): 

100 if self.state != gws.JobState.open: 

101 gws.log.error(f'JOB {self.uid}: invalid state for run={self.state!r}') 

102 return 

103 

104 # @TODO lock 

105 self.update(state=gws.JobState.running) 

106 

107 try: 

108 mod_name, _, fn_name = self.worker.rpartition('.') 

109 mod = importlib.import_module(mod_name) 

110 fn = getattr(mod, fn_name) 

111 fn(self.root, self) 

112 except PrematureTermination as exc: 

113 gws.log.error(f'JOB {self.uid}: PrematureTermination: {exc.args[0]!r}') 

114 self.update(state=gws.JobState.error) 

115 except Exception as exc: 

116 gws.log.error(f'JOB {self.uid}: FAILED') 

117 gws.log.exception() 

118 self.update(state=gws.JobState.error, error=repr(exc)) 

119 

120 def update(self, payload=None, state=None, error=None): 

121 rec = { 

122 'updated': gws.u.stime(), 

123 } 

124 

125 if payload is not None: 

126 rec['payload'] = gws.lib.jsonx.to_string(payload) 

127 if state: 

128 rec['state'] = state 

129 if error: 

130 rec['error'] = error 

131 

132 vals = ','.join(f'{k}=:{k}' for k in rec) 

133 

134 _db().execute(f'UPDATE {_TABLE} SET {vals} WHERE uid=:uid', uid=self.uid, **rec) 

135 

136 gws.log.debug(f'JOB {self.uid}: update: {rec=}') 

137 

138 def cancel(self): 

139 self.update(state=gws.JobState.cancel) 

140 

141 def remove(self): 

142 _db().execute(f'DELETE FROM {_TABLE} WHERE uid=:uid', uid=self.uid) 

143 

144 

145## 

146 

147 

148_sqlitex: Optional[gws.lib.sqlitex.Object] = None 

149 

150 

151def _db() -> gws.lib.sqlitex.Object: 

152 global _sqlitex 

153 

154 if _sqlitex is None: 

155 _sqlitex = gws.lib.sqlitex.Object(_DB_PATH, _INIT_DDL) 

156 return _sqlitex