您当前的位置: 首页 >  阿里云云栖号

透过 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作机制

阿里云云栖号 发布时间:2019-07-17 10:34:07 ,浏览量:0

In-memory Channel是当前Knative Eventing中默认的Channel, 也是一般刚接触Knative Eventing首先了解到的Channel。本文通过分析 In-memory Channel 来进一步了解 Knative Eventing 中Broker/Trigger事件处理机制。

事件处理概览

我们先整体看一下Knative Eventing 工作机制示意图:

通过 namespace 创建默认 Broker 如果不指定Channel,会使用默认的 Inmemory Channel。

下面我们从数据平面开始分析Event事件是如何通过In-memory Channel分发到Knative Service

Ingress

Ingress是事件进入Channel前的第一级过滤,但目前的功能仅仅是接收事件然后转发到Channel。过滤功能处理TODO状态。

func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
    tctx := cloudevents.HTTPTransportContextFrom(ctx)
    if tctx.Method != http.MethodPost {
        resp.Status = http.StatusMethodNotAllowed
        return nil
    }

    // tctx.URI is actually the path...
    if tctx.URI != "/" {
        resp.Status = http.StatusNotFound
        return nil
    }

    ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName))
    defer func() {
        stats.Record(ctx, MeasureEventsTotal.M(1))
    }()

    send := h.decrementTTL(&event)
    if !send {
        ctx, _ = tag.New(ctx, tag.Insert(TagResult, "droppedDueToTTL"))
        return nil
    }

    // TODO Filter.

    ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched"))
    return h.sendEvent(ctx, tctx, event)
}
In-memory Channel

Broker 字面意思为代理者,那么它代理的是谁呢?是Channel。为什么要代理Channel呢,而不直接发给访问Channel。这个其实涉及到Broker/Trigger设计的初衷:对事件过滤处理。我们知道Channel(消息通道)负责事件传递,Subscription(订阅)负责订阅事件,通常这二者的模型如下:

这里就涉及到消息队列和订阅分发的实现。那么在In-memory Channel中如何实现的呢? 其实 In-memory 的核心处理在Fanout Handler中,它负责将接收到的事件分发到不同的 Subscription。 In-memory Channel处理示意图:

事件接收并分发核心代码如下:

func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {
    return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
        if f.config.AsyncHandler {
            go func() {
                // Any returned error is already logged in f.dispatch().
                _ = f.dispatch(m)
            }()
            return nil
        }
        return f.dispatch(m)
    }
}

当前分发机制默认是异步机制(可通过AsyncHandler参数控制分发机制)。

消息分发机制:

// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out
// requests return successfully, then return nil. Else, return an error.
func (f *Handler) dispatch(msg *provisioners.Message) error {
    errorCh := make(chan error, len(f.config.Subscriptions))
    for _, sub := range f.config.Subscriptions {
        go func(s eventingduck.SubscriberSpec) {
            errorCh             
关注
打赏
1688896170
查看更多评论
0.0405s