Python处理数据文件的途径有很多种,可以操作的文件类型主要包括文本文件(csv、txt、json等)、excel文件、数据库文件、api等其他数据文件。其中,Python连接各种数据库,包括关系数据库:sqlite,mysql,mssql
非关系数据库:MongoDB,Redis。
下面整理下python有哪些方式可以读写数据文件。
1. read、readline、readlines
- read() :一次性读取整个文件内容。推荐使用read(size)方法,size越大运行时间越长
- readline() :每次读取一行内容。内存不够时使用,一般不太用
- readlines() :一次性读取整个文件内容,并按行返回到list,方便我们遍历
2. 内置模块csv
python内置了csv模块用于读写csv文件,csv是一种逗号分隔符文件,是数据科学中最常见的数据存储格式之一。csv模块能轻松完成各种体量数据的读写操作,当然大数据量需要代码层面的优化。
- csv模块读取文件
# 读取csv文件
import csv
with open('test.csv','r') as myFile:
lines=csv.reader(myFile)
for line in lines:
print (line)
- csv模块写入文件
import csv
with open('test.csv','w+') as myFile:
myWriter=csv.writer(myFile)
# writerrow一行一行写入
myWriter.writerow([7,8,9])
myWriter.writerow([8,'h','f'])
# writerow多行写入
myList=[[1,2,3],[4,5,6]]
myWriter.writerows(myList)
3. numpy库
- loadtxt方法
loadtxt用来读取文本文件(包含txt、csv等)以及.gz 或.bz2格式压缩文件,前提是文件数据每一行必须要有数量相同的值。
import numpy as np
# loadtxt()中的dtype参数默认设置为float
# 这里设置为str字符串便于显示
np.loadtxt('test.csv',dtype=str)
# out:array(['1,2,3', '4,5,6', '7,8,9'], dtype='<U5')
- load方法
load用来读取numpy专用的.npy
, .npz
或者pickled
持久化文件。
import numpy as np
# 先生成npy文件
np.save('test.npy', np.array([[1, 2, 3], [4, 5, 6]]))
# 使用load加载npy文件
np.load('test.npy')
'''
out:array([[1, 2, 3],
[4, 5, 6]])
'''
- fromfile方法
fromfile方法可以读取简单的文本数据或二进制数据,数据来源于tofile方法保存的二进制数据。读取数据时需要用户指定元素类型,并对数组的形状进行适当的修改。
import numpy as np
x = np.arange(9).reshape(3,3)
x.tofile('test.bin')
np.fromfile('test.bin',dtype=np.int)
# out:array([0, 1, 2, 3, 4, 5, 6, 7, 8])
4. pandas库
pandas是数据处理最常用的分析库之一,可以读取各种各样格式的数据文件,一般输出dataframe格式。如:txt、csv、excel、json、剪切板、数据库、html、hdf、parquet、pickled文件、sas、stata等等
- read_csv方法read_csv方法用来读取csv格式文件,输出dataframe格式。
import pandas as pd
pd.read_csv('test.csv')
- read_excel方法
读取excel文件,包括xlsx、xls、xlsm格式
import pandas as pd
pd.read_excel('test.xlsx')
- read_table方法
通过对sep参数(分隔符)的控制来对任何文本文件读取
- read_json方法
读取json格式文件
df = pd.DataFrame([['a', 'b'], ['c', 'd']],index=['row 1', 'row 2'],columns=['col 1', 'col 2'])
j = df.to_json(orient='split')
pd.read_json(j,orient='split')
- read_html方法
读取html表格
- read_clipboard方法
读取剪切板内容
- read_pickle方法
读取plckled持久化文件
- read_sql方法
读取数据库数据,连接好数据库后,传入sql语句即可
- read_dhf方法
读取hdf5文件,适合大文件读取
- read_parquet方法
读取parquet文件
- read_sas方法
读取sas文件
- read_stata方法
读取stata文件
- read_gbq方法
读取google bigquery数据
pandas学习网站:https://pandas.pydata.org/
5.读写excel文件
python用于读写excel文件的库有很多,除了前面提到的pandas,还有xlrd、xlwt、openpyxl、xlwings等等。
主要模块:
- xlrd库
从excel中读取数据,支持xls、xlsx
- xlwt库
对excel进行修改操作,不支持对xlsx格式的修改
- xlutils库
在xlw和xlrd中,对一个已存在的文件进行修改
- openpyxl
主要针对xlsx格式的excel进行读取和编辑
- xlwings
对xlsx、xls、xlsm格式文件进行读写、格式修改等操作
- xlsxwriter
用来生成excel表格,插入数据、插入图标等表格操作,不支持读取
- Microsoft Excel API
需安装pywin32,直接与Excel进程通信,可以做任何在Excel里可以做的事情,但比较慢
6. 操作数据库
python几乎支持对所有数据库的交互,连接数据库后,可以使用sql语句进行增删改查。
主要模块:
- pymysql
用于和mysql数据库的交互
- sqlalchemy
用于和mysql数据库的交互
- cx_Oracle
用于和oracle数据库的交互
- pymssql
用于和sql server数据库的交互
- pymongo
用于和mongodb非关系型数据库的交互
- redis、pyredis
用于和redis非关系型数据库的交互
python连接MySQL数据库
使用MySQLdb,不支持Python3.x
pymysql对Python2.x和Python3.x的支持都比较好
1、使用pymysql
# coding=utf-8
# https://github.com/PyMySQL/PyMySQL/
import pymysql
from contextlib import closing
import traceback
try:
# 获取一个数据库连接,with关键字 表示退出时,conn自动关闭
# with 嵌套上一层的with 要使用closing()
with closing(pymysql.connect(host='localhost', user='root', passwd='root', db='test', port=3306,
charset='utf8')) as conn:
print("connect database successfully")
# 获取游标,with关键字 表示退出时,cur自动关闭
with conn.cursor() as cur:
# 删除表
cur.execute("DROP TABLE IF EXISTS COMPANY")
# 创建表
sql = """
CREATE TABLE IF NOT EXISTS COMPANY
(ID INTEGER PRIMARY KEY NOT NULL auto_increment,
NAME TEXT NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR(50),
SALARY REAL);
"""
cur.execute(sql)
print("create table successfully")
# 添加数据
# 在一个conn.execute里面里面执行多个sql语句是非法的
cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )",
[('Paul', 32, 'California', 20000.00),
('Allen', 25, 'Texas', 15000.00),
('Teddy', 23, 'Norway', 20000.00),
('Mark', 25, 'Rich-Mond ', 65000.00),
('David', 27, 'Texas', 85000.00),
('Kim', 22, 'South-Hall', 45000.00),
('James', 24, 'Houston', 10000.00)])
# 提交,否则重新运行程序时,表中无数据
conn.commit()
print("insert successfully")
# 查询表
sql = """
select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
"""
cur.execute(sql)
for row in cur.fetchall():
print("-" * 50) # 输出50个-,作为分界线
print("%-10s %s" % ("id", row[0])) # 字段名固定10位宽度,并且左对齐
print("%-10s %s" % ("name", row[1]))
print("%-10s %s" % ("age", row[2]))
print("%-10s %s" % ("address", row[3]))
print("%-10s %s" % ("salary", row[4]))
except pymysql.Error as e:
print("Mysql Error:", e)
traceback.print_exc()
2、使用MySQLdb
#! /usr/bin/env python2.7
# coding=utf-8
# mysqldb 只支持python2.7
# http://mysql-python.sourceforge.net/
import MySQLdb
from contextlib import closing
import traceback
try:
# 获取一个数据库连接
with closing(MySQLdb.connect(host='localhost', user='root', passwd='root', db='test', port=3306,charset='utf8')) as conn:
print("connect database successfully")
with closing(conn.cursor()) as cur:
# 删除表
cur.execute("DROP TABLE IF EXISTS COMPANY")
# 创建表
sql = """
CREATE TABLE IF NOT EXISTS COMPANY
(ID INTEGER PRIMARY KEY NOT NULL auto_increment,
NAME TEXT NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR(50),
SALARY REAL);
"""
cur.execute(sql)
print("create table successfully")
# 添加数据
# 在一个conn.execute里面里面执行多个sql语句是非法的
cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )",
[('Paul', 32, 'California', 20000.00),
('Allen', 25, 'Texas', 15000.00),
('Teddy', 23, 'Norway', 20000.00),
('Mark', 25, 'Rich-Mond ', 65000.00),
('David', 27, 'Texas', 85000.00),
('Kim', 22, 'South-Hall', 45000.00),
('James', 24, 'Houston', 10000.00)])
# 提交,否则重新运行程序时,表中无数据
conn.commit()
print("insert successfully")
# 查询表
sql = """
select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
"""
cur.execute(sql)
for row in cur.fetchall():
print("-" * 50) # 输出50个-,作为分界线
print("%-10s %s" % ("id", row[0])) # 字段名固定10位宽度,并且左对齐
print("%-10s %s" % ("name", row[1]))
print("%-10s %s" % ("age", row[2]))
print("%-10s %s" % ("address", row[3]))
print("%-10s %s" % ("salary", row[4]))
except MySQLdb.Error as e:
print("Mysql Error:", e)
traceback.print_exc() # 打印错误栈信息
3、使用MySQLdb库中的_mysql
#! /usr/bin/env python2.7
# coding=utf-8
# mysqldb 只支持python2.7
# http://mysql-python.sourceforge.net/
import MySQLdb
from contextlib import closing
import traceback
try:
# 获取一个数据库连接
with closing(MySQLdb.connect(host='localhost', user='root', passwd='root', db='test', port=3306,charset='utf8')) as conn:
print("connect database successfully")
with closing(conn.cursor()) as cur:
# 删除表
cur.execute("DROP TABLE IF EXISTS COMPANY")
# 创建表
sql = """
CREATE TABLE IF NOT EXISTS COMPANY
(ID INTEGER PRIMARY KEY NOT NULL auto_increment,
NAME TEXT NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR(50),
SALARY REAL);
"""
cur.execute(sql)
print("create table successfully")
# 添加数据
# 在一个conn.execute里面里面执行多个sql语句是非法的
cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )",
[('Paul', 32, 'California', 20000.00),
('Allen', 25, 'Texas', 15000.00),
('Teddy', 23, 'Norway', 20000.00),
('Mark', 25, 'Rich-Mond ', 65000.00),
('David', 27, 'Texas', 85000.00),
('Kim', 22, 'South-Hall', 45000.00),
('James', 24, 'Houston', 10000.00)])
# 提交,否则重新运行程序时,表中无数据
conn.commit()
print("insert successfully")
# 查询表
sql = """
select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
"""
cur.execute(sql)
for row in cur.fetchall():
print("-" * 50) # 输出50个-,作为分界线
print("%-10s %s" % ("id", row[0])) # 字段名固定10位宽度,并且左对齐
print("%-10s %s" % ("name", row[1]))
print("%-10s %s" % ("age", row[2]))
print("%-10s %s" % ("address", row[3]))
print("%-10s %s" % ("salary", row[4]))
except MySQLdb.Error as e:
print("Mysql Error:", e)
traceback.print_exc() # 打印错误栈信息
python连接MongoDB数据库
# https://docs.mongodb.com/ecosystem/drivers/python/
# https://pypi.python.org/pypi/pymongo/
import pymongo
from pymongo.mongo_client import MongoClient
import pymongo.errors
import traceback
try:
# 连接到 mongodb 服务
mongoClient = MongoClient('localhost', 27017)
# 连接到数据库
mongoDatabase = mongoClient.test
print("connect database successfully")
# 获取集合
mongoCollection = mongoDatabase.COMPANY
# 移除所有数据
mongoCollection.remove()
# 添加数据
mongoCollection.insert_many([{"Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"},
{"Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"},
{"Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"},
{"Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"},
{"Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"},
{"Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"},
{"Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"}, ])
#获取集合中的值
for row in mongoCollection.find():
print("-" * 50) # 输出50个-,作为分界线
print("%-10s %s" % ("_id", row['_id'])) # 字段名固定10位宽度,并且左对齐
print("%-10s %s" % ("name", row['Name']))
print("%-10s %s" % ("age", row['Age']))
print("%-10s %s" % ("address", row['Address']))
print("%-10s %s" % ("salary", row['Salary']))
print('\n\n\n')
# 使id自增
mongoCollection.remove()
# 创建计数表
mongoDatabase.counters.save({"_id": "people_id", "sequence_value": 0})
# 创建存储过程
mongoDatabase.system_js.getSequenceValue = '''function getSequenceValue(sequenceName){
var sequenceDocument = db.counters.findAndModify({
query: {_id: sequenceName},
update: {$inc:{sequence_value: 1}},
new:true
});
return sequenceDocument.sequence_value;
}'''
mongoCollection.insert_many(
[{"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Paul", "Age": "32",
"Address": "California", "Salary": "20000.00"},
{"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Allen", "Age": "25",
"Address": "Texas", "Salary": "15000.00"},
{"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Teddy", "Age": "23",
"Address": "Norway", "Salary": "20000.00"},
{"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Mark", "Age": "25",
"Address": "Rich-Mond", "Salary": "65000.00"},
{"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "David", "Age": "27",
"Address": "Texas", "Salary": "85000.00"},
{"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Kim", "Age": "22",
"Address": "South-Hall", "Salary": "45000.00"},
{"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "James", "Age": "24",
"Address": "Houston", "Salary": "10000.00"}, ])
for row in mongoCollection.find():
print("-" * 50) # 输出50个-,作为分界线
print("%-10s %s" % ("_id", int(row['_id']))) # 字段名固定10位宽度,并且左对齐
print("%-10s %s" % ("name", row['Name']))
print("%-10s %s" % ("age", row['Age']))
print("%-10s %s" % ("address", row['Address']))
print("%-10s %s" % ("salary", row['Salary']))
except pymongo.errors.PyMongoError as e:
print("mongo Error:", e)
traceback.print_exc()
python连接Redis数据库
1、使用redis
# coding=utf-8
# https://pypi.python.org/pypi/redis/2.10.5
# http://redis-py.readthedocs.io/en/latest/#
import redis
r = redis.Redis(host='localhost', port=6379, db=0, password="12345")
print("connect", r.ping())
# 看信息
info = r.info()
# or 查看部分信息
# info = r.info("Server")
# 输出信息
items = info.items()
for i, (key, value) in enumerate(items):
print("item %s----%s:%s" % (i, key, value))
# 删除键和对应的值
r.delete("company")
# 可以一次性push一条或多条数据
r.rpush("company", {"id": 1, "Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"},
{"id": 2, "Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"},
{"id": 3, "Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"})
r.rpush("company", {"id": 4, "Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"})
r.rpush("company", {"id": 5, "Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"})
r.rpush("company", {"id": 6, "Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"})
r.rpush("company", {"id": 7, "Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"})
# eval用来将dict格式的字符串转换成dict
for row in map(lambda x: eval(x), r.lrange("company", 0, r.llen("company"))):
print("-" * 50) # 输出50个-,作为分界线
print("%-10s %s" % ("_id", row['id'])) # 字段名固定10位宽度,并且左对齐
print("%-10s %s" % ("name", row['Name']))
print("%-10s %s" % ("age", row['Age']))
print("%-10s %s" % ("address", row['Address']))
print("%-10s %s" % ("salary", row['Salary']))
# 关闭当前连接
# r.shutdown() #这个是关闭redis服务端
2、使用pyredis
# http://pyredis.readthedocs.io/en/latest/
import pyredis
r = pyredis.Client(host='localhost', port=6379, database=0, password="12345")
print("connect", r.ping().decode("utf-8"))
# 看信息
# info = r.execute("info").decode()
# or 查看部分信息
info = r.execute("info", "Server").decode()
# 输出信息
print(info)
# 删除键和对应的值
r.delete("company")
# 可以一次性push一条或多条数据
r.rpush("company", '''{"id": 1, "Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"}''',
'''{"id": 2, "Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"}''',
'''{"id": 3, "Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"}''')
r.rpush("company", '''{"id": 4, "Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"}''')
r.rpush("company", '''{"id": 5, "Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"}''')
r.rpush("company", '''{"id": 6, "Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"}''')
r.rpush("company", '''{"id": 7, "Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"}''')
# eval用来将dict格式的字符串转换成dict
for row in map(lambda x: eval(x), r.lrange("company", 0, r.llen("company"))):
print("-" * 50) # 输出50个-,作为分界线
print("%-10s %s" % ("_id", row['id'])) # 字段名固定10位宽度,并且左对齐
print("%-10s %s" % ("name", row['Name']))
print("%-10s %s" % ("age", row['Age']))
print("%-10s %s" % ("address", row['Address']))
print("%-10s %s" % ("salary", row['Salary']))
# 关闭当前连接
r.close()
python连接sqlite数据库
sqlite3——内置库,用于和sqlite数据库的交互
# coding=utf-8
# http://www.runoob.com/sqlite/sqlite-python.html
import sqlite3
import traceback
try:
# 如果表不存在,就创建
with sqlite3.connect('test.db') as conn:
print("Opened database successfully")
# 删除表
conn.execute("DROP TABLE IF EXISTS COMPANY")
# 创建表
sql = """
CREATE TABLE IF NOT EXISTS COMPANY
(ID INTEGER PRIMARY KEY AUTOINCREMENT,
NAME TEXT NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR(50),
SALARY REAL);
"""
conn.execute(sql)
print("create table successfully")
# 添加数据
conn.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES (?, ?, ?, ? )",
[('Paul', 32, 'California', 20000.00),
('Allen', 25, 'Texas', 15000.00),
('Teddy', 23, 'Norway', 20000.00),
('Mark', 25, 'Rich-Mond ', 65000.00),
('David', 27, 'Texas', 85000.00),
('Kim', 22, 'South-Hall', 45000.00),
('James', 24, 'Houston', 10000.00)])
# conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
# VALUES ( 'Paul', 32, 'California', 20000.00 )")
#
# conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
# VALUES ('Allen', 25, 'Texas', 15000.00 )")
#
# conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
# VALUES ('Teddy', 23, 'Norway', 20000.00 )")
#
# conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
# VALUES ( 'Mark', 25, 'Rich-Mond ', 65000.00 )")
#
# conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
# VALUES ( 'David', 27, 'Texas', 85000.00 )");
#
# conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
# VALUES ( 'Kim', 22, 'South-Hall', 45000.00 )")
#
# conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
# VALUES ( 'James', 24, 'Houston', 10000.00 )")
# 提交,否则重新运行程序时,表中无数据
conn.commit()
print("insert successfully")
# 查询表
sql = """
select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
"""
result = conn.execute(sql)
for row in result:
print("-" * 50) # 输出50个-,作为分界线
print("%-10s %s" % ("id", row[0])) # 字段名固定10位宽度,并且左对齐
print("%-10s %s" % ("name", row[1]))
print("%-10s %s" % ("age", row[2]))
print("%-10s %s" % ("address", row[3]))
print("%-10s %.2f" % ("salary", row[4]))
# or
# print('{:10s} {:.2f}'.format("salary", row[4]))
except sqlite3.Error as e:
print("sqlite3 Error:", e)
traceback.print_exc()