I recently received a network security book, 《Python Black Hat》, as a gift from the Electronic Industry Press. It contains 24 experiments in total, and today I am reproducing the 10th one (restoring pictures from HTTP traffic). My test environment is an MBP (MacBook Pro) with a conda development environment. The idea is that after you visit an HTTP website, all of the pictures on that site can be recovered from the captured traffic. In practice the results were only average: not every picture was recovered, and many of the recovered pictures were duplicates. Note: maybe the website I chose was not a good one ~
1. Visit an HTTP website that has pictures. Note that it must be plain HTTP on port 80, not HTTPS. Then open Wireshark to capture the packets, and save the capture as pcap.pcap
2. Run the script on the MBP; please ignore the warnings
3. View the pictures restored from the traffic
Reference code :
# -*- coding: utf-8 -*-
# @Time : 2022/6/13 6:56 PM
# @Author : ailx10
# @File : recapper.py
from scapy.all import rdpcap
from scapy.layers.inet import TCP
import collections
import os
import re
import sys
import zlib
# Directory that recovered files are written into.
OUTDIR = "/Users/ailx10/py3hack/chapter4/picture"
# Directory that holds the captured pcap file(s).
PCAPS = "/Users/ailx10/py3hack/chapter4/download"

# One reassembled HTTP message: the parsed header dict plus the raw bytes
# (header section and body) it was parsed from.
Response = collections.namedtuple("Response", "header payload")
def get_header(payload):
    """Parse the first HTTP header section found in *payload*.

    Args:
        payload: Raw bytes of a reassembled stream.

    Returns:
        A dict mapping header names to header values, with surrounding
        whitespace removed from both, or ``None`` when *payload* contains no
        blank line terminating a header section or when the parsed headers
        have no ``Content-Type`` entry.
    """
    try:
        # Keep the CRLF that ends the last header line: the regex below only
        # matches lines that are terminated by CRLF.
        header_raw = payload[:payload.index(b"\r\n\r\n") + 2]
    except ValueError:
        # Progress marker for a stream without a complete header section.
        sys.stdout.write("-")
        sys.stdout.flush()
        return None

    # errors="replace" keeps a stray non-UTF-8 byte in some header from
    # aborting the whole run with UnicodeDecodeError.
    text = header_raw.decode("utf-8", errors="replace")

    # Strip the optional whitespace around each value ("Name: value"), so
    # callers can compare values such as "gzip" exactly.
    header = {
        name.strip(): value.strip()
        for name, value in re.findall(
            r"(?P<name>.*?):(?P<value>.*?)\r\n", text
        )
    }
    if "Content-Type" not in header:
        return None
    return header
def extract_content(Response, content_name="image"):
    """Extract the body of an HTTP message whose Content-Type matches.

    Args:
        Response: Object with a ``header`` dict (must contain
            ``"Content-Type"``) and a ``payload`` bytes attribute holding the
            header section followed by the body.
        content_name: Substring looked for in the Content-Type value.

    Returns:
        ``(content, content_type)`` where ``content`` is the body bytes
        (decompressed when Content-Encoding is gzip or deflate) and
        ``content_type`` is the subtype after the slash, e.g. ``"png"``.
        ``(None, None)`` is returned when the Content-Type does not match or
        has no subtype, when the payload has no header/body separator, or
        when the body cannot be decompressed.
    """
    type_value = Response.header["Content-Type"]
    if content_name not in type_value:
        return None, None

    # Drop parameters such as "; charset=..." before taking the subtype, so
    # they do not leak into the value callers use as a file extension.
    media_parts = type_value.split(";")[0].strip().split("/")
    if len(media_parts) < 2 or not media_parts[1]:
        return None, None
    content_type = media_parts[1]

    separator = Response.payload.find(b"\r\n\r\n")
    if separator == -1:
        return None, None
    content = Response.payload[separator + 4:]

    # Tolerate surrounding whitespace and case differences in the value.
    encoding = Response.header.get("Content-Encoding", "").strip().lower()
    try:
        # Decompress the body only; the payload still starts with the
        # plain-text header section.
        if encoding == "gzip":
            # MAX_WBITS | 32 enables automatic gzip/zlib header detection.
            content = zlib.decompress(content, zlib.MAX_WBITS | 32)
        elif encoding == "deflate":
            content = zlib.decompress(content)
    except zlib.error:
        # Truncated or corrupt body: report "nothing to extract" instead of
        # aborting the caller's loop over all responses.
        return None, None
    return content, content_type
class Recapper:
    """Rebuild HTTP messages from a pcap file and save matching bodies."""

    def __init__(self, fname):
        """Read the capture file *fname* and split it into sessions.

        Args:
            fname: Path of the pcap file to read.
        """
        pcap = rdpcap(fname)
        self.sessions = pcap.sessions()
        # Filled by get_responses() with Response(header, payload) tuples.
        self.response = []

    def get_responses(self):
        """Collect one Response per session that carries a parsable header.

        For every session, the TCP payloads of all packets whose source or
        destination port is 80 are concatenated in iteration order. Sessions
        whose payload is empty, or for which ``get_header`` returns ``None``,
        are skipped.
        """
        # NOTE(review): segments are joined in the order the session yields
        # them; sequence numbers are not consulted, so retransmitted or
        # reordered segments are not corrected.
        for session in self.sessions:
            chunks = []
            for packet in self.sessions[session]:
                try:
                    if packet[TCP].dport == 80 or packet[TCP].sport == 80:
                        chunks.append(bytes(packet[TCP].payload))
                except IndexError:
                    # Presumably raised when the packet has no TCP layer;
                    # print a progress marker and move on.
                    sys.stdout.write("x")
                    sys.stdout.flush()

            # Join once instead of growing a bytes object packet by packet.
            payload = b"".join(chunks)
            if not payload:
                continue
            header = get_header(payload)
            if header is None:
                continue
            self.response.append(Response(header=header, payload=payload))

    def write(self, content_name):
        """Write every extracted body whose Content-Type matches to OUTDIR.

        Args:
            content_name: Substring passed to ``extract_content`` to select
                responses by Content-Type (e.g. ``"image"``).

        Files are named ``ex_<index>.<content_type>``, where the index is the
        position of the response in ``self.response``.
        """
        # Make sure the output directory exists so open() cannot fail with
        # FileNotFoundError on a fresh machine.
        os.makedirs(OUTDIR, exist_ok=True)
        for i, response in enumerate(self.response):
            content, content_type = extract_content(response, content_name)
            if content and content_type:
                fname = os.path.join(OUTDIR, f"ex_{i}.{content_type}")
                print(f"Writing {fname}")
                with open(fname, "wb") as f:
                    f.write(content)
if __name__ == "__main__":
    # Carve every image out of the capture file stored under PCAPS.
    recapper = Recapper(os.path.join(PCAPS, "pcap.pcap"))
    recapper.get_responses()
    recapper.write("image")