写点什么

Aiomysql 与 Sqlalchemy 的使用

  • 2021-03-17
  • 本文字数:20502 字

    阅读完需:约 67 分钟

Aiomysql 与 Sqlalchemy 的使用

之前一直使用 tornado 做项目,数据库一直使用 mongo 与 redis 居多,凭借其优异的异步特性工作的也很稳定高效,最近的项目需要使用 mysql ,由于之前在使用 mongo 与 redis 时所使用的 moto 与 aioredis 来异步的执行数据库操作,所以在网上查询了异步操作 mysql 的库, 本文记录一下异步操作中所遇到的问题与相应的解决方案。

Aiomysql 介绍

我们在使用 tornado 开发网站的时候,利用 python3 中新加入的异步关键词 async/await , 我们使用各种异步操作为来执行各种异步的操作,如使用 aiohttp 来代替 requests 来执行异步的网络请求操作,使用 motor 来代替同步的 pymongo 库来操作 mongo 数据库,同样,我们在开发同步的 python 程序时,我们会使用 PyMySQL 来操作 mysql 数据库,同样,我们会使用 aiomysql 来异步操作 mysql 数据库。

Aiomysql 连接

为了简单,我使用 docker 下载了 mysql:5.7 镜像,然后启一个容器,密码是 123456

docker run --name mysql -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 -d mysql:5.7
复制代码

这样,一个 mysql 数据库就有了,之后我们就以这个数据库为例,执行各种测试代码工作。

我们先准备一些测试数据,创建一个 mytest 的数据库,创建一个 user 表,里面有三个字段,id, username, age, 简单的三个字段,并且添加两条数据。



首先我们先明确一下,aiomysql 可以是原生的连接 mysql 服务器,也可以使用 sqlalchemy(后面简称 sa)来连接 mysql 服务,首先我们先使用原生的引擎来连接 ,后面再说 sa 连接数据库。


#coding: utf-8
import aiomysqlimport asyncio
loop = asyncio.get_event_loop()
async def test(): conn = await aiomysql.connect( host='127.0.0.1', port=3306, user='root', password='123456', db='mytest', loop=loop )
cur = await conn.cursor() await cur.execute("select * from user") r = await cur.fetchall() if r: for i in r: print(i) else: print("no data") await cur.close() conn.close()
loop.run_until_complete(test())
复制代码


上面的脚本就可以将数据库中的所有数据打印出来。

我们来看下代码,来顺一下执行流程

1. 创建连接

首先我们使用 aiomysql.connect()  创建一个连接对象 conn,代码里只是使用了最常用的连接选项,这个 connect() 方法返回一个 Connection 类对象,这个对象里的参数非常多,我们在后面的代码中,如果遇到会进行相应的介绍。

2. 创建游标

之后我们使用 conn 这个对象的 cursor 方法获取 Cursor 对象 cur,我们只有使用 cursor 对象才能对数据库进行各种操作。

3. 执行 SQL 语句

我们使用 cur 对象的 execute() 方法执行 SQL 语句。这里执行 select * from user ,这个方法返回影响的行数,对于查询而言,是命中查询的数据量,我们也可以根据这里的返回值,如果是 0 的话则说明没有符合查询条件的数据。


如将上面的代码改成


cur = await conn.cursor()   count = await cur.execute("select * from user where id = 4")   print("count:{}".format(count))   if count:       r = await cur.fetchall()       for i in r:           print(i)   else:       print("no data")   await cur.close()   conn.close()
复制代码

4. 关闭游标 cur

5. 关闭连接 conn

注意 conn 对象的关闭函数不是协程,直接调用 close() 即可。


aiomysql 中的 Connect 类和 Cursor 类都通过实现了 __aexit__ 方法来保证游标与连接的关闭,所以我们更多的时候是使用 with 上下文管理来写代码,这样我们不用再去处理游标与连接的关闭操作。


async with conn.cursor() as cur:    count = await cur.execute("select * from user")    if count:        r = await cur.fetchall()        for i in r:            print(i)    else:        print("no user")
复制代码

Aiomysql 简单的 CURD

上面我们简单地使用游标对象进行了查询,这节我们来看看更多 CURD 操作,其实这里已经和 aiomysql 没有太多的关系,主要是考查各位的 mysql 能力了,一个 execute 方法走天

下。但是这里我们来看一个老生常谈的问题,sql 注入问题。

SQL 注入的问题

首先我们看一下以下的代码


username = "yyx"
async with conn.cursor() as cur: sql = "select * from user where username = '%s'" % username print(sql) count = await cur.execute(sql) if count: r = await cur.fetchall() for i in r: print(i) else: print("no user")
复制代码


假设,username 是我们从用户的输入中获取到的,比如通过网页 post 或者 get 过来的参数输入,未经过我们的处理然后我们在数据库中查找这个用户,比如 username 是 yyx 时,我们拼接的 sql 语句是 select * from user where username = 'yyx' ,这时一切还比较顺利,我们可以得到 username 为 yyx 的数据, 但是,如果用户恶意构造 sql,将 yyx' or 1=1#  传过来,这时我们接接的 sql 字符串为 select * from user where username = 'yyx' or 1=1#'  , 这个语句会执行 username 为 yyx 或者 1=1 的结果,1=1 这是个永真的条件,加了一个 # 会将之后的语句当成注释,所以这个 SQL 语句会将数据库中的所有数据都返回。这样就存在了注入的漏洞了。

如何避免 SQL 注入

这是个很大的话题,展开来说可以说好多,这里我们只是从框架的角度,来防止一些基本的注入漏洞,防止注入漏洞更多的还是需要程序员对于用户的输入进行必要的检查过滤,永远记住,不要相信用户的输入。我们查看 Cursor 类的 execute 方法


async def execute(self, query, args=None):    """Executes the given operation
Executes the given operation substituting any markers with the given parameters.
For example, getting all rows where id is 5: cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
:param query: ``str`` sql statement :param args: ``tuple`` or ``list`` of arguments for sql query :returns: ``int``, number of rows that has been produced of affected """ conn = self._get_db()
while (await self.nextset()): pass
if args is not None: query = query % self._escape_args(args, conn)
await self._query(query) self._executed = query if self._echo: logger.info(query) logger.info("%r", args) return self._rowcount
复制代码


execute 有二个参数,一个是 query, 另外是 args,我们看注释,query 是 sql 的语句, args 是 tulpe 或者 list 类型的参数。如果 args 非空,脚本会通过  query = query % self._escape_args(args, conn)  重新组织 query, 再来看下  _escape_args(args, conn)  的实现


def _escape_args(self, args, conn):    if isinstance(args, (tuple, list)):        return tuple(conn.escape(arg) for arg in args)    elif isinstance(args, dict):        return dict((key, conn.escape(val)) for (key, val) in args.items())    else:        # If it's not a dictionary let's try escaping it anyways.        # Worst case it will throw a Value error        return conn.escape(args)
复制代码


如果是 list 或者 tuple,则返回使用  conn.escape  转换之后的 tuple, 如果是 dict 字典类型的话,则返回一个字典,key 还是原来的 key, value 为  conn.escape(val) , 最终都是使用 conn.escape()  函数来进行转换,再来看下这个函数的实现


def escape(self, obj):    """ Escape whatever value you pass to it"""    if isinstance(obj, str):        return "'" + self.escape_string(obj) + "'"    return escape_item(obj, self._charset)
def escape_string(self, s): if (self.server_status & SERVER_STATUS.SERVER_STATUS_NO_BACKSLASH_ESCAPES): return s.replace("'", "''") return escape_string(s)
复制代码


函数将在传入的字符串两边加上两个单引号 ' , 并且将 字符串中的单引号替换成两个单引号,这样就可以避免大多的 sql 注入问题,我们修改一下脚本


username = 'yanyanxin'async with conn.cursor() as cur:    count = await cur.execute("select * from user where username = %s", username)    if count:        r = await cur.fetchall()        for i in r:            print(i)    else:        print("no user")
复制代码


此时我们可以正常的获取到用户名为 yanyanxin 的数据, 再将用户名换成 yyx' or 1=1#   试试

此时转换后的 SQL 语句为 select * from user where username = 'yyx\' or 1=1#'  已经将单引号进行了转义,此时就不会查找到用户了。


注意为了避免 SQL 注入的问题,我们一定不要自己进行拼接 SQL 语句,一定要对用户的输入进行检查转义。

多参数的查询

上面只是用到了一个参数,我们来看一下多参数的查询使用,比如我们想要查询 age 在 19 到 29 之间的用户, 正常我们写 sql 应该是


select * from user WHERE age >19 and age<29
复制代码


我们使用 aiomysql 的实现


async with conn.cursor() as cur:    count = await cur.execute("select * from user where age>%s and age<%s", (19, 29))    if count:        r = await cur.fetchall()        for i in r:            print(i)    else:        print("no user")
复制代码


这里注意,不能使用 %d  , 因为使用 escape 转义的值返回的是字符串类型的,即使传的是 int 类型的,所回的也是 str。

联合查询

我们再创建一个表,表示用户表中用户的职业, 创建三条数据, userid 对应于 user 表中的 id, 这里之所以没有用外键,之后再讨论,只是记住,这里的 userid 只是一个普通的列,它表示 user 表中的 id。



这里有三条数据,user 表中 id 为 1 的是 qa 和开发,id 为 2 的 qa,我们来查找一下,user 表中名为 yyx 的用户的 jobs 是什么, 正常我们写 sql 语句应该是下面这个样子


SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username='yyx'
复制代码


将会得到如下结果



使用 aiomysql 实现


async with conn.cursor() as cur:    sql = 'SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username=%s'    count = await cur.execute(sql, ('yyx',))    if count:        r = await cur.fetchall()        for i in r:            print(i)    else:        print("no user")
复制代码


总的来说,使用 aiomysql 进行查询操作,和使用普通的工具进行 mysql 查询是一样, 需要注意的是注入的问题,一定要使用框架的转义功能。

日期格式的查询

很多时候我们需要进行日期类型的查询,如查询大于某一天的数据, 我们先在 user 表中添加一个代码更新日期的 updatedate 列,并且填上一些数据,我们再次使用 count = await cur.execute("select * from user")  查询数据,将会得到如下的数据


(1, 'yanyanxin', 18, datetime.datetime(2020, 10, 31, 16, 43, 8))(2, 'yyx', 28, datetime.datetime(2020, 11, 1, 21, 44, 35))
复制代码


如果我们想要查询日期大于 2020 年 10 月 31 日的数据我们可以这样写 SQL 


select * from user WHERE  DATE_FORMAT(updatedate,'%Y%m%d') > '20201031'
复制代码


使用 aiomysql 该如果写 sql 呢? 


如果我们写成以下的样子


datestr = datetime.datetime(2020, 10, 31).strftime('%Y%m%d')count = await cur.execute("select * from user WHERE  DATE_FORMAT(updatedate,'%Y%m%d') > %s", (datestr,))
复制代码


将会得到一个异常


ValueError: unsupported format character 'Y' (0x59) at index 51
复制代码


上面在转换拼接字符串的时候, 由于有个 %Y 的存在, python 默认是不支持这个转换的,所以这样写是不行的,这里其实不需要将 datetime.datetime 类型的数据进行转换,aiomysql 会自动的进行转换


datestr = datetime.datetime(2020, 10, 31)count = await cur.execute("select * from user WHERE  updatedate > %s", (datestr,))
复制代码


我们只需要将 datetime.datetime 类型的数据传到参数里即可,pymysql 内置了基本类型的处理方法 


encoders = {    bool: escape_bool,    int: escape_int,    long_type: escape_int,    float: escape_float,    str: escape_str,    text_type: escape_unicode,    tuple: escape_sequence,    list: escape_sequence,    set: escape_sequence,    frozenset: escape_sequence,    dict: escape_dict,    type(None): escape_None,    datetime.date: escape_date,    datetime.datetime: escape_datetime,    datetime.timedelta: escape_timedelta,    datetime.time: escape_time,    time.struct_time: escape_struct_time,    Decimal: escape_object,
复制代码


这些类型不用我们再去手动处理, 直接传入 args 参数即可


添加数据

有了上面查询数据的基础,我们再来看下插入数据, 我们同样以正常的 mysql 语句再结合 aiomysql 中的 query 语句进行对比。


1. 插入单条语句

经过表的修改,目前我们的表字段如下



其中 id 为主键自增,新添加的时候可以不用传参数,mysql 会自动添加, username 和 age 是不能为空的,添加的时候必须要传


先使用 SQL 语句进行添加


INSERT INTO `user` (username, age) VALUES ("aaa", 24);
复制代码


这时就会添加一条数据



使用 aiomysql 来添加


async with conn.cursor() as cur:    count = await cur.execute("insert into user (username, age, updatedate) VALUES(%s, %s, %s)", ("ccc", 33, datetime.datetime.now()))    await conn.commit()      print(count)    if count:        r = await cur.fetchall()        for i in r:            print(i)    print("#########")    count = await cur.execute("select * from user")    if count:        r = await cur.fetchall()        for i in r:            print(i)    else:        print("no user")
复制代码


这里注意到,和查询不一样的是,插入数据会多一个 await conn.commit() 操作, 这里如果不调用 commit 方法, 下面的查询也是可以查询到刚刚添加的数据,但是此时数据并没有真正的添加到数据库里,必须要调用一下 commit 方法,当然也可以不调用,那么要初始化 connect 时需要加入 autocommit=True,  参数,这个后面我们讲事务的时候再详细说一下。


对于日期类型的数据,我们也无需进行处理,直接传入参数即可

2. 插入多条语句

cursor 除了 execute 方法以外,还有一个 executemany 方法,可以执行多条 SQL 语句,非常适合插入多条数据


async with conn.cursor() as cur: users = [     ("eee", 26, datetime.datetime(2019, 10, 23)),     ("fff", 28, datetime.datetime(2018, 11, 13)),     ("ggg", 27, datetime.datetime(2016, 9, 15)), ] count = await cur.executemany("insert into user  ( username, age, updatedate) VALUES(%s, %s, %s)", users) print(count) if count:     r = await cur.fetchall()     for i in r:         print(i) print("#########") count = await cur.execute("select * from user") if count:     r = await cur.fetchall()     for i in r:         print(i) else:     print("no user")
复制代码


将要插入的数据按照格式放入元组或者列表里,再使用 executemany 方法一次性的插入多条数据

其实查看 executemany 的实现,它并不是一次性的写入多条数据,而是通过循环多次调用 execute 方法 


for arg in args:    await self.execute(query, arg)    rows += self._rowcountself._rowcount = rows
复制代码

如何处理插入失败

插入失败常有,比如主键重复,数据类型不对等,我们需要去抓住这些异常来进行处理


比如如下语句


count = await cur.execute("insert into user  (id, username, age, updatedate) VALUES(%s, %s, %s, %s)",(1, "ddd", 34, datetime.datetime.now()))
复制代码


尝试添加一个主键 id 为 1 的数据,但是由于数据库中已经存在了该主键,所以这次插入肯定会失败

程序会报


pymysql.err.IntegrityError: (1062, "Duplicate entry '1' for key 'PRIMARY'")
复制代码


pymysql.err 错误


async with conn.cursor() as cur:    try:        count = await cur.execute("insert into user  (id, username, age, updatedate) VALUES(%s, %s, %s, %s)", (1, "ddd", 34, datetime.datetime.now()))        print(count)    except pymysql.err.IntegrityError as e:        print(e)    except Exception as e:        raise e
复制代码


此时将打印 (1062, "Duplicate entry '1' for key 'PRIMARY'") 异常信息

cursor 类型

可以初始化 cursor 类型时,选择不同的类,默认返回是以元组形式


(1, 'yanyanxin', 18, datetime.datetime(2020, 10, 31, 16, 43, 8), 0)(2, 'yyx', 28, datetime.datetime(2020, 11, 1, 21, 44, 35), 2)(3, 'aaa', 24, None, None)(8, 'ccc', 33, datetime.datetime(2020, 11, 2, 17, 59, 38), None)(27, 'aaa', 16, None, None)
复制代码


可以使用 aiomysql.cursors.DictCursor  类初始化


conn.cursor(aiomysql.cursors.DictCursor) as cur
复制代码


获取到的结果将以字典的形式返回


{'id': 1, 'username': 'yanyanxin', 'age': 18, 'updatedate': datetime.datetime(2020, 10, 31, 16, 43, 8), 'isstudent': 0}{'id': 2, 'username': 'yyx', 'age': 28, 'updatedate': datetime.datetime(2020, 11, 1, 21, 44, 35), 'isstudent': 2}{'id': 3, 'username': 'aaa', 'age': 24, 'updatedate': None, 'isstudent': None}{'id': 8, 'username': 'ccc', 'age': 33, 'updatedate': datetime.datetime(2020, 11, 2, 17, 59, 38), 'isstudent': None}{'id': 27, 'username': 'aaa', 'age': 16, 'updatedate': None, 'isstudent': None}
复制代码


连接池的使用

之前我们一直使用  aiomysql.connect()  方法来连接到数据库,aiomysql 还提供了连接池的接口,有了连接池的话,不必频繁打开和关闭数据库连接。


上面的代码,我们都是执行一个函数就创建一个连接,我们知道,客户端在与服务端创建连接也是一个比较耗时耗资源的操作,所以我们会通过连接池来减少与 mysql 数据库的频繁打开和关闭连接。


这只是其中一个原因,还有一个更重要的原因,因为在协程程序里,大家是共用一个线程, 比如有两个函数,一个函数是查询 user 表,一个函数是查询 jobs 表


loop = asyncio.get_event_loop()async def test():    conn = await aiomysql.connect(        host='127.0.0.1',        port=3306,        user='root',        password='123456',        db='mytest',        loop=loop    )    async def get_user():        async with conn.cursor() as cur:            count = await cur.execute("select * from user")            if not count:                return            r = await cur.fetchall()            print("get data from user")            for i in r:                print(i)
async def get_jobs(): async with conn.cursor() as cur: count = await cur.execute("select * from jobs") if not count: return r = await cur.fetchall() print("get data from jobs......") for i in r: print(i)
await asyncio.gather(get_jobs(), get_user())loop.run_until_complete(test())
复制代码


我们在 test() 函数里写了两个子函数,get_user 和 get_jobs 分别从 user 表和 jobs 表中获取数据,当然我们可以使用


await get_user()await get_jobs()
复制代码


来分别执行,但是这种方式是同步的,并没有异步去执行,我们想要这两个函数异步进行,所以我们使用


await asyncio.gather(get_jobs(), get_user())
复制代码


这种方式调用,让这两个协程并行执行, 但是这样写就会报错


RuntimeError: readexactly() called while another coroutine is already waiting for incoming data
复制代码


意思是,一个协程在等待数据传过来,但是这个时候另外一个协程也要开始读数据,因为这两个协程用的是同一个连接对象 conn。


所以这里我们需要用两个不同的连接, 当然可以在每个函数中都重新对 mysql 数据进行连接,在执行完查询操作以后再关闭,但是这样就会造成之前说有频繁的创建连接会造成一些资源的浪费,同时网站的性能也会受到影响。


所以这时我们需要使用连接池,连接池会保存一定数量的连接对象,每个函数在需要使用的时候从池子中拿一个连接对象, 使用完以后再将连接对象放到池子中, 这样避免了频繁的和 mysql 数据库进行打开关闭操作,同时也避免出现上面的同个连接在不同的协程对象中使用而出现的异常。


修改以上代码


loop = asyncio.get_event_loop()
async def test(): pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='123456', db='mytest', minsize=1, maxsize=2, echo=True, autocommit=True, loop=loop )
async def get_user(): async with pool.acquire() as conn: print(id(conn), 'in get user') async with conn.cursor() as cur: count = await cur.execute("select * from user") if not count: return r = await cur.fetchall() print("get data from user") for i in r: print(i)
async def get_jobs(): async with pool.acquire() as conn: print(id(conn), 'in get jobs') async with conn.cursor() as cur: count = await cur.execute("select * from jobs") if not count: return r = await cur.fetchall() print("get data from jobs......") for i in r: print(i)
async def get_email(): async with pool.acquire() as conn: print(id(conn), 'in get email') async with conn.cursor() as cur: count = await cur.execute("select * from email") if not count: return r = await cur.fetchall() print("get data from email......") for i in r: print(i)
await asyncio.gather(get_jobs(), get_user(), get_email())

loop.run_until_complete(test())
复制代码


连接池的初始化函数 aiomysql.create_pool  和 aiomysql.connect  参数差不多,数据库的基本信息, 这里多了两个参数 minsize,maxsize,  最少连接数和最大连接数,我这里为了实验,将最大连接数设置为 2,然后下面用了三个函数来获取连接池,我们将连接对象 conn 的 id 信息打印出来看下


2977786527496 in get jobs2977786527496 in get user2977786590984 in get email
复制代码


可以看出, get jobs 函数和 get user 函数用的是同一个连接对象


上面的脚本也不再报错,并且可以正常的获取到数据库里的信息,且都是异步的进行查询


我们也要注意一下,由于是演示代码,我们在开发过程中,不太会写这样的代码,更多的时候,我们是写 web 程序,比如用 tornado 写个 web 程序, 不同的接口需要进行不同的查询操作,为了保证查询同时进行,此时我们就需要用连接池了。

事务的处理

关于事务的介绍,网上有好多,关于数据库事务具有 ACID 这 4 个特性:原子性,一致性,隔离性,持久性以及不同的隔离级别所带来的脏读、不可重复读、幻读等问题,推荐廖雪峰的 sql 教程, 讲的很清晰。


这里介绍一下在 aiomysql 中事务的处理,


之前我们在初始化连接或者连接池的时候,都加上了 autocommit=True, 这个设置, autocommit=True

意味着自动提交,在使用事务的时候,需要将其关闭,或者不设置,默认是 False


async with pool.acquire() as conn:    async with conn.cursor() as cur:        await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa", 16))        # 不调用conn.commit()        c = await cur.execute("select * from user")        result = await cur.fetchall()        for i in result:            print(i)
复制代码


此时我们是可以获取到刚才插入的数据的,但是如果使用别的 mysql 客户端查看,刚才的数据是没有提交进来的,这时需要调用 conn.commit()  来提交事务才可以真正的将数据写入数据库。


当然,在执行 conn.commit()  时,是有可能失败的,比如插入一半的数据,被别的事务所干扰,此时这里就会抛异常。


现在有一个问题,既然可以设置 autocommit=True 让数据库自动提交事务,我们为什么还要自己来开启事务,然后再手动调用 conn.commit() 来提交呢?


我们假设有这样的一个场景, 假如你要用两个 sql 语句分别更新两张表,如传统的转账为例, 你的余额减 200, 他的余额加 200, 在 autocommit=True 的情况下, 先使用一行代码更新一张表,这时在更新另外一张表的时候崩溃了,这时由于使用了 autocommit=True , 第一条语句更新成功 ,第二条语句失败了


async with pool.acquire() as conn:    async with conn.cursor(aiomysql.cursors.DictCursor) as cur:        try:            await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa", 16))            await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa2", 13, 1111))        except Exception as e:            print(e)        c = await cur.execute("select * from user")        result = await cur.fetchall()        for i in result:            print(i)
复制代码


在上面的语句中, 第一次 insert 语句没有问题,可以正常的插入数据库,但是第二个语句,由于格式转换有问题,这时会崩溃,第二条语句不会插入成功,但是现在问题就来了,我要求是这两条语句要么全执行,要么都不执行, 上面的代码没法保证数据的一致性, 破坏了事务的原子性与一致性,所以这时我们需要使用自己手工来处理事务。


async with pool.acquire() as conn:    async with conn.cursor(aiomysql.cursors.DictCursor) as cur:        await conn.begin() # 开启事务        try:            await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa", 16))            await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa2", 13, 1111))            await conn.commit()        except Exception as e:            print(e)            await conn.rollback()  #回滚        c = await cur.execute("select * from user")        result = await cur.fetchall()        for i in result:            print(i)
复制代码


上面通过 await conn.begin() 来开启事务, 后面通过 await conn.commit() 来提交事务, 过程中如果有失败或者崩溃的情况则执行 await conn.rollback()  回滚。


此时第一条语句就不会被插入成功了. 如果在初始化连接或者连接池时设置了 autocommit=True 参数,则这里需要调用 conn.begin()` ,如果没有设置 autocommit 参数则默认是 False, 后面也不用显示的调用 conn.begin(), 但是需要显示的调用 conn.commit() 。

Sqlalchemy 介绍

SQLAlchemy 是 Python 编程语言下的一款开源软件。提供了 SQL 工具包及对象关系映射(ORM)工具,使用 MIT 许可证发行。


SQLAlchemy“采用简单的 Python 语言,为高效和高性能的数据库访问设计,实现了完整的企业级持久模型”。SQLAlchemy 的理念是,SQL 数据库的量级和性能重要于对象集合;而对象集合的抽象又重要于表和行。因此,SQLAlchmey 采用了类似于 Java 里 Hibernate 的数据映射[4]模型,而不是其他 ORM 框架采用的 Active Record 模型。不过,Elixir[5]和 declarative 等可选插件可以让用户使用声明语法。


SQLAlchemy 首次发行于 2006 年 2 月,并迅速地在 Python 社区中最广泛使用的 ORM 工具之一,不亚于 Django 的 ORM 框架。


ORM 介绍

ORM, 全称 Object-Relational Mapping,将关系数据库的表结构映射到对象上, 使得操作数据库的关系转换成操作 python 中的对象


在 Aiomysql 中使用 Sqlalchemy

在使用 aiomysql 原生的 mysql 连接时,我们使用 aiomysql.connect  函数来获取 aiomysql 连接对象,在使用 sqlalchemy 时,需要使用 aiomysql.sa.create_engine 函数来创建一个引擎对象。

在 aiomysql 中,不能使用类来定义, 需要使用 aiomysql.sa.Table 来返回 ORM 对象, 也不能使用 session, 执行查询操作需要在一个连接对象上


import aiomysqlimport asyncioimport loggingimport pymysqlimport sqlalchemy as safrom aiomysql.sa import create_engine
loop = asyncio.get_event_loop()
metadata = sa.MetaData()user = sa.Table( "user", metadata, sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), sa.Column('username', sa.String(255), nullable=False, default=""), sa.Column('age', sa.Integer, nullable=False, default=0), sa.Column('updatedate', sa.DateTime, nullable=True), sa.Column('isstudent', sa.Boolean, nullable=True))
async def test(): engine = await create_engine( host='127.0.0.1', port=3306, user='root', password='123456', db='mytest', autocommit=True, loop=loop ) async with engine.acquire() as conn: query = sa.select([user]) result = await conn.execute(query) for i in await result.fetchall(): print(i)
loop.run_until_complete(test())
复制代码


在使用 sqlalchemy 时, 要先定义 ORM 关系,可以使用 sqlalchemy.Table 来定义

1. 创建元类

使用 metadata = sa.MetaData() 创建一个元类,这个元类会包含各种表的关系,之后会介绍

2. 创建表

使用上面创建的元类 metadata 来创建表结构, 第一个字段为表名, 第二个参数为元类对象,之后为每个字段的信息对象,第一个为字段名,第二个为类型,之后会是一些字段选项


以下是一些常用的数据类型

类型描述
Integer整形
String (size)字符串
Text文本
DateTimepython datetime对应的时间
Float浮点
Boolean布尔
PickleTypepython 内存对象
LargeBinary二进制数据

以下是一些常见的字段属性

primary_key: 是否为主键autoincrement: 是否自增index: 是否为索引nullable: 是否可以为空, True的时候为可以为空comment: 注释
复制代码

3. 创建引擎

4. 获取连接

通过 engine.acquire() 来获取一个连接

5. 执行查询语句

这里和 aiomysql 不一样,这里直接使用连接对象 conn 的 execute(query) 方法

6. 打印显示结果

这里调用的 SQL 语句和直接使用 SQL 或者上面使用 aiomysql 的 execute 来执行 sql 语句来讲,比较晦涩难懂, 下面详细记录一下各种查询在 sqlalchemy 中的实现。

使用 Sqlalchemy 的 CURD

简单查询数据

query = sa.select([user])result = await conn.execute(query)
复制代码


可以通过 str(query) 来打印构造出来的 SQL 语句

上面的 query 转换为 SQL 语句为 


SELECT "user".id, "user".username, "user".age, "user".updatedate, "user".isstudent FROM "user"
复制代码


和 select * from user 同一意思

sa.select([user])  select() 函数参数必须是个列表或者可迭代对象,这个简单的查询不用提供 from 表, sa 会自动算出需要在哪张表中查询。


选择哪些返回列

默认会将表中的所有字段返回,当需要指定返回哪些列的字段时,需要设置一下


query = sa.select([user.columns.updatedate, user.c.username])
复制代码


可以通过 user.columns.updatedate, 来返回 updatedate 字段,columns 也可以简写成 c, 如后面的 user.c.username  , 上面的 query 转换成 SQL 语句为 SELECT user.updatedate, user.username FROM user。

带条件的查询

可以在 select() 函数后面加上调用 where() 函数来设置查询条件


1. 查询 username 为 yyx 的数据


query = sa.select([user]).where(user.columns.username == "yyx")
复制代码


注意这里也要使用 user.columns, 或者 user.c 

在返回值中,由于我们在定义 user 的时候,isstudent 字段我们设置的是 sa.Boolean  ,这时,当值为 0 时该值为 False, 非 0 时为 True

上面的打印输出为 (2, 'yyx', 28, datetime.datetime(2020, 11, 1, 21, 44, 35), True) 


2. 防注入

上面的代码,如果我们使用之前构造的可以被注入的查询条件, 我们来看下是什么情况


query = sa.select([user]).where(user.columns.username == "yyx' or 1=1#")
复制代码


得到的 query 语句为


SELECT user.id, user.username, user.age, user.updatedate, user.isstudent FROM user WHERE user.username = 'yyx\' or 1=1#'
复制代码


并且没有获取到任务结果,我们注意到,sa 已经帮我们将单引号给转义了. 所以这里我们无需再做处理


3. 多条件查询

有时我们会使用多个条件查询, 比如我们要查找 age 大于 24, id 小于 11 的用户信息。


逻辑查询关系可以分为或(or)且(and)非(not)的关系,我们可以使用 sqlalchemy.sql 中的 and_, or_, not_ 来指定逻辑关系.注意这里为了和 python 中的关键字作为区分都有一个下划线。


这里的查询条件可以看成是且的关系,我们可以使用 and_操作


async with engine.acquire() as conn:   w = and_(       user.columns.id < 11,       user.columns.age > 14   )   query = sa.select([user]).where(w)   print(str(query))   result = await conn.execute(query)   for i in await result.fetchall():       print(i)
复制代码


这里我们定义一个 and_对象, 它里面设置好要查询的条件, 然后将这个变量放到 where()函数中。

上面有 query 转换成 SQL 语句为


SELECT user.id, user.username, user.age, user.updatedate, user.isstudent FROM user WHERE user.id < 11 AND user.age > 14
复制代码


OR 或者 NOT 同理,只需要将设置的查询依次放入即可


4. 日期查询

我们要查询 updatedate 大于 2020-11-02 的用户信息

这里比较简单,直接使用 datetime 对象就可以做比较


query = sa.select([user]).where(user.columns.updatedate>datetime.datetime(2020, 11, 2))
复制代码


当然也可以精确到秒,总之传入一个 datetime 对象就行

转换为 SQL 语句为


SELECT user.id, user.username, user.age, user.updatedate, user.isstudent FROM user WHERE user.updatedate > '2020-11-02 00:00:00'
复制代码


5. False 查询与 None 查询

我们将 user 的 isstudent 定义为 Boolean 类型,我们可以通过


query = sa.select([user]).where(user.columns.isstudent==False)
复制代码


来查找 isstudent 是 False 的数据

查询到一条数据

(1, 'yanyanxin', 18, datetime.datetime(2020, 10, 31, 16, 43, 8), False)
复制代码

但是表中还有没有设置该字段的数据是查不到的,这里的 False 和 None 是不一样的,如果想要获取到没有设置 isstudens 字段的数据需要使用


query = sa.select([user]).where(user.columns.isstudent==None)
复制代码


来获取。

插入操作

sa 的插入操作很灵活,有好多种插入方法,下面依次进行试验


1. 使用 values 函数


await conn.execute(user.insert().values(username="fan", age=16))
复制代码


这种方式将必填参数以 values 函数的参数形式传递, 定义为 nullable 为 True 的参数在这里可以不用传了。


2. 使用字典 dict 格式插入


userinfo = {"username": "hhh","age": 33,"id": None,"updatedate":None,"isstudent":None}result = await conn.execute(user.insert(), userinfo)
复制代码


这种方法需要将定义 Table 表中的各个字段都要定义上,即使字段设置为 nullable=True, 这里如果不想赋值的话也要写上 None


3. 使用元组 tuple 插入


result = await conn.execute(user.insert(), (None, "yang", 88, None, True))
复制代码


这种方式需要按定义 Table 中字段顺序将值传进去,同样为空的值即使现在不想设置也要使用 None 来占位,并且顺序也是要按照定义表结构时的顺序


4. 使用命名参数的方式


result = await conn.execute(user.insert(), id=None, username="lllll", age=99,                                       updatedate=datetime.datetime.now(), isstudent=True)   
复制代码


这种方式参数可以不必按照定义时的顺序。


5. 按位置插入数据


result = await conn.execute(user.insert(), None, "mmmm", 9, None, None)
复制代码


这种方法是不用写上字段名,但是也需要按照顺序来传入参数。


复杂的查询 join

还是以之前的例子,想要查询 user 表中是名为 yyx 的用户的 jobs 是什么, 正常我们写 sql 语句应该是下面这个样子


SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username='yyx' 
复制代码


在 sa 中,我们需要使用 select_from 函数来定义 JOIN


# 定义jobs表结构jobs = sa.Table(    'jobs', metadata,    sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),    sa.Column("jobs", sa.String(50), nullable=False, default="qa"),    sa.Column("userid", sa.Integer, nullable=False))
async with engine.acquire() as conn: j = user.join(jobs, user.c.id == jobs.c.userid) query = sa.select([user.c.username, jobs.c.jobs]).select_from(j).where(user.c.username == 'yyx') result = await conn.execute(query) for i in await result.fetchall(): print(i)
复制代码


首先 sa.select 传入需要返回的字段, 这里使用 `user.c.username, jobs.c.jobs` , 然后使用 select_from 定义 join 条件, join 的第一个参数是要连接的表,后面是定义连接的条件。

上面的代码得到的 SQL 语句为


SELECT user.username, jobs.jobs FROM user INNER JOIN jobs ON user.id = jobs.userid WHERE user.username = 'yyx'
复制代码


这里是 INNER JOIN, 对应的还有 outerjoin

use_labels 问题

如果我们这样定义 query


j = user.join(jobs, user.c.id == jobs.c.userid)query = sa.select([user, jobs]).select_from(j).where(user.c.username == 'yyx')
复制代码


我们想要获取 user 和 jobs 的所有字段,此时会报错


aiomysql.sa.exc.InvalidRequestError: Ambiguous column name 'id' in result set! try 'use_labels' option on select statement.
复制代码


这是由于 user 和 jobs 表中都有 id 这个字段,返回的话将无法确定是谁的,需要使用 use_labels 参数,  


query = sa.select([user, jobs], use_labels=True).select_from(j).where(user.c.username == 'yyx')
复制代码


上面的结果返回为


(2, 'yyx', 28, datetime.datetime(2020, 11, 1, 21, 44, 35), True, 2, 'qa', 2)
复制代码


获取返回值字段属性

上面的结果是一个元组,我们还可以打印指定的字段

当没有使用 use_labels=True 时,可以直接调用结果的字段属性


for i in await result.fetchall():    print(i.username, i.jobs)
复制代码


如果加了 use_labels=True 时,也需要添加上表名, 表名_字段


for i in await result.fetchall():    print(i.user_username, i.jobs_jobs)
复制代码


是否需要使用外键

上面无论是使用 aiomysql 还是使用 sa,都没有使用外键进行约束,关于是否使用外键,业内有两种不同的意见,支持使用的会认为,人为的写程序难免会有 bug, 会有不注意的地方,就好比 jobs 表中插入了一个 userid 为 100 的数据,但是 userid 为 100 的用户并没有在 user 表中,这时如果使用外键约束,则插入会失败. 在 mysql 数据库的层面上对数据一致性增加了一层保障。 


但是反对使用外键的人认为,这样会增加数据库本身的负担,数据的一致性正确性应该由开发人员来保障,数据库有了外键的约束在处理数据速度上会受到影响。


业内现在大多数公司已经不使用外键了,甚至在数据库层面上已经将该功能禁掉以保障数据库的速度,所以我们在以后的开发中,也尽量的少使用甚至不使用外键,当然,这个也看业务,但是如果公司将 mysql 的外键都禁掉的话就只能人为的来保障数据的正确性了。

数据库重连问题

有时候会出现这种情况,数据库偶尔的宕机或者网络抖动,造成了程序与数据库连接断了, 此时,当网络恢复了,正常来讲我们不希望再重启的我们的 web 服务,而是程序会自动的进行重新连接。

我们来写一个程序试验一下


loop = asyncio.get_event_loop()
async def test(): conn = await aiomysql.connect( host='127.0.0.1', port=3306, user='root', password='123456', db='mytest', loop=loop )
while True: try: async with conn.cursor(aiomysql.cursors.DictCursor) as cur: c = await cur.execute("select * from user where id = 1") result = await cur.fetchall() for i in result: print(i) except: pass finally: await asyncio.sleep(1)
loop.run_until_complete(test())
复制代码


程序先创建一个 connect 对象,然后使用该对象,不停的从数据库中获取数据,当出现异常的时候不做任务操作

在程序运行过程中,我们人为的将本机的网络断掉来模拟断网的情况,此时由于这个 conn 和数据库已经失去了连接,当我们再恢复网络以后,这个连接还是没能自动恢复


{'id': 1, 'username': 'yanyanxin', 'age': 18, 'updatedate': datetime.datetime(2020, 10, 31, 16, 43, 8), 'isstudent': 0}{'id': 1, 'username': 'yanyanxin', 'age': 18, 'updatedate': datetime.datetime(2020, 10, 31, 16, 43, 8), 'isstudent': 0}2020-11-03 18:24:31,206 - asyncio - WARNING - C:\Python37\lib\asyncio\selector_events.py[:863] - socket.send() raised exception.............
复制代码


一直打印 socket 错误

我们试下使用连接池的方式


loop = asyncio.get_event_loop()
async def test(): pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='123456', db='mytest', loop=loop )
while True: try: async with pool.acquire() as conn: async with conn.cursor(aiomysql.cursors.DictCursor) as cur: c = await cur.execute("select * from user where id = 1") result = await cur.fetchall() for i in result: print(i) except: pass finally: await asyncio.sleep(1)
loop.run_until_complete(test())
复制代码


使用连接池的方式也不能自动重连, 这可如何是好?

由于 aiomysql 本身没有提供自动重连的方法,所以这里需要我们再重新封装一个类,要执行 execute 方法时,自动的检查连接是否还有效,如果无法,则尝试重新连接,当然重新连接也不一定能连接上,只有 mysql 服务器正常上线了,才可以正常的连接。


import aiomysqlfrom functools import wraps
def mysql_connection_check(func): @wraps(func) async def wrapper(*args, **kwargs): mysql = args[0] if mysql: if not mysql.isconnect: # 进行重连 await mysql._lock.acquire() try: await mysql.restart() except: print(traceback.format_exc()) finally: await mysql._lock.release() try: return await func(*args, **kwargs) except (OperationalError, ConnectionResetError, OSError): mysql.isconnect = False except Exception as e: print(traceback.format_exc()) return wrapper
class PMysql: ''' 对于aiomysql进行封,实现自动重连功能 ''' def __init__(self, host, user, password, db, port=3306, **kwargs): '''
:param host: :param user: :param password: :param db: :param port: :param kwargs: minsize=1, maxsize=10,echo=False ''' self.isconnect = False self.host = host self.user = user self.password = password self.db = db self.port = port self.kwargs = kwargs self._lock = asyncio.Lock() self._pool = None self.isconnect = False

async def init_pool(self): try: self._pool = await aiomysql.create_pool( host=self.host, port=self.port, user=self.user, password=self.password, db=self.db, **self.kwargs ) self.isconnect = True except: self.isconnect = False
async def close(self): try: if self._pool: self._pool.close() await self._pool.wait_closed() self._pool = None self.isconnect = False except: print("close error", traceback.format_exc()) self.pool = None self.isconnect = False
async def restart(self): print("will restart connect..... ") await self.close() await self.init_pool()
@mysql_connection_check async def execute(self, query, args=None): ''' 执行execute语句 :param query: :param args: :return: ''' async with self._pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(query, args) return cur
复制代码


这里我们重新封装了一个类 PMysql,并将 aiomysql.create_pool  返回的 pool 作为这个类的_pool 属性,PMysql 有一个 isconnect 属性,只有当正常连接的时候这个属性才为 True,之后我们又写了一个 mysql_connection_check  的装饰器, 在装饰器里执行查询操作,当遇到 OperationalError, ConnectionResetError, OSError  错误的时候,我们认为可能是与 mysql 数据库连接出了问题,将尝试进行重新连接。


这次使用 PMysql 重新写一下刚才的测试程序


loop = asyncio.get_event_loop()async def test():    t = PMysql(        host='127.0.0.1',        port=3306,        user='root',        password='123456',        db='mytest',        autocommit=True,        minsize=1,        maxsize=2,        loop=loop)
await t.init_pool()
while True: try: cur = await t.execute("select * from user where id = %s", 1) for i in await cur.fetchall(): print(i) except: pass finally: await asyncio.sleep(1)
复制代码


这时再进行刚才的试验,启动程序可以正常获取数据,然后再断网, 这时会报错,然后再将网络恢复,此时,可以不用重新启脚本就会自动连接上数据库了。


但是由于重新封装了类,所以在 aiomysql 中一些方法就不能用了,还需要重新再定义一下


    @mysql_connection_check    async def get_a_conn(self):        return await self._pool.acquire()
@mysql_connection_check async def releaseconn(self, conn): return await self._pool.release(conn)
@mysql_connection_check async def get_a_cursor(self, conn): return await conn.cursor()
@mysql_connection_check async def release_a_cur(self, cur): await cur.close()
@mysql_connection_check async def transaction(self, conn): await conn.begin()
@mysql_connection_check async def commit(self, conn): await conn.commit()
@mysql_connection_check async def rollback(self, conn): await conn.rollback()
@mysql_connection_check async def execute(self, query, args=None): ''' 执行execute语句 :param query: :param args: :return: 游标 ''' async with self._pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(query, args) return cur
@mysql_connection_check async def executemany(self, query, args=None): async with self._pool.acquire() as conn: async with conn.cursor() as cur: await cur.executemany(query, args) return cur
复制代码


上面我将在平时使用过程中用的比较多的函数进行了重新封装,关于事务的处理会相应的麻烦一些,我这里通过 acquire 和 release 来获取连接和游标的方法


在事务中执行我们可以这样写


loop = asyncio.get_event_loop()async def test():    t = PMysql(        host='127.0.0.1',        port=3306,        user='root',        password='123456',        db='mytest',        autocommit=True,        minsize=1,        maxsize=2,        loop=loop)
await t.init_pool()
conn = await t.get_a_conn() cur = cur = await t.get_a_cursor(conn) try: await t.transaction(conn) await cur.execute("insert into user (username, age) values(%s, %s)", ("xxx", 11)) await cur.execute("insert into user (username, age) values(%s, %s)", ("xxx", 11, 333)) print(cur.lastrowid) await t.commit(conn) except: await conn.rollback() finally: if cur: await t.release_a_cur(cur) if conn: await t.releaseconn(conn)
复制代码


这里由于第二次插入数据时,故意多加了一个参数 333, 这样会导致触发异常,然后执行`await conn.rollback()` 这里再查看数据库,上面这两条数据都没有插入成功,这样就符合我们对事务的需求了。

是否使用 Sqlalchemy

通过上面的介绍,想必大家也可以看出,sa 在代码的可读性方面似乎没有直接 SQL 语句好,但是 sa 的存在意义在于,你现在使用的是 MySQL, 没准哪天项目需要迁移到 oracle 或者 sqlite,这时你几乎不用修改什么代码就可以顺利的完成迁移,如果直接使用 SQL 语言你就需要修改大量的代码.再者 ORM 会在底层对查询做一些转换,像之前提到的注入问题,如果有手写 SQL 时难免会写出漏洞来。但是我们做项目,又很少能过遇到修改数据库的情况,所以是否要使用 sa,看各位的需求!


本文转载自:360 技术(ID :qihoo_tech)

原文链接:Aiomysql 与 Sqlalchemy 的使用

2021-03-17 08:003258

评论

发布
暂无评论
发现更多内容

干货 | 日采100W新闻数据,如何实现新闻自动分类

八爪鱼采集器︱RPA机器人

爬虫 数据 采集

人工智能与伦理:如何确保AI应用中的隐私保护

天津汇柏科技有限公司

AI 伦理 隐私保护 AI 人工智能

智源研究院推出全球首个中文大模型辩论平台FlagEval Debate

智源研究院

1大成果、2个联盟、3大先锋、N个发布!超聚变全方位助力算力强国建设

业界

家居零售企业的数智人力战略升级,用友BIP超级版精选案例

用友BIP

同风起,耀星河!华为携手伙伴一起创造无限可能

OpenHarmony开发者

钉钉x昇腾:用AI一体机撬动企业数字资产智能化

Alter

携手豆包大模型,创维酷开以AI加速OTT场景智能化

新消费日报

淘宝商品详情数据接口:挖掘电商数据的关键通道

tbapi

淘宝商品详情数据接口 淘宝API接口 淘宝商品详情数据采集 淘宝商品详情数据分析

保利物业:这样构建数智化,从容超越“内卷之困”

用友BIP

低至1元/小时:国庆七天,30元通关《黑神话:悟空》!

轶天下事

如何利用ChatGPT开发一个盈利的AI写作助手网站

陆通

国有企业推行末等调整和不胜任退出制度路径指引

用友BIP

PKG系统安装包及IPSW固件:MacOS 11-14 正式版

你的猪会飞吗

MacOS 14 Sonoma mac系统下载 mca软件下载

观赛邀请|春季超音速四强诞生,邀您见证决赛精彩时刻

声网

盛事启幕 | 第三届OpenHarmony技术大会重磅官宣,邀您共绘智联未来

OpenHarmony开发者

OpenHarmony

如何使用ChatGPT API及Bito插件

陆通

实时语音交互,打造更加智能便捷的应用

HarmonyOS SDK

HarmonyOS

SD-WAN可以替代MPLS吗?

Ogcloud

SD-WAN 企业组网 SD-WAN组网 SD-WAN服务商 SD-WAN国际专线

如何利用 StarRocks 加速 Iceberg 数据湖的查询效率

镜舟科技

数据湖 查询优化 iceberg StarRocks

开课啦!北大-用友CIO/CDO数智化进阶课程正式启航

用友BIP

原生鸿蒙版江苏智慧人社上架,引领全国政务应用加速鸿蒙化

最新动态

高额奖金、校招终面直通卡!北京农商银行2024金融科技挑战赛正式启动!

Geek_2d6073

IPQ9574 Alder Dream Main Board: Next-Gen 802.11BE Wi-Fi 7 with OFDMA Support

wallyslilly

ipq9574

阿里巴巴API与电商创新:商品详情获取的新方法

技术冰糖葫芦

API 接口 API 测试 API 优先 pinduoduo API

Java如何将Object转换成指定Class对象

EquatorCoco

Java

YashanDB Docker镜像制作

YashanDB

数据库 yashandb 崖山数据库

数智化转型进行时:业界共话大模型应用创新实践

Geek_2d6073

使用Yasboot安装YashanDB的疑惑和建议

YashanDB

yashandb 崖山数据库 yasboot

第三方供应商不提供API接口?教你四步破解集成难题

RestCloud

数据处理 API API接口 ipaas

你可能没用过Requests自带重试功能

LLLibra146

python 爬虫 requests

Aiomysql 与 Sqlalchemy 的使用_架构_360技术_InfoQ精选文章