Coverage for gws-app/gws/plugin/auth_session_manager/sqlite/_test.py: 0%
90 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1import multiprocessing
3import gws
4import gws.lib.osx as osx
5import gws.test.util as u
# Path of the session database: passed as the sqlite session `path` in the
# test configuration and unlinked by the tests to start from a clean state.
DB_PATH = u.path_in_base_dir('sess')
def root():
    """Build a GWS test root whose auth sessions use the sqlite store at ``DB_PATH``.

    The configuration registers one mock auth provider and one mock auth
    method, and sets the session ``lifeTime`` to 2 (presumably seconds, given
    the one-second sleeps in the expiration test -- confirm against the
    session manager).

    Returns:
        Whatever ``u.gws_root`` returns for this configuration.
    """
    config_text = f'''
        auth {{
            providers+ {{
                type 'mockAuthProvider1'
                allowedMethods ['mockAuthMethod1']
            }}
            methods+ {{
                type 'mockAuthMethod1'
            }}
            session {{
                path {DB_PATH!r}
                type "sqlite"
                lifeTime 2
            }}
        }}
    '''

    return u.gws_root(config_text)
31##
def _prepare() -> tuple[gws.AuthManager, gws.AuthSessionManager, gws.User]:
    """Create a fresh root, register the mock user 'me' and authenticate it.

    Returns:
        A tuple of the auth manager, its session manager and the user
        returned by authenticating with the first configured method.
    """
    auth_mgr = root().app.authMgr
    session_mgr = auth_mgr.sessionMgr

    u.mock.add_user('me', 'foo')
    credentials = gws.Data(username='me', password='foo')
    user = auth_mgr.authenticate(auth_mgr.methods[0], credentials)

    return auth_mgr, session_mgr, user
def test_db_recreated_if_deleted():
    """Deleting the database file mid-run must not break the session manager."""
    osx.unlink(DB_PATH)

    auth_mgr, sessions, user = _prepare()
    method = auth_mgr.methods[0]

    first = sessions.create(method, user, {'foo': 'bar'})
    assert sessions.get_valid(first.uid).get('foo') == 'bar'

    osx.unlink(DB_PATH)

    # Looking up a session whose storage has vanished should not raise.
    assert sessions.get_valid(first.uid) is None

    # Saving has no effect: the session is gone and stays gone.
    first.set('foo', 'bar2')
    sessions.save(first)
    assert sessions.get_valid(first.uid) is None

    # Creating a brand-new session still works.
    second = sessions.create(method, user, {'foo': 'bar3'})
    assert sessions.get_valid(second.uid).get('foo') == 'bar3'
def test_create_session():
    """A created session can be read back with the same user and method."""
    osx.unlink(DB_PATH)
    auth_mgr, sessions, user = _prepare()

    created = sessions.create(auth_mgr.methods[0], user)
    loaded = sessions.get_valid(created.uid)

    assert created.user.uid == loaded.user.uid
    assert created.method.uid == loaded.method.uid
def test_update_session():
    """Saving a modified session persists its data and advances ``updated``."""
    osx.unlink(DB_PATH)
    auth_mgr, sessions, user = _prepare()

    session = sessions.create(auth_mgr.methods[0], user)
    session.set('foo', 'bar')
    updated_before = session.updated

    # Wait before saving so the new 'updated' value can differ from the old one.
    gws.u.sleep(1)
    sessions.save(session)

    reloaded = sessions.get_valid(session.uid)
    updated_after = reloaded.updated

    assert reloaded.get('foo') == 'bar'
    assert updated_after > updated_before
def test_delete_session():
    """A deleted session can no longer be fetched by its uid."""
    osx.unlink(DB_PATH)
    auth_mgr, sessions, user = _prepare()

    doomed = sessions.create(auth_mgr.methods[0], user)
    sessions.delete(doomed)

    assert sessions.get(doomed.uid) is None
def test_session_expiration():
    """An untouched session expires while a regularly touched one stays valid."""
    osx.unlink(DB_PATH)
    auth_mgr, sessions, user = _prepare()
    method = auth_mgr.methods[0]

    dead = sessions.create(method, user)
    live = sessions.create(method, user)

    # Touch 'live' after each of the first two one-second waits...
    for _ in range(2):
        gws.u.sleep(1)
        sessions.touch(live)

    # ...then wait once more. 'dead' has now gone three sleeps without a
    # touch, against the configured lifeTime of 2.
    gws.u.sleep(1)

    dead_lookup = sessions.get_valid(dead.uid)
    live_lookup = sessions.get_valid(live.uid)

    assert dead_lookup is None
    assert live_lookup.uid == live.uid
def _session_mp_worker(n, num_loops):
    """Process target for ``test_concurrency``: create a session and rewrite it.

    Args:
        n: Worker index, embedded in every stored value.
        num_loops: Number of get/set/save rounds to perform.
    """
    auth_mgr, sessions, user = _prepare()
    own_session = sessions.create(auth_mgr.methods[0], user)

    for k in range(num_loops):
        # Re-read the session on every round, then store '<worker>:<round>'.
        current = sessions.get(own_session.uid)
        current.set('foo', f'{n}:{k}')
        sessions.save(current)
def test_concurrency():
    """Many processes writing their own sessions concurrently must all succeed.

    Each worker process creates one session and overwrites its 'foo' value
    ``num_loops`` times. Afterwards the store must hold exactly one session
    per worker, each carrying that worker's final value.
    """
    num_processes = 100
    num_loops = 50

    osx.unlink(DB_PATH)

    workers = []
    for n in range(num_processes):
        p = multiprocessing.Process(target=_session_mp_worker, args=(n, num_loops))
        workers.append(p)
        p.start()

    # Wait for every worker so that all writes are finished before reading.
    for p in workers:
        p.join()

    # Report crashed workers directly instead of leaving the failure to
    # surface only as a count or value mismatch below.
    failed = [(p.name, p.exitcode) for p in workers if p.exitcode != 0]
    assert not failed, f'worker processes exited abnormally: {failed}'

    am, sm, usr = _prepare()
    all_sess = sm.get_all()
    assert len(all_sess) == num_processes

    expected = {f'{n}:{num_loops - 1}' for n in range(num_processes)}
    actual = {s.get('foo') for s in all_sess}
    assert actual == expected