Python读写数据文件


Python处理数据文件的途径有很多种,可以操作的文件类型主要包括文本文件(csv、txt、json等)、excel文件、数据库文件、api等其他数据文件。其中,Python连接各种数据库,包括关系数据库:sqlite,mysql,mssql
非关系数据库:MongoDB,Redis

下面整理下python有哪些方式可以读写数据文件。

1. read、readline、readlines

  • read() :一次性读取整个文件内容。推荐使用read(size)方法,size越大运行时间越长
  • readline() :每次读取一行内容。内存不够时使用,一般不太用
  • readlines() :一次性读取整个文件内容,并按行返回到list,方便我们遍历

2. 内置模块csv

python内置了csv模块用于读写csv文件,csv是一种逗号分隔符文件,是数据科学中最常见的数据存储格式之一。csv模块能轻松完成各种体量数据的读写操作,当然大数据量需要代码层面的优化。

  • csv模块读取文件
# 读取csv文件
import csv
with open('test.csv','r') as myFile:
    lines=csv.reader(myFile)
    for line in lines:
        print (line)
  • csv模块写入文件
import csv
with open('test.csv','w+') as myFile:
    myWriter=csv.writer(myFile)
    # writerrow一行一行写入
    myWriter.writerow([7,8,9])
    myWriter.writerow([8,'h','f'])
    # writerow多行写入
    myList=[[1,2,3],[4,5,6]]
    myWriter.writerows(myList)

3. numpy库

  • loadtxt方法

loadtxt用来读取文本文件(包含txt、csv等)以及.gz 或.bz2格式压缩文件,前提是文件数据每一行必须要有数量相同的值。

import numpy as np
# loadtxt()中的dtype参数默认设置为float
# 这里设置为str字符串便于显示
np.loadtxt('test.csv',dtype=str)
# out:array(['1,2,3', '4,5,6', '7,8,9'], dtype='<U5')
  • load方法

load用来读取numpy专用的.npy, .npz 或者pickled持久化文件。

import numpy as np
# 先生成npy文件
np.save('test.npy', np.array([[1, 2, 3], [4, 5, 6]]))
# 使用load加载npy文件
np.load('test.npy')
'''
out:array([[1, 2, 3],
       [4, 5, 6]])
'''
  • fromfile方法

fromfile方法可以读取简单的文本数据或二进制数据,数据来源于tofile方法保存的二进制数据。读取数据时需要用户指定元素类型,并对数组的形状进行适当的修改。

import numpy as np
x = np.arange(9).reshape(3,3)
x.tofile('test.bin')
np.fromfile('test.bin',dtype=np.int)
# out:array([0, 1, 2, 3, 4, 5, 6, 7, 8])

4. pandas库

pandas是数据处理最常用的分析库之一,可以读取各种各样格式的数据文件,一般输出dataframe格式。如:txt、csv、excel、json、剪切板、数据库、html、hdf、parquet、pickled文件、sas、stata等等

  • read_csv方法read_csv方法用来读取csv格式文件,输出dataframe格式。
import pandas as pd
pd.read_csv('test.csv')
  • read_excel方法

读取excel文件,包括xlsx、xls、xlsm格式

import pandas as pd
pd.read_excel('test.xlsx')
  • read_table方法

通过对sep参数(分隔符)的控制来对任何文本文件读取

  • read_json方法

读取json格式文件

df = pd.DataFrame([['a', 'b'], ['c', 'd']],index=['row 1', 'row 2'],columns=['col 1', 'col 2'])
j = df.to_json(orient='split')
pd.read_json(j,orient='split')
  • read_html方法

读取html表格

  • read_clipboard方法

读取剪切板内容

  • read_pickle方法

读取plckled持久化文件

  • read_sql方法

读取数据库数据,连接好数据库后,传入sql语句即可

  • read_dhf方法

读取hdf5文件,适合大文件读取

  • read_parquet方法

读取parquet文件

  • read_sas方法

读取sas文件

  • read_stata方法

读取stata文件

  • read_gbq方法

读取google bigquery数据

pandas学习网站:https://pandas.pydata.org/

5.读写excel文件

python用于读写excel文件的库有很多,除了前面提到的pandas,还有xlrd、xlwt、openpyxl、xlwings等等。

主要模块:

  • xlrd库

从excel中读取数据,支持xls、xlsx

  • xlwt库

对excel进行修改操作,不支持对xlsx格式的修改

  • xlutils库

在xlw和xlrd中,对一个已存在的文件进行修改

  • openpyxl

主要针对xlsx格式的excel进行读取和编辑

  • xlwings

对xlsx、xls、xlsm格式文件进行读写、格式修改等操作

  • xlsxwriter

用来生成excel表格,插入数据、插入图标等表格操作,不支持读取

  • Microsoft Excel API

需安装pywin32,直接与Excel进程通信,可以做任何在Excel里可以做的事情,但比较慢

6. 操作数据库

python几乎支持对所有数据库的交互,连接数据库后,可以使用sql语句进行增删改查。

主要模块:

  • pymysql

用于和mysql数据库的交互

  • sqlalchemy

用于和mysql数据库的交互

  • cx_Oracle

用于和oracle数据库的交互

  • pymssql

用于和sql server数据库的交互

  • pymongo

用于和mongodb非关系型数据库的交互

  • redis、pyredis

用于和redis非关系型数据库的交互

python连接MySQL数据库

使用MySQLdb,不支持Python3.x
pymysql对Python2.x和Python3.x的支持都比较好

1、使用pymysql

# coding=utf-8

# https://github.com/PyMySQL/PyMySQL/
import pymysql
from contextlib import closing
import traceback

try:
    # 获取一个数据库连接,with关键字 表示退出时,conn自动关闭
    # with 嵌套上一层的with 要使用closing()
    with closing(pymysql.connect(host='localhost', user='root', passwd='root', db='test', port=3306,
                                 charset='utf8')) as conn:

        print("connect database successfully")

        # 获取游标,with关键字 表示退出时,cur自动关闭
        with conn.cursor() as cur:
            # 删除表
            cur.execute("DROP TABLE IF EXISTS  COMPANY")
            # 创建表
            sql = """
                     CREATE TABLE IF NOT EXISTS COMPANY
                   (ID INTEGER  PRIMARY KEY NOT NULL  auto_increment,
                   NAME           TEXT    NOT NULL,
                   AGE            INT     NOT NULL,
                   ADDRESS        CHAR(50),
                   SALARY         REAL);
            """
            cur.execute(sql)

            print("create table successfully")

            # 添加数据
            # 在一个conn.execute里面里面执行多个sql语句是非法的
            cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )",
                            [('Paul', 32, 'California', 20000.00),
                             ('Allen', 25, 'Texas', 15000.00),
                             ('Teddy', 23, 'Norway', 20000.00),
                             ('Mark', 25, 'Rich-Mond ', 65000.00),
                             ('David', 27, 'Texas', 85000.00),
                             ('Kim', 22, 'South-Hall', 45000.00),
                             ('James', 24, 'Houston', 10000.00)])

            # 提交,否则重新运行程序时,表中无数据
            conn.commit()
            print("insert successfully")

            # 查询表
            sql = """
                select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
             """

            cur.execute(sql)

            for row in cur.fetchall():
                print("-" * 50)  # 输出50个-,作为分界线
                print("%-10s %s" % ("id", row[0]))  # 字段名固定10位宽度,并且左对齐
                print("%-10s %s" % ("name", row[1]))
                print("%-10s %s" % ("age", row[2]))
                print("%-10s %s" % ("address", row[3]))
                print("%-10s %s" % ("salary", row[4]))
except pymysql.Error as e:
    print("Mysql Error:", e)
    traceback.print_exc()

2、使用MySQLdb

#! /usr/bin/env python2.7
# coding=utf-8

# mysqldb 只支持python2.7
# http://mysql-python.sourceforge.net/

import MySQLdb
from contextlib import closing
import traceback

try:
    # 获取一个数据库连接
    with closing(MySQLdb.connect(host='localhost', user='root', passwd='root', db='test', port=3306,charset='utf8')) as conn:
        print("connect database successfully")
        with closing(conn.cursor()) as cur:
            # 删除表
            cur.execute("DROP TABLE IF EXISTS  COMPANY")
            # 创建表
            sql = """
                     CREATE TABLE IF NOT EXISTS COMPANY
                   (ID INTEGER  PRIMARY KEY NOT NULL  auto_increment,
                   NAME           TEXT    NOT NULL,
                   AGE            INT     NOT NULL,
                   ADDRESS        CHAR(50),
                   SALARY         REAL);
            """
            cur.execute(sql)

            print("create table successfully")

            # 添加数据
            # 在一个conn.execute里面里面执行多个sql语句是非法的
            cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )",
                            [('Paul', 32, 'California', 20000.00),
                             ('Allen', 25, 'Texas', 15000.00),
                             ('Teddy', 23, 'Norway', 20000.00),
                             ('Mark', 25, 'Rich-Mond ', 65000.00),
                             ('David', 27, 'Texas', 85000.00),
                             ('Kim', 22, 'South-Hall', 45000.00),
                             ('James', 24, 'Houston', 10000.00)])

            # 提交,否则重新运行程序时,表中无数据
            conn.commit()
            print("insert successfully")

            # 查询表
            sql = """
                select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
             """

            cur.execute(sql)

            for row in cur.fetchall():
                print("-" * 50)  # 输出50个-,作为分界线
                print("%-10s %s" % ("id", row[0]))  # 字段名固定10位宽度,并且左对齐
                print("%-10s %s" % ("name", row[1]))
                print("%-10s %s" % ("age", row[2]))
                print("%-10s %s" % ("address", row[3]))
                print("%-10s %s" % ("salary", row[4]))

except MySQLdb.Error as e:
    print("Mysql Error:", e)
    traceback.print_exc()  # 打印错误栈信息

3、使用MySQLdb库中的_mysql

#! /usr/bin/env python2.7
# coding=utf-8
# mysqldb 只支持python2.7
# http://mysql-python.sourceforge.net/

import MySQLdb
from contextlib import closing
import traceback

try:
    # 获取一个数据库连接
    with closing(MySQLdb.connect(host='localhost', user='root', passwd='root', db='test', port=3306,charset='utf8')) as conn:
        print("connect database successfully")
        with closing(conn.cursor()) as cur:
            # 删除表
            cur.execute("DROP TABLE IF EXISTS  COMPANY")
            # 创建表
            sql = """
                     CREATE TABLE IF NOT EXISTS COMPANY
                   (ID INTEGER  PRIMARY KEY NOT NULL  auto_increment,
                   NAME           TEXT    NOT NULL,
                   AGE            INT     NOT NULL,
                   ADDRESS        CHAR(50),
                   SALARY         REAL);
            """
            cur.execute(sql)

            print("create table successfully")

            # 添加数据
            # 在一个conn.execute里面里面执行多个sql语句是非法的
            cur.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES ( %s, %s, %s, %s )",
                            [('Paul', 32, 'California', 20000.00),
                             ('Allen', 25, 'Texas', 15000.00),
                             ('Teddy', 23, 'Norway', 20000.00),
                             ('Mark', 25, 'Rich-Mond ', 65000.00),
                             ('David', 27, 'Texas', 85000.00),
                             ('Kim', 22, 'South-Hall', 45000.00),
                             ('James', 24, 'Houston', 10000.00)])

            # 提交,否则重新运行程序时,表中无数据
            conn.commit()
            print("insert successfully")

            # 查询表
            sql = """
                select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
             """

            cur.execute(sql)

            for row in cur.fetchall():
                print("-" * 50)  # 输出50个-,作为分界线
                print("%-10s %s" % ("id", row[0]))  # 字段名固定10位宽度,并且左对齐
                print("%-10s %s" % ("name", row[1]))
                print("%-10s %s" % ("age", row[2]))
                print("%-10s %s" % ("address", row[3]))
                print("%-10s %s" % ("salary", row[4]))

except MySQLdb.Error as e:
    print("Mysql Error:", e)
    traceback.print_exc()  # 打印错误栈信息

python连接MongoDB数据库

# https://docs.mongodb.com/ecosystem/drivers/python/
# https://pypi.python.org/pypi/pymongo/

import pymongo
from pymongo.mongo_client import MongoClient
import pymongo.errors
import traceback

try:
    # 连接到 mongodb 服务
    mongoClient = MongoClient('localhost', 27017)
    # 连接到数据库
    mongoDatabase = mongoClient.test
    print("connect database successfully")

    # 获取集合
    mongoCollection = mongoDatabase.COMPANY

    # 移除所有数据
    mongoCollection.remove()

    # 添加数据
    mongoCollection.insert_many([{"Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"},
                                 {"Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"},
                                 {"Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"},
                                 {"Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"},
                                 {"Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"},
                                 {"Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"},
                                 {"Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"}, ])

    #获取集合中的值
    for row in mongoCollection.find():
        print("-" * 50)  # 输出50个-,作为分界线
        print("%-10s %s" % ("_id", row['_id']))  # 字段名固定10位宽度,并且左对齐
        print("%-10s %s" % ("name", row['Name']))
        print("%-10s %s" % ("age", row['Age']))
        print("%-10s %s" % ("address", row['Address']))
        print("%-10s %s" % ("salary", row['Salary']))

    print('\n\n\n')
    # 使id自增
    mongoCollection.remove()
    # 创建计数表
    mongoDatabase.counters.save({"_id": "people_id", "sequence_value": 0})
    # 创建存储过程
    mongoDatabase.system_js.getSequenceValue = '''function getSequenceValue(sequenceName){
            var sequenceDocument = db.counters.findAndModify({
                query: {_id: sequenceName},
                update: {$inc:{sequence_value: 1}},
                new:true
            });
            return sequenceDocument.sequence_value;
        }'''
    mongoCollection.insert_many(
            [{"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Paul", "Age": "32",
              "Address": "California", "Salary": "20000.00"},
             {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Allen", "Age": "25",
              "Address": "Texas", "Salary": "15000.00"},
             {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Teddy", "Age": "23",
              "Address": "Norway", "Salary": "20000.00"},
             {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Mark", "Age": "25",
              "Address": "Rich-Mond", "Salary": "65000.00"},
             {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "David", "Age": "27",
              "Address": "Texas", "Salary": "85000.00"},
             {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "Kim", "Age": "22",
              "Address": "South-Hall", "Salary": "45000.00"},
             {"_id": mongoDatabase.eval("getSequenceValue('people_id')"), "Name": "James", "Age": "24",
              "Address": "Houston", "Salary": "10000.00"}, ])

    for row in mongoCollection.find():
        print("-" * 50)  # 输出50个-,作为分界线
        print("%-10s %s" % ("_id", int(row['_id'])))  # 字段名固定10位宽度,并且左对齐
        print("%-10s %s" % ("name", row['Name']))
        print("%-10s %s" % ("age", row['Age']))
        print("%-10s %s" % ("address", row['Address']))
        print("%-10s %s" % ("salary", row['Salary']))
except pymongo.errors.PyMongoError as e:
    print("mongo Error:", e)
    traceback.print_exc()

python连接Redis数据库

1、使用redis

# coding=utf-8

# https://pypi.python.org/pypi/redis/2.10.5
# http://redis-py.readthedocs.io/en/latest/#
import redis

r = redis.Redis(host='localhost', port=6379, db=0, password="12345")
print("connect", r.ping())

# 看信息
info = r.info()
# or 查看部分信息
# info = r.info("Server")

# 输出信息
items = info.items()
for i, (key, value) in enumerate(items):
    print("item %s----%s:%s" % (i, key, value))

# 删除键和对应的值
r.delete("company")

# 可以一次性push一条或多条数据
r.rpush("company", {"id": 1, "Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"},
        {"id": 2, "Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"},
        {"id": 3, "Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"})
r.rpush("company", {"id": 4, "Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"})
r.rpush("company", {"id": 5, "Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"})
r.rpush("company", {"id": 6, "Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"})
r.rpush("company", {"id": 7, "Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"})

# eval用来将dict格式的字符串转换成dict
for row in map(lambda x: eval(x), r.lrange("company", 0, r.llen("company"))):
    print("-" * 50)  # 输出50个-,作为分界线
    print("%-10s %s" % ("_id", row['id']))  # 字段名固定10位宽度,并且左对齐
    print("%-10s %s" % ("name", row['Name']))
    print("%-10s %s" % ("age", row['Age']))
    print("%-10s %s" % ("address", row['Address']))
    print("%-10s %s" % ("salary", row['Salary']))

# 关闭当前连接
# r.shutdown() #这个是关闭redis服务端

2、使用pyredis

# http://pyredis.readthedocs.io/en/latest/
import pyredis

r = pyredis.Client(host='localhost', port=6379, database=0, password="12345")
print("connect", r.ping().decode("utf-8"))

# 看信息

# info = r.execute("info").decode()
# or 查看部分信息
info = r.execute("info", "Server").decode()

# 输出信息
print(info)

# 删除键和对应的值
r.delete("company")

# 可以一次性push一条或多条数据
r.rpush("company", '''{"id": 1, "Name": "Paul", "Age": "32", "Address": "California", "Salary": "20000.00"}''',
        '''{"id": 2, "Name": "Allen", "Age": "25", "Address": "Texas", "Salary": "15000.00"}''',
        '''{"id": 3, "Name": "Teddy", "Age": "23", "Address": "Norway", "Salary": "20000.00"}''')
r.rpush("company", '''{"id": 4, "Name": "Mark", "Age": "25", "Address": "Rich-Mond", "Salary": "65000.00"}''')
r.rpush("company", '''{"id": 5, "Name": "David", "Age": "27", "Address": "Texas", "Salary": "85000.00"}''')
r.rpush("company", '''{"id": 6, "Name": "Kim", "Age": "22", "Address": "South-Hall", "Salary": "45000.00"}''')
r.rpush("company", '''{"id": 7, "Name": "James", "Age": "24", "Address": "Houston", "Salary": "10000.00"}''')

# eval用来将dict格式的字符串转换成dict
for row in map(lambda x: eval(x), r.lrange("company", 0, r.llen("company"))):
    print("-" * 50)  # 输出50个-,作为分界线
    print("%-10s %s" % ("_id", row['id']))  # 字段名固定10位宽度,并且左对齐
    print("%-10s %s" % ("name", row['Name']))
    print("%-10s %s" % ("age", row['Age']))
    print("%-10s %s" % ("address", row['Address']))
    print("%-10s %s" % ("salary", row['Salary']))

# 关闭当前连接
r.close()

python连接sqlite数据库

sqlite3——内置库,用于和sqlite数据库的交互

# coding=utf-8
# http://www.runoob.com/sqlite/sqlite-python.html
import sqlite3
import traceback

try:
    # 如果表不存在,就创建
    with sqlite3.connect('test.db') as conn:

        print("Opened database successfully")

        # 删除表
        conn.execute("DROP TABLE IF EXISTS  COMPANY")

        # 创建表
        sql = """
                 CREATE TABLE IF NOT EXISTS COMPANY
               (ID INTEGER  PRIMARY KEY       AUTOINCREMENT,
               NAME           TEXT    NOT NULL,
               AGE            INT     NOT NULL,
               ADDRESS        CHAR(50),
               SALARY         REAL);
        """
        conn.execute(sql)

        print("create table successfully")

        # 添加数据
        conn.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES (?, ?, ?, ? )",
                         [('Paul', 32, 'California', 20000.00),
                          ('Allen', 25, 'Texas', 15000.00),
                          ('Teddy', 23, 'Norway', 20000.00),
                          ('Mark', 25, 'Rich-Mond ', 65000.00),
                          ('David', 27, 'Texas', 85000.00),
                          ('Kim', 22, 'South-Hall', 45000.00),
                          ('James', 24, 'Houston', 10000.00)])
        # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
        # VALUES ( 'Paul', 32, 'California', 20000.00 )")
        #
        # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
        # VALUES ('Allen', 25, 'Texas', 15000.00 )")
        #
        # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
        # VALUES ('Teddy', 23, 'Norway', 20000.00 )")
        #
        # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
        # VALUES ( 'Mark', 25, 'Rich-Mond ', 65000.00 )")
        #
        # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
        # VALUES ( 'David', 27, 'Texas', 85000.00 )");
        #
        # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
        # VALUES ( 'Kim', 22, 'South-Hall', 45000.00 )")
        #
        # conn.execute("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY)\
        # VALUES ( 'James', 24, 'Houston', 10000.00 )")

        # 提交,否则重新运行程序时,表中无数据
        conn.commit()
        print("insert successfully")

        # 查询表
        sql = """
            select id,NAME,AGE,ADDRESS,SALARY FROM COMPANY
         """

        result = conn.execute(sql)

        for row in result:
            print("-" * 50)  # 输出50个-,作为分界线
            print("%-10s %s" % ("id", row[0]))  # 字段名固定10位宽度,并且左对齐
            print("%-10s %s" % ("name", row[1]))
            print("%-10s %s" % ("age", row[2]))
            print("%-10s %s" % ("address", row[3]))
            print("%-10s %.2f" % ("salary", row[4]))
            # or
            # print('{:10s} {:.2f}'.format("salary", row[4]))


except sqlite3.Error as e:
    print("sqlite3 Error:", e)
    traceback.print_exc()

文章作者: 未名
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 未名 !
  目录