diff --git a/format_throughput_pcap_to_csv.py b/format_throughput_pcap_to_csv.py new file mode 100755 index 0000000..e900987 --- /dev/null +++ b/format_throughput_pcap_to_csv.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python3 +import datetime +from io import StringIO +from math import ceil + +import pandas as pd + +import multiprocessing +import os +import re +import subprocess + +from argparse import ArgumentParser +from time import sleep, time + +# tshark -r ./tcp-cap-test/test__bandwidth_reverse_tcp_bbr_1.pcap -Y "tcp.stream eq 1" -T fields -e frame.time_relative -e ip.len -e ip.hdr_len -e tcp.hdr_len -e tcp.analysis.ack_rtt -e tcp.analysis.bytes_in_flight -e tcp.analysis.retransmission -e tcp.analysis.duplicate_ack -e ip.dst -e ip.src -e tcp.options.mss_val -E header=y -E separator=, -E quote=d +from util import chunk_list + + +def format_tcp_trace_to_csv(pcap_number, packets_to_keep, is_reverse=False): + txt_name = "{}{}{}.txt".format(args.folder, args.tcp_trace, pcap_number) + try: + txt_file = open(txt_name, "r") + except IOError as e: + print("\rCan not open file {}\n {} {}".format(txt_name, e.errno, e.strerror)) + return + content = txt_file.read() + txt_file.close() + + csv_string = "" + csv_string += "time_tcp_probe,snd_cwnd,snd_wnd,srtt\n" + uptime = None + counter = 0 + lines = content.split("\n") + start_time = None + for line in lines: + counter += 1 + if uptime is None: + uptime = float(line.split(" ")[0]) + else: + if is_reverse: + line_filter = "src=[::ffff:{}]:{}".format(args.server, args.port) + else: + line_filter = "dest={}:{}".format(args.server, args.port) + + # ignore tcp packets from iperf syn (packets to keep = len of tcp.stream.eq 1) + if line_filter in line and counter >= (len(lines) - packets_to_keep): + match = re.match( + r".* (\d+\.\d+): tcp_probe:.*snd_cwnd=(\d+).*snd_wnd=(\d+).*srtt=(\d+)", + line, + ) + if match: + if start_time is None: + start_time = float(match.group(1)) - uptime + time = float(match.group(1)) - (uptime + start_time) + snd_cwnd = match.group(2) + snd_wnd = match.group(3) + srtt = match.group(4) + csv_string += "{},{},{},{}\n".format(time, snd_cwnd, snd_wnd, srtt) + + csv_string_io = StringIO(csv_string) + + trace_df = pd.read_csv(csv_string_io) + if len(trace_df) <= 1: + print("\rFaulty tcp trace file for pcap no: {}".format(pcap_number)) + return None + return trace_df + + +def format_pcaps_to_csv(pcaps, dummy): + global n + for pcap in pcaps: + if pcap.endswith(".pcap") and pcap.startswith(args.prefix): + match = re.match(regex, pcap) + if match: + # metadata from pcap filename + direction = "upload" + if "_reverse_" in pcap: + direction = "download" + congestion_control = match.group(2) + pcap_number = match.group(3) + + # analyse traffic from pcap (receiver side) + tshark_command = [ + "tshark", + "-r", + "{}{}".format(args.folder, pcap), + # remove this for mobile measurements + # "-Y", + # "tcp.stream eq 1", + "-T", + "fields", + "-e", + "frame.time_relative", + "-e", + "ip.len", + "-e", + "ip.hdr_len", + "-e", + "tcp.hdr_len", + "-e", + "tcp.analysis.ack_rtt", + "-e", + "tcp.analysis.bytes_in_flight", + "-e", + "tcp.analysis.retransmission", + "-e", + "tcp.analysis.duplicate_ack", + "-e", + "ip.src", + "-e", + "ip.dst", + "-e", + "tcp.options.mss_val", + "-e", + "tcp.window_size", + "-e", + "frame.time_epoch", + "-e", + "tcp.stream", # have to be the last value in line! + "-E", + "header=y", + "-E", + "separator=,", + "-E", + "quote=d", + ] + + tshark_out = None + try: + tshark_out = subprocess.check_output(tshark_command).decode("utf-8") + except subprocess.CalledProcessError as tsharkexec: + if tsharkexec.returncode == 2: + print("\rtshark could not open pcap: {}".format(pcap)) + else: + print("\rtshark exited with code: {}".format(tsharkexec.returncode)) + print(tsharkexec.output) + continue + + # Convert String into StringIO + csv_string_io = StringIO(tshark_out) + + conv_bool = lambda x: (True if x != "" else False) + + pcap_df = pd.read_csv( + csv_string_io, + converters={ + "tcp.analysis.retransmission": conv_bool, + "tcp.analysis.duplicate_ack": conv_bool, + }, + ) + + last_tcp_stream_in_pcap = pcap_df["tcp.stream"].max() + pcap_df = pcap_df.loc[pcap_df["tcp.stream"] == last_tcp_stream_in_pcap] + + pcap_df["payload_size"] = pcap_df["ip.len"] - ( + pcap_df["ip.hdr_len"] + pcap_df["tcp.hdr_len"] + ) + pcap_df["direction"] = direction + pcap_df["congestion_control"] = congestion_control + pcap_df["pcap_number"] = pcap_number + pcap_df["datetime"] = pd.to_datetime( + pcap_df["frame.time_epoch"].apply( + lambda x: datetime.datetime.fromtimestamp(x) + ) + ) + + pcap_df = pcap_df.drop( + columns=["tcp.stream", "ip.len", "ip.hdr_len", "tcp.hdr_len"] + ) + + pcap_df.rename( + columns={ + "frame.time_relative": "arrival_time", + "ip.src": "src_ip", + "ip.dst": "dst_ip", + "tcp.options.mss_val": "mss", + "tcp.analysis.ack_rtt": "ack_rtt", + "tcp.analysis.bytes_in_flight": "bytes_in_flight", + "tcp.window_size": "receive_window_size", + "tcp.analysis.retransmission": "is_retranmission", + "tcp.analysis.duplicate_ack": "is_dup_ack", + "frame.time_epoch": "time_epoch", + }, + inplace=True, + ) + + pcap_df = pcap_df.sort_values("arrival_time") + try: + # join tcp_trace data with pcap data + merge_srtt = False + if merge_srtt: + tcp_trace_df = format_tcp_trace_to_csv( + pcap_number, + len(pcap_df), + is_reverse=True if "_reverse_" in pcap else False, + ) + + if tcp_trace_df is None: + print( + "\rNo tcp trace file for pcap no {} found".format( + pcap_number + ) + ) + break + merged_df = pd.merge_asof( + pcap_df.loc[pcap_df["src_ip"] != args.server], + tcp_trace_df, + left_on="arrival_time", + right_on="time_tcp_probe", + tolerance=0.01, + ) + merged_df = pd.concat( + [merged_df, pcap_df.loc[pcap_df["src_ip"] == args.server]] + ) + merged_df = merged_df.sort_values("arrival_time") + merged_df.to_csv( + "{}{}".format(args.folder, pcap).replace(".pcap", ".csv") + ) + else: + pcap_df.to_csv( + "{}{}".format(args.folder, pcap).replace(".pcap", ".csv") + ) + except: + print("\rCould not merge data for pcap no: {}".format(pcap)) + pcap_df.to_csv( + "{}{}".format(args.folder, pcap).replace(".pcap", ".csv") + ) + + n.value += 1 + + else: + print("File does not match regex: {}".format(pcap)) + else: + print("File is not from type PCAP: {}".format(pcap)) + + +from itertools import islice + + +def chunk(it, size): + it = iter(it) + return iter(lambda: tuple(islice(it, size)), ()) + + +if __name__ == "__main__": + + parser = ArgumentParser() + parser.add_argument("-f", "--folder", required=True, help="Folder with pcaps.") + parser.add_argument( + "-p", + "--prefix", + required=True, + help="Filename prefix e.g. 2021-03-17_bandwidth_tcp_bbr_", + ) + parser.add_argument( + "-t", + "--tcp_trace", + required=True, + help="Format of tcp trace txt files e.g: 2021_03_30_bandwidth_reverse_tcp_tcp_trace_ for " + "2021_03_30_bandwidth_reverse_tcp_tcp_trace_1.txt", + ) + parser.add_argument( + "-c", + "--cores", + default=1, + type=int, + help="Number of cores for multiprocessing.", + ) + parser.add_argument( + "--port", + default=5201, + type=int, + help="iPerf3 port used for measurements", + ) + parser.add_argument( + "--server", + default="130.75.73.69", + type=str, + help="iPerf3 server ip used for measurements", + ) + + args = parser.parse_args() + + manager = multiprocessing.Manager() + + # regex for protokoll, algo and bitrate + regex = r".*_bandwidth_[reverse_]*(.+)_(.+)_(\d+)\.pcap" + + csv_header = "n,start_time,end_time,payload_size,protocol,algorithm,direction,packages_received,syns_in_pcap\n" + n = manager.Value("i", 0) + filenames = os.listdir(args.folder) + number_of_files = len(filenames) + + pcap_list = [] + jobs = [] + + st = time() + + for filename in filenames: + if filename.endswith(".pcap") and filename.startswith(args.prefix): + if re.match(regex, filename): + pcap_list.append(filename) + pcap_list.sort() + + print("Found {} pcap files in {} files.".format(len(pcap_list), len(filenames))) + if len(pcap_list) == 0: + print("Abort no pcaps found with prefix: {}".format(args.prefix)) + print("{}{}".format(args.folder, args.prefix)) + exit(1) + + parts = chunk(pcap_list, ceil(len(pcap_list) / args.cores)) + print("Start processing with {} jobs.".format(args.cores)) + + + for p in parts: + process = multiprocessing.Process(target=format_pcaps_to_csv, args=(p, "dummy")) + jobs.append(process) + + for j in jobs: + j.start() + + print("Started all jobs.") + # Ensure all of the processes have finished + finished_job_counter = 0 + working = ["|", "/", "-", "\\", "|", "/", "-", "\\"] + w = 0 + while len(jobs) != finished_job_counter: + sleep(1) + print( + "\r\t{}{}{}\t Running {} jobs ({} finished). Processed {} out of {} pcaps. ({}%) ".format( + working[w], + working[w], + working[w], + len(jobs), + finished_job_counter, + n.value, + len(pcap_list), + round((n.value / len(pcap_list)) * 100, 2), + ), + end="", + ) + finished_job_counter = 0 + for j in jobs: + if not j.is_alive(): + finished_job_counter += 1 + if (w + 1) % len(working) == 0: + w = 0 + else: + w += 1 + print("") + + et = time() + # get the execution time + elapsed_time = et - st + print("Execution time:", elapsed_time, "seconds")