#!/usr/bin/env python3
import datetime
import multiprocessing
import os
import re
import subprocess
from argparse import ArgumentParser
from io import StringIO
from itertools import islice
from math import ceil
from time import sleep, time

import pandas as pd

# Example of the tshark invocation this script automates:
# tshark -r ./tcp-cap-test/test__bandwidth_reverse_tcp_bbr_1.pcap -Y "tcp.stream eq 1" -T fields -e frame.time_relative -e ip.len -e ip.hdr_len -e tcp.hdr_len -e tcp.analysis.ack_rtt -e tcp.analysis.bytes_in_flight -e tcp.analysis.retransmission -e tcp.analysis.duplicate_ack -e ip.dst -e ip.src -e tcp.options.mss_val -E header=y -E separator=, -E quote=d

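# Parses a tcp_probe ftrace dump (one .txt per pcap) into a DataFrame with
# columns time_tcp_probe, snd_cwnd, snd_wnd, srtt. tcp_probe lines are assumed
# to look roughly like the following (exact layout varies by kernel version):
#   <idle>-0 [000] ..s. 57005.123456: tcp_probe: ... src=[::ffff:10.0.0.1]:5201
#   dest=10.0.0.2:5201 ... snd_cwnd=10 ... snd_wnd=65535 ... srtt=25000 ...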
def format_tcp_trace_to_csv(pcap_number, packets_to_keep, is_reverse=False):
    txt_name = "{}{}{}.txt".format(args.folder, args.tcp_trace, pcap_number)
    try:
        with open(txt_name, "r") as txt_file:
            content = txt_file.read()
    except IOError as e:
        print("\rCannot open file {}\n {} {}".format(txt_name, e.errno, e.strerror))
        return None

    csv_rows = ["time_tcp_probe,snd_cwnd,snd_wnd,srtt\n"]
    uptime = None
    counter = 0
    lines = content.split("\n")
    start_time = None
    for line in lines:
        counter += 1
        if uptime is None:
            # the first line carries the uptime used as the time base below
            uptime = float(line.split(" ")[0])
        else:
            if is_reverse:
                line_filter = "src=[::ffff:{}]:{}".format(args.server, args.port)
            else:
                line_filter = "dest={}:{}".format(args.server, args.port)

            # ignore tcp_probe entries from the iperf handshake: keep only the
            # last packets_to_keep lines (the length of "tcp.stream eq 1")
            if line_filter in line and counter >= (len(lines) - packets_to_keep):
                match = re.match(
                    r".* (\d+\.\d+): tcp_probe:.*snd_cwnd=(\d+).*snd_wnd=(\d+).*srtt=(\d+)",
                    line,
                )
                if match:
                    if start_time is None:
                        start_time = float(match.group(1)) - uptime
                    # shift timestamps so the first matching sample starts at 0
                    probe_time = float(match.group(1)) - (uptime + start_time)
                    snd_cwnd = match.group(2)
                    snd_wnd = match.group(3)
                    srtt = match.group(4)
                    csv_rows.append(
                        "{},{},{},{}\n".format(probe_time, snd_cwnd, snd_wnd, srtt)
                    )

    trace_df = pd.read_csv(StringIO("".join(csv_rows)))
    if len(trace_df) <= 1:
        print("\rFaulty tcp trace file for pcap no: {}".format(pcap_number))
        return None
    return trace_df


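# Worker entry point: runs tshark on each pcap in `pcaps`, builds a per-packet
# DataFrame, optionally merges in the tcp_probe trace, and writes a .csv next
# to the pcap. `dummy` is an unused placeholder argument.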
def format_pcaps_to_csv(pcaps, dummy):
    global n
    for pcap in pcaps:
        if pcap.endswith(".pcap") and pcap.startswith(args.prefix):
            match = re.match(regex, pcap)
            if match:
                # metadata from the pcap filename
                direction = "upload"
                if "_reverse_" in pcap:
                    direction = "download"
                congestion_control = match.group(2)
                pcap_number = match.group(3)

                # analyse traffic from the pcap (receiver side)
                tshark_command = [
                    "tshark",
                    "-r",
                    "{}{}".format(args.folder, pcap),
                    # display filter disabled for mobile measurements:
                    # "-Y",
                    # "tcp.stream eq 1",
                    "-T",
                    "fields",
                    "-e",
                    "frame.time_relative",
                    "-e",
                    "ip.len",
                    "-e",
                    "ip.hdr_len",
                    "-e",
                    "tcp.hdr_len",
                    "-e",
                    "tcp.analysis.ack_rtt",
                    "-e",
                    "tcp.analysis.bytes_in_flight",
                    "-e",
                    "tcp.analysis.retransmission",
                    "-e",
                    "tcp.analysis.duplicate_ack",
                    "-e",
                    "ip.src",
                    "-e",
                    "ip.dst",
                    "-e",
                    "tcp.options.mss_val",
                    "-e",
                    "tcp.window_size",
                    "-e",
                    "frame.time_epoch",
                    "-e",
                    "tcp.stream",  # has to be the last field in the line!
                    "-E",
                    "header=y",
                    "-E",
                    "separator=,",
                    "-E",
                    "quote=d",
                ]
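                # tshark prints one CSV row per frame; "-E header=y" emits the
                # column-name header row that read_csv consumes below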
-
- tshark_out = None
- try:
- tshark_out = subprocess.check_output(tshark_command).decode("utf-8")
- except subprocess.CalledProcessError as tsharkexec:
- if tsharkexec.returncode == 2:
- print("\rtshark could not open pcap: {}".format(pcap))
- else:
- print("\rtshark exited with code: {}".format(tsharkexec.returncode))
- print(tsharkexec.output)
- continue
-
- # Convert String into StringIO
- csv_string_io = StringIO(tshark_out)
-
- conv_bool = lambda x: (True if x != "" else False)
-
- pcap_df = pd.read_csv(
- csv_string_io,
- converters={
- "tcp.analysis.retransmission": conv_bool,
- "tcp.analysis.duplicate_ack": conv_bool,
- },
- )
                # keep only the most recent TCP stream in the capture
                last_tcp_stream_in_pcap = pcap_df["tcp.stream"].max()
                pcap_df = pcap_df.loc[pcap_df["tcp.stream"] == last_tcp_stream_in_pcap]

                # TCP payload bytes = IP length minus IP and TCP header lengths
                pcap_df["payload_size"] = pcap_df["ip.len"] - (
                    pcap_df["ip.hdr_len"] + pcap_df["tcp.hdr_len"]
                )
                pcap_df["direction"] = direction
                pcap_df["congestion_control"] = congestion_control
                pcap_df["pcap_number"] = pcap_number
                pcap_df["datetime"] = pd.to_datetime(
                    pcap_df["frame.time_epoch"].apply(datetime.datetime.fromtimestamp)
                )

                pcap_df = pcap_df.drop(
                    columns=["tcp.stream", "ip.len", "ip.hdr_len", "tcp.hdr_len"]
                )

                pcap_df.rename(
                    columns={
                        "frame.time_relative": "arrival_time",
                        "ip.src": "src_ip",
                        "ip.dst": "dst_ip",
                        "tcp.options.mss_val": "mss",
                        "tcp.analysis.ack_rtt": "ack_rtt",
                        "tcp.analysis.bytes_in_flight": "bytes_in_flight",
                        "tcp.window_size": "receive_window_size",
                        "tcp.analysis.retransmission": "is_retransmission",
                        "tcp.analysis.duplicate_ack": "is_dup_ack",
                        "frame.time_epoch": "time_epoch",
                    },
                    inplace=True,
                )

                pcap_df = pcap_df.sort_values("arrival_time")
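                # The merge below attaches, to every packet sent towards the
                # server, the most recent tcp_probe sample within 10 ms
                # (tolerance=0.01 s); packets coming from the server are
                # re-appended unmerged afterwards.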
                try:
                    # join tcp_trace data with pcap data
                    merge_srtt = True
                    if merge_srtt:
                        tcp_trace_df = format_tcp_trace_to_csv(
                            pcap_number,
                            len(pcap_df),
                            is_reverse="_reverse_" in pcap,
                        )

                        if tcp_trace_df is None:
                            print(
                                "\rNo tcp trace file found for pcap no {}".format(
                                    pcap_number
                                )
                            )
                            continue  # was "break", but that stopped the whole worker
                        merged_df = pd.merge_asof(
                            pcap_df.loc[pcap_df["src_ip"] != args.server],
                            tcp_trace_df,
                            left_on="arrival_time",
                            right_on="time_tcp_probe",
                            tolerance=0.01,
                        )
                        merged_df = pd.concat(
                            [merged_df, pcap_df.loc[pcap_df["src_ip"] == args.server]]
                        )
                        merged_df = merged_df.sort_values("arrival_time")
                        merged_df.to_csv(
                            "{}{}".format(args.folder, pcap).replace(".pcap", ".csv")
                        )
                    else:
                        pcap_df.to_csv(
                            "{}{}".format(args.folder, pcap).replace(".pcap", ".csv")
                        )
                except Exception:
                    # fall back to writing the unmerged pcap data
                    print("\rCould not merge data for pcap: {}".format(pcap))
                    pcap_df.to_csv(
                        "{}{}".format(args.folder, pcap).replace(".pcap", ".csv")
                    )

                n.value += 1  # shared progress counter (display only; may race)

            else:
                print("File does not match regex: {}".format(pcap))
        else:
            print("File is not of type PCAP: {}".format(pcap))


def chunk(it, size):
    # yield successive size-length tuples from any iterable
    it = iter(it)
    return iter(lambda: tuple(islice(it, size)), ())
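# e.g. chunk([1, 2, 3, 4, 5], 2) yields (1, 2), (3, 4), (5,)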


if __name__ == "__main__":

    parser = ArgumentParser()
    parser.add_argument("-f", "--folder", required=True, help="Folder with pcaps.")
    parser.add_argument(
        "-p",
        "--prefix",
        required=True,
        help="Filename prefix, e.g. 2021-03-17_bandwidth_tcp_bbr_",
    )
    parser.add_argument(
        "-t",
        "--tcp_trace",
        required=True,
        help="Prefix of the tcp trace txt files, e.g. 2021_03_30_bandwidth_reverse_tcp_tcp_trace_ for "
        "2021_03_30_bandwidth_reverse_tcp_tcp_trace_1.txt",
    )
    parser.add_argument(
        "-c",
        "--cores",
        default=1,
        type=int,
        help="Number of cores for multiprocessing.",
    )
    parser.add_argument(
        "--port",
        default=5201,
        type=int,
        help="iPerf3 port used for measurements.",
    )
    parser.add_argument(
        "--server",
        default="130.75.73.69",
        type=str,
        help="iPerf3 server IP used for measurements.",
    )
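    # Example invocation (script name and paths are illustrative):
    #   python3 format_pcaps.py -f ./tcp-cap-test/ \
    #       -p 2021-03-17_bandwidth_tcp_bbr_ \
    #       -t 2021_03_30_bandwidth_reverse_tcp_tcp_trace_ -c 4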

    args = parser.parse_args()

    manager = multiprocessing.Manager()

    # regex for protocol, congestion control algorithm, and pcap number
    regex = r".*_bandwidth_(?:reverse_)?(.+)_(.+)_(\d+)\.pcap"
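    # e.g. "2021-03-17_bandwidth_tcp_bbr_1.pcap" -> group(1)="tcp",
    # group(2)="bbr" (congestion control), group(3)="1" (pcap number)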

    # note: csv_header is currently unused
    csv_header = "n,start_time,end_time,payload_size,protocol,algorithm,direction,packages_received,syns_in_pcap\n"
    n = manager.Value("i", 0)
    filenames = os.listdir(args.folder)
    number_of_files = len(filenames)

    pcap_list = []
    jobs = []

    st = time()

    for filename in filenames:
        if filename.endswith(".pcap") and filename.startswith(args.prefix):
            if re.match(regex, filename):
                pcap_list.append(filename)
    pcap_list.sort()

    print("Found {} pcap files in {} files.".format(len(pcap_list), number_of_files))
    if len(pcap_list) == 0:
        print("Abort: no pcaps found with prefix: {}".format(args.prefix))
        print("{}{}".format(args.folder, args.prefix))
        exit(1)

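    # ceil(len/cores) pcaps per chunk yields at most args.cores chunks, so
    # each worker process below handles exactly one chunk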
    parts = chunk(pcap_list, ceil(len(pcap_list) / args.cores))
    print("Start processing with {} jobs.".format(args.cores))

    for p in parts:
        process = multiprocessing.Process(
            target=format_pcaps_to_csv, args=(p, "dummy")
        )
        jobs.append(process)

    for j in jobs:
        j.start()

    print("Started all jobs.")
    # Ensure all of the processes have finished
    finished_job_counter = 0
    working = ["|", "/", "-", "\\"]
    w = 0
    while len(jobs) != finished_job_counter:
        sleep(1)
        print(
            "\r\t{}{}{}\t Running {} jobs ({} finished). Processed {} out of {} pcaps. ({}%) ".format(
                working[w],
                working[w],
                working[w],
                len(jobs),
                finished_job_counter,
                n.value,
                len(pcap_list),
                round((n.value / len(pcap_list)) * 100, 2),
            ),
            end="",
        )
        finished_job_counter = 0
        for j in jobs:
            if not j.is_alive():
                finished_job_counter += 1
        w = (w + 1) % len(working)
    print("")

    et = time()
    # total execution time
    elapsed_time = et - st
    print("Execution time:", elapsed_time, "seconds")