Coverage for scheduler/threading/scheduler.py: 100%

172 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2026-02-15 22:37 +0000

1""" 

2Implementation of a `threading` compatible in-process scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7import datetime as dt 

8import queue 

9import threading 

10from collections.abc import Iterable 

11from logging import Logger 

12from typing import Any, Callable, Optional 

13 

14import typeguard as tg 

15 

16from scheduler.base.definition import JOB_TYPE_MAPPING, JobType 

17from scheduler.base.scheduler import BaseScheduler, deprecated, select_jobs_by_tag 

18from scheduler.base.scheduler_util import create_job_instance, str_cutoff 

19from scheduler.base.timingtype import ( 

20 TimingCyclic, 

21 TimingDailyUnion, 

22 TimingOnceUnion, 

23 TimingWeeklyUnion, 

24) 

25from scheduler.error import SchedulerError 

26from scheduler.message import ( 

27 CYCLIC_TYPE_ERROR_MSG, 

28 DAILY_TYPE_ERROR_MSG, 

29 HOURLY_TYPE_ERROR_MSG, 

30 MINUTELY_TYPE_ERROR_MSG, 

31 ONCE_TYPE_ERROR_MSG, 

32 TZ_ERROR_MSG, 

33 WEEKLY_TYPE_ERROR_MSG, 

34) 

35from scheduler.prioritization import linear_priority_function 

36from scheduler.threading.job import Job 

37 

# UTC timezone object; used as the reference "now" when the scheduler resolves
# the name of its own timezone for the string representation.
_UTC = dt.timezone.utc

39 

40 

def _exec_job_worker(que: queue.Queue[Job], logger: Logger) -> None:
    """
    Drain a queue of `Job`s, executing each one in the calling thread.

    The worker pulls jobs without blocking and returns as soon as the queue
    is empty. Every job taken from the queue is acknowledged with
    `task_done()`, even if its execution raises, so that a `que.join()` in the
    spawning thread can never block forever on a job that failed.

    Parameters
    ----------
    que : queue.Queue[Job]
        Queue holding the jobs to execute.
    logger : logging.Logger
        Logger handed to the job execution and used to report exceptions
        that escape a job.
    """
    while True:
        try:
            job = que.get(block=False)
        except queue.Empty:
            return

        try:
            job._exec(logger=logger)  # pylint: disable=protected-access
        except Exception:  # pylint: disable=broad-except
            # NOTE(review): Job._exec presumably handles callback errors itself
            # (not visible here); this is a safety net so that one failing job
            # neither kills the worker nor strands the jobs still queued.
            logger.exception("Unhandled exception while executing a scheduled Job")
        finally:
            que.task_done()

51 

52 

class Scheduler(BaseScheduler[Job, Callable[..., None]]):
    r"""
    Implementation of a scheduler for callback functions.

    This implementation enables the planning of |Job|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets, execution counts and weights.

    Notes
    -----
    Due to the support of `datetime` objects, `scheduler` is able to work
    with timezones.

    Parameters
    ----------
    max_exec : int
        Limits the number of overdue |Job|\ s that can be executed
        by calling function `Scheduler.exec_jobs()`. 0 (the default) disables
        the limit.
    tzinfo : Optional[datetime.tzinfo]
        Set the timezone of the |Scheduler|. Defaults to `None`.
    priority_function : Callable[[float, Job, int, int], float]
        A function handle to compute the priority of a |Job| depending
        on the time it is overdue and its respective weight. Defaults to a linear
        priority function.
    jobs : Optional[Iterable[Job]]
        A collection of job instances. Every given |Job| has to carry the same
        `tzinfo` as the |Scheduler|, otherwise a `SchedulerError` is raised.
    n_threads : int
        The number of worker threads. 0 for unlimited, default 1.
    logger : Optional[logging.Logger]
        A custom Logger instance.
    """

83 

def __init__(
    self,
    *,
    max_exec: int = 0,
    tzinfo: Optional[dt.tzinfo] = None,
    priority_function: Callable[
        [float, Job, int, int],
        float,
    ] = linear_priority_function,
    jobs: Optional[Iterable[Job]] = None,
    n_threads: int = 1,
    logger: Optional[Logger] = None,
):
    """
    Set up the scheduler state and validate the initial jobs.

    Raises
    ------
    SchedulerError
        If one of the given jobs does not share the scheduler's `tzinfo`.
    """
    super().__init__(logger=logger)
    self.__max_exec = max_exec
    self.__tzinfo = tzinfo
    self.__priority_function = priority_function
    # reentrant, because locked methods call other locked methods
    self.__jobs_lock = threading.RLock()

    if not jobs:
        initial_jobs = set()
    else:
        # NOTE(review): a caller supplied `set` is adopted as-is (not copied),
        # any other iterable is materialized into a new set.
        initial_jobs = jobs if isinstance(jobs, set) else set(jobs)
    self.__jobs = initial_jobs

    # all jobs have to live in the timezone of the scheduler
    if any(job._tzinfo != self.__tzinfo for job in self.__jobs):
        raise SchedulerError(TZ_ERROR_MSG)

    self.__n_threads = n_threads

114 

def __repr__(self) -> str:
    """
    Return the `repr` of the scheduler settings and of all sorted jobs.

    Returns
    -------
    str
        Representation containing `max_exec`, `tzinfo`, the priority
        function and the scheduled jobs.
    """
    with self.__jobs_lock:
        settings = (
            self.__max_exec,
            self.__tzinfo,
            self.__priority_function,
        )
        settings_repr = ", ".join(map(repr, settings))
        jobs_repr = ", ".join(repr(job) for job in sorted(self.jobs))
        return "scheduler.Scheduler({0}, jobs={{{1}}})".format(settings_repr, jobs_repr)

130 

def __str__(self) -> str:
    """
    Render the scheduler as a heading line followed by a table of its jobs.

    The table has one row per scheduled job (sorted) with the columns type,
    function / alias, due at, tzinfo, due in, attempts and weight. The tzinfo
    column is omitted if the scheduler has no timezone.

    Returns
    -------
    str
        Human readable multi line representation of the scheduler.
    """
    # table layout: alignment, width and title of each column
    c_align = ("<", "<", "<", "<", ">", ">", ">")
    c_width = (8, 16, 19, 12, 9, 13, 6)
    c_name = (
        "type",
        "function / alias",
        "due at",
        "tzinfo",
        "due in",
        "attempts",
        "weight",
    )

    with self.__jobs_lock:
        # Scheduler meta heading
        scheduler_headings = "{0}, {1}, {2}, {3}\n\n".format(*self.__headings())

        # One `str.format` field per column, e.g. "{0:<8}". The doubled braces
        # are escapes, so that the f-string emits a literal replacement field
        # instead of formatting a set literal.
        form = [
            f"{{{idx}:{align}{width}}}"
            for idx, (align, width) in enumerate(zip(c_align, c_width))
        ]

        # without a timezone the tzinfo column (index 3) is dropped; the
        # remaining fields keep their explicit indices
        if self.__tzinfo is None:
            form = form[:3] + form[4:]

        fstring = " ".join(form) + "\n"
        lines = [
            fstring.format(*c_name),
            fstring.format(*("-" * width for width in c_width)),
        ]

        # Job table (we join two of the Job._str() fields into one)
        for job in sorted(self.jobs):
            row = job._str()  # pylint: disable=protected-access
            lines.append(
                fstring.format(
                    row[0],
                    str_cutoff(row[1] + row[2], c_width[1], False),
                    row[3],
                    str_cutoff(row[4] or "", c_width[3], False),
                    str_cutoff(row[5], c_width[4], True),
                    str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
                    str_cutoff(f"{job.weight}", c_width[6], True),
                )
            )

        return scheduler_headings + "".join(lines)

175 

def __headings(self) -> list[str]:
    """
    Collect the meta information shown in the heading of `__str__`.

    Returns
    -------
    list[str]
        Four `key=value` strings: `max_exec`, `tzinfo`, `priority_function`
        and `#jobs`.
    """
    tzname = None
    if self.__tzinfo is not None:
        # the name of a timezone may depend on the point in time, use "now"
        tzname = self.__tzinfo.tzname(dt.datetime.now(tz=_UTC))

    with self.__jobs_lock:
        # a falsy limit (0) is displayed as "inf"
        max_exec = self.__max_exec or float("inf")
        return [
            f"max_exec={max_exec}",
            f"tzinfo={tzname}",
            f"priority_function={self.__priority_function.__name__}",
            f"#jobs={len(self.__jobs)}",
        ]

192 

def __schedule(
    self,
    **kwargs,
) -> Job:
    """
    Create a `Job` carrying the `Scheduler`'s timezone and register it.

    The job is only added to the scheduler if it has attempts remaining;
    it is returned to the caller in either case.
    """
    new_job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)
    if not new_job.has_attempts_remaining:
        return new_job

    with self.__jobs_lock:
        self.__jobs.add(new_job)
    return new_job

203 

def __exec_jobs(self, jobs: list[Job], ref_dt: dt.datetime) -> int:
    """
    Execute the given jobs on worker threads and update their schedule.

    All jobs are queued and processed by daemon worker threads. The call
    blocks until every job has been processed. Afterwards the next execution
    of each job is calculated relative to `ref_dt` and jobs without remaining
    attempts are deleted from the scheduler.

    Parameters
    ----------
    jobs : list[Job]
        The jobs to execute.
    ref_dt : datetime.datetime
        Reference time used to calculate the next execution of each job.

    Returns
    -------
    int
        Number of executed jobs.
    """
    n_jobs = len(jobs)
    if not n_jobs:
        # nothing is due: do not spawn threads just to let them exit again
        return 0

    que: queue.Queue[Job] = queue.Queue()
    for job in jobs:
        que.put(job)

    # Never start more workers than there are jobs. A non-positive thread
    # count means "unlimited", i.e. one worker per job; this also avoids
    # waiting forever on a queue that no worker is draining.
    n_threads = self.__n_threads
    n_workers = min(n_threads, n_jobs) if n_threads > 0 else n_jobs

    workers = [
        threading.Thread(
            target=_exec_job_worker,
            args=(que, self._logger),
            daemon=True,
        )
        for _ in range(n_workers)
    ]
    for worker in workers:
        worker.start()

    que.join()
    for worker in workers:
        worker.join()

    for job in jobs:
        job._calc_next_exec(ref_dt)  # pylint: disable=protected-access
        if not job.has_attempts_remaining:
            self.delete_job(job)

    return n_jobs

228 

def exec_jobs(self, force_exec_all: bool = False) -> int:
    r"""
    Execute scheduled `Job`\ s.

    By default executes the |Job|\ s that are overdue.

    |Job|\ s are executed in order of their priority
    :ref:`examples.weights`. If the |Scheduler| instance
    has a limit on the job execution counts per call of
    `Scheduler.exec_jobs`, via the `max_exec` argument,
    |Job|\ s of lower priority might not get executed when
    competing |Job|\ s are overdue.

    Parameters
    ----------
    force_exec_all : bool
        Ignore both the status of the |Job| timers
        and the execution limit of the |Scheduler|.

    Returns
    -------
    int
        Number of executed |Job|\ s.
    """
    ref_dt = dt.datetime.now(tz=self.__tzinfo)

    if force_exec_all:
        # take the snapshot under the lock, like every other job accessor
        with self.__jobs_lock:
            all_jobs = list(self.__jobs)
        return self.__exec_jobs(all_jobs, ref_dt)

    # collect the current priority for all jobs
    job_priority: dict[Job, float] = {}
    with self.__jobs_lock:
        # read the job count under the same lock as the iteration, so that
        # both describe the same state of the job set
        n_jobs = len(self.__jobs)
        for job in self.__jobs:
            delta_seconds = job.timedelta(ref_dt).total_seconds()
            job_priority[job] = self.__priority_function(
                -delta_seconds,
                job,
                self.__max_exec,
                n_jobs,
            )

    # sort the jobs by priority
    sorted_jobs = sorted(job_priority, key=job_priority.get, reverse=True)  # type: ignore
    # filter jobs by max_exec and priority greater zero
    filtered_jobs = [
        job
        for idx, job in enumerate(sorted_jobs)
        if (self.__max_exec == 0 or idx < self.__max_exec) and job_priority[job] > 0
    ]
    return self.__exec_jobs(filtered_jobs, ref_dt)

279 

def delete_job(self, job: Job) -> None:
    """
    Remove a single `Job` from the `Scheduler`.

    Parameters
    ----------
    job : Job
        |Job| instance to delete.

    Raises
    ------
    SchedulerError
        Raises if the |Job| of the argument is not scheduled.
    """
    with self.__jobs_lock:
        try:
            self.__jobs.remove(job)
        except KeyError:
            # hide the KeyError, it is an implementation detail of the set
            raise SchedulerError("An unscheduled Job can not be deleted!") from None

299 

def delete_jobs(
    self,
    tags: Optional[set[str]] = None,
    any_tag: bool = False,
) -> int:
    r"""
    Delete a set of |Job|\ s from the |Scheduler| by tags.

    If no tags or an empty set of tags are given defaults to the deletion
    of all |Job|\ s.

    Parameters
    ----------
    tags : Optional[set[str]]
        Set of tags to identify target |Job|\ s.
    any_tag : bool
        False: To delete a |Job| all tags have to match.
        True: To delete a |Job| at least one tag has to match.

    Returns
    -------
    int
        Number of deleted |Job|\ s.
    """
    with self.__jobs_lock:
        delete_all = tags is None or tags == set()
        if delete_all:
            n_deleted = len(self.__jobs)
            # rebind to a fresh set instead of clearing the old one in place
            self.__jobs = set()
        else:
            selected = select_jobs_by_tag(self.__jobs, tags, any_tag)
            n_deleted = len(selected)
            self.__jobs = self.__jobs - selected
        return n_deleted

329 

def get_jobs(
    self,
    tags: Optional[set[str]] = None,
    any_tag: bool = False,
) -> set[Job]:
    r"""
    Get a set of |Job|\ s from the |Scheduler| by tags.

    If no tags or an empty set of tags are given defaults to returning
    all |Job|\ s.

    Parameters
    ----------
    tags : Optional[set[str]]
        Tags to filter scheduled |Job|\ s.
        If no tags are given all |Job|\ s are returned.
    any_tag : bool
        False: To match a |Job| all tags have to match.
        True: To match a |Job| at least one tag has to match.

    Returns
    -------
    set[Job]
        Currently scheduled |Job|\ s.
    """
    want_all = tags is None or tags == set()
    with self.__jobs_lock:
        if want_all:
            # hand out a copy, so that callers can not alter the internal set
            return self.__jobs.copy()
        return select_jobs_by_tag(self.__jobs, tags, any_tag)

359 

@deprecated(["delay"])
def cyclic(self, timing: TimingCyclic, handle: Callable[..., None], **kwargs) -> Job:
    r"""
    Schedule a cyclic `Job`.

    Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
    to schedule a cyclic |Job|.

    Parameters
    ----------
    timing : TimingCyclic
        Desired execution time.
    handle : Callable[..., None]
        Handle to a callback function.

    Returns
    -------
    Job
        Instance of a scheduled |Job|.

    Raises
    ------
    SchedulerError
        If `timing` does not match the `TimingCyclic` type.

    Other Parameters
    ----------------
    **kwargs
        |Job| properties, optional

        `kwargs` are used to specify |Job| properties.

        Here is a list of available |Job| properties:

        .. include:: ../_assets/kwargs.rst
    """
    try:
        tg.check_type(timing, TimingCyclic)
    except tg.TypeCheckError as type_err:
        raise SchedulerError(CYCLIC_TYPE_ERROR_MSG) from type_err

    job_spec = {"job_type": JobType.CYCLIC, "timing": timing, "handle": handle}
    return self.__schedule(**job_spec, **kwargs)

396 

@deprecated(["delay"])
def minutely(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
    r"""
    Schedule a minutely `Job`.

    Use a `datetime.time` object or a `list` of `datetime.time` objects
    to schedule a |Job| every minute.

    Notes
    -----
    If given a `datetime.time` object with a non zero hour or minute property, this
    information will be ignored.

    Parameters
    ----------
    timing : TimingDailyUnion
        Desired execution time(s).
    handle : Callable[..., None]
        Handle to a callback function.

    Returns
    -------
    Job
        Instance of a scheduled |Job|.

    Raises
    ------
    SchedulerError
        If `timing` does not match the `TimingDailyUnion` type.

    Other Parameters
    ----------------
    **kwargs
        |Job| properties, optional

        `kwargs` are used to specify |Job| properties.

        Here is a list of available |Job| properties:

        .. include:: ../_assets/kwargs.rst
    """
    try:
        tg.check_type(timing, TimingDailyUnion)
    except tg.TypeCheckError as type_err:
        raise SchedulerError(MINUTELY_TYPE_ERROR_MSG) from type_err

    job_spec = {"job_type": JobType.MINUTELY, "timing": timing, "handle": handle}
    return self.__schedule(**job_spec, **kwargs)

438 

@deprecated(["delay"])
def hourly(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
    r"""
    Schedule an hourly `Job`.

    Use a `datetime.time` object or a `list` of `datetime.time` objects
    to schedule a |Job| every hour.

    Notes
    -----
    If given a `datetime.time` object with a non zero hour property, this information
    will be ignored.

    Parameters
    ----------
    timing : TimingDailyUnion
        Desired execution time(s).
    handle : Callable[..., None]
        Handle to a callback function.

    Returns
    -------
    Job
        Instance of a scheduled |Job|.

    Raises
    ------
    SchedulerError
        If `timing` does not match the `TimingDailyUnion` type.

    Other Parameters
    ----------------
    **kwargs
        |Job| properties, optional

        `kwargs` are used to specify |Job| properties.

        Here is a list of available |Job| properties:

        .. include:: ../_assets/kwargs.rst
    """
    try:
        tg.check_type(timing, TimingDailyUnion)
    except tg.TypeCheckError as type_err:
        raise SchedulerError(HOURLY_TYPE_ERROR_MSG) from type_err

    job_spec = {"job_type": JobType.HOURLY, "timing": timing, "handle": handle}
    return self.__schedule(**job_spec, **kwargs)

480 

@deprecated(["delay"])
def daily(self, timing: TimingDailyUnion, handle: Callable[..., None], **kwargs) -> Job:
    r"""
    Schedule a daily `Job`.

    Use a `datetime.time` object or a `list` of `datetime.time` objects
    to schedule a |Job| every day.

    Parameters
    ----------
    timing : TimingDailyUnion
        Desired execution time(s).
    handle : Callable[..., None]
        Handle to a callback function.

    Returns
    -------
    Job
        Instance of a scheduled |Job|.

    Raises
    ------
    SchedulerError
        If `timing` does not match the `TimingDailyUnion` type.

    Other Parameters
    ----------------
    **kwargs
        |Job| properties, optional

        `kwargs` are used to specify |Job| properties.

        Here is a list of available |Job| properties:

        .. include:: ../_assets/kwargs.rst
    """
    try:
        tg.check_type(timing, TimingDailyUnion)
    except tg.TypeCheckError as type_err:
        raise SchedulerError(DAILY_TYPE_ERROR_MSG) from type_err

    job_spec = {"job_type": JobType.DAILY, "timing": timing, "handle": handle}
    return self.__schedule(**job_spec, **kwargs)

517 

@deprecated(["delay"])
def weekly(self, timing: TimingWeeklyUnion, handle: Callable[..., None], **kwargs) -> Job:
    r"""
    Schedule a weekly `Job`.

    Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
    recurring |Job|. Combine multiple desired `tuples` in
    a `list`. If the planned execution time is `00:00` the `datetime.time` object
    can be ignored, just pass a `Weekday` without a `tuple`.

    Parameters
    ----------
    timing : TimingWeeklyUnion
        Desired execution time(s).
    handle : Callable[..., None]
        Handle to a callback function.

    Returns
    -------
    Job
        Instance of a scheduled |Job|.

    Raises
    ------
    SchedulerError
        If `timing` does not match the `TimingWeeklyUnion` type.

    Other Parameters
    ----------------
    **kwargs
        |Job| properties, optional

        `kwargs` are used to specify |Job| properties.

        Here is a list of available |Job| properties:

        .. include:: ../_assets/kwargs.rst
    """
    try:
        tg.check_type(timing, TimingWeeklyUnion)
    except tg.TypeCheckError as type_err:
        raise SchedulerError(WEEKLY_TYPE_ERROR_MSG) from type_err

    job_spec = {"job_type": JobType.WEEKLY, "timing": timing, "handle": handle}
    return self.__schedule(**job_spec, **kwargs)

556 

def once(  # pylint: disable=arguments-differ
    self,
    timing: TimingOnceUnion,
    handle: Callable[..., None],
    *,
    args: Optional[tuple[Any, ...]] = None,
    kwargs: Optional[dict[str, Any]] = None,
    tags: Optional[Iterable[str]] = None,
    alias: Optional[str] = None,
    weight: float = 1,
) -> Job:
    r"""
    Schedule a oneshot `Job`.

    A `datetime.datetime` timing is scheduled as a cyclic |Job| with a zero
    `datetime.timedelta` that starts at the given point in time. Every other
    timing is mapped to its job type via `JOB_TYPE_MAPPING`. In both cases
    the |Job| is limited to a single attempt.

    Parameters
    ----------
    timing : TimingOnceUnion
        Desired execution time.
    handle : Callable[..., None]
        Handle to a callback function.
    args : Optional[tuple[Any, ...]]
        Positional argument payload for the function handle within a |Job|.
    kwargs : Optional[dict[str, Any]]
        Keyword arguments payload for the function handle within a |Job|.
    tags : Optional[Iterable[str]]
        The tags of the |Job|.
    alias : Optional[str]
        Overwrites the function handle name in the string representation.
    weight : float
        Relative weight against other |Job|\ s.

    Returns
    -------
    Job
        Instance of a scheduled |Job|.

    Raises
    ------
    SchedulerError
        If `timing` does not match the `TimingOnceUnion` type.
    """
    try:
        tg.check_type(timing, TimingOnceUnion)
    except tg.TypeCheckError as err:
        raise SchedulerError(ONCE_TYPE_ERROR_MSG) from err

    # Normalize the tags once for both scheduling paths, so that any iterable
    # (or None) ends up as a set regardless of the timing type.
    # NOTE(review): how the Job treats non-set tags is not visible here.
    job_tags = set(tags) if tags else set()

    if isinstance(timing, dt.datetime):
        return self.__schedule(
            job_type=JobType.CYCLIC,
            timing=dt.timedelta(),
            handle=handle,
            args=args,
            kwargs=kwargs,
            max_attempts=1,
            tags=job_tags,
            alias=alias,
            weight=weight,
            delay=False,
            start=timing,
        )
    return self.__schedule(
        job_type=JOB_TYPE_MAPPING[type(timing)],
        timing=timing,
        handle=handle,
        args=args,
        kwargs=kwargs,
        max_attempts=1,
        tags=job_tags,
        alias=alias,
        weight=weight,
    )

622 

@property
def jobs(self) -> set[Job]:
    r"""
    Get the set of all `Job`\ s.

    The returned set is a copy; altering it does not change the jobs of
    the |Scheduler|.

    Returns
    -------
    set[Job]
        Currently scheduled |Job|\ s.
    """
    # Copy under the (reentrant) lock, consistent with `get_jobs`, so that the
    # snapshot can not interleave with a concurrent modification of the set.
    with self.__jobs_lock:
        return self.__jobs.copy()