最新要闻

广告

手机

光庭信息跌4.57% 2021上市超募11亿2022扣非降74% 时快讯

光庭信息跌4.57% 2021上市超募11亿2022扣非降74% 时快讯

搜狐汽车全球快讯 | 大众汽车最新专利曝光:仪表支持拆卸 可用手机、平板替代-环球关注

搜狐汽车全球快讯 | 大众汽车最新专利曝光:仪表支持拆卸 可用手机、平板替代-环球关注

家电

kratos http原理 每日看点

来源:博客园

概念

kratos 为了使http协议的逻辑代码和grpc的逻辑代码使用同一份,选择了基于protobuf的IDL文件使用proto插件生成辅助代码的方式。

protoc http插件的地址为:https://github.com/go-kratos/kratos/tree/main/cmd/protoc-gen-go-http


【资料图】

示例

syntax = "proto3";package helloworld;option go_package = "test/helloworld;helloworld";option java_multiple_files = true;option java_package = "helloworld";import "google/api/annotations.proto";service Greeter {rpc SayHello (HelloRequest) returns (HelloReply)  {  option (google.api.http) = {  post: "/helloworld", // 声明路由  body: "*"  };}  }  message HelloRequest {string name = 1;}  message HelloReply {string msg = 1;}

使用kratos proto client xxx生成的代码为:

// Code generated by protoc-gen-go-http. DO NOT EDIT.// versions:// - protoc-gen-go-http v2.4.0// - protoc             v3.19.4// source: helloworld/helloworld.protopackage helloworldimport (context "context"http "github.com/go-kratos/kratos/v2/transport/http"binding "github.com/go-kratos/kratos/v2/transport/http/binding")// This is a compile-time assertion to ensure that this generated file// is compatible with the kratos package it is being compiled against.var _ = new(context.Context)var _ = binding.EncodeURLconst _ = http.SupportPackageIsVersion1const OperationGreeterSayHello = "/helloworld.Greeter/SayHello"type GreeterHTTPServer interface {SayHello(context.Context, *HelloRequest) (*HelloReply, error)}func RegisterGreeterHTTPServer(s *http.Server, srv GreeterHTTPServer) {r := s.Route("/")r.POST("/helloworld", _Greeter_SayHello0_HTTP_Handler(srv))}func _Greeter_SayHello0_HTTP_Handler(srv GreeterHTTPServer) func(ctx http.Context) error {return func(ctx http.Context) error {var in HelloRequestif err := ctx.Bind(&in); err != nil {return err}http.SetOperation(ctx, OperationGreeterSayHello)h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {return srv.SayHello(ctx, req.(*HelloRequest))})out, err := h(ctx, &in)if err != nil {return err}reply := out.(*HelloReply)return ctx.Result(200, reply)}}type GreeterHTTPClient interface {SayHello(ctx context.Context, req *HelloRequest, opts ...http.CallOption) (rsp *HelloReply, err error)}type GreeterHTTPClientImpl struct {cc *http.Client}func NewGreeterHTTPClient(client *http.Client) GreeterHTTPClient {return &GreeterHTTPClientImpl{client}}func (c *GreeterHTTPClientImpl) SayHello(ctx context.Context, in *HelloRequest, opts ...http.CallOption) (*HelloReply, error) {var out HelloReplypattern := "/helloworld"path := binding.EncodeURL(pattern, in, false)opts = append(opts, http.Operation(OperationGreeterSayHello))opts = append(opts, http.PathTemplate(pattern))err := c.cc.Invoke(ctx, "POST", path, in, 
&out, opts...)if err != nil {return nil, err}return &out, err}

开启一个grpc及http服务:

package mainimport ("context""fmt""log""test/helloworld""github.com/go-kratos/kratos/v2""github.com/go-kratos/kratos/v2/middleware/recovery""github.com/go-kratos/kratos/v2/transport/grpc""github.com/go-kratos/kratos/v2/transport/http")type server struct {helloworld.UnimplementedGreeterServer}func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {return &helloworld.HelloReply{Msg: fmt.Sprintf("Hello %+v", in.Name)}, nil}func main() {s := &server{}httpSrv := http.NewServer(http.Address(":8000"),http.Middleware(recovery.Recovery(),),)grpcSrv := grpc.NewServer(grpc.Address(":9000"),grpc.Middleware(recovery.Recovery(),),)    helloworld.RegisterGreeterServer(grpcSrv, s)helloworld.RegisterGreeterHTTPServer(httpSrv, s)app := kratos.New(kratos.Name("test"),kratos.Server(httpSrv,grpcSrv,),)if err := app.Run(); err != nil {log.Fatal(err)}}

http client:

package mainimport ("context""log""test/helloworld""github.com/go-kratos/kratos/v2/middleware/recovery"transhttp "github.com/go-kratos/kratos/v2/transport/http")func main() {callHTTP()}func callHTTP() {conn, err := transhttp.NewClient(context.Background(),transhttp.WithMiddleware(recovery.Recovery(),),transhttp.WithEndpoint("127.0.0.1:8000"),)if err != nil {panic(err)}defer conn.Close()client := helloworld.NewGreeterHTTPClient(conn)reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})if err != nil {log.Fatal(err)}log.Printf("[http] SayHello %s\n", reply.Msg)}

http server端实现原理

核心流程为下图 :

首先新建一个struct 并实现 http_pb.go中 GreeterHTTPServer interface 的方法,GreeterHTTPServer的命名方式为protobuf文件中的 service名+HTTPServer,interface的方法为protobuf中使用google.api.http声明了http路由的所有method。

然后使用RegisterGreeterHTTPServer方法把服务注册进去。大体的流程如下:

const OperationGreeterSayHello = "/helloworld.Greeter/SayHello"func RegisterGreeterHTTPServer(s *http.Server, srv GreeterHTTPServer) {r := s.Route("/")r.POST("/helloworld", _Greeter_SayHello0_HTTP_Handler(srv)) // 注册路由}func _Greeter_SayHello0_HTTP_Handler(srv GreeterHTTPServer) func(ctx http.Context) error {return func(ctx http.Context) error {var in HelloRequest // protobuf 中声明的request if err := ctx.Bind(&in); err != nil { // 把http的参数绑定到 inreturn err}http.SetOperation(ctx, OperationGreeterSayHello) // 设置Operation 和grpc一值,用于middleware select 等h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {return srv.SayHello(ctx, req.(*HelloRequest)) // 这个方法也就是上文提到的GreeterHTTPServer接口的方法,也就是我们自己实现的struct server里的SayHello方法}) // 使用责任链模式middleware 这里没有任何中间件out, err := h(ctx, &in) // 执行if err != nil {return err}reply := out.(*HelloReply) return ctx.Result(200, reply) }}

什么是责任链模式?

https://haiyux.cc/post/designmode/behavioral/#责任链模式

上段代码中的POST方法为:

代码在https://github.com/go-kratos/kratos/blob/main/transport/http/router.go#L76

func (r *Router) POST(path string, h HandlerFunc, m ...FilterFunc) {r.Handle(http.MethodPost, path, h, m...) // MethodPost = POST net/http下的常量}// h 为上段xxx_http_pb.go代码中_Greeter_SayHello0_HTTP_Handler的返回值func (r *Router) Handle(method, relativePath string, h HandlerFunc, filters ...FilterFunc) {next := http.Handler(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {ctx := r.pool.Get().(Context)ctx.Reset(res, req) // 把 net/http的http.ResponseWriter 和*http.Request 设置ctx中if err := h(ctx); err != nil { // 执行h             r.srv.ene(res, req, err) // 如果出错了 执行 ene(EncodeErrorFunc)}ctx.Reset(nil, nil)r.pool.Put(ctx)}))next = FilterChain(filters...)(next)next = FilterChain(r.filters...)(next) // 添加filter 责任链模式r.srv.router.Handle(path.Join(r.prefix, relativePath), next).Methods(method) // router 为 mux的router 把方法注册到路由中}

当我们访问 path.Join(r.prefix, relativePath)也就是/helloworld时,会执行上段代码中的next方法,next是一个责任链。

核心为会执行_Greeter_SayHello0_HTTP_Handler方法,

如果没发生错误,执行ctx.Result(200, reply)

type wrapper struct {router *Routerreq    *http.Requestres    http.ResponseWriterw      responseWriter}func (c *wrapper) Result(code int, v interface{}) error {c.w.WriteHeader(code)return c.router.srv.enc(&c.w, c.req, v)}

enc也就是EncodeResponseFunc, 为kratos预留的返回值函数

type EncodeResponseFunc func(http.ResponseWriter, *http.Request, interface{}) error

kratos提供了默认的EncodeResponseFunc

func DefaultResponseEncoder(w http.ResponseWriter, r *http.Request, v interface{}) error {if v == nil {return nil}if rd, ok := v.(Redirector); ok { // 检查有无Redirect方法,如果实现了interface 为跳转路由 也就是http的301 302等url, code := rd.Redirect()http.Redirect(w, r, url, code) // 跳转return nil}codec, _ := CodecForRequest(r, "Accept") // 查看需要返回的参数类型 比如jsondata, err := codec.Marshal(v) // 把数据Marshal成[]byteif err != nil {return err}w.Header().Set("Content-Type", httputil.ContentType(codec.Name())) // 设置header_, err = w.Write(data) // 写数据if err != nil {return err}return nil}

如果发生了错误,执行ene,也就是EncodeErrorFunc, 为kratos预留的错误返回值函数

type EncodeErrorFunc func(http.ResponseWriter, *http.Request, error)

kratos提供了默认的EncodeErrorFunc

func DefaultErrorEncoder(w http.ResponseWriter, r *http.Request, err error) {se := errors.FromError(err) // 把error变成自定义的实现error的结构体codec, _ := CodecForRequest(r, "Accept") // 查看需要返回的参数类型 比如jsonbody, err := codec.Marshal(se)if err != nil {w.WriteHeader(http.StatusInternalServerError)return}w.Header().Set("Content-Type", httputil.ContentType(codec.Name()))w.WriteHeader(int(se.Code)) // 写入 error中的code_, _ = w.Write(body) // 返回错误信息}

http client端实现原理

在上文生成的代码中http client的部分为

type GreeterHTTPClient interface {SayHello(ctx context.Context, req *HelloRequest, opts ...http.CallOption) (rsp *HelloReply, err error)}type GreeterHTTPClientImpl struct { // 实现 GreeterHTTPClient 接口cc *http.Client}func NewGreeterHTTPClient(client *http.Client) GreeterHTTPClient {return &GreeterHTTPClientImpl{client}}func (c *GreeterHTTPClientImpl) SayHello(ctx context.Context, in *HelloRequest, opts ...http.CallOption) (*HelloReply, error) {var out HelloReply // 返回值pattern := "/helloworld" path := binding.EncodeURL(pattern, in, false) // 整理path 传入in 是由于可能有path参数或者queryopts = append(opts, http.Operation(OperationGreeterSayHello))opts = append(opts, http.PathTemplate(pattern))err := c.cc.Invoke(ctx, "POST", path, in, &out, opts...) // 访问接口if err != nil {return nil, err}return &out, err}

上段代码中的Invoke方法为:

代码在https://github.com/go-kratos/kratos/blob/main/transport/http/client.go#L192

func (client *Client) Invoke(ctx context.Context, method, path string, args interface{}, reply interface{}, opts ...CallOption) error {var (contentType stringbody        io.Reader)c := defaultCallInfo(path)for _, o := range opts {if err := o.before(&c); err != nil {return err}}if args != nil {data, err := client.opts.encoder(ctx, c.contentType, args)if err != nil {return err}contentType = c.contentTypebody = bytes.NewReader(data)}url := fmt.Sprintf("%s://%s%s", client.target.Scheme, client.target.Authority, path)req, err := http.NewRequest(method, url, body)if err != nil {return err}if contentType != "" {req.Header.Set("Content-Type", c.contentType)}if client.opts.userAgent != "" {req.Header.Set("User-Agent", client.opts.userAgent)}ctx = transport.NewClientContext(ctx, &Transport{endpoint:     client.opts.endpoint,reqHeader:    headerCarrier(req.Header),operation:    c.operation,request:      req,pathTemplate: c.pathTemplate,})return client.invoke(ctx, req, args, reply, c, opts...)}func (client *Client) invoke(ctx context.Context, req *http.Request, args interface{}, reply interface{}, c callInfo, opts ...CallOption) error {h := func(ctx context.Context, in interface{}) (interface{}, error) {res, err := client.do(req.WithContext(ctx))if res != nil {cs := csAttempt{res: res}for _, o := range opts {o.after(&c, &cs)}}if err != nil {return nil, err}defer res.Body.Close()if err := client.opts.decoder(ctx, res, reply); err != nil {return nil, err}return reply, nil}var p selector.Peerctx = selector.NewPeerContext(ctx, &p)if len(client.opts.middleware) > 0 {h = middleware.Chain(client.opts.middleware...)(h)}_, err := h(ctx, args)return err}

关键词: