Coverage for gws-app/gws/base/database/_test/model_test.py: 0%

50 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1import os 

2import gws.config 

3import gws.base.feature 

4import gws.lib.sa as sa 

5import gws.test.util as u 

6 

7 

8## 

9 

# Schema, table and column names (in that order) that need quoting in SQL:
# they contain dots, spaces, non-ASCII letters and doubled double quotes,
# e.g. 'Weird.Schema.. ""äöü"" !'.
_weird_names = [
    f'Weird.{kind}.. ""äöü"" !'
    for kind in ('Schema', 'Table', 'Column')
]

15 

@u.fixture(scope='module')
def root():
    """Module-scoped fixture: prepare the test tables and yield a configured root.

    Creates the ``plain`` and ``serial_id`` tables, (re)creates a schema and a
    table named after ``_weird_names``, and builds a root object with three
    postgres models: ``PLAIN``, ``SERIAL_ID`` and ``WEIRD``.
    """
    u.pg.create('plain', {'id': 'int primary key', 'a': 'text', 'b': 'text', 'c': 'text'})
    u.pg.create('serial_id', {'id': 'serial primary key', 'a': 'text'})

    schema_name, table_name, column_name = _weird_names

    # The names are placed inside double-quoted SQL identifiers as they are.
    setup_sql = f'''
        drop schema if exists "{schema_name}" cascade;
        create schema "{schema_name}";
        create table "{schema_name}"."{table_name}" (
            id integer primary key,
            "{column_name}" text
        )
    '''

    # Run the statements one at a time; the last one has no trailing ';',
    # so splitting does not produce an empty statement.
    connection = u.pg.connect()
    for statement in setup_sql.split(';'):
        connection.execute(sa.text(statement))
    connection.commit()

    config = f'''
        models+ {{ 
            uid "PLAIN" type "postgres" tableName "plain" 
        }}
        models+ {{
            uid "SERIAL_ID" type "postgres" tableName "serial_id"
        }}
        models+ {{
            uid "WEIRD" type "postgres" tableName ' "{schema_name}"."{table_name}" '
        }}
    '''

    yield u.gws_root(config)

49 

50 

51## 

52 

def test_get_features(root: gws.Root):
    """Fetch two features by id from the PLAIN model and check their ``a`` values."""
    context = gws.ModelContext(
        user=u.gws_system_user(),
        op=gws.ModelOperation.read,
    )
    model = u.cast(gws.Model, root.get('PLAIN'))

    rows = [
        {'id': 1, 'a': '11'},
        {'id': 2, 'a': '22'},
        {'id': 3, 'a': '33'},
        {'id': 4, 'a': '44'},
    ]
    u.pg.insert('plain', rows)

    # Only the rows with ids 2 and 3 are requested.
    features = model.get_features([2, 3], context)
    values = [feature.get('a') for feature in features]
    assert values == ['22', '33']

69 

70 

71## 

72 

def test_create_with_explicit_pk(root: gws.Root):
    """Create features with caller-supplied primary keys in the PLAIN model.

    Each feature carries its own ``id``; the ``plain`` table is then read back
    and must contain exactly the created rows.
    """
    mc = gws.ModelContext(
        user=u.gws_system_user(),
    )
    mo = u.cast(gws.Model, root.get('PLAIN'))

    # test_get_features writes to 'plain' as well, so clear the table first.
    u.pg.clear('plain')

    expected = [
        (15, 'aa'),
        (16, 'bb'),
        (17, 'cc'),
    ]
    for pk, a in expected:
        mo.create_feature(u.feature(mo, id=pk, a=a), mc)

    # SQL leaves the row order unspecified without ORDER BY; sort explicitly
    # so that the comparison with the ordered list is deterministic.
    assert u.pg.content('select id, a from plain order by id') == expected

90 

91 

def test_create_with_auto_pk(root: gws.Root):
    """Create features without an ``id`` in the SERIAL_ID model.

    The ``serial_id`` table declares ``id`` as ``serial primary key``, so the
    test expects the database to assign 1, 2 and 3 to the three new rows.
    """
    mc = gws.ModelContext(
        user=u.gws_system_user(),
    )
    mo = u.cast(gws.Model, root.get('SERIAL_ID'))

    u.pg.clear('serial_id')

    for a in ('aa', 'bb', 'cc'):
        mo.create_feature(u.feature(mo, a=a), mc)

    # Query explicitly with ORDER BY: reading the bare table leaves the row
    # order unspecified, which would make the list comparison order-dependent.
    assert u.pg.content('select id, a from serial_id order by id') == [
        (1, 'aa'),
        (2, 'bb'),
        (3, 'cc'),
    ]

109 

def test_weird_names(root: gws.Root):
    """Create features in the WEIRD model, whose schema, table and column names need quoting."""
    mc = gws.ModelContext(
        user=u.gws_system_user(),
    )
    mo = u.cast(gws.Model, root.get('WEIRD'))

    ws, wt, wc = _weird_names
    # Inside a double-quoted SQL identifier, "" denotes a single literal quote.
    # The feature attribute is keyed by the unescaped column name.
    wc_noquot = wc.replace('""', '"')

    u.pg.clear(f'"{ws}"."{wt}"')

    expected = [
        (15, 'aa'),
        (16, 'bb'),
        (17, 'cc'),
    ]
    for pk, value in expected:
        mo.create_feature(u.feature_from_dict(mo, {'id': pk, wc_noquot: value}), mc)

    # SQL leaves the row order unspecified without ORDER BY; sort explicitly
    # so that the comparison with the ordered list is deterministic.
    assert u.pg.content(f'select id, "{wc}" from "{ws}"."{wt}" order by id') == expected