9.3 异步连接

安装 Connector/Python 也会安装 mysql.connector.aio 包,该包将 asyncio 与连接器集成,以允许将异步 MySQL 交互与应用程序集成。

以下是集成 mysql.connector.aio 功能的代码示例:

基本用法

from mysql.connector.aio import connect

# Connect to a MySQL server and get a cursor
cnx = await connect(user="myuser", password="mypass")
cur = await cnx.cursor()

# Execute a non-blocking query
await cur.execute("SELECT version()")

# Retrieve the results of the query asynchronously
results = await cur.fetchall()
print(results)

# Close cursor and connection
await cur.close()
await cnx.close()

与上下文管理器一起使用

from mysql.connector.aio import connect

# Connect to a MySQL server and get a cursor
async with await connect(user="myuser", password="mypass") as cnx:
    async with await cnx.cursor() as cur:
        # Execute a non-blocking query
        await cur.execute("SELECT version()")

        # Retrieve the results of the query asynchronously
        results = await cur.fetchall()
        print(results)

异步运行多个任务

此示例展示了如何异步运行任务以及 to_thread 的用法——它是异步运行阻塞函数的基础。

注意

此示例的同步版本实现了协程而不是遵循常见的同步方法;这是为了明确地说明仅仅等待协程并不能使代码异步运行。必须使用 asyncio API 中包含的函数才能实现异步性。

import asyncio
import os
import time

from mysql.connector.aio import connect

# Global variable which will help to format the job sequence output.
# DISCLAIMER: this is an example for showcasing/demo purposes,
# you should avoid global variables usage for production code.
# NOTE: a `global` statement at module level is a no-op, so the original
# `global indent` line was removed; a plain assignment is sufficient here.
indent = 0

# MySQL Connection arguments.
# The password is read from the MYPASS environment variable, with a
# dummy fallback value when the variable is not set.
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,  # use the pure-Python protocol implementation
    "port": 3306,
}


async def job_sleep(n):
    """Take a nap for n seconds.

    This job represents any generic task - it may be or not an IO task.
    """
    # Capture the indentation level for this job's output, then bump it
    # so the next job that starts prints one tab further to the right.
    global indent
    prefix = "\t" * indent
    indent += 1

    # Emulate a generic job/task with a non-blocking sleep.
    print(f"{prefix}START_SLEEP")
    await asyncio.sleep(n)
    print(f"{prefix}END_SLEEP")

    return f"I slept for {n} seconds"


async def job_mysql():
    """Connect to a MySQL Server and do some operations.

    Run queries, run procedures, insert data, etc.
    """
    # Capture the indentation level for this job's output, then bump it
    # so the next job that starts prints one tab further to the right.
    global indent
    prefix = "\t" * indent
    indent += 1

    # MySQL operations: fetch the server version over an async connection.
    print(f"{prefix}START_MYSQL_OPS")
    async with await connect(**config) as cnx:
        async with await cnx.cursor() as cur:
            await cur.execute("SELECT @@version")
            row = await cur.fetchone()
            time.sleep(1)  # for simulating that the fetch isn't immediate
    print(f"{prefix}END_MYSQL_OPS")

    # Return the server version tuple.
    return row


async def job_io():
    """Emulate an IO operation.

    `to_thread` allows to run a blocking function asynchronously.

    References:
        [asyncio.to_thread]: https://docs.pythonlang.cn/3/library/asyncio-task.html#asyncio.to_thread
    """

    def blocking_io():
        """Blocking IO operation."""
        time.sleep(5)

    # Capture the indentation level for this job's output, then bump it
    # so the next job that starts prints one tab further to the right.
    global indent
    prefix = "\t" * indent
    indent += 1

    # Hand the blocking call off to a worker thread so the event loop
    # stays free to run the other jobs while it executes.
    print(f"{prefix}START_IO")
    await asyncio.to_thread(blocking_io)
    print(f"{prefix}END_IO")

    return "I am an IO operation"


async def main_asynchronous():
    """Running tasks asynchronously.

    References:
        [asyncio.gather]: https://docs.pythonlang.cn/3/library/asyncio-task.html#asyncio.gather
    """
    print("-------------------- ASYNCHRONOUS --------------------")

    # Every run starts from a fresh indentation level.
    global indent
    indent = 0

    start = time.time()

    # `asyncio.gather()` schedules every awaitable in the sequence
    # concurrently; when all of them complete successfully the result
    # is an aggregate list of their returned values.
    jobs = (job_io(), job_mysql(), job_sleep(4))
    results = await asyncio.gather(*jobs)

    print(f"Elapsed time: {time.time() - start:0.2f}")

    # Result order matches the order in which the awaitables were
    # submitted, not the order in which they finished.
    print(results, end="\n" * 2)

    # Example expected output
    # -------------------- ASYNCHRONOUS --------------------
    # START_IO
    #         START_MYSQL_OPS
    #                 START_SLEEP
    #         END_MYSQL_OPS
    #                 END_SLEEP
    # END_IO
    # Elapsed time: 5.01
    # ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']


async def main_non_asynchronous():
    """Running tasks non-asynchronously"""
    print("------------------- NON-ASYNCHRONOUS -------------------")

    # Every run starts from a fresh indentation level.
    global indent
    indent = 0

    start = time.time()

    # Sequence of awaitable objects.
    jobs = (job_io(), job_mysql(), job_sleep(4))

    # Awaiting each coroutine in turn is the short form of:
    #     res1 = await coro1
    #     res2 = await coro2
    #     res3 = await coro3
    #     results = [res1, res2, res3]
    # NOTE: Simply awaiting a coro does not make the code run asynchronously!
    results = [await job for job in jobs]  # this will run synchronously

    print(f"Elapsed time: {time.time() - start:0.2f}")

    print(results, end="\n")

    # Example expected output
    # ------------------- NON-ASYNCHRONOUS -------------------
    # START_IO
    # END_IO
    #         START_MYSQL_OPS
    #         END_MYSQL_OPS
    #                 START_SLEEP
    #                 END_SLEEP
    # Elapsed time: 10.07
    # ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']


if __name__ == "__main__":
    # `asyncio.run()` allows to execute a coroutine (`coro`) and return the result.
    # You cannot run a coro without it.

    # References:
    #     [asyncio.run]: https://docs.pythonlang.cn/3/library/asyncio-runner.html#asyncio.run

    # Both demo entry points return None, so the original
    # `assert asyncio.run(a()) == asyncio.run(b())` was a vacuous
    # comparison (and `assert` is stripped under `python -O`).
    # Run the two demos directly instead.
    asyncio.run(main_asynchronous())
    asyncio.run(main_non_asynchronous())

它显示了这三个作业异步运行

  • job_io: 模拟 I/O 操作;使用 to_thread 允许异步运行阻塞函数。

    首先开始,需要五秒钟才能完成,因此是最后一个完成的作业。

  • job_mysql: 连接到 MySQL 服务器以执行操作,例如查询和存储过程。

    第二个开始,需要一秒钟才能完成,因此是第一个完成的作业。

  • job_sleep: 休眠 n 秒以表示一个通用任务。

    最后开始,需要四秒钟才能完成,因此是第二个完成的作业。

注意

没有在 indent 变量中添加锁/互斥锁,因为没有使用多线程;而是唯一的活动线程执行所有作业。异步执行是指在等待 I/O 操作结果时完成其他作业。

异步 MySQL 查询

这是一个类似的示例,使用 MySQL 查询而不是通用作业。

注意

虽然这些示例中没有使用游标,但这些原则和工作流程可以应用于游标,方法是让每个连接对象创建一个游标以从中操作。

创建和删除数百个表的同步代码

import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple

from mysql.connector import connect

if TYPE_CHECKING:
    from mysql.connector.abstracts import (
        MySQLConnectionAbstract,
    )


# MySQL Connection arguments.
# The password is read from the MYPASS environment variable, with a
# dummy fallback value when the variable is not set.
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,  # use the pure-Python protocol implementation
    "port": 3306,
}

# NOTE(review): `exec_sequence` appears unused in this script — the
# functions below build and return their own local `exec_seq` lists.
exec_sequence = []


def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """(Re)create the i-th table from `table_names` on the given connection.

    Appends "start_{i}"/"end_{i}" markers to `exec_seq` so the caller can
    inspect the order in which jobs executed.

    Returns:
        True when a table was (re)created, False when `i` is out of range.
        (The original annotation said `-> None`, which was incorrect.)
    """
    # Out-of-range indexes are a harmless no-op; this lets callers
    # schedule jobs in fixed-size batches without bounds bookkeeping.
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    stmt = f"""
    CREATE TABLE IF NOT EXISTS {table_names[i]} (
        dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
        category TEXT,
        dish_name TEXT,
        price FLOAT,
        servings INT,
        order_time TIME
    )
    """
    # Drop any leftover table first so the CREATE always starts clean.
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True


def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drop the i-th table from `table_names` on the given connection.

    Appends "start_{i}"/"end_{i}" markers to `exec_seq` so the caller can
    inspect the order in which jobs executed.

    Returns:
        True when a drop was issued, False when `i` is out of range.
        (The original annotation said `-> None`, which was incorrect.)
    """
    # Out-of-range indexes are a harmless no-op (see create_table).
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True


def main(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], None],
    table_names: List[str],
) -> Tuple[List, List]:
    """Run `kernel` once per table name over a single connection.

    Returns the recorded execution-marker sequence and the rows
    produced by `SHOW tables` after all kernels have run.
    """
    exec_seq: List[str] = []
    database_name = "TABLE_CREATOR"

    with connect(**config) as cnx:
        # Make sure the working database exists and is selected.
        cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        cnx.cmd_query(f"USE {database_name}")

        # Execute the kernel (create or drop) for every table in turn.
        for idx, _ in enumerate(table_names):
            kernel(exec_seq, table_names, cnx, idx)

        # Collect the table listing before the connection closes.
        cnx.cmd_query("SHOW tables")
        show_tables = cnx.get_rows()[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables


if __name__ == "__main__":
    # Demo driver: sequentially create `num_tables` tables, then drop
    # them all again, timing the whole round trip.
    # with num_tables=511 -> Elapsed time ~ 25.86
    clock = time.time()
    print_exec_seq = False  # flip to True to dump the start/end markers
    num_tables = 511
    table_names = [f"table_sync_{n}" for n in range(num_tables)]

    print("-------------------- SYNC CREATOR --------------------")
    exec_seq, show_tables = main(kernel=create_table, table_names=table_names)
    # NOTE(review): `assert` is stripped under `python -O`; acceptable
    # for a demo script, not for production validation.
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- SYNC DROPPER --------------------")
    exec_seq, show_tables = main(kernel=drop_table, table_names=table_names)
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # Expected output with num_tables = 11 — note the strictly
    # sequential start_i/end_i pairs:
    # -------------------- SYNC CREATOR --------------------
    # [
    #     "start_0",
    #     "end_0",
    #     "start_1",
    #     "end_1",
    #     "start_2",
    #     "end_2",
    #     "start_3",
    #     "end_3",
    #     "start_4",
    #     "end_4",
    #     "start_5",
    #     "end_5",
    #     "start_6",
    #     "end_6",
    #     "start_7",
    #     "end_7",
    #     "start_8",
    #     "end_8",
    #     "start_9",
    #     "end_9",
    #     "start_10",
    #     "end_10",
    # ]
    # -------------------- SYNC DROPPER --------------------
    # [
    #     "start_0",
    #     "end_0",
    #     "start_1",
    #     "end_1",
    #     "start_2",
    #     "end_2",
    #     "start_3",
    #     "end_3",
    #     "start_4",
    #     "end_4",
    #     "start_5",
    #     "end_5",
    #     "start_6",
    #     "end_6",
    #     "start_7",
    #     "end_7",
    #     "start_8",
    #     "end_8",
    #     "start_9",
    #     "end_9",
    #     "start_10",
    #     "end_10",
    # ]

该脚本创建和删除 num_tables(此处为 511)个表,并且完全是顺序的,因为它在移至 table_{i+1} 之前创建和删除 table_{i}。

相同任务的异步代码示例

import asyncio
import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple

from mysql.connector.aio import connect

if TYPE_CHECKING:
    from mysql.connector.aio.abstracts import (
        MySQLConnectionAbstract,
    )


# MySQL Connection arguments.
# The password is read from the MYPASS environment variable, with a
# dummy fallback value when the variable is not set.
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,  # use the pure-Python protocol implementation
    "port": 3306,
}

# NOTE(review): `exec_sequence` appears unused in this script — the
# coroutines below build and return their own local `exec_seq` lists.
exec_sequence = []


async def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """(Re)create the i-th table from `table_names` on the given connection.

    Appends "start_{i}"/"end_{i}" markers to `exec_seq` so the caller can
    inspect the (possibly interleaved) order in which jobs executed.

    Returns:
        True when a table was (re)created, False when `i` is out of range.
        (The original annotation said `-> None`, which was incorrect.)
    """
    # Out-of-range indexes are a harmless no-op; this lets main_async
    # schedule jobs in fixed-size batches without bounds bookkeeping.
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    stmt = f"""
    CREATE TABLE IF NOT EXISTS {table_names[i]} (
        dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
        category TEXT,
        dish_name TEXT,
        price FLOAT,
        servings INT,
        order_time TIME
    )
    """
    # Drop any leftover table first so the CREATE always starts clean.
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    await cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True


async def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drop the i-th table from `table_names` on the given connection.

    Appends "start_{i}"/"end_{i}" markers to `exec_seq` so the caller can
    inspect the (possibly interleaved) order in which jobs executed.

    Returns:
        True when a drop was issued, False when `i` is out of range.
        (The original annotation said `-> None`, which was incorrect.)
    """
    # Out-of-range indexes are a harmless no-op (see create_table).
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True


async def main_async(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], None],
    table_names: List[str],
    num_jobs: int = 2,
) -> Tuple[List, List]:
    """Create or drop tables concurrently, `num_jobs` at a time.

    Args:
        kernel: coroutine function run once per table name
            (`create_table` or `drop_table`).
        table_names: names of the tables to process.
        num_jobs: number of connections (and concurrent jobs per batch).

    Returns:
        Tuple of (execution-marker sequence, rows from `SHOW tables`).

    Reference:
        [as_completed]: https://docs.pythonlang.cn/3/library/asyncio-task.html#asyncio.as_completed
    """
    exec_seq = []
    database_name = "TABLE_CREATOR"

    # Create/Setup database
    # ---------------------
    # No asynchronous execution is done here.
    # NOTE: observe usage WITH context manager.
    async with await connect(**config) as cnx:
        await cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        await cnx.cmd_query(f"USE {database_name}")
    # NOTE(review): this mutates the module-level `config` dict so all
    # later connections select the database automatically.
    config["database"] = database_name

    # Open connections
    # ----------------
    # `as_completed` allows to run awaitable objects in the `aws` iterable asynchronously.
    # NOTE: observe usage WITHOUT context manager.
    aws = [connect(**config) for _ in range(num_jobs)]
    cnxs: List["MySQLConnectionAbstract"] = [
        await coro for coro in asyncio.as_completed(aws)
    ]

    # Execute Kernel: Create or Delete tables
    # -------------
    # N tables must be created/deleted and we can run up to `num_jobs` jobs asynchronously,
    # therefore we execute jobs in batches of size `num_jobs`.
    returned_values, i = [True], 0
    while any(returned_values):  # Keep running until i >= len(table_names) for all jobs
        # Prepare coros: map connections/cursors and table-name IDs to jobs.
        aws = [
            kernel(exec_seq, table_names, cnx, i + idx) for idx, cnx in enumerate(cnxs)
        ]
        # When i >= len(table_names) coro simply returns False, else True.
        returned_values = [await coro for coro in asyncio.as_completed(aws)]
        # Update table-name ID offset based on the number of jobs
        i += num_jobs

    # Close connections
    # -----------------
    # `as_completed` allows to run awaitable objects in the `aws` iterable asynchronously.
    for coro in asyncio.as_completed([cnx.close() for cnx in cnxs]):
        await coro

    # Load table names
    # ----------------
    # No asynchronous execution is done here.
    async with await connect(**config) as cnx:
        # Show tables
        await cnx.cmd_query("SHOW tables")
        show_tables = (await cnx.get_rows())[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables


if __name__ == "__main__":
    # `asyncio.run()` allows to execute a coroutine (`coro`) and return the result.
    # You cannot run a coro without it.

    # References:
    #     [asyncio.run]: https://docs.pythonlang.cn/3/library/asyncio-runner.html#asyncio.run

    # Demo driver: concurrently create `num_tables` tables in batches of
    # `num_jobs`, then drop them all again, timing the round trip.
    # with num_tables=511 and num_jobs=3 -> Elapsed time ~ 19.09
    # with num_tables=511 and num_jobs=12 -> Elapsed time ~ 13.15
    clock = time.time()
    print_exec_seq = False  # flip to True to dump the start/end markers
    num_tables = 511
    num_jobs = 12
    table_names = [f"table_async_{n}" for n in range(num_tables)]

    print("-------------------- ASYNC CREATOR --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=create_table, table_names=table_names, num_jobs=num_jobs)
    )
    # NOTE(review): `assert` is stripped under `python -O`; acceptable
    # for a demo script, not for production validation.
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- ASYNC DROPPER --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=drop_table, table_names=table_names, num_jobs=num_jobs)
    )
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # Expected output with num_tables = 11 and num_jobs = 3 — note the
    # interleaved start_i/end_i markers within each batch:
    # -------------------- ASYNC CREATOR --------------------
    # 11
    # [
    #     "start_2",
    #     "start_1",
    #     "start_0",
    #     "end_2",
    #     "end_0",
    #     "end_1",
    #     "start_5",
    #     "start_3",
    #     "start_4",
    #     "end_3",
    #     "end_5",
    #     "end_4",
    #     "start_8",
    #     "start_7",
    #     "start_6",
    #     "end_7",
    #     "end_8",
    #     "end_6",
    #     "start_10",
    #     "start_9",
    #     "end_9",
    #     "end_10",
    # ]
    # -------------------- ASYNC DROPPER --------------------
    # [
    #     "start_1",
    #     "start_2",
    #     "start_0",
    #     "end_1",
    #     "end_2",
    #     "end_0",
    #     "start_3",
    #     "start_5",
    #     "start_4",
    #     "end_4",
    #     "end_5",
    #     "end_3",
    #     "start_6",
    #     "start_8",
    #     "start_7",
    #     "end_7",
    #     "end_6",
    #     "end_8",
    #     "start_10",
    #     "start_9",
    #     "end_9",
    #     "end_10",
    # ]

此输出显示作业流程不是顺序的,因为最多可以有 num_jobs 个作业异步执行。这些作业以大小为 num_jobs 的批次运行,在启动下一批之前等待当前批次的所有作业终止;当没有要创建的表时循环结束。

这些示例的性能比较:异步实现使用 3 个作业时速度大约快 26%,使用 12 个作业时速度快 49%。请注意,增加作业数量确实会增加作业管理开销,这在某个时候会抵消初始的提速。最佳作业数量取决于问题,并且是通过经验确定的值。

如演示的那样,异步版本比非异步版本需要更多代码才能运行。值得付出努力吗?这取决于目标,因为异步代码可以更好地优化性能,例如 CPU 使用率,而编写标准同步代码则更简单。

有关 asyncio 模块的更多信息,请参阅官方的 异步 I/O Python 文档