Coverage for gws-app/gws/base/database/provider.py: 0%

132 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1from typing import Optional, cast 

2import contextlib 

3 

4import gws 

5import gws.lib.sa as sa 

6 

7 

# Configuration options shared by database providers.
# NOTE(review): the bare-string literals below are kept verbatim; they look
# like they are consumed as documentation by tooling elsewhere — confirm.
class Config(gws.Config):
    """Database provider"""

    # Read in `Object.reflect_schema` via `self.cfg('schemaCacheLifeTime', 0)`;
    # a falsy value there disables the cross-process metadata cache.
    # Presumably expressed in seconds (3600 = one hour) — confirm against `gws.Duration`.
    schemaCacheLifeTime: gws.Duration = 3600
    """life time for schema caches"""

13 

14 

class Object(gws.DatabaseProvider):
    """Database provider backed by an SQLAlchemy engine.

    The provider keeps one reflected ``MetaData`` object per schema and a single
    shared connection that is handed out by ``connect()`` and closed when the
    outermost ``connect()`` context exits.
    """

    # Engine created by ``self.engine()`` in ``configure`` / ``activate``.
    saEngine: sa.Engine
    # Reflected metadata, keyed by schema name.
    saMetaMap: dict[str, sa.MetaData]
    # The currently open shared connection, or None if no ``connect()`` context is active.
    saConnection: Optional[sa.Connection]
    # Number of currently active (nested) ``connect()`` contexts.
    saConnectionCount: int

    def __getstate__(self):
        """Return the instance state without the SQLAlchemy runtime objects.

        The engine, the metadata map and the connection are dropped; they are
        re-created in ``activate``.
        """
        return gws.u.omit(vars(self), 'saMetaMap', 'saEngine', 'saConnection')

    def configure(self):
        """Initialize the provider state, using an engine without connection pooling."""
        self.url = ''
        self.saEngine = self.engine(poolclass=sa.NullPool)
        self.saMetaMap = {}
        self.saConnection = None
        self.saConnectionCount = 0

    def activate(self):
        """Re-create the engine (with default options) and reset the runtime state."""
        self.saEngine = self.engine()
        self.saMetaMap = {}
        self.saConnection = None
        self.saConnectionCount = 0

    def reflect_schema(self, schema: str):
        """Reflect all tables and views of a schema into ``saMetaMap``.

        Does nothing if the schema has already been reflected by this object.
        If the ``schemaCacheLifeTime`` option is set, the reflected metadata is
        obtained through ``gws.u.get_cached_object`` with that life time.

        Args:
            schema: Schema name.
        """
        if schema in self.saMetaMap:
            return

        def _load():
            md = sa.MetaData(schema=schema)

            # introspecting the whole schema is generally faster
            # but what if we only need a single table from a big schema?
            # @TODO add options for reflection

            gws.debug.time_start(f'AUTOLOAD {self.uid=} {schema=}')
            try:
                with self.connect() as conn:
                    md.reflect(conn, schema, resolve_fks=False, views=True)
            finally:
                # keep time_start/time_end balanced even if reflection fails
                gws.debug.time_end()

            return md

        life_time = self.cfg('schemaCacheLifeTime', 0)
        if not life_time:
            self.saMetaMap[schema] = _load()
            return

        # The key includes the provider uid: different providers can have
        # schemas with the same name (e.g. 'public') but different content.
        cache_key = f'database_metadata_schema_{self.uid}_{schema}'
        self.saMetaMap[schema] = gws.u.get_cached_object(cache_key, life_time, _load)

    @contextlib.contextmanager
    def connect(self):
        """Context manager yielding the shared connection.

        The first (outermost) context opens the connection, nested contexts
        reuse it. The connection is closed when the outermost context exits.
        """
        if self.saConnection is None:
            self.saConnection = self.saEngine.connect()
            # gws.log.debug(f'db connection opened: {self.saConnection}')
            self.saConnectionCount = 1
        else:
            self.saConnectionCount += 1

        try:
            yield self.saConnection
        finally:
            self.saConnectionCount -= 1
            if self.saConnectionCount == 0:
                # Drop the reference before closing, so that a failing close()
                # does not leave a dead connection to be reused by the next connect().
                conn, self.saConnection = self.saConnection, None
                conn.close()
                # gws.log.debug(f'db connection closed: {conn}')

    def table(self, table_name, **kwargs):
        """Return the reflected table for a table object or name.

        Raises:
            sa.Error: If the table is not found.
        """
        sa_table = self._sa_table(table_name)
        if sa_table is None:
            raise sa.Error(f'table {table_name!r} not found')
        return sa_table

    def count(self, table):
        """Return the number of rows in a table.

        Returns 0 if the table is not found or the query fails.
        """
        sa_table = self._sa_table(table)
        if sa_table is None:
            return 0
        sql = sa.select(sa.func.count()).select_from(sa_table)
        with self.connect() as conn:
            try:
                r = list(conn.execute(sql))
                return r[0][0]
            except sa.Error:
                # deliberate best-effort: a failing count is reported as zero
                conn.rollback()
                return 0

    def has_table(self, table_name: str):
        """Check whether a table exists in its (reflected) schema."""
        sa_table = self._sa_table(table_name)
        return sa_table is not None

    def _sa_table(self, tab_or_name) -> Optional[sa.Table]:
        """Resolve a table object or a table name to a reflected table.

        A ``sa.Table`` argument is returned as is. A name is split with
        ``split_table_name``, its schema is reflected on demand.

        Returns:
            The table, or None if the schema has no such table.
        """
        if isinstance(tab_or_name, sa.Table):
            return tab_or_name
        schema, name = self.split_table_name(tab_or_name)
        self.reflect_schema(schema)
        # see _get_table_key in sqlalchemy/sql/schema.py
        table_key = schema + '.' + name
        return self.saMetaMap[schema].tables.get(table_key)

    def column(self, table, column_name):
        """Return a column of a table.

        Raises:
            sa.Error: If the table or the column is not found.
        """
        sa_table = self._sa_table(table)
        if sa_table is None:
            # without this check a missing table would surface as an AttributeError
            raise sa.Error(f'table {table!r} not found')
        try:
            return sa_table.columns[column_name]
        except KeyError as exc:
            raise sa.Error(f'column {str(table)}.{column_name!r} not found') from exc

    def has_column(self, table, column_name):
        """Check whether a table exists and has a column with the given name."""
        sa_table = self._sa_table(table)
        return sa_table is not None and column_name in sa_table.columns

    def select_text(self, sql, **kwargs):
        """Execute a textual SQL statement and return its rows as a list of dicts.

        Keyword arguments are passed as statement parameters. On ``sa.Error``
        the connection is rolled back and the error is re-raised.
        """
        with self.connect() as conn:
            try:
                return [
                    gws.u.to_dict(r)
                    for r in conn.execute(sa.text(sql), kwargs)
                ]
            except sa.Error:
                conn.rollback()
                raise

    def execute_text(self, sql, **kwargs):
        """Execute a textual SQL statement, commit and return the result object.

        Keyword arguments are passed as statement parameters. On ``sa.Error``
        the connection is rolled back and the error is re-raised.
        """
        with self.connect() as conn:
            try:
                res = conn.execute(sa.text(sql), kwargs)
                conn.commit()
                return res
            except sa.Error:
                conn.rollback()
                raise

    # Maps upper-cased SQLAlchemy type class names to attribute types.
    SA_TO_ATTR = {
        # common: sqlalchemy.sql.sqltypes

        'BIGINT': gws.AttributeType.int,
        'BOOLEAN': gws.AttributeType.bool,
        'CHAR': gws.AttributeType.str,
        'DATE': gws.AttributeType.date,
        'DOUBLE_PRECISION': gws.AttributeType.float,
        'INTEGER': gws.AttributeType.int,
        'NUMERIC': gws.AttributeType.float,
        'REAL': gws.AttributeType.float,
        'SMALLINT': gws.AttributeType.int,
        'TEXT': gws.AttributeType.str,
        # 'UUID': ...,
        'VARCHAR': gws.AttributeType.str,

        # postgres specific: sqlalchemy.dialects.postgresql.types

        # 'JSON': ...,
        # 'JSONB': ...,
        # 'BIT': ...,
        'BYTEA': gws.AttributeType.bytes,
        # 'CIDR': ...,
        # 'INET': ...,
        # 'MACADDR': ...,
        # 'MACADDR8': ...,
        # 'MONEY': ...,
        'TIME': gws.AttributeType.time,
        'TIMESTAMP': gws.AttributeType.datetime,
    }

    # @TODO proper support for Z/M geoms

    # Maps geometry type names (including M/Z/ZM variants) to geometry types.
    # Not used in this class itself; presumably consumed by concrete providers.
    SA_TO_GEOM = {
        'POINT': gws.GeometryType.point,
        'POINTM': gws.GeometryType.point,
        'POINTZ': gws.GeometryType.point,
        'POINTZM': gws.GeometryType.point,
        'LINESTRING': gws.GeometryType.linestring,
        'LINESTRINGM': gws.GeometryType.linestring,
        'LINESTRINGZ': gws.GeometryType.linestring,
        'LINESTRINGZM': gws.GeometryType.linestring,
        'POLYGON': gws.GeometryType.polygon,
        'POLYGONM': gws.GeometryType.polygon,
        'POLYGONZ': gws.GeometryType.polygon,
        'POLYGONZM': gws.GeometryType.polygon,
        'MULTIPOINT': gws.GeometryType.multipoint,
        'MULTIPOINTM': gws.GeometryType.multipoint,
        'MULTIPOINTZ': gws.GeometryType.multipoint,
        'MULTIPOINTZM': gws.GeometryType.multipoint,
        'MULTILINESTRING': gws.GeometryType.multilinestring,
        'MULTILINESTRINGM': gws.GeometryType.multilinestring,
        'MULTILINESTRINGZ': gws.GeometryType.multilinestring,
        'MULTILINESTRINGZM': gws.GeometryType.multilinestring,
        'MULTIPOLYGON': gws.GeometryType.multipolygon,
        'MULTIPOLYGONM': gws.GeometryType.multipolygon,
        'MULTIPOLYGONZ': gws.GeometryType.multipolygon,
        'MULTIPOLYGONZM': gws.GeometryType.multipolygon,
        # 'GEOMETRYCOLLECTION': gws.GeometryType.geometrycollection,
        # 'CURVE': gws.GeometryType.curve,
    }

    # Fallback attribute types for native types missing from SA_TO_ATTR.
    UNKNOWN_TYPE = gws.AttributeType.str
    UNKNOWN_ARRAY_TYPE = gws.AttributeType.strlist

    def describe(self, table):
        """Describe a table and its columns.

        The geometry name, type and srid of the description are taken from the
        first column that has a geometry type.

        Raises:
            sa.Error: If the table is not found.
        """
        sa_table = self._sa_table(table)
        if sa_table is None:
            raise sa.Error(f'table {table!r} not found')

        schema = sa_table.schema
        name = sa_table.name

        desc = gws.DataSetDescription(
            columns=[],
            columnMap={},
            fullName=self.join_table_name(schema, name),
            geometryName='',
            geometrySrid=0,
            geometryType='',
            name=name,
            schema=schema
        )

        for n, sa_col in enumerate(cast(list[sa.Column], sa_table.columns)):
            col = self.describe_column(table, sa_col.name)
            col.columnIndex = n
            desc.columns.append(col)
            desc.columnMap[col.name] = col

        for col in desc.columns:
            if col.geometryType:
                desc.geometryName = col.name
                desc.geometryType = col.geometryType
                desc.geometrySrid = col.geometrySrid
                break

        return desc

    def describe_column(self, table, column_name) -> gws.ColumnDescription:
        """Describe a single column of a table.

        The native type is the upper-cased class name of the SQLAlchemy column
        type; it is mapped to an attribute type with ``SA_TO_ATTR``, falling
        back to ``UNKNOWN_TYPE``. Geometry properties are left empty here.

        Raises:
            sa.Error: If the table or the column is not found.
        """
        sa_col = self.column(table, column_name)

        col = gws.ColumnDescription(
            columnIndex=0,
            comment=str(sa_col.comment or ''),
            default=sa_col.default,
            geometrySrid=0,
            geometryType='',
            # NOTE(review): any truthy `autoincrement` value (possibly also a
            # non-boolean marker such as 'auto') yields True here — confirm intended.
            isAutoincrement=bool(sa_col.autoincrement),
            isNullable=bool(sa_col.nullable),
            isPrimaryKey=bool(sa_col.primary_key),
            isUnique=bool(sa_col.unique),
            hasDefault=sa_col.server_default is not None,
            name=str(sa_col.name),
            nativeType='',
            type='',
        )

        col.nativeType = type(sa_col.type).__name__.upper()
        col.type = self.SA_TO_ATTR.get(col.nativeType, self.UNKNOWN_TYPE)

        return col

260 

261 return col