# Three things need attention when reading files: 1) use a with-context; 2) newline handling; 3) encoding (sys.getdefaultencoding() returns the system default). To bypass the text-encoding layer, access the buffer attribute directly, e.g. sys.stdout.buffer.write().
# open() modes combine a file format with an access mode. Format: text 't', binary 'b' (compressed files are handled by the gzip/bz2 modules). Access mode: 'r' read, 'w' write.
import os

file_name = 'test.txt'  # relative path: resolved against the current directory

# The file may NOT exist yet, so create it before reading --
# the original read first, which raises FileNotFoundError on a fresh run.
if not os. path. exists( file_name):
    with open( file_name, 'wt') as f:
        f. write( 'Hello,I am a test.\n')
else:
    print( f'File { file_name} already exists!')

""" Read text """
with open( file_name, 'rt') as f:
    # Keep the result; the original discarded the read value.
    content = f. read()
# Bytes objects index as integers: b'Hello World'[0] is ord('H') == 72.
b = b'Hello World'
print( f'binary object b[ 0] = {b[0]} ')
# Binary reading and writing must be decoded and encoded.
# FIX: write test.bin first -- the original tried to read it before it
# ever existed, raising FileNotFoundError.
with open( 'test.bin', 'wb') as f:
    text = 'Hello World'
    f. write( text. encode( 'utf-8'))

with open( 'test.bin', 'rb') as f:
    data = f. read( 16)
    text = data. decode( 'utf-8')
import array

# readinto() fills an existing buffer in place instead of allocating a new
# bytes object; the raw layout is platform-dependent (int size, endianness).
a_obj = array. array( 'i', [0] * 8)
with open( 'test.bin', 'rb') as f:
    f. readinto( a_obj)
import gzip
import bz2

gz_file, bz_file = "giztext.gz", "bz.gz"

# FIX: create the compressed files before reading them -- the original
# read first and raised FileNotFoundError on a fresh run.
text = 'Hello World\n'
# gzip compression
with gzip. open( gz_file, 'wt') as f:
    f. write( text)
# bz2 compression
with bz2. open( bz_file, 'wt') as f:
    f. write( text)

# gzip decompression
with gzip. open( gz_file, 'rt') as f:
    text = f. read()
# bz2 decompression
with bz2. open( bz_file, 'rt') as f:
    text = f. read()

# Set compression level (1 = fastest .. 9 = smallest; default is 9)
with gzip. open( gz_file, 'wt', compresslevel = 3) as f:
    f. write( text)
import urllib. request
import io
import sys

# Wrap a binary response stream in a text layer with an explicit encoding.
url_res = urllib. request. urlopen( 'http://www.python.org')
f_test = io. TextIOWrapper( url_res, encoding = 'utf-8')
text_val = f_test. read()

# To change the encoding of an already-open text stream, first strip the
# current text layer with detach(), then wrap the raw buffer again.
print( f'sys stdout encoding is: {sys.stdout.encoding} ')  # utf-8
sys. stdout = io. TextIOWrapper( sys. stdout. detach(), encoding = 'latin-1')
print( f'sys stdout new encoding is: {sys.stdout.encoding} ')  # latin-1
# The layered I/O stack: TextIOWrapper -> BufferedWriter -> FileIO.
file_read = open( 'sample.txt', 'w')
for label, layer in (
        ('file read', file_read),
        ('file buffer', file_read. buffer),
        ('file buffer raw', file_read. buffer. raw)):
    print( f'{label}: {layer} ')
from functools import partial

RECORD_SIZE = 32  # fixed record length in bytes

# iter(callable, sentinel): keep calling f.read(32) until it returns b''.
with open( 'somefile.data', 'rb') as f:
    for record in iter( partial( f. read, RECORD_SIZE), b''):
        pass  # process one record here
from tempfile import TemporaryFile
# TemporaryFile creates an anonymous temporary file: it has no visible
# name on the filesystem and is removed automatically when closed.
# (NamedTemporaryFile, below, additionally exposes a real pathname.)
with TemporaryFile( 'w+t') as f:
    # Write, rewind, then read back what was written.
    f. write( 'Hello World\n')
    f. write( 'Testing\n')
    f. seek( 0)
    data = f. read()

# Without a with-block the file must be closed explicitly.
f = TemporaryFile( 'w+t')
f. close()
# ---------------------------------------------------
from tempfile import NamedTemporaryFile
from tempfile import TemporaryDirectory
import tempfile

# NamedTemporaryFile has a real pathname, exposed as f.name.
with NamedTemporaryFile( 'w+t') as f:
    print( 'filename is:', f. name)
# delete=False keeps the file on disk after the context exits.
with NamedTemporaryFile( 'w+t', delete = False) as f:
    print( 'filename is:', f. name)
# ---------------------------------------------------
with TemporaryDirectory() as dirname:
    print( 'dirname is:', dirname)
# Lower-level helpers: (fd, path) pair and the platform temp directory.
print( tempfile. mkstemp())
print( tempfile. gettempdir())
# prefix/suffix/dir control where and how the temp name is built.
f = NamedTemporaryFile( prefix = 'mytemp', suffix = '.txt', dir = '/tmp')
print( f. name)
# A file descriptor is an integer that identifies a system I/O channel. It can be wrapped into a file object with open() or socket.makefile(); on Unix systems a descriptor can also be used to create a pipe.
import os

# os.open returns a raw file descriptor; passing it to open() turns it
# into a proper file object, which then owns (and closes) the descriptor.
fd = os. open( 'test.txt', os. O_WRONLY | os. O_CREAT)
wrapper = open( fd, 'wt')
wrapper. write( 'hello world\n')
wrapper. close()
from socket import socket, AF_INET, SOCK_STREAM
def echo_client( client_sock, addr):
    """Echo text lines back to *client_sock* using file-object wrappers.

    closefd=False keeps the wrappers from closing the socket's
    descriptor; the socket itself is closed explicitly at the end.
    """
    print( f'Got connection from { addr} ')
    # FIX: use 'with' so the buffered wrappers are flushed and released
    # deterministically -- the original never closed them.
    with open( client_sock. fileno(), 'rt', encoding = 'latin-1',
              closefd = False) as client_in, \
         open( client_sock. fileno(), 'wt', encoding = 'latin-1',
              closefd = False) as client_out:
        # Echo lines back to the client using file I/O.
        for line in client_in:
            client_out. write( line)
            client_out. flush()
    client_sock. close()
def echo_server( address):
    """Accept TCP connections on *address* forever, echoing each client."""
    sock = socket( AF_INET, SOCK_STREAM)
    sock. bind( address)
    sock. listen( 1)
    while True:
        conn, peer = sock. accept()
        echo_client( conn, peer)
import sys

# Write raw bytes to stdout by wrapping its descriptor in binary mode;
# closefd=False leaves the real stdout descriptor open afterwards.
raw_stdout = open( sys. stdout. fileno(), 'wb', closefd = False)
raw_stdout. write( b'Hello World\n')
raw_stdout. flush()
import os
csv_path = '/usr/test/Data/test.csv'
# Pure path-string manipulation: nothing here touches the filesystem.
print( f'{csv_path} base name is: {os.path.basename(csv_path)} ')  # test.csv
print( f'{csv_path} dir name is: {os.path.dirname(csv_path)} ')  # /usr/test/Data
print( f"new path: {os.path.join('tmp', 'data', os.path.basename(csv_path))} ")  # tmp/data/test.csv
csv_path = '~/Data/test.csv'
# expanduser resolves '~' to the current user's home directory.
print( f'path expand user is: {os.path.expanduser(csv_path)} ')
print( f'{csv_path} splitext is: {os.path.splitext(csv_path)} ')  # ('~/Data/test', '.csv')
# Permissions matter when probing system paths like these.
import os
import os. path

file_path = '/etc/passwd'
test_path = '/etc/test'
print( f"is {file_path} exists: {os.path.exists(file_path)} ")

file_path = '/etc'
# listdir returns every entry (files and directories) directly inside.
name_list = os. listdir( file_path)
print( f'file list of etc is:\n{name_list} ')

# Keep only the subdirectories.
dir_name_list = [ name for name in os. listdir( file_path)
                 if os. path. isdir( os. path. join( file_path, name))]
# Keep only Python source files.
py_file_list = [ name for name in os. listdir( file_path)
                if name. endswith( '.py')]
import os. path
import glob

py_file_list = glob. glob( '*.py')

# (name, size-in-bytes, mtime-epoch) per file via the os.path helpers.
name_sz_date = [( fname, os. path. getsize( fname), os. path. getmtime( fname))
                for fname in py_file_list]
for fname, fsize, ftime in name_sz_date:
    print( f'name={fname} , size={fsize} , mtime={ftime} ')

# Alternative: os.stat returns the full metadata in a single call.
file_metadata = [( fname, os. stat( fname)) for fname in py_file_list]
for fname, st in file_metadata:
    print( fname, st. st_size, st. st_mtime)
    print( f'name={fname} , size={st.st_size} , mtime={st.st_mtime} ')
import os
import mmap
def memory_map( file_name, access = mmap. ACCESS_WRITE):
    """Memory-map the whole of *file_name* and return the mmap object.

    FIX: close the raw descriptor after mapping -- mmap duplicates the
    file descriptor internally, so the original os.open() descriptor
    leaked on every call.
    """
    size_val = os. path. getsize( file_name)
    fd = os. open( file_name, os. O_RDWR)
    try:
        return mmap. mmap( fd, size_val, access = access)
    finally:
        os. close( fd)
size = 1000000
# Pre-extend the file to `size` bytes: seek past the end, write one byte.
with open( 'test_data', 'wb') as f:
    f. seek( size - 1)
    f. write( b'\x00')

m = memory_map( 'test_data')
print( f'the len of m is: {len(m)} ')  # 1000000
print( f'm split: {m[0:10]} ')
print( f'm[0] is: {m[0]} ')  # 0
# Slice assignment writes straight through to the underlying file.
m[ 0: 11] = b'Hello World'
print( f'close result: {m.close()} ')  # None

with open( 'test_data', 'rb') as f:
    print( f'read content: {f.read(11)} ')  # b'Hello World'

m = memory_map( 'test_data')
# A memoryview cast reinterprets the same bytes as unsigned ints in place.
v = memoryview( m). cast( 'I')
v[ 0] = 7
print( f'point content from m is: {m[0:4]} ')  # b'\x07\x00\x00\x00'
m[ 0: 4] = b'\x07\x01\x00\x00'
print( f'v[0] = {v[0]} ')  # 263
import os. path
def read_into_buffer( file_name):
    """Read the whole file into a pre-sized bytearray and return it.

    readinto() fills an existing buffer instead of allocating a fresh
    bytes object, avoiding extra copies for large files.

    Raises:
        OSError: if fewer bytes were read than the file size reported
            (e.g. the file changed between stat and read). The original
            docstring asked for this check but never performed it.
    """
    buf = bytearray( os. path. getsize( file_name))
    with open( file_name, 'rb') as f:
        n = f. readinto( buf)
        if n != len( buf):
            raise OSError( f'short read: got {n} bytes, expected {len(buf)}')
    return buf
with open( 'test_file.bin', 'wb') as f:
    f. write( b'Hello World')

buf_read = read_into_buffer( 'test_file.bin')
print( f'buf read is: {buf_read} ')
# bytearrays are mutable: patch a slice in place.
buf_read[ 0: 5] = b'Hello'
print( f'buf read is: {buf_read} ')

with open( 'new_test_file.bin', 'wb') as f:
    f. write( buf_read)
# Size of each fixed-length record, in bytes (adjust value).
record_size = 32
# Reuse one pre-sized buffer for every record; stop on a short read.
buf_read = bytearray( record_size)
with open( 'test_file', 'rb') as f:
    while True:
        n = f. readinto( buf_read)
        if n < record_size:
            break
        print( f'buf read is: {buf_read} ')

memory_val = memoryview( buf_read)  # zero-copy view of the buffer
# BUG FIX: the original took the last 3 bytes and then assigned 5 bytes
# (b'WORLD'), which raises ValueError -- a memoryview slice assignment
# must match the view's size exactly. Take the last 5 bytes instead.
memory_val = memory_val[ - 5:]
print( f'memory value is: {memory_val} ')
memory_val[:] = b'WORLD'  # writes through into buf_read
print( f'buf read is: {buf_read} ')
import serial  # third-party pyserial, not part of the stdlib

# Device name varies per platform; settings are the usual 9600-8-N-1.
port_settings = dict( baudrate = 9600,
                      bytesize = 8,
                      parity = 'N',
                      stopbits = 1)
ser = serial. Serial( '/dev/tty.usbmodem641', **port_settings)
ser. write( b'G1 X50 Y50\r\n')  # send one command line
resp = ser. readline()          # read one response line
import pickle
# Sample outputs of the pickle demos below:
# file load is [1, 6, 3, 9]
# file load is hello,world!
# file load is {'go', 'java', 'python'}
# pickle funciton: b'\x80\x04\x95\x10\x00\x00\x00\x00\x00\x00\x00\x8c\x04math\x94\x8c\x03cos\x94\x93\x94.'
# T-minus is: 30
# T-minus is: 29load result: <__main__.Countdown object at 0x1037da850>

data_obj = [ 1, 2, 3]  # some concrete Python object (was a bare '...' stub)

# FIX: close the file handles deterministically with 'with' --
# the original left both the writer and the reader open.
with open( 'test_file', 'wb') as test_file:
    pickle. dump( data_obj, test_file)  # dump and load are inverse operations
p_con = pickle. dumps( data_obj)        # dumps/loads work on bytes in memory

# Restore from a file
with open( 'test_file', 'rb') as test_file:
    data_obj = pickle. load( test_file)
# Restore from a bytes string
data_obj = pickle. loads( p_con)
import pickle

# Several objects can be dumped back-to-back into one file and are
# loaded back in the same order.
# FIX: use 'with' for both handles -- the reader was never closed.
with open( 'some_data', 'wb') as test_file:
    pickle. dump([ 1, 6, 3, 9], test_file)
    pickle. dump( 'hello,world!', test_file)
    pickle. dump({ 'python', 'java', 'go'}, test_file)

with open( 'some_data', 'rb') as test_file:
    print( f'file load is {pickle.load(test_file)} ')
    print( f'file load is {pickle.load(test_file)} ')
    print( f'file load is {pickle.load(test_file)} ')
import math
import pickle

# Functions pickle by reference (module + qualified name), not by code.
serialized_fn = pickle. dumps( math. cos)
print( f'pickle funciton: {serialized_fn} ')
import time
import threading
""" Some objects that depend on the underlying system cannot be serialized , You can actually use getstate and setstate To achieve serialization and deserialization """
class Countdown:
    """Counts down from *n*, printing once every five seconds on a
    background daemon thread.

    Thread objects hold OS-level state and cannot be pickled, so
    __getstate__/__setstate__ reduce the instance to the remaining
    count and rebuild the thread on deserialization.
    """
    def __init__( self, n):
        # Remaining count; decremented by the worker thread.
        self. n = n
        self. thr = threading. Thread( target = self. run)
        # Daemon thread: it will not keep the interpreter alive at exit.
        self. thr. daemon = True
        self. thr. start()
    def run( self):
        # Worker loop: print and decrement every 5 seconds until zero.
        while self. n > 0:
            print( f'T-minus is: { self. n} ')
            self. n -= 1
            time. sleep( 5)
    def __getstate__( self):
        # Serialize only the count; the thread is rebuilt on load.
        return self. n
    def __setstate__( self, n):
        # Recreate the object (and restart its thread) from the count.
        self. __init__( n)
count_down = Countdown( 30)

import pickle

# FIX: close the pickle files deterministically with 'with' --
# the original left both handles open.
with open( 'test.p', 'wb') as test_file:
    pickle. dump( count_down, test_file)

with open( 'test.p', 'rb') as test_file:
    print( f'load result: {pickle.load(test_file)} ')
# base64 encoding and decoding
import base64

s_obj = b'hello'
# b64encode/b64decode round-trip bytes losslessly.
code_obj = base64. b64encode( s_obj)
print( f'b64 encode {s_obj} = {code_obj} ')  # b'aGVsbG8='
print( f'decode {code_obj} = {base64.b64decode(code_obj)} ')  # b'hello'
# decode('ascii') turns the Base64 bytes into a plain str.
code_obj = base64. b64encode( s_obj). decode( 'ascii')
print( f'encode decode {s_obj} = {code_obj} ')  # aGVsbG8=
# hexadecimal (base16) encoding and decoding
import binascii
import base64

s = b'hello'
# binascii produces lowercase hex digits.
h = binascii. b2a_hex( s)
print( f'base: {h} ')  # b'68656c6c6f'
print( f'b2a hex: {binascii.a2b_hex(h)} ')  # b'hello'
# base64.b16encode produces uppercase hex (b16decode expects uppercase).
h = base64. b16encode( s)
print( f'base: {h} ')  # b'68656C6C6F'
print( f'b16 decode: {base64.b16decode(h)} ')  # b'hello'
h = base64. b16encode( s)
print( f'base: {h} ')  # b'68656C6C6F'
print( f"decode: {h.decode('ascii')} ")  # 68656C6C6F
import shutil
# shutil's main caveat: a plain copy does not preserve all file metadata;
# prefer copy2/copytree when timestamps and permissions matter.
# NOTE(review): 'src' and 'dst' are not defined anywhere in this file --
# these lines are illustrative snippets, not runnable as-is.
# Copy src to dst. (cp src dst)
shutil. copy( src, dst)
# Copy files, but preserve metadata (cp -p src dst)
shutil. copy2( src, dst)
# Copy directory tree (cp -R src dst)
shutil. copytree( src, dst)
# Move src to dst (mv src dst)
shutil. move( src, dst)
# symlinks=True reproduces symbolic links as links rather than copying
# the files they point to.
shutil. copytree( src, dst, symlinks = True)
# Ignore files
def ignore_pyc_files( dirname, filenames):
    """copytree ignore-callback: return the names in *filenames* that
    end in '.pyc' so copytree skips them.

    BUG FIX: the original comprehension was missing its 'for' clause
    ('[name in filenames if ...]'), which is a SyntaxError.
    """
    return [ name for name in filenames if name. endswith( '.pyc')]
# ignore= accepts any callable with the (dirname, filenames) signature,
# such as ignore_pyc_files above.
shutil. copytree( src, dst, ignore = ignore_pyc_files)
# shutil.ignore_patterns builds the same kind of callable from globs.
shutil. copytree( src, dst, ignore = shutil. ignore_patterns( '*~', '*.pyc'))
try:
    shutil. copytree( src, dst)
except shutil. Error as e:
    # copytree collects per-file failures and raises them as a single
    # shutil.Error whose args[0] is a list of (src, dst, msg) tuples.
    for src, dst, msg in e. args[ 0]:
        # src is source name
        # dst is destination name
        # msg is error message from exception
        print( dst, src, msg)
import os. path
# Pure path-string manipulation; unlike the shutil calls above, nothing
# here reads or writes the filesystem.
file_name = '/davanced_programming/chapter13/spam.py'
for label, value in (
        ('base name is', os. path. basename( file_name)),
        ('dir name is', os. path. dirname( file_name)),
        ('file split', os. path. split( file_name))):
    print( f'{label}: {value} ')
print( os. path. join( '/new/dir', os. path. basename( file_name)))
print( os. path. expanduser( '~/chapter13/spam.py'))
# To control the finer details, use the tarfile, zipfile, gzip, or bz2 modules directly; shutil's archive helpers are only a thin wrapper over them.
import shutil

archive = 'py38.zip'
# Unpack the archive, then rebuild one from the 'test_zip' directory.
shutil. unpack_archive( archive)
shutil. make_archive( 'py38', 'zip', 'test_zip')
# Archive formats shutil can create on this system.
print( shutil. get_archive_formats())
import os
def find_file( start, name):
    """Walk *start* recursively, print the normalized absolute path of
    every file named *name*, and return the list of matches.

    BUG FIX: os.walk already yields dirpaths that include *start*, so
    the original os.path.join(start, rel_path, name) duplicated the
    prefix for relative *start* values (e.g. 'dir/dir/sub/name').
    Returning the matches is a backward-compatible addition.
    """
    matches = []
    for dirpath, dirs, files in os. walk( start):
        if name in files:
            full_path = os. path. normpath( os. path. abspath(
                os. path. join( dirpath, name)))
            print( f'full path is: {full_path} ')
            matches. append( full_path)
    return matches

if __name__ == '__main__':
    find_file( '/advanced_programming/chapter13', 'file_input.py')
import os
import time
# Find recently modified files
def modified_within( top, seconds):
    """Print every file under *top* modified within the last *seconds*."""
    cutoff = time. time() - seconds
    for path, dirs, files in os. walk( top):
        for name in files:
            full_path = os. path. join( path, name)
            # The file can disappear between walk() and getmtime().
            if not os. path. exists( full_path):
                continue
            if os. path. getmtime( full_path) > cutoff:
                print( f'full path is: {full_path} ')

if __name__ == '__main__':
    modified_within( '/advanced_programming/chapter13', float( 1000))