用法:python extract_tls_flow.py -vr white_pcap/11/2018-01-10_13-05-09_2.pcap -o pcap_ssl_flow.txt >log.txt
python dpkt解析ssl流,记录含有client hello到app data的完整ssl 流,同时记录ssl证书:
- #!/usr/bin/env python
-
- from __future__ import absolute_import
- from __future__ import print_function
- import argparse
- from binascii import hexlify
- import socket
- import struct
- import json
- import sys
- import textwrap
- import dpkt
- from constants import PRETTY_NAMES
- from asn1crypto import x509
-
-
- global streambuffer
- streambuffer = {}
- global encrypted_streams
- encrypted_streams = [] # change_cipher
- global ssl_servers_certs
- ssl_servers_certs = {}
- global ssl_servers_with_client_hello
- ssl_servers_with_client_hello = set()
- global client_hello_set
- client_hello_set = set()
- global ssl_flows
- ssl_flows = []
- global buffer
- buffer = {}
- need_more_parse = False
-
-
- class FlowDirection(object):
- OUT = 1
- IN = 2
- UNKNOWN = 3
-
-
- class Extension(object):
- """
- Encapsulates TLS extensions.
- """
- def __init__(self, payload):
- self._type_id, payload = unpacker('H', payload)
- self._type_name = pretty_name('extension_type', self._type_id)
- self._length, payload = unpacker('H', payload)
- # Data contains an array with the 'raw' contents
- self._data = None
- # pretty_data contains an array with the 'beautified' contents
- self._pretty_data = None
- if self._length > 0:
- self._data, self._pretty_data = parse_extension(payload[:self._length],
- self._type_name)
-
- def __str__(self):
- # Prints out data array in textual format
- return '{0}: {1}'.format(self._type_name, self._pretty_data)
-
-
- def analyze_packet(_timestamp, packet, nth):
- """
- Main analysis loop for pcap.
- """
- eth = dpkt.ethernet.Ethernet(packet)
- if isinstance(eth.data, dpkt.ip.IP):
- #print("timestamp:", _timestamp, "debug")
- parse_ip_packet(eth.data, nth, _timestamp)
-
-
- def parse_arguments():
- """
- Parses command line arguments.
- """
- global filename
- global verboseprint
- global output_file
- parser = argparse.ArgumentParser(
- formatter_class=argparse.RawDescriptionHelpFormatter,
- description=textwrap.dedent('''\
- Captures, parses and shows TLS Handshake packets
- Copyright (C) 2015 Peter Mosmans [Go Forward]
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.'''))
- parser.add_argument('-r', '--read', metavar='FILE', action='store',
- help='read from file (don\'t capture live packets)')
- parser.add_argument('-v', '--verbose', action='store_true',
- help='increase output verbosity')
- parser.add_argument('-o', '--output', action='store',
- help='output file')
- args = parser.parse_args()
-
- if args.verbose:
- def verboseprint(*args):
- print('# ', end="")
- for arg in args:
- print(arg, end="")
- print()
- else:
- verboseprint = lambda *a: None
- filename = None
- if args.read:
- filename = args.read
- output_file = "demo_output.txt"
- if args.output:
- output_file = args.output
-
-
-
- def parse_ip_packet(ip, nth, timestamp):
- """
- Parses IP packet.
- """
- sys.stdout.flush()
- if isinstance(ip.data, dpkt.tcp.TCP) and len(ip.data.data):
- # print("****TCP packet found****", "tcp payload:", list(ip.data.data))
- parse_tcp_packet(ip, nth, timestamp)
-
-
- def parse_tcp_packet(ip, nth, timestamp):
- """
- Parses TCP packet.
- """
- stream = ip.data.data
- """ refer: The Transport Layer Security (TLS) Protocol URL:https://tools.ietf.org/html/rfc5246
- enum {
- change_cipher_spec(20), alert(21), handshake(22),
- application_data(23), (255)
- } ContentType;
- """
- # ssl flow
- if (stream[0]) in {20, 21, 22, 23}:
- if (stream[0]) in {20, 21, 22}:
- parse_tls_records(ip, stream, nth)
- else:
- connection = '{0}:{1}-{2}:{3}'.format(socket.inet_ntoa(ip.src),
- ip.data.sport,
- socket.inet_ntoa(ip.dst),
- ip.data.dport)
- print("*"*99)
- print("23 SSL application data:{} 10 sample:{} nth:{}".format(connection, list(stream[:10]), nth))
- # buffer record recent ssl flow from handshake to app data TODO precise description
- record_recent_data_flow(ip, stream, nth, timestamp)
-
-
- def has_application_data(flow_list):
- for flow in flow_list:
- if flow[0] == 23:
- return True
- return Fals