最新要闻
- 世界观点:《暗黑4》与《暗黑3》角色截图对比:女性角色更逼真了
- 捐100亿建大学 曹德旺:我赚了很多钱但不贪婪 会还给社会
- 女子摔跤上牙磕进嘴唇1月后才发现 医生惊叹不已:网友直呼心太大
- 徕卡+双长焦史无前例!小米13 Ultra曝光
- 今日快看!什么叫针灸师
- 全球热文:红心番石榴吃法?
- 130余年老字号 非遗工艺 吴裕泰特种级茉莉香毫100g装40元
- 热资讯!灵隐寺旁小溪中大量放生乌龟死亡被吐槽是伪善 放生已成产业链最高2000元一次
- 【独家焦点】干旱缓解 新疆最大湖泊变了:阿雅克库木湖30年面积扩大近7成
- 周鸿祎与库克共话科技未来:惟一被淘汰的是不会用GPT的人
- 天天观点:12小时保温保冷 朴原良品316不锈钢保温杯16.9元起
- 世界快讯:代码漏洞暴露用户支付信息 ChatGPT安全存隐患:官方致歉
- AMD Yes!Zen4回归骨折价 12核锐龙9 7900到手2549元
- 学生党福音 Redmi小金刚Note 12 Turbo的NFC卡升级:两大功能合体
- 当前资讯!mp3格式转换器哪个好_mp3格式转换器
- 女生靠摆地摊还清百万负债 还有人两周赚1万5、月入万元:摆摊真这么赚钱?
手机
iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
- 警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- 男子被关545天申国赔:获赔18万多 驳回精神抚慰金
- 3天内26名本土感染者,辽宁确诊人数已超安徽
- 广西柳州一男子因纠纷杀害三人后自首
- 洱海坠机4名机组人员被批准为烈士 数千干部群众悼念
家电
python 使用 kafka
python 使用 kafka
说明:关于 kafka 的启动与安装,命令行的使用,此处不做过多的解释,本篇文章主要描述 kafka 在 python 中的使用;
(资料图片)
1. python 使用 kafka 生产者
**说明:**python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区;
使用 python 操作 kafka 首先安装如下的包
pip install kafka pip install kafka-python # 由于 python 3.7 后的版本中 async 的关键字发生了变化,因此需要多安装该包;
常规的使用主要就是根据,第三方包的介绍使用,网上有许多基本的案例,此处不做介绍,下面直接将封装好的常用的方法进行封装;
import jsonimport kafkaclass Producer(object): """ kafka 的生产者模型 """ _coding = "utf-8" def __init__(self, broker="192.168.74.136:9092", topic="add_topic", max_request_size=104857600, batch_size=0, # 即时发送,提高并发可以适当增加,但是会造成消息的延迟; **kwargs): """初始化设置 kafka 生产者连接对象;参数不存在的情况下使用配置文件中的默认连接; """ self.broker = broker self.topic = topic self.max_request_size = max_request_size # 实例化生产者对象 self.producer_json = kafka.KafkaProducer( bootstrap_servers=self.broker, max_request_size=self.max_request_size, batch_size=batch_size, key_serializer=lambda k: json.dumps(k).encode(self._coding), # 设置键的形式使用匿名函数进行转换 value_serializer=lambda v: json.dumps(v).encode(self._coding), # 当需要使用 json 传输地时候必须加上这两个参数 **kwargs ) self.producer = kafka.KafkaProducer( bootstrap_servers=broker, max_request_size=self.max_request_size, batch_size=batch_size, api_version=(0, 10, 1), **kwargs ) def send(self, message: bytes, partition: int = 0): """ 写入普通的消息; Args: message: bytes; 字节流数据;将字符串编码成 utf-8的格式; partition: int; kafka 的分区,将消息发送到指定的分区之中; Returns: None """ future = self.producer.send(self.topic, message, partition=partition) record_metadata = future.get(timeout=30) if future.failed(): # 发送失败,记录异常到日志; raise Exception("send message failed:%s)" % future.exception) def send_json(self, key: str, value: dict, partition: int = 0): """ 发送 json 形式的数据; Args: key: str; kafka 中键的值 value: dict; 发送的具体消息 partition: int; 分区的信息 Returns: None """ future = self.producer_json.send(self.topic, key=key, value=value, partition=partition) record_metadata = future.get(timeout=30) if future.failed(): # 发送失败记录异常; raise Exception("send json message failed:%s)" % future.exception) def close(self): """ 关闭kafka的连接。 Returns: None """ self.producer_json.close() self.producer.close()if __name__ == "__main__": """脚本调用执行;""" kafka_obj = Producer() print(kafka_obj.broker) kafka_obj.send("自动生成".encode())
发送的消息,主要是普通的字符串消息,和字典形式的消息,方便对接;
2. python 使用 kafka 消费者
由于 kafka 消费者的特性,阻塞循环是一个必然的过程,可以使用 python 中的生成器进行优化,但是循环阻塞是无可避免的;
操作 kafka 的消费者依旧只需要安装上述的两个第三方依赖包;
封装指定的操作
import jsonfrom kafka import KafkaConsumer, KafkaProducerfrom kafka.structs import TopicPartitionclass KConsumer(object): """kafka 消费者; 动态传参,非配置文件传入; kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中; """ _encode = "UTF-8" def __init__(self, topics="start_server", bootstrap_server=None, group_id="start_task", partitions=None, **kwargs): """ 初始化kafka的消费者; 1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值) 2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数; 3. 手动设置偏移量 Args: topics: str; kafka 的消费主题; bootstrap_server: list; kafka 的消费者地址; group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id; partitions: int; 消费的分区,当不使用分区的时候默认读取是所有分区; **kwargs: dict; 其他原生kafka消费者参数的; """ if bootstrap_server is None: bootstrap_server = ["192.168.74.136:9092", ] self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_server) exist = self.exist_topics(topics) if not exist: # 需要的主题不存在; # 创建一条 self.create_topics(topics) if partitions is not None: self.consumer = KafkaConsumer( bootstrap_servers=bootstrap_server, group_id=group_id, # 目前只有一个消费者,根据情况是否需要进行修改;当扩展多个消费者的时候需要进行扩展; **kwargs ) # print("指定分区信息:", partitions, topics, type(partitions)) self.topic_set = TopicPartition(topics, int(partitions)) self.consumer.assign([self.topic_set]) else: # 默认读取主题下的所有分区, 但是该操作不支持自定义 offset, 因为 offset 一定是在指定的分区中进行的; self.consumer = KafkaConsumer( topics, bootstrap_servers=bootstrap_server, group_id=group_id, **kwargs ) def exist_topics(self, topics): """ 检查 kafka 中的主题是否存在; Args: topics: 主题名称; Returns: bool: True/False ; True,表示存在,False 表示不存在; """ topics_set = set(self.consumer.topics()) if topics not in topics_set: return False return True @staticmethod def create_topics(topics): """ 创建相关的 kafka 主题信息;说明本方法可以实现用户自定义 kafka 的启动服务,默认是使用的是 start_server; Args: topics: str; 主题的名字; Returns: None """ producer = KafkaProducer( bootstrap_servers="192.168.74.136:9092", key_serializer=lambda k: json.dumps(k).encode("utf-8"), value_serializer=lambda v: json.dumps(v).encode("utf-8") ) producer.send(topics, key="start", value={"msg": "aaaa"}) producer.close() def recv(self): """ 接收消费中的数据 Returns: 使用生成器进行返回; """ for message in self.consumer: # 这是一个永久阻塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都会有偏移 # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % ( # message.topic, message.partition, message.offset, message.key, message.value)) yield {"topic": message.topic, "partition": message.partition, "key": message.key, "value": message.value.decode(self._encode)} def recv_seek(self, offset): """ 接收消费者中的数据,按照 offset 的指定消费位置; Args: offset: int; kafka 消费者中指定的消费位置; Returns: generator; 消费者消息的生成器; """ self.consumer.seek(self.topic_set, offset) for message in self.consumer: # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % ( # message.topic, message.partition, message.offset, message.key, message.value)) yield {"topic": message.topic, "partition": message.partition, "key": message.key, "value": message.value.decode(self._encode)}if __name__ == "__main__": """ 测试使用; """ obj = KConsumer("exist_topic", bootstrap_server=["192.168.74.136:9092"]) for i in obj.recv(): print(i)
该消费者多封装时增加了一个需求,消费的主题不存在的时候会默认创建,下次就可以继续消费
3. 使用 docker 中的 kafka
以上两种脚本适用于 Kafka 的生产者和消费者在大多数情况下的使用,在使用的时候只需要将相关的配置信息修改即可;
docker 中使用 kafka 的时候与前面的配置稍有不同,当使用docker-compose
部署 Kafka 的时候,地址在文件中经过修改,可能会被改变,但是配置方式,因此只需要将相关的地址配好,即可;代码信息无需修改;
一般情况下如果是在 docker 中配置相关的参数,需要将端口映射出来,然后如果是 windows 可能需要将host的网络地址解析,与docker 中 kafka 的名称对应;
host 文件127.0.0.1 kafka
当需要远程连接的时候,将地址改成该计算机在内网中的地址即可;
关键词:
python 使用 kafka
世界观点:《暗黑4》与《暗黑3》角色截图对比:女性角色更逼真了
捐100亿建大学 曹德旺:我赚了很多钱但不贪婪 会还给社会
女子摔跤上牙磕进嘴唇1月后才发现 医生惊叹不已:网友直呼心太大
徕卡+双长焦史无前例!小米13 Ultra曝光
焦点信息:Vue——initState【十】
当前信息:国外短信平台收不到验证码,怎么解决?
今日快看!什么叫针灸师
全球热文:红心番石榴吃法?
130余年老字号 非遗工艺 吴裕泰特种级茉莉香毫100g装40元
热资讯!灵隐寺旁小溪中大量放生乌龟死亡被吐槽是伪善 放生已成产业链最高2000元一次
【独家焦点】干旱缓解 新疆最大湖泊变了:阿雅克库木湖30年面积扩大近7成
焦点速讯:WPF 入门基础
周鸿祎与库克共话科技未来:惟一被淘汰的是不会用GPT的人
天天观点:12小时保温保冷 朴原良品316不锈钢保温杯16.9元起
世界快讯:代码漏洞暴露用户支付信息 ChatGPT安全存隐患:官方致歉
世界速讯:聚焦险企负债成本管理 监管座谈会释放调整信号
AMD Yes!Zen4回归骨折价 12核锐龙9 7900到手2549元
学生党福音 Redmi小金刚Note 12 Turbo的NFC卡升级:两大功能合体
当前资讯!mp3格式转换器哪个好_mp3格式转换器
dotnet-csharp
焦点精选!C++ sizeof与strlen,并借此明晰内存对齐
女生靠摆地摊还清百万负债 还有人两周赚1万5、月入万元:摆摊真这么赚钱?
【天天速看料】周鸿祎:中国与GPT-4差距两三年 不存在难以逾越技术障碍
当前热文:Windows OpenGL ES 图像 GPUImageAmatorkaFilter
javascript的基础知识
小彭汽车向小鹏汽车道歉:盘点那些神级模仿的老年代步车
【速看料】《三体》动画今日正式完结!豆瓣评分仅3.8:半数一星差评
焦点关注:998的比亚迪秦PLUS DM-i多能跑?博主极端工况实测:合资还怎么打
当前关注:Office2019永久激活工具_Office2019激活工具神龙版v2023.2
天天快播:What is static and dynamic libraries
[白嫖系列] 白嫖一个永久免费的eu.org域名
速看:76人攻防俱佳夺冠有望,湖人防守第一进攻拉胯,勇士卫冕基本无望
全球新资讯:女子模仿网红穿搭遭对方粉丝网暴!法院判网红连续15天道歉、赔偿
男子喝红牛味道不对发现是累牛引热议:康帅傅、雷碧等山寨货为何层出不穷
资讯推荐:关于aws-s3-bucket-静态网站托管相关的查询-S3.Client.get_bucket_website(**kwargs)
天天速读:西班牙经济连续两年增速达5.5%
用4年不卡!Redmi Note 12 Turbo配置公布:16GB+1TB、5000mAh
环球今头条!男子抖音晒夜爬泰山到井盖下躲风 景区回应:核实后会处罚
【焦点热闻】YCSB工具原理重点内容解读(二)
CSS鼠标样式(cursor)总结(转载)
每日速看!因航班延误缺考复试 考研生崩溃大哭:对不起父母 对不起自己的努力
猪肉含量≥85% 一口全是肉:亚明猪肉烤肠29.9元2斤大促
越秀资本03月24日获深股通增持98.52万股
精选!宏基e1471g怎么样
热文:czide-CALLAN自制语言(python)
环球短讯!《安富莱嵌入式周报》第307期:开源智能制冷板,Keil MDK6发布时间,编程助手Github Copilot X,Matlab2023,高品质电容
英特尔联合创始人戈登·摩尔去世:享年94岁 “摩尔定律”提出者
天天热点!公司回应招聘前台要求身材 硬性要求臀围86腰围58:网友吐槽离谱
每日消息!学习 React 需要具备的 JavaScript 知识
全球观点:gs_probackup增量备份ptrack.cpp : 88
农事贵争时 物资“抢运”忙——黑龙江部分地区备春耕铁路运输见闻
环球观焦点:中国驻科威特大使馆发布斋月期间领事温馨提醒
妮维雅SPA级氨基酸洗面奶19.9元官方大促:原价87.9元
库克中国行:晒和黄龄合影、宣布苹果捐赠增加至1亿元支持教育事业
天天时讯:读Java性能权威指南(第2版)笔记27_线程和同步性能上
热资讯!蓝盘、紫盘、黑盘?2023年机械硬盘怎么买?
世界快看点丨《互联网广告管理办法》公布:弹出广告不能“一键关闭”最高可罚3万
今日精选:在 Ubuntu 和 Fedora 中设置 Python 开发环境
世界今日报丨网友修iPhone偶遇苹果CEO库克:让他在碎了的后盖上签了个名
90后情侣3年存100万裸辞飞荷兰 网友热议:有勇气活的很洒脱
信息:俄称打击乌军人员装备 乌称击退俄军进攻
环球短讯!12-指针02
全球今亮点!C++ 虚函数(virtual) 和纯虚函数(pure virtual) 的区别
40秒看天宇上演“星月神话”:被月亮星星浪漫到了 月牙慢慢掩盖金星
热门:完全禁用汽柴油?中石化及专家回应了
AMD RX 7900 XTX超频3.4GHz:功耗失控650W!还是打不过RTX 4090
环球实时:ChatGPT开放第三方插件!瞬间 其他AI成了绝望的文盲
快消息!英国监管机构相信微软不会独占CoD
全球滚动:安全环保标语8字_安全环保标语有哪些?
信息:中国动力电池发展正当其时 锂电隔膜产业乘势而上
世界今日报丨使用 PC 端浏览器开发者工具对移动端真机环境 Web 页面进行远程调试
每日看点!Microsoft edge锁定在任务栏上,被修改主页360的解决方法
全球热消息:深圳一公司发文拒招已婚未育员工:回应如此大言不惭
世界新动态:郭明錤:iPhone 15全系灵动岛 苹果升级传感器后体验更强大了
全球热点评!蜗牛的资料和特点分别是什么
天天新消息丨Midjourney AI绘画使用指南
Django笔记七之ManyToMany和OneToOne介绍
4K 3D 高帧率重制!《泰坦尼克号》重映预售开启:4月3日上映
热推荐:沪一男子多次在五星级酒店开房,记录和发票被“神秘女”轻松拿走?细思极恐...
【金融街发布】人民银行:2月发行各类债券54724.3亿元
债市日报:3月24日
天天滚动:外卖小哥专属!美团发布头盔新专利:手不脱把接电话
【世界报资讯】库克现身北京三里屯正面“面对”小米!网友手持小米13 Pro合影太吸睛
当前速看:马斯克要建自己的乌托邦城市:美国当地居民这下不淡定了
每日短讯:正式定档4月26日!米哈游大作《崩坏:星穹铁道》公测官宣
快消息!PaddleOCR之高性能Go语言实现OCR识别
环球报道:《暗黑破坏神4》新手入门图文攻略:游戏系统介绍与开荒指南
天天日报丨男孩名字含“坤”被嘲笑抑郁自残 被起绰号“鸡哥”:网友吐槽太过分
环球要闻:监控拍到浙江一车辆撞倒行人后又倒回碾压!官方通报来了
全球快资讯丨金价一路狂飙 有人8斤黄金变现174万:国内中老年狂买 还有人已赚32万元
世界热头条丨暴雪计划进行压力测试:《暗黑破坏神4》公测版排队依旧
以志愿服务赋能基层治理,金山卫镇这样做
9个都要了解的单行Python代码
每日看点!优化不够DLSS来凑 iGame RTX 4080显卡实战《卧龙》:4K轻松百帧
环球热点!男子驾车疯狂违章 上传视频炫耀自曝!后续舒适
169元 红魔散热器4磁吸版上市:20W满血功率、可控RGB灯效
环球热讯:苹果CEO库克现身北京三里屯零售店!众多粉丝合影 有人用小米13自拍
上海嘉定新城发展完成5亿元中票发行 利率3.20%
环球速讯:Leetcode Practice -- 字符串