Advanced Applications of Common Python File-Operation Libraries

Building on the earlier article 《Python文件操作常用库使用教程》 (a tutorial on common Python file-operation libraries), this article goes a step further into advanced applications of Python's file-operation libraries.

1. Advanced File-System Monitoring

1.1 The watchdog Library: Real-Time File-System Monitoring

Installation and basic usage

# pip install watchdog
import time

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileChangeHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"File modified: {event.src_path}")

def monitor_directory(path):
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# Monitor the current directory
monitor_directory('.')

Extending the event handler

# (uses the imports from the previous example)
class CustomHandler(FileSystemEventHandler):
    def __init__(self, callback):
        self.callback = callback

    def on_any_event(self, event):
        event_type = event.event_type  # 'modified', 'created', 'deleted', 'moved'
        self.callback(event_type, event.src_path,
                      getattr(event, 'dest_path', None))

def monitor_with_callback(path, callback):
    """Run an Observer that forwards every event to `callback`."""
    observer = Observer()
    observer.schedule(CustomHandler(callback), path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# Usage example
def log_change(event_type, src, dest):
    print(f"{event_type}: {src} {f'-> {dest}' if dest else ''}")

monitor_with_callback('.', log_change)

(Diagram: file-monitoring system architecture; image omitted)




2. High-Performance File Processing

2.1 Memory-Mapped Files (mmap)

Large-file processing example

import mmap
import re

def search_large_file(filename, pattern):
    """Search a large file for a regex pattern."""
    with open(filename, 'rb') as f:
        # Memory-map the file (read-only access)
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # Regex search over the mapped bytes
            for match in re.finditer(pattern.encode(), mm):
                yield match.start(), match.group().decode()

# Usage example
for pos, text in search_large_file('large.log', r'ERROR\s+\d+'):
    print(f"Error found at offset {pos}: {text}")

Performance comparison

import timeit

def test_performance():
    # Traditional line-by-line read
    def traditional():
        with open('large.file') as f:
            return sum(1 for _ in f)

    # mmap-based read; readline() is used so both variants count lines
    # (iterating the mmap object directly would count bytes instead)
    def mmap_way():
        with open('large.file', 'rb') as f:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                count = 0
                while mm.readline():
                    count += 1
                return count

    print("Traditional:", timeit.timeit(traditional, number=10))
    print("mmap:", timeit.timeit(mmap_way, number=10))

2.2 Parallel File Processing

Multi-process file processing

from multiprocessing import Pool
import os

def process_file_chunk(args):
    """Count newlines in one byte range of the file."""
    filename, start, end = args
    with open(filename, 'rb') as f:
        f.seek(start)
        chunk = f.read(end - start)
        # Count newline bytes rather than len(chunk.splitlines()):
        # partial lines at chunk boundaries would otherwise be
        # double-counted across workers.
        return chunk.count(b'\n')

def parallel_file_processing(filename, workers=4):
    """Process a large file in parallel."""
    file_size = os.path.getsize(filename)
    chunk_size = file_size // workers
    # The last range runs to file_size so trailing bytes are not dropped
    ranges = [(filename,
               i * chunk_size,
               file_size if i == workers - 1 else (i + 1) * chunk_size)
              for i in range(workers)]

    with Pool(workers) as pool:
        results = pool.map(process_file_chunk, ranges)

    return sum(results)

# Usage example (the __main__ guard is required by multiprocessing)
if __name__ == '__main__':
    total_lines = parallel_file_processing('huge_file.csv')
    print(f"Total lines: {total_lines}")

3. Advanced Compression and Archiving

3.1 Incremental Compression and Encryption

ZIP encryption and incremental appends

import os
import zipfile
import pyzipper  # third-party AES-ZIP library: pip install pyzipper
from tqdm import tqdm  # progress-bar library

def create_secure_zip(output_zip, files_to_zip, password):
    """Create an AES-encrypted ZIP file.

    Note: the standard-library zipfile module can only *read*
    password-protected archives; calling setpassword() on a writer
    encrypts nothing. Creating encrypted ZIPs needs a third-party
    package such as pyzipper (used here as one option).
    """
    with pyzipper.AESZipFile(
        output_zip,
        'w',
        compression=pyzipper.ZIP_DEFLATED,
        encryption=pyzipper.WZ_AES
    ) as zipf:
        zipf.setpassword(password.encode())

        for file in tqdm(files_to_zip, desc="Compressing"):
            if os.path.isfile(file):
                zipf.write(file)
                
def add_to_existing_zip(zip_file, new_files, password=None):
    """Append files to an existing (stdlib-readable) ZIP archive."""
    temp_zip = f"temp_{zip_file}"
    os.rename(zip_file, temp_zip)

    with zipfile.ZipFile(temp_zip, 'r') as zin:
        with zipfile.ZipFile(zip_file, 'w') as zout:
            if password:
                # The password belongs on the *reader*, to copy
                # ZipCrypto-encrypted entries; zipfile cannot write
                # encrypted entries itself.
                zin.setpassword(password.encode())

            # Copy the existing entries
            for item in tqdm(zin.infolist(), desc="Copying existing entries"):
                zout.writestr(item, zin.read(item))

            # Add the new files
            for file in tqdm(new_files, desc="Adding new files"):
                if os.path.isfile(file):
                    zout.write(file)

    os.remove(temp_zip)
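A minimal usage sketch (the file names are hypothetical; note that a pyzipper AES archive cannot then be appended to with the stdlib-based helper, so the second call targets an ordinary unencrypted archive):

create_secure_zip('secure_backup.zip', ['a.txt', 'b.txt'], 'S3cret!')
add_to_existing_zip('plain_backup.zip', ['c.txt'])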

3.2 Multi-Volume Archives

Split-archive implementation

import os
import zipfile

def split_zip(input_files, output_prefix, max_size_mb):
    """Split files across several ZIP parts.

    Each part is an independent, self-contained ZIP (not a true spanned
    archive), so every part can be extracted on its own.
    """
    max_size = max_size_mb * 1024 * 1024
    part_num = 1
    current_size = 0
    current_zip = None

    for file in input_files:
        file_size = os.path.getsize(file)

        # Start a new part if none is open or this file would overflow it
        if current_zip is None or current_size + file_size > max_size:
            if current_zip is not None:
                current_zip.close()

            zip_name = f"{output_prefix}.zip.{part_num:03d}"
            current_zip = zipfile.ZipFile(
                zip_name, 'w', zipfile.ZIP_DEFLATED)
            part_num += 1
            current_size = 0

        current_zip.write(file)
        current_size += file_size

    if current_zip is not None:
        current_zip.close()

    return part_num - 1  # number of parts created
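A usage sketch; the glob pattern and size limit are hypothetical:

import glob

parts = split_zip(sorted(glob.glob('data/*.csv')), 'backup', max_size_mb=100)
print(f"Created {parts} part(s)")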

4. Cloud-Storage Integration

4.1 boto3: AWS S3 Integration

S3 file-operation examples

import boto3
from botocore.exceptions import ClientError

class S3Manager:
    def __init__(self, bucket_name):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name

    def upload_file(self, local_path, s3_key):
        """Upload a file to S3."""
        try:
            self.s3.upload_file(
                local_path,
                self.bucket,
                s3_key,
                ExtraArgs={
                    'ACL': 'bucket-owner-full-control',
                    'StorageClass': 'INTELLIGENT_TIERING'
                }
            )
            return True
        except ClientError as e:
            print(f"S3 upload error: {e}")
            return False

    def download_file(self, s3_key, local_path):
        """Download a file from S3."""
        try:
            self.s3.download_file(self.bucket, s3_key, local_path)
            return True
        except ClientError as e:
            print(f"S3 download error: {e}")
            return False

    def generate_presigned_url(self, s3_key, expires=3600):
        """Generate a pre-signed URL."""
        try:
            return self.s3.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket, 'Key': s3_key},
                ExpiresIn=expires
            )
        except ClientError as e:
            print(f"Pre-signed URL error: {e}")
            return None
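A usage sketch; the bucket and key names are placeholders, and credentials are resolved through boto3's usual chain (environment variables, ~/.aws/credentials, or an IAM role):

s3 = S3Manager('my-example-bucket')
if s3.upload_file('report.pdf', 'reports/2025/report.pdf'):
    url = s3.generate_presigned_url('reports/2025/report.pdf', expires=600)
    print("Temporary share link:", url)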

4.2 Alibaba Cloud OSS Integration

OSS file-operation examples

import oss2
from oss2.exceptions import OssError

class OSSManager:
    def __init__(self, access_key, secret_key, endpoint, bucket_name):
        auth = oss2.Auth(access_key, secret_key)
        self.bucket = oss2.Bucket(auth, endpoint, bucket_name)

    def upload_with_progress(self, local_path, oss_key):
        """Upload with a progress readout."""
        def progress_callback(consumed_bytes, total_bytes):
            if total_bytes:
                percent = 100 * (consumed_bytes / total_bytes)
                print(f"\rUpload progress: {percent:.2f}%", end='')

        try:
            result = self.bucket.put_object_from_file(
                oss_key,
                local_path,
                progress_callback=progress_callback
            )
            print("\nUpload complete")
            return result.status == 200
        except OssError as e:
            print(f"\nOSS upload error: {e}")
            return False

    def download_large_file(self, oss_key, local_path, part_size=10*1024*1024):
        """Download a large file in parts, with resume support."""
        try:
            oss2.resumable_download(
                self.bucket,
                oss_key,
                local_path,
                part_size=part_size,
                num_threads=4
            )
            return True
        except OssError as e:
            print(f"OSS download error: {e}")
            return False
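A usage sketch; the keys, endpoint, and bucket name are placeholders:

oss = OSSManager('<access-key>', '<secret-key>',
                 'https://oss-cn-hangzhou.aliyuncs.com', 'my-bucket')
oss.upload_with_progress('video.mp4', 'media/video.mp4')
oss.download_large_file('media/video.mp4', 'video_copy.mp4')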

5. Advanced File-Content Processing

5.1 Binary-File Parsing

Parsing a structured binary file

import struct
from collections import namedtuple

def parse_binary_file(filename):
    """Parse a file in a custom binary format."""
    # Header layout
    Header = namedtuple('Header', 'magic version num_records')
    header_format = '<4sII'  # 4-byte magic + two unsigned 32-bit ints

    # Record layout
    Record = namedtuple('Record', 'id timestamp value')
    record_format = '<Qdf'  # unsigned 64-bit int, double, float

    with open(filename, 'rb') as f:
        # Read the header
        header_data = f.read(struct.calcsize(header_format))
        header = Header._make(struct.unpack(header_format, header_data))

        # Validate the magic number
        if header.magic != b'BIN1':
            raise ValueError("Invalid file format")

        # Read the records
        records = []
        for _ in range(header.num_records):
            record_data = f.read(struct.calcsize(record_format))
            records.append(Record._make(struct.unpack(record_format, record_data)))

    return header, records
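Since the 'BIN1' layout is this tutorial's own invention, a matching writer lets the parser be tested end to end; a minimal sketch:

def write_binary_file(filename, records, version=1):
    """Write records in the layout parse_binary_file() expects."""
    with open(filename, 'wb') as f:
        f.write(struct.pack('<4sII', b'BIN1', version, len(records)))
        for rec_id, timestamp, value in records:
            f.write(struct.pack('<Qdf', rec_id, timestamp, value))

write_binary_file('data.bin', [(1, 1735689600.0, 3.14)])
header, records = parse_binary_file('data.bin')
print(header, records)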

5.2 Real-Time Log-File Analysis

A real-time log analyzer

import os
import re
from collections import defaultdict
import time

class LogAnalyzer:
    def __init__(self, log_file, patterns):
        self.log_file = log_file
        self.patterns = patterns
        self.position = 0
        self.stats = defaultdict(int)

    def analyze(self):
        """Analyze any new content in the log file."""
        # If the file shrank (rotation/truncation), start over from the top
        if os.path.getsize(self.log_file) < self.position:
            self.position = 0

        with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
            # Resume from where the last read stopped
            f.seek(self.position)

            for line in f:
                for name, pattern in self.patterns.items():
                    if re.search(pattern, line):
                        self.stats[name] += 1

            # Remember the new read position
            self.position = f.tell()

        return dict(self.stats)

    def continuous_monitor(self, interval=5):
        """Monitor the log file continuously."""
        try:
            while True:
                stats = self.analyze()
                if stats:
                    print(f"\n[{time.ctime()}] Current counts:")
                    for name, count in stats.items():
                        print(f"{name}: {count}")
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\nMonitoring stopped")

# Usage example
patterns = {
    'ERROR': r'ERROR',
    'WARNING': r'WARNING',
    'HTTP_500': r'HTTP/1\.\d"\s+500'
}
analyzer = LogAnalyzer('app.log', patterns)
analyzer.continuous_monitor()

6. Secure File Operations

6.1 Safe File-Write Patterns

Atomic writes

import os
import tempfile

def atomic_write(filename, data, mode='w', encoding='utf-8'):
    """Write a file atomically: readers see either the old or the new
    content, never a partial file."""
    # Create the temp file in the target directory so os.replace()
    # stays on one filesystem and remains atomic
    temp_dir = os.path.dirname(os.path.abspath(filename))
    with tempfile.NamedTemporaryFile(
        mode=mode,
        dir=temp_dir,
        prefix='.tmp_',
        delete=False,
        # encoding is only valid in text mode
        encoding=encoding if 'b' not in mode else None
    ) as tmp:
        tmp.write(data)
        tmp.flush()
        os.fsync(tmp.fileno())
        tempname = tmp.name

    # Atomic replace
    try:
        os.replace(tempname, filename)
    except OSError:
        os.unlink(tempname)
        raise
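A usage sketch (the file name and settings are arbitrary):

import json

config = {'debug': False, 'retries': 3}
atomic_write('config.json', json.dumps(config, indent=2))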

6.2 File-Permission Management

Setting secure permissions

import os
import stat

def set_secure_permissions(filename):
    """Apply safe file permissions."""
    # Current mode bits
    current_mode = os.stat(filename).st_mode

    # Drop write permission for group and others
    new_mode = current_mode & ~stat.S_IWGRP & ~stat.S_IWOTH

    # For sensitive files, drop group/other read permission as well
    if filename.endswith(('.key', '.pem', '.env')):
        new_mode = new_mode & ~stat.S_IRGRP & ~stat.S_IROTH

    os.chmod(filename, new_mode)

def create_secure_file(filename, content):
    """Create a file with safe permissions."""
    # Atomic write first
    atomic_write(filename, content)

    # Then tighten permissions
    set_secure_permissions(filename)

    # Verify the result
    mode = os.stat(filename).st_mode
    if mode & (stat.S_IWGRP | stat.S_IWOTH):
        raise RuntimeError("Failed to set file permissions")
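A usage sketch for POSIX systems; the file name and content are hypothetical:

create_secure_file('service.key', 'api-key-material\n')
print(oct(os.stat('service.key').st_mode & 0o777))  # expect owner-only, e.g. 0o600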

7. Summary

Table 2: File-operation performance-optimization techniques

| Scenario              | Technique                             | When to use                               |
|-----------------------|---------------------------------------|-------------------------------------------|
| Large-file reads      | Memory mapping (mmap)                 | Large files needing random access         |
| Many small files      | Parallel processing (multiprocessing) | Large numbers of independent small files  |
| Sequential processing | Buffered I/O (large buffers)          | Sequential read/write workloads           |
| Network storage       | Resumable / multipart transfers       | Unstable network environments             |
| Real-time processing  | Incremental reads (track position)    | Log monitoring and similar scenarios      |
| Compressed files      | Streaming without full extraction (see the sketch below) | Large archives         |

Recommendations

  1. Avoid many tiny I/O operations; batch work where possible
  2. Use with statements to guarantee resource cleanup
  3. Consider memory mapping for very large files
  4. Use parallelism when processing large numbers of files
  5. Add retry logic around network-storage operations
  6. Add atomicity guarantees to critical writes
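The table's last row, streaming compressed data instead of extracting it, is the one technique not demonstrated above; a minimal sketch with the standard gzip module (the file name is hypothetical):

import gzip

def count_errors_gz(filename):
    """Count ERROR lines in a .gz log without extracting it to disk."""
    count = 0
    with gzip.open(filename, 'rt', encoding='utf-8', errors='ignore') as f:
        for line in f:  # decompressed lazily, line by line
            if 'ERROR' in line:
                count += 1
    return count

print(count_errors_gz('app.log.gz'))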

With the advanced techniques covered in this tutorial, we can make file operations in real projects faster and safer, meeting the demands of enterprise-grade applications.


Python programming notes and tips are updated here continuously; stay tuned!



