最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

它来了!真正的 python 多线程

来源:博客园

哈喽大家好,我是咸鱼

几天前,IBM 工程师 Martin Heinz 发文表示 python 3.12 版本回引入"Per-Interpreter GIL”,有了这个 Per-Interpreter 全局解释器锁,python 就能实现真正意义上的并行/并发


【资料图】

我们知道,python 的多线程/进程并不是真正意义上的多线程/进程,这是因为 python GIL (Global Interpreter Lock)导致的

而即将发布的 Python 3.12 中引入了名为 "Per-Interpreter GIL" 的新特性,能够实现真正的并发

接下来我们来看下这篇文章,原文链接如下:

https://martinheinz.dev/blog/97

译文

Python 到现在已经 32 岁了,但它到现在还没有实现适当的、真正的并发/并行

由于将在 Python 3.12 (预计 2023 年 10 月发布)中引入 “Per-Interpreter GIL”(全局解释器锁),这种情况将会被改变

虽然距离 python 3.12 的发布还有几个月的时间,但是相关代码已经实现了。所以让我们提前来了解一下如何使用子解释器 API(ub-interpreters API) 来编写出真正的并发Python代码

子解释器(Sub-Interpreters)

我们首先来看下这个 “Per-Interpreter GIL” 是如何解决 Python 缺失适当并发性这个问题的

简单来讲,GIL(全局解释器锁)是一个互斥锁,它只允许一个线程控制 Python 解释器(某个线程想要执行,必须要先拿到 GIL ,在一个 python 解释器里面,GIL 只有一个,拿不到 GIL 的就不允许执行)

这就意味着即使你在 Python 中创建多个线程,也只会有一个线程在运行

随着 “Per-Interpreter GIL” 的引用,单个 python 解释器不再共享同一个 GIL。这种隔离级别允许每个子 python 解释器真正地并发运行

这意味着我们可以通过生成额外的子解释器来绕过 Python 的并发限制,其中每个子解释器都有自己的GIL(拿到一个 GIL 锁)

更详细的说明请参见 PEP 684,该文档描述了此功能/更改:https://peps.python.org/pep-0684/#per-interpreter-state

如何安装

想要使用这个新功能,我们需要安装最新的 python 版本,这需要源码编译安装

# https://devguide.python.org/getting-started/setup-building/#unix-compilinggit clone https://github.com/python/cpython.gitcd cpython./configure --enable-optimizations --prefix=$(pwd)/python-3.12make -s -j2./python# Python 3.12.0a7+ (heads/main:22f3425c3d, May 10 2023, 12:52:07) [GCC 11.3.0] on linux# Type "help", "copyright", "credits" or "license" for more information.

C-API 在哪里

现在我们已经安装好了最新版本,那么我们该如何使用子解释器呢?我们可以直接通过 import来导入吗?不幸的是,还不能

正如 PEP-684 中指出的: ...this is an advanced feature meant for a narrow set of users of the C-API.

Per-Interpreter GIL 的特性目前只能通过 C-API 使用,还没有直接的接口供开发人员使用

接口预计会在 PEP 554中出现,如果大家能够接受,它应该会在 Python 3.13 中出现,在这个版本出现之前,我们必须自己想办法来实现子解释器

虽然还没有相关文档,也没有相关模块可以导入,但 CPython 代码库中有一些代码段向我们展示了如何使用它:

  1. 方法一:我们可以使用 _xxsubinterpreters模块(因为是通过 C 实现的,所以命名比较奇怪,而且在 python 中不能够简单地去检查代码)
  2. 方法二:可以使用 CPython 的 test 模块,该模块具有用于测试的示例 Interpreter(和 Channel)类
# Choose one of these:import _xxsubinterpreters as interpretersfrom test.support import interpreters

通常情况下我们一般用上面的第二种方法来实现

我们已经找到了子解释器,但我们还需要通过 test 模块去借用一些辅助函数,以便将代码传递给子解释器,辅助函数如下

from textwrap import dedentimport os# https://github.com/python/cpython/blob/#   15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test__xxsubinterpreters.py#L75def _captured_script(script):    r, w = os.pipe()    indented = script.replace("\n", "\n                ")    wrapped = dedent(f"""        import contextlib        with open({w}, "w", encoding="utf-8") as spipe:            with contextlib.redirect_stdout(spipe):                {indented}        """)    return wrapped, open(r, encoding="utf-8")def _run_output(interp, request, channels=None):    script, rpipe = _captured_script(request)    with rpipe:        interp.run(script, channels=channels)        return rpipe.read()

interpreters模块与上面的辅助函数组合在一起,便可以生成第一个子解释器:

from test.support import interpretersmain = interpreters.get_main()print(f"Main interpreter ID: {main}")# Main interpreter ID: Interpreter(id=0, isolated=None)interp = interpreters.create()print(f"Sub-interpreter: {interp}")# Sub-interpreter: Interpreter(id=1, isolated=True)# https://github.com/python/cpython/blob/#   15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test__xxsubinterpreters.py#L236code = dedent("""            from test.support import interpreters            cur = interpreters.get_current()            print(cur.id)            """)out = _run_output(interp, code)print(f"All Interpreters: {interpreters.list_all()}")# All Interpreters: [Interpreter(id=0, isolated=None), Interpreter(id=1, isolated=None)]print(f"Output: {out}")  # Result of "print(cur.id)"# Output: 1

生成和运行新解释器的一种方法是使用 create()函数,然后将解释器与我们想要执行的代码一起传递给 _run_output()辅助函数

还有一种更简单的方法,如下所示

interp = interpreters.create()interp.run(code)

直接使用 interpreters模块的 run方法。

但如果我们运行上面这两段代码时,会收到以下报错

Fatal Python error: PyInterpreterState_Delete: remaining subinterpretersPython runtime state: finalizing (tstate=0x000055b5926bf398)

为了避免这个报错,我们还需要清理一些悬挂的解释器:

def cleanup_interpreters():    for i in interpreters.list_all():        if i.id == 0:  # main            continue        try:            print(f"Cleaning up interpreter: {i}")            i.close()        except RuntimeError:            pass  # already destroyedcleanup_interpreters()# Cleaning up interpreter: Interpreter(id=1, isolated=None)# Cleaning up interpreter: Interpreter(id=2, isolated=None)

线程

虽然使用上面的辅助函数运行代码是可行的,但在 threading模块中使用熟悉的接口可能会更方便

import threadingdef run_in_thread():    t = threading.Thread(target=interpreters.create)    print(t)    t.start()    print(t)    t.join()    print(t)run_in_thread()run_in_thread()# # # # # # 

我们通过把 interpreters.create函数传递给Thread,它会自动在线程内部生成新的子解释器

我们也可以结合这两种方法,并将辅助函数传递给 threading.Thread

import timedef run_in_thread():    interp = interpreters.create(isolated=True)    t = threading.Thread(target=_run_output, args=(interp, dedent("""            import _xxsubinterpreters as _interpreters            cur = _interpreters.get_current()            import time            time.sleep(2)            # Can"t print from here, won"t bubble-up to main interpreter            assert isinstance(cur, _interpreters.InterpreterID)            """)))    print(f"Created Thread: {t}")    t.start()    return tt1 = run_in_thread()print(f"First running Thread: {t1}")t2 = run_in_thread()print(f"Second running Thread: {t2}")time.sleep(4)  # Need to sleep to give Threads time to completecleanup_interpreters()

上面的代码中演示了如何使用 _xxsubinterpreters模块来实现 (方法一)

我们还在每个线程中休眠 2 秒来模拟“工作”状态

请注意,我们甚至不必调用 join() 函数等待线程完成,只需在线程完成时清理解释器即可

Channels

如果我们进一步挖掘 CPython test模块,我们还会发现 RecvChannel 和 SendChannel 类的实现类似于 Golang 中已知的通道

# https://github.com/python/cpython/blob/#   15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test_interpreters.py#L583r, s = interpreters.create_channel()print(f"Channel: {r}, {s}")# Channel: RecvChannel(id=0), SendChannel(id=0)orig = b"spam"s.send_nowait(orig)obj = r.recv()print(f"Received: {obj}")# Received: b"spam"cleanup_interpreters()# Need clean up, otherwise:# free(): invalid pointer# Aborted (core dumped)

上面的例子介绍了如何创建一个接收端通道(r)和发送端通道(s),然后我们使用 send_nowait方法将数据发送,通过 recv方法来接收数据

这个通道实际上只是另一个解释器,和以前一样,我们需要在处理完它之后进行清理

Digging Deeper

如果我们想要修改或者调整子解释器的选项(这些选项通常在 C 代码中设置),我们可以使用

test.support模块中的代码,具体来说是run_in_subinterp_with_config

import test.supportdef run_in_thread(script):    test.support.run_in_subinterp_with_config(        script,        use_main_obmalloc=True,        allow_fork=True,        allow_exec=True,        allow_threads=True,        allow_daemon_threads=False,        check_multi_interp_extensions=False,        own_gil=True,    )code = dedent(f"""            from test.support import interpreters            cur = interpreters.get_current()            print(cur)            """)run_in_thread(code)# Interpreter(id=7, isolated=None)run_in_thread(code)# Interpreter(id=8, isolated=None)

上面这个run_in_subinterp_with_config函数是 C 函数的 Python API。它提供了一些子解释器选项,如 own_gil,指定子解释器是否应该拥有自己的 GIL

关键词: