Coverage for gws-app/gws/base/database/provider.py: 0%
132 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1from typing import Optional, cast
2import contextlib
4import gws
5import gws.lib.sa as sa
class Config(gws.Config):
    """Database provider"""

    # Life time of the reflected-schema cache. `Object.reflect_schema` reads this
    # option via `self.cfg('schemaCacheLifeTime', 0)` and bypasses the cache
    # entirely when the value is falsy (e.g. 0).
    # NOTE(review): the unit is presumably seconds (default 3600) -- confirm
    # against the definition of `gws.Duration`.
    schemaCacheLifeTime: gws.Duration = 3600
    """life time for schema caches"""
class Object(gws.DatabaseProvider):
    """Generic SQLAlchemy-backed database provider.

    Keeps one engine, a per-schema map of reflected metadata and a single
    shared connection that is reference-counted by ``connect()``.

    ``engine()``, ``split_table_name()`` and ``join_table_name()`` are used
    here but not defined in this class; they are expected to come from the
    base class or a concrete subclass.
    """

    # Engine created in `configure()` / `activate()`.
    saEngine: sa.Engine
    # Reflected metadata, keyed by schema name (filled lazily by `reflect_schema`).
    saMetaMap: dict[str, sa.MetaData]
    # The shared connection, or None when no `connect()` block is active.
    saConnection: Optional[sa.Connection]
    # Number of currently active (nested) `connect()` blocks.
    saConnectionCount: int

    def __getstate__(self):
        """Return the instance state without the engine, metadata map and connection."""
        return gws.u.omit(vars(self), 'saMetaMap', 'saEngine', 'saConnection')

    def configure(self):
        """Initialize the provider: create an engine with ``NullPool`` and reset the connection state."""
        self.url = ''
        self.saEngine = self.engine(poolclass=sa.NullPool)
        self.saMetaMap = {}
        self.saConnection = None
        self.saConnectionCount = 0

    def activate(self):
        """Re-create the engine (with its default arguments) and reset metadata and connection state."""
        self.saEngine = self.engine()
        self.saMetaMap = {}
        self.saConnection = None
        self.saConnectionCount = 0

    def reflect_schema(self, schema: str):
        """Reflect all tables and views of ``schema`` into ``saMetaMap``.

        Does nothing if the schema has been reflected already. When the
        ``schemaCacheLifeTime`` option is non-zero, the reflected metadata is
        obtained through ``gws.u.get_cached_object``.

        Args:
            schema: Schema name.
        """
        if schema in self.saMetaMap:
            return

        def _load():
            md = sa.MetaData(schema=schema)

            # introspecting the whole schema is generally faster
            # but what if we only need a single table from a big schema?
            # @TODO add options for reflection

            gws.debug.time_start(f'AUTOLOAD {self.uid=} {schema=}')
            with self.connect() as conn:
                md.reflect(conn, schema, resolve_fks=False, views=True)
            gws.debug.time_end()

            return md

        life_time = self.cfg('schemaCacheLifeTime', 0)
        if not life_time:
            self.saMetaMap[schema] = _load()
            return

        # The key includes the provider uid: different providers (databases) may
        # well have schemas with the same name (e.g. 'public') and must not
        # share cached metadata.
        cache_key = f'database_metadata_schema_{self.uid}_{schema}'
        self.saMetaMap[schema] = gws.u.get_cached_object(cache_key, life_time, _load)

    @contextlib.contextmanager
    def connect(self):
        """Context manager yielding the shared connection.

        The first (outermost) block opens the connection, nested blocks reuse
        it, and the connection is closed when the outermost block exits.
        """
        if self.saConnection is None:
            self.saConnection = self.saEngine.connect()
            # gws.log.debug(f'db connection opened: {self.saConnection}')
            self.saConnectionCount = 1
        else:
            self.saConnectionCount += 1

        try:
            yield self.saConnection
        finally:
            self.saConnectionCount -= 1
            if self.saConnectionCount == 0:
                self.saConnection.close()
                # gws.log.debug(f'db connection closed: {self.saConnection}')
                self.saConnection = None

    def table(self, table_name, **kwargs):
        """Return the reflected table for ``table_name``.

        Args:
            table_name: Table name or an ``sa.Table``.
            **kwargs: Accepted for interface compatibility, not used here.

        Raises:
            sa.Error: If the table is not found.
        """
        return self._require_sa_table(table_name)

    def count(self, table):
        """Return the number of rows in ``table``.

        Returns 0 if the table is not found or the query fails with ``sa.Error``.
        """
        sa_table = self._sa_table(table)
        if sa_table is None:
            return 0
        sql = sa.select(sa.func.count()).select_from(sa_table)
        with self.connect() as conn:
            try:
                r = list(conn.execute(sql))
                return r[0][0]
            except sa.Error:
                conn.rollback()
                return 0

    def has_table(self, table_name: str):
        """Return True if ``table_name`` exists in the reflected metadata."""
        sa_table = self._sa_table(table_name)
        return sa_table is not None

    def _sa_table(self, tab_or_name) -> Optional[sa.Table]:
        """Resolve a table name to an ``sa.Table``, reflecting its schema on demand.

        Args:
            tab_or_name: An ``sa.Table`` (returned as is) or a table name.

        Returns:
            The table, or ``None`` if the reflected schema does not contain it.
        """
        if isinstance(tab_or_name, sa.Table):
            return tab_or_name
        schema, name = self.split_table_name(tab_or_name)
        self.reflect_schema(schema)
        # see _get_table_key in sqlalchemy/sql/schema.py
        table_key = schema + '.' + name
        return self.saMetaMap[schema].tables.get(table_key)

    def _require_sa_table(self, tab_or_name) -> sa.Table:
        """Like ``_sa_table``, but raise ``sa.Error`` if the table is not found."""
        sa_table = self._sa_table(tab_or_name)
        if sa_table is None:
            raise sa.Error(f'table {tab_or_name!r} not found')
        return sa_table

    def column(self, table, column_name):
        """Return the column ``column_name`` of ``table``.

        Args:
            table: Table name or an ``sa.Table``.
            column_name: Column name.

        Raises:
            sa.Error: If the table or the column is not found.
        """
        # Fail with sa.Error (not AttributeError on None) when the table is missing.
        sa_table = self._require_sa_table(table)
        try:
            return sa_table.columns[column_name]
        except KeyError as exc:
            raise sa.Error(f'column {str(table)}.{column_name!r} not found') from exc

    def has_column(self, table, column_name):
        """Return True if ``table`` exists and has a column named ``column_name``."""
        sa_table = self._sa_table(table)
        return sa_table is not None and column_name in sa_table.columns

    def select_text(self, sql, **kwargs):
        """Execute a textual SQL statement and return its rows as a list of dicts.

        Args:
            sql: SQL text, passed to ``sa.text``.
            **kwargs: Bind parameters for the statement.

        Raises:
            sa.Error: Re-raised after a rollback if the statement fails.
        """
        with self.connect() as conn:
            try:
                return [
                    gws.u.to_dict(r)
                    for r in conn.execute(sa.text(sql), kwargs)
                ]
            except sa.Error:
                conn.rollback()
                raise

    def execute_text(self, sql, **kwargs):
        """Execute a textual SQL statement, commit, and return the result object.

        Args:
            sql: SQL text, passed to ``sa.text``.
            **kwargs: Bind parameters for the statement.

        Raises:
            sa.Error: Re-raised after a rollback if the statement fails.
        """
        with self.connect() as conn:
            try:
                res = conn.execute(sa.text(sql), kwargs)
                conn.commit()
                return res
            except sa.Error:
                conn.rollback()
                raise

    # Maps upper-cased SQLAlchemy type class names to attribute types.
    SA_TO_ATTR = {
        # common: sqlalchemy.sql.sqltypes

        'BIGINT': gws.AttributeType.int,
        'BOOLEAN': gws.AttributeType.bool,
        'CHAR': gws.AttributeType.str,
        'DATE': gws.AttributeType.date,
        'DOUBLE_PRECISION': gws.AttributeType.float,
        'INTEGER': gws.AttributeType.int,
        'NUMERIC': gws.AttributeType.float,
        'REAL': gws.AttributeType.float,
        'SMALLINT': gws.AttributeType.int,
        'TEXT': gws.AttributeType.str,
        # 'UUID': ...,
        'VARCHAR': gws.AttributeType.str,

        # postgres specific: sqlalchemy.dialects.postgresql.types

        # 'JSON': ...,
        # 'JSONB': ...,
        # 'BIT': ...,
        'BYTEA': gws.AttributeType.bytes,
        # 'CIDR': ...,
        # 'INET': ...,
        # 'MACADDR': ...,
        # 'MACADDR8': ...,
        # 'MONEY': ...,
        'TIME': gws.AttributeType.time,
        'TIMESTAMP': gws.AttributeType.datetime,
    }

    # @TODO proper support for Z/M geoms

    # Maps native geometry type names to geometry types.
    # Z/M/ZM variants are mapped to their 2D counterparts.
    SA_TO_GEOM = {
        'POINT': gws.GeometryType.point,
        'POINTM': gws.GeometryType.point,
        'POINTZ': gws.GeometryType.point,
        'POINTZM': gws.GeometryType.point,
        'LINESTRING': gws.GeometryType.linestring,
        'LINESTRINGM': gws.GeometryType.linestring,
        'LINESTRINGZ': gws.GeometryType.linestring,
        'LINESTRINGZM': gws.GeometryType.linestring,
        'POLYGON': gws.GeometryType.polygon,
        'POLYGONM': gws.GeometryType.polygon,
        'POLYGONZ': gws.GeometryType.polygon,
        'POLYGONZM': gws.GeometryType.polygon,
        'MULTIPOINT': gws.GeometryType.multipoint,
        'MULTIPOINTM': gws.GeometryType.multipoint,
        'MULTIPOINTZ': gws.GeometryType.multipoint,
        'MULTIPOINTZM': gws.GeometryType.multipoint,
        'MULTILINESTRING': gws.GeometryType.multilinestring,
        'MULTILINESTRINGM': gws.GeometryType.multilinestring,
        'MULTILINESTRINGZ': gws.GeometryType.multilinestring,
        'MULTILINESTRINGZM': gws.GeometryType.multilinestring,
        'MULTIPOLYGON': gws.GeometryType.multipolygon,
        'MULTIPOLYGONM': gws.GeometryType.multipolygon,
        'MULTIPOLYGONZ': gws.GeometryType.multipolygon,
        'MULTIPOLYGONZM': gws.GeometryType.multipolygon,
        # 'GEOMETRYCOLLECTION': gws.GeometryType.geometrycollection,
        # 'CURVE': gws.GeometryType.curve,
    }

    # Fallbacks for native types that are not listed in SA_TO_ATTR.
    UNKNOWN_TYPE = gws.AttributeType.str
    UNKNOWN_ARRAY_TYPE = gws.AttributeType.strlist

    def describe(self, table):
        """Build a ``gws.DataSetDescription`` for ``table``.

        Columns are described with ``describe_column`` in table order. The
        first column that reports a geometry type provides the geometry name,
        type and SRID of the description.

        Args:
            table: Table name or an ``sa.Table``.

        Raises:
            sa.Error: If the table is not found.
        """
        sa_table = self._require_sa_table(table)

        schema = sa_table.schema
        name = sa_table.name

        desc = gws.DataSetDescription(
            columns=[],
            columnMap={},
            fullName=self.join_table_name(schema, name),
            geometryName='',
            geometrySrid=0,
            geometryType='',
            name=name,
            schema=schema
        )

        for n, sa_col in enumerate(cast(list[sa.Column], sa_table.columns)):
            # Pass the resolved table so it is not looked up again for each column.
            col = self.describe_column(sa_table, sa_col.name)
            col.columnIndex = n
            desc.columns.append(col)
            desc.columnMap[col.name] = col

        for col in desc.columns:
            if col.geometryType:
                desc.geometryName = col.name
                desc.geometryType = col.geometryType
                desc.geometrySrid = col.geometrySrid
                break

        return desc

    def describe_column(self, table, column_name) -> gws.ColumnDescription:
        """Build a ``gws.ColumnDescription`` for a single column.

        The native type is the upper-cased class name of the SQLAlchemy column
        type; it is mapped to an attribute type via ``SA_TO_ATTR``, falling
        back to ``UNKNOWN_TYPE``. Geometry fields are left empty here.

        Args:
            table: Table name or an ``sa.Table``.
            column_name: Column name.

        Raises:
            sa.Error: If the table or the column is not found.
        """
        sa_col = self.column(table, column_name)

        col = gws.ColumnDescription(
            columnIndex=0,
            comment=str(sa_col.comment or ''),
            default=sa_col.default,
            geometrySrid=0,
            geometryType='',
            isAutoincrement=bool(sa_col.autoincrement),
            isNullable=bool(sa_col.nullable),
            isPrimaryKey=bool(sa_col.primary_key),
            isUnique=bool(sa_col.unique),
            hasDefault=sa_col.server_default is not None,
            name=str(sa_col.name),
            nativeType='',
            type='',
        )

        col.nativeType = type(sa_col.type).__name__.upper()
        col.type = self.SA_TO_ATTR.get(col.nativeType, self.UNKNOWN_TYPE)

        return col