Coverage for gws-app/gws/lib/sqlitex/__init__.py: 0%

56 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Convenience wrapper for the SA SQLite engine. 

2 

3This wrapper accepts a database path and optionally an "init" DDL statement. 

4It executes queries given in a text form. 

5 

6Failed queries are repeated up to 3 times to work around transient errors, like the DB being locked. 

7 

8If the error message is "no such table", the wrapper runs the "init" DDL before repeating. 

9 

10""" 

11 

12from typing import Optional 

13import gws 

14import gws.lib.sa as sa 

15 

16 

class Error(gws.Error):
    """Error type of the sqlitex wrapper; adds nothing to ``gws.Error``."""

19 

20 

class Object:
    """Wrapper around an SA engine bound to a single SQLite database file.

    Statements are given as text and executed on a fresh connection per
    attempt. Failed statements are retried as implemented in ``_exec2``.
    """

    # Created lazily by ``_engine()`` on first use.
    saEngine: sa.Engine

    def __init__(self, db_path: str, init_ddl: Optional[str] = ''):
        """Remember the database path and the optional "init" DDL.

        Args:
            db_path: Path of the SQLite database file.
            init_ddl: DDL text to run when a statement fails with
                "no such table". If empty, no init is attempted.
        """

        self.dbPath = db_path
        self.initDdl = init_ddl

    def execute(self, stmt: str, **params):
        """Execute a text DML statement and commit.

        Args:
            stmt: Statement text, with ``:name`` placeholders.
            **params: Values bound to the placeholders.

        Raises:
            Error: If the statement keeps failing (see ``_exec2``).
        """

        self._exec2(False, stmt, params)

    def select(self, stmt: str, **params) -> list[dict]:
        """Execute a text select statement and return its rows.

        No commit is issued on this path.

        Args:
            stmt: Statement text, with ``:name`` placeholders.
            **params: Values bound to the placeholders.

        Returns:
            A list with one dict per result row.

        Raises:
            Error: If the statement keeps failing (see ``_exec2``).
        """

        return self._exec2(True, stmt, params)

    def insert(self, table_name: str, rec: dict):
        """Insert a new record (dict) into a table.

        Args:
            table_name: Name of the target table.
            rec: Mapping of column names to values.

        Raises:
            Error: If the statement keeps failing (see ``_exec2``).
        """

        # Values are bound as parameters, but the table and column names are
        # interpolated into the SQL text as is (identifiers cannot be bound).
        # NOTE(review): callers must only pass trusted identifiers here.
        keys = ','.join(rec)
        vals = ','.join(':' + k for k in rec)

        self._exec2(False, f'INSERT INTO {table_name} ({keys}) VALUES({vals})', rec)

    ##

    _MAX_ERRORS = 3
    _SLEEP_TIME = 0.1

    def _exec2(self, is_select, stmt, params):
        """Run a statement, retrying on errors.

        Retry policy:
            - "no such table" and an init DDL is configured: run the init DDL
              and retry. Init is run successfully at most once per call.
            - "database is locked": sleep and retry; not counted as an error.
            - anything else: sleep and retry until ``_MAX_ERRORS`` errors
              have been counted.

        Args:
            is_select: If true, return the rows and do not commit.
            stmt: Statement text.
            params: Dict of values bound to the statement placeholders.

        Returns:
            A list of row dicts if ``is_select`` is true, otherwise ``None``.

        Raises:
            Error: When the error limit is reached; chained to the last
                SA error.
        """

        err_cnt = 0
        init_done = False

        while True:
            sa_exc = None

            try:
                with self._engine().connect() as conn:
                    if is_select:
                        return [gws.u.to_dict(r) for r in conn.execute(sa.text(stmt), params)]
                    conn.execute(sa.text(stmt), params)
                    conn.commit()
                    return
            except sa.Error as exc:
                # 'exc' is unbound when the except clause ends, keep a reference
                sa_exc = exc

            # @TODO using strings for error checking, is there a better way?

            # Once init has succeeded, a repeated "no such table" means the
            # DDL does not create the table the statement needs. Running it
            # again would loop forever, so the error is counted below instead.
            if 'no such table' in str(sa_exc) and self.initDdl and not init_done:
                gws.log.warning(f'sqlitex: {self.dbPath}: error={sa_exc}, running init...')
                try:
                    with self._engine().connect() as conn:
                        conn.execute(sa.text(self.initDdl))
                        conn.commit()
                    init_done = True
                    continue
                except sa.Error as exc:
                    # fall through and treat the init failure like any other error
                    sa_exc = exc

            # NOTE(review): lock waits are not counted against _MAX_ERRORS, so
            # a permanently locked database is retried forever - confirm that
            # this is intended.
            if 'database is locked' in str(sa_exc):
                gws.log.warning(f'sqlitex: {self.dbPath}: locked, waiting...')
                gws.u.sleep(self._SLEEP_TIME)
                continue

            err_cnt += 1
            if err_cnt < self._MAX_ERRORS:
                gws.log.warning(f'sqlitex: {self.dbPath}: error={sa_exc}, waiting...')
                gws.u.sleep(self._SLEEP_TIME)
                continue

            # Raise the module's own error type; it subclasses gws.Error, so
            # existing 'except gws.Error' handlers keep working.
            raise Error(f'sqlitex: {self.dbPath}: fatal error') from sa_exc

    def _engine(self):
        """Return the SA engine for ``dbPath``, creating it on first use."""

        if getattr(self, 'saEngine', None) is None:
            self.saEngine = sa.create_engine(f'sqlite:///{self.dbPath}', poolclass=sa.NullPool, echo=False)
        return self.saEngine