文件讀取需要注意三個問題:1、with上下文環境;2、換行符;3、編碼(編碼可用sys.getdefaultencoding()取得系統默認編碼)。如果想繞過文件編碼層可直接訪問buffer屬性,如 sys.stdout.buffer.write();
open中有幾種模式【文件格式+打開方式】,文件格式:文本-t,二進制-b,壓縮文件-t。打開方式:r-讀,w-寫。
file_name = 'test.txt'  # a bare name is resolved against the current directory
# Read the whole file in text mode; the result is discarded here.
with open(file_name, 'rt') as f:
    f.read()
# The file may already exist, so check first instead of overwriting it.
import os
if os.path.exists(file_name):
    print(f'File { file_name} already exists!')
else:
    with open(file_name, 'wt') as f:
        f.write('Hello,I am a test.\n')
# Bytes literal: indexing a bytes object yields an integer, not a 1-char string.
b = b'Hello World'
print(f'binary object b[0] = { b[ 0]} ')
# Binary-mode I/O works on bytes, so text has to be decoded after reading
# and encoded before writing.
with open('test.bin', 'rb') as f:
    data = f.read(16)
    text = data.decode('utf-8')
with open('test.bin', 'wb') as f:
    text = 'Hello World'
    f.write(text.encode('utf-8'))
import array
a_obj = array.array('i', [0] * 8)
with open('test.bin', 'rb') as f:
    # readinto() fills the array's existing memory in place; the resulting
    # values are platform dependent, so use it with care.
    f.readinto(a_obj)
# Reading and writing compressed files in text mode with gzip and bz2.
import gzip
import bz2
gz_file = "giztext.gz"
bz_file = "bz.gz"
# Read: gzip compression
with gzip.open(gz_file, 'rt') as f:
    text = f.read()
# Read: bz2 compression
with bz2.open(bz_file, 'rt') as f:
    text = f.read()
# Write: gzip compression
with gzip.open(gz_file, 'wt') as f:
    f.write(text)
# Write: bz2 compression
with bz2.open(bz_file, 'wt') as f:
    f.write(text)
# Write again with an explicit compression level.
with gzip.open(gz_file, 'wt', compresslevel=3) as f:
    f.write(text)
import urllib. request
import io
# Add a text-decoding layer on top of a binary stream (here an HTTP response).
url_res = urllib.request.urlopen('http://www.python.org')
f_test = io.TextIOWrapper(url_res, encoding='utf-8')
text_val = f_test.read()
# To change the encoding of an already-open text stream, detach() the current
# text layer first, then wrap the underlying buffer with the new encoding.
import sys
print(f'sys stdout encoding is: { sys. stdout. encoding} ')  # e.g. utf-8
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='latin-1')
print(f'sys stdout new encoding is: { sys. stdout. encoding} ')  # latin-1
# I/O stack example: a text file object is layered as
# TextIOWrapper -> BufferedWriter (.buffer) -> FileIO (.buffer.raw).
# The with-block closes the file once the layers have been printed, so the
# handle is no longer leaked.
with open('sample.txt', 'w') as file_read:
    # e.g. <_io.TextIOWrapper name='sample.txt' mode='w' encoding='UTF-8'>
    print(f'file read: { file_read} ')
    # e.g. <_io.BufferedWriter name='sample.txt'>
    print(f'file buffer: { file_read. buffer} ')
    # e.g. <_io.FileIO name='sample.txt' mode='wb' closefd=True>
    print(f'file buffer raw: { file_read. buffer. raw} ')
from functools import partial
RECORD_SIZE = 32
# Iterate over a binary file in fixed-size chunks: iter(callable, sentinel)
# keeps calling f.read(RECORD_SIZE) until it returns b'' (end of file).
with open('somefile.data', 'rb') as f:
    records = iter(partial(f.read, RECORD_SIZE), b'')
    for r in records:
        pass
from tempfile import TemporaryFile
# TemporaryFile: creates an anonymous temporary file; per the original note,
# some lower-level operations are not available on it.
# NamedTemporaryFile: creates a temporary file that does have a name, so those
# lower-level operations can be used as well.
with TemporaryFile('w+t') as f:
    # Read/write to the file
    f.write('Hello World\n')
    f.write('Testing\n')
    # Seek back to the beginning and read the data
    f.seek(0)
    data = f.read()
# Without a with-block the temporary file must be closed explicitly.
f = TemporaryFile('w+t')
# Use the temporary file
f.close()
# ---------------------------------------------------
from tempfile import NamedTemporaryFile
# The file name is available as f.name while the file is open.
with NamedTemporaryFile('w+t') as f:
    print('filename is:', f.name)
# delete=False keeps the file on disk after the with-block closes it.
with NamedTemporaryFile('w+t', delete=False) as f:
    print('filename is:', f.name)
# ---------------------------------------------------
from tempfile import TemporaryDirectory
# The directory exists only for the duration of the with-block.
with TemporaryDirectory() as dirname:
    print('dirname is:', dirname)
    # Use the directory, e.g. /var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T/tmp_3lwonjh
import tempfile
# mkstemp() returns a (descriptor, path) pair,
# e.g. (4, '/var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T/tmpi_hjdkd0')
print(tempfile.mkstemp())
# Directory used for temporary files, e.g. /var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T
print(tempfile.gettempdir())
# The name prefix, suffix and containing directory can all be chosen.
f = NamedTemporaryFile(prefix='mytemp', suffix='.txt', dir='/tmp')
print(f.name)  # e.g. /tmp/mytempng2rx_bg.txt
文件描述符就是一個變量,用來指定某個系統的I/O通道,可以通過open()和makefile()函數來包裝,後者性能不如前者但可以跨平台。在unix系統中,可以通過這種方式來創建管道。
import os
# Low-level open: returns an integer file descriptor, creating the file if needed.
file_data = os.open('test.txt', os.O_WRONLY | os.O_CREAT)
# Turn the descriptor into a proper text-mode file object, write one line and
# close it again on leaving the with-block.
with open(file_data, 'wt') as test_file:
    test_file.write('hello world\n')
from socket import socket, AF_INET, SOCK_STREAM
def echo_client(client_sock, addr):
    """Echo every line received on ``client_sock`` back to the client.

    The socket's descriptor is wrapped in two text-mode file objects (one for
    reading, one for writing) so that ordinary line-based file I/O can be
    used. Both wrappers use ``closefd=False``, so closing them does not close
    the descriptor; the socket itself is closed here once the client is done.

    Args:
        client_sock: A connected socket object.
        addr: The peer address, used only for the log message.
    """
    print(f'Got connection from { addr} ')
    fd = client_sock.fileno()
    try:
        # Make text-mode file wrappers for socket reading/writing. The
        # with-block closes both wrappers even if the echo loop fails.
        with open(fd, 'rt', encoding='latin-1', closefd=False) as client_in, \
                open(fd, 'wt', encoding='latin-1', closefd=False) as client_out:
            # Echo lines back to the client using file I/O
            for line in client_in:
                client_out.write(line)
                client_out.flush()
    finally:
        # Always release the socket, including on an I/O error.
        client_sock.close()
def echo_server(address):
    """Accept connections on ``address`` forever, handling one at a time.

    Each accepted connection is passed to ``echo_client`` and served to
    completion before the next one is accepted.

    Args:
        address: The address to bind to, as accepted by ``socket.bind``.
    """
    # The with-block closes the listening socket if the loop is left by an
    # exception (for example KeyboardInterrupt).
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.bind(address)
        sock.listen(1)
        while True:
            client, addr = sock.accept()
            echo_client(client, addr)
import sys
# Open a binary-mode file object on stdout's descriptor so raw bytes can be
# written; closefd=False leaves the descriptor open when the wrapper closes.
bstd_out = open(sys.stdout.fileno(), 'wb', closefd=False)
bstd_out.write(b'Hello World\n')
bstd_out.flush()
import os
csv_path = '/usr/test/Data/test.csv'
# Last path component -> test.csv
print(f'{ csv_path} base name is: { os. path. basename( csv_path)} ')
# Everything before the last component -> /usr/test/Data
print(f'{ csv_path} dir name is: { os. path. dirname( csv_path)} ')
# Join components into a new path -> tmp/data/test.csv
print(f"new path: { os. path. join( 'tmp', 'data', os. path. basename( csv_path))} ")
csv_path = '~/Data/test.csv'
# Expand the leading ~ to the user's home directory, e.g. /Users/liudong/Data/test.csv
print(f'path expand user is: { os. path. expanduser( csv_path)} ')
# Split off the extension -> ('~/Data/test', '.csv')
print(f'{ csv_path} splitext is: { os. path. splitext( csv_path)} ')
# Existence checks are subject to file permissions, so keep those in mind.
import os
file_path = '/etc/passwd'
test_path = '/etc/test'
print(f"is { file_path} exists: { os. path. exists( file_path)} ")
import os
file_path = '/etc'
# List every entry in the directory.
name_list = os.listdir(file_path)
print(f'file list of etc is:\n{ name_list} ')
# Filter the listing: first only the sub-directories, then only the *.py files.
import os.path
dir_name_list = list(filter(
    lambda entry: os.path.isdir(os.path.join(file_path, entry)),
    os.listdir(file_path),
))
py_file_list = list(filter(
    lambda entry: entry.endswith('.py'),
    os.listdir(file_path),
))
import os.path
import glob
# Shell-style pattern match against the current directory.
py_file_list = glob.glob('*.py')
# Get file sizes and modification dates for each match.
name_sz_date = list(zip(
    py_file_list,
    map(os.path.getsize, py_file_list),
    map(os.path.getmtime, py_file_list),
))
for name, size, mtime in name_sz_date:
    print(f'name={ name} , size={ size} , mtime={ mtime} ')
# Alternative: get the full stat result and pick the fields from it.
file_metadata = list(zip(py_file_list, map(os.stat, py_file_list)))
for name, meta in file_metadata:
    print(name, meta.st_size, meta.st_mtime)
    print(f'name={ name} , size={ meta. st_size} , mtime={ meta. st_mtime} ')
import os
import mmap
def memory_map(file_name, access=mmap.ACCESS_WRITE):
    """Memory-map the whole of ``file_name`` and return the mmap object.

    The file is opened read/write and mapped over its full current size.

    Args:
        file_name: Path of an existing, non-empty file.
        access: An ``mmap.ACCESS_*`` constant; defaults to ``ACCESS_WRITE``.

    Returns:
        An ``mmap.mmap`` object covering the file. The caller should
        ``close()`` it when finished.
    """
    size_val = os.path.getsize(file_name)
    fd = os.open(file_name, os.O_RDWR)
    try:
        return mmap.mmap(fd, size_val, access=access)
    finally:
        # mmap keeps its own duplicate of the descriptor, so ours can be
        # closed right away instead of leaking one descriptor per call.
        os.close(fd)
size = 1000000
# Create a file of exactly `size` bytes by seeking to the last byte and
# writing a single zero there.
with open('test_data', 'wb') as f:
    f.seek(size - 1)
    f.write(b'\x00')
# The mapping can be used like a mutable byte sequence.
m = memory_map('test_data')
print(f'the len of m is: { len( m)} ')  # 1000000
print(f'm split: { m[ 0: 10]} ')  # b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
print(f'm[0] is: { m[ 0]} ')  # 0
m[0:11] = b'Hello World'
print(f'close result: { m. close()} ')  # None
# The change made through the mapping is visible in the file itself.
with open('test_data', 'rb') as f:
    print(f'read content: { f. read( 11)} ')  # b'Hello World'
# A memoryview cast lets the same bytes be read and written as unsigned ints.
m = memory_map('test_data')
v = memoryview(m).cast('I')
v[0] = 7
print(f'point content from m is: { m[ 0: 4]} ')  # b'\x07\x00\x00\x00'
m[0:4] = b'\x07\x01\x00\x00'
print(f'v[0] = { v[ 0]} ')  # 263
import os. path
def read_into_buffer(file_name):
    """Read the whole of ``file_name`` into a pre-sized ``bytearray``.

    ``readinto()`` fills an existing buffer instead of allocating a new one
    as ``read()`` does, which avoids extra memory work. Because it may fill
    fewer bytes than the buffer holds, the number of bytes read is compared
    with the buffer size and any unfilled tail is dropped.

    Args:
        file_name: Path of the file to read.

    Returns:
        A ``bytearray`` holding exactly the bytes that were read.
    """
    with open(file_name, 'rb') as f:
        # Size the buffer from the open file itself, so the size and the
        # data come from the same file.
        buf = bytearray(os.fstat(f.fileno()).st_size)
        n = f.readinto(buf)
    if n < len(buf):
        # Short read: do not hand back zero padding as if it were file data.
        del buf[n:]
    return buf
# Write a small binary file, then load it into a mutable buffer.
with open('test_file.bin', 'wb') as f:
    f.write(b'Hello World')
buf_read = read_into_buffer('test_file.bin')
print(f'buf read is: { buf_read} ')
# The buffer can be modified in place with slice assignment.
buf_read[:5] = b'Hello'
print(f'buf read is: { buf_read} ')
# Write the buffer out to a second file.
with open('new_test_file.bin', 'wb') as f:
    f.write(buf_read)
# Size of each record (adjust value)
record_size = 32
# Reuse one preallocated buffer for every record; a read that returns fewer
# than record_size bytes marks the end of the complete records.
buf_read = bytearray(record_size)
with open('test_file', 'rb') as f:
    while True:
        n = f.readinto(buf_read)
        if n < record_size:
            break
        # NOTE(review): the original indentation was lost; this print is
        # assumed to be the per-record use of the buffer — confirm.
        print(f'buf read is: { buf_read} ')
memory_val = memoryview(buf_read)  # zero-copy view onto the same buffer
# The slice must be exactly as long as the bytes assigned below. The original
# took [-3:] and then assigned the 5-byte b'WORLD', which raises ValueError.
memory_val = memory_val[-5:]
print(f'memory value is: { memory_val} ')
memory_val[:] = b'WORLD'
# The write through the view is visible in the underlying bytearray.
print(f'buf read is: { buf_read} ')
import serial
# Open a serial port (pyserial), send one command and read one reply line.
ser = serial.Serial(
    '/dev/tty.usbmodem641',  # Device name varies
    baudrate=9600,
    bytesize=8,
    parity='N',
    stopbits=1,
)
ser.write(b'G1 X50 Y50\r\n')
resp = ser.readline()
import pickle
# file load is [1, 6, 3, 9]
# file load is hello,world!
# file load is {'go', 'java', 'python'}
# pickle funciton: b'\x80\x04\x95\x10\x00\x00\x00\x00\x00\x00\x00\x8c\x04math\x94\x8c\x03cos\x94\x93\x94.'
# T-minus is: 30
# T-minus is: 29load result: <__main__.Countdown object at 0x1037da850>
data_obj = ...  # Some Python object
# Serialize to a file. The with-block closes (and so flushes) the file before
# it is reopened for reading; the original left the write handle open.
with open('test_file', 'wb') as test_file:
    pickle.dump(data_obj, test_file)  # dump and load are inverse operations
# Serialize to a bytes string instead of a file.
p_con = pickle.dumps(data_obj)
# Restore from a file.
# NOTE: only unpickle data from a trusted source; pickle can run arbitrary code.
with open('test_file', 'rb') as test_file:
    data_obj = pickle.load(test_file)
# Restore from a bytes string
data_obj = pickle.loads(p_con)
import pickle
# Several objects can be pickled one after another into the same file ...
with open('some_data', 'wb') as test_file:
    pickle.dump([1, 6, 3, 9], test_file)
    pickle.dump('hello,world!', test_file)
    pickle.dump({'python', 'java', 'go'}, test_file)
# ... and are loaded back in the same order, one load() call per object.
# The with-block closes the read handle, which the original left open.
with open('some_data', 'rb') as test_file:
    print(f'file load is { pickle. load( test_file)} ')
    print(f'file load is { pickle. load( test_file)} ')
    print(f'file load is { pickle. load( test_file)} ')
import math
import pickle
# Pickling a function: the sample output in this file shows the pickle
# holding the names 'math' and 'cos'.
print(f'pickle funciton: { pickle. dumps( math. cos)} ')
import time
import threading
"""有些對象依賴系統底層是不能被序列化的,其實可以使用getstate和setstate來實現序列化和反序列化"""
class Countdown:
    """Counter that ticks down in a background daemon thread.

    A thread object cannot be pickled, so ``__getstate__`` saves only the
    current count and ``__setstate__`` rebuilds the instance (starting a new
    thread) from that count.
    """

    def __init__(self, n):
        self.n = n
        # Daemon thread: it does not keep the interpreter alive on exit.
        self.thr = threading.Thread(target=self.run, daemon=True)
        self.thr.start()

    def run(self):
        """Print and decrement the count every 5 seconds until it is 0."""
        while self.n > 0:
            print(f'T-minus is: { self. n} ')
            self.n -= 1
            time.sleep(5)

    def __getstate__(self):
        """Return the picklable state: just the remaining count."""
        return self.n

    def __setstate__(self, n):
        """Restore from a saved count by re-running the initializer."""
        self.__init__(n)
# Pickle a running Countdown and load it back; loading goes through
# __setstate__, which starts a fresh countdown thread from the saved count.
count_down = Countdown(30)
import pickle
with open('test.p', 'wb') as test_file:
    pickle.dump(count_down, test_file)
# The with-block closes the read handle, which the original left open.
with open('test.p', 'rb') as test_file:
    print(f'load result: { pickle. load( test_file)} ')
base64
s_obj = b'hello'
import base64
# Base64 works on bytes and produces bytes.
code_obj = base64.b64encode(s_obj)
print(f'b64 encode { s_obj} = { code_obj} ')  # b'aGVsbG8='
print(f'decode { code_obj} = { base64. b64decode( code_obj)} ')  # b'hello'
# Convert the encoded bytes to str when text output is needed.
code_obj = str(base64.b64encode(s_obj), 'ascii')
print(f'encode decode { s_obj} = { code_obj} ')  # aGVsbG8=
0x
s = b'hello'
import binascii
# binascii produces lower-case hex digits.
h = binascii.hexlify(s)
print(f'base: { h} ')  # b'68656c6c6f'
print(f'b2a hex: { binascii. a2b_hex( h)} ')  # b'hello'
import base64
# base64.b16encode produces upper-case hex digits.
h = base64.b16encode(s)
print(f'base: { h} ')  # b'68656C6C6F'
print(f'b16 decode: { base64. b16decode( h)} ')  # b'hello'
# Decode the hex bytes to str when text output is needed.
h = base64.b16encode(s)
print(f'base: { h} ')  # b'68656C6C6F'
print(f"decode: { h. decode( 'ascii')} ")  # 68656C6C6F
import shutil
# The main limitation of shutil is that it does not preserve all file metadata.
# NOTE(review): src and dst are placeholders that are not defined in this file.
# Copy src to dst. (cp src dst)
shutil.copy(src, dst)
# Copy files, but preserve metadata (cp -p src dst)
shutil.copy2(src, dst)
# Copy directory tree (cp -R src dst)
shutil.copytree(src, dst)
# Move src to dst (mv src dst)
shutil.move(src, dst)
# Copy a tree, passing symlinks=True.
shutil.copytree(src, dst, symlinks=True)
#忽略文件
def ignore_pyc_files(dirname, filenames):
    """Return the names in ``filenames`` that end with ``.pyc``.

    Has the ``(directory, names)`` signature expected of a
    ``shutil.copytree`` ``ignore`` callable; the returned names are skipped.

    Args:
        dirname: Directory being visited (unused).
        filenames: Names of the entries in that directory.

    Returns:
        A list of the ``.pyc`` names, in their original order.
    """
    # The original comprehension omitted ``for`` and was a syntax error.
    return [name for name in filenames if name.endswith('.pyc')]
# Skip entries while copying, first with a custom callable and then with the
# built-in glob-pattern helper.
shutil.copytree(src, dst, ignore=ignore_pyc_files)
shutil.copytree(src, dst, ignore=shutil.ignore_patterns('*~', '*.pyc'))
# copytree reports failures as one shutil.Error whose first argument is
# iterated here as (source, destination, message) tuples.
try:
    shutil.copytree(src, dst)
except shutil.Error as e:
    for src, dst, msg in e.args[0]:
        # src is source name
        # dst is destination name
        # msg is error message from exception
        print(dst, src, msg)
import os. path
# NOTE(review): the original note here says "its advantage over shutil is
# that metadata is preserved more completely"; it is unclear what "it" refers
# to, since the code below only manipulates path strings.
file_name = '/davanced_programming/chapter13/spam.py'
# Last component, directory part, and both together as a tuple.
print(f'base name is: { os. path. basename( file_name)} ')
print(f'dir name is: { os. path. dirname( file_name)} ')
print(f'file split: { os. path. split( file_name)} ')
# Re-home the file name under another directory.
print(os.path.join('/new/dir', os.path.basename(file_name)))
# Expand the leading ~ to the user's home directory.
print(os.path.expanduser('~/chapter13/spam.py'))
如果想處理更高級的細節,可使用tarfile,zipfile, gzip, bz2模塊,shutil只是一層代理
import shutil
# Unpack an existing archive.
shutil.unpack_archive('py38.zip')
# Build py38.zip from the contents of the test_zip directory.
shutil.make_archive('py38', 'zip', 'test_zip')
# Show which archive formats are supported.
print(shutil.get_archive_formats())
import os
def find_file(start, name):
    """Print the absolute path of every file called ``name`` under ``start``.

    Args:
        start: Directory at which the recursive search begins.
        name: Exact file name to look for.
    """
    for rel_path, dirs, files in os.walk(start):
        if name in files:
            # os.walk already yields directory paths that begin with `start`;
            # the original joined `start` on again, which duplicated the
            # prefix whenever `start` was a relative path.
            full_path = os.path.join(rel_path, name)
            # abspath makes the path absolute; normpath tidies it up.
            print(f'full path is: { os. path. normpath( os. path. abspath( full_path))} ')


if __name__ == '__main__':
    find_file('/advanced_programming/chapter13', 'file_input.py')
import os
import time
#查找最近修改過的文件
def modified_within(top, seconds):
    """Print every file under ``top`` modified within the last ``seconds``.

    Args:
        top: Directory at which the recursive walk begins.
        seconds: Age limit in seconds; files whose modification time is
            newer than ``now - seconds`` are printed.
    """
    cutoff = time.time() - seconds
    for path, dirs, files in os.walk(top):
        for name in files:
            full_path = os.path.join(path, name)
            try:
                m_time = os.path.getmtime(full_path)
            except OSError:
                # The entry vanished or is a broken link. Asking directly
                # avoids the gap between a separate exists() check and the
                # getmtime() call.
                continue
            if m_time > cutoff:
                print(f'full path is: { full_path} ')


if __name__ == '__main__':
    modified_within('/advanced_programming/chapter13', float(1000))