跳到主要内容

gRPC

transporter/grpc 中基于谷歌的 grpc 框架实现了 Transporter,用以注册 grpc 到 eagle.Server() 中。

Server

配置

Network(network string) ServerOption

配置服务端的 network 协议,如 tcp

Address(addr string) ServerOption

配置服务端监听的地址

Timeout(timeout time.Duration) ServerOption

配置服务端的超时设置

EnableTracing() ServerOption

启用服务端的链路追踪

EnableLog() ServerOption

启用服务端的日志

UnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption

配置服务端使用的 grpc 一元(unary)拦截器

Options(opts ...grpc.ServerOption) ServerOption

配置一些额外的 grpc.ServerOption

主要的实现细节

NewServer()

func NewServer(opts ...ServerOption) *Server {
srv := &Server{
network: "tcp",
address: ":0",
timeout: 1 * time.Second,
health: health.NewServer(),
}
for _, o := range opts {
o(srv)
}
// Unary
chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
unaryServerInterceptor(srv),
grpcPrometheus.UnaryServerInterceptor,
grpcRecovery.UnaryServerInterceptor(),
}
if len(srv.inters) > 0 {
chainUnaryInterceptors = append(chainUnaryInterceptors, srv.inters...)
}

// stream
chainStreamInterceptors := []grpc.StreamServerInterceptor{
grpcPrometheus.StreamServerInterceptor,
grpcRecovery.StreamServerInterceptor(),
}

// enable tracing
if srv.enableTracing {
chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(srv.TracerOptions...))
chainStreamInterceptors = append(chainStreamInterceptors, otelgrpc.StreamServerInterceptor(srv.TracerOptions...))
}

// enable log
if srv.enableLog {
chainUnaryInterceptors = append(chainUnaryInterceptors, grpcZap.UnaryServerInterceptor(logger.GetZapLogger()))
chainStreamInterceptors = append(chainStreamInterceptors, grpcZap.StreamServerInterceptor(logger.GetZapLogger()))
}

grpcOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(chainUnaryInterceptors...),
grpc.ChainStreamInterceptor(chainStreamInterceptors...),
}
if len(srv.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, srv.grpcOpts...)
}

grpcServer := grpc.NewServer(grpcOpts...)

// health check
healthPb.RegisterHealthServer(grpcServer, srv.health)

// register reflection and the interface can be debugged through the grpcurl tool
reflection.Register(grpcServer)

// set zero values for metrics registered for this grpc server
grpcPrometheus.Register(grpcServer)

srv.Server = grpcServer

return srv
}
}

使用方式

简单列举了一些 eagle 中 grpc 的用法,其他 grpc 用法可以到 grpc 仓库中查看。

注册 grpc server

// Create a gRPC server with the default options and register it with the
// eagle application through eagle.Server.
// NOTE(review): grpc here presumably refers to eagle's transporter/grpc
// package described on this page, not google.golang.org/grpc — confirm.
gs := grpc.NewServer()
app := eagle.New(
eagle.Name("eagle"),
eagle.Version("v1.0.0"),
eagle.Server(gs),
)

Client

配置

WithEndpoint(endpoint string) ClientOption

配置客户端使用的对端连接地址,如果不使用服务发现则为ip:port,如果使用服务发现则格式为discovery://<authority>/<serviceName>

WithTimeout(timeout time.Duration) ClientOption

配置客户端的请求默认超时时间,如果有链路超时优先使用链路超时时间

WithMetric() ClientOption

启用监控指标

WithLog() ClientOption

启用日志

WithTracing() ClientOption

启用链路追踪

WithKeepalive() ClientOption

启用长连接

WithGzip() ClientOption

启用 gzip 压缩

WithoutRetry() ClientOption

禁用重试功能,默认情况下重试是开启的

WithDiscovery(d registry.Discovery) ClientOption

配置客户端服务发现

WithUnaryInterceptor(in ...grpc.UnaryClientInterceptor) ClientOption

配置客户端使用的 grpc 一元(unary)拦截器

WithOptions(opts ...grpc.DialOption) ClientOption

配置一些额外的 grpc.DialOption

主要的实现细节

dial()

// dial creates a gRPC client connection to the endpoint set through the
// client options.
//
// ctx is passed straight to grpc.DialContext. insecure selects the transport
// credentials: true uses the insecure credentials, false uses TLS. opts are
// applied on top of the defaults declared at the start of the function.
//
// Dial options are appended in a fixed order: default service config and the
// unary interceptor chain, caller-supplied dial options, the discovery
// resolver, transport credentials, keepalive, gzip, metrics, tracing, logging
// and finally the retry service config.
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
	// Defaults; each ClientOption may override them.
	cfg := clientOptions{
		timeout:         2000 * time.Millisecond,
		balancerName:    roundrobin.Name,
		enableGzip:      true,
		enableMetric:    true,
		disableRetry:    false,
		NumRetries:      2,
		enableKeepalive: true,
		kp: keepalive.ClientParameters{
			Time:                10 * time.Second,
			Timeout:             time.Second,
			PermitWithoutStream: false,
		},
	}
	for _, apply := range opts {
		apply(&cfg)
	}

	// The built-in interceptor always runs ahead of caller-supplied ones.
	chain := append([]grpc.UnaryClientInterceptor{unaryClientInterceptor()}, cfg.inters...)

	// Base dial options, followed by whatever the caller passed in.
	dopts := []grpc.DialOption{
		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy": "%s"}`, cfg.balancerName)),
		grpc.WithChainUnaryInterceptor(chain...),
	}
	dopts = append(dopts, cfg.dialOpts...)

	// Service discovery: install a resolver built from the configured registry.
	if cfg.discovery != nil {
		builder := discovery.NewBuilder(cfg.discovery, discovery.WithInsecure(insecure))
		dopts = append(dopts, grpc.WithResolvers(builder))
	}

	// Transport credentials.
	var creds credentials.TransportCredentials
	if insecure {
		creds = grpcInsecure.NewCredentials()
	} else {
		// NOTE(review): InsecureSkipVerify turns off verification of the
		// server's certificate chain and host name, so this path encrypts the
		// connection but does not authenticate the peer.
		creds = credentials.NewTLS(&tls.Config{
			InsecureSkipVerify: true,
		})
	}
	dopts = append(dopts, grpc.WithTransportCredentials(creds))

	if cfg.enableKeepalive {
		dopts = append(dopts, grpc.WithKeepaliveParams(cfg.kp))
	}
	if cfg.enableGzip {
		dopts = append(dopts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
	}
	if cfg.enableMetric {
		dopts = append(dopts,
			grpc.WithChainUnaryInterceptor(grpcPrometheus.UnaryClientInterceptor),
			grpc.WithChainStreamInterceptor(grpcPrometheus.StreamClientInterceptor),
		)
	}
	if cfg.enableTracing {
		dopts = append(dopts,
			grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
			grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()),
		)
	}
	if cfg.enableLog {
		zapLogger := logger.GetZapLogger
		dopts = append(dopts,
			grpc.WithChainUnaryInterceptor(grpcZap.UnaryClientInterceptor(zapLogger())),
			grpc.WithChainStreamInterceptor(grpcZap.StreamClientInterceptor(zapLogger())),
		)
	}
	if !cfg.disableRetry {
		// NOTE(review): this adds a second default service config after the
		// load-balancing one above; presumably the later one takes effect,
		// which would be why getRetryPolicy also receives the balancer name —
		// confirm.
		dopts = append(dopts,
			grpc.WithDefaultServiceConfig(getRetryPolicy(cfg.balancerName, cfg.NumRetries)),
		)
	}

	return grpc.DialContext(ctx, cfg.endpoint, dopts...)
}

使用方式

创建客户端连接

// Open a client connection to a fixed ip:port endpoint, without service discovery.
// Check err before using conn.
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("127.0.0.1:9000"),
)

使用服务发现

// Open a client connection through service discovery. The endpoint uses the
// discovery://<authority>/<serviceName> form with an empty authority, and r is
// a registry.Discovery implementation created elsewhere.
// Check err before using conn.
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
)

References