Coverage for gws-app/gws/plugin/auth_session_manager/sqlite/_test.py: 0%

90 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1import multiprocessing 

2 

3import gws 

4import gws.lib.osx as osx 

5import gws.test.util as u 

6 

7DB_PATH = u.path_in_base_dir('sess') 

8 

9 

10def root(): 

11 cfg = f''' 

12 auth {{ 

13 providers+ {{ 

14 type 'mockAuthProvider1' 

15 allowedMethods ['mockAuthMethod1'] 

16 }} 

17 methods+ {{  

18 type 'mockAuthMethod1'  

19 }} 

20 session {{ 

21 path {DB_PATH!r} 

22 type "sqlite" 

23 lifeTime 2 

24 }} 

25 }} 

26 ''' 

27 

28 return u.gws_root(cfg) 

29 

30 

31## 

32 

33def _prepare() -> tuple[gws.AuthManager, gws.AuthSessionManager, gws.User]: 

34 am = root().app.authMgr 

35 sm = am.sessionMgr 

36 u.mock.add_user('me', 'foo') 

37 usr = am.authenticate(am.methods[0], gws.Data(username='me', password='foo')) 

38 return am, sm, usr 

39 

40 

41def test_db_recreated_if_deleted(): 

42 osx.unlink(DB_PATH) 

43 

44 am, sm, usr = _prepare() 

45 s1 = sm.create(am.methods[0], usr, {'foo': 'bar'}) 

46 assert sm.get_valid(s1.uid).get('foo') == 'bar' 

47 

48 osx.unlink(DB_PATH) 

49 

50 # should not raise 

51 assert sm.get_valid(s1.uid) is None 

52 

53 # 'save' has no effect, the session is gone 

54 s1.set('foo', 'bar2') 

55 sm.save(s1) 

56 assert sm.get_valid(s1.uid) is None 

57 

58 # 'create' is fine 

59 s2 = sm.create(am.methods[0], usr, {'foo': 'bar3'}) 

60 assert sm.get_valid(s2.uid).get('foo') == 'bar3' 

61 

62 

63def test_create_session(): 

64 osx.unlink(DB_PATH) 

65 am, sm, usr = _prepare() 

66 

67 s1 = sm.create(am.methods[0], usr) 

68 s2 = sm.get_valid(s1.uid) 

69 

70 assert s1.user.uid == s2.user.uid 

71 assert s1.method.uid == s2.method.uid 

72 

73 

74def test_update_session(): 

75 osx.unlink(DB_PATH) 

76 am, sm, usr = _prepare() 

77 

78 s1 = sm.create(am.methods[0], usr) 

79 s1.set('foo', 'bar') 

80 u1 = s1.updated 

81 gws.u.sleep(1) 

82 sm.save(s1) 

83 s2 = sm.get_valid(s1.uid) 

84 u2 = s2.updated 

85 assert s2.get('foo') == 'bar' 

86 assert u2 > u1 

87 

88 

89def test_delete_session(): 

90 osx.unlink(DB_PATH) 

91 am, sm, usr = _prepare() 

92 

93 s1 = sm.create(am.methods[0], usr) 

94 sm.delete(s1) 

95 s2 = sm.get(s1.uid) 

96 assert s2 is None 

97 

98 

99def test_session_expiration(): 

100 osx.unlink(DB_PATH) 

101 am, sm, usr = _prepare() 

102 

103 dead = sm.create(am.methods[0], usr) 

104 live = sm.create(am.methods[0], usr) 

105 

106 gws.u.sleep(1) 

107 

108 sm.touch(live) 

109 

110 gws.u.sleep(1) 

111 

112 sm.touch(live) 

113 

114 gws.u.sleep(1) 

115 

116 dead2 = sm.get_valid(dead.uid) 

117 live2 = sm.get_valid(live.uid) 

118 

119 assert dead2 is None 

120 assert live2.uid == live.uid 

121 

122 

123def _session_mp_worker(n, num_loops): 

124 am, sm, usr = _prepare() 

125 s1 = sm.create(am.methods[0], usr) 

126 

127 for k in range(num_loops): 

128 # gws.log.debug(f'_session_mp_worker: sid={s1.uid} {n}:{k}') 

129 s2 = sm.get(s1.uid) 

130 s2.set('foo', f'{n}:{k}') 

131 sm.save(s2) 

132 

133 

134def test_concurrency(): 

135 num_processes = 100 

136 num_loops = 50 

137 

138 osx.unlink(DB_PATH) 

139 

140 ps = [] 

141 

142 for n in range(num_processes): 

143 p = multiprocessing.Process(target=_session_mp_worker, args=[n, num_loops]) 

144 ps.append(p) 

145 p.start() 

146 

147 for p in ps: 

148 p.join() 

149 

150 # ensure everything is written 

151 

152 am, sm, usr = _prepare() 

153 all_sess = sm.get_all() 

154 assert len(all_sess) == num_processes 

155 

156 v1 = set(f'{n}:{num_loops - 1}' for n in range(num_processes)) 

157 v2 = set(s.get('foo') for s in all_sess) 

158 assert v2 == v1