最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

回顾、信号、flask-script、sqlalchemy介绍和快速使用、创建操作数据表

来源:博客园


(资料图片仅供参考)

回顾

# 3 local对象-并发编程中的一个对象,它可以保证多线程并发访问数据安全    -本质原理是:不同的线程,操作的是自己的数据    -不支持协程    # 4 自己定义local,支持线程和协程# 注意点一:    try:        # 只要解释器没有装greenlet,这句话就会报错        # 一旦装了,有两种情况,使用了协程和没用协程,无论使用不使用,用getcurrent都能拿到协程id号        from greenlet import getcurrent as get_ident     except Exception as e:        from threading import get_ident# 注意点二:重写类的  __setattr__和__getattr__    对象.属性 取值 不存在会触发  __getattr__   对象.属性 设置值 不存在时会触发  __setattr__                   # 注意点三:由于重写了__setattr__和__getattr__类内部使用 self.storage  会递归    使用类调用对象的方法,它就是普通函数,有几个值传几个值        object.__setattr__(self, "storage", {})        等同于:self.storage={}        等价于:setattr(self,"storage", {})  会递归                        # 5 flask是如何实现这个local类的    def __setattr__(self, name, value):        ident = self.__ident_func__()        storage = self.__storage__        try:            storage[ident][name] = value        except KeyError:            storage[ident] = {name: value}                def __getattr__(self, name):        try:            return self.__storage__[self.__ident_func__()][name]        except KeyError:            raise AttributeError(name)                                     def __getattr__(self, k):        ident = get_ident()        return self.storage[ident][k]    def __setattr__(self, k, v):        ident = get_ident() #如果用协程,这就是协程号,如果是线程,这就是线程号        if ident in self.storage:  #{"协程id号":{arg:1},"协程id号":{arg:2},"协程id号":{arg:3}}            self.storage[ident][k] = v        else:            self.storage[ident] = {k: v}                                                # 6 偏函数  :提前传值,返回一个对象,后期可以调用这个对象,传入后续的值# 7 请求上下文源码分析(ctx 对象),整个flask的执行流程-一旦请求来了----》会执行 Flask类的对象app()---》触发Flask __call__--->self.wsgi_app(environ, start_response)    -Flask类wsgi_app 方法  大约 2417行     def wsgi_app(self, environ, start_response):        #1 返回了一个ctx,请求上下文对象,RequestContext 的对象,里面有session,request        ctx = self.request_context(environ)        try:            try:                # 2 ctx.push---->RequestContext的push---》382行                 # _request_ctx_stack.push(self)--self是ctx---》是全局变量                # 是LocalStack()的对象                ctx.push()                # 匹配路由执行视图函数,请求扩展                response = self.full_dispatch_request()            except Exception as e:                error = e                response = self.handle_exception(e)            except:  # noqa: B001                error = sys.exc_info()[1]                raise            # 把结果返回给wsgi服务器            return response(environ, start_response)        finally:            if self.should_ignore_error(error):                error = None            # 把当前放进去的ctx剔除,当次请求结束了            ctx.auto_pop(error)-是LocalStack()的对象 的push ,传入了ctx        def push(self, obj):            # self._local是 Flask自己定义的兼容线程和协程的Local            #self._local中反射 stack,会根据不同线程或协程,返回不同线程的stack            #rv是None的            rv = getattr(self._local, "stack", None)            if rv is None:                # rv=[]                # self._local.stack=rv          #self._local={"协程id号1":{stack:[]},"协程id号2":{stack:[]}}                self._local.stack = rv = []            rv.append(obj)            #self._local={"协程id号1":{stack:[ctx,]},"协程id号2":{stack:[]}}            return rv                                 在视图函数中:request = LocalProxy(partial(_lookup_req_object, "request"))         print(request.method) # 执行requets对象的 __getattr__         LocalProxy的__getattr__-->核心:         return getattr(self._get_current_object(), name)         self._get_current_object() 是 ctx中的真正request对象,那method,自如就拿到当次请求的method         def _get_current_object(self):            if not hasattr(self.__local, "__release_local__"):                #object.__setattr__(self, "_LocalProxy__local", local),初始化传入的                # local 是 partial(_lookup_req_object, "request")                #                 # getattr(_lookup_req_object("request"), "method")                # getattr(当次请求的reuqest, "method")                return self.__local() # self中的 __local,隐藏属性            try:                return getattr(self.__local, self.__name__)            except AttributeError:                raise RuntimeError("no object bound to %s" % self.__name__)                    def _lookup_req_object(name):        # 这里把当前线程下 的ctx取出来了        top = _request_ctx_stack.top        if top is None:            raise RuntimeError(_request_ctx_err_msg)        return getattr(top, name) # 去ctx中反射request,返回的就是当次请求的requets    # django flask 同步框架,部署的时候,使用uwsgi部署,uwsgi是进程线程架构,并发量不高# 可以通过uwsgi+gevent,部署成异步程序

信号

# Flask框架中的信号基于blinker(安装这个模块),其主要就是让开发者可是在flask请求过程中定制一些用户行为  flask 和django都有#观察者模式,又叫发布-订阅(Publish/Subscribe)  23 种设计模式之一pip3.8 install blinker# 信号:signial 翻译过来的,并发编程中学过 信号量Semaphore# 比如:用户表新增一条记录,就记录一下日志     方案一:在每个增加后,都写一行代码  ---》后期要删除,比较麻烦     方案二:使用信号,写一个函数,绑定内置信号,只要程序执行到这,就会执行这个函数# 内置信号:flask少一些,django多一些request_started = _signals.signal("request-started")                # 请求到来前执行request_finished = _signals.signal("request-finished")              # 请求结束后执行 before_render_template = _signals.signal("before-render-template")  # 模板渲染前执行template_rendered = _signals.signal("template-rendered")            # 模板渲染后执行 got_request_exception = _signals.signal("got-request-exception")    # 请求执行出现异常时执行 request_tearing_down = _signals.signal("request-tearing-down")      # 请求执行完毕后自动执行(无论成功与否)appcontext_tearing_down = _signals.signal("appcontext-tearing-down")# 应用上下文执行完毕后自动执行(无论成功与否) appcontext_pushed = _signals.signal("appcontext-pushed")            # 应用上下文push时执行appcontext_popped = _signals.signal("appcontext-popped")            # 应用上下文pop时执行message_flashed = _signals.signal("message-flashed")                # 调用flask在其中添加数据时,自动触发# 使用内置信号的步骤    1 写一个函数    2 绑定内置信号    3 等待被触发from flask import Flask, render_template,signals,sessionfrom flask.signals import _signalsapp = Flask(__name__)app.debug = Trueapp.secret_key = "SSSSSSSSSSSSS"# 定义信号session_set = _signals.signal("session_set")# 写一个函数def test1(*args,**kwargs):    print(args)    print(kwargs)    print("session设置了")# 绑定自定义的信号session_set.connect(test1)@app.route("/")def hhhh():    session["lqz"] = "lqz"    session_set.send("lqz")  # 触发信号执行    return "hello world"@app.route("/index")def index():    return render_template("index.html", name="lqz")if __name__ == "__main__":    app.run()# django中使用信号https://www.cnblogs.com/liuqingzheng/articles/9803403.html

django中信号

Model signals    pre_init                    # django的modal执行其构造方法前,自动触发    post_init                   # django的modal执行其构造方法后,自动触发    pre_save                    # django的modal对象保存前,自动触发    post_save                   # django的modal对象保存后,自动触发    pre_delete                  # django的modal对象删除前,自动触发    post_delete                 # django的modal对象删除后,自动触发    m2m_changed                 # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发    class_prepared              # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发Management signals    pre_migrate                 # 执行migrate命令前,自动触发    post_migrate                # 执行migrate命令后,自动触发Request/response signals    request_started             # 请求到来前,自动触发    request_finished            # 请求结束后,自动触发    got_request_exception       # 请求异常后,自动触发Database Wrappers    connection_created          # 创建数据库连接时,自动触发# django中使用内置信号1 写一个函数    def callBack(*args, **kwargs):        print(args)        print(kwargs)    2 绑定信号    #方式一    post_save.connect(callBack)    # 方式二    from django.db.models.signals import pre_save    from django.dispatch import receiver    @receiver(pre_save)    def my_callback(sender, **kwargs):        print("对象创建成功")        print(sender)        print(kwargs)    3 等待触发

flask-script

# django中,有命令       python manage.py runserver#flask启动项目,像djagno一样,通过命令启动(版本必须匹配)Flask==2.2.2Flask_Script==2.0.3#借助于:flask-script 实现     安装:pip3.8 install flask-script     修改代码:        from flask_script import Manager        manager=Manager(app)        manager.run()     用命令启动        python manage.py runserverfrom flask import Flaskfrom flask_script import Managerapp = Flask(__name__)app.debug =Truemanager = Manager(app)@app.route("/")def index():    return "index"if __name__ == "__main__":    # app.run()    manager.run()# 自定制命令    #1  简单自定制命令    @manager.command    def custom(arg):        # 命令的代码,比如:初始化数据库, 有个excel表格,使用命令导入到mysql中        print(arg)   #2 复杂一些的自定制命令    @manager.option("-n", "--name", dest="name")    @manager.option("-u", "--url", dest="url")    def cmd(name, url):        # python run.py cmd -n lqz -u xxx        # python run.py cmd --name lqz --url uuu        print(name, url)# django 中如何自定制命令

sqlalchemy快速使用

# flask 中没有orm框架,对象关系映射,方便我们快速操作数据库# flask,fastapi中用sqlalchemy居多# SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果# 安装pip3.8 install sqlalchemy#了解SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件 pymysql    mysql+pymysql://:@/[?]    cx_Oracle    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]    更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

原生操作的快速使用

先不是orm  而是原生sql第一步:导入from sqlalchemy import create_engine# 第二步:生成引擎对象engine = create_engine(    "mysql+pymysql://root@127.0.0.1:3306/cnblogs",    max_overflow=0,  # 超过连接池大小外最多创建的连接    pool_size=5,  # 连接池大小    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置))# 第三步:使用引擎获取连接,操作数据库conn = engine.raw_connection()cursor=conn.cursor()cursor.execute("select * from aritcle")print(cursor.fetchall())

创建操作数据表

# 第一步:导入from sqlalchemy import create_engineimport datetimefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index# 第二步:执行declarative_base,得到一个类Base = declarative_base()# 第三步:继承生成的Base类class User(Base):    # 第四步:写字段    id = Column(Integer, primary_key=True)  # 生成一列,类型是Integer,主键    name = Column(String(32), index=True, nullable=False)  # name列varchar32,索引,不可为空    email = Column(String(32), unique=True)    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间    ctime = Column(DateTime, default=datetime.datetime.now)    # extra = Column(Text, nullable=True)    # 第五步:写表名 如果不写以类名为表名    __tablename__ = "users"  # 数据库表名称    # 第六步:建立联合索引,联合唯一    __table_args__ = (        UniqueConstraint("id", "name", name="uix_id_name"),  # 联合唯一        Index("ix_id_name", "name", "email"),  # 索引    )class Book(Base):    __tablename__ = "books"    id = Column(Integer, primary_key=True)    name = Column(String(32))# 第七步:把表同步到数据库中# 不会创建库,只会创建表engine = create_engine(    "mysql+pymysql://root@127.0.0.1:3306/aaa",    max_overflow=0,  # 超过连接池大小外最多创建的连接    pool_size=5,  # 连接池大小    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置))# 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)Base.metadata.create_all(engine)# 把所有表删除# Base.metadata.drop_all(engine)

关键词: