python 异步编程 第三章 aioMySQL

1.1 连接数据库

sql

create table `student`( id int unsigned not null auto_increment primary key, student_name varchar(512) null, student_age int null)

连接数据库

import asyncio, aioMySQLasync def main(): # 建立与数据库的连接 conn: aioMySQL.Connection = await aioMySQL.connect( host = '127.0.0.1', port = 3306, user = 'root', password = 'pwd', db='mydb' ) # 创建 cursor 对象,用于操作数据库 cur = aioMySQL.Cursor = await conn.cursor() # 执行一条 sql 语句,返回值是受影响的数据的条数 effected = await cur.execute( "INSERT INTO `student` (`student_name`, `student_age`) " "VALUES('张三', 10)" ) print(effected) # 将更改提交到数据库 await conn.commit() await cur.close() conn.close()if __name__ == '__main__': asyncio.run(main())

使用连接池

import asyncio, aioMySQLasync def main(): pool: aioMySQL.Pool = await aioMySQL.create_pool( minsize = 0, maxsize = 10, host = '127.0.0.1', port = 3306, user = 'root', password = 'pwd', db='mydb' ) # 通过连接池启用一个链接,如果当前连接池已满,则等待其他 # 连接被释放后再建立连接 conn: aioMySQL.Connection = await pool.acquire() cur: aioMySQL.Cursor = await conn.cursor() effected = await cur.execute( "INSERT INTO `student` (`student_name`, `student_age`) " "VALUES('张三', 10)" ) print(effected) # 将更改提交到数据库 await conn.commit() await cur.close() # 使用完毕后释放该链接 pool.release(conn) pool.close() await pool.wait_closed()if __name__ == '__main__': asyncio.run(main())

1.2 操作数据库

import asyncio, aioMySQLasync def print_all_data(cur): await cur.execute( "select * from student where id>0;" ) # 获取原始数据结果 raw_data = await cur.fetchall() # 计算字段个数 field_range = range(len(cur.description)) # 将字段名与原始数据结果映射生成对象数组 result = [ {cur.description[i][0]: row[i] for i in field_range} for row in raw_data ] print(result)async def main(): # 创建连接池 pool: aioMySQL.Pool = await aioMySQL.create_pool( minsize = 0, maxsize = 10, host = '127.0.0.1', port = 3306, user = 'root', password = 'pwd', db='mydb' ) conn: aioMySQL.Connection = await pool.acquire() # 启用连接 cur: aioMySQL.Cursor = await conn.cursor() # 创建一个 cursor 对象,用于操作数据库 await print_all_data(cur) await cur.execute( "update student set " "student_name = '李四' " "student_age = '22' " "where id = '1'" ) await print_all_data(cur) # await print_all_data(cur) # await cur.execute("delete from student where id = '1'") # await print_all_data(cur) await conn.commit() await cur.close() pool.release(conn) pool.close() await pool.wait_closed()if __name__ == '__main__': asyncio.run(main())

1.3 SQLAlchemy 异步

import asyncio, aioMySQL.sa, sqlalchemy, aioMySQL.sa.result# 声明数据库的结构student = sqlalchemy.Table( 'student', sqlalchemy.MetaData(), sqlalchemy.Column('id', sqlalchemy.Integer, primary_key = True), sqlalchemy.Column('student_name', sqlalchemy.String(255)), sqlalchemy.Column('student_age', sqlalchemy.Integer))async def print_all_data(conn): result: aioMySQL.sa.result.ResultProxy = await conn.execute( student.select() ) all_data = await result.fetchall() print(all_data)async def main(): engine: aioMySQL.sa.Engine = await aioMySQL.sa.create_engine( host = '127.0.0.1', port = 3306, user = 'root', password = 'pwd', db='mydb' ) conn: aioMySQL.sa.SAConnection = await engine.acquire() # 插入数据 await conn.execute(student.insert().values( student_name = '王五', student_age = 20 )) await print_all_data(conn) # 修改数据 await conn.execute(student.update().where(student.columns.id == 1).values(student_age=10)) await print_all_data(conn) # 删除数据 await conn.excute(student.delete().where(student.columns.id == 1)) await print_all_data(conn) engine.release(conn)if __name__ == '__main__': asyncio.run(main())

1.4 与 AIOHTTP 集成

- config.py- db.py- tables.py- server.py- static/index.js- tpls/index.html- tpls/edit.html- tpls/layout.html

config.py

import osSERVER_ROOT = os.path.dirname(__file__)# 所有静态文件目录STATIC_MAPPING = [ dict( web_path = "/node_modules", dir = os.path.join(SERVER_ROOT, "node_modules") ), dict( web_path = "/static", dir = os.path.join(SERVER_ROOT, "static") )]# 模板文件根目录TEMPLATE_ROOT = os.path.join(SERVER_ROOT, "tpls")DB_HOST = 'db'DB_PORT = 3306DB_NAME = 'mydb'DB_USER = 'root'DB_PASSWORD = 'pwd'

db.py

from aioMySQL.sa import create_engine, Engineimport configfile_scope_vars = {}async def get_engine(): # 获取数据库引擎单例 if "engine" not in file_scope_vars: file_scope_vars['engine'] = await create_engine( host = config.DB_HOST, port = config.DB_PORT, user = config.DB_USER, password = config.DB_PASSWORD, db = config.DB_NAME ) return file_scope_vars['engine']def with_db(fun): passasync def wrapper(req): db = await engine.acquire() try: result = await fun(req, db) engine.release(db) return result except Exception as e: engine.release(db) raise ereturn wrapper

tables.py

import sqlalchemystudent = sqlalchemy.Table( 'student', sqlalchemy.MetaData(), sqlalchemy.Column('id', sqlalchemy.Integer, primary_key = True), sqlalchemy.Column('student_name', sqlalchemy.String(255)), sqlalchemy.Column('student_age', sqlalchemy.Integer))

server.py

from aiohttp import webfrom db import with_dbfrom aioMySQL.sa import SAConnection, resultimport aiohttp_jinja2, jinja2, config, tablesroutes = web.RouteTableDef()@routes.get('/')@aiohttp_jinja2.template("index.html")@with_dbasync def index(req, db: SAConnection): # 查询所有学生并呈现出来 exec_result: result.ResultProxy = await db.execute( tables.student.select() ) data = await exec_result.fetchall() return dict(students=data, title="学生列表")@routes.get('/edit')@aiohttp_jinja2.template("edit.html")@with_dbasync def edit(req: web.Request, db:SAConnection): # 编辑页面,编辑和添加功能放在一起,如果页面没有出入 id,则把该页面当成添加学生信息页面对待,如果传入 id, 当成编辑学生信息页面对待 student_id = req.query.getone("id") if "id" in req.query else None student = None # 如果页面传入 student_id,则启用编辑,否则执行添加操作 if student_id: student_result: result.ResultProxy = await db.execute( tables.student.select().where(tables.student.columns.id == student_id) ) student = await student_result.fetchone() return dict(title="编辑", student=student)@routes.post('/edit')@with_dbasync def edit(req: web.Request, db: SAConnection): # 处理表单提交的页面,如果没有传入 id,则执行添加学生信息的操作,如果传入了 id,则根据 id 判断指定的学生是否存在,如果存在则更新该学生信息,如果不存在则添加学生,处理完成后跳转到首页 params = await req.post() student_name = params['student_name'] if "student_name" in params else None student_age = params['student_age'] if "student_age" in params else None student_id = params['student_id'] if "student_id" in params else None if not student_name or not student_age: return web.Response(text="Parameters error") if student_id: ret: result.ResultProxy = await db.execute( tables.student.select().where(tables.student.columns.id == student_id) ) if ret.rowcount: conn = await db.begin() await db.execute( tables.student.update() .where(table.student.columns.id == student_id) .values( student_name=student_name, student_age=student_age ) ) await conn.commit() raise web.HTTPFound('/') conn = await db.begin() await db.execute( tables.student.insert().values(student_name=student_name, student_age=student_age) ) await conn.commit() raise web.HTTPFound("/")@routes.get('/remove')@with_dbasync def remove(req: web.Request, db: SAConnection): student_id = req query.getone("id") if "id" in req.query else None if student_id: conn = await db.begin() await db.execute( tables.student.delete().where(tables.student.columns.id = student_id) ) await conn.commit() raise web.HTTPFound("/") else: return web.Response(text="Parameters error")if __name__ == '__main__': app = web.Application() # 配置模板文件根目录 aiohttp_jinja2.setup( app, loader = jinja2.FileSystemLoader(config.TEMPLATE_ROOT) ) app.add_routes(routes) # 配置静态文件根目录 for m in config.STATIC_MAPPING: app.router.add_static(m['web_path'], m['dir']) web.run_app(app, port=8000)

index.html

{% extends "layout.html" %}{% block body %}<div class="card" style="maring-top: 2rem"> <div class="card-header" style="postion: relative;"> <a href="/edit" style="position: absolute; right: 1rem; top:0;font-size:20pt"> + </a>学生列表 </div> <div> <table class="table table-striped" style="margin-bottom: 0"> <tr> <th>id</th> <th>姓名</th> <th>年龄</th> <th></th> </tr> {% for s in students %} <tr> <td>{{ s.id }}</td> <td>{{ s.student_name }}</td> <td>{{ s.student_age }}</td> <td style="width: 10rem; text-align: center;"> <a href="/edit?id={{ s.id }}" class="btn btn-primary btn-sm">编辑</a> <a href="/remove?id={{ s.id }}" class="btn-delete-item bt btn-danger bt-sm">删除</a> </td> </tr> {% endfor %} </table> </div></div><script src="/static/index.js"></script>{% endblock %}

index.js

(function(){ $(".btn-delete-item").click(function(e){ if(!confirm("你真的要删除这条数据吗?")){ e.preventDefault(); } });})();

edit.html

{% extends "layout.html" %}{% block body %}<form method="post" encrype="application/x-www-form-URLencoded">{% if student %} <input type="hidden" value="{{ student.id}}" name="student_id">{% endif %}<table class="table"> <tr> <td>姓名</td> <td> <input type="text" required class="form-control" name="student_name" value="{{ student.student_name if student else ''}}"> </td> </tr> <tr> <td>年龄</td> <td> <input type="number" required class="form-control" name="student_age" value="{{ student.student_age if student else '' }}"> </td> </tr> <tr> <td colspan="2"> <input class="btn btn-primary" type="submit" value="保存"> </td> </tr></table></form>{% endblock %}

layout.html

<!DOCTYPE html><html lang="zh"><head> <meta charset="UTF-8"> <title>{{ title }}</title> <link rel="stylesheet" href="/node_modules/bootstrap/dist/css/bootstrap.min.css"> <script src="/node_modules/jquery/dist/jquery.min.js"></script> <script src="/node_modules/bootstrap/dist/js/bootstrap.bundle.min.js"></script></head><body class="container"> {% block body %} {% endblock %}</body></html>