Coverage for scheduler/asyncio/scheduler.py: 100%

130 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2026-02-15 22:37 +0000

1""" 

2Implementation of an `asyncio`-compatible in-process scheduler. 

3 

4Author: Jendrik A. Potyka, Fabian A. Preiss 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio as aio 

10import datetime as dt 

11from asyncio.selector_events import BaseSelectorEventLoop 

12from collections.abc import Iterable 

13from logging import Logger 

14from typing import Any, Callable, Coroutine, Optional 

15 

16import typeguard as tg 

17 

18from scheduler.asyncio.job import Job 

19from scheduler.base.definition import JOB_TYPE_MAPPING, JobType 

20from scheduler.base.scheduler import BaseScheduler, deprecated, select_jobs_by_tag 

21from scheduler.base.scheduler_util import create_job_instance, str_cutoff 

22from scheduler.base.timingtype import ( 

23 TimingCyclic, 

24 TimingDailyUnion, 

25 TimingOnceUnion, 

26 TimingWeeklyUnion, 

27) 

28from scheduler.error import SchedulerError 

29from scheduler.message import ( 

30 CYCLIC_TYPE_ERROR_MSG, 

31 DAILY_TYPE_ERROR_MSG, 

32 HOURLY_TYPE_ERROR_MSG, 

33 MINUTELY_TYPE_ERROR_MSG, 

34 ONCE_TYPE_ERROR_MSG, 

35 WEEKLY_TYPE_ERROR_MSG, 

36) 

37 

# UTC timezone; `Scheduler.__headings` builds its reference `datetime` with it before asking
# the scheduler's tzinfo for its name.
38_UTC = dt.timezone.utc 

39 

40 

class Scheduler(BaseScheduler[Job, Callable[..., Coroutine[Any, Any, None]]]):
    r"""
    Implementation of an asyncio scheduler.

    This implementation enables the planning of |AioJob|\ s depending on time
    cycles, fixed times, weekdays, dates, offsets and execution counts.

    Notes
    -----
    Due to the support of `datetime` objects, the |AioScheduler| is able to work
    with timezones.

    Parameters
    ----------
    loop : asyncio.selector_events.BaseSelectorEventLoop
        Set an asyncio event loop, default is the currently running event loop.
    tzinfo : datetime.tzinfo
        Set the timezone of the |AioScheduler|.
    logger : Optional[logging.Logger]
        A custom Logger instance.

    Raises
    ------
    SchedulerError
        If no `loop` is given and there is no running event loop.
    """

    def __init__(
        self,
        *,
        loop: Optional[BaseSelectorEventLoop] = None,
        tzinfo: Optional[dt.tzinfo] = None,
        logger: Optional[Logger] = None,
    ):
        super().__init__(logger=logger)
        if loop is not None:
            # An explicitly passed loop is used as is, no running loop is required.
            self.__loop: aio.AbstractEventLoop = loop
        else:
            try:
                self.__loop = aio.get_running_loop()
            except RuntimeError:
                raise SchedulerError(
                    "The asyncio Scheduler requires a running event loop."
                ) from None
        self.__tzinfo = tzinfo
        # Maps every scheduled job to the task supervising it.
        self._jobs: dict[Job, aio.Task[None]] = {}

    def __repr__(self) -> str:
        return "scheduler.asyncio.scheduler.Scheduler({0}, jobs={{{1}}})".format(
            repr(self.__tzinfo),
            ", ".join(repr(job) for job in sorted(self.jobs)),
        )

    def __str__(self) -> str:
        # Scheduler meta heading
        scheduler_headings = "{0}, {1}\n\n".format(*self.__headings())

        # Job table (we join two of the Job._str() fields into one)
        # columns
        c_align = ("<", "<", "<", "<", ">", ">")
        c_width = (8, 16, 19, 12, 9, 13)
        c_name = (
            "type",
            "function / alias",
            "due at",
            "tzinfo",
            "due in",
            "attempts",
        )
        # One positional replacement field per column, e.g. "{0:<8}". The doubled braces
        # emit literal braces so the result can be fed to `str.format` below.
        form = [
            f"{{{idx}:{align}{width}}}" for idx, (align, width) in enumerate(zip(c_align, c_width))
        ]
        if self.__tzinfo is None:
            # Drop the "tzinfo" column (index 3). The remaining fields keep their explicit
            # indices, so the full-length argument tuples below can still be passed.
            form = form[:3] + form[4:]

        fstring = " ".join(form) + "\n"
        job_table = fstring.format(*c_name)
        job_table += fstring.format(*("-" * width for width in c_width))
        for job in sorted(self.jobs):
            row = job._str()
            entries = (
                row[0],
                str_cutoff(row[1] + row[2], c_width[1], False),
                row[3],
                str_cutoff(row[4] or "", c_width[3], False),
                str_cutoff(row[5], c_width[4], True),
                str_cutoff(f"{row[6]}/{row[7]}", c_width[5], True),
            )
            job_table += fstring.format(*entries)

        return scheduler_headings + job_table

    def __headings(self) -> list[str]:
        """Return the meta information shown above the job table of `__str__`."""
        if self.__tzinfo is None:
            tzname = None
        else:
            # `tzinfo.tzname` needs a datetime to resolve the name for.
            tzname = self.__tzinfo.tzname(dt.datetime.now(tz=_UTC))

        return [
            f"tzinfo={tzname}",
            f"#jobs={len(self._jobs)}",
        ]

    def __schedule(
        self,
        **kwargs,
    ) -> Job:
        """Encapsulate the `Job` and add the `Scheduler`'s timezone."""
        job: Job = create_job_instance(Job, tzinfo=self.__tzinfo, **kwargs)

        # Every job is supervised by its own task on the scheduler's loop.
        task = self.__loop.create_task(self.__supervise_job(job))
        self._jobs[job] = task

        return job

    async def __supervise_job(self, job: Job) -> None:
        """
        Sleep until the `Job` is due, execute it and repeat while attempts remain.

        Once the `Job` has no attempts remaining it is removed from the `Scheduler`.
        """
        try:
            reference_dt = dt.datetime.now(tz=self.__tzinfo)
            while job.has_attempts_remaining:
                sleep_seconds: float = job.timedelta(reference_dt).total_seconds()
                await aio.sleep(sleep_seconds)

                await job._exec(logger=self._logger)  # pylint: disable=protected-access

                reference_dt = dt.datetime.now(tz=self.__tzinfo)
                job._calc_next_exec(reference_dt)  # pylint: disable=protected-access
        except aio.CancelledError:  # TODO asyncio does not trigger this exception in pytest, why?
            # raised, when `task.cancel()` in `delete_job` was run
            pass  # pragma: no cover
        else:
            self.delete_job(job)

    def delete_job(self, job: Job) -> None:
        """
        Delete a `Job` from the `Scheduler`.

        The task supervising the `Job` is cancelled.

        Parameters
        ----------
        job : Job
            |AioJob| instance to delete.

        Raises
        ------
        SchedulerError
            Raises if the |AioJob| of the argument is not scheduled.
        """
        # Only the lookup can fail with a KeyError, so only the lookup is guarded.
        try:
            task: aio.Task[None] = self._jobs.pop(job)
        except KeyError:
            raise SchedulerError("An unscheduled Job can not be deleted!") from None
        task.cancel()

    def delete_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> int:
        r"""
        Delete a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to the deletion
        of all |AioJob|\ s.

        Parameters
        ----------
        tags : Optional[set[str]]
            Set of tags to identify target |AioJob|\ s.
        any_tag : bool
            False: To delete a |AioJob| all tags have to match.
            True: To delete a |AioJob| at least one tag has to match.

        Returns
        -------
        int
            Number of deleted |AioJob|\ s.
        """
        # Snapshot the keys first, `delete_job` mutates `self._jobs`.
        all_jobs: set[Job] = set(self._jobs)
        jobs_to_delete: set[Job]

        if not tags:
            jobs_to_delete = all_jobs
        else:
            jobs_to_delete = select_jobs_by_tag(all_jobs, tags, any_tag)

        for job in jobs_to_delete:
            self.delete_job(job)

        return len(jobs_to_delete)

    def get_jobs(
        self,
        tags: Optional[set[str]] = None,
        any_tag: bool = False,
    ) -> set[Job]:
        r"""
        Get a set of |AioJob|\ s from the |AioScheduler| by tags.

        If no tags or an empty set of tags are given defaults to returning
        all |AioJob|\ s.

        Parameters
        ----------
        tags : set[str]
            Tags to filter scheduled |AioJob|\ s.
            If no tags are given all |AioJob|\ s are returned.
        any_tag : bool
            False: To match a |AioJob| all tags have to match.
            True: To match a |AioJob| at least one tag has to match.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        if not tags:
            return self.jobs
        return select_jobs_by_tag(self.jobs, tags, any_tag)

    @staticmethod
    def __check_timing(timing: Any, expected_type: Any, error_msg: str) -> None:
        """Raise a `SchedulerError` with `error_msg` if `timing` fails the type check."""
        try:
            tg.check_type(timing, expected_type)
        except tg.TypeCheckError as err:
            raise SchedulerError(error_msg) from err

    @deprecated(["delay"])
    def cyclic(
        self, timing: TimingCyclic, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a cyclic `Job`.

        Use a `datetime.timedelta` object or a `list` of `datetime.timedelta` objects
        to schedule a cyclic |AioJob|.

        Parameters
        ----------
        timing : TimingTypeCyclic
            Desired execution time.
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the expected timing type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingCyclic, CYCLIC_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.CYCLIC, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def minutely(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a minutely `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every minute.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour or minute property, these
        information will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the expected timing type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingDailyUnion, MINUTELY_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.MINUTELY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def hourly(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule an hourly `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every hour.

        Notes
        -----
        If given a `datetime.time` object with a non zero hour property, this information
        will be ignored.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the expected timing type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingDailyUnion, HOURLY_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.HOURLY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def daily(
        self, timing: TimingDailyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a daily `Job`.

        Use a `datetime.time` object or a `list` of `datetime.time` objects
        to schedule a |AioJob| every day.

        Parameters
        ----------
        timing : TimingDailyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the expected timing type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingDailyUnion, DAILY_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.DAILY, timing=timing, handle=handle, **kwargs)

    @deprecated(["delay"])
    def weekly(
        self, timing: TimingWeeklyUnion, handle: Callable[..., Coroutine[Any, Any, None]], **kwargs
    ) -> Job:
        r"""
        Schedule a weekly `Job`.

        Use a `tuple` of a `Weekday` and a `datetime.time` object to define a weekly
        recurring |AioJob|. Combine multiple desired `tuples` in
        a `list`. If the planed execution time is `00:00` the `datetime.time` object
        can be ignored, just pass a `Weekday` without a `tuple`.

        Parameters
        ----------
        timing : TimingWeeklyUnion
            Desired execution time(s).
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the expected timing type.

        Other Parameters
        ----------------
        **kwargs
            |AioJob| properties, optional

            `kwargs` are used to specify |AioJob| properties.

            Here is a list of available |AioJob| properties:

            .. include:: ../_assets/aio_kwargs.rst
        """
        self.__check_timing(timing, TimingWeeklyUnion, WEEKLY_TYPE_ERROR_MSG)
        return self.__schedule(job_type=JobType.WEEKLY, timing=timing, handle=handle, **kwargs)

    def once(
        self,
        timing: TimingOnceUnion,
        handle: Callable[..., Coroutine[Any, Any, None]],
        *,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        tags: Optional[Iterable[str]] = None,
        alias: Optional[str] = None,
    ) -> Job:
        r"""
        Schedule a oneshot `Job`.

        Parameters
        ----------
        timing : TimingOnceUnion
            Desired execution time.
        handle : Callable[..., Coroutine[Any, Any, None]]
            Handle to a callback function.
        args : tuple[Any]
            Positional argument payload for the function handle within a |AioJob|.
        kwargs : Optional[dict[str, Any]]
            Keyword arguments payload for the function handle within a |AioJob|.
        tags : Optional[Iterable[str]]
            The tags of the |AioJob|.
        alias : Optional[str]
            Overwrites the function handle name in the string representation.

        Returns
        -------
        Job
            Instance of a scheduled |AioJob|.

        Raises
        ------
        SchedulerError
            If `timing` does not match the expected timing type.
        """
        self.__check_timing(timing, TimingOnceUnion, ONCE_TYPE_ERROR_MSG)

        # Properties shared by both scheduling variants. The tags are normalised to a set
        # for every kind of `timing`, so any iterable of tags is handled the same way.
        common: dict[str, Any] = {
            "handle": handle,
            "args": args,
            "kwargs": kwargs,
            "max_attempts": 1,
            "tags": set(tags) if tags else set(),
            "alias": alias,
        }

        if isinstance(timing, dt.datetime):
            # A fixed point in time is scheduled as a zero-length cycle starting at `timing`.
            return self.__schedule(
                job_type=JobType.CYCLIC,
                timing=dt.timedelta(),
                delay=False,
                start=timing,
                **common,
            )
        return self.__schedule(
            job_type=JOB_TYPE_MAPPING[type(timing)],
            timing=timing,
            **common,
        )

    @property
    def jobs(self) -> set[Job]:
        r"""
        Get the set of all `Job`\ s.

        Returns
        -------
        set[Job]
            Currently scheduled |AioJob|\ s.
        """
        return set(self._jobs)