Coverage for gws-app/gws/plugin/postgres/provider.py: 0%

130 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Postgres database provider.""" 

2 

3from typing import Optional 

4 

5import os 

6import re 

7 

8import gws.base.database 

9import gws.gis.crs 

10import gws.gis.extent 

11import gws.lib.net 

12import gws.lib.sa as sa 

13 

# NOTE(review): presumably declares this module as the 'postgres' database
# provider extension for the gws extension registry -- confirm against gws.ext.
gws.ext.new.databaseProvider('postgres')

15 

16 

class Config(gws.base.database.provider.Config):
    """Postgres/Postgis database provider"""

    # Connection target. ``connection_url`` in this module uses ``host`` when
    # it is set and only falls back to ``serviceName`` otherwise; ``database``
    # becomes the path component of the URL in both cases.

    database: Optional[str]
    """Database name."""

    host: Optional[str]
    """Database host."""

    port: int = 5432
    """Database port."""

    username: Optional[str]
    """Username."""

    password: Optional[str]
    """Password."""

    serviceName: Optional[str]
    """Service name from pg_services file."""

    # Extra settings: ``options`` are merged into the URL query parameters,
    # ``pool`` is read by ``Object.engine`` (keys: disabled, pre_ping, size,
    # recycle, timeout).

    options: Optional[dict]
    """Libpq connection options."""

    pool: Optional[dict]
    """Options for connection pooling."""

36 

37 

class Object(gws.base.database.provider.Object):
    """Postgres database provider object."""

    def configure(self):
        """Compute and store the connection URL for this provider.

        Raises:
            sa.Error: If ``connection_url`` returns nothing, i.e. neither
                ``host`` nor ``serviceName`` is configured.
        """
        self.url = connection_url(self.config)
        if not self.url:
            raise sa.Error('"host/database" or "serviceName" are required')

    # Integer-valued keys of the ``pool`` config and the ``create_engine``
    # keyword each of them maps to.
    _POOL_INT_OPTIONS = (
        ('size', 'pool_size'),
        ('recycle', 'pool_recycle'),
        ('timeout', 'pool_timeout'),
    )

    def engine(self, **kwargs):
        """Create an SQLAlchemy engine for the configured connection.

        Options from the ``pool`` config are applied with ``setdefault``, so
        keyword arguments passed by the caller always take precedence.

        Args:
            **kwargs: Extra keyword arguments for ``sa.create_engine``.

        Returns:
            The result of ``sa.create_engine``.
        """
        pool = self.cfg('pool') or {}

        if pool.get('disabled') is True:
            kwargs.setdefault('poolclass', sa.NullPool)
        if pool.get('pre_ping') is True:
            kwargs.setdefault('pool_pre_ping', True)

        for option, keyword in self._POOL_INT_OPTIONS:
            value = pool.get(option)
            # ``bool`` is a subclass of ``int``; skip it so that a stray
            # ``size: false`` does not silently become ``pool_size=0``.
            if isinstance(value, int) and not isinstance(value, bool):
                kwargs.setdefault(keyword, value)

        if self.root.app.developer_option('db.engine_echo'):
            kwargs.setdefault('echo', True)
            kwargs.setdefault('echo_pool', True)

        url = connection_url(self.config)
        return sa.create_engine(url, **kwargs)

    # Matches ``name`` or ``schema.name``; each part is either a double-quoted
    # identifier (with ``""`` as an escaped quote) or a run of characters
    # containing neither a quote nor a dot.
    _RE_TABLE_NAME = r'''(?x)
        ^
        (
            ( " (?P<a1> ([^"] | "")+ ) " )
            |
            (?P<a2> [^".]+ )
        )
        (
            \.
            (
                ( " (?P<b1> ([^"] | "")+ ) " )
                |
                (?P<b2> [^".]+ )
            )
        )?
        $
    '''

    _DEFAULT_SCHEMA = 'public'

    def split_table_name(self, table_name):
        """Split a possibly schema-qualified table name into its parts.

        Surrounding whitespace is stripped first. Quoted parts are unquoted
        and doubled quotes inside them are collapsed. A name without a schema
        part gets ``_DEFAULT_SCHEMA``.

        Args:
            table_name: A string like ``table``, ``schema.table`` or
                ``"schema"."table"``.

        Returns:
            A ``(schema, name)`` tuple.

        Raises:
            ValueError: If the name does not match ``_RE_TABLE_NAME``.
        """
        m = re.match(self._RE_TABLE_NAME, table_name.strip())
        if not m:
            raise ValueError(f'invalid table name {table_name!r}')

        d = m.groupdict()
        s = d['a1'] or d['a2']
        t = d['b1'] or d['b2']
        if not t:
            # Only one part was given: it is the table name, not the schema.
            s, t = self._DEFAULT_SCHEMA, s

        return s.replace('""', '"'), t.replace('""', '"')

    def join_table_name(self, schema, name):
        """Return a ``schema.name`` string.

        When ``schema`` is given, it is joined with ``name`` as is. Otherwise
        ``name`` is parsed with ``split_table_name`` and re-joined, so that an
        unqualified name receives the default schema. The result is not quoted.
        """
        if schema:
            return schema + '.' + name
        schema, name2 = self.split_table_name(name)
        return schema + '.' + name2

    def table_bounds(self, table):
        """Compute the bounds of a table's geometry column via ``ST_Extent``.

        Returns:
            A ``gws.Bounds`` object, or ``None`` if the table has no geometry
            column or no extent could be derived from the query result.
        """
        desc = self.describe(table)
        if not desc.geometryName:
            return None

        tab = self.table(table)
        sql = sa.select(sa.func.ST_Extent(tab.columns.get(desc.geometryName)))
        with self.connect() as conn:
            box = conn.execute(sql).scalar_one()

        extent = gws.gis.extent.from_box(box)
        if extent:
            return gws.Bounds(extent=extent, crs=gws.gis.crs.get(desc.geometrySrid))
        return None

    def describe_column(self, table, column_name):
        """Describe a column, refining array and geometry columns.

        Starts from the base class description. ``ARRAY`` columns get a list
        attribute type derived from their item type; ``GEOMETRY`` columns get
        their geometry type and SRID filled in.
        """
        col = super().describe_column(table, column_name)

        if col.nativeType == 'ARRAY':
            sa_col = self.column(table, column_name)
            it = getattr(sa_col.type, 'item_type', None)
            # Look the item type up by its upper-cased class name.
            ia = self.SA_TO_ATTR.get(type(it).__name__.upper())
            if ia == gws.AttributeType.str:
                col.type = gws.AttributeType.strlist
            elif ia == gws.AttributeType.int:
                col.type = gws.AttributeType.intlist
            elif ia == gws.AttributeType.float:
                col.type = gws.AttributeType.floatlist
            else:
                col.type = self.UNKNOWN_ARRAY_TYPE
            return col

        if col.nativeType == 'GEOMETRY':
            typ, srid = self._get_geom_type_and_srid(table, column_name)
            col.type = gws.AttributeType.geometry
            col.geometryType = self.SA_TO_GEOM.get(typ, gws.GeometryType.geometry)
            col.geometrySrid = srid
            return col

        return col

    def _get_geom_type_and_srid(self, table, column_name):
        """Return ``(geometry type, srid)`` for a geometry column.

        The values are taken from the column type when it carries a specific
        geometry type and a positive SRID. Otherwise ``public.geometry_columns``
        is consulted; its content is fetched once and cached on the object.
        Falls back to ``('GEOMETRY', -1)`` when nothing matches.
        """
        sa_table = self.table(table)
        sa_col = self.column(table, column_name)

        # The attributes may be missing or explicitly None; normalize both
        # cases so that ``.upper()`` and the comparison below cannot fail.
        typ = (getattr(sa_col.type, 'geometry_type', None) or '').upper()
        srid = getattr(sa_col.type, 'srid', None) or 0

        if typ != 'GEOMETRY' and srid > 0:
            return typ, srid

        # not a typmod, possibly constraint-based. Query "geometry_columns"...

        gcs = getattr(self, '_geometry_columns_cache', None)
        # Compare against None: an empty result is a valid cached value and
        # must not trigger a new query on every call.
        if gcs is None:
            gcs = self.select_text('''
                SELECT
                    f_table_schema,
                    f_table_name,
                    f_geometry_column,
                    type,
                    srid
                FROM public.geometry_columns
            ''')
            setattr(self, '_geometry_columns_cache', gcs)

        for gc in gcs:
            if (
                gc['f_table_schema'] == sa_table.schema
                and gc['f_table_name'] == sa_table.name
                and gc['f_geometry_column'] == sa_col.name
            ):
                return gc['type'], gc['srid']

        return 'GEOMETRY', -1

181 

182 

183## 

184 

def connection_url(cfg: gws.Config):
    """Build a ``postgresql`` connection URL from a provider configuration.

    ``options`` from the configuration are merged over the built-in defaults
    and passed as URL parameters. A configured ``host`` takes precedence over
    ``serviceName``; if neither is set, nothing is returned.

    Args:
        cfg: Provider configuration.

    Returns:
        The URL produced by ``gws.lib.net.make_url``, or ``None``.

    Raises:
        sa.Error: If ``serviceName`` is set, but the ``PGSERVICEFILE``
            environment variable is unset or does not point to a file.
    """
    # https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
    # https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS

    params = gws.u.merge({'application_name': 'GWS'}, cfg.get('options'))

    host = cfg.get('host')
    if host:
        # Direct connection: credentials, host and port go into the URL.
        return gws.lib.net.make_url(
            scheme='postgresql',
            username=cfg.get('username'),
            password=cfg.get('password'),
            hostname=host,
            port=cfg.get('port'),
            path=cfg.get('database') or cfg.get('dbname') or '',
            params=params,
        )

    service = cfg.get('serviceName')
    if not service:
        return None

    # Service-based connection: the service file must be given explicitly.
    service_file = os.getenv('PGSERVICEFILE')
    if not (service_file and os.path.isfile(service_file)):
        raise sa.Error(f'PGSERVICEFILE {service_file!r} not found')

    params['service'] = service

    return gws.lib.net.make_url(
        scheme='postgresql',
        hostname='',
        path=cfg.get('database') or cfg.get('dbname') or '',
        params=params,
    )

220 )