Coverage for gws-app/gws/plugin/postgres/provider.py: 0%
130 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Postgres database provider."""
3from typing import Optional
5import os
6import re
8import gws.base.database
9import gws.gis.crs
10import gws.gis.extent
11import gws.lib.net
12import gws.lib.sa as sa
# Marks this module as the 'postgres' database provider extension.
# NOTE(review): presumably picked up by the GWS extension/spec loader - confirm.
gws.ext.new.databaseProvider('postgres')


class Config(gws.base.database.provider.Config):
    """Postgres/Postgis database provider"""

    # Two connection modes are supported by ``connection_url`` in this module:
    #   - host mode: ``host`` is set; username, password, port and database go into the URL;
    #   - service mode: ``host`` is empty and ``serviceName`` is set.
    # If neither ``host`` nor ``serviceName`` is set, ``Object.configure`` raises an error.

    # Used as the URL path in both connection modes; an empty path is used if not set.
    database: Optional[str]
    """Database name."""
    # When set, selects host mode and takes precedence over ``serviceName``.
    host: Optional[str]
    """Database host."""
    # Only used in host mode.
    port: int = 5432
    """Database port."""
    # Only used in host mode.
    username: Optional[str]
    """Username."""
    # Only used in host mode.
    password: Optional[str]
    """Password."""
    # Service mode requires the PGSERVICEFILE environment variable to point to an existing file.
    serviceName: Optional[str]
    """Service name from pg_services file."""
    # Merged with the built-in default ``application_name='GWS'`` and passed on as URL params.
    options: Optional[dict]
    """Libpq connection options."""
    # Keys read by ``Object.engine``: 'disabled', 'pre_ping', 'size', 'recycle', 'timeout'.
    pool: Optional[dict]
    """Options for connection pooling."""
class Object(gws.base.database.provider.Object):
    """Postgres/PostGIS database provider object.

    Builds the connection URL from the provider configuration, creates
    SQLAlchemy engines and adds Postgres-specific handling of qualified
    table names, array columns and geometry columns.
    """

    def configure(self):
        """Compute and store the connection URL.

        Raises:
            sa.Error: If neither ``host`` nor ``serviceName`` is configured.
        """
        self.url = connection_url(self.config)
        if not self.url:
            raise sa.Error('"host/database" or "serviceName" are required')

    # Integer-valued ``pool`` config keys and the ``create_engine`` arguments they map to.
    _POOL_INT_OPTIONS = (
        ('size', 'pool_size'),
        ('recycle', 'pool_recycle'),
        ('timeout', 'pool_timeout'),
    )

    def engine(self, **kwargs):
        """Create an SQLAlchemy engine for this provider.

        Options from the ``pool`` configuration are applied with ``setdefault``,
        so explicitly passed keyword arguments always win.

        Args:
            **kwargs: Extra arguments for ``sa.create_engine``.

        Returns:
            The engine returned by ``sa.create_engine``.
        """
        pool = self.cfg('pool') or {}

        if pool.get('disabled') is True:
            kwargs.setdefault('poolclass', sa.NullPool)
        if pool.get('pre_ping') is True:
            kwargs.setdefault('pool_pre_ping', True)

        for key, arg in self._POOL_INT_OPTIONS:
            val = pool.get(key)
            # ``bool`` is a subclass of ``int``: reject it, so that
            # e.g. ``size: true`` is not silently passed on as a number.
            if isinstance(val, int) and not isinstance(val, bool):
                kwargs.setdefault(arg, val)

        if self.root.app.developer_option('db.engine_echo'):
            kwargs.setdefault('echo', True)
            kwargs.setdefault('echo_pool', True)

        url = connection_url(self.config)
        return sa.create_engine(url, **kwargs)

    # Matches ``name`` or ``schema.name``; each part is either a double-quoted
    # identifier (with ``""`` as an escaped quote) or a run of characters
    # containing neither a quote nor a dot.
    _RE_TABLE_NAME = r'''(?x)
        ^
        (
            ( " (?P<a1> ([^"] | "")+ ) " )
            |
            (?P<a2> [^".]+ )
        )
        (
            \.
            (
                ( " (?P<b1> ([^"] | "")+ ) " )
                |
                (?P<b2> [^".]+ )
            )
        )?
        $
    '''

    # Schema assumed for unqualified table names.
    _DEFAULT_SCHEMA = 'public'

    def split_table_name(self, table_name):
        """Split a possibly qualified table name into schema and table.

        Args:
            table_name: ``table`` or ``schema.table``; parts may be double-quoted.

        Returns:
            A ``(schema, table)`` tuple with quotes removed and doubled quotes
            unescaped. The schema defaults to ``public``.

        Raises:
            ValueError: If the name does not match the expected syntax.
        """
        m = re.match(self._RE_TABLE_NAME, table_name.strip())
        if not m:
            raise ValueError(f'invalid table name {table_name!r}')

        d = m.groupdict()
        s = d['a1'] or d['a2']
        t = d['b1'] or d['b2']
        if not t:
            # only one part was given: it is the table name
            s, t = self._DEFAULT_SCHEMA, s

        return s.replace('""', '"'), t.replace('""', '"')

    def join_table_name(self, schema, name):
        """Return ``schema.name``.

        If ``schema`` is empty, ``name`` is parsed with ``split_table_name``
        and its own (or the default) schema is used. Parts are joined as is,
        without quoting.
        """
        if schema:
            return schema + '.' + name
        schema, name2 = self.split_table_name(name)
        return schema + '.' + name2

    def table_bounds(self, table):
        """Compute the bounds of a table's geometry column using ``ST_Extent``.

        Returns:
            A ``gws.Bounds`` object, or ``None`` if the table has no geometry
            column or no extent could be computed.
        """
        desc = self.describe(table)
        if not desc.geometryName:
            return None

        tab = self.table(table)
        sql = sa.select(sa.func.ST_Extent(tab.columns.get(desc.geometryName)))
        with self.connect() as conn:
            box = conn.execute(sql).scalar_one()

        # ST_Extent yields NULL if there are no rows or all geometries are NULL
        if not box:
            return None

        extent = gws.gis.extent.from_box(box)
        if not extent:
            return None
        return gws.Bounds(extent=extent, crs=gws.gis.crs.get(desc.geometrySrid))

    def describe_column(self, table, column_name):
        """Describe a column, refining the base description for arrays and geometries.

        Array columns are mapped to a list attribute type according to their
        item type; geometry columns get their geometry type and SRID filled in.
        """
        col = super().describe_column(table, column_name)

        if col.nativeType == 'ARRAY':
            sa_col = self.column(table, column_name)
            it = getattr(sa_col.type, 'item_type', None)
            ia = self.SA_TO_ATTR.get(type(it).__name__.upper())
            list_types = {
                gws.AttributeType.str: gws.AttributeType.strlist,
                gws.AttributeType.int: gws.AttributeType.intlist,
                gws.AttributeType.float: gws.AttributeType.floatlist,
            }
            col.type = list_types.get(ia, self.UNKNOWN_ARRAY_TYPE)
            return col

        if col.nativeType == 'GEOMETRY':
            typ, srid = self._get_geom_type_and_srid(table, column_name)
            col.type = gws.AttributeType.geometry
            col.geometryType = self.SA_TO_GEOM.get(typ, gws.GeometryType.geometry)
            col.geometrySrid = srid
            return col

        return col

    def _get_geom_type_and_srid(self, table, column_name):
        """Return ``(geometry type, srid)`` for a geometry column.

        The column type is consulted first. If it carries no specific type or
        no positive SRID, ``public.geometry_columns`` is queried (once per
        object, the rows are cached). The fallback is ``('GEOMETRY', -1)``.
        """
        sa_table = self.table(table)
        sa_col = self.column(table, column_name)

        # both attributes can be missing or None on the column type
        typ = (getattr(sa_col.type, 'geometry_type', None) or '').upper()
        srid = getattr(sa_col.type, 'srid', None) or 0

        if typ != 'GEOMETRY' and srid > 0:
            return typ, srid

        # not a typmod, possibly constraint-based. Query "geometry_columns"...

        gcs = getattr(self, '_geometry_columns_cache', None)
        # compare with None: an empty result is a valid cached value
        # and must not trigger a new query on each call
        if gcs is None:
            gcs = self.select_text('''
                SELECT
                    f_table_schema,
                    f_table_name,
                    f_geometry_column,
                    type,
                    srid
                FROM public.geometry_columns
            ''') or []
            setattr(self, '_geometry_columns_cache', gcs)

        for gc in gcs:
            if (
                gc['f_table_schema'] == sa_table.schema
                and gc['f_table_name'] == sa_table.name
                and gc['f_geometry_column'] == sa_col.name
            ):
                return gc['type'], gc['srid']

        return 'GEOMETRY', -1
183##
def connection_url(cfg: gws.Config):
    """Build a ``postgresql`` connection URL from a provider configuration.

    With ``host`` set, a regular host URL is created. Otherwise, with
    ``serviceName`` set, a host-less URL with a ``service`` parameter is
    created; this requires the ``PGSERVICEFILE`` environment variable to
    point to an existing file.

    Args:
        cfg: Provider configuration.

    Returns:
        The URL, or ``None`` if neither ``host`` nor ``serviceName`` is set.

    Raises:
        sa.Error: In service mode, if the service file cannot be found.
    """
    # https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
    # https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS

    url_params = gws.u.merge({'application_name': 'GWS'}, cfg.get('options'))

    host = cfg.get('host')
    if host:
        host_args = {
            'scheme': 'postgresql',
            'username': cfg.get('username'),
            'password': cfg.get('password'),
            'hostname': host,
            'port': cfg.get('port'),
            'path': cfg.get('database') or cfg.get('dbname') or '',
            'params': url_params,
        }
        return gws.lib.net.make_url(**host_args)

    service = cfg.get('serviceName')
    if not service:
        return None

    service_file = os.getenv('PGSERVICEFILE')
    if not (service_file and os.path.isfile(service_file)):
        raise sa.Error(f'PGSERVICEFILE {service_file!r} not found')

    url_params['service'] = service

    service_args = {
        'scheme': 'postgresql',
        'hostname': '',
        'path': cfg.get('database') or cfg.get('dbname') or '',
        'params': url_params,
    }
    return gws.lib.net.make_url(**service_args)