Skip to the content.

gRPC etcd 服务注册与发现、自定义负载均衡

2024-06-14 10:00:00


考虑这种常见情景:服务多开,正常连接采用轮询负载均衡,但若服务有状态,重连则需进入之前的服务

本文其实主要在讨论以下两篇官方文档

实现依赖即将废弃的resolver.Address.Metadata,其实也仅是想复用已有的官方代码

自定义负载均衡并未依照示例,官方可能也是想展示更多细节,提供的明显不是最简单的实现方式,毕竟都是支持Service Config集成自定义配置的,也是需要熟悉endpointshardingpickfirst相关逻辑的,所以不太适合用来入门

反观官方源码中roundrobin,基于baseBalancer仅实现Picker的方式,才真是将代码写在了刀刃上

服务启动clientv3.Put,服务关闭clientv3.Delete,创建租约并借助Lease.KeepAlive确保服务异常退出时因未自动续期而删除

通过context.WithValue携带ServiceID实现选择指定服务的连接

其余看看便知的就不赘述啦

server.go

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"log/slog"
	"net"
	"os"
	"os/signal"

	"github.com/panshiqu/golang/discovery"
	"github.com/panshiqu/golang/utils"
	"google.golang.org/grpc"
	pb "google.golang.org/grpc/examples/features/proto/echo"
)

var id = flag.Int("id", 1, "id")
var env = flag.String("env", "dev", "environment")
var host = flag.String("h", "127.0.0.1", "host")

type echoServer struct {
	pb.UnimplementedEchoServer
}

func (s *echoServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %d)", req.Message, *id)}, nil
}

func main() {
	flag.Parse()

	lis, err := net.Listen("tcp", fmt.Sprintf("%s:0", *host))
	if err != nil {
		log.Fatal(utils.Wrap(err))
	}

	s := grpc.NewServer()
	pb.RegisterEchoServer(s, &echoServer{})

	addr := fmt.Sprintf("%s:%d", *host, lis.Addr().(*net.TCPAddr).Port)
	service, err := discovery.Register("http://127.0.0.1:2379", fmt.Sprintf("%s/game", *env), addr, *id)
	if err != nil {
		log.Fatal(utils.Wrap(err))
	}

	go func() {
		if err := s.Serve(lis); err != nil {
			slog.Error("serve", slog.Any("err", err))
		}
	}()

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	sig := <-c
	slog.Info("notify", slog.Any("signal", sig))

	if err := service.Release(); err != nil {
		slog.Error("release", slog.Any("err", err))
	}

	s.GracefulStop()
}

client.go

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/panshiqu/golang/balancer"
	_ "github.com/panshiqu/golang/balancer"
	"github.com/panshiqu/golang/utils"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/naming/resolver"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "google.golang.org/grpc/examples/features/proto/echo"
)

var id = flag.Int("id", 0, "id")

func main() {
	flag.Parse()

	cli, err := clientv3.NewFromURL("http://127.0.0.1:2379")
	if err != nil {
		log.Fatal(utils.Wrap(err))
	}

	etcdResolver, err := resolver.NewBuilder(cli)
	if err != nil {
		log.Fatal(utils.Wrap(err))
	}

	cc, err := grpc.NewClient("etcd:///discovery/dev/game", grpc.WithResolvers(etcdResolver), grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"custom_round_robin":{}}]}`), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(utils.Wrap(err))
	}
	defer cc.Close()

	ec := pb.NewEchoClient(cc)
	ctx := context.Background()
	if *id != 0 {
		ctx = context.WithValue(ctx, balancer.ServiceID, fmt.Sprint(*id))
	}

	for {
		r, err := ec.UnaryEcho(ctx, &pb.EchoRequest{Message: "hi"})
		if err != nil {
			log.Fatal(utils.Wrap(err))
		}

		fmt.Println(r)
		time.Sleep(time.Second)
	}
}

终端依次执行以下命令试试

go run client.go
go run server.go -id=1
go run server.go -id=2
go run client.go -id=1
# stop server 2