Coverage for gws-app/gws/lib/datetimex/__init__.py: 34%

278 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Date and time utilities. 

2 

3These utilities are wrappers around the `datetime` module, 

4some functions also use `pendulum` (https://pendulum.eustace.io/), 

5however all functions here return strictly stock ``datetime.datetime`` objects. 

6 

7``date`` objects are silently promoted to ``datetime`` with time set to midnight UTC. 

8``time`` objects are silently promoted to ``datetime`` in the local timezone with today's date.

9 

10This module always returns timezone-aware objects. 

11 

12When constructing an object (e.g. from a string), the default time zone should be passed 

13as a zoneinfo string (like ``Europe/Berlin``). An empty zoneinfo string (default) means the local time zone. 

14Alias names like ``CEST`` are not supported. 

15 

16Naive datetime arguments are assumed to be in the local time zone. 

17 

18When running in a docker container, there are several ways to set up the local time zone: 

19 

20- by setting the config variable ``server.timeZone`` (see `gws.config.parser`) 

21- by setting the ``TZ`` environment variable 

22- mounting a host zone info file to ``/etc/localtime`` 

23""" 

24 

25from typing import Optional 

26 

27import datetime as dt 

28import contextlib 

29import os 

30import re 

31import zoneinfo 

32 

33import pendulum 

34import pendulum.parsing 

35 

36import gws 

37import gws.lib.osx 

38 

39 

class Error(gws.Error):
    """Exception raised by the date and time utilities of this module."""

42 

43 

# The UTC zone object; all spellings of UTC in the cache below share this instance.
UTC = zoneinfo.ZoneInfo('UTC')

# Zones that `time_zone` can return without a lookup.
# The local time zone is added under the key '' once it has been resolved.
_ZI_CACHE = dict.fromkeys(('utc', 'UTC', 'Etc/UTC'), UTC)

# Zone names accepted by `_zone_info_from_string`.
_ZI_ALL = set(zoneinfo.available_timezones())

53 

54 

55# Time zones 

56 

def set_local_time_zone(tz: str):
    """Make ``tz`` the local time zone of the system.

    Nothing is changed if ``tz`` already is the zone that ``/etc/localtime`` refers to.

    Args:
        tz: A zoneinfo name (like ``Europe/Berlin``). An empty string means
            the current local time zone.

    Raises:
        Error: If ``tz`` is not a valid zone name, or if the zone has to be
            changed and the process is not running as root.
    """
    new_zi = time_zone(tz)
    cur_zi = _zone_info_from_localtime()

    gws.log.debug(f'set_local_time_zone: cur={cur_zi} new={new_zi}')

    if new_zi == cur_zi:
        return

    _set_localtime_from_zone_info(new_zi)

    # `time_zone('')` memoizes the local zone under the '' key; drop that entry,
    # otherwise later calls would keep returning the zone that was just replaced.
    _ZI_CACHE.pop('', None)

    gws.log.debug(f'set_local_time_zone: cur={_zone_info_from_localtime()}')

68 

69 

def time_zone(tz: str = '') -> zoneinfo.ZoneInfo:
    """Return the zone info object for a zone name.

    Args:
        tz: A zoneinfo name (like ``Europe/Berlin``). An empty value means the local time zone.

    Returns:
        A ``zoneinfo.ZoneInfo`` object.

    Raises:
        Error: If a non-empty name is not a known time zone.
    """
    try:
        return _ZI_CACHE[tz]
    except KeyError:
        pass

    if tz:
        return _zone_info_from_string(tz)

    # resolve the local zone once and remember it under the empty key
    local_zi = _zone_info_from_localtime()
    _ZI_CACHE[''] = local_zi
    return local_zi

79 

80 

def _set_localtime_from_zone_info(zi):
    """Re-link ``/etc/localtime`` to the zone file of ``zi`` under ``/usr/share/zoneinfo``.

    Args:
        zi: Zone info object; its string form is used as the zone file path.

    Raises:
        Error: If the process is not running as root (uid 0).
    """
    running_as_root = os.getuid() == 0
    if not running_as_root:
        raise Error('cannot set timezone, must be root')

    zone_file = f'/usr/share/zoneinfo/{zi}'
    gws.lib.osx.run(['ln', '-fs', zone_file, '/etc/localtime'])

85 

86 

def _zone_info_from_localtime():
    """Determine the local time zone from the ``/etc/localtime`` symlink.

    The zone name is whatever follows ``zoneinfo/`` in the link target.

    Returns:
        The ``zoneinfo.ZoneInfo`` for that name, or ``UTC`` (with a logged warning)
        if the link is missing, cannot be read as a symlink, does not point into
        a ``zoneinfo`` directory, or names an unknown or malformed zone.
    """
    a = '/etc/localtime'

    try:
        p = os.readlink(a)
    except FileNotFoundError:
        gws.log.warning(f'time zone: {a!r} not found, assuming UTC')
        return UTC
    except OSError as exc:
        # readlink fails with a plain OSError when the path is not a symlink,
        # e.g. a zone file bind-mounted from the host; there is no name to derive then
        gws.log.warning(f'time zone: {a!r} is not a readable symlink ({exc}), assuming UTC')
        return UTC

    m = re.search(r'zoneinfo/(.+)$', p)
    if not m:
        gws.log.warning(f'time zone: {a!r}={p!r} invalid, assuming UTC')
        return UTC

    try:
        return zoneinfo.ZoneInfo(m.group(1))
    except (zoneinfo.ZoneInfoNotFoundError, ValueError):
        # ValueError: ZoneInfo rejects keys that are not normalized relative paths
        gws.log.warning(f'time zone: {a!r}={p!r} not found, assuming UTC')
        return UTC

106 

107 

def _zone_info_from_string(tz):
    """Create a zone info object from a zone name.

    Args:
        tz: A zoneinfo name; must be one of the names in ``_ZI_ALL``.

    Raises:
        Error: If the name is not in ``_ZI_ALL`` or the zone cannot be loaded.
    """
    if tz in _ZI_ALL:
        try:
            return zoneinfo.ZoneInfo(tz)
        except zoneinfo.ZoneInfoNotFoundError as exc:
            raise Error(f'invalid time zone {tz!r}') from exc

    raise Error(f'invalid time zone {tz!r}')

115 

116 

def _zone_info_from_tzinfo(tzinfo: dt.tzinfo):
    """Try to express an arbitrary ``tzinfo`` object as a stock ``zoneinfo.ZoneInfo``.

    Args:
        tzinfo: A time zone object of any implementation.

    Returns:
        A ``zoneinfo.ZoneInfo``, or ``None`` if the string form of ``tzinfo``
        is not a known zone name.
    """
    # exact type test: only stock ZoneInfo objects pass through unchanged,
    # anything else (subclasses included) is looked up by its string form
    if type(tzinfo) is zoneinfo.ZoneInfo:
        return tzinfo

    name = str(tzinfo)
    if name == '+0:0':
        return UTC

    try:
        found = _zone_info_from_string(name)
    except Error:
        found = None
    return found

127 

128 

# Initialize from the environment variable right now (at import time):
# when ``TZ`` is set, it is validated as a zone name and ``/etc/localtime`` is re-linked to it.
# NOTE(review): both helpers raise `Error` (unknown zone name / not running as root),
# which would propagate out of the module import -- confirm this is intended.

if 'TZ' in os.environ:
    _set_localtime_from_zone_info(
        _zone_info_from_string(os.environ['TZ']))

134 

135 

136# Constructors 

137 

138 

def new(year, month, day, hour=0, minute=0, second=0, microsecond=0, fold=0, tz: str = '') -> dt.datetime:
    """Create a timezone-aware datetime from its components.

    Args:
        year, month, day, hour, minute, second, microsecond, fold: Passed on to ``datetime.datetime``.
        tz: A zoneinfo name; an empty string means the local time zone.

    Returns:
        A ``datetime.datetime`` with ``tzinfo`` set to the requested zone.
    """
    zi = time_zone(tz)
    return dt.datetime(year, month, day, hour, minute, second, microsecond, tzinfo=zi, fold=fold)

141 

142 

def now(tz: str = '') -> dt.datetime:
    """Return the current time in the given time zone.

    Args:
        tz: A zoneinfo name; an empty string means the local time zone.
    """
    zi = time_zone(tz)
    return _now(zi)

145 

146 

def now_utc() -> dt.datetime:
    """Return the current time, requested in the UTC zone."""
    return _now(tzinfo=UTC)

149 

150 

151# for testing 

152 

# Value that `_now` returns instead of the real clock; None means "not mocked".
_MOCK_NOW = None


@contextlib.contextmanager
def mock_now(d):
    """Context manager that pins the "current time" to a fixed value (for testing).

    Inside the ``with`` block, ``_MOCK_NOW`` is set to ``d``. It is reset to ``None``
    when the block is left, also if the block raises, so that a failing test
    cannot leave the mock installed for the code that runs after it.

    Args:
        d: The datetime to report as the current time.
    """
    global _MOCK_NOW
    _MOCK_NOW = d
    try:
        yield
    finally:
        _MOCK_NOW = None

162 

163 

def _now(tzinfo):
    """Return the mocked "now" if one is set, otherwise the real current time.

    Args:
        tzinfo: Time zone for the real clock reading. A mocked value is
            returned as is, whatever its time zone.
    """
    if _MOCK_NOW:
        return _MOCK_NOW
    return dt.datetime.now(tz=tzinfo)

166 

167 

def today(tz: str = '') -> dt.datetime:
    """Return the current day at 00:00:00 in the given time zone.

    Args:
        tz: A zoneinfo name; an empty string means the local time zone.
    """
    moment = now(tz)
    return moment.replace(microsecond=0, second=0, minute=0, hour=0)

170 

171 

def today_utc() -> dt.datetime:
    """Return the current UTC day at 00:00:00."""
    moment = now_utc()
    return moment.replace(microsecond=0, second=0, minute=0, hour=0)

174 

175 

176def parse(s: str | dt.datetime | dt.date | None, tz: str = '') -> Optional[dt.datetime]: 

177 if not s: 

178 return None 

179 

180 if isinstance(s, dt.datetime): 

181 return _ensure_tzinfo(s, tz) 

182 

183 if isinstance(s, dt.date): 

184 return new(s.year, s.month, s.day, tz=tz) 

185 

186 try: 

187 return from_string(str(s), tz) 

188 except Error: 

189 pass 

190 

191 

192def parse_time(s: str | dt.time | None, tz: str = '') -> Optional[dt.datetime]: 

193 if not s: 

194 return 

195 

196 if isinstance(s, dt.time): 

197 return _datetime(_ensure_tzinfo(s, tz)) 

198 

199 try: 

200 return from_iso_time_string(str(s), tz) 

201 except Error: 

202 pass 

203 

204 

def from_string(s: str, tz: str = '') -> dt.datetime:
    """Parse a date or datetime string, not restricted to ISO formats.

    Args:
        s: The string to parse; surrounding whitespace is ignored.
        tz: Default zoneinfo name; an empty string means the local time zone.

    Raises:
        Error: If the string cannot be parsed as a date or datetime.
    """
    text = s.strip()
    return _pend_parse_datetime(text, tz, iso_only=False)

207 

208 

def from_iso_string(s: str, tz: str = '') -> dt.datetime:
    """Parse an ISO 8601 date or datetime string.

    Args:
        s: The string to parse; surrounding whitespace is ignored.
        tz: Default zoneinfo name; an empty string means the local time zone.

    Raises:
        Error: If the string cannot be parsed as a date or datetime.
    """
    text = s.strip()
    return _pend_parse_datetime(text, tz, iso_only=True)

211 

212 

def from_iso_time_string(s: str, tz: str = '') -> dt.datetime:
    """Parse an ISO 8601 time string.

    Args:
        s: The string to parse; surrounding whitespace is ignored.
        tz: Default zoneinfo name; an empty string means the local time zone.

    Raises:
        Error: If the string cannot be parsed as a time.
    """
    text = s.strip()
    return _pend_parse_time(text, tz, iso_only=True)

215 

216 

def from_timestamp(n: float, tz: str = '') -> dt.datetime:
    """Convert a POSIX timestamp to a datetime in the given time zone.

    Args:
        n: The timestamp, as accepted by ``datetime.fromtimestamp``.
        tz: A zoneinfo name; an empty string means the local time zone.
    """
    zi = time_zone(tz)
    return dt.datetime.fromtimestamp(n, tz=zi)

219 

220 

221# Formatters 

222 

def to_iso_string(d: Optional[dt.date] = None, with_tz='+', sep='T') -> str:
    """Format a value as ``YYYY-MM-DD<sep>HH:MM:SS``, optionally with a UTC offset.

    Args:
        d: Date or datetime; the current time if omitted.
        with_tz: If falsy, no offset is written. Otherwise the ``%z`` offset is appended;
            with ``'Z'``, an offset of ``+0000`` is written as ``Z``.
        sep: Separator between the date and the time part.
    """
    offset_part = '%z' if with_tz else ''
    text = _datetime(d).strftime(f'%Y-%m-%d{sep}%H:%M:%S' + offset_part)

    if with_tz == 'Z' and text.endswith('+0000'):
        return text.removesuffix('+0000') + 'Z'
    return text

231 

232 

def to_iso_date_string(d: Optional[dt.date] = None) -> str:
    """Format a value as ``YYYY-MM-DD``.

    Args:
        d: Date or datetime; the current time if omitted.
    """
    moment = _datetime(d)
    return moment.strftime('%Y-%m-%d')

235 

236 

def to_basic_string(d: Optional[dt.date] = None) -> str:
    """Format a value as ``YYYYMMDDHHMMSS``, without any separators.

    Args:
        d: Date or datetime; the current time if omitted.
    """
    moment = _datetime(d)
    return moment.strftime("%Y%m%d%H%M%S")

239 

240 

def to_iso_time_string(d: Optional[dt.date] = None, with_tz='+') -> str:
    """Format the time part of a value as ``HH:MM:SS``, optionally with a UTC offset.

    Args:
        d: Date or datetime; the current time if omitted.
        with_tz: If falsy, no offset is written. Otherwise the ``%z`` offset is appended;
            with ``'Z'``, an offset of ``+0000`` is written as ``Z``.
    """
    offset_part = '%z' if with_tz else ''
    text = _datetime(d).strftime('%H:%M:%S' + offset_part)

    if with_tz == 'Z' and text.endswith('+0000'):
        return text.removesuffix('+0000') + 'Z'
    return text

249 

250 

def to_string(fmt: str, d: Optional[dt.date] = None) -> str:
    """Format a value with a ``strftime`` format string.

    Args:
        fmt: The ``strftime`` format.
        d: Date or datetime; the current time if omitted.
    """
    moment = _datetime(d)
    return moment.strftime(fmt)

253 

254 

255# Converters 

256 

def to_timestamp(d: Optional[dt.date] = None) -> int:
    """Return the POSIX timestamp of a value as an integer (fraction dropped by ``int``).

    Args:
        d: Date or datetime; the current time if omitted.
    """
    seconds = _datetime(d).timestamp()
    return int(seconds)

259 

260 

def to_millis(d: Optional[dt.date] = None) -> int:
    """Return the POSIX timestamp of a value in milliseconds, as an integer.

    Args:
        d: Date or datetime; the current time if omitted.
    """
    seconds = _datetime(d).timestamp()
    return int(seconds * 1000)

263 

264 

def to_utc(d: Optional[dt.date] = None) -> dt.datetime:
    """Convert a value to the UTC time zone.

    Args:
        d: Date or datetime; the current time if omitted.
    """
    utc = time_zone('UTC')
    return _datetime(d).astimezone(utc)

267 

268 

def to_local(d: Optional[dt.date] = None) -> dt.datetime:
    """Convert a value to the local time zone.

    Args:
        d: Date or datetime; the current time if omitted.
    """
    moment = _datetime(d)
    return moment.astimezone(time_zone(''))

271 

272 

def to_time_zone(tz: str, d: Optional[dt.date] = None) -> dt.datetime:
    """Convert a value to the given time zone.

    Args:
        tz: A zoneinfo name; an empty string means the local time zone.
        d: Date or datetime; the current time if omitted.
    """
    moment = _datetime(d)
    return moment.astimezone(time_zone(tz))

275 

276 

277# Predicates 

278 

def is_date(x) -> bool:
    """Check if a value is a ``date`` object.

    ``datetime`` objects also qualify, since ``datetime`` is a subclass of ``date``.
    """
    date_type = dt.date
    return isinstance(x, date_type)

281 

282 

def is_datetime(x) -> bool:
    """Check if a value is a ``datetime`` object (plain ``date`` objects do not qualify)."""
    datetime_type = dt.datetime
    return isinstance(x, datetime_type)

285 

286 

def is_utc(d: dt.datetime) -> bool:
    """Check if the time zone of a value resolves to the module's ``UTC`` zone object."""
    zi = _zone_info_from_tzinfo(_datetime(d).tzinfo)
    return zi == UTC

289 

290 

def is_local(d: dt.datetime) -> bool:
    """Check if the time zone of a value resolves to the local time zone object."""
    zi = _zone_info_from_tzinfo(_datetime(d).tzinfo)
    return zi == time_zone('')

293 

294 

295# Arithmetic 

296 

def add(
        d: Optional[dt.date] = None,
        years=0, months=0, days=0, weeks=0, hours=0, minutes=0, seconds=0, microseconds=0
) -> dt.datetime:
    """Add a duration, given by its components, to a value.

    The work is done by ``pendulum.helpers.add_duration``.

    Args:
        d: Date or datetime; the current time if omitted.
        years, months, days, weeks, hours, minutes, seconds, microseconds:
            Amounts to add, passed on unchanged.
    """
    duration = dict(
        years=years, months=months, days=days,
        weeks=weeks, hours=hours, minutes=minutes,
        seconds=seconds, microseconds=microseconds,
    )
    return pendulum.helpers.add_duration(_datetime(d), **duration)

307 

308 

class Diff:
    """Difference between two points in time, expressed in several units.

    This is a plain attribute container: the attributes are only declared here
    and get their values from ``difference`` and ``total_difference``.
    """

    years: int
    months: int
    weeks: int
    days: int
    hours: int
    minutes: int
    seconds: int
    microseconds: int

    def __repr__(self):
        # show the attributes that have actually been assigned, as a dict
        return repr(self.__dict__)

321 

322 

def difference(d1: dt.date, d2: Optional[dt.date] = None) -> Diff:
    """Compute the difference between two values, broken down into components.

    The values are taken from a signed (``absolute=False``) ``pendulum.Interval``
    built from ``d1`` and ``d2``.

    Args:
        d1: First date or datetime.
        d2: Second date or datetime; the current time if omitted.

    Returns:
        A ``Diff`` whose ``days`` and ``seconds`` hold the interval's
        ``remaining_days`` and ``remaining_seconds``.
    """
    iv = pendulum.Interval(_datetime(d1), _datetime(d2), absolute=False)

    # (Diff attribute, Interval attribute)
    mapping = (
        ('years', 'years'),
        ('months', 'months'),
        ('weeks', 'weeks'),
        ('days', 'remaining_days'),
        ('hours', 'hours'),
        ('minutes', 'minutes'),
        ('seconds', 'remaining_seconds'),
        ('microseconds', 'microseconds'),
    )

    df = Diff()
    for target, source in mapping:
        setattr(df, target, getattr(iv, source))
    return df

337 

338 

def total_difference(d1: dt.date, d2: Optional[dt.date] = None) -> Diff:
    """Compute the difference between two values, as a total in each unit.

    The values are taken from the ``in_...()`` methods of a signed (``absolute=False``)
    ``pendulum.Interval`` built from ``d1`` and ``d2``.

    Args:
        d1: First date or datetime.
        d2: Second date or datetime; the current time if omitted.

    Returns:
        A ``Diff`` where each attribute is the whole difference expressed in that unit.
        ``microseconds`` is derived from ``seconds`` (times one million).
    """
    iv = pendulum.Interval(_datetime(d1), _datetime(d2), absolute=False)

    df = Diff()
    for unit in ('years', 'months', 'weeks', 'days', 'hours', 'minutes', 'seconds'):
        in_unit = getattr(iv, f'in_{unit}')
        setattr(df, unit, in_unit())
    df.microseconds = df.seconds * 1_000_000
    return df

353 

354 

355# Wrappers for useful pendulum utilities 

356 

357# @formatter:off 

358 

def start_of_second(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('second')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.start_of('second'))


def start_of_minute(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('minute')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.start_of('minute'))


def start_of_hour(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('hour')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.start_of('hour'))


def start_of_day(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('day')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.start_of('day'))


def start_of_week(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('week')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.start_of('week'))


def start_of_month(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('month')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.start_of('month'))


def start_of_year(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``start_of('year')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.start_of('year'))

366 

367 

def end_of_second(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('second')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.end_of('second'))


def end_of_minute(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('minute')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.end_of('minute'))


def end_of_hour(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('hour')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.end_of('hour'))


def end_of_day(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('day')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.end_of('day'))


def end_of_week(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('week')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.end_of('week'))


def end_of_month(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('month')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.end_of('month'))


def end_of_year(d: Optional[dt.date] = None) -> dt.datetime:
    """Apply pendulum's ``end_of('year')`` to ``d`` (default: now)."""
    p = _pend(d)
    return _unpend(p.end_of('year'))

375 

376 

def day_of_week(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``day_of_week`` for ``d`` (default: now)."""
    p = _pend(d)
    return p.day_of_week


def day_of_year(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``day_of_year`` for ``d`` (default: now)."""
    p = _pend(d)
    return p.day_of_year


def week_of_month(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``week_of_month`` for ``d`` (default: now)."""
    p = _pend(d)
    return p.week_of_month


def week_of_year(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``week_of_year`` for ``d`` (default: now)."""
    p = _pend(d)
    return p.week_of_year


def days_in_month(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``days_in_month`` for ``d`` (default: now)."""
    p = _pend(d)
    return p.days_in_month

382 

383 

384# @formatter:on 

385 

# Weekday lookup for `next` and `prev`: a weekday can be given as an integer
# (0 = Monday ... 6 = Sunday) or as a lowercase English day name;
# the value is the corresponding ``pendulum.WeekDay`` member.
_WD = {
    0: pendulum.WeekDay.MONDAY,
    1: pendulum.WeekDay.TUESDAY,
    2: pendulum.WeekDay.WEDNESDAY,
    3: pendulum.WeekDay.THURSDAY,
    4: pendulum.WeekDay.FRIDAY,
    5: pendulum.WeekDay.SATURDAY,
    6: pendulum.WeekDay.SUNDAY,
    'monday': pendulum.WeekDay.MONDAY,
    'tuesday': pendulum.WeekDay.TUESDAY,
    'wednesday': pendulum.WeekDay.WEDNESDAY,
    'thursday': pendulum.WeekDay.THURSDAY,
    'friday': pendulum.WeekDay.FRIDAY,
    'saturday': pendulum.WeekDay.SATURDAY,
    'sunday': pendulum.WeekDay.SUNDAY,
}

402 

403 

def next(day: int | str, d: Optional[dt.date] = None, keep_time=False) -> dt.datetime:
    """Apply pendulum's ``next`` to ``d`` for the given weekday.

    Args:
        day: Weekday as a key of ``_WD``: 0 (Monday) to 6 (Sunday), or a lowercase English day name.
        d: Date or datetime; the current time if omitted.
        keep_time: Passed on to pendulum's ``next``.

    Raises:
        KeyError: If ``day`` is not a key of ``_WD``.
    """
    p = _pend(d)
    weekday = _WD[day]
    return _unpend(p.next(weekday, keep_time))

406 

407 

def prev(day: int | str, d: Optional[dt.date] = None, keep_time=False) -> dt.datetime:
    """Apply pendulum's ``previous`` to ``d`` for the given weekday.

    Args:
        day: Weekday as a key of ``_WD``: 0 (Monday) to 6 (Sunday), or a lowercase English day name.
        d: Date or datetime; the current time if omitted.
        keep_time: Passed on to pendulum's ``previous``.

    Raises:
        KeyError: If ``day`` is not a key of ``_WD``.
    """
    p = _pend(d)
    weekday = _WD[day]
    return _unpend(p.previous(weekday, keep_time))

410 

411 

412# Duration 

413 

# Number of seconds in each unit suffix accepted by `parse_duration`.
_DURATION_UNITS = {
    'w': 3600 * 24 * 7,  # week
    'd': 3600 * 24,  # day
    'h': 3600,  # hour
    'm': 60,  # minute
    's': 1,  # second
}

421 

422 

def parse_duration(s: str) -> int:
    """Convert a duration string to a number of seconds.

    A duration is a sequence of ``<number><unit>`` pairs, where the unit is a key
    of ``_DURATION_UNITS`` (``w``, ``d``, ``h``, ``m`` or ``s``), e.g. ``1d12h``.
    Whitespace around a unit is ignored. A final number without a unit
    is counted as seconds.

    Args:
        s: Duration string. An ``int`` is returned unchanged.

    Returns:
        The duration in seconds.

    Raises:
        Error: If a unit is unknown or is not preceded by a number.
    """
    if isinstance(s, int):
        return s

    total = 0
    pending = None  # the number waiting for its unit

    for token in re.finditer(r'(\d+)|(\D+)', str(s).strip()):
        digits, unit = token.groups(default='')

        if digits:
            pending = int(digits)
            continue

        unit = unit.strip()
        if pending is None or unit not in _DURATION_UNITS:
            raise Error('invalid duration', s)

        total += pending * _DURATION_UNITS[unit]
        pending = None

    # a trailing number without a unit
    if pending:
        total += pending

    return total

454 

455 

456## 

457 

458# conversions 

459 

def _datetime(d: dt.date | dt.time | None) -> dt.datetime:
    """Ensure a valid, timezone-aware ``datetime`` object.

    Args:
        d: The value to convert:
            ``None`` gives the current local time;
            a ``datetime`` is returned with a time zone (local, if it was naive);
            a ``date`` is promoted to midnight UTC;
            a ``time`` is promoted to today's date in its own time zone
            (local, if it was naive).

    Raises:
        Error: If ``d`` is none of the above.
    """
    if d is None:
        return now()

    if isinstance(d, dt.datetime):
        # if a value is a naive datetime, assume the local tz
        # see https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-INPUT-TIME-STAMPS:
        # > Conversions between timestamp without time zone and timestamp with time zone normally assume
        # > that the timestamp without time zone value should be taken or given as timezone local time.
        return _ensure_tzinfo(d, tz='')

    if isinstance(d, dt.date):
        # promote date to midnight UTC
        return dt.datetime(d.year, d.month, d.day, tzinfo=UTC)

    if isinstance(d, dt.time):
        # promote time to today's time;
        # a naive time is taken to be local, otherwise the result would be a naive datetime
        tzinfo = d.tzinfo if d.tzinfo is not None else time_zone('')
        n = _now(tzinfo)
        return dt.datetime(n.year, n.month, n.day, d.hour, d.minute, d.second, d.microsecond, tzinfo, fold=d.fold)

    raise Error(f'invalid datetime value {d!r}')

483 

484 

def _ensure_tzinfo(d: dt.datetime | dt.time, tz: str):
    """Return ``d`` with a time zone attached, preferably a ``zoneinfo.ZoneInfo``.

    Args:
        d: A datetime or time object.
        tz: zoneinfo name to attach if ``d`` has no time zone; empty means local.
    """
    theirs = d.tzinfo

    # no time zone at all: attach the requested one
    if not theirs:
        return d.replace(tzinfo=time_zone(tz))

    # 'their' tzinfo might be an unnamed dt.timezone or pendulum.FixedTimezone:
    # swap it for a zoneinfo object if possible, failing that, keep the value as it is
    zi = _zone_info_from_tzinfo(theirs)
    return d.replace(tzinfo=zi) if zi else d

498 

499 

500# pendulum.DateTime <-> python datetime 

501 

def _pend(d: dt.datetime | None) -> pendulum.DateTime:
    """Convert a value (default: now) to a ``pendulum.DateTime``."""
    moment = _datetime(d)
    return pendulum.instance(moment)

504 

505 

def _unpend(p: pendulum.DateTime) -> dt.datetime:
    """Convert a ``pendulum.DateTime`` to a stock ``datetime.datetime``.

    All date and time fields, ``tzinfo`` and ``fold`` are copied over.
    """
    fields = (p.year, p.month, p.day, p.hour, p.minute, p.second, p.microsecond)
    return dt.datetime(*fields, tzinfo=p.tzinfo, fold=p.fold)

518 

519 

520# NB using private APIs 

521 

522 

def _pend_parse_datetime(s, tz, iso_only):
    """Parse a date or datetime string with pendulum.

    Args:
        s: The string to parse.
        tz: Default zoneinfo name; empty means the local time zone.
        iso_only: If true, use ``pendulum.parsing.parse_iso8601``,
            otherwise the private ``pendulum.parsing._parse``.

    Returns:
        A timezone-aware ``datetime.datetime``.

    Raises:
        Error: If parsing fails or the result is neither a datetime nor a date.
    """
    try:
        # `_parse` is chosen over the public entry point on purpose: do not normalize
        parser = pendulum.parsing.parse_iso8601 if iso_only else pendulum.parsing._parse
        d = parser(s)
    except (ValueError, pendulum.parsing.ParserError) as exc:
        raise Error(f'invalid date {s!r}') from exc

    # datetime first, it is a subclass of date
    if isinstance(d, dt.datetime):
        return _ensure_tzinfo(d, tz)
    if isinstance(d, dt.date):
        return new(d.year, d.month, d.day, tz=tz)

    # times and durations not accepted
    raise Error(f'invalid date {s!r}')

540 

541 

def _pend_parse_time(s, tz, iso_only):
    """Parse a time string with pendulum.

    Args:
        s: The string to parse.
        tz: Default zoneinfo name; empty means the local time zone.
        iso_only: If true, use ``pendulum.parsing.parse_iso8601``,
            otherwise the private ``pendulum.parsing._parse``.

    Returns:
        A timezone-aware ``datetime.datetime`` for the parsed time on today's date.

    Raises:
        Error: If parsing fails or the result is not a time.
    """
    try:
        # `_parse` is chosen over the public entry point on purpose: do not normalize
        parser = pendulum.parsing.parse_iso8601 if iso_only else pendulum.parsing._parse
        d = parser(s)
    except (ValueError, pendulum.parsing.ParserError) as exc:
        raise Error(f'invalid time {s!r}') from exc

    # dates and durations not accepted
    if not isinstance(d, dt.time):
        raise Error(f'invalid time {s!r}')

    return _datetime(_ensure_tzinfo(d, tz))