Coverage for gws-app/gws/lib/datetimex/__init__.py: 34%
278 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Date and time utilities.
3These utilities are wrappers around the `datetime` module,
4some functions also use `pendulum` (https://pendulum.eustace.io/),
5however all functions here return strictly stock ``datetime.datetime`` objects.
7``date`` objects are silently promoted to ``datetime`` with time set to midnight UTC.
8``time`` objects are silently promoted to ``datetime`` in the local time zone with today's date.
10This module always returns timezone-aware objects.
12When constructing an object (e.g. from a string), the default time zone should be passed
13as a zoneinfo string (like ``Europe/Berlin``). An empty zoneinfo string (default) means the local time zone.
14Alias names like ``CEST`` are not supported.
16Naive datetime arguments are assumed to be in the local time zone.
18When running in a docker container, there are several ways to set up the local time zone:
20- by setting the config variable ``server.timeZone`` (see `gws.config.parser`)
21- by setting the ``TZ`` environment variable
22- mounting a host zone info file to ``/etc/localtime``
23"""
25from typing import Optional
27import datetime as dt
28import contextlib
29import os
30import re
31import zoneinfo
33import pendulum
34import pendulum.parsing
36import gws
37import gws.lib.osx
class Error(gws.Error):
    """Error raised by the date and time utilities in this module."""
# The UTC zone object, shared by all helpers in this module.
UTC = zoneinfo.ZoneInfo('UTC')

# Cache of zone objects by name, pre-populated with the accepted spellings of UTC.
# ``time_zone`` also stores the local zone here, under the empty string.
_ZI_CACHE = dict.fromkeys(('utc', 'UTC', 'Etc/UTC'), UTC)

# All zone names available on this system, used to validate zone strings.
_ZI_ALL = set(zoneinfo.available_timezones())
55# Time zones
def set_local_time_zone(tz: str):
    """Make the given zone the local time zone.

    Does nothing if the zone found in ``/etc/localtime`` is already the requested one.

    Args:
        tz: A zoneinfo string (like ``Europe/Berlin``).
    """
    requested = time_zone(tz)
    current = _zone_info_from_localtime()

    gws.log.debug(f'set_local_time_zone: cur={current} new={requested}')

    if requested != current:
        _set_localtime_from_zone_info(requested)
        gws.log.debug(f'set_local_time_zone: cur={_zone_info_from_localtime()}')
def time_zone(tz: str = '') -> zoneinfo.ZoneInfo:
    """Return the zone object for a zoneinfo string.

    Args:
        tz: A zoneinfo string (like ``Europe/Berlin``). An empty string means the local time zone.

    Returns:
        A ``ZoneInfo`` object.

    Raises:
        ``Error``: if the zone name is not known.
    """
    try:
        return _ZI_CACHE[tz]
    except KeyError:
        pass

    if tz:
        return _zone_info_from_string(tz)

    # the local zone is looked up once and then served from the cache
    _ZI_CACHE[''] = _zone_info_from_localtime()
    return _ZI_CACHE['']
def _set_localtime_from_zone_info(zi):
    """Point ``/etc/localtime`` to the given zone.

    Args:
        zi: A zone object; its string form is used as a path below ``/usr/share/zoneinfo``.

    Raises:
        ``Error``: if the current user is not root.
    """
    if os.getuid() != 0:
        raise Error('cannot set timezone, must be root')
    gws.lib.osx.run(['ln', '-fs', f'/usr/share/zoneinfo/{zi}', '/etc/localtime'])
    # the local zone cached by ``time_zone('')`` is stale now,
    # drop it so that the next call re-reads ``/etc/localtime``
    _ZI_CACHE.pop('', None)
def _zone_info_from_localtime():
    """Determine the local time zone from the ``/etc/localtime`` symlink.

    Returns:
        The zone the link points to, or ``UTC`` (with a warning logged) if the link
        is missing, cannot be read, or does not point to a known zone.
    """
    a = '/etc/localtime'

    try:
        p = os.readlink(a)
    except FileNotFoundError:
        gws.log.warning(f'time zone: {a!r} not found, assuming UTC')
        return UTC
    except OSError as exc:
        # ``readlink`` fails (EINVAL) if the path is a regular file rather than a symlink,
        # e.g. when a host zone file is bind-mounted here; the zone name cannot be derived then
        gws.log.warning(f'time zone: {a!r} is not a readable symlink ({exc}), assuming UTC')
        return UTC

    # the zone name is whatever follows 'zoneinfo/' in the link target
    m = re.search(r'zoneinfo/(.+)$', p)
    if not m:
        gws.log.warning(f'time zone: {a!r}={p!r} invalid, assuming UTC')
        return UTC

    try:
        return zoneinfo.ZoneInfo(m.group(1))
    except zoneinfo.ZoneInfoNotFoundError:
        gws.log.warning(f'time zone: {a!r}={p!r} not found, assuming UTC')
        return UTC
def _zone_info_from_string(tz):
    """Return the zone object for a zone name.

    Args:
        tz: A zoneinfo string.

    Raises:
        ``Error``: if the name is not among the available zones or cannot be loaded.
    """
    if tz in _ZI_ALL:
        try:
            return zoneinfo.ZoneInfo(tz)
        except zoneinfo.ZoneInfoNotFoundError as exc:
            raise Error(f'invalid time zone {tz!r}') from exc
    raise Error(f'invalid time zone {tz!r}')
def _zone_info_from_tzinfo(tzinfo: dt.tzinfo):
    """Try to convert an arbitrary ``tzinfo`` object to a ``ZoneInfo``.

    Args:
        tzinfo: A ``tzinfo`` object.

    Returns:
        A ``ZoneInfo`` object, or ``None`` if the string form of ``tzinfo`` is not a known zone name.
    """
    # exact type check: only genuine ZoneInfo objects are passed through
    if type(tzinfo) is zoneinfo.ZoneInfo:
        return tzinfo

    name = str(tzinfo)
    if name == '+0:0':
        return UTC

    try:
        zi = _zone_info_from_string(name)
    except Error:
        return None
    return zi
# If the ``TZ`` environment variable is set, apply it as the local time zone at import time.

if os.environ.get('TZ') is not None:
    _set_localtime_from_zone_info(_zone_info_from_string(os.environ['TZ']))
136# Constructors
def new(year, month, day, hour=0, minute=0, second=0, microsecond=0, fold=0, tz: str = '') -> dt.datetime:
    """Create a timezone-aware datetime from its components.

    Args:
        year, month, day, hour, minute, second, microsecond, fold: Passed to ``datetime.datetime``.
        tz: A zoneinfo string, empty for the local time zone.

    Returns:
        A datetime in the given zone.
    """
    zi = time_zone(tz)
    return dt.datetime(year, month, day, hour, minute, second, microsecond, tzinfo=zi, fold=fold)
def now(tz: str = '') -> dt.datetime:
    """Return the current time in the given zone (empty string: local time zone)."""
    zi = time_zone(tz)
    return _now(zi)
def now_utc() -> dt.datetime:
    """Return the current time in UTC."""
    current = _now(UTC)
    return current
# for testing

# When set, this value is returned by ``_now`` instead of the real current time.
_MOCK_NOW = None


@contextlib.contextmanager
def mock_now(d):
    """Context manager that replaces the current time with a fixed value.

    Args:
        d: The value to use as "now" inside the ``with`` block.
    """
    global _MOCK_NOW
    _MOCK_NOW = d
    try:
        yield
    finally:
        # reset even if the body raises, so that a failing test cannot leak the mock
        _MOCK_NOW = None
def _now(tzinfo):
    """Return the mocked "now" if one is set, otherwise the current time in the given zone."""
    if _MOCK_NOW:
        return _MOCK_NOW
    return dt.datetime.now(tz=tzinfo)
def today(tz: str = '') -> dt.datetime:
    """Return the current date at midnight in the given zone (empty string: local time zone)."""
    current = now(tz)
    return current.replace(hour=0, minute=0, second=0, microsecond=0)
def today_utc() -> dt.datetime:
    """Return the current UTC date at midnight."""
    current = now_utc()
    return current.replace(hour=0, minute=0, second=0, microsecond=0)
def parse(s: str | dt.datetime | dt.date | None, tz: str = '') -> Optional[dt.datetime]:
    """Convert a string, date or datetime to a timezone-aware datetime.

    Args:
        s: The value to convert.
        tz: A zoneinfo string used when the value has no zone, empty for the local time zone.

    Returns:
        A datetime, or ``None`` if the value is empty or cannot be parsed.
    """
    if not s:
        return None

    # datetime must be tested first, since it is a subclass of date
    if isinstance(s, dt.datetime):
        return _ensure_tzinfo(s, tz)

    if isinstance(s, dt.date):
        return new(s.year, s.month, s.day, tz=tz)

    try:
        return from_string(str(s), tz)
    except Error:
        # unparseable input is reported as None rather than as an error
        return None
def parse_time(s: str | dt.time | None, tz: str = '') -> Optional[dt.datetime]:
    """Convert a time string or a ``time`` object to a timezone-aware datetime.

    Args:
        s: The value to convert.
        tz: A zoneinfo string used when the value has no zone, empty for the local time zone.

    Returns:
        A datetime, or ``None`` if the value is empty or cannot be parsed.
    """
    if not s:
        return None

    if isinstance(s, dt.time):
        with_zone = _ensure_tzinfo(s, tz)
        return _datetime(with_zone)

    try:
        return from_iso_time_string(str(s), tz)
    except Error:
        # unparseable input is reported as None rather than as an error
        return None
def from_string(s: str, tz: str = '') -> dt.datetime:
    """Parse a date string, not restricted to ISO 8601.

    Args:
        s: The string to parse, surrounding whitespace is ignored.
        tz: A zoneinfo string used when the string has no zone, empty for the local time zone.

    Raises:
        ``Error``: if the string is not a valid date.
    """
    text = s.strip()
    return _pend_parse_datetime(text, tz, iso_only=False)
def from_iso_string(s: str, tz: str = '') -> dt.datetime:
    """Parse an ISO 8601 date string.

    Args:
        s: The string to parse, surrounding whitespace is ignored.
        tz: A zoneinfo string used when the string has no zone, empty for the local time zone.

    Raises:
        ``Error``: if the string is not a valid date.
    """
    text = s.strip()
    return _pend_parse_datetime(text, tz, iso_only=True)
def from_iso_time_string(s: str, tz: str = '') -> dt.datetime:
    """Parse an ISO 8601 time string.

    Args:
        s: The string to parse, surrounding whitespace is ignored.
        tz: A zoneinfo string used when the string has no zone, empty for the local time zone.

    Raises:
        ``Error``: if the string is not a valid time.
    """
    text = s.strip()
    return _pend_parse_time(text, tz, iso_only=True)
def from_timestamp(n: float, tz: str = '') -> dt.datetime:
    """Convert a POSIX timestamp to a datetime in the given zone (empty string: local time zone)."""
    zi = time_zone(tz)
    return dt.datetime.fromtimestamp(n, tz=zi)
221# Formatters
def to_iso_string(d: Optional[dt.date] = None, with_tz='+', sep='T') -> str:
    """Format a date as ``YYYY-MM-DD<sep>HH:MM:SS``, optionally followed by the UTC offset.

    Args:
        d: The date to format, ``None`` means now.
        with_tz: If empty, no offset is written. If ``Z``, a ``+0000`` offset is written as ``Z``.
            Any other true value appends the offset as formatted by ``%z``.
        sep: Separator between the date and the time part.
    """
    fmt = f'%Y-%m-%d{sep}%H:%M:%S' + ('%z' if with_tz else '')
    text = _datetime(d).strftime(fmt)
    if with_tz == 'Z' and text.endswith('+0000'):
        return text.removesuffix('+0000') + 'Z'
    return text
def to_iso_date_string(d: Optional[dt.date] = None) -> str:
    """Format a date as ``YYYY-MM-DD`` (``None`` means now)."""
    value = _datetime(d)
    return value.strftime('%Y-%m-%d')
def to_basic_string(d: Optional[dt.date] = None) -> str:
    """Format a date as ``YYYYMMDDHHMMSS`` (``None`` means now)."""
    value = _datetime(d)
    return value.strftime("%Y%m%d%H%M%S")
def to_iso_time_string(d: Optional[dt.date] = None, with_tz='+') -> str:
    """Format the time part of a date as ``HH:MM:SS``, optionally followed by the UTC offset.

    Args:
        d: The date to format, ``None`` means now.
        with_tz: If empty, no offset is written. If ``Z``, a ``+0000`` offset is written as ``Z``.
            Any other true value appends the offset as formatted by ``%z``.
    """
    fmt = '%H:%M:%S' + ('%z' if with_tz else '')
    text = _datetime(d).strftime(fmt)
    if with_tz == 'Z' and text.endswith('+0000'):
        return text.removesuffix('+0000') + 'Z'
    return text
def to_string(fmt: str, d: Optional[dt.date] = None) -> str:
    """Format a date with a ``strftime`` format string (``None`` means now)."""
    value = _datetime(d)
    return value.strftime(fmt)
255# Converters
def to_timestamp(d: Optional[dt.date] = None) -> int:
    """Return the POSIX timestamp of a date, truncated to an integer (``None`` means now)."""
    stamp = _datetime(d).timestamp()
    return int(stamp)
def to_millis(d: Optional[dt.date] = None) -> int:
    """Return the POSIX timestamp of a date in milliseconds, truncated to an integer (``None`` means now)."""
    stamp = _datetime(d).timestamp()
    return int(stamp * 1000)
def to_utc(d: Optional[dt.date] = None) -> dt.datetime:
    """Convert a date to UTC (``None`` means now)."""
    value = _datetime(d)
    return value.astimezone(time_zone('UTC'))
def to_local(d: Optional[dt.date] = None) -> dt.datetime:
    """Convert a date to the local time zone (``None`` means now)."""
    value = _datetime(d)
    return value.astimezone(time_zone(''))
def to_time_zone(tz: str, d: Optional[dt.date] = None) -> dt.datetime:
    """Convert a date to the given time zone.

    Args:
        tz: A zoneinfo string, empty for the local time zone.
        d: The date to convert, ``None`` means now.
    """
    value = _datetime(d)
    return value.astimezone(time_zone(tz))
277# Predicates
def is_date(x) -> bool:
    """Check if the value is a ``date`` object (this includes ``datetime`` objects)."""
    date_types = (dt.date,)
    return isinstance(x, date_types)
def is_datetime(x) -> bool:
    """Check if the value is a ``datetime`` object."""
    datetime_types = (dt.datetime,)
    return isinstance(x, datetime_types)
def is_utc(d: dt.datetime) -> bool:
    """Check if the zone of a datetime resolves to ``UTC``."""
    zi = _zone_info_from_tzinfo(_datetime(d).tzinfo)
    return zi == UTC
def is_local(d: dt.datetime) -> bool:
    """Check if the zone of a datetime resolves to the local time zone."""
    zi = _zone_info_from_tzinfo(_datetime(d).tzinfo)
    return zi == time_zone('')
295# Arithmetic
def add(
        d: Optional[dt.date] = None,
        years=0, months=0, days=0, weeks=0, hours=0, minutes=0, seconds=0, microseconds=0
) -> dt.datetime:
    """Add a duration to a date, using pendulum's ``add_duration`` helper.

    Args:
        d: The date to start from, ``None`` means now.
        years, months, days, weeks, hours, minutes, seconds, microseconds: Amounts to add.

    Returns:
        The shifted datetime.
    """
    duration = dict(
        years=years, months=months, days=days,
        weeks=weeks, hours=hours, minutes=minutes,
        seconds=seconds, microseconds=microseconds,
    )
    return pendulum.helpers.add_duration(_datetime(d), **duration)
class Diff:
    """Difference between two dates, broken down by unit.

    The attributes are filled in by ``difference`` and ``total_difference``.
    """

    years: int
    months: int
    weeks: int
    days: int
    hours: int
    minutes: int
    seconds: int
    microseconds: int

    def __repr__(self):
        # show the attributes that have been set so far
        return repr(self.__dict__)
def difference(d1: dt.date, d2: Optional[dt.date] = None) -> Diff:
    """Compute the difference between two dates, broken down into units.

    Each attribute of the result holds the respective component of the pendulum interval
    between ``d1`` and ``d2``. The interval is not absolute, so the order of the dates matters.

    Args:
        d1: The first date.
        d2: The second date, ``None`` means now.

    Returns:
        A ``Diff`` object.
    """
    iv = pendulum.Interval(_datetime(d1), _datetime(d2), absolute=False)

    result = Diff()
    result.years, result.months = iv.years, iv.months
    result.weeks, result.days = iv.weeks, iv.remaining_days
    result.hours, result.minutes = iv.hours, iv.minutes
    result.seconds, result.microseconds = iv.remaining_seconds, iv.microseconds
    return result
def total_difference(d1: dt.date, d2: Optional[dt.date] = None) -> Diff:
    """Compute the difference between two dates, expressed as a total in each unit.

    Each attribute of the result holds the whole pendulum interval between ``d1`` and ``d2``
    converted to that unit. The interval is not absolute, so the order of the dates matters.

    Args:
        d1: The first date.
        d2: The second date, ``None`` means now.

    Returns:
        A ``Diff`` object.
    """
    iv = pendulum.Interval(_datetime(d1), _datetime(d2), absolute=False)

    result = Diff()
    result.years, result.months = iv.in_years(), iv.in_months()
    result.weeks, result.days = iv.in_weeks(), iv.in_days()
    result.hours, result.minutes = iv.in_hours(), iv.in_minutes()
    result.seconds = iv.in_seconds()
    # NOTE(review): derived from the seconds total, so any sub-second part
    # of the interval is presumably not included - confirm this is intended
    result.microseconds = result.seconds * 1_000_000
    return result
355# Wrappers for useful pendulum utilities
357# @formatter:off
def start_of_second(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the start of the second of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.start_of('second'))


def start_of_minute(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the start of the minute of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.start_of('minute'))


def start_of_hour(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the start of the hour of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.start_of('hour'))


def start_of_day(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the start of the day of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.start_of('day'))


def start_of_week(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the start of the week of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.start_of('week'))


def start_of_month(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the start of the month of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.start_of('month'))


def start_of_year(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the start of the year of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.start_of('year'))
def end_of_second(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the end of the second of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.end_of('second'))


def end_of_minute(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the end of the minute of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.end_of('minute'))


def end_of_hour(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the end of the hour of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.end_of('hour'))


def end_of_day(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the end of the day of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.end_of('day'))


def end_of_week(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the end of the week of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.end_of('week'))


def end_of_month(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the end of the month of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.end_of('month'))


def end_of_year(d: Optional[dt.date] = None) -> dt.datetime:
    """Return the end of the year of the given date (``None`` means now)."""
    p = _pend(d)
    return _unpend(p.end_of('year'))
def day_of_week(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``day_of_week`` for the given date (``None`` means now)."""
    p = _pend(d)
    return p.day_of_week


def day_of_year(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``day_of_year`` for the given date (``None`` means now)."""
    p = _pend(d)
    return p.day_of_year


def week_of_month(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``week_of_month`` for the given date (``None`` means now)."""
    p = _pend(d)
    return p.week_of_month


def week_of_year(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``week_of_year`` for the given date (``None`` means now)."""
    p = _pend(d)
    return p.week_of_year


def days_in_month(d: Optional[dt.date] = None) -> int:
    """Return pendulum's ``days_in_month`` for the given date (``None`` means now)."""
    p = _pend(d)
    return p.days_in_month
384# @formatter:on
# Week days accepted by ``next`` and ``prev``: numbers 0 (monday) to 6 (sunday)
# or lowercase english day names, mapped to pendulum week days.
_WD = dict(enumerate((
    pendulum.WeekDay.MONDAY,
    pendulum.WeekDay.TUESDAY,
    pendulum.WeekDay.WEDNESDAY,
    pendulum.WeekDay.THURSDAY,
    pendulum.WeekDay.FRIDAY,
    pendulum.WeekDay.SATURDAY,
    pendulum.WeekDay.SUNDAY,
))) | dict(
    monday=pendulum.WeekDay.MONDAY,
    tuesday=pendulum.WeekDay.TUESDAY,
    wednesday=pendulum.WeekDay.WEDNESDAY,
    thursday=pendulum.WeekDay.THURSDAY,
    friday=pendulum.WeekDay.FRIDAY,
    saturday=pendulum.WeekDay.SATURDAY,
    sunday=pendulum.WeekDay.SUNDAY,
)
def next(day: int | str, d: Optional[dt.date] = None, keep_time=False) -> dt.datetime:
    """Return the next occurrence of a week day after the given date.

    Args:
        day: Week day, a number from 0 (monday) to 6 (sunday) or a lowercase english name.
        d: The date to start from, ``None`` means now.
        keep_time: Passed to pendulum's ``next``.
    """
    p = _pend(d)
    week_day = _WD[day]
    return _unpend(p.next(week_day, keep_time))
def prev(day: int | str, d: Optional[dt.date] = None, keep_time=False) -> dt.datetime:
    """Return the previous occurrence of a week day before the given date.

    Args:
        day: Week day, a number from 0 (monday) to 6 (sunday) or a lowercase english name.
        d: The date to start from, ``None`` means now.
        keep_time: Passed to pendulum's ``previous``.
    """
    p = _pend(d)
    week_day = _WD[day]
    return _unpend(p.previous(week_day, keep_time))
412# Duration
# Duration unit suffixes and their length in seconds.
_DURATION_UNITS = {
    'w': 3600 * 24 * 7,
    'd': 3600 * 24,
    'h': 3600,
    'm': 60,
    's': 1,
}


def parse_duration(s: str) -> int:
    """Converts weeks, days, hours, minutes and seconds to seconds.

    The duration is a sequence of numbers, each followed by a unit
    (``w``, ``d``, ``h``, ``m`` or ``s``), like ``1w2d`` or ``1h 30m``.
    A trailing number without a unit is taken as seconds.

    Args:
        s: Time of duration. An integer is returned as is.

    Returns:
        Input as seconds.

    Raises:
        ``Error``: if the duration is invalid.
    """
    if isinstance(s, int):
        return s

    amount = None
    total = 0

    # the pattern yields either a run of digits or a run of non-digits (the unit) per match
    for number, unit in re.findall(r'(\d+)|(\D+)', str(s).strip()):
        if number:
            amount = int(number)
            continue
        unit = unit.strip()
        # a unit must be known and must follow a number
        if amount is None or unit not in _DURATION_UNITS:
            raise Error('invalid duration', s)
        total += amount * _DURATION_UNITS[unit]
        amount = None

    # a trailing number without a unit counts as seconds
    if amount:
        total += amount

    return total
456##
458# conversions
def _datetime(d: dt.date | dt.time | None) -> dt.datetime:
    """Convert a date, time or datetime to a timezone-aware datetime.

    Args:
        d: The value to convert, ``None`` means now.

    Raises:
        ``Error``: if the value is of an unsupported type.
    """
    if d is None:
        return now()

    # datetime must be tested before date, since it is a subclass of date
    if isinstance(d, dt.datetime):
        # a naive datetime is taken to be in the local time zone, like postgres does, see
        # https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-INPUT-TIME-STAMPS:
        # > Conversions between timestamp without time zone and timestamp with time zone normally assume
        # > that the timestamp without time zone value should be taken or given as timezone local time.
        return _ensure_tzinfo(d, tz='')

    if isinstance(d, dt.date):
        # a plain date becomes midnight UTC of that day
        return dt.datetime(d.year, d.month, d.day, tzinfo=UTC)

    if isinstance(d, dt.time):
        # a plain time is combined with the current date in the zone of the time object
        current = _now(d.tzinfo)
        return dt.datetime(
            current.year, current.month, current.day,
            d.hour, d.minute, d.second, d.microsecond,
            d.tzinfo,
            fold=d.fold,
        )

    raise Error(f'invalid datetime value {d!r}')
def _ensure_tzinfo(d: dt.datetime | dt.time, tz: str):
    """Make sure a datetime or time object has a time zone, preferably a ``ZoneInfo``.

    Args:
        d: A datetime or time object.
        tz: A zoneinfo string used if the object has no zone, empty for the local time zone.

    Returns:
        The object with the zone attached or replaced, or the object itself
        if it has a zone that cannot be converted to a ``ZoneInfo``.
    """
    if d.tzinfo:
        # 'their' tzinfo might be an unnamed dt.timezone or pendulum.FixedTimezone,
        # replace it with a zoneinfo if possible, otherwise keep it
        zi = _zone_info_from_tzinfo(d.tzinfo)
        return d.replace(tzinfo=zi) if zi else d

    # no zone at all, attach the default one
    return d.replace(tzinfo=time_zone(tz))
500# pendulum.DateTime <-> python datetime
def _pend(d: dt.datetime | None) -> pendulum.DateTime:
    """Convert a date (``None`` means now) to a pendulum ``DateTime``."""
    value = _datetime(d)
    return pendulum.instance(value)
def _unpend(p: pendulum.DateTime) -> dt.datetime:
    """Convert a pendulum ``DateTime`` to a stock ``datetime`` with the same fields, zone and fold."""
    fields = dict(
        year=p.year,
        month=p.month,
        day=p.day,
        hour=p.hour,
        minute=p.minute,
        second=p.second,
        microsecond=p.microsecond,
        tzinfo=p.tzinfo,
        fold=p.fold,
    )
    return dt.datetime(**fields)
520# NB using private APIs
def _pend_parse_datetime(s, tz, iso_only):
    """Parse a date or datetime string with pendulum.

    Args:
        s: The string to parse.
        tz: A zoneinfo string used if the string has no zone, empty for the local time zone.
        iso_only: If true, only ISO 8601 is accepted, otherwise pendulum's generic parser is used.

    Returns:
        A timezone-aware datetime.

    Raises:
        ``Error``: if the string cannot be parsed or is not a date or datetime.
    """
    # the generic parser is the private one, because the public one normalizes the result
    parser = pendulum.parsing.parse_iso8601 if iso_only else pendulum.parsing._parse

    try:
        parsed = parser(s)
    except (ValueError, pendulum.parsing.ParserError) as exc:
        raise Error(f'invalid date {s!r}') from exc

    # datetime must be tested before date, since it is a subclass of date
    if isinstance(parsed, dt.datetime):
        return _ensure_tzinfo(parsed, tz)
    if isinstance(parsed, dt.date):
        return new(parsed.year, parsed.month, parsed.day, tz=tz)

    # times and durations not accepted
    raise Error(f'invalid date {s!r}')
def _pend_parse_time(s, tz, iso_only):
    """Parse a time string with pendulum.

    Args:
        s: The string to parse.
        tz: A zoneinfo string used if the string has no zone, empty for the local time zone.
        iso_only: If true, only ISO 8601 is accepted, otherwise pendulum's generic parser is used.

    Returns:
        A timezone-aware datetime with the parsed time.

    Raises:
        ``Error``: if the string cannot be parsed or is not a time.
    """
    # the generic parser is the private one, because the public one normalizes the result
    parser = pendulum.parsing.parse_iso8601 if iso_only else pendulum.parsing._parse

    try:
        parsed = parser(s)
    except (ValueError, pendulum.parsing.ParserError) as exc:
        raise Error(f'invalid time {s!r}') from exc

    if not isinstance(parsed, dt.time):
        # dates and durations not accepted
        raise Error(f'invalid time {s!r}')

    return _datetime(_ensure_tzinfo(parsed, tz))