最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

Python使用pyppeteer搭建网页截图api

来源:博客园


(相关资料图)

因为跨语言需要,打算把pyppeteer、图片压缩、aws S3封装成一个api来调用。

首先自然是要安装依赖 pip3 install pillow boto3 pyppeteer

运行一次脚本,pyppeteer会自动下载最新的浏览器到本地

实测windows启动Chrome时需要设置headless=False 阅读几篇关于pyppeteer的文章你应该知道我在说什么。

请求用GET/POST均可,GET将参数放进query,POST将参数放进消息体,使用application/x-www-form-urlencoded

# The examples below target port 9871; the server script defaults to port
# 9087, so start it with `-p 9871` (or change the port in these URLs).

# Using GET: parameters go in the query string.
curl -i "http://127.0.0.1:9871/?token=xry1029&uri=https://www.baidu.com"

# Using POST: parameters go in the form-encoded request body.
curl -X POST -i "http://127.0.0.1:9871" --data "token=xry1029" --data "uri=https://www.baidu.com"

切记uri需要加http://,不需要的功能就注释掉吧...

#!/usr/bin/python3
# author: 如雨yu
# date: 2023/1/18
"""HTTP API that screenshots a web page with pyppeteer, compresses it, and uploads it to S3.

A headless Chrome instance is launched once at import time; every request
reconnects to it through its DevTools websocket endpoint, opens a tab, takes
the screenshot, shrinks the image below a size limit and uploads it.
"""

# httpd imports
import argparse
import http.server as BaseHTTPServer
import json
import re
import socketserver as SocketServer
import threading
import urllib.parse as urlparse

# chrome imports
import asyncio
import sys
import time
import traceback

# image compression / aws S3 imports (stdlib part)
import base64
import os

# third-party
import boto3
from PIL import Image
from PIL import ImageFile
from pyppeteer import launch, connect


# Shared secret that callers must send as the `token` parameter.
# Left empty here on purpose: while it is empty every request is rejected.
API_TOKEN = ""

# Prefix prepended to the requested file name to build the local output path.
# Placeholder: replace with the real output directory (keep the trailing slash).
OUTPUT_DIR = "<输出图片地址>"


def compress_image(outfile, savefile, kb=1536, quality=75, k=0.75):
    """Shrink an image until it is at most ``kb`` KiB, then move it to ``savefile``.

    The image is repeatedly scaled down by the factor ``k`` (in place, in
    ``outfile``) until its file size drops to the target, and is finally
    moved to ``savefile``.

    :param outfile: path of the image to compress (consumed: it is moved away)
    :param savefile: destination path of the result
    :param kb: target size in KiB (the upstream service is said to accept ~1.5 MB)
    :param quality: ``quality`` value passed to ``Image.save``
    :param k: scale factor applied to width and height on every pass
    :return: ``savefile``
    """
    # os.path.getsize returns bytes; integer-divide by 1024 to get KiB.
    o_size = os.path.getsize(outfile) // 1024
    if o_size <= kb:
        # os.replace overwrites an existing destination on every platform,
        # whereas os.rename fails on Windows if the target already exists.
        os.replace(outfile, savefile)
        print("[compress] 无需压缩")
        return savefile

    # Do not fail on images whose data is truncated.
    ImageFile.LOAD_TRUNCATED_IMAGES = True
    while o_size > kb:
        with Image.open(outfile) as im:
            x, y = im.size
            new_size = (max(1, int(x * k)), max(1, int(y * k)))
            if new_size == (x, y):
                # Cannot shrink any further (1x1 image or k == 1): stop
                # instead of looping forever.
                break
            out = im.resize(new_size, Image.Resampling.LANCZOS)
        try:
            out.save(outfile, quality=quality)
        except Exception as e:
            # Best effort: keep whatever is on disk and carry on.
            print(e)
            break
        o_size = os.path.getsize(outfile) // 1024

    os.replace(outfile, savefile)
    print("[compress] 压缩完成")
    return savefile


def _run_coroutine(coro):
    """Run ``coro`` to completion on a fresh event loop and always close the loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


async def launchchrome():
    """Launch Chrome, publish its websocket endpoint in ``WsToken``, then detach.

    ``autoClose=False`` keeps the browser alive after this script disconnects,
    so later calls can reconnect to it with ``connect``.
    """
    global WsToken
    browser = await launch(
        autoClose=False,
        userDataDir="/home/user/user-data/",
        args=["--disable-infobars", "--disable-gpu"],
    )
    WsToken = browser.wsEndpoint
    print("[browser] 浏览器启动完成!Ws: " + str(WsToken))
    await browser.disconnect()


# The browser is started once, at import time, so that WsToken is available
# to every request handler.
_run_coroutine(launchchrome())


async def screenshot(Width, Height, Uri, Output, JSexec, SleepTime, FullPage):
    """Open ``Uri`` in a new tab, screenshot it and write the compressed image to ``Output``.

    :param Width: viewport width in pixels
    :param Height: viewport height in pixels
    :param Uri: address to load (must include the scheme, e.g. ``http://``)
    :param Output: final image path; ``Output + "-cache.png"`` is used as a
        temporary file for the raw screenshot
    :param JSexec: JavaScript evaluated in the page after it has loaded
    :param SleepTime: seconds to wait after running ``JSexec``
    :param FullPage: the string ``"true"`` requests a full-page screenshot
    """
    # Reconnect to the already running browser.
    browser = await connect(browserWSEndpoint=WsToken)
    try:
        page = await browser.newPage()
        try:
            await page.setViewport(viewport={"width": Width, "height": Height})
            # With JavaScript disabled the page would not be rendered.
            await page.setJavaScriptEnabled(enabled=True)
            # 15 s navigation timeout; the author saw unexplained bugs with more.
            page.setDefaultNavigationTimeout(15000)
            # Hide the webdriver flag from the page.
            await page.evaluateOnNewDocument(
                'Object.defineProperty(navigator, "webdriver", {get: () => false})'
            )
            await page.goto(Uri)
            await page.evaluate(JSexec)
            # Non-blocking wait: time.sleep would stall the event loop.
            await asyncio.sleep(SleepTime)

            Saveput = Output
            Output = Saveput + "-cache.png"
            if FullPage == "true":
                await page.screenshot({"path": Output, "fullPage": True})
            else:
                await page.screenshot({"path": Output})

            compress_image(Output, Saveput)
            print("[browser] 生成图片完成")
        finally:
            # Always close the tab, even on failure, so tabs do not pile up
            # and eat memory.
            await page.close()
    finally:
        await browser.disconnect()


async def StopChrome():
    """Reconnect to the running browser and shut it down."""
    browser = await connect(browserWSEndpoint=WsToken)
    await browser.close()
    await browser.disconnect()


def upload_files(path_local, path_s3):
    """Upload a file to S3 (re-uploading overwrites the object with the same key).

    :param path_local: local file path
    :param path_s3: destination key in the bucket
    :raises Exception: if the upload fails
    """
    print("[aws S3] Start upload files.")
    if not upload_single_file(path_local, path_s3):
        raise Exception("[aws S3] Upload files failed.")
    print("[aws S3] Upload files successful.")


def upload_single_file(src_local_path, dest_s3_path):
    """Upload one file to ``BUCKET_NAME``; prefer calling ``upload_files``.

    :param src_local_path: local file path
    :param dest_s3_path: destination key in the bucket
    :return: ``True`` on success, ``False`` if any exception occurred
    """
    try:
        with open(src_local_path, "rb") as f:
            s3.upload_fileobj(f, BUCKET_NAME, dest_s3_path)
    except Exception as e:
        print(f"[aws S3] Upload data failed. | src: {src_local_path} | dest: {dest_s3_path} | Exception: {e}")
        return False
    return True


# --- S3 configuration (placeholders: fill these in before running) ---
BUCKET_NAME = ""  # bucket name
# aws_access_key_id and aws_secret_access_key, base64-encoded.
# NOTE: base64 is an encoding, not encryption; it only hides the value from a
# casual glance.
S3_AKI = b""
S3_SAK = b""
CN_S3_AKI = base64.b64decode(S3_AKI).decode("utf-8")
CN_S3_SAK = base64.b64decode(S3_SAK).decode("utf-8")
# Drop any double quotes that were encoded together with the keys.
CN_S3_AKI = CN_S3_AKI.replace('"', "")
CN_S3_SAK = CN_S3_SAK.replace('"', "")
CN_REGION_NAME = ""  # region name
ENDPOINT_URL = ""  # endpoint URL; use exactly the one given by the provider

s3_session = boto3.Session(
    region_name=CN_REGION_NAME,
    aws_access_key_id=CN_S3_AKI,
    aws_secret_access_key=CN_S3_SAK,
)
s3 = s3_session.client("s3", endpoint_url=ENDPOINT_URL)
print("[aws S3] 初始化完成")


def _check_relative_path(name):
    """Reject file names that try to climb out of the output directory."""
    if ".." in name.replace("\\", "/").split("/"):
        raise ValueError("Bad path")


class apiHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Request handler: GET takes parameters from the query string, POST from a form-encoded body.

    Recognised parameters: ``token``, ``uri``, ``path``, ``w``, ``h``,
    ``fullpage``, ``jsexec``, ``sleeptime``.
    """

    def do_GET(self):
        # urlsplit replaces the deprecated urllib.parse.splitquery.
        parts = urlparse.urlsplit(self.path)
        self._response(parts.path, parts.query)

    def do_POST(self):
        # A missing Content-Length header is treated as an empty body.
        length = int(self.headers.get("content-length") or 0)
        args = self.rfile.read(length).decode("utf-8")
        self._response(self.path, args)

    def _response(self, path, args):
        """Take the screenshot described by ``args`` and reply with its URL or an error text.

        The status line is always 200; failures are reported in the body,
        prefixed with ``服务器错误: ``.
        """
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.end_headers()

        # Keep only the first value of every parameter.
        if args:
            args = {k: v[0] for k, v in urlparse.parse_qs(args).items()}
        else:
            args = {}

        try:
            # Requests without the configured token get no screenshot.
            Token = args.get("token", "null")
            if not API_TOKEN or Token != API_TOKEN:
                print("[httpd] 无token访问...")
                raise Exception("Bad Token")

            Uri = args.get("uri", "https://www.baidu.com")
            ScreenShotPath = args.get("path", "test.jpg")
            Height = int(args.get("h", "1080"))
            Width = int(args.get("w", "1920"))
            FullPage = args.get("fullpage", "false")
            JSexec = args.get("jsexec", "void(0);")
            SleepTime = int(args.get("sleeptime", "0"))
            _check_relative_path(ScreenShotPath)

            # Each handler thread drives the coroutine on its own event loop.
            _run_coroutine(
                screenshot(Width, Height, Uri, OUTPUT_DIR + ScreenShotPath, JSexec, SleepTime, FullPage)
            )
            # Upload to S3.
            upload_files(OUTPUT_DIR + ScreenShotPath, "data/" + ScreenShotPath)
            # Placeholder host: fill in the public domain of the bucket.
            response_str = "https:///data/" + ScreenShotPath
        except Exception as e:
            response_str = "服务器错误: " + str(e)

        self.wfile.write(response_str.encode("UTF-8"))


class ThreadedServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """HTTP server that handles each request in its own thread."""

    def __init__(self, *args, **kwargs):
        # Serialises console output coming from different threads.
        self.screen_lock = threading.Lock()
        BaseHTTPServer.HTTPServer.__init__(self, *args, **kwargs)

    def safe_print(self, *args, **kwargs):
        """``print`` while holding the screen lock."""
        with self.screen_lock:
            print(*args, **kwargs)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-ip", "--address", required=False, help="IP address to listen. Default is 127.0.0.1", default="127.0.0.1")
    # NOTE: the usage examples published with this script call port 9871;
    # pass `-p 9871` to match them.
    parser.add_argument("-p", "--port", type=int, help="port to bind", default=9087)
    args = parser.parse_args()

    server = ThreadedServer((args.address, args.port), apiHandler)
    print("[httpd] Server is Ready. %s:%s" % (args.address, args.port))

    while True:
        try:
            server.handle_request()
        except KeyboardInterrupt:
            # Print before leaving the loop (the message used to be unreachable).
            server.safe_print("[main] Control-C hit: Exiting server...")
            break

    server.server_close()
    # Shut the browser down together with the server.
    _run_coroutine(StopChrome())

如果要关掉chrome呢?当然不是手动ctrl+c然后kill

用这个,重新连接chrome浏览器并且关闭

#!/usr/bin/python3
"""Reconnect to the Chrome instance left running by the API server and close it.

Chrome writes its DevTools port and browser websocket path to the
``DevToolsActivePort`` file inside the user data directory; the two lines are
combined into the websocket endpoint to connect to.
"""
from pyppeteer import connect
import asyncio

# Line 1 is the port, line 2 the websocket path. Strip both so a trailing
# newline on either line cannot corrupt the URL, and close the file promptly.
with open("/home/user/user-data/DevToolsActivePort", "r", encoding="utf-8") as f:
    Port = f.readline().strip()
    Token = f.readline().strip()
WsToken = "ws://127.0.0.1:" + Port + Token


async def StopChrome():
    """Connect to the running browser and shut it down."""
    browser = await connect(browserWSEndpoint=WsToken)
    await browser.close()
    # The author notes a disconnect is still required after closing.
    await browser.disconnect()


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(StopChrome())
finally:
    # Release the loop even if connecting to the browser fails.
    loop.close()

大概1.5s一张图,还是不太稳定..希望有大佬能帮忙提提意见

关键词: 导出文件 重新连接 生成图片