Coverage for scheduler/threading/scheduler.py: 100%
172 statements
« prev ^ index » next coverage.py v7.6.10, created at 2026-02-15 22:37 +0000
1"""
2Implementation of a `threading` compatible in-process scheduler.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import datetime as dt
8import queue
9import threading
10from collections.abc import Iterable
11from logging import Logger
12from typing import Any, Callable, Optional
14import typeguard as tg
16from scheduler.base.definition import JOB_TYPE_MAPPING, JobType
17from scheduler.base.scheduler import BaseScheduler, deprecated, select_jobs_by_tag
18from scheduler.base.scheduler_util import create_job_instance, str_cutoff
19from scheduler.base.timingtype import (
20 TimingCyclic,
21 TimingDailyUnion,
22 TimingOnceUnion,
23 TimingWeeklyUnion,
24)
25from scheduler.error import SchedulerError
26from scheduler.message import (
27 CYCLIC_TYPE_ERROR_MSG,
28 DAILY_TYPE_ERROR_MSG,
29 HOURLY_TYPE_ERROR_MSG,
30 MINUTELY_TYPE_ERROR_MSG,
31 ONCE_TYPE_ERROR_MSG,
32 TZ_ERROR_MSG,
33 WEEKLY_TYPE_ERROR_MSG,
34)
35from scheduler.prioritization import linear_priority_function
36from scheduler.threading.job import Job
# Shared UTC tzinfo; serves as the reference zone when asking a tzinfo for its name.
_UTC: dt.timezone = dt.timezone.utc
41def _exec_job_worker(que: queue.Queue[Job], logger: Logger) -> None:
42 running = True
43 while running:
44 try:
45 job = que.get(block=False)
46 except queue.Empty:
47 running = False
48 else:
49 job._exec(logger=logger) # pylint: disable=protected-access
50 que.task_done()
class Scheduler(BaseScheduler[Job, Callable[..., None]]):
    r"""
    Implementation of a scheduler for callback functions.

    This implementation enables the planning of |Job|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets, execution counts and weights.
    Due |Job|\ s are executed on a set of worker threads.

    Notes
    -----
    Due to the support of `datetime` objects, `scheduler` is able to work
    with timezones.

    Parameters
    ----------
    tzinfo : datetime.tzinfo
        Set the timezone of the |Scheduler|. Every |Job| passed via `jobs`
        must use the same timezone.
    max_exec : int
        Limits the number of overdue |Job|\ s that can be executed
        by calling function `Scheduler.exec_jobs()`. 0 (default) means no limit.
    priority_function : Callable[[float, Job, int, int], float]
        A function handle to compute the priority of a |Job| depending
        on the time it is overdue and its respective weight. Defaults to a linear
        priority function.
    jobs : set[Job]
        A collection of job instances. A `set` is used as-is (it is not
        copied), any other iterable is converted into a new `set`.
    n_threads : int
        The number of worker threads. 0 for unlimited (one thread per executed
        |Job|), default 1. Must not be negative.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        If a |Job| in `jobs` uses a different timezone than the |Scheduler|,
        or if `n_threads` is negative.
    """

    def __init__(
        self,
        *,
        max_exec: int = 0,
        tzinfo: Optional[dt.tzinfo] = None,
        priority_function: Callable[
            [float, Job, int, int],
            float,
        ] = linear_priority_function,
        jobs: Optional[Iterable[Job]] = None,
        n_threads: int = 1,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        if n_threads < 0:
            # a negative count would start no worker in `__exec_jobs` and then
            # block forever in `Queue.join()` waiting for the queued jobs
            raise SchedulerError("The number of worker threads can not be negative!")
        self.__max_exec = max_exec
        self.__tzinfo = tzinfo
        self.__priority_function = priority_function
        # re-entrant: e.g. `__str__` holds it while reading `self.jobs`, and
        # `__exec_jobs` holds it while calling `delete_job`
        self.__jobs_lock = threading.RLock()
        if not jobs:
            self.__jobs = set()
        elif isinstance(jobs, set):
            # NOTE: a passed `set` is shared with the caller, not copied
            self.__jobs = jobs
        else:
            self.__jobs = set(jobs)

        for job in self.__jobs:
            if job._tzinfo != self.__tzinfo:
                raise SchedulerError(TZ_ERROR_MSG)

        self.__n_threads = n_threads

    def __repr__(self) -> str:
        with self.__jobs_lock:
            return "scheduler.Scheduler({0}, jobs={{{1}}})".format(
                ", ".join(
                    (
                        repr(elem)
                        for elem in (
                            self.__max_exec,
                            self.__tzinfo,
                            self.__priority_function,
                        )
                    )
                ),
                ", ".join([repr(job) for job in sorted(self.jobs)]),
            )

    def __str__(self) -> str:
        with self.__jobs_lock:
            # Scheduler meta heading
            scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings())

            # Job table (we join two of the Job._str() fields into one)
            # columns
            c_align = ("<", "<", "<", "<", ">", ">", ">")
            c_width = (8, 16, 19, 12, 9, 13, 6)
            c_name = (
                "type",
                "function / alias",
                "due at",
                "tzinfo",
                "due in",
                "attempts",
                "weight",
            )
            # one "{<idx>:<align><width>}" replacement field per column;
            # the doubled braces are literal braces in the produced template
            form = [
                f"{{{idx}:{align}{width}}}"
                for idx, (align, width) in enumerate(zip(c_align, c_width))
            ]
            # without a timezone the "tzinfo" column (index 3) is left out;
            # fields are positional, so the remaining indices still line up
            if self.__tzinfo is None:
                form = form[:3] + form[4:]

            fstring = " ".join(form) + "\n"
            job_table = fstring.format(*c_name) + fstring.format(
                *("-" * width for width in c_width)
            )
            for job in sorted(self.jobs):
                row = job._str()
                entries = (
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                    str_cutoff(f"{job.weight}", c_width[6], True),
                )
                job_table += fstring.format(*entries)

            return scheduler_headings + job_table

    def __headings(self) -> list[str]:
        """Return the `key=value` summary fields shown above the job table."""
        if self.__tzinfo is None:
            tzname = None
        else:
            reference_dt = dt.datetime.now(tz=_UTC)
            tzname = self.__tzinfo.tzname(reference_dt)

        with self.__jobs_lock:
            headings = [
                f"max_exec={self.__max_exec if self.__max_exec else float('inf')}",
                f"tzinfo={tzname}",
                f"priority_function={self.__priority_function.__name__}",
                f"#jobs={len(self.__jobs)}",
            ]
            return headings

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
        # jobs without remaining attempts are returned but never scheduled
        if job.has_attempts_remaining:
            with self.__jobs_lock:
                self.__jobs.add(job)
        return job

    def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int:
        """
        Execute `jobs` on worker threads, then reschedule or retire each one.

        Must be called without holding the jobs lock, since job callbacks run
        on other threads and may call back into the |Scheduler|.
        """
        n_jobs = len(jobs)

        que: queue.Queue[Job] = queue.Queue()
        for job in jobs:
            que.put(job)

        # 0 threads means "one worker per job"; never start more workers than
        # there are jobs, surplus workers would only find an empty queue
        n_workers = min(self.__n_threads or n_jobs, n_jobs)
        workers = []
        for _ in range(n_workers):
            worker = threading.Thread(target=_exec_job_worker, args=(que, self._logger))
            worker.daemon = True
            worker.start()
            workers.append(worker)

        que.join()
        for worker in workers:
            worker.join()

        for job in jobs:
            job._calc_next_exec(ref_dt)  # pylint: disable=protected-access
            if not job.has_attempts_remaining:
                with self.__jobs_lock:
                    # a callback may already have removed this job (e.g. via
                    # `delete_jobs()`); `delete_job` raises for unscheduled jobs
                    if job in self.__jobs:
                        self.delete_job(job)

        return n_jobs

    def exec_jobs(self, force_exec_all: bool = False) -> int:
        r"""
        Execute scheduled `Job`\ s.

        By default executes the |Job|\ s that are overdue.

        |Job|\ s are executed in order of their priority
        :ref:`examples.weights`. If the |Scheduler| instance
        has a limit on the job execution counts per call of
        :func:`~scheduler.core.Scheduler.exec_jobs`, via the `max_exec` argument,
        |Job|\ s of lower priority might not get executed when
        competing |Job|\ s are overdue.

        Parameters
        ----------
        force_exec_all : bool
            Ignore both the status of the |Job| timers and the execution
            limit of the |Scheduler|.

        Returns
        -------
        int
            Number of executed |Job|\ s.
        """
        ref_dt = dt.datetime.now(tz=self.__tzinfo)

        if force_exec_all:
            # snapshot under the lock; execution itself must run unlocked
            with self.__jobs_lock:
                all_jobs = list(self.__jobs)
            return self.__exec_jobs(all_jobs, ref_dt)

        # collect the current priority for all jobs
        job_priority: dict[Job, float] = {}
        with self.__jobs_lock:
            n_jobs = len(self.__jobs)
            for job in self.__jobs:
                delta_seconds = job.timedelta(ref_dt).total_seconds()
                # overdue jobs have a negative delta, pass the overdue time as positive
                job_priority[job] = self.__priority_function(
                    -delta_seconds,
                    job,
                    self.__max_exec,
                    n_jobs,
                )
        # sort the jobs by priority
        sorted_jobs = sorted(job_priority, key=job_priority.get, reverse=True)  # type: ignore
        # filter jobs by max_exec and priority greater zero
        filtered_jobs = [
            job
            for idx, job in enumerate(sorted_jobs)
            if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0
        ]
        return self.__exec_jobs(filtered_jobs, ref_dt)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        Parameters
        ----------
        job : Job
            |Job| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |Job| of the argument is not scheduled.
        """
        try:
            with self.__jobs_lock:
                self.__jobs.remove(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |Job|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |Job|\ s.
        any_tag : bool
            False: To delete a |Job| all tags have to match.
            True: To delete a |Job| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |Job|\ s.
        """
        with self.__jobs_lock:
            if tags is None or tags == set():
                n_jobs = len(self.__jobs)
                self.__jobs = set()
                return n_jobs

            to_delete = select_jobs_by_tag(self.__jobs, tags, any_tag)

            self.__jobs = self.__jobs - to_delete
            return len(to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |Job|\ s from the |Scheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |Job|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |Job|\ s.
            If no tags are given all |Job|\ s are returned.
        any_tag : bool
            False: To match a |Job| all tags have to match.
            True: To match a |Job| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            if tags is None or tags == set():
                return self.__jobs.copy()
            return select_jobs_by_tag(self.__jobs, tags, any_tag)

    @deprecated(["delay"])
    def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |Job|.

        Parameters
        ----------
        timing : TimingCyclic
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingCyclic`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingCyclic)
        except tg.TypeCheckError as err:
            raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, this
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |Job| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingDailyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingDailyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(DAILY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |Job|. Combine multiple desired `tuples` in
        a `list`. If the planned execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., None]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingWeeklyUnion`.

        Other Parameters
        ----------------
        **kwargs
            |Job| properties, optional

            `kwargs` are used to specify |Job| properties.

            Here is a list of available |Job| properties:

            .. include:: ../_assets/kwargs.rst
        """
        try:
            tg.check_type(timing, TimingWeeklyUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from err
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(  # pylint: disable=arguments-differ
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., None],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        alias: Optional[str] = None,
        weight: float = 1,
    ) -> Job:
        r"""
        Schedule a one-shot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., None]
            Handle to a callback function.
        args : Optional[tuple[Any, ...]]
            Positional argument payload for the function handle within a |Job|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |Job|.
        tags : Optional[Iterable[str]]
            The tags of the |Job|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.
        weight : float
            Relative weight against other |Job|\ s.

        Returns
        -------
        Job
            Instance of a scheduled |Job|.

        Raises
        ------
        SchedulerError
            If `timing` does not match `TimingOnceUnion`.
        """
        try:
            tg.check_type(timing, TimingOnceUnion)
        except tg.TypeCheckError as err:
            raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err
        if isinstance(timing, dt.datetime):
            # a fixed point in time: a zero-length cyclic job starting at `timing`
            # that is limited to a single attempt
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                handle=handle,
                args=args,
                kwargs=kwargs,
                max_attempts=1,
                tags=set(tags) if tags else set(),
                alias=alias,
                weight=weight,
                delay=False,
                start=timing,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=tags,
            alias=alias,
            weight=weight,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            A copy of the currently scheduled |Job|\ s.
        """
        with self.__jobs_lock:
            return self.__jobs.copy()