超实用 Demo:使用 FastAPI、Celery、RabbitMQ 和 MongoDB 实现一个异步任务工作流

  发布时间:2025-11-05 11:07:28   作者:玩站小弟   我要评论
异步任务,是 Web 开发中经常遇到的问题,比如说用户提交了一个请求,虽然这个请求对应的任务非常耗时,但是不能让用户等在这里,通常需要立即返回结果,告诉用户任务已提交。任务可以在后续慢慢完成,完成后再 。

异步任务,超实是用D用 Web 开发中经常遇到的问题,比如说用户提交了一个请求,使实现虽然这个请求对应的个异任务非常耗时,但是步任不能让用户等在这里,通常需要立即返回结果,作流告诉用户任务已提交。超实任务可以在后续慢慢完成,用D用完成后再给用户发一个完成的使实现通知。源码库

今天分享一份代码,个异使用 Celery、步任RabbitMQ 和 MongoDB 实现一个异步任务工作流,作流你可以修改 task.py 来实现你自己的超实异步任务。

架构图如下:

其中 Celery 来执行异步任务,用D用RabbitMQ 作为消息队列,使实现MongoDB 存储任务执行结果,FastAPI 提供 Web 接口。

以上所有模块均可使用 Docker 一键部署。

下面为 Demo 使用方法:

1、确保本机已安装 Docker、Git

2、云服务器提供商下载源代码:

复制git clone https://github.com/aarunjith/async-demo.git1.

3、部署并启动:

复制cd async-

demo

docker compose up --build1.2.

4、启动一个异步任务:

复制$ curl -X POST http://localhost:8080/process1.

任务会发送到消息队列,同时会立即返回一个任务 id:

复制❯ curl -X POST http://localhost:8080/

process

{"status":"PENDING","id":"a129c666-7b5b-45f7-ba54-9d7b96a1fe58","error":""}%1.2.

5、查询任务状态:

复制curl -X POST http://localhost:8080/check_progress/<task_id>1.

任务完成后的返回结果如下:

复制 ❯ curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9

d7b96a1fe58

{"status":"SUCEESS","data":""hello""}%1.2.

代码目录结构如下:

其中 app.py 如下:

复制from

fastapi import FastAPI

from celery.result

import AsyncResult

from

tasks import start_processing

from

loguru import logger

from

pymongo import MongoClient

import uvicorn

# Lets create a connection to our backend where

celery stores the results

client = MongoClient("mongodb://mongodb:27017")# Default database and collection names that Celery createdb = client[task_results]coll = db["celery_taskmeta"]app = FastAPI()@app.post(/process)async def process_text_file(): Process endpoint to trigger the start of a process try: result = start_processing.delay() logger.info(fStarted processing the task with id {result.id}) return { "status": result.state, id: result.id, error: } except Exception as e: logger.info(fTask Execution failed: {e}) return { "status": "FAILURE", id: None, error:

e

}@app.post(/check_progress/{task_id})async def check_async_progress(task_id: str): Endpoint to check the task progress and fetch the results if the task is complete. try: result = AsyncResult(task_id) if result.ready(): data = coll.find({_id: task_id})[0] return {status: SUCEESS, data: data[result]} else: return {"status": result.state, "error":} except Exception as e: data = coll.find({_id: task_id})[0] if data: return {status: SUCEESS, data: data[result]} return {status: Task ID invalid, error: e}if __name__ == "__main__": uvicorn.run("app:app", host=0.0.0.0, port=8080)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.

如果要实现自己的任务队列,就修改 task.py 来添加自己的异步任务,可以整合到自己的项目中。

  • Tag:

相关文章

  • 探索笔记本i3-7100的性能与功能(揭秘i3-7100的处理器性能与多样化功能)

    摘要:当谈到性能强大且便携的计算设备时,笔记本电脑毫无疑问是首选。而i3-7100作为一款集高性能与多样化功能于一身的笔记本处理器,备受消费者青睐。本文将深入探讨i3-7100的处理器性...
    2025-11-05
  • 可穿戴设备与物联网在未来将如何发展

    智能手表和可穿戴技术如今处于不同的发展阶段,但不同的技术可以协同工作以使其变得更智能。物联网就是其中一项技术。它使物联网设备能够通过网络进行通信,并使可用的资源和数据尽可能地发挥作用。[[392551
    2025-11-05
  • SQL 面试攻略:从普通到困难的副本通关之旅

    本文汇总了一些 SQL 面试中经典且具有代表性的题目,涵盖普通和困难模式,帮助读者熟悉一些常考的 SQL 问题。每道题均附有详细解答,提供示例代码和解析。示例表结构
    2025-11-05
  • OT安全挑战:从被动防御到主动防护

    企业运营技术OT)安全已经取得了长足的进步,但随着时间的推移,挑战也在不断增加。早期,OT 系统与外部网络完全隔离,使其本质上是安全的。那时安全并非生产制造领域考虑的首要因素,而是着重于确保稳定可靠的
    2025-11-05
  • 英特尔Corei3-6100(性能稳定,功耗低,为日常办公和轻度游戏提供出色表现)

    摘要:英特尔Corei3-6100是英特尔推出的一款处理器,专为日常办公和轻度游戏而设计。它采用了先进的技术和架构,提供稳定的性能表现,并在功耗方面做到了较低水平。本文将详细介绍英特尔C...
    2025-11-05
  • 攻防对抗中的六个“AI VS. AI”最佳实践

    人工智能技术的发展正在深刻影响着网络安全领域。一方面,AI赋能了网络攻击手段的自动化和智能化,使得攻击者能够更快、更隐蔽地入侵企业网络,窃取数据、资金和身份信息;另一方面,AI也为网络防御带来了新的机
    2025-11-05

最新评论