gRPC
transporter/grpc
中基于谷歌的 grpc 框架实现了 Transporter
,用以注册 grpc 到 eagle.Server()
中。
Server
配置
Network(network string) ServerOption
配置服务端的 network 协议,如 tcp
Address(addr string) ServerOption
配置服务端监听的地址
Timeout(timeout time.Duration) ServerOption
配置服务端的超时设置
EnableTracing() ServerOption
启用服务端的链路追踪
EnableLog() ServerOption
启用服务端的日志
UnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption
配置服务端使用的 grpc 单元拦截器
Options(opts ...grpc.ServerOption) ServerOption
配置一些额外的 grpc.ServerOption
主要的实现细节
NewServer()
func NewServer(opts ...ServerOption) *Server {
srv := &Server{
network: "tcp",
address: ":0",
timeout: 1 * time.Second,
health: health.NewServer(),
}
for _, o := range opts {
o(srv)
}
// Unary
chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
unaryServerInterceptor(srv),
grpcPrometheus.UnaryServerInterceptor,
grpcRecovery.UnaryServerInterceptor(),
}
if len(srv.inters) > 0 {
chainUnaryInterceptors = append(chainUnaryInterceptors, srv.inters...)
}
// stream
chainStreamInterceptors := []grpc.StreamServerInterceptor{
grpcPrometheus.StreamServerInterceptor,
grpcRecovery.StreamServerInterceptor(),
}
// enable tracing
if srv.enableTracing {
chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(srv.TracerOptions...))
chainStreamInterceptors = append(chainStreamInterceptors, otelgrpc.StreamServerInterceptor(srv.TracerOptions...))
}
// enable log
if srv.enableLog {
chainUnaryInterceptors = append(chainUnaryInterceptors, grpcZap.UnaryServerInterceptor(logger.GetZapLogger()))
chainStreamInterceptors = append(chainStreamInterceptors, grpcZap.StreamServerInterceptor(logger.GetZapLogger()))
}
grpcOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(chainUnaryInterceptors...),
grpc.ChainStreamInterceptor(chainStreamInterceptors...),
}
if len(srv.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, srv.grpcOpts...)
}
grpcServer := grpc.NewServer(grpcOpts...)
// health check
healthPb.RegisterHealthServer(grpcServer, srv.health)
// register reflection and the interface can be debugged through the grpcurl tool
reflection.Register(grpcServer)
// set zero values for metrics registered for this grpc server
grpcPrometheus.Register(grpcServer)
srv.Server = grpcServer
return srv
}
}
使用方式
简单列举了一些 eagle 中 grpc 的用法,其他 grpc 用法可以到 grpc 仓库中查看。
注册 grpc server
gs := grpc.NewServer()
app := eagle.New(
eagle.Name("eagle"),
eagle.Version("v1.0.0"),
eagle.Server(gs),
)
Client
配置
WithEndpoint(endpoint string) ClientOption
配置客户端使用的对端连接地址,如果不使用服务发现则为ip:port,如果使用服务发现则格式为discovery://<authority>/<serviceName>
WithTimeout(timeout time.Duration) ClientOption
配置客户端的请求默认超时时间,如果有链路超时优先使用链路超时时间
WithMetric() ClientOption
启用监控指标
WithLog() ClientOption
启用日志
WithTracing() ClientOption
启用链路追踪
WithKeepalive() ClientOption
启用长连接
WithGzip() ClientOption
启用日志压缩
WithoutRetry() ClientOption
启用重试功能
WithDiscovery(d registry.Discovery) ClientOption
配置客户端服务发现
WithUnaryInterceptor(in ...grpc.UnaryClientInterceptor) ClientOption
配置客户端使用的 grpc 原生拦截器
WithOptions(opts ...grpc.DialOption) ClientOption
配置一些额外的 grpc.ClientOption
主要的实现细节
dial()
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
// default client options
options := clientOptions{
timeout: 2000 * time.Millisecond,
balancerName: roundrobin.Name,
enableGzip: true,
enableMetric: true,
disableRetry: false,
NumRetries: 2,
enableKeepalive: true,
kp: keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: time.Second,
PermitWithoutStream: false,
},
}
for _, opt := range opts {
opt(&options)
}
// merge inters
inters := []grpc.UnaryClientInterceptor{
unaryClientInterceptor(),
}
if len(options.inters) > 0 {
inters = append(inters, options.inters...)
}
// default dial option
dialOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy": "%s"}`, options.balancerName)),
grpc.WithChainUnaryInterceptor(inters...),
}
if len(options.dialOpts) > 0 {
dialOpts = append(dialOpts, options.dialOpts...)
}
// service discovery
if options.discovery != nil {
dialOpts = append(dialOpts, grpc.WithResolvers(discovery.NewBuilder(
options.discovery, discovery.WithInsecure(insecure))))
}
if insecure {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(grpcInsecure.NewCredentials()))
} else {
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
cred := credentials.NewTLS(tlsConfig)
dialOpts = append(dialOpts, grpc.WithTransportCredentials(cred))
}
if options.enableKeepalive {
kp := keepalive.ClientParameters{
Time: options.kp.Time,
Timeout: options.kp.Timeout,
PermitWithoutStream: options.kp.PermitWithoutStream,
}
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kp))
}
if options.enableGzip {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
}
if options.enableMetric {
dialOpts = append(dialOpts,
grpc.WithChainUnaryInterceptor(grpcPrometheus.UnaryClientInterceptor),
grpc.WithChainStreamInterceptor(grpcPrometheus.StreamClientInterceptor),
)
}
// enable tracing
if options.enableTracing {
dialOpts = append(dialOpts,
grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()),
)
}
if options.enableLog {
dialOpts = append(dialOpts,
grpc.WithChainUnaryInterceptor(grpcZap.UnaryClientInterceptor(logger.GetZapLogger())),
grpc.WithChainStreamInterceptor(grpcZap.StreamClientInterceptor(logger.GetZapLogger())),
)
}
if !options.disableRetry {
dialOpts = append(dialOpts,
grpc.WithDefaultServiceConfig(getRetryPolicy(options.balancerName, options.NumRetries)),
)
}
return grpc.DialContext(ctx, options.endpoint, dialOpts...)
}
使用方式
创建客户端连接
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("127.0.0.1:9000"),
)
使用服务发现
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
)