#!/usr/bin/env python3
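"""Convert iperf3 pcap captures (and optional tcp_probe traces) into csv files.

Runs tshark on every pcap in a folder that matches the given prefix, adds
metadata parsed from the filename, optionally merges kernel tcp_probe trace
data, and writes one csv next to each pcap. Work is split across multiple
processes.
"""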
import datetime
import multiprocessing
import os
import re
import subprocess
from argparse import ArgumentParser
from io import StringIO
from itertools import islice
from math import ceil
from time import sleep, time

import pandas as pd

# Reference tshark invocation this script automates:
# tshark -r ./tcp-cap-test/test__bandwidth_reverse_tcp_bbr_1.pcap -Y "tcp.stream eq 1" -T fields -e frame.time_relative -e ip.len -e ip.hdr_len -e tcp.hdr_len -e tcp.analysis.ack_rtt -e tcp.analysis.bytes_in_flight -e tcp.analysis.retransmission -e tcp.analysis.duplicate_ack -e ip.dst -e ip.src -e tcp.options.mss_val -E header=y -E separator=, -E quote=d


def format_tcp_trace_to_csv(pcap_number, packets_to_keep, is_reverse=False):
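    """Parse a kernel tcp_probe trace txt file into a pandas DataFrame.

    Each data line is expected to look roughly like this (hypothetical
    sample; the exact field set depends on the kernel version):

        iperf3-1234 [000] ..s. 5678.901234: tcp_probe: src=[::ffff:10.0.0.1]:5201 dest=10.0.0.2:40000 ... snd_cwnd=10 ... snd_wnd=65535 ... srtt=25000

    Returns None if the file is missing or yields fewer than two samples.
    """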
    txt_name = "{}{}{}.txt".format(args.folder, args.tcp_trace, pcap_number)
    try:
        with open(txt_name, "r") as txt_file:
            content = txt_file.read()
    except IOError as e:
        print("\rCannot open file {}\n {} {}".format(txt_name, e.errno, e.strerror))
        return None
    csv_string = "time_tcp_probe,snd_cwnd,snd_wnd,srtt\n"
    uptime = None
    counter = 0
    lines = content.split("\n")
    start_time = None
    for line in lines:
        counter += 1
        if not line.strip():
            continue
        if uptime is None:
            # the first line of the trace file holds the uptime at capture start
            uptime = float(line.split(" ")[0])
        else:
            if is_reverse:
                line_filter = "src=[::ffff:{}]:{}".format(args.server, args.port)
            else:
                line_filter = "dest={}:{}".format(args.server, args.port)
            # ignore tcp packets from the iperf syn (packets to keep = len of tcp.stream eq 1)
            if line_filter in line and counter >= (len(lines) - packets_to_keep):
                match = re.match(
                    r".* (\d+\.\d+): tcp_probe:.*snd_cwnd=(\d+).*snd_wnd=(\d+).*srtt=(\d+)",
                    line,
                )
                if match:
                    if start_time is None:
                        start_time = float(match.group(1)) - uptime
                    # shift timestamps so that the trace starts at zero
                    probe_time = float(match.group(1)) - (uptime + start_time)
                    snd_cwnd = match.group(2)
                    snd_wnd = match.group(3)
                    srtt = match.group(4)
                    csv_string += "{},{},{},{}\n".format(
                        probe_time, snd_cwnd, snd_wnd, srtt
                    )
    trace_df = pd.read_csv(StringIO(csv_string))
    if len(trace_df) <= 1:
        print("\rFaulty tcp trace file for pcap no: {}".format(pcap_number))
        return None
    return trace_df


def format_pcaps_to_csv(pcaps, dummy):
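    """Run tshark on every pcap in `pcaps` and write one csv per pcap.

    `dummy` is unused; it only pads the argument tuple handed to
    multiprocessing.Process. The shared counter `n` tracks progress.
    """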
    global n
    for pcap in pcaps:
        if pcap.endswith(".pcap") and pcap.startswith(args.prefix):
            match = re.match(regex, pcap)
            if match:
                # metadata from pcap filename
                direction = "upload"
                if "_reverse_" in pcap:
                    direction = "download"
                congestion_control = match.group(2)
                pcap_number = match.group(3)
                # analyse traffic from pcap (receiver side)
                tshark_command = [
                    "tshark",
                    "-r",
                    "{}{}".format(args.folder, pcap),
                    # remove this for mobile measurements
                    # "-Y",
                    # "tcp.stream eq 1",
                    "-T",
                    "fields",
                    "-e",
                    "frame.time_relative",
                    "-e",
                    "ip.len",
                    "-e",
                    "ip.hdr_len",
                    "-e",
                    "tcp.hdr_len",
                    "-e",
                    "tcp.analysis.ack_rtt",
                    "-e",
                    "tcp.analysis.bytes_in_flight",
                    "-e",
                    "tcp.analysis.retransmission",
                    "-e",
                    "tcp.analysis.duplicate_ack",
                    "-e",
                    "ip.src",
                    "-e",
                    "ip.dst",
                    "-e",
                    "tcp.options.mss_val",
                    "-e",
                    "tcp.window_size",
                    "-e",
                    "frame.time_epoch",
                    "-e",
                    "tcp.stream",  # must be the last field in each row!
                    "-E",
                    "header=y",
                    "-E",
                    "separator=,",
                    "-E",
                    "quote=d",
                ]
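                # tshark emits a csv header row followed by one quoted row per
                # packet, e.g. (illustrative values):
                # "0.000000","1500","20","32",...,"0"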
                tshark_out = None
                try:
                    tshark_out = subprocess.check_output(tshark_command).decode("utf-8")
                except subprocess.CalledProcessError as tsharkexec:
                    if tsharkexec.returncode == 2:
                        print("\rtshark could not open pcap: {}".format(pcap))
                    else:
                        print("\rtshark exited with code: {}".format(tsharkexec.returncode))
                        print(tsharkexec.output)
                    continue
                # convert the csv string into a StringIO for pandas
                csv_string_io = StringIO(tshark_out)
                # tshark leaves boolean fields empty when they do not apply
                conv_bool = lambda x: x != ""
                pcap_df = pd.read_csv(
                    csv_string_io,
                    converters={
                        "tcp.analysis.retransmission": conv_bool,
                        "tcp.analysis.duplicate_ack": conv_bool,
                    },
                )
                # keep only the packets of the last tcp stream in the capture
                last_tcp_stream_in_pcap = pcap_df["tcp.stream"].max()
                pcap_df = pcap_df.loc[pcap_df["tcp.stream"] == last_tcp_stream_in_pcap]
                # payload = IP total length minus IP and TCP header lengths
                pcap_df["payload_size"] = pcap_df["ip.len"] - (
                    pcap_df["ip.hdr_len"] + pcap_df["tcp.hdr_len"]
                )
                pcap_df["direction"] = direction
                pcap_df["congestion_control"] = congestion_control
                pcap_df["pcap_number"] = pcap_number
                pcap_df["datetime"] = pd.to_datetime(
                    pcap_df["frame.time_epoch"].apply(
                        lambda x: datetime.datetime.fromtimestamp(x)
                    )
                )
                pcap_df = pcap_df.drop(
                    columns=["tcp.stream", "ip.len", "ip.hdr_len", "tcp.hdr_len"]
                )
                pcap_df.rename(
                    columns={
                        "frame.time_relative": "arrival_time",
                        "ip.src": "src_ip",
                        "ip.dst": "dst_ip",
                        "tcp.options.mss_val": "mss",
                        "tcp.analysis.ack_rtt": "ack_rtt",
                        "tcp.analysis.bytes_in_flight": "bytes_in_flight",
                        "tcp.window_size": "receive_window_size",
                        "tcp.analysis.retransmission": "is_retransmission",
                        "tcp.analysis.duplicate_ack": "is_dup_ack",
                        "frame.time_epoch": "time_epoch",
                    },
                    inplace=True,
                )
                pcap_df = pcap_df.sort_values("arrival_time")
                try:
                    # join tcp_trace data with pcap data
                    merge_srtt = False  # set to True to merge tcp_probe data
                    if merge_srtt:
                        tcp_trace_df = format_tcp_trace_to_csv(
                            pcap_number,
                            len(pcap_df),
                            is_reverse="_reverse_" in pcap,
                        )
                        if tcp_trace_df is None:
                            print(
                                "\rNo tcp trace file for pcap no {} found".format(
                                    pcap_number
                                )
                            )
                            continue
                        # match each incoming packet to the closest tcp_probe sample
                        merged_df = pd.merge_asof(
                            pcap_df.loc[pcap_df["src_ip"] != args.server],
                            tcp_trace_df,
                            left_on="arrival_time",
                            right_on="time_tcp_probe",
                            tolerance=0.01,
                        )
                        merged_df = pd.concat(
                            [merged_df, pcap_df.loc[pcap_df["src_ip"] == args.server]]
                        )
                        merged_df = merged_df.sort_values("arrival_time")
                        merged_df.to_csv(
                            "{}{}".format(args.folder, pcap).replace(".pcap", ".csv")
                        )
                    else:
                        pcap_df.to_csv(
                            "{}{}".format(args.folder, pcap).replace(".pcap", ".csv")
                        )
                except Exception as e:
                    print("\rCould not merge data for pcap no: {} ({})".format(pcap, e))
                    pcap_df.to_csv(
                        "{}{}".format(args.folder, pcap).replace(".pcap", ".csv")
                    )
                n.value += 1
            else:
                print("File does not match regex: {}".format(pcap))
        else:
            print("File is not a pcap file: {}".format(pcap))


def chunk(it, size):
    # repeatedly slice `size` items off the iterator until it is exhausted
    it = iter(it)
    return iter(lambda: tuple(islice(it, size)), ())
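# e.g. list(chunk(range(5), 2)) -> [(0, 1), (2, 3), (4,)]

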
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("-f", "--folder", required=True, help="Folder with pcaps.")
    parser.add_argument(
        "-p",
        "--prefix",
        required=True,
        help="Filename prefix, e.g. 2021-03-17_bandwidth_tcp_bbr_",
    )
    parser.add_argument(
        "-t",
        "--tcp_trace",
        required=True,
        help="Filename format of the tcp trace txt files, e.g. "
        "2021_03_30_bandwidth_reverse_tcp_tcp_trace_ for "
        "2021_03_30_bandwidth_reverse_tcp_tcp_trace_1.txt",
    )
    parser.add_argument(
        "-c",
        "--cores",
        default=1,
        type=int,
        help="Number of cores for multiprocessing.",
    )
    parser.add_argument(
        "--port",
        default=5201,
        type=int,
        help="iPerf3 port used for measurements.",
    )
    parser.add_argument(
        "--server",
        default="130.75.73.69",
        type=str,
        help="iPerf3 server IP used for measurements.",
    )
    args = parser.parse_args()
    manager = multiprocessing.Manager()
    # regex for protocol, congestion control algorithm and pcap number
    regex = r".*_bandwidth_(?:reverse_)?(.+)_(.+)_(\d+)\.pcap"
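    # e.g. "2021-03-17_bandwidth_reverse_tcp_bbr_1.pcap"
    # -> group(1)="tcp" (protocol), group(2)="bbr" (algorithm), group(3)="1" (pcap number)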
    csv_header = "n,start_time,end_time,payload_size,protocol,algorithm,direction,packages_received,syns_in_pcap\n"
    n = manager.Value("i", 0)
    filenames = os.listdir(args.folder)
    number_of_files = len(filenames)
    pcap_list = []
    jobs = []
    st = time()
    for filename in filenames:
        if filename.endswith(".pcap") and filename.startswith(args.prefix):
            if re.match(regex, filename):
                pcap_list.append(filename)
    pcap_list.sort()
    print("Found {} pcap files in {} files.".format(len(pcap_list), len(filenames)))
    if len(pcap_list) == 0:
        print("Abort, no pcaps found with prefix: {}".format(args.prefix))
        print("{}{}".format(args.folder, args.prefix))
        exit(1)
    # split the pcap list into one chunk per core
    parts = chunk(pcap_list, ceil(len(pcap_list) / args.cores))
    print("Start processing with {} jobs.".format(args.cores))
    for p in parts:
        process = multiprocessing.Process(target=format_pcaps_to_csv, args=(p, "dummy"))
        jobs.append(process)
    for j in jobs:
        j.start()
    print("Started all jobs.")
    # Ensure all of the processes have finished
    finished_job_counter = 0
    working = ["|", "/", "-", "\\", "|", "/", "-", "\\"]
    w = 0
    while len(jobs) != finished_job_counter:
        sleep(1)
        print(
            "\r\t{}{}{}\t Running {} jobs ({} finished). Processed {} out of {} pcaps. ({}%) ".format(
                working[w],
                working[w],
                working[w],
                len(jobs),
                finished_job_counter,
                n.value,
                len(pcap_list),
                round((n.value / len(pcap_list)) * 100, 2),
            ),
            end="",
        )
        finished_job_counter = 0
        for j in jobs:
            if not j.is_alive():
                finished_job_counter += 1
        # advance the spinner
        w = (w + 1) % len(working)
    print("")
    et = time()
    # get the execution time
    elapsed_time = et - st
    print("Execution time:", elapsed_time, "seconds")