北屋教程网

专注编程知识分享,从入门到精通的编程学习平台

Python 的网络与互联网访问模块及应用实例(二)

Python 的网络与互联网访问能力非常强大,无论是进行底层的套接字编程,还是利用高级库进行高效的网络通信和数据传输,都能找到合适的工具。下面介绍一些关键的模块、应用实例,并探讨如何实现高并发处理。


一、核心网络模块与应用


Python 的网络编程能力构建在其丰富的标准库和第三方库之上。


底层网络接口:socket 模块


socket 模块是 Python 进行网络编程的基石,它提供了标准的 BSD Socket API,允许你进行 TCP 或 UDP 协议的通信。


TCP 服务器与客户端示例:


TCP 提供可靠的、面向连接的通信。


TCP 服务器:监听特定端口,等待客户端连接并处理请求。


import socket


def start_server(host='127.0.0.1', port=65432):


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:


s.bind((host, port))


s.listen()


print("Server is listening...")


conn, addr = s.accept()


with conn:


print('Connected by', addr)


while True:


data = conn.recv(1024)


if not data:


break


conn.sendall(data)


if name == "main":


start_server()


TCP 客户端:连接到服务器并发送/接收数据。


import socket


def connect_to_server(host='127.0.0.1', port=65432):


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:


s.connect((host, port))


s.sendall(b'Hello, world')


data = s.recv(1024)


print('Received', repr(data))


if name == "main":


connect_to_server()


UDP 服务器与客户端示例:


UDP 是无连接的协议,传输速度快,但不保证可靠性,适用于视频流、在线游戏等场景。


UDP 服务器:


import socket


def start_udp_server(host='127.0.0.1', port=65432):


with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:


s.bind((host, port))


while True:


data, addr = s.recvfrom(1024)


print("Received from:", addr)


s.sendto(data, addr)


if name == "main":


start_udp_server()


UDP 客户端:


import socket


def connect_to_udp_server(host='127.0.0.1', port=65432):


with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:


s.sendto(b'Hello, UDP server!', (host, port))


data, addr = s.recvfrom(1024)


print('Received', repr(data), 'from', addr)


if name == "main":


connect_to_udp_server()


高级 HTTP 客户端:requests 库


虽然 http.client 是标准库,但第三方 requests 库因其人性化的 API 而更受欢迎。它简化了 HTTP 请求的处理,支持 GET、POST 等各种方法,并能轻松处理响应、Cookies、会话等。


import requests


def fetch_webpage(url="http://example.com "):


response = requests.get(url)


return response.text


if name == "main":


print(fetch_webpage())


网络运维自动化


Python 在网络运维自动化中扮演着重要角色,例如配置管理、流量分析和监控。


使用 paramiko 进行网络设备配置(如 Cisco 路由器):


import paramiko


host = '192.168.1.100'


port = 22


username = 'admin'


password = 'cisco123'


ssh = paramiko.SSHClient()


ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())


ssh.connect(host, port, username, password)


stdin, stdout, stderr = ssh.exec_command('enable')


stdin, stdout, stderr = ssh.exec_command('configure terminal')


stdin, stdout, stderr = ssh.exec_command('interface fa0/0')


stdin, stdout, stderr = ssh.exec_command('ip address 192.168.1.100 255.255.255.0')


stdin, stdout, stderr = ssh.exec_command('exit')


ssh.close()


使用 scapy 进行网络流量分析:


from scapy.all import sniff


interface = 'eth0'


duration = 30


captured_packets = sniff(iface=interface, timeout=duration)


for packet in captured_packets:


if packet.haslayer('TCP'):


print('TCP packet:', packet.summary())


elif packet.haslayer('UDP'):


print('UDP packet:', packet.summary())


其他实用网络工具脚本


网络服务状态检查:监控服务是否在线。


import socket


def check_service_status(host, port, timeout=5):


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:


s.settimeout(timeout)


try:


s.connect((host, port))


return "Service is up."


except socket.error as e:


return f"Service is down: {e}"


print(check_service_status('example.com', 80))


端口扫描器:发现主机上开放的端口。


import socket


def port_scanner(host, start_port, end_port):


open_ports = []


for port in range(start_port, end_port + 1):


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:


s.settimeout(1)


if s.connect_ex((host, port)) == 0:


open_ports.append(port)


return open_ports


print(port_scanner('example.com', 1, 1024))


简单的 HTTP 服务器:用于快速测试或本地文件共享。


from http.server import SimpleHTTPRequestHandler, HTTPServer


def run_server(port=8000):


server_address = ('', port)


httpd = HTTPServer(server_address, SimpleHTTPRequestHandler)


print(f"Server started on port {port}")


httpd.serve_forever()


run_server()


网络聊天室(多线程):允许多个客户端连接并广播消息。


import socket


import threading


clients = []


def client_thread(conn, addr):


print(f"Connected by {addr}")


while True:


try:


message = conn.recv(1024).decode()


if not message or message == '!quit':


break


print(f"{addr}: {message}")


broadcast(message, conn)


except:


break


conn.close()


remove_client(conn)


def broadcast(message, connection):


for client in clients:


if client != connection:


try:


client.send(message.encode())


except:


client.close()


remove_client(client)


def remove_client(connection):


if connection in clients:


clients.remove(connection)


def main():


server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


server_socket.bind(('0.0.0.0', 12345))


server_socket.listen()


print("Chat server started...")


while True:


conn, addr = server_socket.accept()


clients.append(conn)


threading.Thread(target=client_thread, args=(conn, addr)).start()


if name == "main":


main()


二、实现高并发网络应用


当需要处理大量并发连接时,传统的同步阻塞模型(如一个连接一个线程)会成为性能瓶颈。Python 提供了多种高并发处理方案。


异步编程与 asyncio 库


异步编程是一种编程范式,它允许程序在等待 I/O 操作(如网络请求、文件读写)完成时,去执行其他任务,而不是干等着,从而极大提高程序的并发能力和资源利用率。asyncio 是 Python 内置的异步编程框架,核心是事件循环(Event Loop)、协程(Coroutine)和任务(Task)。


异步函数与 async/await:


使用 async def 定义异步函数,其调用返回一个协程对象。await 用于等待一个异步操作完成。


import asyncio


async def fetch_data(url):


print(f"Fetching {url}...")


await asyncio.sleep(2)


return f"Data from {url}"


async def main():


results = await asyncio.gather(


fetch_data("https://api.example.com/1 "),


fetch_data("https://api.example.com/2 ")


)


print(results)


asyncio.run(main())


异步 HTTP 请求(aiohttp):


对于网络请求,可以使用 aiohttp 这样的异步 HTTP 客户端库来代替同步的 requests 库。


import aiohttp


import asyncio


async def fetch(session, url):


async with session.get(url) as response:


return await response.text()


async def main():


urls = ["https://api.example.com/1 ", "https://api.example.com/2 "]


async with aiohttp.ClientSession() as session:


tasks = [fetch(session, url) for url in urls]


results = await asyncio.gather(*tasks)


print(results)


asyncio.run(main())


高性能异步 Web 框架(FastAPI):


FastAPI 是一个现代的高性能 Web 框架,内置了对异步的支持,非常适合构建高并发的 API 服务。


from fastapi import FastAPI


import aiohttp


app = FastAPI()


@app.get("/proxy")


async def proxy(url: str):


async with aiohttp.ClientSession() as session:


async with session.get(url) as resp:


return await resp.json()


多线程与多进程


多线程 (threading 模块):适用于 I/O 密集型任务。当某个线程在等待 I/O 时,其他线程可以继续执行。但由于 Python 的全局解释器锁(GIL),多线程不适合 CPU 密集型任务。


之前的网络聊天室示例就使用了多线程来处理多个客户端连接。


多进程 (multiprocessing 模块):适用于 CPU 密集型任务。每个进程有自己独立的 Python 解释器和内存空间,避免了 GIL 的限制,真正实现了并行计算。但进程创建和切换的开销比线程大。


高并发 Socket 技巧


对于直接的 Socket 编程,也可以采用一些模式来实现高并发:


异步非阻塞模式:使用 selectors 模块(基于 select 或更高效的 epoll/kqueue)来监听多个 Socket 的事件(如可读、可写),在单个线程中处理多个连接。


import socket


import selectors


sel = selectors.DefaultSelector()


sock = socket.socket()


sock.bind(('0.0.0.0', 8888))


sock.listen(100)


sock.setblocking(False)


def accept(sock, mask):


conn, addr = sock.accept()


conn.setblocking(False)


sel.register(conn, selectors.EVENT_READ, read)


def read(conn, mask):


try:


data = conn.recv(1000)


if data:


conn.send(data)


else:


sel.unregister(conn)


conn.close()


except ConnectionResetError:


sel.unregister(conn)


conn.close()


sel.register(sock, selectors.EVENT_READ, accept)


while True:


events = sel.select()


for key, mask in events:


callback = key.data


callback(key.fileobj, mask)


连接池:对于需要频繁创建和销毁网络连接的情况,使用连接池可以避免重复建立连接的开销。


from queue import Queue


class SocketPool:


def init(self, host, port, pool_size=10):


self.pool = Queue()


for _ in range(pool_size):


sock = socket.socket()


sock.connect((host, port))


self.pool.put(sock)




def get(self):

return self.pool.get()


def put(self, sock):

self.pool.put(sock)


心跳机制:在长连接中,定期发送心跳包(如 ping/pong)来检测连接是否存活,及时清理失效连接。


import time


def handle_client(conn):


last_heartbeat = time.time()


while True:


try:


data = conn.recv(1024)


if data == b'ping':


conn.send(b'pong')


last_heartbeat = time.time()


if time.time() - last_heartbeat > 30:


raise ConnectionError("心跳超时")


except Exception:


conn.close()


break


三、最佳实践与注意事项


异常处理与重试:网络操作极不稳定,必须使用 try-except 块妥善处理超时、连接错误、解析错误等异常,并可根据需要实现重试逻辑。


资源管理:使用 with 语句或确保手动调用 close() 方法,及时释放 Socket 连接、文件句柄等资源,避免泄漏。


安全性:


验证和清理所有输入,防止注入攻击。


使用 HTTPS、SSL/TLS 加密敏感数据传输。


妥善保管密钥、令牌等敏感信息,不要硬编码在代码中。


性能考量:


I/O 密集型 vs CPU 密集型:根据任务类型选择合适的并发模型(异步、多线程、多进程)。


使用高效库:例如,在高并发异步应用中使用 aiohttp 代替 requests,使用 asyncpg 访问 PostgreSQL 数据库等。


数据库优化:为常用查询字段建立索引,使用连接池减少连接开销。


缓存:使用 Redis 或 Memcached 等缓存频繁访问的数据,减轻后端压力。


遵守规则:编写网络爬虫时,尊重 robots.txt,设置合理的请求间隔,避免对目标网站造成过大压力。





控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言