最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

环球时讯:【网关开发】8.Openresty 网关自定义健康检查的设计与实现

来源:博客园
目录
  • 背景
  • 设计
    • 健康检查流程图
    • 健康检查过程
  • 核心代码
    • down_checker 模块
    • upstream_context 模块
  • balance 模块
  • 测试
    • 主动健康
    • 被动健康检查
  • 总结与思考
    • 共享内存保存检测列表
    • 提高健康检查效率

背景

使用Openresty作为网关进行动态节点IP负载均衡时,要求网关有能力在负载均衡之前摘除掉有问题的节点。所以网关需要一定的健康检查能力。

设计

如果每一个网关节点都对服务节点进行探活,假设每1s探活一次,网关节点有M个,服务节点有N个,那整个网络中每秒出现M*N条探活协议,而且大部分都是探活成功的结果,不仅浪费带宽,而且增加业务节点的压力。给出的设计方案是引入独立管理服务,进行所有节点的探活,当探活结果是up时不做操作,当结果是down时,说明服务节点可能存在问题,通过API的形式发送给Openresty,Openresty在对节点进行检测,干预负载均衡结果,如果网关探活发现节点已经恢复,再将该节点加入到可被负载均衡的列表中。


(资料图)

健康检查流程图

健康检查过程

  1. manager服务用来管理各个服务的节点,包括配置健康检查信息,写入ETCD等
  2. manager服务进行主动健康检查,定期请求服务节点,如果发现节点异常, 通过API发送给Openresty的down_checker模块
  3. down_checker模块管理所有down的节点,定期向server node发送健康检查协议。
  4. down_checker模块发现节点异常(down),通知upstream_context模块,将该节点移除负载均衡列表
  5. down_checker模块发现节点正常(up), 从管理的down节点列表移除,通知upstream_context模块,将该节点加入负载均衡列表
  6. balance模块进行负载均衡时会从upstream_context中获取负载均衡列表。
  7. 当balance模块进行复杂均衡时如果发现节点异常,则通知down_checker模块进行监控,实现被动健康检查功能。

核心代码

一些依赖模块在以前的博客中都有详细讲解lua绑定委托 delegateevents插件使用自定义负载均衡代码仓库:https://github.com/zhaoshoucheng/openresty/blob/main/pkg/lua_script/upstream/down_peer_checker.lua

down_checker 模块

数据结构

function _M.new()    return setmetatable({        interval = 1,                       -- 检测间隔时间        watches = {},                       -- 检测列表 map        on_peer_up = delegate.new(),        -- 检测节点为up时调用的函数链        on_peer_added = delegate.new(),     -- 添加检测节点调用函数链        on_peer_removed = delegate.new(),   -- 移除检测节点调用函数链    }, _MT)end

一个Openresty有多个进程,但是只需要一个进程发送健康检查请求就可以,将结果通知给其他进程检测结果通过event通知__check_peer 发送http与tcp网络请求【网关开发】7.Openresty使用cosocket API 发送http与tcp网络请求

local function _check_peers_and_notify(self, tasks)    for i = 1, #tasks do        local ctx = tasks[i]        local server_up = __check_peer(ctx)        --TODO 简单策略:遇到一次健康检查成功则设置成up,没有对每次健康检查结果进行保存        if server_up then            ctx.down = false            events.post(self._events._source, self._events.notify, {                name = ctx.name,                peer = ctx.peer,            })        end    endend

遍历down节点列表,需要根据健康检查自身设置的时间间隔进行检测。

local function _do_check(self)    -- get all peers to check this time    local todo = { }    local now = ngx.now()    local i = 1    for watch_name, ctx in pairs(self.watches) do        if ctx.next_check_date and ctx.next_check_date <= now then            todo[i] = ctx            local interval = ctx.ahc.interval            if interval > 1000 then                interval = interval / 1000            end            ctx.next_check_date = now + (interval or 10)            i = i + 1        end    end    local all_task_count = #todo    if all_task_count == 0 then        return    end    _check_peers_and_notify(self, todo)end

定时器检测,循环遍历检测列表

local function _tick_proc(p, self)    local start = ngx.now()    local ok, err = pcall(_do_check, self)    local stop = ngx.now()    local next_tick = self.interval - (stop - start)    if next_tick <= 0.1 then        next_tick = 0.1    end    if not ok then        ngx.log(ngx.ERR, "failed to run healthcheck cycle: " .. tostring(err))    end    ok, err = ngx.timer.at(next_tick, _tick_proc, self)    if not ok then        if err ~= "process exiting" then            ngx.log(ngx.ERR, "failed to create timer: "..tostring(err))        end    endend

开始函数,注册event事件,所有进程都注册event事件处理函数,主进程用来进行健康检查,所有进程响应健康检查结果事件处理函数:

  1. notify事件 --> handle_notify处理函数
  • 节点检测up时触发
  • 调用on_peer_up函数链
  • 移除watches监控列表
  • 调用on_peer_removed 函数链

2.add_watch事件 --> handle_add_watch处理函数

  • 添加watches监控列表
  • 调用on_peer_added函数链

3.remove_watch事件 --> handle_remove_watch处理函数

  • 移除watches监控列表
  • 调用on_peer_removed 函数链
local function start(self, is_master)    if self.__started then        return    end    self._stop = false    self.__started = true    do        -- register events        local handle_notify = function(data, event, source, pid)            if worker_exiting() then                return            end            ngx.log(ngx.ERR, "handle_notify --> ahc events: "..require "cjson.safe".encode(data))            _notify_server_up(self, data.name, data.peer)        end        local handle_add_watch = function(data, event, source, pid)            if worker_exiting() then                return            end            _set_watch_context(self, data.name, data.ahc, data.peer)            ngx.log(ngx.ERR, "handle_add_watch --> added events: "..require "cjson.safe".encode(data))            self.on_peer_added(self, data.name, data.peer)        end        local handle_remove_watch = function(data, event, source, pid)            if worker_exiting() then                return            end            local name = data.name            local ctx = self.watches[name]            if not ctx then                return            end            self.watches[name] = nil            ngx.log(ngx.ERR, "handle_remove_watch --> added events: "..require "cjson.safe".encode(data))            self.on_peer_removed(self, name, ctx.peer, false)        end        self._events = events.event_list("down_peer_checker", "notify", "add_watch", "remove_watch")        events.register(handle_notify, self._events._source, self._events.notify)        events.register(handle_add_watch, self._events._source, self._events.add_watch)        events.register(handle_remove_watch, self._events._source, self._events.remove_watch)    end    if is_master then        ngx.timer.at(0, _tick_proc,self)    endend

add_watch 跟openresty多进程有关,不一定哪个进程会处理manager发送的API,所以需要event通知所有进程更新自己的watches列表

local function add_watch(self, watch_name, ahc, peer)    events.post(self._events._source, self._events.add_watch, {        name = watch_name,        peer = {            ip = peer.ip,            port = peer.port,        },        ahc = ahc    })end

upstream_context 模块

相关代码:https://github.com/zhaoshoucheng/openresty/blob/main/pkg/lua_script/upstream/upstream_context.luaupstream_context之前没有引入健康检查功能,所以需要提供函数让down_checker去更新自身信息

生成负载均衡列表,会收到节点的信息up或down选择

local function process_upstream_nodes(nodes)    -- 增加down标记    local ret = { }    local def = { }    for i = 1, #nodes do        local d = nodes[i]        if d.state ~= "up" then            goto continue        end        local id = d.ip.."\0"..tostring(d.port)        local ew = ret[id]        if ew then            ret[id] = (d.weight or 1) + ew            def[id].weight = ret[id]        else            ret[id] = d.weight or 1            def[id] = d        end        ::continue::    end    return ret, defend

更新函数

local function _update_node_state(self, name ,peer, status)    for i = 1, #self._ups.nodes do        local d = self._ups.nodes[i]        if d.ip == peer.ip or d.port == peer.port then            self._ups.nodes[i].state = status            return        end    end    ngx.log(ngx.ERR, "can"t find peer when dpc up: "..name)endlocal function dpc_on_added(self, name, peer)    -- local sctx = _get_server_context(self, peer, true)    -- sctx.dpc_state = "down"    _update_node_state(self, name, peer, "down")    local _, _all_nodes = process_upstream_nodes(self._ups.nodes)    self._all_nodes = _all_nodesendlocal function dpc_on_up(self, name, peer)    _update_node_state(self, name, peer, "up")    local _, _all_nodes = process_upstream_nodes(self._ups.nodes)    self._all_nodes = _all_nodes    self._prefered_balancer = nilend

balance 模块

初始化down_checker模块,并且注册upstream_context的函数调用链(包括debug代码,模拟manager API调用)

local function _handle_down_peer_watched(dpc, watch_name, peer)    local _, _, upname = watch_name:find("(.+)-")    if upname then        local uctx = get_upstream_context(upname)        if uctx then            uctx:dpc_on_added(watch_name, peer)        end    else        ngx.log(ngx.ERR, "unexpected upstream name: "..tostring(upname))    endendlocal function _handle_down_peer_becomes_up(dpc, watch_name, peer)    local _, _, upname = watch_name:find("(.+)-")    if upname then        local uctx = get_upstream_context(upname)        if uctx then            uctx:dpc_on_up(watch_name, peer)        end    else        ngx.log(ngx.ERR, "unexpected upstream name: "..tostring(upname))    endendlocal function debug_down_watcher(p)    local debug_ctx = down_watcher:debug_ctx()    down_watcher:add_watch(__watch_name("server_test1", debug_ctx.peer), debug_ctx.ahc, debug_ctx.peer)endfunction _M.init(is_master)    down_watcher = require(module_name.."down_peer_checker").new()    down_watcher.on_peer_added:add_delegate2(_handle_down_peer_watched)    down_watcher.on_peer_up:add_delegate2(_handle_down_peer_becomes_up)    down_watcher:start(is_master)    -- debug代码,模拟manager 发送API,通知down信息    if is_master then        ngx.timer.at(1, debug_down_watcher)    endend

被动健康检查

function _M.do_balance(ups_name)··· local sn, sc = ngx_balancer.get_last_failure()    if not sn then        -- first call        local ok, err = ngx_balancer.set_more_tries(3)        if err and #err > 0 then            ngx.log(ngx.WARN, err)        end        key, idx = b:find(ctx.balance_key)        if not key then            ngx.log(ngx.ERR, "failed to get upstream endpoint")            ngx.exit(502)            return        end    else        -- 被动健康检查        down_watcher:add_watch(__watch_name(ups_name, ctx.latest_peer), uctx._ups.health_check, ctx.latest_peer)        key, idx = b:next(ctx.latest_idx)        ngx.log(ngx.WARN, "rebalancing: "..sn..", "..tostring(sc))    end···end

测试

主动健康

1.关闭服务2.触发debug,模拟API调用,client请求信息,观察结果3.启动服务4.client请求信息,观察结果event更新负载均衡到up节点启动服务之后,健康检查成功,event通知负载均衡到不同节点

被动健康检查

1.关闭服务2.client请求信息,观察结果3.打开服务4.client请求信息,观察结果触发一次502,并用新节点替换,和API一致,进行event通知再次请求则直接负载均衡到正常节点,不会经过一次502打开服务后效果和主动健康检查的结果一致,这里不再赘述。

总结与思考

本文主要讲述了Openresty作为网关实现对动态节点的健康检查管理,只要讲解整个架构设计和核心代码,有些细节代码还需要去git上查看还需要补充一些技术细节

共享内存保存检测列表

现在的watches是local的临时变量,reload之后就没了,所以需要维护共享内存shm,来保存现有的全部需要监的服务节点,在初始化时,从shm中构建watches

提高健康检查效率

现在的健康检查是每次线性的对watches列表进行遍历,如果列表过多或单次健康检查服务阻塞,就会影响到其他的健康检查过程,所以需要ngx.thread.spawn增加轻量级线程去并行处理。

关键词: 负载均衡 观察结果