Coverage for scheduler/base/scheduler_util.py: 100%
17 statements
« prev ^ index » next coverage.py v7.6.10, created at 2026-02-15 22:37 +0000
1"""
2Implementation of essential functions and components for a `BaseJob`.
4Author: Jendrik A. Potyka, Fabian A. Preiss
5"""
7import datetime as dt
8from typing import Optional, Union, cast
10from scheduler.base.job import BaseJobType
11from scheduler.base.timingtype import (
12 TimingCyclic,
13 TimingDailyUnion,
14 TimingJobUnion,
15 TimingWeeklyUnion,
16)
17from scheduler.error import SchedulerError
def str_cutoff(string: str, max_length: int, cut_tail: bool = False) -> str:
    """
    Abbreviate a string to a given length.

    The resulting string will carry an indicator if it's abbreviated,
    like ``stri#``.

    Parameters
    ----------
    string : str
        String which is to be cut.
    max_length : int
        Max resulting string length, must be at least 1.
    cut_tail : bool
        ``True`` to drop the end of the string and append the ``#`` marker,
        ``False`` (default) to drop the beginning and prepend the marker.

    Returns
    -------
    str
        The unchanged string if it already fits, otherwise an abbreviation
        of exactly ``max_length`` characters including the ``#`` marker.

    Raises
    ------
    ValueError
        If ``max_length`` is smaller than 1.
    """
    if max_length < 1:
        raise ValueError("max_length < 1 not allowed")

    length = len(string)
    if length <= max_length:
        return string

    keep = max_length - 1  # one character is reserved for the "#" marker
    if cut_tail:
        return string[:keep] + "#"
    # Slice from an explicit start index: ``string[-keep:]`` would return the
    # whole string when ``keep`` is 0 (i.e. ``max_length == 1``).
    return "#" + string[length - keep:]
def create_job_instance(
    job_class: type[BaseJobType],
    timing: Union[TimingCyclic, TimingDailyUnion, TimingWeeklyUnion],
    **kwargs,
) -> BaseJobType:
    """
    Create a job instance from the given input parameters.

    Parameters
    ----------
    job_class : type[BaseJobType]
        Class that is called to build the job.
    timing : Union[TimingCyclic, TimingDailyUnion, TimingWeeklyUnion]
        Timing value; anything that is not already a ``list`` is wrapped
        into a single-element list before being handed to ``job_class``.
    **kwargs
        Forwarded unchanged to ``job_class``.

    Returns
    -------
    BaseJobType
        The object returned by ``job_class(timing=..., **kwargs)``.
    """
    # ``job_class`` always receives the timing as a list.
    as_list = timing if isinstance(timing, list) else [timing]
    return job_class(timing=cast(TimingJobUnion, as_list), **kwargs)