本文共 3538 字,大约阅读时间需要 11 分钟。
Celery 是一个简单、灵活且可靠的分布式任务队列系统,专注于实时处理异步任务,同时支持任务调度。本文将深入解析 Celery 内部的 EventDispatcher 和 Event 组件,探讨它们如何实现事件的生产、分发和处理。
EventDispatcher 和 Event 组件负责 Celery 内部事件的处理。从字面上可以看出,EventDispatcher 的功能是事件分发,因此我们可以得出以下结论:
Events 组件的作用是接收和处理事件。通过 Kombu 的消费者实现事件处理,具体处理方式依据 Celery 的当前状态决定,涉及到 State 功能。
EventDispatcher 代码位于 celery/events/dispatcher.py。一个事件分发者需要以下成员变量:
connection (kombu.Connection):用于与 Broker 交互的连接。channel (kombu.Channel):用于操作连接的轻量化实例。producer:事件生产者,使用 Kombu 的 producer 概念。exchange:事件发送到交换机,用于将事件路由到队列。hostname:标示实例名称。groups:事件组功能。_outbound_buffer:事件缓存。clock:Lamport 逻辑时钟,用于区分事件发生顺序。EventDispatcher 使用 Kombu 的 producer 来实现事件生产。通过以下代码可以看到 producer 的配置:
self.producer = Producer(self.channel or self.connection, exchange=self.exchange, serializer=self.serializer, auto_declare=False)
producer 需要配置以下内容:
发送事件分为两种情况:
_outbound_buffer,然后批量发送。具体实现如下:
def send(self, type, blind=False, utcoffset=utcoffset, retry=False, retry_policy=None, Event=Event, **fields): if self.enabled: groups, group = self.groups, group_from(type) if groups and group not in groups: return if group in self.buffer_group: # 缓存到 buffer 中 event = Event(type, hostname=self.hostname, utcoffset=utcoffset(), pid=self.pid, clock=self.clock.forward(), **fields) buf = self._group_buffer[group] buf.append(event) if len(buf) >= self.buffer_limit: self.flush() elif self.on_send_buffered: self.on_send_buffered() else: # 直接发送 return self.publish(type, fields, self.producer, blind=blind, Event=Event, retry=retry, retry_policy=retry_policy)
Events 组件负责接收和处理事件。通过 Kombu 的 consumer 来实现。具体实现如下:
def capture(self, limit=None, timeout=None, wakeup=True): for _ in self.consume(limit=limit, timeout=timeout, wakeup=wakeup): pass
consume 方法内部使用 ConsumerMixin 来处理事件。
ConsumerMixin 提供了方便的 Consumer 程序实现。通过以下代码可以看到 consumer 的配置:
def get_consumers(self, Consumer, channel): return [Consumer(queues=[self.queue], callbacks=[self._receive], no_ack=True, accept=self.accept)]
Consumer 的配置包括:
queues:事件队列。callbacks:事件处理回调。no_ack:不确认消息。accept:消息接受条件。当事件接收后,通过 _receive 方法处理:
def _receive(self, body, message, list=list, isinstance=isinstance): if isinstance(body, list): # 列表形式的事件处理 for event in body: self.process(*self.event_from_message(event)) else: self.process(*self.event_from_message(body))
process 方法根据事件类型调用相应的处理函数:
def process(self, type, event): handler = self.handlers.get(type) or self.handlers.get('*') if handler: handler(event) 通过 _create_dispatcher 方法处理事件:
def _create_dispatcher(self): # ... 详细逻辑 ... def _event(event): # ... 详细逻辑 ... return (worker, created), subject
该方法根据事件类型(worker 或 task)调用相应的处理函数,更新 worker 或 task 的状态。
Events 可以用于开启快照相机或将事件 dump 到标准输出:
celery -A proj events -c myapp.DumpCam --frequency=2.0celery -A proj events --camera=celery -A proj events --dump
通过以下代码可以实现:
app.start(argv=['events'])
转载地址:http://xvgnz.baihongyu.com/