| #!/usr/bin/env python3 | |||||
| import datetime | |||||
| from io import StringIO | |||||
| from math import ceil | |||||
| import pandas as pd | |||||
| import multiprocessing | |||||
| import os | |||||
| import re | |||||
| import subprocess | |||||
| from argparse import ArgumentParser | |||||
| from time import sleep, time | |||||
| # tshark -r ./tcp-cap-test/test__bandwidth_reverse_tcp_bbr_1.pcap -Y "tcp.stream eq 1" -T fields -e frame.time_relative -e ip.len -e ip.hdr_len -e tcp.hdr_len -e tcp.analysis.ack_rtt -e tcp.analysis.bytes_in_flight -e tcp.analysis.retransmission -e tcp.analysis.duplicate_ack -e ip.dst -e ip.src -e tcp.options.mss_val -E header=y -E separator=, -E quote=d | |||||
| from util import chunk_list | |||||
| def format_tcp_trace_to_csv(pcap_number, packets_to_keep, is_reverse=False): | |||||
| txt_name = "{}{}{}.txt".format(args.folder, args.tcp_trace, pcap_number) | |||||
| try: | |||||
| txt_file = open(txt_name, "r") | |||||
| except IOError as e: | |||||
| print("\rCan not open file {}\n {} {}".format(txt_name, e.errno, e.strerror)) | |||||
| return | |||||
| content = txt_file.read() | |||||
| txt_file.close() | |||||
| csv_string = "" | |||||
| csv_string += "time_tcp_probe,snd_cwnd,snd_wnd,srtt\n" | |||||
| uptime = None | |||||
| counter = 0 | |||||
| lines = content.split("\n") | |||||
| start_time = None | |||||
| for line in lines: | |||||
| counter += 1 | |||||
| if uptime is None: | |||||
| uptime = float(line.split(" ")[0]) | |||||
| else: | |||||
| if is_reverse: | |||||
| line_filter = "src=[::ffff:{}]:{}".format(args.server, args.port) | |||||
| else: | |||||
| line_filter = "dest={}:{}".format(args.server, args.port) | |||||
| # ignore tcp packets from iperf syn (packets to keep = len of tcp.stream.eq 1) | |||||
| if line_filter in line and counter >= (len(lines) - packets_to_keep): | |||||
| match = re.match( | |||||
| r".* (\d+\.\d+): tcp_probe:.*snd_cwnd=(\d+).*snd_wnd=(\d+).*srtt=(\d+)", | |||||
| line, | |||||
| ) | |||||
| if match: | |||||
| if start_time is None: | |||||
| start_time = float(match.group(1)) - uptime | |||||
| time = float(match.group(1)) - (uptime + start_time) | |||||
| snd_cwnd = match.group(2) | |||||
| snd_wnd = match.group(3) | |||||
| srtt = match.group(4) | |||||
| csv_string += "{},{},{},{}\n".format(time, snd_cwnd, snd_wnd, srtt) | |||||
| csv_string_io = StringIO(csv_string) | |||||
| trace_df = pd.read_csv(csv_string_io) | |||||
| if len(trace_df) <= 1: | |||||
| print("\rFaulty tcp trace file for pcap no: {}".format(pcap_number)) | |||||
| return None | |||||
| return trace_df | |||||
| def format_pcaps_to_csv(pcaps, dummy): | |||||
| global n | |||||
| for pcap in pcaps: | |||||
| if pcap.endswith(".pcap") and pcap.startswith(args.prefix): | |||||
| match = re.match(regex, pcap) | |||||
| if match: | |||||
| # metadata from pcap filename | |||||
| direction = "upload" | |||||
| if "_reverse_" in pcap: | |||||
| direction = "download" | |||||
| congestion_control = match.group(2) | |||||
| pcap_number = match.group(3) | |||||
| # analyse traffic from pcap (receiver side) | |||||
| tshark_command = [ | |||||
| "tshark", | |||||
| "-r", | |||||
| "{}{}".format(args.folder, pcap), | |||||
| # remove this for mobile measurements | |||||
| # "-Y", | |||||
| # "tcp.stream eq 1", | |||||
| "-T", | |||||
| "fields", | |||||
| "-e", | |||||
| "frame.time_relative", | |||||
| "-e", | |||||
| "ip.len", | |||||
| "-e", | |||||
| "ip.hdr_len", | |||||
| "-e", | |||||
| "tcp.hdr_len", | |||||
| "-e", | |||||
| "tcp.analysis.ack_rtt", | |||||
| "-e", | |||||
| "tcp.analysis.bytes_in_flight", | |||||
| "-e", | |||||
| "tcp.analysis.retransmission", | |||||
| "-e", | |||||
| "tcp.analysis.duplicate_ack", | |||||
| "-e", | |||||
| "ip.src", | |||||
| "-e", | |||||
| "ip.dst", | |||||
| "-e", | |||||
| "tcp.options.mss_val", | |||||
| "-e", | |||||
| "tcp.window_size", | |||||
| "-e", | |||||
| "frame.time_epoch", | |||||
| "-e", | |||||
| "tcp.stream", # have to be the last value in line! | |||||
| "-E", | |||||
| "header=y", | |||||
| "-E", | |||||
| "separator=,", | |||||
| "-E", | |||||
| "quote=d", | |||||
| ] | |||||
| tshark_out = None | |||||
| try: | |||||
| tshark_out = subprocess.check_output(tshark_command).decode("utf-8") | |||||
| except subprocess.CalledProcessError as tsharkexec: | |||||
| if tsharkexec.returncode == 2: | |||||
| print("\rtshark could not open pcap: {}".format(pcap)) | |||||
| else: | |||||
| print("\rtshark exited with code: {}".format(tsharkexec.returncode)) | |||||
| print(tsharkexec.output) | |||||
| continue | |||||
| # Convert String into StringIO | |||||
| csv_string_io = StringIO(tshark_out) | |||||
| conv_bool = lambda x: (True if x != "" else False) | |||||
| pcap_df = pd.read_csv( | |||||
| csv_string_io, | |||||
| converters={ | |||||
| "tcp.analysis.retransmission": conv_bool, | |||||
| "tcp.analysis.duplicate_ack": conv_bool, | |||||
| }, | |||||
| ) | |||||
| last_tcp_stream_in_pcap = pcap_df["tcp.stream"].max() | |||||
| pcap_df = pcap_df.loc[pcap_df["tcp.stream"] == last_tcp_stream_in_pcap] | |||||
| pcap_df["payload_size"] = pcap_df["ip.len"] - ( | |||||
| pcap_df["ip.hdr_len"] + pcap_df["tcp.hdr_len"] | |||||
| ) | |||||
| pcap_df["direction"] = direction | |||||
| pcap_df["congestion_control"] = congestion_control | |||||
| pcap_df["pcap_number"] = pcap_number | |||||
| pcap_df["datetime"] = pd.to_datetime( | |||||
| pcap_df["frame.time_epoch"].apply( | |||||
| lambda x: datetime.datetime.fromtimestamp(x) | |||||
| ) | |||||
| ) | |||||
| pcap_df = pcap_df.drop( | |||||
| columns=["tcp.stream", "ip.len", "ip.hdr_len", "tcp.hdr_len"] | |||||
| ) | |||||
| pcap_df.rename( | |||||
| columns={ | |||||
| "frame.time_relative": "arrival_time", | |||||
| "ip.src": "src_ip", | |||||
| "ip.dst": "dst_ip", | |||||
| "tcp.options.mss_val": "mss", | |||||
| "tcp.analysis.ack_rtt": "ack_rtt", | |||||
| "tcp.analysis.bytes_in_flight": "bytes_in_flight", | |||||
| "tcp.window_size": "receive_window_size", | |||||
| "tcp.analysis.retransmission": "is_retranmission", | |||||
| "tcp.analysis.duplicate_ack": "is_dup_ack", | |||||
| "frame.time_epoch": "time_epoch", | |||||
| }, | |||||
| inplace=True, | |||||
| ) | |||||
| pcap_df = pcap_df.sort_values("arrival_time") | |||||
| try: | |||||
| # join tcp_trace data with pcap data | |||||
| merge_srtt = False | |||||
| if merge_srtt: | |||||
| tcp_trace_df = format_tcp_trace_to_csv( | |||||
| pcap_number, | |||||
| len(pcap_df), | |||||
| is_reverse=True if "_reverse_" in pcap else False, | |||||
| ) | |||||
| if tcp_trace_df is None: | |||||
| print( | |||||
| "\rNo tcp trace file for pcap no {} found".format( | |||||
| pcap_number | |||||
| ) | |||||
| ) | |||||
| break | |||||
| merged_df = pd.merge_asof( | |||||
| pcap_df.loc[pcap_df["src_ip"] != args.server], | |||||
| tcp_trace_df, | |||||
| left_on="arrival_time", | |||||
| right_on="time_tcp_probe", | |||||
| tolerance=0.01, | |||||
| ) | |||||
| merged_df = pd.concat( | |||||
| [merged_df, pcap_df.loc[pcap_df["src_ip"] == args.server]] | |||||
| ) | |||||
| merged_df = merged_df.sort_values("arrival_time") | |||||
| merged_df.to_csv( | |||||
| "{}{}".format(args.folder, pcap).replace(".pcap", ".csv") | |||||
| ) | |||||
| else: | |||||
| pcap_df.to_csv( | |||||
| "{}{}".format(args.folder, pcap).replace(".pcap", ".csv") | |||||
| ) | |||||
| except: | |||||
| print("\rCould not merge data for pcap no: {}".format(pcap)) | |||||
| pcap_df.to_csv( | |||||
| "{}{}".format(args.folder, pcap).replace(".pcap", ".csv") | |||||
| ) | |||||
| n.value += 1 | |||||
| else: | |||||
| print("File does not match regex: {}".format(pcap)) | |||||
| else: | |||||
| print("File is not from type PCAP: {}".format(pcap)) | |||||
| from itertools import islice | |||||
| def chunk(it, size): | |||||
| it = iter(it) | |||||
| return iter(lambda: tuple(islice(it, size)), ()) | |||||
| if __name__ == "__main__": | |||||
| parser = ArgumentParser() | |||||
| parser.add_argument("-f", "--folder", required=True, help="Folder with pcaps.") | |||||
| parser.add_argument( | |||||
| "-p", | |||||
| "--prefix", | |||||
| required=True, | |||||
| help="Filename prefix e.g. 2021-03-17_bandwidth_tcp_bbr_", | |||||
| ) | |||||
| parser.add_argument( | |||||
| "-t", | |||||
| "--tcp_trace", | |||||
| required=True, | |||||
| help="Format of tcp trace txt files e.g: 2021_03_30_bandwidth_reverse_tcp_tcp_trace_ for " | |||||
| "2021_03_30_bandwidth_reverse_tcp_tcp_trace_1.txt", | |||||
| ) | |||||
| parser.add_argument( | |||||
| "-c", | |||||
| "--cores", | |||||
| default=1, | |||||
| type=int, | |||||
| help="Number of cores for multiprocessing.", | |||||
| ) | |||||
| parser.add_argument( | |||||
| "--port", | |||||
| default=5201, | |||||
| type=int, | |||||
| help="iPerf3 port used for measurements", | |||||
| ) | |||||
| parser.add_argument( | |||||
| "--server", | |||||
| default="130.75.73.69", | |||||
| type=str, | |||||
| help="iPerf3 server ip used for measurements", | |||||
| ) | |||||
| args = parser.parse_args() | |||||
| manager = multiprocessing.Manager() | |||||
| # regex for protokoll, algo and bitrate | |||||
| regex = r".*_bandwidth_[reverse_]*(.+)_(.+)_(\d+)\.pcap" | |||||
| csv_header = "n,start_time,end_time,payload_size,protocol,algorithm,direction,packages_received,syns_in_pcap\n" | |||||
| n = manager.Value("i", 0) | |||||
| filenames = os.listdir(args.folder) | |||||
| number_of_files = len(filenames) | |||||
| pcap_list = [] | |||||
| jobs = [] | |||||
| st = time() | |||||
| for filename in filenames: | |||||
| if filename.endswith(".pcap") and filename.startswith(args.prefix): | |||||
| if re.match(regex, filename): | |||||
| pcap_list.append(filename) | |||||
| pcap_list.sort() | |||||
| print("Found {} pcap files in {} files.".format(len(pcap_list), len(filenames))) | |||||
| if len(pcap_list) == 0: | |||||
| print("Abort no pcaps found with prefix: {}".format(args.prefix)) | |||||
| print("{}{}".format(args.folder, args.prefix)) | |||||
| exit(1) | |||||
| parts = chunk(pcap_list, ceil(len(pcap_list) / args.cores)) | |||||
| print("Start processing with {} jobs.".format(args.cores)) | |||||
| for p in parts: | |||||
| process = multiprocessing.Process(target=format_pcaps_to_csv, args=(p, "dummy")) | |||||
| jobs.append(process) | |||||
| for j in jobs: | |||||
| j.start() | |||||
| print("Started all jobs.") | |||||
| # Ensure all of the processes have finished | |||||
| finished_job_counter = 0 | |||||
| working = ["|", "/", "-", "\\", "|", "/", "-", "\\"] | |||||
| w = 0 | |||||
| while len(jobs) != finished_job_counter: | |||||
| sleep(1) | |||||
| print( | |||||
| "\r\t{}{}{}\t Running {} jobs ({} finished). Processed {} out of {} pcaps. ({}%) ".format( | |||||
| working[w], | |||||
| working[w], | |||||
| working[w], | |||||
| len(jobs), | |||||
| finished_job_counter, | |||||
| n.value, | |||||
| len(pcap_list), | |||||
| round((n.value / len(pcap_list)) * 100, 2), | |||||
| ), | |||||
| end="", | |||||
| ) | |||||
| finished_job_counter = 0 | |||||
| for j in jobs: | |||||
| if not j.is_alive(): | |||||
| finished_job_counter += 1 | |||||
| if (w + 1) % len(working) == 0: | |||||
| w = 0 | |||||
| else: | |||||
| w += 1 | |||||
| print("") | |||||
| et = time() | |||||
| # get the execution time | |||||
| elapsed_time = et - st | |||||
| print("Execution time:", elapsed_time, "seconds") |