最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

Python工具箱系列(三十五) 全球新消息

来源:博客园

前文使用了SQLAlchemy的Core层来实现数据库、表与数据的CRUD。初步体现出了SQLAlchemy的优势。但ORM的特点没有充分地表现出来。下面的代码则从Python的类出现,生成表结构,并且进行数据的CRUD操作。


【资料图】

from sqlalchemy import (Column, DateTime, Float, ForeignKey, Integer, MetaData,                        String, Table, create_engine, text)from sqlalchemy.orm import Session, registry, relationshipfrom sqlalchemy_utils.functions import create_database, database_existsimport randomimport datetimefrom sqlalchemy import selectconfigure_pg = {"user": "postgres",                "password": "88488848",                "dns": "192.168.0.155",                "port": 5432,                "prefix": "postgresql+psycopg2",                "postfix": ""                }configure_mssql = {"user": "sa",                   "password": "88488848",                   "dns": "192.168.0.155",                   "port": 1433,                   "prefix": "mssql+pymssql",                   "postfix": "?charset=utf8"                   }configure_mysql = {"user": "root",                   "password": "88488848",                   "dns": "192.168.0.155",                   "port": 3306,                   "prefix": "mysql+mysqlconnector",                   "postfix": ""                   }config = {"mssql": configure_mssql,          "mysql": configure_mysql, "postgresql": configure_pg}database_name = "sensordb"table_sensor_location = "sensor_location_orm"table_sensor_data = "sensor_data_orm"mapper_registry = registry()Base = mapper_registry.generate_base()class Sensor(Base):    __tablename__ = table_sensor_location    id = Column(Integer, primary_key=True, autoincrement=False)    location = Column(String(30))    def __repr__(self):        return f"Sensor(id={self.id!r}, location={self.location!r}"class SensorData(Base):    __tablename__ = table_sensor_data    id = Column(Integer, primary_key=True, autoincrement=False)    sensor_id = Column(Integer, ForeignKey(f"{table_sensor_location}.id"))    sensor = relationship("Sensor")    area = Column(String(30))    pm25 = Column(Float)    timestamp = Column(DateTime)    def __repr__(self):        return f"SensorData(id={self.id!r}, pm25={self.pm25!r})"def linkdb(targetstr):    """    连接不同的数据库    Args:        targetstr (string): 数据库名称    Returns:        engine: 用于后续的数据库连接    """    if targetstr in config.keys():        item = config[targetstr]        connectstring = f"{item["prefix"]}://{item["user"]}:{item["password"]}@{item["dns"]}:{item["port"]}/{database_name}{item["postfix"]}"        engine = create_engine(connectstring, echo=True, future=True)    # 如果数据库不存在,则创建之    if not database_exists(engine.url):        create_database(engine.url)    return enginedef gen_sensorinfo(connector):    # 向sensor_location_orm插入传感器记录    sensor1 = Sensor(id=1, location="floor")    sensor2 = Sensor(id=2, location="wall")    session = Session(connector)    session.add(sensor1)    session.add(sensor2)    session.commit()def gen_sensordata(connector):    # 向sensor_data_orm插入测量数据    session = Session(connector)    currenttime = datetime.datetime.now()    # 向sensor_data_orm插入多条记录    for index in range(10):        currenttime = currenttime+datetime.timedelta(seconds=1)        sensordata = SensorData(id=index, sensor_id=index % 2 + 1,                                area="xian", pm25=random.uniform(0, 300), timestamp=currenttime)        session.add(sensordata)        session.flush()    session.commit()def query_demo(connector):    """    查询示例代码    Args:        connector (engine): 数据库连接    """    session = Session(connector)        # 查询传感器信息    stmt = select(Sensor).where(Sensor.id == 2)    print(stmt)    result = session.execute(stmt)    for sensor_obj in result.scalars():        print(sensor_obj.location)    # 查询传感器数据    stmt = select(SensorData).order_by(SensorData.sensor_id)    print(stmt)    result = session.execute(stmt)    for sensordata_obj in result.scalars():        print(sensordata_obj.pm25,sensordata_obj.timestamp)    # join    stmt = select(SensorData.id,SensorData.pm25, Sensor.location).join(SensorData.sensor).order_by(SensorData.pm25)    for row in session.execute(stmt):        print(row)# 依次连接多个数据库。从而验证代码的一致性for dbname in config.keys():    con = linkdb(dbname)    # 创建数据库    mapper_registry.metadata.create_all(con)    # 创建传感器信息    gen_sensorinfo(con)    # 创建传感器数据    gen_sensordata(con)        # 查询示例    query_demo(con)

ORM编程的一个好处就是不用写SQL语句,而以面向对象的思维方式来看数据库。正如前文所说,ORM将类与数据库的表一一对应。我们定义两个类。

◆Sensor类。保存传感器本身的信息。

◆SensorData类-保存传感器测量出来的数据。

这两个类均继续SQLAlchemy提供的Base类。虽然也可以自行定义类与表的映射关系,但最方便的仍然是直接继承Base类。两个类的写法类似,均需要定义主键,同时在SensorData类定义了外键形成Sensor与SensorData的ONE-TO-MANY的关系如下图所示:

类Sensor中的repr是为了方便输出信息用。类SensorData中的relationship定义了与Sensor的关联,后续在ORM基于JOIN的查询中起作用。

定义好Sensor与SensorData类后,直接使用mapper_registry.metadata.create_all(con)就可以创建与两个类对应的数据库。代码分别在mysql/mssql/postgresql等数据库中同时建立sensordb数据库与对应的两个表。

随后gen_sensorinfo创建2个Sensor类的实例,然后当作记录插入到表中。gen_sensordata创建10个模拟数据,并且当作记录插入表中,如下图所示:

在整个创建数据库/表/记录的过程中,全程没有SQL语句出现。随后使用query_demo演示查询的能力。

stmt = select(Sensor).where(Sensor.id == 2)print(stmt)

以上代码能够输出对应的SQL语句如下:

SELECT sensor_location_orm.id, sensor_location_orm.locationFROM sensor_location_ormWHERE sensor_location_orm.id = %(id_1)s

这些代码就是SQLAlchemy在后台实际运行的SQL语句。更为复杂的基于表关联的查询如下所示:

stmt = select(SensorData.id,SensorData.pm25, Sensor.location).join(SensorData.sensor).order_by(SensorData.pm25)for row in session.execute(stmt):    print(row)

以上语句对表sensor_data_orm进行查询,同时基于sensor_id关联到sensor_location_orm表,将传感的位置信息输出。形成的查询结果如下所示:

(1, 52.030998649965355, "wall")(0, 52.33204928632623, "floor")(6, 59.39568057249285, "floor")(2, 73.41761802578553, "floor")(9, 77.12459935499986, "wall")(4, 119.62669535658156, "floor")(5, 154.50858896768042, "wall")(7, 165.03277721842784, "wall")(3, 186.1637895876464, "wall")(8, 288.3729395365834, "floor")

由于语句中要求order_by(SensorData.pm25),所以id不是连续的,但pm列是从小到大排列。同时传感器的位置信息也显示出来。从示例代码来看,对于数据库的操作完成类似于对类的操作,查询方式也非常简单,不用关心SQL的实现,体现了ORM编程的强大威力。不过SQLAlchemy虽然好,但设计之初的目标是针对关系型数据库的,NOSQL数据库的支持不足。部分数据库(例如clickhouse等)自行提供了对SQLAlchemy的支持。

关键词: