Installing Connector/Python also installs the mysql.connector.aio package, which integrates asyncio with the connector to allow asynchronous MySQL interactions in applications.
The following are code examples that use mysql.connector.aio functionality.
Basic usage
from mysql.connector.aio import connect
# Connect to a MySQL server and get a cursor
cnx = await connect(user="myuser", password="mypass")
cur = await cnx.cursor()
# Execute a non-blocking query
await cur.execute("SELECT version()")
# Retrieve the results of the query asynchronously
results = await cur.fetchall()
print(results)
# Close cursor and connection
await cur.close()
await cnx.close()
Usage with context managers
from mysql.connector.aio import connect
# Connect to a MySQL server and get a cursor
async with await connect(user="myuser", password="mypass") as cnx:
    async with await cnx.cursor() as cur:
        # Execute a non-blocking query
        await cur.execute("SELECT version()")

        # Retrieve the results of the query asynchronously
        results = await cur.fetchall()
        print(results)
Running multiple tasks asynchronously
This example shows how to run tasks asynchronously and how to use to_thread, which is the backbone for running blocking functions asynchronously.
The synchronous version of this example implements coroutines rather than following the usual synchronous approach; this is to show explicitly that merely awaiting coroutines does not make code run asynchronously. Functions from the asyncio API must be used to achieve asynchrony.
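Before the full example, here is a minimal sketch of that distinction using plain asyncio (no MySQL involved): awaiting coroutines one by one runs them sequentially, while asyncio.gather overlaps their waits.

```python
import asyncio
import time


async def nap(n):
    await asyncio.sleep(n)
    return n


async def sequential():
    # Awaiting one coro after another: total time is the sum.
    return [await nap(0.1), await nap(0.1)]


async def concurrent():
    # asyncio.gather schedules both coros on the event loop:
    # total time is roughly the maximum, not the sum.
    return await asyncio.gather(nap(0.1), nap(0.1))


clock = time.time()
asyncio.run(sequential())
seq_elapsed = time.time() - clock

clock = time.time()
asyncio.run(concurrent())
conc_elapsed = time.time() - clock

print(f"sequential: {seq_elapsed:.2f}s, gather: {conc_elapsed:.2f}s")
```

The complete Connector/Python example below applies the same idea to real jobs.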
import asyncio
import os
import time
from mysql.connector.aio import connect
# Global variable which will help to format the job sequence output.
# DISCLAIMER: this is an example for showcasing/demo purposes,
# you should avoid global variables usage for production code.
global indent
indent = 0
# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}
async def job_sleep(n):
    """Take a nap for n seconds.

    This job represents any generic task - it may or may not be an IO task.
    """
    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # Emulating a generic job/task
    print(f"{offset}START_SLEEP")
    await asyncio.sleep(n)
    print(f"{offset}END_SLEEP")
    return f"I slept for {n} seconds"
async def job_mysql():
    """Connect to a MySQL Server and do some operations.

    Run queries, run procedures, insert data, etc.
    """
    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # MySQL operations
    print(f"{offset}START_MYSQL_OPS")
    async with await connect(**config) as cnx:
        async with await cnx.cursor() as cur:
            await cur.execute("SELECT @@version")
            res = await cur.fetchone()
            time.sleep(1)  # for simulating that the fetch isn't immediate
    print(f"{offset}END_MYSQL_OPS")

    # return server version
    return res
async def job_io():
    """Emulate an IO operation.

    `to_thread` allows to run a blocking function asynchronously.

    References:
        [asyncio.to_thread]: https://docs.pythonlang.cn/3/library/asyncio-task.html#asyncio.to_thread
    """

    # Emulating a native blocking IO procedure
    def io():
        """Blocking IO operation."""
        time.sleep(5)

    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # Showcasing how a native blocking IO procedure can be awaited
    print(f"{offset}START_IO")
    await asyncio.to_thread(io)
    print(f"{offset}END_IO")
    return "I am an IO operation"
async def main_asynchronous():
    """Running tasks asynchronously.

    References:
        [asyncio.gather]: https://docs.pythonlang.cn/3/library/asyncio-task.html#asyncio.gather
    """
    print("-------------------- ASYNCHRONOUS --------------------")
    # reset indent
    global indent
    indent = 0

    clock = time.time()
    # `asyncio.gather()` allows to run awaitable objects
    # in the aws sequence asynchronously.
    # If all awaitables are completed successfully,
    # the result is an aggregate list of returned values.
    aws = (job_io(), job_mysql(), job_sleep(4))
    returned_vals = await asyncio.gather(*aws)
    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # The order of result values corresponds to the
    # order of awaitables in aws.
    print(returned_vals, end="\n" * 2)
# Example expected output
# -------------------- ASYNCHRONOUS --------------------
# START_IO
# START_MYSQL_OPS
# START_SLEEP
# END_MYSQL_OPS
# END_SLEEP
# END_IO
# Elapsed time: 5.01
# ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']
async def main_non_asynchronous():
    """Running tasks non-asynchronously."""
    print("------------------- NON-ASYNCHRONOUS -------------------")
    # reset indent
    global indent
    indent = 0

    clock = time.time()
    # Sequence of awaitable objects
    aws = (job_io(), job_mysql(), job_sleep(4))
    # The line below is the short version of:
    #     coro1, coro2, coro3 = aws
    #     res1 = await coro1
    #     res2 = await coro2
    #     res3 = await coro3
    #     returned_vals = [res1, res2, res3]
    # NOTE: Simply awaiting a coro does not make the code run asynchronously!
    returned_vals = [await coro for coro in aws]  # this will run synchronously
    print(f"Elapsed time: {time.time() - clock:0.2f}")
    print(returned_vals, end="\n")
# Example expected output
# ------------------- NON-ASYNCHRONOUS -------------------
# START_IO
# END_IO
# START_MYSQL_OPS
# END_MYSQL_OPS
# START_SLEEP
# END_SLEEP
# Elapsed time: 10.07
# ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']
if __name__ == "__main__":
    # `asyncio.run()` allows to execute a coroutine (`coro`) and return the result.
    # You cannot run a coro without it.
    # References:
    # [asyncio.run]: https://docs.pythonlang.cn/3/library/asyncio-runner.html#asyncio.run
    assert asyncio.run(main_asynchronous()) == asyncio.run(main_non_asynchronous())
It shows these three jobs running asynchronously:
- job_io: emulates an I/O operation; to_thread allows the blocking function to run asynchronously. It starts first and takes five seconds to complete, so it is the last job to finish.
- job_mysql: connects to a MySQL server to perform operations such as queries and stored procedures. It starts second and takes one second to complete, so it is the first job to finish.
- job_sleep: sleeps for n seconds to represent a generic task. It starts last and takes four seconds to complete, so it is the second job to finish.
No lock/mutex guards the indent variable because no multithreading is used; a single active thread executes all the jobs. Asynchronous execution means completing other jobs while waiting for the result of an I/O operation.
Asynchronous MySQL queries
This is a similar example that uses MySQL queries instead of generic jobs.
Although cursors are not used in these examples, the same principles and workflow can be applied to cursors by having each connection object create a cursor to operate with.
Synchronous code that creates and populates hundreds of tables
import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple
from mysql.connector import connect
if TYPE_CHECKING:
    from mysql.connector.abstracts import (
        MySQLConnectionAbstract,
    )
# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}
exec_sequence = []
def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Creates a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    stmt = f"""
    CREATE TABLE IF NOT EXISTS {table_names[i]} (
        dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
        category TEXT,
        dish_name TEXT,
        price FLOAT,
        servings INT,
        order_time TIME
    )
    """
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True
def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drops a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True
def main(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], bool],
    table_names: List[str],
) -> Tuple[List, List]:
    exec_seq = []
    database_name = "TABLE_CREATOR"

    with connect(**config) as cnx:
        # Create/Setup database
        cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        cnx.cmd_query(f"USE {database_name}")

        # Execute Kernel: Create or Delete tables
        for i in range(len(table_names)):
            kernel(exec_seq, table_names, cnx, i)

        # Show tables
        cnx.cmd_query("SHOW tables")
        show_tables = cnx.get_rows()[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables
if __name__ == "__main__":
    # with num_tables=511 -> Elapsed time ~ 25.86
    clock = time.time()

    print_exec_seq = False
    num_tables = 511
    table_names = [f"table_sync_{n}" for n in range(num_tables)]

    print("-------------------- SYNC CREATOR --------------------")
    exec_seq, show_tables = main(kernel=create_table, table_names=table_names)
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- SYNC DROPPER --------------------")
    exec_seq, show_tables = main(kernel=drop_table, table_names=table_names)
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")
# Expected output with num_tables = 11:
# -------------------- SYNC CREATOR --------------------
# [
# "start_0",
# "end_0",
# "start_1",
# "end_1",
# "start_2",
# "end_2",
# "start_3",
# "end_3",
# "start_4",
# "end_4",
# "start_5",
# "end_5",
# "start_6",
# "end_6",
# "start_7",
# "end_7",
# "start_8",
# "end_8",
# "start_9",
# "end_9",
# "start_10",
# "end_10",
# ]
# -------------------- SYNC DROPPER --------------------
# [
# "start_0",
# "end_0",
# "start_1",
# "end_1",
# "start_2",
# "end_2",
# "start_3",
# "end_3",
# "start_4",
# "end_4",
# "start_5",
# "end_5",
# "start_6",
# "end_6",
# "start_7",
# "end_7",
# "start_8",
# "end_8",
# "start_9",
# "end_9",
# "start_10",
# "end_10",
# ]
This script creates and drops {num_tables} tables and is fully sequential: it creates and drops table_{i} before moving on to table_{i+1}.
An asynchronous code example for the same task
import asyncio
import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple
from mysql.connector.aio import connect
if TYPE_CHECKING:
    from mysql.connector.aio.abstracts import (
        MySQLConnectionAbstract,
    )
# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}
exec_sequence = []
async def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Creates a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    stmt = f"""
    CREATE TABLE IF NOT EXISTS {table_names[i]} (
        dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
        category TEXT,
        dish_name TEXT,
        price FLOAT,
        servings INT,
        order_time TIME
    )
    """
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    await cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True
async def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drops a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True
async def main_async(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], bool],
    table_names: List[str],
    num_jobs: int = 2,
) -> Tuple[List, List]:
    """The asynchronous tables creator...

    Reference:
        [as_completed]: https://docs.pythonlang.cn/3/library/asyncio-task.html#asyncio.as_completed
    """
    exec_seq = []
    database_name = "TABLE_CREATOR"

    # Create/Setup database
    # ---------------------
    # No asynchronous execution is done here.
    # NOTE: observe usage WITH context manager.
    async with await connect(**config) as cnx:
        await cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        await cnx.cmd_query(f"USE {database_name}")
    config["database"] = database_name

    # Open connections
    # ----------------
    # `as_completed` allows to run awaitable objects in the `aws` iterable asynchronously.
    # NOTE: observe usage WITHOUT context manager.
    aws = [connect(**config) for _ in range(num_jobs)]
    cnxs: List["MySQLConnectionAbstract"] = [
        await coro for coro in asyncio.as_completed(aws)
    ]

    # Execute Kernel: Create or Delete tables
    # ---------------------------------------
    # N tables must be created/deleted and we can run up to `num_jobs` jobs
    # asynchronously, therefore we execute jobs in batches of size `num_jobs`.
    returned_values, i = [True], 0
    while any(returned_values):  # Keep running until i >= len(table_names) for all jobs
        # Prepare coros: map connections/cursors and table-name IDs to jobs.
        aws = [
            kernel(exec_seq, table_names, cnx, i + idx) for idx, cnx in enumerate(cnxs)
        ]
        # When i >= len(table_names) the coro simply returns False, else True.
        returned_values = [await coro for coro in asyncio.as_completed(aws)]
        # Update table-name ID offset based on the number of jobs
        i += num_jobs

    # Close connections
    # -----------------
    # `as_completed` allows to run awaitable objects in the `aws` iterable asynchronously.
    for coro in asyncio.as_completed([cnx.close() for cnx in cnxs]):
        await coro

    # Load table names
    # ----------------
    # No asynchronous execution is done here.
    async with await connect(**config) as cnx:
        # Show tables
        await cnx.cmd_query("SHOW tables")
        show_tables = (await cnx.get_rows())[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables
if __name__ == "__main__":
    # `asyncio.run()` allows to execute a coroutine (`coro`) and return the result.
    # You cannot run a coro without it.
    # References:
    # [asyncio.run]: https://docs.pythonlang.cn/3/library/asyncio-runner.html#asyncio.run
    # with num_tables=511 and num_jobs=3 -> Elapsed time ~ 19.09
    # with num_tables=511 and num_jobs=12 -> Elapsed time ~ 13.15
    clock = time.time()

    print_exec_seq = False
    num_tables = 511
    num_jobs = 12
    table_names = [f"table_async_{n}" for n in range(num_tables)]

    print("-------------------- ASYNC CREATOR --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=create_table, table_names=table_names, num_jobs=num_jobs)
    )
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- ASYNC DROPPER --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=drop_table, table_names=table_names, num_jobs=num_jobs)
    )
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")
# Expected output with num_tables = 11 and num_jobs = 3:
# -------------------- ASYNC CREATOR --------------------
# [
# "start_2",
# "start_1",
# "start_0",
# "end_2",
# "end_0",
# "end_1",
# "start_5",
# "start_3",
# "start_4",
# "end_3",
# "end_5",
# "end_4",
# "start_8",
# "start_7",
# "start_6",
# "end_7",
# "end_8",
# "end_6",
# "start_10",
# "start_9",
# "end_9",
# "end_10",
# ]
# -------------------- ASYNC DROPPER --------------------
# [
# "start_1",
# "start_2",
# "start_0",
# "end_1",
# "end_2",
# "end_0",
# "start_3",
# "start_5",
# "start_4",
# "end_4",
# "end_5",
# "end_3",
# "start_6",
# "start_8",
# "start_7",
# "end_7",
# "end_6",
# "end_8",
# "start_10",
# "start_9",
# "end_9",
# "end_10",
# ]
This output shows that the job flow is not sequential, since up to {num_jobs} jobs can execute asynchronously. The jobs run in batches of {num_jobs}, waiting until all jobs in a batch terminate before starting the next one; the loop ends when there are no tables left to create.
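Fixed-size batches make every batch wait for its slowest job before the next batch starts. A common alternative, shown here as a sketch using plain asyncio (not taken from the examples above), is to bound concurrency with asyncio.Semaphore so a new job starts as soon as any slot frees up:

```python
import asyncio
import time


async def run_bounded(coros, limit):
    # Cap how many coroutines run at once; a job starts as soon as a
    # slot frees up, instead of waiting for a whole batch to finish.
    sem = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(bounded(c) for c in coros))


async def job(i):
    # Stand-in for a table-creation job; replace with real work.
    await asyncio.sleep(0.1)
    return i


async def main():
    clock = time.time()
    results = await run_bounded([job(i) for i in range(6)], limit=3)
    return results, time.time() - clock


results, elapsed = asyncio.run(main())
print(results)  # result order matches the order of the coros
```

With six 0.1-second jobs and a limit of 3, the total time is roughly two waves (~0.2 s) rather than six sequential jobs (~0.6 s).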
Comparing the performance of these examples: the asynchronous implementation is about 26% faster with 3 jobs and about 49% faster with 12 jobs. Note that increasing the number of jobs also increases the job-management overhead, which at some point cancels out the initial speedup. The optimal number of jobs is problem-dependent and is determined empirically.
As demonstrated, the asynchronous version requires more code than the non-asynchronous one. Is it worth the effort? That depends on the goal: asynchronous code allows performance (CPU usage, for example) to be optimized further, while standard synchronous code is simpler to write.
For more information about the asyncio module, see the official Async IO Python documentation.