前言

基于原生模块:pymysql

推荐教程

Python 数据库处理的类库

pymysql [python原生模块]

SQLAchemy [ORM框架]

Python3 MySQL 数据库连接 - 菜鸟教程

小工具结构

DatabaseUtil([databseName])

getConnection()

query(sql)

save(sql)

delete(sql)

closeResources(cursor,connection)

源码

import pymysql # 导入mysql模块

## 数据库操纵工具

class DatabaseUtil:

"""

database util / 数据库操纵工具

------------------------------

[特别注意]

+ pymysql.connect(...)函数

+ unix_socket

+ 本字段是使用 socket 连接才用到的,而我们使用的是IP连接

+ 故:unix_socket='/tmp/mysql.sock' 改成 port=3306

"""

className = "DatabaseUtil";

def __init__(self,database='default_db_name'):

self.host='xxxx.com.cn';

self.port = 3306;

# unix_socket= "/tmp/mysql.sock",//module 'socket' has no attribute 'AF_UNIX'

self.user='root';

self.password='root';

self.database=database;

self.charset='utf8mb4';

self.cursorclass=pymysql.cursors.DictCursor;

pass;

def getConnection(self):

connection = None;

connection = pymysql.connect(host=self.host,

port=self.port,

# unix_socket= "/tmp/mysql.sock",//module 'socket' has no attribute 'AF_UNIX'

user=self.user,

password=self.password,

database=self.database,

charset=self.charset,

cursorclass=self.cursorclass);

# 切换数据库子模式

# cursor.execute("USE "+str(self.database)); # 形如: USE db_demoDatabase

# 使用cursor()方法创建一个游标对象

# cursor = connection.cursor();

if connection == None:

print("[",self.className,".getConnection] Fail to load and get database Connection!");

return connection;

pass;

def query(self,sql):## 查 select

# sql = "select * from tb_url limit 0,1000"; # just for test

print("[",self.className,".query] SQL:",sql);

connection = self.getConnection();

cursor = connection.cursor();

try:

cursor.execute(sql) # 执行SQL语句

# sql = "select * from userinfo where username=%(u)s and password=%(p)s"

# cursor.execute(sql,user,pwd) #直接传值

# cursor.execute(sql,[user,pwd]) #列表形式

# cursor.execute(sql,{'u':user,'p':pwd}) #字典格式

# print("row number:",(cursor.rownumber))

# print("rowcount:",cursor.rowcount); # 只读属性,并返回执行execute()方法后影响的行数

# 获取所有记录列表

#results = print("row [1]: ", cursor.fetchone()); # 获取下一个查询结果集。结果集是一个对象

if cursor.rowcount < 1:

print("[",self.className,".query] Not found any data from database!")

return;

results = None; # 初始化

# + cursor.fetchone() 获取下一行数据,第一次为首行

# + cursor.fetchall() 获取所有行数据源

# + cursor.fetchmany(n) 获取下N行数据

results = cursor.fetchall(); # 接收全部的返回结果行.

# print("results:",results);

# print("results[0]:",results[0]);

i = 0; #记录计数

for row in results: # row :字典类型

object = [];#存储各字段信息

for key in row:

object.append(row[key]);

# print("[", i , "] " , key , " : " , row[key]);

pass;

i+=1;

# 打印结果

# print("Url {\n\tpk_url_id:%s,\n\tstate:%s,\n\turl:%s,\n\tresolve_type:%s,\n\tresolver_class_bean:%s\n}" % (object[0], object[1], object[2], object[3], object[4]))

except e:

print("[",self.className,".query] ",e.message);

print("[",self.className,".query] Error: unable to fetch data!")

pass;

self.closeResources(cursor,connection);

return results; # 以结果行为字典类型的数组 [{ key1:value1, key2:value2 }]

pass;

def save(self,sql): ## 插 insert / 更新 update

# sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')

# sql = "insert into tb_url values('2','WAITING_RESOLVE','http://test.for.python.mysql.cn','article/author','resolver_class_bean:Unknown')";

print("[",self.className,".save] SQL:",sql);

connection = self.getConnection();

cursor = connection.cursor();

result = -100; # 初始化一个异常值

try:

# 执行sql语句

result = cursor.execute(sql);

# print("[",className,".insert] the last rowId is ",cursor.lastrowid) # 当表中有自增的主键的时候,可以使用lastrowid来获取最后一次自增的ID

# 提交到数据库执行

connection.commit();

except:

# 如果发生错误则回滚

connection.rollback();

pass;

self.closeResources(cursor,connection);

if result == 1:

print("[",self.className,".save] Done!");

else:

print("[",self.className,".save] Failed!")

pass;

# print("[",self.className,".insert] result:",result)

return result;

pass;

def delete(self,sql): # 删除 delete

# sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)

print("[",self.className,".delete] SQL:",sql);

connection = self.getConnection();

cursor = connection.cursor();

result = -100; # 初始化一个异常值

try:

# 执行sql语句

result = cursor.execute(sql);

# print("[",className,".insert] the last rowId is ",cursor.lastrowid) # 当表中有自增的主键的时候,可以使用lastrowid来获取最后一次自增的ID

# 提交到数据库执行

connection.commit();

except:

# 如果发生错误则回滚

connection.rollback();

pass;

self.closeResources(cursor,connection);

if result == 1:

print("[",self.className,".delete] Done!");

else:

print("[",self.className,".delete] Failed!")

pass;

# print("[",self.className,".insert] result:",result)

return result;

pass;

def closeResources(self,cursor,connection):

try:

cursor.close() # 关闭数据库游标对象

connection.close() # 关闭数据库连接

pass;

except e:

print (e.message);

pass;

pass;

pass; # class end

Demo

# 插 / 更新

# sql = "insert into tb_url values('5','WAITING_RESOLVE','http://test.for.python.mysql.cn','article/author','resolver_class_bean:Unknown')";

# print(dbUtil.save(sql));

# 删

# for i in range(2,5):[2,5)

# sql = "delete from tb_url where pk_url_id='%d'" % (i);

# dbUtil.delete(sql);

# pass;

# 查

sql = "select * from tb_url where pk_url_id='%s'" % ('5');

print(dbUtil.query(sql));

//output

[ DatabaseUtil .query] SQL: select * from tb_url where pk_url_id='5'

[{'pk_url_id': 5, 'state': 'WAITING_RESOLVE', 'url': 'http://test.for.python.mysql.cn', 'resolve_type': 'article/author', 'resolver_class_bean': 'resolver_class_bean:Unknown'}]

推荐链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。