Coverage for gws-app/gws/lib/sqlitex/__init__.py: 0%
56 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Convenience wrapper for the SA SQLite engine.
3This wrapper accepts a database path and optionally an "init" DDL statement.
4It executes queries given in a text form.
6Failed queries are repeated up to 3 times to work around transient errors, like the DB being locked.
8If the error message is "no such table", the wrapper runs the "init" DDL before repeating.
10"""
12from typing import Optional
13import gws
14import gws.lib.sa as sa
17class Error(gws.Error):
18 pass
21class Object:
22 saEngine: sa.Engine
24 def __init__(self, db_path: str, init_ddl: Optional[str] = ''):
25 self.dbPath = db_path
26 self.initDdl = init_ddl
28 def execute(self, stmt: str, **params):
29 """Execute a text DML statement and commit."""
31 self._exec2(False, stmt, params)
33 def select(self, stmt: str, **params) -> list[dict]:
34 """Execute a text select statement and commit."""
36 return self._exec2(True, stmt, params)
38 def insert(self, table_name: str, rec: dict):
39 """Insert a new record (dict) into a table."""
41 keys = ','.join(rec)
42 vals = ','.join(':' + k for k in rec)
44 self._exec2(False, f'INSERT INTO {table_name} ({keys}) VALUES({vals})', rec)
46 ##
48 _MAX_ERRORS = 3
49 _SLEEP_TIME = 0.1
51 def _exec2(self, is_select, stmt, params):
52 err_cnt = 0
54 while True:
55 sa_exc = None
57 try:
58 with self._engine().connect() as conn:
59 if is_select:
60 return [gws.u.to_dict(r) for r in conn.execute(sa.text(stmt), params)]
61 conn.execute(sa.text(stmt), params)
62 conn.commit()
63 return
64 except sa.Error as exc:
65 sa_exc = exc
67 # @TODO using strings for error checking, is there a better way?
69 if 'no such table' in str(sa_exc) and self.initDdl:
70 gws.log.warning(f'sqlitex: {self.dbPath}: error={sa_exc}, running init...')
71 try:
72 with self._engine().connect() as conn:
73 conn.execute(sa.text(self.initDdl))
74 conn.commit()
75 continue
76 except sa.Error as exc:
77 sa_exc = exc
79 if 'database is locked' in str(sa_exc):
80 gws.log.warning(f'sqlitex: {self.dbPath}: locked, waiting...')
81 gws.u.sleep(self._SLEEP_TIME)
82 continue
84 err_cnt += 1
85 if err_cnt < self._MAX_ERRORS:
86 gws.log.warning(f'sqlitex: {self.dbPath}: error={sa_exc}, waiting...')
87 gws.u.sleep(self._SLEEP_TIME)
88 continue
90 raise gws.Error(f'sqlitex: {self.dbPath}: fatal error') from sa_exc
92 def _engine(self):
93 if getattr(self, 'saEngine', None) is None:
94 self.saEngine = sa.create_engine(f'sqlite:///{self.dbPath}', poolclass=sa.NullPool, echo=False)
95 return self.saEngine