最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

最资讯丨Python 中连接MSSQL,MySQL,SQLite,Redis,ElasticSearch,Mongodb,PostgreSQL,Oracle,RabbitMQ

来源:博客园
在Python 中连接MSSQL,MySQL,SQLite,Redis,ElasticSearch,Mongodb,PostgreSQL,Oracle,RabbitMQ等数据库/中间件的方式

Python 中连接 MSSQL 数据库

要在 Python 中连接 MSSQL 数据库,可以使用 pyodbc 模块。以下是一个简单的示例:

首先,需要安装 pyodbc 模块。可以使用 pip 命令进行安装:

pip install pyodbc

然后,可以使用以下代码连接到 MSSQL 数据库:


(相关资料图)

import pyodbc# 建立连接conn = pyodbc.connect("DRIVER={SQL Server};""SERVER=server_name;""DATABASE=database_name;""UID=username;""PWD=password")# 创建游标cursor = conn.cursor()# 执行查询cursor.execute("SELECT * FROM table_name")# 获取查询结果for row in cursor:print(row)# 关闭连接conn.close()

在上面的代码中,将 server_name替换为 MSSQL 服务器的名称,将 database_name替换为要连接的数据库的名称,将 usernamepassword替换为登录 MSSQL 的用户名和密码。

接下来,使用 pyodbc.connect()方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 DRIVER={SQL Server}指定要使用的驱动程序,SERVER指定服务器名称,DATABASE指定要连接的数据库名称,UID指定登录的用户名,PWD指定登录的密码。

然后,使用 conn.cursor()方法创建游标,使用 cursor.execute()方法执行查询语句,使用 for循环遍历查询结果,最后使用 conn.close()方法关闭连接。

Python 中连接 MySQL 数据库

要在 Python 中连接 MySQL 数据库,可以使用 PyMySQL 模块。以下是一个简单的示例:

首先,需要安装 PyMySQL 模块。可以使用 pip 命令进行安装:

pip install pymysql

然后,可以使用以下代码连接到 MySQL 数据库:

import pymysql# 建立连接conn = pymysql.connect(host="localhost", port=3306, user="username", password="password", database="database_name")# 创建游标cursor = conn.cursor()# 执行查询cursor.execute("SELECT * FROM table_name")# 获取查询结果for row in cursor:print(row)# 关闭连接conn.close()

在上面的代码中,将 host替换为 MySQL 服务器的地址,将 port替换为 MySQL 服务器的端口号,将 usernamepassword替换为登录 MySQL 的用户名和密码,将 database_name替换为要连接的数据库的名称。

接下来,使用 pymysql.connect()方法建立连接,该方法需要指定连接参数。在连接参数中,使用 host指定 MySQL 服务器的地址,port指定 MySQL 服务器的端口号,user指定登录的用户名,password指定登录的密码,database指定要连接的数据库名称。

然后,使用 conn.cursor()方法创建游标,使用 cursor.execute()方法执行查询语句,使用 for循环遍历查询结果,最后使用 conn.close()方法关闭连接。

Python 中连接 SQLite 数据库

要在 Python 中连接 SQLite 数据库,可以使用内置的 sqlite3 模块。以下是一个简单的示例:

首先,需要使用以下代码建立连接:

import sqlite3# 建立连接conn = sqlite3.connect("my_database.db")# 创建游标对象cursor = conn.cursor()

在上面的代码中,使用 sqlite3.connect()方法建立连接。该方法需要指定数据库文件的路径。如果该文件不存在,则会自动创建一个新的数据库文件。如果路径为 :memory:,则表示在内存中创建一个临时数据库。

接下来,使用 cursor()方法创建游标对象,该对象用于执行 SQL 查询语句。

例如,可以使用以下代码创建一个名为 users的表:

# 创建表格cursor.execute("""    CREATE TABLE users (        id INTEGER PRIMARY KEY,        name TEXT,        email TEXT    )""")

在上面的代码中,使用 execute()方法执行 SQL 查询语句。在该示例中,创建了一个包含 idnameemail三个字段的表格。

接下来,可以使用以下代码插入一些数据:

# 插入数据cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "alice@example.com"))cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "bob@example.com"))cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Charlie", "charlie@example.com"))# 提交更改conn.commit()

在上面的代码中,使用 execute()方法执行 SQL 查询语句。在该示例中,插入了三条数据,每条数据包含 nameemail两个字段的值。在插入数据后,使用 commit()方法提交更改。

接下来,可以使用以下代码查询数据:

# 查询数据cursor.execute("SELECT * FROM users")rows = cursor.fetchall()# 打印结果for row in rows:    print(row)

在上面的代码中,使用 execute()方法执行 SQL 查询语句。在该示例中,查询了 users表中的所有数据,并使用 fetchall()方法获取查询结果。最后,使用循环遍历查询结果,并打印每行数据。

最后,可以使用以下代码关闭连接:

# 关闭连接cursor.close()conn.close()

在上面的代码中,使用 close()方法关闭游标对象和数据库连接对象。

除了以上的连接方法外,还可以使用 SQLAlchemy 模块来连接 SQLite 数据库。SQLAlchemy 是一个 Python 的 SQL 工具包,支持多种 SQL 数据库,包括 SQLite。以下是一个使用 SQLAlchemy 的示例:

from sqlalchemy import create_engine, Column, Integer, Stringfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker# 建立连接engine = create_engine("sqlite:///my_database.db", echo=True)# 创建 Session 类Session = sessionmaker(bind=engine)# 创建 Base 类Base = declarative_base()# 创建 User 类class User(Base):    __tablename__ = "users"    id = Column(Integer, primary_key=True)    name = Column(String)    email = Column(String)# 创建表格Base.metadata.create_all(engine)# 创建 Session 对象session = Session()# 插入数据session.add_all([    User(name="Alice", email="alice@example.com"),    User(name="Bob", email="bob@example.com"),    User(name="Charlie", email="charlie@example.com")])session.commit()# 查询数据users = session.query(User).all()for user in users:    print(user.id, user.name, user.email)# 关闭 Session 对象session.close()

在上面的代码中,使用 create_engine()方法建立连接。该方法需要指定数据库文件的路径。在该示例中,使用了 echo=True参数,表示在控制台输出 SQL 查询语句,方便调试。

接下来,使用 sessionmaker()方法创建 Session 类,该类用于创建 Session 对象。使用 declarative_base()方法创建 Base 类,该类用于定义数据模型。

在该示例中,定义了一个 User类,该类继承了 Base类,并定义了 id

Python 中连接 Redis

要在 Python 中连接 Redis 数据库,可以使用 redis 模块。以下是一个简单的示例:

首先,需要安装 redis 模块。可以使用 pip 命令进行安装:

pip install redis

然后,可以使用以下代码连接到 Redis 数据库:

import redis# 建立连接r = redis.Redis(host="localhost", port=6379, db=0)# 设置键值对r.set("key", "value")# 获取键值对value = r.get("key")print(value)# 关闭连接r.close()

在上面的代码中,将 host替换为 Redis 服务器的地址,将 port替换为 Redis 服务器的端口号,将 db替换为要连接的数据库的编号。

接下来,使用 redis.Redis()方法建立连接,该方法需要指定连接参数。在连接参数中,使用 host指定 Redis 服务器的地址,port指定 Redis 服务器的端口号,db指定要连接的数据库的编号。

然后,使用 r.set()方法设置键值对,使用 r.get()方法获取键值对,最后使用 r.close()方法关闭连接。

除了以上的连接方法外,还可以使用 Redis 的连接池来提高连接的效率。连接池可以在多个线程之间共享连接,从而减少连接的开销。以下是一个使用连接池的示例:

import redisfrom redis import ConnectionPool# 建立连接池pool = ConnectionPool(host="localhost", port=6379, db=0)# 获取连接r = redis.Redis(connection_pool=pool)# 设置键值对r.set("key", "value")# 获取键值对value = r.get("key")print(value)# 关闭连接r.close()

在上面的代码中,使用 redis.ConnectionPool()方法创建连接池,然后使用 redis.Redis()方法获取连接,指定 connection_pool参数为连接池。其他操作与前面的示例相同。

使用连接池可以提高连接的效率,但需要注意连接池的大小,以免占用过多的资源。可以使用 max_connections参数来指定连接池的大小。

Python 中连接 ElasticSearch

要在 Python 中连接 ElasticSearch 数据库,可以使用 elasticsearch 模块。以下是一个简单的示例:

首先,需要安装 elasticsearch 模块。可以使用 pip 命令进行安装:

pip install elasticsearch

然后,可以使用以下代码连接到 ElasticSearch 数据库:

from elasticsearch import Elasticsearch# 建立连接es = Elasticsearch(["localhost:9200"])# 创建索引es.indices.create(index="my_index")# 添加文档doc = {"title": "Elasticsearch Tutorial", "content": "This is a tutorial for Elasticsearch"}es.index(index="my_index", id=1, body=doc)# 搜索文档res = es.search(index="my_index", body={"query": {"match": {"content": "tutorial"}}})print(res)# 关闭连接es.close()

在上面的代码中,将 localhost:9200替换为 ElasticSearch 服务器的地址和端口号。

接下来,使用 Elasticsearch()方法建立连接,该方法需要指定连接参数。在连接参数中,使用一个包含 ElasticSearch 服务器地址的列表。

然后,使用 es.indices.create()方法创建索引,使用 es.index()方法添加文档,使用 es.search()方法搜索文档,最后使用 es.close()方法关闭连接。

除了以上的连接方法外,还可以使用 ElasticSearch 的连接池来提高连接的效率。连接池可以在多个线程之间共享连接,从而减少连接的开销。以下是一个使用连接池的示例:

from elasticsearch import Elasticsearchfrom elasticsearch import ConnectionPool, ConnectionSelector# 创建连接池pool = ConnectionPool([{"host": "localhost", "port": 9200}], maxsize=10)# 创建连接选择器selector = ConnectionSelector([pool])# 建立连接es = Elasticsearch(connection_class=selector)# 创建索引es.indices.create(index="my_index")# 添加文档doc = {"title": "Elasticsearch Tutorial", "content": "This is a tutorial for Elasticsearch"}es.index(index="my_index", id=1, body=doc)# 搜索文档res = es.search(index="my_index", body={"query": {"match": {"content": "tutorial"}}})print(res)# 关闭连接es.close()

在上面的代码中,使用 elasticsearch.ConnectionPool()方法创建连接池,指定 ElasticSearch 服务器的地址和端口号。然后,使用 elasticsearch.ConnectionSelector()方法创建连接选择器,将连接池传递给选择器。最后,使用 Elasticsearch()方法建立连接,指定 connection_class参数为连接选择器。

使用连接池可以提高连接的效率,但需要注意连接池的大小,以免占用过多的资源。可以使用 maxsize参数来指定连接池的大小。

Python 中连接 MongoDB

要在 Python 中连接 MongoDB 数据库,可以使用 pymongo 模块。以下是一个简单的示例:

首先,需要安装 pymongo 模块。可以使用 pip 命令进行安装:

pip install pymongo

然后,可以使用以下代码连接到 MongoDB 数据库:

import pymongo# 建立连接client = pymongo.MongoClient("mongodb://localhost:27017/")# 创建数据库和集合db = client["my_database"]col = db["my_collection"]# 插入文档doc = {"name": "John", "age": 30}col.insert_one(doc)# 查询文档res = col.find({"name": "John"})for doc in res:print(doc)# 关闭连接client.close()

在上面的代码中,将 mongodb://localhost:27017/替换为 MongoDB 服务器的地址和端口号。

接下来,使用 pymongo.MongoClient()方法建立连接,该方法需要指定连接参数。在连接参数中,使用 MongoDB 服务器的地址和端口号。

然后,使用 client["my_database"]方法创建数据库,使用 db["my_collection"]方法创建集合。可以将 my_databasemy_collection替换为要创建的数据库和集合的名称。

接下来,使用 col.insert_one()方法插入文档,使用 col.find()方法查询文档,最后使用 client.close()方法关闭连接。

除了以上的连接方法外,还可以使用 MongoDB 的连接池来提高连接的效率。连接池可以在多个线程之间共享连接,从而减少连接的开销。以下是一个使用连接池的示例:

import pymongofrom pymongo import MongoClient, uri_parserfrom pymongo.pool import Pool, PooledConnectionclass ConnectionPool(Pool):def __init__(self, uri, *args, **kwargs):super().__init__(*args, **kwargs)self.uri = uriself.client = MongoClient(uri, connect=False)def create_connection(self):return PooledConnection(self.client)def get_uri(self):return self.uri# 建立连接池uri = "mongodb://localhost:27017/my_database"pool = ConnectionPool(uri, max_size=10)# 建立连接client = MongoClient(pool=pool)# 创建数据库和集合db = client.my_databasecol = db.my_collection# 插入文档doc = {"name": "John", "age": 30}col.insert_one(doc)# 查询文档res = col.find({"name": "John"})for doc in res:print(doc)# 关闭连接client.close()

在上面的代码中,使用 ConnectionPool类继承自 pymongo.pool.Pool类,重写了 create_connection()方法和 get_uri()方法。在 create_connection()方法中,使用 PooledConnection类创建连接。在 get_uri()方法中,返回 MongoDB 服务器的地址和端口号。

然后,使用 ConnectionPool类创建连接池,指定 MongoDB 服务器的地址和端口号。使用 max_size参数来指定连接池的大小。

最后,使用 MongoClient(pool=pool)方法建立连接,指定 pool参数为连接池。使用 client.my_databasedb.my_collection方法创建数据库和集合。可以将 my_databasemy_collection替换为要创建的数据库和集合的名称。

Python 中连接 PostgreSQL

要在 Python 中连接 PostgreSQL 数据库,可以使用 psycopg2 模块。以下是一个简单的示例:

首先,需要安装 psycopg2 模块。可以使用 pip 命令进行安装:

pip install psycopg2

然后,可以使用以下代码连接到 PostgreSQL 数据库:

import psycopg2# 建立连接conn = psycopg2.connect(host="localhost", port=5432, dbname="my_database", user="my_username", password="my_password")# 创建游标cur = conn.cursor()# 创建表cur.execute("CREATE TABLE my_table (id SERIAL PRIMARY KEY, name VARCHAR(50), age INTEGER)")# 插入数据cur.execute("INSERT INTO my_table (name, age) VALUES (%s, %s)", ("John", 30))# 查询数据cur.execute("SELECT * FROM my_table WHERE name = %s", ("John",))rows = cur.fetchall()for row in rows:print(row)# 提交事务conn.commit()# 关闭游标和连接cur.close()conn.close()

在上面的代码中,将 localhost替换为 PostgreSQL 服务器的地址,将 5432替换为 PostgreSQL 服务器的端口号,将 my_databasemy_usernamemy_password替换为要连接的数据库、用户名和密码。

接下来,使用 psycopg2.connect()方法建立连接,该方法需要指定连接参数。在连接参数中,使用 PostgreSQL 服务器的地址、端口号、数据库、用户名和密码。

然后,使用 conn.cursor()方法创建游标,使用 cur.execute()方法执行 SQL 命令。在上面的示例中,使用 CREATE TABLE命令创建表,使用 INSERT INTO命令插入数据,使用 SELECT命令查询数据。

最后,使用 conn.commit()方法提交事务,使用 cur.close()conn.close()方法关闭游标和连接。

除了以上的连接方法外,还可以使用 SQLAlchemy 模块来连接 PostgreSQL 数据库。SQLAlchemy 是一个 Python 的 SQL 工具和 ORM(对象关系映射)框架,支持多种数据库,包括 PostgreSQL。以下是一个使用 SQLAlchemy 的示例:

from sqlalchemy import create_engine, Column, Integer, Stringfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker# 建立连接engine = create_engine("postgresql://my_username:my_password@localhost:5432/my_database")# 创建 SessionSession = sessionmaker(bind=engine)session = Session()# 定义模型Base = declarative_base()class MyTable(Base):__tablename__ = "my_table"id = Column(Integer, primary_key=True)name = Column(String(50))age = Column(Integer)# 创建表Base.metadata.create_all(engine)# 插入数据my_data = MyTable(name="John", age=30)session.add(my_data)session.commit()# 查询数据rows = session.query(MyTable).filter_by(name="John").all()for row in rows:print(row.id, row.name, row.age)# 关闭 Sessionsession.close()

在上面的代码中,将 my_usernamemy_password替换为要连接的用户名和密码,将 localhost替换为 PostgreSQL 服务器的地址,将 5432替换为 PostgreSQL 服务器的端口号,将 my_database替换为要连接的数据库。

接下来,使用 create_engine()方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 PostgreSQL 服务器的地址、端口号、用户名、密码和数据库。

然后,使用 sessionmaker()方法创建 Session,使用 Session()方法创建一个 Session 对象。这里使用 SQLAlchemy 的 ORM 功能,定义一个模型 MyTable,该模型对应一个名为 my_table的表。使用 Base.metadata.create_all()方法创建表。

接下来,使用 session.add()方法插入数据,使用 session.query()方法查询数据。在上面的示例中,使用 filter_by()方法过滤数据,使用 all()方法获取所有结果。

最后,使用 session.close()方法关闭 Session。

使用 SQLAlchemy 连接 PostgreSQL 数据库可以提供更方便的 ORM 功能,并且可以支持更多的数据库。

Python 中连接 Oracle

要在 Python 中连接 Oracle 数据库,可以使用 cx_Oracle 模块。以下是一个简单的示例:

首先,需要安装 cx_Oracle 模块。可以使用 pip 命令进行安装:

pip install cx_Oracle

然后,可以使用以下代码连接到 Oracle 数据库:

import cx_Oracle# 建立连接conn = cx_Oracle.connect("my_username/my_password@my_host:my_port/my_service_name")# 创建游标cur = conn.cursor()# 创建表cur.execute("CREATE TABLE my_table (id NUMBER(10), name VARCHAR2(50), age NUMBER(3))")# 插入数据cur.execute("INSERT INTO my_table (id, name, age) VALUES (:1, :2, :3)", (1, "John", 30))# 查询数据cur.execute("SELECT * FROM my_table WHERE name = :1", ("John",))rows = cur.fetchall()for row in rows:print(row)# 提交事务conn.commit()# 关闭游标和连接cur.close()conn.close()

在上面的代码中,将 my_usernamemy_passwordmy_hostmy_portmy_service_name替换为要连接的用户名、密码、主机名、端口号和服务名。

接下来,使用 cx_Oracle.connect()方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 Oracle 数据库的用户名、密码、主机名、端口号和服务名。

然后,使用 conn.cursor()方法创建游标,使用 cur.execute()方法执行 SQL 命令。在上面的示例中,使用 CREATE TABLE命令创建表,使用 INSERT INTO命令插入数据,使用 SELECT命令查询数据。

最后,使用 conn.commit()方法提交事务,使用 cur.close()conn.close()方法关闭游标和连接。

除了以上的连接方法外,还可以使用 SQLAlchemy 模块来连接 Oracle 数据库。SQLAlchemy 是一个 Python 的 SQL 工具和 ORM(对象关系映射)框架,支持多种数据库,包括 Oracle。以下是一个使用 SQLAlchemy 的示例:

from sqlalchemy import create_engine, Column, Integer, Stringfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker# 建立连接engine = create_engine("oracle+cx_oracle://my_username:my_password@my_host:my_port/my_service_name")# 创建 SessionSession = sessionmaker(bind=engine)session = Session()# 定义模型Base = declarative_base()class MyTable(Base):__tablename__ = "my_table"id = Column(Integer, primary_key=True)name = Column(String(50))age = Column(Integer)# 创建表Base.metadata.create_all(engine)# 插入数据my_data = MyTable(id=1, name="John", age=30)session.add(my_data)session.commit()# 查询数据rows = session.query(MyTable).filter_by(name="John").all()for row in rows:print(row.id, row.name, row.age)# 关闭 Sessionsession.close()

在上面的代码中,将 my_usernamemy_passwordmy_hostmy_portmy_service_name替换为要连接的用户名、密码、主机名、端口号和服务名。

接下来,使用 create_engine()方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 Oracle 数据库的用户名、密码、主机名、端口号和服务名。

然后,使用 sessionmaker()方法创建 Session,使用 Session()方法创建一个 Session 对象。这里使用 SQLAlchemy 的 ORM 功能,定义一个模型 MyTable,该模型对应一个名为 my_table的表。

使用 Base.metadata.create_all()方法创建表。在上面的示例中,使用 add()方法插入数据,使用 query()方法查询数据。

最后,使用 session.close()方法关闭 Session。

Python 中连接 RabbitMQ

要在 Python 中连接 RabbitMQ,可以使用 pika 模块。以下是一个简单的示例:

首先,需要安装 pika 模块。可以使用 pip 命令进行安装:

pip install pika

然后,可以使用以下代码连接到 RabbitMQ:

import pika# 建立连接credentials = pika.PlainCredentials("my_username", "my_password")connection = pika.BlockingConnection(pika.ConnectionParameters("my_host", 5672, "/", credentials))channel = connection.channel()# 声明队列channel.queue_declare(queue="my_queue")# 发送消息channel.basic_publish(exchange="", routing_key="my_queue", body="Hello, RabbitMQ!")# 接收消息def callback(ch, method, properties, body):print("Received message:", body)channel.basic_consume(queue="my_queue", on_message_callback=callback, auto_ack=True)channel.start_consuming()# 关闭连接channel.close()connection.close()

在上面的代码中,将 my_usernamemy_passwordmy_host替换为要连接的用户名、密码和主机名。

接下来,使用 pika.BlockingConnection()方法建立连接,该方法需要指定连接参数。在连接参数中,使用 RabbitMQ 的主机名、端口号、虚拟主机名和凭证。

然后,使用 channel.queue_declare()方法声明队列,使用 channel.basic_publish()方法发送消息。

使用 channel.basic_consume()方法接收消息,需要定义一个回调函数 callback(),接收消息时会调用该函数。在上面的示例中,定义的回调函数将收到的消息打印出来。

最后,使用 channel.close()connection.close()方法关闭连接。

除了以上的连接方法外,还可以使用 kombu 模块来连接 RabbitMQ。kombu 是一个 Python 的消息传递库,支持多种消息传递协议,包括 RabbitMQ。以下是一个使用 kombu 的示例:

from kombu import Connection, Exchange, Queue# 建立连接conn = Connection("amqp://my_username:my_password@my_host:5672//")channel = conn.channel()# 定义 Exchange 和 Queueexchange = Exchange("my_exchange", type="direct")queue = Queue("my_queue", exchange, routing_key="my_queue")# 发送消息producer = conn.Producer(serializer="json")producer.publish({"message": "Hello, RabbitMQ!"}, exchange=exchange, routing_key="my_queue")# 接收消息def callback(body, message):print("Received message:", body)message.ack()with conn.Consumer(queue, callbacks=[callback]) as consumer:while True:conn.drain_events()# 关闭连接channel.close()conn.close()

在上面的代码中,将 my_usernamemy_passwordmy_host替换为要连接的用户名、密码和主机名。

接下来,使用 Connection()方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 RabbitMQ 的用户名、密码、主机名和端口号。

然后,使用 Exchange()Queue()方法定义 Exchange 和 Queue,使用 Producer()方法发送消息。

使用 Consumer()方法接收消息,需要定义一个回调函数 callback(),接收到消息时会调用该函数。在上面的示例中,定义的回调函数将收到的消息打印出来,并使用 ack()方法确认消息已接收。

最后,使用 channel.close()conn.close()方法关闭连接。

来源:Python 中连接 MSSQL/MySQL/SQLite/Redis/ElasticSearch/Mongodb/PostgreSQL/Oracle/RabbitMQhttps://www.iwmyx.cn/pythonconndb.html

关键词: