在我们编程时,有一些代码是固定的,例如socket连接的代码,读取文件内容的代码,一般情况下我都是到网上搜一下然后直接粘贴下来改一改,当然如果你能自己记住所有的代码那更厉害,但是自己写毕竟不如粘贴来的快,而且自己写的代码还要测试,而一段经过测试的代码则可以多次使用,所以这里我就自己总结了一下python中常用的编程模板,如果还有哪些漏掉了请大家及时补充哈。
一、读写文件
1、读文件
(1)、一次性读取全部内容
filepath=’d:/data.txt’ #文件路径
with open(filepath, ‘r’) as f:
print f.read()
(2)读取固定字节大小
# -*- coding: utf-8 -*-
filepath=’d:/data.txt’ #文件路径
f = open(filepath, ‘r’)
content=””
try:
while true:
chunk = f.read(8)
if not chunk:
break
content+=chunk
finally:
f.close()
print content
(3)每次读取一行
# -*- coding: utf-8 -*-
filepath=’d:/data.txt’ #文件路径
f = open(filepath, “r”)
content=””
try:
while true:
line = f.readline()
if not line:
break
content+=line
finally:
f.close()
print content
(4)一次读取所有的行
# -*- coding: utf-8 -*-
filepath=’d:/data.txt’ #文件路径
with open(filepath, “r”) as f:
txt_list = f.readlines()
for i in txt_list:
print i,
2、写文件
# -*- coding: utf-8 -*-
filepath=’d:/data1.txt’ #文件路径
with open(filepath, “w”) as f: #w会覆盖原来的文件,a会在文件末尾追加
f.write(‘1234’)
二、连接mysql数据库
1、连接
#!/usr/bin/python
# -*- coding: utf-8 -*-
import mysqldb
db_url=’localhost’
user_name=’root’
passwd=’1234′
db_name=’test’
# 打开数据库连接
db = mysqldb.connect(db_url,user_name,passwd,db_name)
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# 使用execute方法执行sql语句
cursor.execute(“select version()”)
# 使用 fetchone() 方法获取一条数据库。
data = cursor.fetchone()
print “database version : %s ” % data
# 关闭数据库连接
db.close()
2、创建表
#!/usr/bin/python
# -*- coding: utf-8 -*-
import mysqldb
# 打开数据库连接
db = mysqldb.connect(“localhost”,”testuser”,”test123″,”testdb” )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# 如果数据表已经存在使用 execute() 方法删除表。
cursor.execute(“drop table if exists employee”)
# 创建数据表sql语句
sql = “””create table employee (
first_name char(20) not null,
last_name char(20),
age int,
sex char(1),
income float )”””
cursor.execute(sql)
# 关闭数据库连接
db.close()
3、插入
#!/usr/bin/python
# -*- coding: utf-8 -*-
import mysqldb
# 打开数据库连接
db = mysqldb.connect(“localhost”,”testuser”,”test123″,”testdb” )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# sql 插入语句
sql = “””insert into employee(first_name,
last_name, age, sex, income)
values (‘mac’, ‘mohan’, 20, ‘m’, 2000)”””
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# rollback in case there is any error
db.rollback()
# 关闭数据库连接
db.close()
4、查询
#!/usr/bin/python
# -*- coding: utf-8 -*-
import mysqldb
# 打开数据库连接
db = mysqldb.connect(“localhost”,”testuser”,”test123″,”testdb” )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# sql 查询语句
sql = “select * from employee \
where income > ‘%d'” % (1000)
try:
# 执行sql语句
cursor.execute(sql)
# 获取所有记录列表
results = cursor.fetchall()
for row in results:
fname = row[0]
lname = row[1]
age = row[2]
sex = row[3]
income = row[4]
# 打印结果
print “fname=%s,lname=%s,age=%d,sex=%s,income=%d” % \
(fname, lname, age, sex, income )
except:
print “error: unable to fecth data”
# 关闭数据库连接
db.close()
5、更新
#!/usr/bin/python
# -*- coding: utf-8 -*-
import mysqldb
# 打开数据库连接
db = mysqldb.connect(“localhost”,”testuser”,”test123″,”testdb” )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# sql 更新语句
sql = “update employee set age = age + 1
where sex = ‘%c'” % (‘m’)
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 发生错误时回滚
db.rollback()
# 关闭数据库连接
db.close()
三、socket
1、服务器
from socket import *
from time import ctime
host = ”
port = 21568
bufsiz = 1024
addr = (host, port)
tcpsersock = socket(af_inet, sock_stream)
tcpsersock.bind(addr)
tcpsersock.listen(5)
while true:
print ‘waiting for connection…’
tcpclisock, addr = tcpsersock.accept()
print ‘…connected from:’, addr
while true:
try:
data = tcpclisock.recv(bufsiz)
print ”)
if data == ‘close’:
break
if not data:
continue
tcpclisock.send(data)
data = tcpclisock.recv(bufsiz)
print data
except:
tcpclisock.close()
四、多线程
import time, threading
# 新线程执行的代码:
def loop():
print ‘thread %s is running…’ % threading.current_thread().name
n = 0
while n < 5:
n = n + 1
print 'thread %s >>> %s’ % (threading.current_thread().name, n)
time.sleep(1)
print ‘thread %s ended.’ % threading.current_thread().name
print ‘thread %s is running…’ % threading.current_thread().name
t = threading.thread(target=loop, name=’loopthread’)
t.start()
t.join()
print ‘thread %s ended.’ % threading.current_thread().name
还请大家积极补充!