最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

【全球独家】【网关开发】4.Openresty 使用events插件进行事件通知

来源:博客园


(资料图)

背景

在某些业务场景下,比如数据更新,程序执行,只需要某个进程执行一次,但是其他进程需要知道本次执行的结果,所以就需要一个通知机制,主工作进程执行程序,执行之后的结果通知给其他进程,当然本质上也是通过共享内存进行处理。

场景逻辑

源码准备

本次插件来源依然是来自kong的插件插件地址:https://github.com/Kong/lua-resty-worker-events使用就是一个文件lib/resty/worker 下的events.lua 不需要编译与安装将其放在自己的resty 目录下本次代码地址:https://github.com/zhaoshoucheng/openresty/blob/main/pkg/lua_script/upstream/init.lua

主进程准备

我们需要一个worker进行任务的执行,以及推送event,为了与openresty的master做区别,我们可以将这个进行命名为main worker进程如何确定main worker呢?因为所有worker在启动时都是平级的,所以我们可以任意选择worker作为main worker,所以我们利用第一个入共享内存数据的进程作为main worker。相当于一个锁。每次启动时,因为master先执行,可以在这个阶段清空共享内存,启动其他进程时在重新选择main worker

local module_name = (...):match("(.-)[^%.]+$")local cjson = require "cjson.safe"local upstream_conf = require(module_name .. "config")local upstream_shm = ngx.shared[upstream_conf.events_shm_name]-- 这里是引入的配置文件-- on_init 需要在master执行,也就是init_by_lua_file 中执行local function on_init()    -- 删除master     upstream_shm:delete("upstream_master")end-- 判断main worker的函数local function is_master()    local master = upstream_shm:get("upstream_master")    if master then        return false    end    upstream_shm:set("upstream_master","true")    return trueend

程序中利用is_master()返回值判断是普通worker进程还是main worker

events程序

-- worker init  事件配置等初始化。shm 是使用共享内存的名字 upstream_conf.events_shm_name = events    local ev = require "resty.worker.events"    local events_ok, err =    ev.configure(    {        shm = upstream_conf.events_shm_name,        timeout = 2, -- life time of unique event data in shm        interval = 0.1, -- poll interval (seconds)        wait_interval = 0.010, -- wait before retry fetching event data        wait_max = 0.5, -- max wait time before discarding event        shm_retries = 100 -- number of retries when the shm returns "no memory" on posting an event    })    if not events_ok then        ngx.log(ngx.ERR, "failed to init events, err: "..tostring(err))    end  -- 定义事件列表  local events = ev.event_list(        upstream_conf.watch_path, -- available as _M.events._source        "full_sync",                -- available as _M.events.full_sync        "sync_keys"                  -- available as _M.events.sync_keys  )  -- 这里包含两个事件full_sync和sync_keys , _source 应该用任意字符串就可以   --所有workers 注册事件处理函数   local my_callback = function(data, event, source, pid)        if event == events.full_sync then            -- do sth        elseif event == events.sync_keys then            -- do sth        end        ngx.log(ngx.INFO,"get data event: "..cjson.encode(event).."data :"..data.."pid :"..tostring(pid).." now pid: "..tostring(ngx.worker.pid()))    end    ev.register(my_callback, events._source, events.full_sync)-- ev.register(my_callback, events._source, events.sync_keys)  -- master 发送事件  if is_master() then      local raise_event = function(p, event, data)          ngx.log(ngx.INFO,"master post event ")          return ev.post(events._source, event, data)      end      -- raise_event(nil, events.full_sync, "test_event")      ngx.timer.at(0, raise_event,events.full_sync, "test_event")  end 

测试

nginx 进程列表

思考与总结

直接执行 raise_event 和 ngx.timer.at(0) 有什么区别的?如果是ngx.timer.at(1) 有什么不一样的现象吗?例如我们的master 启动4个worker进程,他们分别会如何打印 “get data event:......”的日志呢,可以观察pid实验一下。我们上面的程序正常情况应该是4条(3条普通worker +1 条main worker)。

  1. 直接调用raise_event 会打印5条日志,其中1条是main worker ,另外4条的进程全部是is shutting down的进程,也就是正在销毁的4个进程还是会收到事件。这时另外3个进程还没有register成功,post event 自然不会收到数据。
  2. 使用ngx.timer.at(0) 会打印 5 ~8 条不等,因为是ngx.timer是启动lua协程启动,但是这并不能保证其他进程的register会成功,所以除了上面的5条外,还有部分成功register进程会收到数据3.将timer调长,由于我们的进程并没有太多处理工作,所以reload时is shutting down的进程很快就会退出,register也很快就会成功,所以一般会按照预想的打印4条

这个在数据同步等设计时可能需要考虑一下,否则可能会出现意想不到数据不同步的问题。整体在插件使用时比较简单,只是在融合其他插件进行程序设计时需要多考虑一些。后续会集合etcd、lmdb、cache、events 进行整体融合实现注册中心的功能。

关键词: 收到数据 很快就会