# gRPC 源码分析(三): gRPC Server 的 RPC 交互阶段

语言: CN / TW / HK

gRPC 源码分析(三): gRPC Server 的 RPC 交互阶段

在上一篇中, 我们知道了 gRPC server 是如何和 client 之间建立起 HTTP2 连接的, 在本篇中, 我们一起来看看 gRPC server 在收到了一个来自 client 的 RPC request 之后是怎么处理的.

gRPC Server 如何处理不同的 frame

HTTP2 中定义了多种类型的 frame, 包括 data, headers, settings, ping, goaway 等等. 对于不同的 frame 类型, HTTP2 server 应该有不同的处理逻辑.

```go func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) { if s.quit.HasFired() { rawConn.Close() return } rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))

// RPC 连接阶段, 完成 HTTP2 握手
st := s.newHTTP2Transport(rawConn)
rawConn.SetDeadline(time.Time{})
if st == nil {
    return
}

if !s.addConn(lisAddr, st) {
    return
}
go func() {
    // 开始 RPC 交互阶段
    s.serveStreams(st)
    s.removeConn(lisAddr, st)
}()

} ```

在 gRPC 中, 对 frame 类型的分类和处理, 被包含在 s.serveStreams 中.

```go func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup

var roundRobinCounter uint32
// 阻塞并接收来自 client 的 frame
st.HandleStreams(func(stream *transport.Stream) {
    wg.Add(1)
    ......
    go func() {
        defer wg.Done()
        // 当一个新的 stream 被创建之后, 进行一些配置
        s.handleStream(st, stream, s.traceInfo(st, stream))
    }()
}, func(ctx context.Context, method string) context.Context {
    if !EnableTracing {
        return ctx
    }
    tr := trace.New("grpc.Recv."+methodFamily(method), method)
    return trace.NewContext(ctx, tr)
})
wg.Wait()

} ```

st.HandleStreams 会阻塞当前 goroutine, 并等待来自 client 的 frame.

go func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { defer close(t.readerDone) for { t.controlBuf.throttle() frame, err := t.framer.fr.ReadFrame() ... switch frame := frame.(type) { case *http2.MetaHeadersFrame: if t.operateHeaders(frame, handle, traceCtx) { t.Close() break } case *http2.DataFrame: t.handleData(frame) case *http2.RSTStreamFrame: t.handleRSTStream(frame) case *http2.SettingsFrame: t.handleSettings(frame) case *http2.PingFrame: t.handlePing(frame) case *http2.WindowUpdateFrame: t.handleWindowUpdate(frame) case *http2.GoAwayFrame: // TODO: Handle GoAway from the client appropriately. default: if logger.V(logLevel) { logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) } } } }

st.HandleStreams 在一个 for 循环中等待并读取来自 client 的 frame, 并采取不同的处理方式. 本篇中将以 headers, data 和 settings frame 为例, 简要描述 gRPC server 的处理方法.

Headers Frame

Headers frame 的处理函数 operateHeaders 较长, 接下来我们会分段来看看其中的内容.

go func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { ...... streamID := frame.Header().StreamID ...... buf := newRecvBuffer() s := &Stream{ id: streamID, st: t, buf: buf, fc: &inFlow{limit: uint32(t.initialWindowSize)}, } ...... }

在 gRPC server 和 client 端, 存在着一个 stream 的概念, 用来表征一次 gRPC call. 一个 gRPC call 总是以一个来自 client 的 headers frame 开始, 因此 server 会在operateHeaders 中创建一个 Stream 对象, stream 有一个 client 和 server 端一致的 id, 也有一个 buf 缓存. 关于缓存和流量控制的细节, 我们会在后续的篇章中讨论.

go func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { ...... for _, hf := range frame.Fields { switch hf.Name { case "content-type": contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value) if !validContentType { break } mdata[hf.Name] = append(mdata[hf.Name], hf.Value) s.contentSubtype = contentSubtype isGRPC = true case "grpc-encoding": s.recvCompress = hf.Value case ":method": httpMethod = hf.Value case ":path": s.method = hf.Value case "grpc-timeout": timeoutSet = true var err error if timeout, err = decodeTimeout(hf.Value); err != nil { headerError = true } // "Transports must consider requests containing the Connection header // as malformed." - A41 case "connection": if logger.V(logLevel) { logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec") } headerError = true default: if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { break } v, err := decodeMetadataHeader(hf.Name, hf.Value) if err != nil { headerError = true logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) break } mdata[hf.Name] = append(mdata[hf.Name], v) } } ...... }

gRPC server 会遍历 frame 中的 field, 并将 field 中的信息记录在 stream 中. 值得注意的是 :method:path 两个 field, client 端需要填写好这两个 field 来明确地指定要调用 server 端提供的哪一个 remote procedure. 也就是说, 调用哪一个 server 方法的信息是和调用方法的参数分开在不同的 frame 中的.

go func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { ...... t.activeStreams[streamID] = s ...... handle(s) ...... }

这个新建的 stream 对象会被放到 server 的 activeStreams map 中, 并调用 handle(s) 来进一步处理这个 stream. handle 这个函数最终会触发调用 s.handleStream .

```go func (s Server) handleStream(t transport.ServerTransport, stream transport.Stream, trInfo *traceInfo) { sm := stream.Method() ...... service := sm[:pos] method := sm[pos+1:]

srv, knownService := s.services[service]
if knownService {
    if md, ok := srv.methods[method]; ok {
        s.processUnaryRPC(t, stream, srv, md, trInfo)
        return
    }
    if sd, ok := srv.streams[method]; ok {
        s.processStreamingRPC(t, stream, srv, sd, trInfo)
        return
    }
}
......
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
    ......
}
......

} ```

根据 headers frame 中 path 和 method 的信息, gRPC server 找到注册好的 method 并执行. 以 s.processUnaryRPC 为例:

go func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { ...... d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) ...... reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt) ...... if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { ...... } err = t.WriteStatus(stream, statusOK) ...... }

首先从 stream 中读取 data frame, 即 RPC 方法中的参数信息. 随后调用 md.Handler 执行已经注册好的方法, 并将 reply 发送给 client (并不会直接翻送给 client, 而是将数据存储在 buffer 中, 由 loopyWriter 发送. 最后将 status statusOK 发送给 client. WriteStatus 在一个 stream 的结尾处执行, 即标志着这个 stream 的结束.

Data Frame

go func (t *http2Server) handleData(f *http2.DataFrame) { ...... // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { return } ...... if size > 0 { ...... if len(f.Data()) > 0 { buffer := t.bufferPool.get() buffer.Reset() buffer.Write(f.Data()) s.write(recvMsg{buffer: buffer}) } } ...... }

在处理 data frame 时

  1. 根据 streamId, 从 server 的 activeStreams map 中找到 stream 对象.
  2. bufferPool 中拿到一块 buffer, 并把 frame 的数据写入到 buffer 中.
  3. 将这块 buffer 保存到 stream 的 recvBuffer 中.

recvBuffer 中缓存的数据, 最终会被前面提到的 recvAndDecompress 函数读取, 从而在 server 端重建 RPC 的参数.

Settings Frame

除了完成一个 gRPC call 所必须的的 headers frame 以及 data frame 以外, server 端还可能在 RPC 交互阶段收到来自 client 的 setting frame, 来更新 HTTP2 的一些参数.

go func (t *http2Server) handleSettings(f *http2.SettingsFrame) { ...... var ss []http2.Setting ...... f.ForeachSetting(func(s http2.Setting) error { switch s.ID { ...... default: ss = append(ss, s) } return nil }) t.controlBuf.executeAndPut(func(interface{}) bool { for _, f := range updateFuncs { f() } return true }, &incomingSettings{ ss: ss, }) }

handleSettings 并没有直接将 settings frame 的参数应用在 server 上, 而是将其放到了 controlBuf 中, controlBuf 的相关内容会在后续的篇章中涉及到.

总结

可以看到 gRPC server 在整个处理流程中, 除了执行注册好的方法以外, 基本上都是异步的. 各个操作之间通过 buffer 连接在一起, 最大限度地避免 goroutine 阻塞.