|
- #!/usr/bin/env python3
- import json
- import requests
- import os
- import re
- import socket
- import subprocess
- import asyncio
- import serial
- from argparse import ArgumentParser
- from datetime import datetime
-
- from threading import Thread
- from time import sleep
-
# Shell pipeline used by get_ip_from_interface(); "{}" is the placeholder for
# the interface name.
GET_IPV4_SHELL_COMMAND = "ip a | grep {} | grep inet | cut -d' ' -f6 | cut -d'/' -f1"

# AT commands that monitor_serial() writes to the serial device.
NR_CQI_COMMAND = b'AT+QNWCFG="nr5g_csi"\r\n'
NR_SERVINGCELL_COMMAND = b'AT+QENG="servingcell"\r\n'
NR_EN_DC_STATUS_COMMAND = b"AT+QENDC\r\n"
# Time in seconds granted to the serial device to answer a command (same
# value as the delay used in monitor_serial()).
NR_SERIAL_RESPONSE_TIME = 0.3  # s
# Backward-compatible alias: the constant was originally published under this
# misspelled name.
NR_SERIAL_RESPOND_TOME = NR_SERIAL_RESPONSE_TIME
# Shell command that prints the current Unix time in seconds.
CMD_TIME_EPOCH = "date +%s"
-
-
class ProcessHandler:
    """Keep track of spawned child processes so they can be stopped together."""

    def __init__(self):
        # Each entry is either a bare process object or a dict with the keys
        # "process" and "logfile" (see add_process).
        self.processList = []

    def add_process(self, process, logfile=None):
        """Register *process*, optionally together with the log file it writes to.

        Args:
            process: Object offering ``kill()`` and ``wait()`` (the callers in
                this file pass ``subprocess.Popen`` instances).
            logfile: Optional open file object that is closed by kill_all().
        """
        if logfile is not None:
            self.processList.append(dict(process=process, logfile=logfile))
        else:
            self.processList.append(process)

    def kill_all(self):
        """Kill and reap every registered process, then forget all of them.

        The list is emptied so that repeated calls (kill_all is invoked once
        per measurement) do not process already-killed entries again, and each
        child is waited for so it does not linger as a zombie.
        """
        # Detach the current entries first; anything registered while we are
        # killing ends up in the fresh list and is handled by the next call.
        pending, self.processList = self.processList, []
        for entry in pending:
            if isinstance(entry, dict):
                process, logfile = entry["process"], entry["logfile"]
            else:
                process, logfile = entry, None
            try:
                process.kill()
                process.wait()
            finally:
                if logfile is not None:
                    logfile.close()
-
-
def parse_var(s):
    """
    Split a ``key=value`` declaration into a ``(key, value)`` tuple.

    Only the first '=' separates key and value, so the value may itself
    contain '=' characters. Blanks around the key are removed; the value is
    returned untouched. Without any '=' the value is the empty string.

    Examples of declarations as they arrive from the command line:
        foo=hello
    or
        foo="hello world"
    """
    key, _, value = s.partition("=")
    return (key.strip(), value)
-
-
def parse_vars(items):
    """
    Build a dictionary from an iterable of ``key=value`` declarations.

    ``None`` or an empty iterable yields an empty dictionary; when a key
    occurs several times the last declaration wins.
    """
    if not items:
        return {}
    return dict(parse_var(entry) for entry in items)
-
-
def write_to_file(filepath, content, overwrite=False):
    """Write *content* to *filepath*.

    Args:
        filepath: Path of the file to write.
        content: Text to write.
        overwrite: Truncate the file first when true; append otherwise.

    I/O errors are reported through print_message() instead of being raised.
    """
    mode = "w" if overwrite else "a"
    try:
        # The context manager closes the file even when write() fails.
        with open(filepath, mode) as f:
            f.write(content)
    except IOError:
        print_message("ERROR: Could not write to file: {}".format(filepath))
-
-
def background_write_to_file(filepath, content, overwrite=False):
    """Run write_to_file() in a new thread so the caller is not blocked.

    The thread is started and not joined; the arguments are passed through
    unchanged.
    """
    Thread(target=write_to_file, args=(filepath, content, overwrite)).start()
-
-
def execute_tcp_dump(processHandler, interface, outputfile, ws_filter, flags=None):
    """Start tcpdump and register the process with *processHandler*.

    Args:
        processHandler: Handler whose ``add_process`` receives the new process.
        interface: Name of the interface to capture on.
        outputfile: Path of the capture file passed to ``-w``.
        ws_filter: Capture filter expression, passed as a single argument.
        flags: Optional iterable of additional tcpdump flags; they are placed
            right after ``-U``.
    """
    # ``None`` instead of a mutable ``[]`` default avoids sharing one list
    # object between calls.
    extra_flags = list(flags) if flags else []
    cmd = ["tcpdump", "-i" + interface, "-U", *extra_flags, "-w", outputfile, ws_filter]
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    processHandler.add_process(process)
-
-
def save_tcp_trace(processHandler, filename, sender_ip, port):
    """Append the kernel trace pipe output to *filename* until killed.

    The file is opened in append mode, the content of /proc/uptime is written
    first, then a ``cat`` of the tracing pipe is started with the file as its
    stdout. Process and file are registered with *processHandler*.
    *sender_ip* and *port* are accepted but not used.
    """
    trace_log = open(filename, "a")
    subprocess.call(["cat", "/proc/uptime"], stdout=trace_log)
    # /sys/kernel/debug/tracing/events/tcp/tcp_probe/enable
    reader = subprocess.Popen(
        ["cat", "/sys/kernel/debug/tracing/trace_pipe"], stdout=trace_log
    )
    processHandler.add_process(reader, logfile=trace_log)
-
-
def get_ip_from_interface(interface):
    """Return the output of GET_IPV4_SHELL_COMMAND for *interface*.

    The command is run through the shell; its output is decoded as UTF-8 and
    every space and newline character is removed.
    """
    command = GET_IPV4_SHELL_COMMAND.format(interface)
    output = subprocess.check_output(command, shell=True).decode("utf-8")
    for unwanted in (" ", "\n"):
        output = output.replace(unwanted, "")
    return output
-
-
def config2json(args):
    """Serialize the parsed arguments to UTF-8 encoded JSON bytes.

    The ``set`` entry is replaced by the dictionary parsed from its key-value
    pairs, and the keys ``interface``, ``server``, ``client`` and ``folder``
    are left out of the result.
    """
    config = dict(vars(args))
    # parse the key-value pairs from set arg
    config["set"] = parse_vars(args.set)
    # drop the keys that are not transmitted
    for unused_key in ("interface", "server", "client", "folder"):
        del config[unused_key]
    return json.dumps(config).encode("utf-8")
-
-
def json2config(json_string):
    """Decode *json_string* and return the resulting Python object."""
    config = json.loads(json_string)
    return config
-
-
def print_message(m):
    """Print *m* prefixed with the current local time in square brackets."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print("[{}]\t{}".format(timestamp, m))
-
-
def deactivate_hystart():
    """Write "0" to the tcp_cubic ``hystart`` module parameter via the shell."""
    print_message("Deactivate HyStart")
    command = 'echo "0" > /sys/module/tcp_cubic/parameters/hystart'
    os.system(command)
-
-
def activate_hystart():
    """Write "1" to the tcp_cubic ``hystart`` module parameter via the shell."""
    print_message("Activate HyStart")
    command = 'echo "1" > /sys/module/tcp_cubic/parameters/hystart'
    os.system(command)
-
-
def is_hystart_activated():
    """Return True when the tcp_cubic ``hystart`` module parameter reads 1.

    Raises:
        subprocess.CalledProcessError: If the parameter file cannot be read.
    """
    raw = subprocess.check_output(
        "cat /sys/module/tcp_cubic/parameters/hystart", shell=True
    )
    # Compare with the integer 1: an int never equals the string "1", so the
    # former comparison always evaluated to False.
    return int(raw) == 1
-
-
def is_tcp_probe_enabled():
    """Return True when the ``tcp_probe`` trace event's enable file reads 1.

    Raises:
        subprocess.CalledProcessError: If the enable file cannot be read.
    """
    raw = subprocess.check_output(
        "cat /sys/kernel/debug/tracing/events/tcp/tcp_probe/enable", shell=True
    )
    # Compare with the integer 1: an int never equals the string "1", so the
    # former comparison always evaluated to False.
    return int(raw) == 1
-
-
def enable_tcp_probe():
    """Write '1' to the ``tcp_probe`` trace event's enable file via the shell."""
    command = "echo '1' > /sys/kernel/debug/tracing/events/tcp/tcp_probe/enable"
    os.system(command)
-
-
def set_default_receive_window():
    """Write the default receive buffer sizes to the three sysctl files below."""
    print_message("Set receive window to default values")
    commands = (
        'echo "212992" > /proc/sys/net/core/rmem_max',
        'echo "212992" > /proc/sys/net/core/rmem_default',
        'echo "4096 131072 6291456" > /proc/sys/net/ipv4/tcp_rmem',
    )
    for command in commands:
        os.system(command)
-
-
def raise_receive_window():
    """Write receive buffer sizes ten times the defaults to the sysctl files."""
    print_message("Multiply receive windows by factor 10")
    commands = (
        'echo "2129920" > /proc/sys/net/core/rmem_max',
        'echo "2129920" > /proc/sys/net/core/rmem_default',
        'echo "40960 1310720 62914560" > /proc/sys/net/ipv4/tcp_rmem',
    )
    for command in commands:
        os.system(command)
-
-
def monitor_serial(ser, output_file):
    """Poll the serial device until its port is closed and log the answers.

    For every round the current epoch time is read, each AT command is
    written to *ser*, and whatever the device answered after the response
    delay is collected. One line per round is appended to *output_file*.

    Args:
        ser: Open serial port object (``is_open``, ``write``, ``read`` and
            ``inWaiting`` are used).
        output_file: Path of the file the responses are appended to.
    """
    run_cmds = (NR_CQI_COMMAND, NR_SERVINGCELL_COMMAND, NR_EN_DC_STATUS_COMMAND)
    try:
        while ser.is_open:
            response = subprocess.check_output(CMD_TIME_EPOCH, shell=True).decode(
                "utf-8"
            )
            for cmd in run_cmds:
                ser.write(cmd)
                sleep(NR_SERIAL_RESPOND_TOME)
                # A garbled byte on the line must not end the whole monitor.
                response += ser.read(ser.inWaiting()).decode(
                    "utf-8", errors="replace"
                )
            # Flatten the multi-line answers into one ';'-separated record.
            response = (
                response.replace("\n", ";")
                .replace("\r", "")
                .replace(";;OK", ";")
                .replace(";;", ";")
            )
            write_to_file(output_file, response + "\n")
    except Exception as err:
        # Closing the port from another thread makes the pending serial call
        # fail; that is the regular way this thread is stopped.
        if not ser.is_open:
            print_message("Serial port is closed. Exit monitoring thread.")
        else:
            print_message(
                "Something went wrong while monitoring serial interface. Exit monitoring thread."
            )
            print_message("Reason: {!r}".format(err))
    return
-
-
class Server:
    """Server side of a measurement run.

    The configuration dictionary selects the measurement and carries its
    parameters. The methods rely on the module-level ``processHandler`` to
    track and stop the helper processes they start.
    """

    def __init__(self, config):
        self.config = config

    def _setting(self, key, default=None):
        """Return the ``--set`` value for *key*, or *default* if it is absent."""
        return self.config["set"].get(key, default)

    def _flag(self, key):
        """Return True when the ``--set`` value for *key* is the string "true"."""
        return self.config["set"].get(key) == "true"

    def _measurement_numbers(self):
        """Return the 1-based measurement numbers as a range."""
        return range(1, self.config["number_of_measurements"] + 1)

    def _iperf_server_command(self, one_off=True):
        """Build the iperf3 server command for the configured port."""
        command = ["iperf3", "-s", "--port", str(self.config["port"])]
        if one_off:
            command.append("--one-off")
        return command

    def _start_tcpdump(self, filepath, ws_filter, flags=None):
        """Start execute_tcp_dump() for the configured interface in a thread."""
        thread = Thread(
            target=execute_tcp_dump,
            args=(
                processHandler,
                self.config["interface"],
                filepath,
                ws_filter,
                flags or [],
            ),
        )
        thread.start()

    def measure(self):
        """Run the first measurement that is enabled in the configuration."""
        print("Received config:")
        print(self.config)
        print_message("Start measurement")

        if self.config["bandwidth"]:
            self.bandwidth()
        elif self.config["drx"]:
            self.drx()
        elif self.config["harq"]:
            self.harq()
        elif self.config["cbr"]:
            self.cbr()
        elif self.config["tcp_parallel"]:
            self.tcp_parallel_flows()
        else:
            print_message("Nothing to do on server side.")

    def drx(self):
        print_message("DRX nothing to do on server side.")

    def harq(self):
        print_message("HARQ nothing to do on server side.")

    def bandwidth(self):
        """Serve the iperf3 bandwidth measurements and record traces."""
        # The value arrives as a string; compare it like the client does,
        # otherwise "false" would count as true.
        server_is_sender = self._flag("server_is_sender")
        print_message("Server is sender: {}".format(server_is_sender))

        tcp_algo = self._setting("algo", "cubic").split(",")
        for algo in tcp_algo:
            print_message("Using {} for TCP transmissions.".format(algo))

        alternate_hystart = self._flag("alternate_hystart")

        # prevent address already in use
        sleep(2)
        if server_is_sender:
            self._bandwidth_server_sends(tcp_algo)
        else:
            self._bandwidth_client_sends(tcp_algo, alternate_hystart)

    def _bandwidth_server_sends(self, tcp_algo):
        """Record the TCP trace while the server is the iperf3 sender."""
        if not is_tcp_probe_enabled():
            print_message("tcp probe is not enabled!")
            enable_tcp_probe()
            print_message("tcp probe is now enabled")

        # Must exist before the first iteration reads it.
        congestion_control_index = 0
        for n in self._measurement_numbers():
            print_message("{} of {}".format(n, self.config["number_of_measurements"]))
            print_message(
                "Using {} for congestion control".format(
                    tcp_algo[congestion_control_index]
                )
            )
            filepath_tcp_trace = "{}{}_bandwidth_reverse_{}_{}_{}.txt".format(
                self.config["folder"], self.config["prefix"], "tcp", "tcp_trace", n
            )
            save_tcp_trace(
                processHandler,
                filepath_tcp_trace,
                self.config["server"],
                self.config["port"],
            )
            subprocess.call(self._iperf_server_command())
            processHandler.kill_all()
            congestion_control_index = (congestion_control_index + 1) % len(tcp_algo)

    def _bandwidth_client_sends(self, tcp_algo, alternate_hystart):
        """Capture the traffic with tcpdump while the client is the sender."""
        ws_filter = "{} and port {}".format("tcp", self.config["port"])
        print_message("Use ws filter: {}".format(ws_filter))

        # Alternate mode cycles through these states, one per measurement:
        # (algorithm name for the file name, receive window setup, name part).
        alternate_states = (
            ("cubic", raise_receive_window, "_cubic_slowstart_raise_"),
            ("cubic", set_default_receive_window, "_cubic_hystart_default_"),
            ("bbr", raise_receive_window, "_bbr_raise_"),
            ("bbr", set_default_receive_window, "_bbr_default_"),
        )
        state_counter = 0
        congestion_control_index = 0
        for n in self._measurement_numbers():
            print_message(
                "Measurement {} of {}".format(n, self.config["number_of_measurements"])
            )
            if alternate_hystart:
                # Keep the algorithm in its own variable; the algorithm list
                # must stay a list so it can still be indexed and measured.
                current_algo, adjust_window, name_option = alternate_states[
                    state_counter
                ]
                adjust_window()
                state_counter = (state_counter + 1) % len(alternate_states)
            else:
                current_algo = tcp_algo[congestion_control_index]
                name_option = ""
            filepath = "{}{}{}_bandwidth_{}_{}_{}.pcap".format(
                self.config["folder"],
                self.config["prefix"],
                name_option,
                "tcp",
                current_algo,
                n,
            )
            self._start_tcpdump(filepath, ws_filter, ["-s96"])
            sleep(2)
            subprocess.call(self._iperf_server_command())
            sleep(2)
            processHandler.kill_all()
            congestion_control_index = (congestion_control_index + 1) % len(tcp_algo)

    def cbr(self):
        """Serve the UDP constant-bitrate measurements and capture them."""
        use_reverse_mode = self._flag("reverse")
        print_message("Use reverse mode: {}".format(use_reverse_mode))
        # bitrate is only used for filenames on server side
        bitrate = self._setting("bitrate", "1M")
        sleep_time = float(self._setting("sleep", 60.0))
        print_message("Sleep time for each measurement: {}s".format(sleep_time))

        # build wireshark filter
        if use_reverse_mode:
            ws_filter = "{} and src {}".format("udp", self.config["server"])
        else:
            ws_filter = "{} and dst {} and port {}".format(
                "udp", self.config["server"], self.config["port"]
            )
        print_message("Use ws filter: {}".format(ws_filter))

        iperf_command = self._iperf_server_command()

        for n in self._measurement_numbers():
            print_message("{} of {}".format(n, self.config["number_of_measurements"]))
            sleep(sleep_time)
            pcap_filepath = "{}{}_cbr_server_{}_bitrate{}_{}.pcap".format(
                self.config["folder"],
                self.config["prefix"],
                "sender" if use_reverse_mode else "receiver",
                bitrate,
                n,
            )
            self._start_tcpdump(pcap_filepath, ws_filter)
            sleep(1)
            subprocess.call(iperf_command)
            sleep(1)
            processHandler.kill_all()

    def tcp_parallel_flows(self):
        """Run one iperf3 server that serves every parallel-flow run."""
        sleep(4)
        # Without --one-off the server keeps accepting tests, so this call
        # only returns when iperf3 itself is stopped.
        subprocess.call(self._iperf_server_command(one_off=False))
-
-
class Client:
    """Client side of a measurement run; drives the selected measurement.

    The methods rely on the module-level ``processHandler`` to track and stop
    the helper processes they start.
    """

    def __init__(self, config):
        self.config = config

    def _setting(self, key, default=None):
        """Return the ``--set`` value for *key*, or *default* if it is absent."""
        return self.config["set"].get(key, default)

    def _flag(self, key):
        """Return True when the ``--set`` value for *key* is the string "true"."""
        return self.config["set"].get(key) == "true"

    def _measurement_numbers(self):
        """Return the 1-based measurement numbers as a range."""
        return range(1, self.config["number_of_measurements"] + 1)

    def _start_tcpdump(self, filepath, ws_filter, flags=None):
        """Start execute_tcp_dump() for the configured interface in a thread."""
        thread = Thread(
            target=execute_tcp_dump,
            args=(
                processHandler,
                self.config["interface"],
                filepath,
                ws_filter,
                flags or [],
            ),
        )
        thread.start()

    @staticmethod
    def _expand_intervals(spec):
        """Expand ``start:end:increase[,start:end:increase...]`` into gaps.

        The end value is inclusive. Values are computed from the step index
        rather than by repeated addition, so float drift cannot drop the end
        value (e.g. "0:0.3:0.1" yields 0.0, 0.1, 0.2 and 0.3).

        Raises:
            ValueError: If a range is malformed or its increase is not
                positive (which previously made the expansion loop forever).
        """
        intervals = []
        if not spec:
            return intervals
        for interval_range in spec.split(","):
            start, end, increase = [float(x) for x in interval_range.split(":")]
            if increase <= 0:
                raise ValueError(
                    "Interval increase must be positive: {}".format(interval_range)
                )
            if end < start:
                continue
            # The small epsilon absorbs binary rounding of the quotient.
            steps = int((end - start) / increase + 1e-9)
            intervals.extend(
                round(start + i * increase, 9) for i in range(steps + 1)
            )
        return intervals

    def measure(self):
        """Run the first enabled measurement, with optional serial monitoring."""
        print("Config:")
        print(self.config)
        sleep(1)
        print_message("Start measurement")

        ser = None
        if self.config["serial"] is not None:
            print_message("Opening serial port for {}".format(self.config["serial"]))
            ser = serial.Serial(
                port=self.config["serial"],
                baudrate=self.config["baudrate"],
            )
            ser_filepath = "{}{}_serial_monitor_output.txt".format(
                self.config["folder"], self.config["prefix"]
            )
            Thread(target=monitor_serial, args=(ser, ser_filepath)).start()

        try:
            if self.config["bandwidth"]:
                self.bandwidth()
            elif self.config["drx"]:
                self.drx()
            elif self.config["harq"]:
                self.harq()
            elif self.config["cbr"]:
                self.cbr()
            elif self.config["tcp_parallel"]:
                self.tcp_parallel_flows()
            elif self.config["ping"]:
                self.ping()
        finally:
            # Close the port even when the measurement failed; the monitoring
            # thread only ends once the port is closed.
            if ser is not None:
                print_message("Closing serial port...")
                ser.close()
                sleep(2)
                print_message("done...")

    def ping(self):
        """Send the configured number of pings with interval 0 and store the output."""
        c = "ping {} -I {} -i {} -c {}".format(
            self.config["server"],
            self.config["interface"],
            0,
            self.config["number_of_measurements"],
        )
        print_message(
            "Start sending {} pings with nog gap.".format(
                self.config["number_of_measurements"]
            )
        )
        ping_out = subprocess.check_output(c, shell=True).decode("utf-8")
        filepath = "{}{}_ping_no_gap.txt".format(
            self.config["folder"], self.config["prefix"]
        )
        print_message("Write measured pings to: {}".format(filepath))
        write_to_file(filepath, ping_out)

    def drx(self):
        """Ping the server with increasing gaps and write the RTTs as CSV."""
        is_pre_ping_enabled = self._flag("pre_ping")
        intervals = self._expand_intervals(self._setting("intervals", ""))
        if not intervals:
            print_message(
                "ERROR: DRX measurement needs at least one interval "
                "(--set intervals=start:end:increase)."
            )
            return

        raw_filepath = "{}{}_ping_drx_raw.txt".format(
            self.config["folder"], self.config["prefix"]
        )
        if is_pre_ping_enabled:
            print_message("Send pre pings.")
            c = "ping {} -I {} -i {} -c {}".format(
                self.config["server"],
                self.config["interface"],
                intervals[0],
                self.config["number_of_measurements"],
            )
            subprocess.check_output(c, shell=True)
        else:
            print_message("Preping is disabled. Wait 60s for drx-long-sleep...")
            sleep(60)

        writers = []
        for current in intervals:
            if not is_pre_ping_enabled:
                sleep(current)

            c = "echo 'gap={}s' && ping {} -I {} -i {} -c {} && echo ';;;'".format(
                current,
                self.config["server"],
                self.config["interface"],
                current,
                self.config["number_of_measurements"],
            )
            print_message("ping with {}s gap".format(current))
            output = subprocess.check_output(c, shell=True).decode("utf-8")
            # Write in the background so the file I/O does not delay the next
            # gap, but keep the thread so it can be joined before reading.
            writer = Thread(target=write_to_file, args=(raw_filepath, output))
            writer.start()
            writers.append(writer)
        for writer in writers:
            writer.join()

        # Read raw out and format it to csv
        regex = r"gap=(.*)s|64 bytes from (.*\..*\..*\..*) icmp_seq=(\d+) ttl=(\d+) time=(.*) ms"

        print_message("Format script output")
        with open(raw_filepath, "r") as f:
            raw = f.read()

        rows = ["n,gap,rtt\n"]
        n = 1
        gap = ""
        for line in raw.split("\n"):
            match = re.match(regex, line)
            if not match:
                continue
            if match.group(5):
                # A reply line: group 5 is the round-trip time.
                rows.append("{},{},{}\n".format(n, gap, match.group(5)))
                n += 1
            else:
                # A "gap=..." marker line starts a new block of replies.
                gap = match.group(1)
        csv_filepath = "{}{}_ping_drx.csv".format(
            self.config["folder"], self.config["prefix"]
        )
        print_message("Write file {}".format(csv_filepath))
        write_to_file(csv_filepath, "".join(rows))

    def harq(self):
        """Capture TCP SYN packets while issuing HTTP GET requests to the server."""
        ws_filter = "tcp[tcpflags] & (tcp-syn) != 0"
        print_message("Use ws filter: {}".format(ws_filter))
        filepath = "{}{}_{}_tcp_handshakes_http_client.pcap".format(
            self.config["folder"],
            self.config["prefix"],
            self.config["number_of_measurements"],
        )
        self._start_tcpdump(filepath, ws_filter)
        sleep(5)
        try:
            for n in self._measurement_numbers():
                print_message(
                    "{} of {}".format(n, self.config["number_of_measurements"])
                )
                requests.get("http://{}".format(self.config["server"]))
            sleep(5)
        finally:
            # Do not leave tcpdump running when a request fails.
            processHandler.kill_all()

    def bandwidth(self):
        """Run the iperf3 TCP bandwidth measurements."""
        server_is_sender = self._flag("server_is_sender")
        print_message("Server is sender: {}".format(server_is_sender))

        tcp_algo = self._setting("algo", "cubic").split(",")
        for algo in tcp_algo:
            print_message("Using {} for TCP transmissions.".format(algo))

        alternate_hystart = self._flag("alternate_hystart")
        if alternate_hystart:
            print_message("Alternate between HyStart and Slowstart for Cubic.")

        time = self._setting("time", "10")

        sleep(2)

        if server_is_sender:
            self._bandwidth_server_sends(tcp_algo, time)
        else:
            self._bandwidth_client_sends(tcp_algo, time, alternate_hystart)

    def _iperf_client_command(self, time):
        """Build the common part of the iperf3 client command.

        The port is passed explicitly so that it matches the port the server
        side starts iperf3 on.
        """
        return [
            "iperf3",
            "-c",
            self.config["server"],
            "-p",
            str(self.config["port"]),
            "-t",
            time,
        ]

    def _bandwidth_server_sends(self, tcp_algo, time):
        """Capture with tcpdump while iperf3 runs in reverse mode."""
        ws_filter = "{} and port {}".format("tcp", self.config["port"])
        print_message("Use ws filter: {}".format(ws_filter))
        congestion_control_index = 0
        for n in self._measurement_numbers():
            current_algo = tcp_algo[congestion_control_index]
            print_message("{} of {}".format(n, self.config["number_of_measurements"]))
            print_message("Using {} for congestion control".format(current_algo))
            filepath = "{}{}_bandwidth_reverse_{}_{}_{}.pcap".format(
                self.config["folder"],
                self.config["prefix"],
                "tcp",
                current_algo,
                n,
            )
            self._start_tcpdump(filepath, ws_filter, ["-s96"])
            sleep(5)
            iperf_command = self._iperf_client_command(time) + [
                "-R",
                "-C",
                current_algo,
            ]
            subprocess.call(iperf_command)
            sleep(4)
            processHandler.kill_all()
            congestion_control_index = (congestion_control_index + 1) % len(tcp_algo)

    def _bandwidth_client_sends(self, tcp_algo, time, alternate_hystart):
        """Record the TCP trace while this host is the iperf3 sender."""
        # Alternate mode cycles through these states, one per measurement:
        # (congestion control algorithm, HyStart switch).
        alternate_states = (
            ("cubic", deactivate_hystart),
            ("cubic", activate_hystart),
            ("bbr", deactivate_hystart),
            ("bbr", activate_hystart),
        )
        state_counter = 0
        congestion_control_index = 0
        if not is_tcp_probe_enabled():
            print_message("tcp probe is not enabled!")
            enable_tcp_probe()
            print_message("tcp probe is now enabled")
        for n in self._measurement_numbers():
            print_message("{} of {}".format(n, self.config["number_of_measurements"]))
            if alternate_hystart:
                # Keep the algorithm in its own variable; indexing a string
                # here would hand a single character to iperf3's -C option.
                current_algo, switch_hystart = alternate_states[state_counter]
                switch_hystart()
                state_counter = (state_counter + 1) % len(alternate_states)
            else:
                current_algo = tcp_algo[congestion_control_index]

            iperf_command = self._iperf_client_command(time) + ["-C", current_algo]
            filepath_tcp_trace = "{}{}_bandwidth_{}_{}_{}.txt".format(
                self.config["folder"], self.config["prefix"], "tcp", "tcp_trace", n
            )
            save_tcp_trace(
                processHandler,
                filepath_tcp_trace,
                self.config["client"],
                self.config["port"],
            )
            sleep(2)
            subprocess.call(iperf_command)
            processHandler.kill_all()
            congestion_control_index = (congestion_control_index + 1) % len(tcp_algo)
            sleep(4)

    def cbr(self):
        """Run the UDP constant-bitrate measurements and capture them."""
        bitrate = self._setting("bitrate", "1M")
        if "bitrate" in self.config["set"]:
            print("Set bitrate to {}.".format(bitrate))
        time = self._setting("time", "2")
        use_reverse_mode = self._flag("reverse")
        print_message("Use reverse mode: {}".format(use_reverse_mode))
        sleep_time = float(self._setting("sleep", 60.0))
        print_message("Sleep time for each measurement: {}s".format(sleep_time))

        # build wireshark filter
        if use_reverse_mode:
            ws_filter = "{} and dst {}".format("udp", self.config["client"])
        else:
            ws_filter = "{} and src {} and port {}".format(
                "udp", self.config["client"], self.config["port"]
            )
        print_message("Use ws filter: {}".format(ws_filter))

        # build iperf3 command
        iperf_command = self._iperf_client_command(time) + ["--udp", "-b", bitrate]
        if use_reverse_mode:
            iperf_command.append("-R")

        for n in self._measurement_numbers():
            print_message("{} of {}".format(n, self.config["number_of_measurements"]))
            sleep(sleep_time)
            pcap_filepath = "{}{}_cbr_client_{}_bitrate{}_{}.pcap".format(
                self.config["folder"],
                self.config["prefix"],
                "receiver" if use_reverse_mode else "sender",
                bitrate,
                n,
            )
            self._start_tcpdump(pcap_filepath, ws_filter)
            sleep(2)
            subprocess.call(iperf_command)
            sleep(2)
            processHandler.kill_all()

    def tcp_parallel_flows(self):
        """Measure throughput with parallel TCP flows, toggling bbr and cubic."""
        tcp_algo = self._setting("algo", "cubic")
        print_message("Using {} for tcp.".format(tcp_algo))
        time = self._setting("time", "12")
        start = self._setting("start", "1")
        end = self._setting("end", "2")
        # build flow seq
        # Every run number is listed twice so that, with the algorithm being
        # toggled on each iteration, a run is measured once per algorithm.
        # NOTE(review): every flow count is listed twice as well, which
        # repeats the whole run series and appends to the same files; kept
        # as found -- confirm whether that repetition is intended.
        flows = []
        for i in range(int(start), int(end) + 1):
            flows.extend((i, i))
        runs = []
        for i in self._measurement_numbers():
            runs.extend((i, i))
        print_message(
            "{} to {} flows, with {} runs.".format(
                start, end, self.config["number_of_measurements"]
            )
        )
        sleep(5)
        for flow in flows:
            for run in runs:
                tcp_algo = "bbr" if tcp_algo == "cubic" else "cubic"
                filepath = "{}{}_{}parallel_tcp_{}_flows_{}.txt".format(
                    self.config["folder"], self.config["prefix"], flow, tcp_algo, run
                )
                c = "iperf3 -c {} -p {} -t {} -C {} -P {}".format(
                    self.config["server"], self.config["port"], time, tcp_algo, flow
                )
                iperf_command = [c]
                print_message("{} parallel {} flows".format(flow, tcp_algo))
                while True:
                    try:
                        out = subprocess.check_output(
                            iperf_command, shell=True
                        ).decode("utf-8")
                    except subprocess.CalledProcessError:
                        # Only a failed iperf3 run is retried; a bare except
                        # here would also swallow Ctrl-C and never stop.
                        print("iPerf Error.\t{}".format(iperf_command))
                        sleep(1)
                        print_message("Retry in 5s")
                        sleep(5)
                        continue
                    write_to_file(filepath, out)
                    break
-
-
async def start_server(args):
    """Wait for the client's configuration, then run the server measurement.

    A TCP socket is bound to the address of ``args.interface`` on
    ``args.port``. One connection is accepted and read until the peer closes
    it; the received bytes are decoded as one JSON document.

    Args:
        args: Parsed command line arguments (``interface``, ``port`` and
            ``folder`` are used).
    """
    # get configuration from client
    print_message("Start Server")
    ip = get_ip_from_interface(args.interface)
    chunks = []
    # The context managers close both sockets even when an error occurs.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setblocking(True)
        s.bind((ip, args.port))
        s.listen(1)
        conn, addr = s.accept()
        with conn:
            print_message("Got connection from {}".format(addr))
            while True:
                data = conn.recv(1024)
                if not data:
                    break
                chunks.append(data)

    # Parse only once everything has arrived: a single recv() may return just
    # a part of the JSON document.
    payload = b"".join(chunks)
    if not payload:
        print_message("ERROR: Received no configuration from client.")
        return
    config = json2config(payload.decode("utf-8"))

    # add interface and ip to config
    config["interface"] = args.interface
    config["client"] = addr[0]
    config["server"] = ip
    config["folder"] = args.folder
    server = Server(config)
    server.measure()
-
-
async def start_client(args):
    """Send the configuration to the server, then run the client measurement.

    Args:
        args: Parsed command line arguments; ``args.client`` holds the
            server address to connect to.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((args.client, args.port))
        # sendall() keeps sending until the whole configuration is out;
        # send() may transmit only a part of it.
        s.sendall(config2json(args))
    print("Config send.")
    # overwrite ip
    config = vars(args).copy()
    config["server"] = args.client
    config["client"] = get_ip_from_interface(args.interface)
    # Parse the pairs exactly like config2json() does for the server, so both
    # sides agree on values containing '=' and on pairs without a value.
    config["set"] = parse_vars(config["set"])
    client = Client(config)
    client.measure()
-
-
if __name__ == "__main__":
    now = datetime.now()
    # Module-level on purpose: the Server and Client methods use this handler
    # to register and stop the helper processes they start.
    processHandler = ProcessHandler()

    parser = ArgumentParser()
    # common arguments
    # required
    parser.add_argument("-i", "--interface", required=True, help="Interface.")
    # optional
    parser.add_argument("-p", "--port", default=5201, type=int, help="Port.")
    parser.add_argument(
        "-f",
        "--folder",
        default=os.path.dirname(os.path.realpath(__file__)) + "/",
        help="Folder for the pcap files.",
    )

    # server exclusive arguments
    parser.add_argument(
        "-s",
        "--server",
        action="store_true",
        default=False,
        help="Starts the script in server mode.",
    )

    # client exclusive arguments
    parser.add_argument(
        "-c",
        "--client",
        default=None,
        help="Start in client mode and set the server IPv4 address.",
    )
    parser.add_argument(
        "--prefix", default=now.strftime("%Y-%m-%d"), help="Prefix on filename."
    )
    parser.add_argument(
        "--set",
        metavar="KEY=VALUE",
        nargs="+",
        help="Set a number of key-value pairs "
        "(do not put spaces before or after the = sign). "
        "If a value contains spaces, you should define "
        "it with double quotes: "
        'foo="this is a sentence". Note that '
        "values are always treated as strings.",
    )
    parser.add_argument(
        "-n",
        "--number_of_measurements",
        type=int,
        default=10,
        help="Number of measurements",
    )
    parser.add_argument(
        "--ping",
        action="store_true",
        default=False,
        help="Sending ICMP pings.",
    )
    # The help fragments below are joined by implicit string concatenation,
    # so each fragment has to end with its own separating space.
    parser.add_argument(
        "--bandwidth",
        action="store_true",
        default=False,
        help="Measure greedy tcp throughput with iperf3. "
        "Use the --set flag for: "
        "server_is_sender=false if enable server is sending. "
        "algo=cubic set tcp algorithm. Can be a comma separated string for multiple congestion control algorithms. "
        "alternate_hystart=false if enabled alternate reproduce every Cubic measurement wicht and without HyStart (Also raises the receive window.). "
        "time=10 length of transmission in seconds.",
    )
    parser.add_argument(
        "--tcp_parallel",
        action="store_true",
        default=False,
        help="Measure greedy tcp throughput with parallel tcp flows. Alternate between bbr and cubic. "
        "Use the --set flag for: "
        "start=1 Start value for flows "
        "end=2 End value for flows "
        "time=12 transmission time. ",
    )
    parser.add_argument(
        "--cbr",
        action="store_true",
        default=False,
        help="Measure Udp CBR traffic. "
        "Use the --set flag for: "
        "reverse=false enable reverse mode. Server is sending. "
        "bitrate=1M target bitrate in bits/sec (0 for unlimited) "
        "[KMG] indicates options that support a K/M/G suffix for kilo-, mega-, or giga-. "
        "time=2 time in seconds to transmit for. "
        "sleep=60 sleep before each cbr traffic in seconds. ",
    )
    parser.add_argument(
        "--drx",
        action="store_true",
        default=False,
        help="Measure RTTs with ping tool. To detect DRX cycles. "
        "Use the --set flag for: "
        "intervals=start:end:increase,start2:end2:increase2... set intervals for gaps, end is inclusive, "
        "values in [s]. At least one interval is required. E.g.: 0:3:1 creates values [0,1,2,3]. "
        "pre_ping=false if enabled pings are send before measurements to set device in continuous reception mode. ",
    )
    parser.add_argument(
        "--harq",
        action="store_true",
        default=False,
        help="Captures http transmissions on client side.",
    )
    parser.add_argument(
        "--serial",
        default=None,
        help="Serial device e.g. /dev/ttyUSB2",
    )
    parser.add_argument(
        "--baudrate",
        default=115200,
        type=int,
        help="Serial device baudrate",
    )

    args = parser.parse_args()

    if args.server:
        asyncio.run(start_server(args))
    elif args.client is not None:
        asyncio.run(start_client(args))
    else:
        # Without a mode there is nothing to run; say so instead of exiting
        # silently.
        parser.error("Specify either -s/--server or -c/--client.")
|