FastApi(自用脚手架)+Snowy搭建后台管理系统(13)-以类的方式定义后台任务依赖项

语言: CN / TW / HK

theme: awesome-green

前言

我们知道定义后台任务,我们只需要在路由函数中添加具体的后台任务即可,如下代码所示: ``` from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def back_tasks_notification(message=""): pass

@app.post("/send-backtask/") async def backtask(message: str, background_tasks: BackgroundTasks): background_tasks.add_task(back_tasks_notification, message=f"后台任务...{message}") return {"message": "BackgroundTasks执行"} ```

其实上面的代码执行也是非常优雅可观。不过对于一些任务定义来说,可能就相对的分散凌乱,没有进行统一管理,所以下面我个人习惯把一些或一组的后台任务以类的方式来定义。

实现

首先我们定义用于添加后台任务类,如下代码所示: ``` import functools import typing from typing import Optional, List from fastapi import BackgroundTasks

class BackgroundTasksDependencyInject:

def __init__(self, only_run_first: bool = True) -> None:
    # 访问路由组时候是否仅仅执行一次后台任务
    self.only_run_first: bool = only_run_first
    # 后台任务
    self.background_task = None
    # 存放任务列表
    self.tasks: Optional[List[typing.Callable]] = []

def add_task(self, function=None, *args, **kwargs) -> None:
    if function:
        func_obj = functools.partial(function, *args, **kwargs)
        self.tasks.append(func_obj)

def on_task(self, *args, **kwargs) -> typing.Callable:  # pragma: nocover
    def decorator(func: typing.Callable) -> typing.Callable:
        self.add_task(func, *args, **kwargs)
        return func

    return decorator

async def __call__(self, background_task: BackgroundTasks) -> Optional[BackgroundTasks]:
    """
    """
    pass
    if not self.background_task:
        self.background_task = background_task
    if self.only_run_first:
        tasks = [self.background_task.add_task(func_obj) for func_obj in self.tasks]
    else:
        tasks = [background_task.add_task(func_obj) for func_obj in self.tasks]
    # print("tasks_len", len(tasks))
    return background_task

```

在上面的代码中,首先定义了相关的属性,它们主要包括了:

  • only_run_first 判断这些任务组是否只是执行一次的标记
  • self.tasks 后台任务的列表

其中类的包含的方法以及内部装饰器有: - add_task 表示添加后台任务 - on_task 定义装饰器,在内部其实是对add_task的调用 - call 则是关键中的关键,它可以把类的实例对象转化为可调用对象,在内进行任务的调用的循环添加到BackgroundTasks上。

用法

在上面定义完成我们的后台任务管理类后,接下来则是创建具体的任务组对象并,添加具体的任务,如下代码所示:

image.png

文本代码如下: ``` import asyncio import time

from snowy_src.snowy_common.snowy_dependencies.inject_backgrond_tasks import BackgroundTasksDependencyInject newtasks = BackgroundTasksDependencyInject(only_run_first=False)

后台任务注册

@newtasks.on_task() async def _background_task_1(): print("background_task01") await asyncio.sleep(1) pass

后台任务注册

@newtasks.on_task() def _background_task_2(): print("background_task02") time.sleep(2) pass ``` 在前面我们的知道,此时我们的newtask对象是一个可调用对象,可以对它进行()调用,所以它也可以是一个依赖项,所以,我们可以把它当做一个依赖注入项,进行使用,如下代码图所示:

image.png

需注意点:相关后台任务需要是正常响应逻辑处理完成才会执行。

最终我们可以在控制台看到我们的任务被执行,如下图所示:

image.png

至此,关于【-以类的方式定义后台任务依赖项】相关介绍分享已完成!以上内容分享纯属个人经验,仅供参考!

文笔有限,如有笔误或错误!欢迎批评指正!感谢各位大佬!有什么问题也可以随时交流!

结尾

END

简书:https://www.jianshu.com/u/d6960089b087

掘金:https://juejin.cn/user/2963939079225608

公众号:微信搜【程序员小钟同学】

新开QQ群号,欢迎随时加群交流,相互学习。QQ群号:247491107

小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822