程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python從入門到精通(五):文件處理-01-文件I/O

編輯:Python

一、文件讀寫

文件讀取需要注意三個問題:1、with上下文環境;2、換行符;3、編碼(編碼可用sys.getdefaultencoding()取得系統默認編碼)。如果想繞過文件編碼層可直接訪問buffer屬性如 sys.stdout.buffer.write();

1.1、讀寫文本文件

open中有幾種模式【打開方式+文件格式】,例如 'rt'、'wb'。文件格式:文本-t,二進制-b;壓縮文件經 gzip.open()/bz2.open() 以文本方式讀寫時同樣使用 t。打開方式:r-讀,w-寫。

import os

# A relative name is resolved against the current working directory.
file_name = 'test.txt'

# Create the file only when it does not exist yet. Mode 'x' fails with
# FileExistsError if the file is already there, so no separate existence
# check (and no window between checking and opening) is needed.
try:
    with open(file_name, 'xt') as f:
        f.write('Hello,I am a test.\n')
except FileExistsError:
    print(f'File {file_name} already exists!')

# Read the text. This runs after the creation step so the file is
# guaranteed to exist.
with open(file_name, 'rt') as f:
    f.read()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

1.2、讀寫二進制文件

#二進制寫法

import array

b = b'Hello World'
# Indexing a bytes object yields an integer, not a one-character string.
print(f'binary object b[0] = {b[0]} ')

# Binary mode transfers raw bytes: text must be encoded before writing and
# decoded after reading. The file is written first so the reads below have
# something to open.
with open('test.bin', 'wb') as f:
    text = 'Hello World'
    f.write(text.encode('utf-8'))

with open('test.bin', 'rb') as f:
    data = f.read(16)
    text = data.decode('utf-8')

a_obj = array.array('i', [0, 0, 0, 0, 0, 0, 0, 0])
with open('test.bin', 'rb') as f:
    # readinto() fills the existing buffer in place instead of allocating a
    # new bytes object. The resulting integer values depend on the platform's
    # item size and byte order, so use with care.
    f.readinto(a_obj)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

1.3、讀寫壓縮文件

# gzip compression

import bz2
import gzip

gz_file, bz_file = "giztext.gz", "bz.gz"

# Read compressed files as text. Both files must already exist.
with gzip.open(gz_file, 'rt') as f:  # gzip compression
    text = f.read()
with bz2.open(bz_file, 'rt') as f:  # bz2 compression
    text = f.read()

# Write text back in compressed form.
with gzip.open(gz_file, 'wt') as f:  # gzip compression
    f.write(text)
with bz2.open(bz_file, 'wt') as f:  # bz2 compression
    f.write(text)

# Set the compression level explicitly.
with gzip.open(gz_file, 'wt', compresslevel=3) as f:
    f.write(text)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.

1.4、文件編碼

import io
import sys
import urllib.request

# Add a text-decoding layer on top of a binary stream. Closing the wrapper
# also closes the wrapped response.
url_res = urllib.request.urlopen('http://www.python.org')
with io.TextIOWrapper(url_res, encoding='utf-8') as f_test:
    text_val = f_test.read()

# Change the encoding of an already-open text stream: detach() removes the
# current text layer and returns the underlying binary buffer, which is then
# wrapped again with the new encoding.
print(f'sys stdout encoding is: {sys.stdout.encoding} ')  # utf-8
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='latin-1')
print(f'sys stdout new encoding is: {sys.stdout.encoding} ')  # latin-1

# The layers of the I/O stack for one text file: text wrapper -> buffer -> raw.
with open('sample.txt', 'w') as file_read:
    print(f'file read: {file_read} ')  # <_io.TextIOWrapper name='sample.txt' mode='w' encoding='UTF-8'>
    print(f'file buffer: {file_read.buffer} ')  # <_io.BufferedWriter name='sample.txt'>
    print(f'file buffer raw: {file_read.buffer.raw} ')  # <_io.FileIO name='sample.txt' mode='wb' closefd=True>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.

1.5、讀取定長文件

from functools import partial

RECORD_SIZE = 32

with open('somefile.data', 'rb') as f:
    # iter(callable, sentinel) keeps calling f.read(RECORD_SIZE) until it
    # returns b'' (end of file), giving one chunk per iteration.
    # NOTE(review): the last chunk is shorter than RECORD_SIZE when the file
    # size is not an exact multiple of it.
    records = iter(partial(f.read, RECORD_SIZE), b'')
    for r in records:
        pass
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

1.6、創建臨時文件

import os
import tempfile
from tempfile import NamedTemporaryFile, TemporaryDirectory, TemporaryFile

# TemporaryFile: an anonymous temporary file (no name to hand to other code).
# NamedTemporaryFile: a temporary file that exposes its path as f.name.
with TemporaryFile('w+t') as f:
    # Read/write to the file
    f.write('Hello World\n')
    f.write('Testing\n')

    # Seek back to beginning and read the data
    f.seek(0)
    data = f.read()

# Without a with-block the file has to be closed explicitly.
f = TemporaryFile('w+t')
# Use the temporary file
f.close()

# ---------------------------------------------------
with NamedTemporaryFile('w+t') as f:
    print('filename is:', f.name)

# delete=False keeps the file on disk after it is closed.
with NamedTemporaryFile('w+t', delete=False) as f:
    print('filename is:', f.name)

# ---------------------------------------------------
with TemporaryDirectory() as dirname:
    print('dirname is:', dirname)
    # Use the directory #/var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T/tmp_3lwonjh

# mkstemp() returns an open OS-level descriptor plus the path; neither is
# cleaned up automatically, so close and remove them here.
fd, tmp_path = tempfile.mkstemp()
print((fd, tmp_path))  # (4, '/var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T/tmpi_hjdkd0')
os.close(fd)
os.remove(tmp_path)

print(tempfile.gettempdir())  # /var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T

# Control the name and location of the temporary file.
with NamedTemporaryFile(prefix='mytemp', suffix='.txt', dir='/tmp') as f:
    print(f.name)  # /tmp/mytempng2rx_bg.txt
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.

1.7、文件描述符包裝

文件描述符就是一個變量,用來指定某個系統的I/O通道,可以通過open()和makefile()函數來包裝,後者性能不如前者但可以跨平台。在unix系統中,可以通過這種方式來創建管道。

import os

# Open a low-level OS file descriptor (an integer handle).
file_data = os.open('test.txt', os.O_WRONLY | os.O_CREAT)

# Turn into a proper file. open() accepts a descriptor in place of a path;
# leaving the with-block closes the wrapper and the descriptor with it.
with open(file_data, 'wt') as test_file:
    test_file.write('hello world\n')


from socket import socket, AF_INET, SOCK_STREAM


def echo_client(client_sock, addr):
    """Echo every line received on client_sock back to the sender.

    The socket's descriptor is wrapped in two text-mode file objects (one for
    reading, one for writing). closefd=False keeps the wrappers from closing
    the descriptor, which stays owned by the socket object.

    Args:
        client_sock: connected socket object; closed before returning.
        addr: peer address, used only in the log line.
    """
    print(f'Got connection from {addr} ')
    try:
        fd = client_sock.fileno()
        # Make text-mode file wrappers for socket reading/writing
        with open(fd, 'rt', encoding='latin-1', closefd=False) as client_in:
            with open(fd, 'wt', encoding='latin-1', closefd=False) as client_out:
                # Echo lines back to the client using file I/O
                for line in client_in:
                    client_out.write(line)
                    client_out.flush()
    finally:
        # Close the connection even if reading or writing failed.
        client_sock.close()

def echo_server(address):
    """Accept TCP connections on address and serve them one at a time.

    Each accepted connection is passed to echo_client(); the next connection
    is accepted only after that call returns. The loop never ends on its own.

    Args:
        address: value passed to socket.bind(), e.g. a (host, port) tuple.
    """
    # The with-block releases the listening socket if the loop is left by an
    # exception (for example KeyboardInterrupt).
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.bind(address)
        sock.listen(1)
        while True:
            client, addr = sock.accept()
            echo_client(client, addr)


import sys

# Wrap the descriptor behind standard output in a binary-mode file object so
# raw bytes can be written to it. closefd=False leaves the descriptor open
# when the wrapper goes away.
stdout_fd = sys.stdout.fileno()
bstd_out = open(stdout_fd, mode='wb', closefd=False)
bstd_out.write(b'Hello World\n')
bstd_out.flush()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.

二、文件操作

1.1、路徑

import os

# Split a path into its parts and build a new one.
csv_path = '/usr/test/Data/test.csv'
print(f'{csv_path} base name is: {os.path.basename(csv_path)} ')  # test.csv
print(f'{csv_path} dir name is: {os.path.dirname(csv_path)} ')  # /usr/test/Data
print(f"new path: {os.path.join('tmp', 'data', os.path.basename(csv_path))} ")  # tmp/data/test.csv

# Expand "~" to the user's home directory and split off the extension.
csv_path = '~/Data/test.csv'
print(f'path expand user is: {os.path.expanduser(csv_path)} ')  # /Users/liudong/Data/test.csv
print(f'{csv_path} splitext is: {os.path.splitext(csv_path)} ')  # ('~/Data/test', '.csv')
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

1.2、檢查

#需要注意權限問題

import os

# Paths used for the existence check.
file_path, test_path = '/etc/passwd', '/etc/test'

# os.path.exists() reports whether the path exists.
found = os.path.exists(file_path)
print(f"is {file_path} exists: {found} ")
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

1.3、列表

import glob
import os
import os.path

file_path = '/etc'
# List every entry in the directory.
name_list = os.listdir(file_path)
print(f'file list of etc is:\n{name_list} ')

# Filter the listing: sub-directories only, then names ending in .py.
dir_name_list = [name for name in os.listdir(file_path)
                 if os.path.isdir(os.path.join(file_path, name))]
py_file_list = [name for name in os.listdir(file_path)
                if name.endswith('.py')]

# glob matches a shell-style pattern relative to the current directory; this
# replaces the list built from /etc above.
py_file_list = glob.glob('*.py')

# Get file sizes and modification dates for more detail about each file.
name_sz_date = [(name, os.path.getsize(name), os.path.getmtime(name))
                for name in py_file_list]
for name, size, mtime in name_sz_date:
    print(f'name={name} , size={size} , mtime={mtime} ')

# Alternative: Get file metadata. One os.stat() call returns both values.
file_metadata = [(name, os.stat(name)) for name in py_file_list]
for name, meta in file_metadata:
    print(name, meta.st_size, meta.st_mtime)
    print(f'name={name} , size={meta.st_size} , mtime={meta.st_mtime} ')
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.

三、文件的內存操作

3.1、文件的內存映射

import mmap
import os


def memory_map(file_name, access=mmap.ACCESS_WRITE):
    """Map the whole of an existing file into memory.

    Args:
        file_name: path of the file to map.
        access: mmap access mode (mmap.ACCESS_READ, ACCESS_WRITE or
            ACCESS_COPY). Defaults to ACCESS_WRITE.

    Returns:
        An mmap.mmap object covering the file; the caller closes it.
    """
    size_val = os.path.getsize(file_name)
    # A read-only mapping only needs a read-only descriptor, which also lets
    # files without write permission be mapped.
    flags = os.O_RDONLY if access == mmap.ACCESS_READ else os.O_RDWR
    fd = os.open(file_name, flags)
    try:
        return mmap.mmap(fd, size_val, access=access)
    finally:
        # The mmap object keeps its own duplicate of the descriptor, so the
        # one opened here must be closed or it leaks on every call.
        os.close(fd)


# Create a file of `size` bytes by seeking to the last position and writing
# a single byte there.
size = 1000000
with open('test_data', 'wb') as f:
    f.seek(size - 1)
    f.write(b'\x00')

# The mapping supports len(), indexing and slice assignment.
m = memory_map('test_data')
print(f'the len of m is: {len(m)} ')  # 1000000
print(f'm split: {m[0:10]} ')  # b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
print(f'm[0] is: {m[0]} ')  # 0
m[0:11] = b'Hello World'
print(f'close result: {m.close()} ')  # None

# The bytes written through the mapping are now in the file.
with open('test_data', 'rb') as f:
    print(f'read content: {f.read(11)} ')  # b'Hello World'

# A memoryview cast to 'I' shows the same bytes as unsigned integers; writes
# through either object are visible through the other.
m = memory_map('test_data')
v = memoryview(m).cast('I')
v[0] = 7
print(f'point content from m is: {m[0:4]} ')  # b'\x07\x00\x00\x00'
m[0:4] = b'\x07\x01\x00\x00'
print(f'v[0] = {v[0]} ')  # 263 (NOTE(review): assumes little-endian byte order)

# Release the view first: an mmap with an exported buffer cannot be closed.
v.release()
m.close()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.

3.2、從緩沖區讀取二進制文件

import os.path


def read_into_buffer(file_name):
    """Read a whole file into a pre-sized, mutable bytearray.

    readinto() fills an existing buffer, whereas read() allocates a new bytes
    object; filling a buffer sized up front avoids that extra allocation.

    Args:
        file_name: path of the file to read.

    Returns:
        A bytearray holding the file's bytes.
    """
    buf = bytearray(os.path.getsize(file_name))
    with open(file_name, 'rb') as f:
        n = f.readinto(buf)
    # readinto() reports how many bytes it stored. If that is fewer than the
    # size measured beforehand, drop the unfilled tail so no padding zeros
    # are returned as if they were file content.
    if n < len(buf):
        del buf[n:]
    return buf


with open('test_file.bin', 'wb') as f:
    f.write(b'Hello World')
buf_read = read_into_buffer('test_file.bin')
print(f'buf read is: {buf_read} ')
# The returned bytearray is mutable and can be edited in place.
buf_read[0:5] = b'Hello'
print(f'buf read is: {buf_read} ')
with open('new_test_file.bin', 'wb') as f:
    f.write(buf_read)


# Size of each record (adjust value)
record_size = 32
# Reuse one buffer for every record and check how many bytes each read
# delivered; a short count marks the end of the data.
buf_read = bytearray(record_size)
with open('test_file', 'rb') as f:
    while True:
        n = f.readinto(buf_read)
        if n < record_size:
            break


print(f'buf read is: {buf_read} ')
memory_val = memoryview(buf_read)  # zero-copy view of the buffer
# The view must be exactly as long as the bytes assigned to it below
# (b'WORLD' is 5 bytes); a 3-byte slice raises ValueError.
memory_val = memory_val[-5:]
print(f'memory value is: {memory_val} ')
memory_val[:] = b'WORLD'
print(f'buf read is: {buf_read} ')
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.

3.3、序列化

import serial  # third-party package (pyserial)

# Open the serial port with 9600 baud, 8 data bits, no parity, 1 stop bit.
ser = serial.Serial('/dev/tty.usbmodem641',  # Device name varies
                    baudrate=9600,
                    bytesize=8,
                    parity='N',
                    stopbits=1)

try:
    # Serial I/O works on bytes: send one command, then read one reply line.
    ser.write(b'G1 X50 Y50\r\n')
    resp = ser.readline()
finally:
    # Release the port even if the exchange fails.
    ser.close()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
import math
import pickle

# Sample output of the pickle examples in this section:
# file load is [1, 6, 3, 9]
# file load is hello,world!
# file load is {'go', 'java', 'python'}
# pickle funciton: b'\x80\x04\x95\x10\x00\x00\x00\x00\x00\x00\x00\x8c\x04math\x94\x8c\x03cos\x94\x93\x94.'
# T-minus is: 30
# T-minus is: 29load result: <__main__.Countdown object at 0x1037da850>

# SECURITY: only unpickle data from a trusted source; loading a pickle can
# execute arbitrary code.
data_obj = ...  # Some Python object

# dump() writes to a file and dumps() returns bytes; load()/loads() are the
# inverse operations. Each file is closed before it is reopened so the
# written data is flushed to disk.
with open('test_file', 'wb') as test_file:
    pickle.dump(data_obj, test_file)
p_con = pickle.dumps(data_obj)

# Restore from a file
with open('test_file', 'rb') as test_file:
    data_obj = pickle.load(test_file)

# Restore from a string
data_obj = pickle.loads(p_con)


# Several objects can be dumped to one file; load() returns them one at a
# time in the order they were written.
with open('some_data', 'wb') as test_file:
    pickle.dump([1, 6, 3, 9], test_file)
    pickle.dump('hello,world!', test_file)
    pickle.dump({'python', 'java', 'go'}, test_file)
with open('some_data', 'rb') as test_file:
    print(f'file load is {pickle.load(test_file)} ')
    print(f'file load is {pickle.load(test_file)} ')
    print(f'file load is {pickle.load(test_file)} ')


# Pickling a function: the printed bytes contain the names 'math' and 'cos',
# not the function's code.
print(f'pickle funciton: {pickle.dumps(math.cos)} ')


import threading
import time


class Countdown:
    """Count down from n on a background daemon thread.

    Some objects depend on low-level system state and cannot be pickled;
    __getstate__/__setstate__ make serialisation possible here by saving only
    the remaining count and starting a fresh thread on restore.
    """

    def __init__(self, n):
        """Store the starting count and start the countdown thread."""
        self.n = n
        self.thr = threading.Thread(target=self.run)
        self.thr.daemon = True
        self.thr.start()

    def run(self):
        """Print and decrement the count every 5 seconds until it reaches 0."""
        while self.n > 0:
            print(f'T-minus is: {self.n} ')
            self.n -= 1
            time.sleep(5)

    def __getstate__(self):
        """Return the remaining count as the pickled state."""
        return self.n

    def __setstate__(self, n):
        """Restore from a pickled count by re-running __init__."""
        self.__init__(n)


import pickle

count_down = Countdown(30)

# Save the running countdown; only its remaining count is written.
with open('test.p', 'wb') as test_file:
    pickle.dump(count_down, test_file)

# Loading builds a new Countdown that resumes from the saved count.
with open('test.p', 'rb') as test_file:
    print(f'load result: {pickle.load(test_file)} ')
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.

四、編碼與解碼

base64

import base64

s_obj = b'hello'

# Base64 works on bytes and returns bytes.
code_obj = base64.b64encode(s_obj)
print(f'b64 encode {s_obj} = {code_obj} ')  # b'aGVsbG8='

print(f'decode {code_obj} = {base64.b64decode(code_obj)} ')  # b'hello'

# Decode the encoded bytes as ASCII when a str is needed.
code_obj = base64.b64encode(s_obj).decode('ascii')
print(f'encode decode {s_obj} = {code_obj} ')  # aGVsbG8=
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

0x

import base64
import binascii

s = b'hello'

# binascii produces lower-case hexadecimal digits.
h = binascii.b2a_hex(s)
print(f'base: {h} ')  # b'68656c6c6f'
print(f'b2a hex: {binascii.a2b_hex(h)} ')  # b'hello'


# base64.b16encode produces upper-case hexadecimal digits.
h = base64.b16encode(s)
print(f'base: {h} ')  # b'68656C6C6F'
print(f'b16 decode: {base64.b16decode(h)} ')  # b'hello'


# Decode the encoded bytes as ASCII when a str is needed.
h = base64.b16encode(s)
print(f'base: {h} ')  # b'68656C6C6F'
print(f"decode: {h.decode('ascii')} ")  # 68656C6C6F
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.

五、高級操作

5.1、文件目錄的復制和移動

import shutil


# The main limitation of shutil is that it does not preserve all file metadata.
# NOTE: src and dst are placeholders for a source and a destination path; they
# must be defined before any of these calls is run. The calls are independent
# examples, not a sequence meant to be executed in order.

# Copy src to dst. (cp src dst)
shutil.copy(src, dst)

# Copy files, but preserve metadata (cp -p src dst)
shutil.copy2(src, dst)

# Copy directory tree (cp -R src dst)
shutil.copytree(src, dst)

# Move src to dst (mv src dst)
shutil.move(src, dst)

# Copy a tree and keep symbolic links as links.
shutil.copytree(src, dst, symlinks=True)

#忽略文件
def ignore_pyc_files(dirname, filenames):
    """Return the names copytree() should skip: those ending in '.pyc'.

    Has the signature expected by the `ignore` argument of shutil.copytree.

    Args:
        dirname: directory being visited (unused).
        filenames: names of the entries in that directory.

    Returns:
        List of the entries of filenames that end with '.pyc'.
    """
    return [name for name in filenames if name.endswith('.pyc')]
# Skip entries chosen by a custom callback.
shutil.copytree(src, dst, ignore=ignore_pyc_files)


# Skip entries that match glob-style patterns.
shutil.copytree(src, dst, ignore=shutil.ignore_patterns('*~', '*.pyc'))

try:
    shutil.copytree(src, dst)
except shutil.Error as e:
    # The first argument of the exception is iterated as
    # (source name, destination name, error message) tuples. Separate loop
    # names are used so the outer src/dst are not overwritten.
    for failed_src, failed_dst, msg in e.args[0]:
        print(failed_dst, failed_src, msg)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
import os.path

# os.path functions operate on the path string itself.
file_name = '/davanced_programming/chapter13/spam.py'
print(f'base name is: {os.path.basename(file_name)} ')
print(f'dir name is: {os.path.dirname(file_name)} ')
print(f'file split: {os.path.split(file_name)} ')
print(os.path.join('/new/dir', os.path.basename(file_name)))
print(os.path.expanduser('~/chapter13/spam.py'))
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

5.2、壓縮文件

如果想處理更高級的細節,可使用tarfile,zipfile, gzip, bz2模塊,shutil只是一層代理

import shutil


# Extract an existing archive; the format is taken from the file name.
shutil.unpack_archive('py38.zip')
# Create py38.zip from the contents of the test_zip directory.
shutil.make_archive('py38', 'zip', 'test_zip')

print(shutil.get_archive_formats())  # prints the supported archive formats
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

5.3、查找文件

import os


def find_file(start, name):
    """Print the absolute path of every file called name under start.

    Args:
        start: directory where the search begins.
        name: exact file name to look for.
    """
    for dir_path, dirs, files in os.walk(start):
        if name in files:
            # os.walk() already yields directory paths that begin with
            # `start`; joining `start` on again doubles it whenever `start`
            # is a relative path.
            full_path = os.path.join(dir_path, name)
            # abspath() makes the path absolute and normpath() tidies it.
            print(f'full path is: {os.path.normpath(os.path.abspath(full_path))} ')


if __name__ == '__main__':
    find_file('/advanced_programming/chapter13', 'file_input.py')
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
import os
import time


def modified_within(top, seconds):
    """Print every file under top modified within the last `seconds` seconds.

    Args:
        top: directory where the walk begins.
        seconds: age limit in seconds, measured from the time of the call.
    """
    now = time.time()
    for path, dirs, files in os.walk(top):
        for name in files:
            full_path = os.path.join(path, name)
            # Ask for the timestamp directly and skip entries that cannot be
            # read (for example a file removed during the walk). A separate
            # existence check could still be followed by a failure here.
            try:
                m_time = os.path.getmtime(full_path)
            except OSError:
                continue
            if m_time > (now - seconds):
                print(f'full path is: {full_path} ')


if __name__ == '__main__':
    modified_within('/advanced_programming/chapter13', float(1000))
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved