pymysql是python中操作mysql的模块,其使用方法和mysqldb几乎相同
一、下载安装
pip3 install pymysql
二、操作使用
1、执行sql
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql

# Open a connection to the MySQL server.
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
# Create a cursor.
cursor = conn.cursor()
# Execute SQL; execute() returns the number of affected rows.
effect_row = cursor.execute("update hosts set host = '1.1.1.2'")
# Same, with the parameter bound by the driver instead of string formatting.
#effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,))
# Run the statement once per parameter tuple; also returns the number of affected rows.
#effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
# Commit, otherwise inserted or modified data is not saved.
conn.commit()
# Close the cursor.
cursor.close()
# Close the connection.
conn.close()
2、获取新创建数据自增id
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
cursor = conn.cursor()
cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
conn.commit()
# Read the latest auto-increment id before the cursor and connection are closed.
new_id = cursor.lastrowid
cursor.close()
conn.close()
3、获取查询数据
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
cursor = conn.cursor()
cursor.execute("select * from hosts")
# Fetch the first row.
row_1 = cursor.fetchone()
# Fetch the next n rows.
# row_2 = cursor.fetchmany(3)
# Fetch all remaining rows.
# row_3 = cursor.fetchall()
conn.commit()
cursor.close()
conn.close()
注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:
cursor.scroll(1, mode='relative')  # move relative to the current position
cursor.scroll(2, mode='absolute')  # move to an absolute position in the result set
4、fetch数据类型
关于默认获取的数据是元组类型,如果想要获取字典类型的数据,即:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
# Use a dictionary cursor so each fetched row is a dict instead of a tuple.
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# Call the stored procedure p1.
r = cursor.execute("call p1()")
result = cursor.fetchone()
conn.commit()
cursor.close()
conn.close()
sqlalchemy
sqlalchemy是python编程语言下的一款orm框架,该框架建立在数据库api之上,使用对象关系映射进行数据库操作,简言之便是:将对象转换成sql,然后使用数据库api执行sql并获取执行结果。
安装:
pip3 install sqlalchemy
sqlalchemy本身无法操作数据库,其必须依赖pymysql等第三方插件,dialect用于和数据库api进行交流,根据配置文件的不同调用不同的数据库api,从而实现对数据库的操作,如:
mysql-python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
mysql-connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value…]
一、内部处理
使用 engine/connectionpooling/dialect 进行数据库操作,engine使用connectionpooling连接数据库,然后再通过dialect执行sql语句。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

# NOTE: the commented examples below call Engine.execute(), which was removed
# in SQLAlchemy 2.0; they apply to the 1.x series.

# Execute SQL
# cur = engine.execute(
#     "insert into hosts (host, color_id) values ('1.1.1.22', 3)"
# )
# Auto-increment id of the newly inserted row
# cur.lastrowid

# Execute SQL with a list of positional parameter tuples
# cur = engine.execute(
#     "insert into hosts (host, color_id) values(%s, %s)", [('1.1.1.22', 3), ('1.1.1.221', 3),]
# )

# Execute SQL with named parameters
# cur = engine.execute(
#     "insert into hosts (host, color_id) values (%(host)s, %(color_id)s)",
#     host='1.1.1.99', color_id=3
# )

# Execute SQL
# cur = engine.execute('select * from hosts')
# Fetch the first row
# cur.fetchone()
# Fetch the next n rows
# cur.fetchmany(3)
# Fetch all rows
# cur.fetchall()
二、orm功能使用
使用 orm/schema type/sql expression language/engine/connectionpooling/dialect 所有组件对数据进行操作。根据类创建对象,对象转换成sql,执行sql。
1、创建表
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

# Declarative base class that every mapped table class below inherits from.
base = declarative_base()


# Single table
class users(base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))

    # Table-level constraints: a composite unique constraint and a composite index.
    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )


# One-to-many
class favor(base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)


class person(base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # Foreign key referencing favor.nid.
    favor_id = Column(Integer, ForeignKey("favor.nid"))


# Many-to-many
class group(base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)


class server(base):
    __tablename__ = 'server'
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)


class servertogroup(base):
    # Association table holding one foreign key to server and one to group.
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))


def init_db():
    """Create every table registered on ``base`` using ``engine``."""
    base.metadata.create_all(engine)


def drop_db():
    """Drop every table registered on ``base`` using ``engine``."""
    base.metadata.drop_all(engine)
注:设置外键的另一种方式 ForeignKeyConstraint(['other_id'], ['othertable.other_id'])
2、操作表
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy import and_, or_
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.sql import func
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

# Declarative base class that every mapped table class below inherits from.
base = declarative_base()


# Single table
class users(base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))

    # Table-level constraints: a composite unique constraint and a composite index.
    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )

    def __repr__(self):
        return "%s-%s" %(self.id, self.name)


# One-to-many
class favor(base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)

    def __repr__(self):
        return "%s-%s" %(self.nid, self.caption)


class person(base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
    # Does not affect the generated table structure; only makes querying easier.
    favor = relationship("favor", backref='pers')


# Many-to-many
class servertogroup(base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))
    group = relationship("group", backref='s2g')
    server = relationship("server", backref='s2g')


class group(base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # group = relationship('group',secondary=servertogroup,backref='host_list')


class server(base):
    __tablename__ = 'server'
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)


def init_db():
    """Create every table registered on ``base`` using ``engine``."""
    base.metadata.create_all(engine)


def drop_db():
    """Drop every table registered on ``base`` using ``engine``."""
    base.metadata.drop_all(engine)


# Build a session factory bound to the engine, then open one session from it.
session_factory = sessionmaker(bind=engine)
session = session_factory()

# Insert
obj = users(name="alex0", extra='sb')
session.add(obj)
session.add_all([
    users(name="alex1", extra='sb'),
    users(name="alex2", extra='sb'),
])
session.commit()

# Delete
session.query(users).filter(users.id > 2).delete()
session.commit()

# Update
session.query(users).filter(users.id > 2).update({"name" : "099"})
session.query(users).filter(users.id > 2).update({users.name: users.name + "099"}, synchronize_session=False)
# NOTE: `users` declares no `num` column, so evaluating `users.num` raises
# AttributeError; the numeric-increment example is therefore left commented out.
# session.query(users).filter(users.id > 2).update({"num": users.num + 1}, synchronize_session="evaluate")
session.commit()

# Query
ret = session.query(users).all()
ret = session.query(users.name, users.extra).all()
ret = session.query(users).filter_by(name='alex').all()
ret = session.query(users).filter_by(name='alex').first()

# Conditions
ret = session.query(users).filter_by(name='alex').all()
ret = session.query(users).filter(users.id > 1, users.name == 'eric').all()
ret = session.query(users).filter(users.id.between(1, 3), users.name == 'eric').all()
ret = session.query(users).filter(users.id.in_([1,3,4])).all()
ret = session.query(users).filter(~users.id.in_([1,3,4])).all()
ret = session.query(users).filter(users.id.in_(session.query(users.id).filter_by(name='eric'))).all()
ret = session.query(users).filter(and_(users.id > 3, users.name == 'eric')).all()
ret = session.query(users).filter(or_(users.id < 2, users.name == 'eric')).all()
ret = session.query(users).filter(
    or_(
        users.id < 2,
        and_(users.name == 'eric', users.id > 3),
        users.extra != ""
    )).all()

# Wildcards
ret = session.query(users).filter(users.name.like('e%')).all()
ret = session.query(users).filter(~users.name.like('e%')).all()

# Limit
ret = session.query(users)[1:2]

# Ordering
ret = session.query(users).order_by(users.name.desc()).all()
ret = session.query(users).order_by(users.name.desc(), users.id.asc()).all()

# Grouping
ret = session.query(users).group_by(users.extra).all()
ret = session.query(
    func.max(users.id),
    func.sum(users.id),
    func.min(users.id)).group_by(users.name).all()
ret = session.query(
    func.max(users.id),
    func.sum(users.id),
    func.min(users.id)).group_by(users.name).having(func.min(users.id) >2).all()

# Joins
ret = session.query(users, favor).filter(users.id == favor.nid).all()
ret = session.query(person).join(favor).all()
ret = session.query(person).join(favor, isouter=True).all()

# Unions
q1 = session.query(users.name).filter(users.id > 2)
q2 = session.query(favor.caption).filter(favor.nid < 2)
ret = q1.union(q2).all()
q1 = session.query(users.name).filter(users.id > 2)
q2 = session.query(favor.caption).filter(favor.nid < 2)
ret = q1.union_all(q2).all()