Coverage for gws-app/gws/plugin/auth_session_manager/sqlite/__init__.py: 0%
60 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1from typing import Optional
3import gws
4import gws.base.auth
5import gws.lib.datetimex
6import gws.lib.jsonx
7import gws.lib.osx
8import gws.lib.sqlitex
# NOTE(review): presumably declares this module as the 'sqlite' implementation of the
# authSessionManager extension -- confirm against the gws.ext conventions.
gws.ext.new.authSessionManager('sqlite')
class Config(gws.base.auth.session_manager.Config):
    """Configuration for sqlite sessions"""

    # Optional location of the SQLite session database. When it is not set,
    # Object.configure falls back to f'{gws.c.MISC_DIR}/sessions81.sqlite'.
    path: Optional[str]
    """session storage path"""
# Name of the table that holds the sessions; every SQL statement in this module refers to it.
_TABLE = 'sessions81'

# DDL passed to gws.lib.sqlitex.Object when the database object is created (see Object._db).
# One row per session:
#   uid               - session id, primary key
#   method_uid        - uid of the auth method the session was created with
#   user_uid          - uid of the session user
#   str_user          - user as serialized by authMgr.serialize_user
#   str_data          - session data as a JSON string
#   created / updated - gws.u.stime() values written by Object.create / save / touch
_INIT_DDL = f'''
    CREATE TABLE IF NOT EXISTS {_TABLE} (
        uid TEXT NOT NULL PRIMARY KEY,
        method_uid TEXT NOT NULL,
        user_uid TEXT NOT NULL,
        str_user TEXT NOT NULL,
        str_data TEXT NOT NULL,
        created INTEGER NOT NULL,
        updated INTEGER NOT NULL
    )
'''
class Object(gws.base.auth.session_manager.Object):
    """Session manager that keeps auth sessions as rows of a SQLite table.

    All statements go through a lazily created ``gws.lib.sqlitex.Object`` (see ``_db``)
    and operate on the ``_TABLE`` table. The user and the session data are stored as
    strings (``str_user``, ``str_data``); ``created`` and ``updated`` store the value of
    ``gws.u.stime()`` at the time of the write.
    """

    # Path of the SQLite database file, set in ``configure``.
    dbPath: str

    def configure(self):
        """Take the database path from the ``path`` option or use the default location."""
        self.dbPath = self.cfg('path', default=f'{gws.c.MISC_DIR}/sessions81.sqlite')

    ##

    def cleanup(self):
        """Delete every session whose ``updated`` value is older than ``lifeTime``."""
        last_time = gws.u.stime() - self.lifeTime
        self._db().execute(f'DELETE FROM {_TABLE} WHERE updated < :last_time', last_time=last_time)

    def create(self, method, user, data=None):
        """Insert a new session for ``user`` authorized by ``method`` and return it.

        Args:
            method: Auth method; its ``uid`` is stored with the session.
            user: Session user; stored both as ``uid`` and in serialized form.
            data: Optional session data, stored as JSON (an empty dict if omitted or falsy).

        Returns:
            The session object as read back from the database by ``get``.
        """
        am = self.root.app.authMgr
        uid = gws.u.random_string(64)

        # Take the timestamp once, so that ``created`` and ``updated`` of a fresh
        # session are guaranteed to be equal even if the clock ticks in between.
        now = gws.u.stime()

        self._db().insert(_TABLE, dict(
            uid=uid,
            method_uid=method.uid,
            user_uid=user.uid,
            str_user=am.serialize_user(user),
            str_data=gws.lib.jsonx.to_string(data or {}),
            created=now,
            updated=now,
        ))

        return self.get(uid)

    def delete(self, sess):
        """Delete the row that belongs to the session ``sess``."""
        self._db().execute(f'DELETE FROM {_TABLE} WHERE uid=:uid', uid=sess.uid)

    def delete_all(self):
        """Delete all stored sessions."""
        self._db().execute(f'DELETE FROM {_TABLE}')

    def get(self, uid):
        """Return the session with the given ``uid``, or ``None`` if there is no such row.

        The age of the session is not checked here, see ``get_valid``.
        """
        rs = self._db().select(f'SELECT * FROM {_TABLE} WHERE uid=:uid', uid=uid)
        if len(rs) != 1:
            return None
        return self._session(rs[0])

    def get_valid(self, uid):
        """Return the session with the given ``uid`` if it exists and has not expired.

        A session counts as expired when its ``updated`` value is more than ``lifeTime``
        behind the current ``gws.u.stime()``. Returns ``None`` for unknown or expired sessions.
        """
        last_time = gws.u.stime() - self.lifeTime
        rs = self._db().select(f'SELECT * FROM {_TABLE} WHERE uid=:uid', uid=uid)
        if len(rs) != 1:
            return None

        rec = gws.u.to_dict(rs[0])
        if rec['updated'] < last_time:
            return None
        return self._session(rec)

    def get_all(self):
        """Return all stored sessions as a list, expired ones included."""
        return [
            self._session(rec)
            for rec in self._db().select(f'SELECT * FROM {_TABLE}')
        ]

    def save(self, sess):
        """Write the data of a changed session back and refresh its ``updated`` value.

        Does nothing if ``sess.isChanged`` is not set; otherwise resets the flag after the write.
        """
        if not sess.isChanged:
            return

        self._db().execute(
            f'UPDATE {_TABLE} SET str_data=:str_data, updated=:updated WHERE uid=:uid',
            str_data=gws.lib.jsonx.to_string(sess.data or {}),
            updated=gws.u.stime(),
            uid=sess.uid
        )

        sess.isChanged = False

    def touch(self, sess):
        """Refresh the ``updated`` value of the session.

        A changed session is passed on to ``save``, which stores the data and
        refreshes ``updated`` in a single statement.
        """
        if sess.isChanged:
            return self.save(sess)

        self._db().execute(
            f'UPDATE {_TABLE} SET updated=:updated WHERE uid=:uid',
            updated=gws.u.stime(),
            uid=sess.uid
        )

    ##

    def _session(self, rec):
        """Build a ``gws.base.auth.session.Object`` from a database row."""
        am = self.root.app.authMgr
        r = gws.u.to_dict(rec)
        return gws.base.auth.session.Object(
            uid=r['uid'],
            method=am.get_method(r['method_uid']),
            user=am.unserialize_user(r['str_user']),
            data=gws.lib.jsonx.from_string(r['str_data']),
            created=gws.lib.datetimex.from_timestamp(r['created']),
            updated=gws.lib.datetimex.from_timestamp(r['updated']),
        )

    # Database wrapper, created on first use by ``_db``.
    _sqlitex: gws.lib.sqlitex.Object

    def _db(self):
        """Return the database wrapper, creating it from ``dbPath`` and ``_INIT_DDL`` on first use."""
        # ``_sqlitex`` is only annotated on the class, so the attribute may not exist yet.
        if getattr(self, '_sqlitex', None) is None:
            self._sqlitex = gws.lib.sqlitex.Object(self.dbPath, _INIT_DDL)
        return self._sqlitex