Coverage for scheduler/asyncio/scheduler.py: 100%
130 statements
« prev ^ index » next coverage.py v7.6.10, created at 2026-02-15 22:37 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2026-02-15 22:37 +0000
1"""
2Implementation of a `asyncio` compatible in-process scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7from __future__ import annotations
9import asyncio as aio
10import datetime as dt
11from asyncio.selector_events import BaseSelectorEventLoop
12from collections.abc import Iterable
13from logging import Logger
14from typing import Any, Callable, Coroutine, Optional
16import typeguard as tg
18from scheduler.asyncio.job import Job
19from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
20from scheduler.base.scheduler import BaseScheduler, deprecated, select_jobs_by_tag
21from scheduler.base.scheduler_util import create_job_instance, str_cutoff
22from scheduler.base.timingtype import (
23 TimingCyclic,
24 TimingDailyUnion,
25 TimingOnceUnion,
26 TimingWeeklyUnion,
27)
28from scheduler.error import SchedulerError
29from scheduler.message import (
30 CYCLIC_TYPE_ERROR_MSG,
31 DAILY_TYPE_ERROR_MSG,
32 HOURLY_TYPE_ERROR_MSG,
33 MINUTELY_TYPE_ERROR_MSG,
34 ONCE_TYPE_ERROR_MSG,
35 WEEKLY_TYPE_ERROR_MSG,
36)
38_UTC = dt.timezone.utc
41class Scheduler(BaseScheduler[Job, Callable[..., Coroutine[Any, Any, None]]]):
42 r"""
43 Implementation of an asyncio scheduler.
45 This implementation enables the planning of |AioJob|\ s depending on time
46 cycles, fixed times, weekdays, dates, offsets and execution counts.
48 Notes
49 -----
50 Due to the support of `datetime` objects, the |AioScheduler| is able to work
51 with timezones.
53 Parameters
54 ----------
55 loop : asyncio.selector_events.BaseSelectorEventLoop
56 Set a AsyncIO event loop, default is the global event loop
57 tzinfo : datetime.tzinfo
58 Set the timezone of the |AioScheduler|.
59 logger : Optional[logging.Logger]
60 A custom Logger instance.
61 """
63 def __init__(
64 self,
65 *,
66 loop: Optional[BaseSelectorEventLoop] = None,
67 tzinfo: Optional[dt.tzinfo] = None,
68 logger: Optional[Logger] = None,
69 ):
70 super().__init__(logger=logger)
71 try:
72 self.__loop = loop if loop else aio.get_running_loop()
73 except RuntimeError:
74 raise SchedulerError("The asyncio Scheduler requires a running event loop.") from None
75 self.__tzinfo = tzinfo
76 self._jobs: dict[Job, aio.Task[None]] = {}
78 def __repr__(self) -> str:
79 return "scheduler.asyncio.scheduler.Scheduler({0}, jobs={{{1}}})".format(
80 ", ".join((repr(elem) for elem in (self.__tzinfo,))),
81 ", ".join([repr(job) for job in sorted(self.jobs)]),
82 )
84 def __str__(self) -> str:
85 # Scheduler meta heading
86 scheduler_headings = "{0}, {1}\n\n".format(*self.__headings())
88 # Job table (we join two of the Job._repr() fields into one)
89 # columns
90 c_align = ("<", "<", "<", "<", ">", ">")
91 c_width = (8, 16, 19, 12, 9, 13)
92 c_name = (
93 "type",
94 "function / alias",
95 "due at",
96 "tzinfo",
97 "due in",
98 "attempts",
99 )
100 form = [
101 f"{ {idx}:{align}{width}} " for idx, (align, width) in enumerate(zip(c_align, c_width))
102 ]
103 if self.__tzinfo is None:
104 form = form[:3] + form[4:]
106 fstring = " ".join(form) + "\n"
107 job_table = fstring.format(*c_name)
108 job_table += fstring.format(*("-" * width for width in c_width))
109 for job in sorted(self.jobs):
110 row = job._str()
111 entries = (
112 row[0],
113 str_cutoff(row[1] + row[2], c_width[1], False),
114 row[3],
115 str_cutoff(row[4] or "", c_width[3], False),
116 str_cutoff(row[5], c_width[4], True),
117 str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
118 )
119 job_table += fstring.format(*entries)
121 return scheduler_headings + job_table
123 def __headings(self) -> list[str]:
125 if self.__tzinfo is None:
126 tzname = None
127 else:
128 reference_dt = dt.datetime.now(tz=_UTC)
129 tzname = self.__tzinfo.tzname(reference_dt)
131 headings = [
132 f"tzinfo={tzname}",
133 f"#jobs={len(self._jobs)}",
134 ]
135 return headings
137 def __schedule(
138 self,
139 **kwargs,
140 ) -> Job:
141 """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
142 job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
144 task = self.__loop.create_task(self.__supervise_job(job))
145 self._jobs[job] = task
147 return job
149 async def __supervise_job(self, job: Job) -> None:
150 try:
151 reference_dt = dt.datetime.now(tz=self.__tzinfo)
152 while job.has_attempts_remaining:
153 sleep_seconds: float = job.timedelta(reference_dt).total_seconds()
154 await aio.sleep(sleep_seconds)
156 await job._exec(logger=self._logger) # pylint: disable=protected-access
158 reference_dt = dt.datetime.now(tz=self.__tzinfo)
159 job._calc_next_exec(reference_dt) # pylint: disable=protected-access
160 except aio.CancelledError: # TODO asyncio does not trigger this exception in pytest, why?
161 # raised, when `task.cancel()` in `delete_job` was run
162 pass # pragma: no cover
163 else:
164 self.delete_job(job)
166 def delete_job(self, job: Job) -> None:
167 """
168 Delete a `Job` from the `Scheduler`.
170 Parameters
171 ----------
172 job : Job
173 |AioJob| instance to delete.
175 Raises
176 ------
177 SchedulerError
178 Raises if the |AioJob| of the argument is not scheduled.
179 """
180 try:
181 task: aio.Task[None] = self._jobs.pop(job)
182 _: bool = task.cancel()
183 except KeyError:
184 raise SchedulerError("An unscheduled Job can not be deleted!") from None
186 def delete_jobs(
187 self,
188 tags: Optional[set[str]] = None,
189 any_tag: bool = False,
190 ) -> int:
191 r"""
192 Delete a set of |AioJob|\ s from the |AioScheduler| by tags.
194 If no tags or an empty set of tags are given defaults to the deletion
195 of all |AioJob|\ s.
197 Parameters
198 ----------
199 tags : Optional[set[str]]
200 Set of tags to identify target |AioJob|\ s.
201 any_tag : bool
202 False: To delete a |AioJob| all tags have to match.
203 True: To delete a |AioJob| at least one tag has to match.
204 """
205 all_jobs: set[Job] = set(self._jobs.keys())
206 jobs_to_delete: set[Job]
208 if tags is None or tags == set():
209 jobs_to_delete = all_jobs
210 else:
211 jobs_to_delete = select_jobs_by_tag(all_jobs, tags, any_tag)
213 for job in jobs_to_delete:
214 self.delete_job(job)
216 return len(jobs_to_delete)
218 def get_jobs(
219 self,
220 tags: Optional[set[str]] = None,
221 any_tag: bool = False,
222 ) -> set[Job]:
223 r"""
224 Get a set of |AioJob|\ s from the |AioScheduler| by tags.
226 If no tags or an empty set of tags are given defaults to returning
227 all |AioJob|\ s.
229 Parameters
230 ----------
231 tags : set[str]
232 Tags to filter scheduled |AioJob|\ s.
233 If no tags are given all |AioJob|\ s are returned.
234 any_tag : bool
235 False: To match a |AioJob| all tags have to match.
236 True: To match a |AioJob| at least one tag has to match.
238 Returns
239 -------
240 set[Job]
241 Currently scheduled |AioJob|\ s.
242 """
243 if tags is None or tags == set():
244 return self.jobs
245 return select_jobs_by_tag(self.jobs, tags, any_tag)
247 @deprecated(["delay"])
248 def cyclic(
249 self, timing: TimingCyclic, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
250 ) -> Job:
251 r"""
252 Schedule a cyclic `Job`.
254 Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
255 to schedule a cyclic |AioJob|.
257 Parameters
258 ----------
259 timing : TimingTypeCyclic
260 Desired execution time.
261 handle : Callable[..., Coroutine[Any, Any, None]]
262 Handle to a callback function.
264 Returns
265 -------
266 Job
267 Instance of a scheduled |AioJob|.
269 Other Parameters
270 ----------------
271 **kwargs
272 |AioJob| properties, optional
274 `kwargs` are used to specify |AioJob| properties.
276 Here is a list of available |AioJob| properties:
278 .. include:: ../_assets/aio_kwargs.rst
279 """
280 try:
281 tg.check_type(timing, TimingCyclic)
282 except tg.TypeCheckError as err:
283 raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
284 return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)
286 @deprecated(["delay"])
287 def minutely(
288 self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
289 ) -> Job:
290 r"""
291 Schedule a minutely `Job`.
293 Use a `datetime.time` object or a `list` of `datetime.time` objects
294 to schedule a |AioJob| every minute.
296 Notes
297 -----
298 If given a `datetime.time` object with a non zero hour or minute property, these
299 information will be ignored.
301 Parameters
302 ----------
303 timing : TimingDailyUnion
304 Desired execution time(s).
305 handle : Callable[..., Coroutine[Any, Any, None]]
306 Handle to a callback function.
308 Returns
309 -------
310 Job
311 Instance of a scheduled |AioJob|.
313 Other Parameters
314 ----------------
315 **kwargs
316 |AioJob| properties, optional
318 `kwargs` are used to specify |AioJob| properties.
320 Here is a list of available |AioJob| properties:
322 .. include:: ../_assets/aio_kwargs.rst
323 """
324 try:
325 tg.check_type(timing, TimingDailyUnion)
326 except tg.TypeCheckError as err:
327 raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
328 return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)
330 @deprecated(["delay"])
331 def hourly(
332 self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
333 ) -> Job:
334 r"""
335 Schedule an hourly `Job`.
337 Use a `datetime.time` object or a `list` of `datetime.time` objects
338 to schedule a |AioJob| every hour.
340 Notes
341 -----
342 If given a `datetime.time` object with a non zero hour property, this information
343 will be ignored.
345 Parameters
346 ----------
347 timing : TimingDailyUnion
348 Desired execution time(s).
349 handle : Callable[..., Coroutine[Any, Any, None]]
350 Handle to a callback function.
352 Returns
353 -------
354 Job
355 Instance of a scheduled |AioJob|.
357 Other Parameters
358 ----------------
359 **kwargs
360 |AioJob| properties, optional
362 `kwargs` are used to specify |AioJob| properties.
364 Here is a list of available |AioJob| properties:
366 .. include:: ../_assets/aio_kwargs.rst
367 """
368 try:
369 tg.check_type(timing, TimingDailyUnion)
370 except tg.TypeCheckError as err:
371 raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
372 return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)
374 @deprecated(["delay"])
375 def daily(
376 self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
377 ) -> Job:
378 r"""
379 Schedule a daily `Job`.
381 Use a `datetime.time` object or a `list` of `datetime.time` objects
382 to schedule a |AioJob| every day.
384 Parameters
385 ----------
386 timing : TimingDailyUnion
387 Desired execution time(s).
388 handle : Callable[..., Coroutine[Any, Any, None]]
389 Handle to a callback function.
391 Returns
392 -------
393 Job
394 Instance of a scheduled |AioJob|.
396 Other Parameters
397 ----------------
398 **kwargs
399 |AioJob| properties, optional
401 `kwargs` are used to specify |AioJob| properties.
403 Here is a list of available |AioJob| properties:
405 .. include:: ../_assets/aio_kwargs.rst
406 """
407 try:
408 tg.check_type(timing, TimingDailyUnion)
409 except tg.TypeCheckError as err:
410 raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
411 return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)
413 @deprecated(["delay"])
414 def weekly(
415 self, timing: TimingWeeklyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
416 ) -> Job:
417 r"""
418 Schedule a weekly `Job`.
420 Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
421 recurring |AioJob|. Combine multiple desired `tuples` in
422 a `list`. If the planed execution time is `00:00` the `datetime.time` object
423 can be ignored, just pass a `Weekday` without a `tuple`.
425 Parameters
426 ----------
427 timing : TimingWeeklyUnion
428 Desired execution time(s).
429 handle : Callable[..., Coroutine[Any, Any, None]]
430 Handle to a callback function.
432 Returns
433 -------
434 Job
435 Instance of a scheduled |AioJob|.
437 Other Parameters
438 ----------------
439 **kwargs
440 |AioJob| properties, optional
442 `kwargs` are used to specify |AioJob| properties.
444 Here is a list of available |AioJob| properties:
446 .. include:: ../_assets/aio_kwargs.rst
447 """
448 try:
449 tg.check_type(timing, TimingWeeklyUnion)
450 except tg.TypeCheckError as err:
451 raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
452 return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)
454 def once(
455 self,
456 timing: TimingOnceUnion,
457 handle: Callable[..., Coroutine[Any, Any, None]],
458 *,
459 args: Optional[tuple[Any, ...]] = None,
460 kwargs: Optional[dict[str, Any]] = None,
461 tags: Optional[Iterable[str]] = None,
462 alias: Optional[str] = None,
463 ) -> Job:
464 r"""
465 Schedule a oneshot `Job`.
467 Parameters
468 ----------
469 timing : TimingOnceUnion
470 Desired execution time.
471 handle : Callable[..., Coroutine[Any, Any, None]]
472 Handle to a callback function.
473 args : tuple[Any]
474 Positional argument payload for the function handle within a |AioJob|.
475 kwargs : Optional[dict[str, Any]]
476 Keyword arguments payload for the function handle within a |AioJob|.
477 tags : Optional[Iterable[str]]
478 The tags of the |AioJob|.
479 alias : Optional[str]
480 Overwrites the function handle name in the string representation.
482 Returns
483 -------
484 Job
485 Instance of a scheduled |AioJob|.
486 """
487 try:
488 tg.check_type(timing, TimingOnceUnion)
489 except tg.TypeCheckError as err:
490 raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
491 if isinstance(timing, dt.datetime):
492 return self.__schedule(
493 job_type=JobType.CYCLIC,
494 timing=dt.timedelta(),
495 handle=handle,
496 args=args,
497 kwargs=kwargs,
498 max_attempts=1,
499 tags=set(tags) if tags else set(),
500 alias=alias,
501 delay=False,
502 start=timing,
503 )
504 return self.__schedule(
505 job_type=JOB_TYPE_MAPPING[type(timing)],
506 timing=timing,
507 handle=handle,
508 args=args,
509 kwargs=kwargs,
510 max_attempts=1,
511 tags=tags,
512 alias=alias,
513 )
515 @property
516 def jobs(self) -> set[Job]:
517 r"""
518 Get the set of all `Job`\ s.
520 Returns
521 -------
522 set[Job]
523 Currently scheduled |AioJob|\ s.
524 """
525 return set(self._jobs.keys())