一、PyMysql
在使用Python操作MySQL数据过的过程中,基本的增删改查操作如何更加高效优雅的执行。这里将以PyMySQL为例,介绍一下如何使用Python操作数据库。 Python对MySQL数据库进行操作,基本思路是先连接数据库 Connection 对象,建立游标 Cursor 对象,然后执行SQL语句对数据库进行操作,获取执行结果,最终断开连接。大致过程是这样,在对其进行介绍之前,先介绍一些基本的概念。
Connection
Connection 对象即为数据库连接对象,在python中可以使用pymysql.connect()方法创建Connection对象,该方法的常用参数如下:
host:IP地址,字符串类型
user:用户名, 字符串类型
passwd:无默认值;字符串类
db:数据库名称,无默认值;字符串类型(可以不传但是SQL中必须体现)
port:端口, 默认为3306, 整型
charset:设置utf8, 字符串类型
close:关闭当前连接对象
Cursor
Cursor对象即为游标对象,用于执行查询和获取结果,在python中可以使用connect.cursor()创建
execute():执行数据库单个查询或命令,将结果从数据库获取
executemany(): 对一个查询运行多个数据,其返回是:受影响的行数(如果有的话)
close():关闭当前游标对象
Transaction
1.事务是数据库理论中一个比较重要的概念,指访问和更新数据库的一个程序执行单元,具有ACID特性:
原子性(Atomic):事务中的各项操作要么全都做,要么全都不做,任何一项操作的失败都会导致整个事务的失败
一致性(Consistent):事务必须使数据库从一个一致性状态变到另一个一致性状态
隔离性(Isolated):并发执行的事务彼此无法看到对方的中间状态,一个事务的执行不能被其他事务干扰
持久性(Durable):事务一旦提交,它对数据库的改变就是永久性的,可以通过日志和同步备份在故障发生后重建数据。
2.常用事务方法
Connection.commit():正常事务提交
Connection.rollback():事务异常回滚
Connection.autocommit():事务自动提交机制,默认TRUE,设置FALSE则关闭。
二、Python操作MySQL
1.安装
$ pip3 install PyMySQL
2.数据库连接
import pymysql# 打开数据库连接db = pymysql.connect(host='127.0.0.1',user='user',password='123456',database='demo',port=3306,charset='utf8')# 使用 cursor() 方法创建一个游标对象 cursorcursor = db.cursor()# 使用 execute() 方法执行 SQL 查询cursor.execute("SELECT * FROM USER;")# 使用 fetchone() 方法获取单条数据.data = cursor.fetchone()print ("Database data : %s " % data)# 关闭数据库连接cursor.close()db.close()
3.数据库DML操作
事务执行过程
import pymysql# 打开数据库连接db = pymysql.connect(**config) # 省略连接信息# 使用cursor()方法获取操作游标cursor = db.cursor()# SQL插入语句sql = """INSERT INTO EMPLOYEE(FIRST_NAME,LAST_NAME, AGE, SEX, INCOME)VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""try:# 执行sql语句cursor.execute(sql)# 提交到数据库执行db.commit()except:# 如果发生错误则回滚db.rollback()# 关闭当前游标对象cursor.close()# 关闭数据库连接db.close()# 执行传入的SQL也可以更加的灵活,可以使用另外一种%s 占位符,后续的参数依次传入。sql2 = """INSERT INTO EMPLOYEE(FIRST_NAME,LAST_NAME, AGE, SEX, INCOME)VALUES ('%s', '%s', %s, '%s', %s)"""cursor.execute(sql2, 'Mac', 'Mohan', 20, 'M', 2000)
4.数据库DQL操作
Python查询Mysql使用常用几个方法。
fetchone(): 该方法获取下一个查询结果集。结果集是一个对象.
fetchmany():获取结果集的指定几行.
fetchall(): 接收全部的返回结果行.
rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数。
import pymysql# 打开数据库连接db = pymysql.connect(**config)# 使用cursor()方法获取操作游标cursor = db.cursor()# SQL 查询语句sql = "SELECT * FROM EMPLOYEE WHERE INCOME > %s" % (1000)try:# 执行SQL语句cursor.execute(sql)# 获取所有记录列表result1=cursor.fetchone()result2=cursor.fetchmany(2)results = cursor.fetchall()print(result1)print(result2)print(results)except:print ("Error: unable to fetch data")# 关闭数据库连接cursor.close()db.close()
三、工具类封装
通过封装常用方法将会大大降低对数据库操作的成本。接下来分为几步进行操作:
1.可以通过env文件来存储数据库的连接信息
2.将env文件数据加载进系统环境变量
3.从系统环境变量中获取对应连接数据
4.连接数据库,操作增删改查
# .env
DB_INFO={"host": "127.0.0.1","port":3306,"user": "user","passwd": "123456","charset": "utf8"}
import io
import osclass EnvironmentVarUtils(object):def __init__(self, fileName=None):self.file_name = fileNameself._load_dot_env_file(self._get_environment_path())def _get_environment_path(self):""":return: project_path"""return os.path.join(os.path.dirname(os.getcwd()), '.env') if self.file_name is None\else os.path.join(os.path.dirname(os.getcwd()), self.file_name)def _load_dot_env_file(self, dot_env_path):""" load .env file.Args:dot_env_path (str): .env file path"""if not os.path.isfile(dot_env_path):raise FileNotFoundError(".env file not found Error.")print("Loading environment variables from 【{}】".format(dot_env_path))env_variables_mapping = {}with io.open(dot_env_path, 'r', encoding='utf-8') as fp:for line in fp:if "=" in line:variable, value = line.split("=", 1)else:raise Exception(".env format error")env_variables_mapping[variable.strip()] = value.strip()self._set_os_environ(env_variables_mapping)@staticmethoddef _set_os_environ(variables_mapping):""" set variables mapping to os.environ """for variable in variables_mapping:os.environ[variable] = variables_mapping[variable]print("Set OS environment variable: {}".format(variable))@staticmethoddef get_os_environ(variable_name):""" get value of environment variable."""try:return os.environ[variable_name]except Exception as e:raise e
import pymysqlclass SqlHelper(object):def __init__(self, config):self.connect = pymysql.connect(**eval(config))self.connect.autocommit(True)# default return tuple, DictCursor return Json .self.cursor = self.connect.cursor()def __enter__(self):# DictCursor return Json .self.cursor = self.connect.cursor(cursor=pymysql.cursors.DictCursor)return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.cursor.close()self.connect.close()def queryAll(self, sql, params=None):""":param sql::param params::return:"""self.cursor.execute(sql, params)return self.cursor.fetchall()def queryMany(self, sql, num, params=None):""":param sql::param num::param params::return:"""self.cursor.execute(sql, params)return self.cursor.fetchmany(num)def queryOne(self, sql, params=None):""":param sql::param params::return:"""self.cursor.execute(sql, params)return self.cursor.fetchone()def operation(self, sql, params=None, DML=True):"""DML: insert / update / deleteDDL: CREATE TABLE/VIEW/INDEX/SYN/CLUSTER:param DML::param sql::param params::return:"""try:self.cursor.execute(sql, params)except Exception as e:if DML:self.connect.rollback()raise edef batch_operation(self, sql_list, params_list=None, DML=True):"""Process multiple SQL files in batches .:param DML::param sql_list::param params_list::return:"""for i in range(len(sql_list)):try:if params_list is not None:self.operation(sql_list[i], params_list[i], DML)else:self.operation(sql_list[i], params_list, DML)except Exception as e:raise edef batch_processing(self, sql, params_list, DML=True):"""The same SQL is executed multiple times in batches.:param DML::param sql::param params_list::return:"""try:self.cursor.executemany(sql, params_list)except Exception as e:if DML:self.connect.rollback()raise edef __del__(self):"""Automatic disconnection:return:"""if self.connect.open: # 解决连接重复关闭的self.cursor.close()self.connect.close()
1.正常方式执行
if __name__ == '__main__':sql = "select age from `demo`.`user` where name= 'Amy';"env = EnvironmentVarUtils() # 初始化对象加载env文件数据config = env.get_os_environ("DB_INFO") # 获取指定key数据# 1.正常方式执行db = SqlHelper(config)result = db.queryOne(sql)print(result)-------------------------------------------------------------------------------控制台输出:Loading environment variables from 【C:\Users\lenovo\PycharmProjects\job\httpRunner_demo\.env】
Set OS environment variable: USERNAME
Set OS environment variable: PASSWORD
Set OS environment variable: BASE_URL
Set OS environment variable: db_info
(18,)Process finished with exit code 0
2.通过上下文管理的方式执行
if __name__ == '__main__':sql = "select age from `demo`.`user` where name= 'Amy';"env = EnvironmentVarUtils() # 初始化对象加载env文件数据config = env.get_os_environ("DB_INFO") # 获取指定key数据# 2.上下文管理方式执行with SqlHelper(config) as db:result = db.queryOne(sql)print(result)-------------------------------------------------------------------------------控制台输出:Loading environment variables from 【C:\Users\lenovo\PycharmProjects\job\httpRunner_demo\.env】
Set OS environment variable: USERNAME
Set OS environment variable: PASSWORD
Set OS environment variable: BASE_URL
Set OS environment variable: db_info
{'age': 18}Process finished with exit code 0