博客
关于我
[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher & Event 组件
阅读量:530 次
发布时间:2019-03-08

本文共 3538 字,大约阅读时间需要 11 分钟。

[源码解析] 并行分布式任务队列 Celery 之 EventDispatcher & Event 组件

摘要

Celery 是一个简单、灵活且可靠的分布式任务队列系统,专注于实时处理异步任务,同时支持任务调度。本文将深入解析 Celery 内部的 EventDispatcher 和 Event 组件,探讨它们如何实现事件的生产、分发和处理。

思路

EventDispatcher 和 Event 组件负责 Celery 内部事件的处理。从字面上可以看出,EventDispatcher 的功能是事件分发,因此我们可以得出以下结论:

  • 事件分发必然有生产者和消费者,EventDispatcher 作为事件生产者。
  • 需要有一个 broker 存储中间事件。
  • Celery 底层依赖于 Kombu,而 Kombu 提供了 producer 和 consumer 的概念,可以直接利用这些功能。
  • Kombu 提供了 Mailbox 的实现,用于不同实例之间的事件发送和处理,可以实现单播和广播。

Events 组件的作用是接收和处理事件。通过 Kombu 的消费者实现事件处理,具体处理方式依据 Celery 的当前状态决定,涉及到 State 功能。

定义

EventDispatcher 代码位于 celery/events/dispatcher.py。一个事件分发者需要以下成员变量:

  • connection (kombu.Connection):用于与 Broker 交互的连接。
  • channel (kombu.Channel):用于操作连接的轻量化实例。
  • producer:事件生产者,使用 Kombu 的 producer 概念。
  • exchange:事件发送到交换机,用于将事件路由到队列。
  • hostname:标示实例名称。
  • groups:事件组功能。
  • _outbound_buffer:事件缓存。
  • clock:Lamport 逻辑时钟,用于区分事件发生顺序。

Producer

EventDispatcher 使用 Kombu 的 producer 来实现事件生产。通过以下代码可以看到 producer 的配置:

self.producer = Producer(self.channel or self.connection, exchange=self.exchange, serializer=self.serializer, auto_declare=False)

producer 需要配置以下内容:

  • Connection:Redis 连接。
  • Exchange:事件发送到交换机。
  • Serializer:事件序列化方式,默认使用 JSON。
  • Auto_declare:自动声明 exchange 和 queue。

发送事件

发送事件分为两种情况:

  • 单播:直接调用 producer.publish 发送事件。
  • 组播:将事件缓存到 _outbound_buffer,然后批量发送。
  • 具体实现如下:

    def send(self, type, blind=False, utcoffset=utcoffset, retry=False, retry_policy=None, Event=Event, **fields):    if self.enabled:        groups, group = self.groups, group_from(type)        if groups and group not in groups:            return        if group in self.buffer_group:            # 缓存到 buffer 中            event = Event(type, hostname=self.hostname, utcoffset=utcoffset(), pid=self.pid, clock=self.clock.forward(), **fields)            buf = self._group_buffer[group]            buf.append(event)            if len(buf) >= self.buffer_limit:                self.flush()            elif self.on_send_buffered:                self.on_send_buffered()        else:            # 直接发送            return self.publish(type, fields, self.producer, blind=blind, Event=Event, retry=retry, retry_policy=retry_policy)

    Events 组件

    Events 组件负责接收和处理事件。通过 Kombu 的 consumer 来实现。具体实现如下:

    def capture(self, limit=None, timeout=None, wakeup=True):    for _ in self.consume(limit=limit, timeout=timeout, wakeup=wakeup):        pass

    consume 方法内部使用 ConsumerMixin 来处理事件。

    ConsumerMixin

    ConsumerMixin 提供了方便的 Consumer 程序实现。通过以下代码可以看到 consumer 的配置:

    def get_consumers(self, Consumer, channel):    return [Consumer(queues=[self.queue], callbacks=[self._receive], no_ack=True, accept=self.accept)]

    Consumer 的配置包括:

    • queues:事件队列。
    • callbacks:事件处理回调。
    • no_ack:不确认消息。
    • accept:消息接受条件。

    接收和处理

    当事件接收后,通过 _receive 方法处理:

    def _receive(self, body, message, list=list, isinstance=isinstance):    if isinstance(body, list):        # 列表形式的事件处理        for event in body:            self.process(*self.event_from_message(event))    else:        self.process(*self.event_from_message(body))

    process 方法根据事件类型调用相应的处理函数:

    def process(self, type, event):    handler = self.handlers.get(type) or self.handlers.get('*')    if handler:        handler(event)

    state 处理

    通过 _create_dispatcher 方法处理事件:

    def _create_dispatcher(self):    # ... 详细逻辑 ...    def _event(event):        # ... 详细逻辑 ...        return (worker, created), subject

    该方法根据事件类型(worker 或 task)调用相应的处理函数,更新 worker 或 task 的状态。

    调试

    Events 可以用于开启快照相机或将事件 dump 到标准输出:

    celery -A proj events -c myapp.DumpCam --frequency=2.0celery -A proj events --camera=celery -A proj events --dump

    通过以下代码可以实现:

    app.start(argv=['events'])

    个人信息

    • 微信公众账号:罗西的思考
    • 如果需要及时获取技术资讯或文章推荐,可以关注我的公众号。

    参考

    转载地址:http://xvgnz.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现基于 LIFO的堆栈算法(附完整源码)
    查看>>
    Objective-C实现基于 LinkedList 的添加两个数字的解决方案算法(附完整源码)
    查看>>
    Objective-C实现基于opencv的抖动算法(附完整源码)
    查看>>
    Objective-C实现基于事件对象实现线程同步(附完整源码)
    查看>>
    Objective-C实现基于信号实现线程同步(附完整源码)
    查看>>
    Objective-C实现基于文件流拷贝文件(附完整源码)
    查看>>
    Objective-C实现基于模板的双向链表(附完整源码)
    查看>>
    Objective-C实现基于模板的顺序表(附完整源码)
    查看>>
    Objective-C实现基本二叉树算法(附完整源码)
    查看>>
    Objective-C实现堆排序(附完整源码)
    查看>>
    Objective-C实现填充环形矩阵(附完整源码)
    查看>>
    Objective-C实现声音录制播放程序(附完整源码)
    查看>>
    Objective-C实现备忘录模式(附完整源码)
    查看>>
    Objective-C实现复制粘贴文本功能(附完整源码)
    查看>>
    Objective-C实现复数类+-x%(附完整源码)
    查看>>
    Objective-C实现外观模式(附完整源码)
    查看>>
    Objective-C实现多启发式a star A*算法(附完整源码)
    查看>>
    Objective-C实现多尺度MSR算法(附完整源码)
    查看>>
    Objective-C实现多种方法求解定积分(附完整源码)
    查看>>
    Objective-C实现多组输入(附完整源码)
    查看>>