|
- #!/usr/bin/env python3
- import csv
- import datetime
- from argparse import ArgumentParser
- import pandas as pd
-
-
- def convert_bandwidth(value):
- try:
- value = int(value)
- except:
- value = -1
- if value == 0:
- return 1.4
- elif value == 1:
- return 3
- elif value == 2:
- return 5
- elif value == 3:
- return 10
- elif value == 4:
- return 15
- elif value == 5:
- return 20
- else:
- return 0
-
-
- if __name__ == "__main__":
-
- parser = ArgumentParser()
- parser.add_argument("-f", "--file", required=True, help="Input txt file.")
- args = parser.parse_args()
-
- file = open(args.file, "r")
- content = file.read()
- file.close()
-
- all_csv_lines = list()
- csv_header = ["time"]
-
- h1 = False
- h2 = False
- h3 = False
-
- for line in content.split("\n"):
- if line == "" or line == "\n":
- break
- raw_columns = line.split(";")
- csv_line = list()
- csv_line.append(raw_columns[0])
- for i in range(1, len(raw_columns)):
- col = raw_columns[i]
-
- if 'AT+QNWCFG="nr5g_csi"' in col:
- if not h1:
- csv_header += ["mcs_PDSCH", "ri_PDSCH", "downlink_cqi", "pmi"]
- h1 = True
- tmp = raw_columns[i + 1].replace('+QNWCFG: "nr5g_csi",', "")
- csv_line += tmp.split(",")
- elif "AT+QENDC" in col:
- if not h2:
- csv_header += [
- "endc_avl",
- "plmn_info_list_r15_avl",
- "endc_rstr",
- "5G_basic",
- ]
- h2 = True
- tmp = raw_columns[i + 1].replace("+QENDC: ", "")
- csv_line += tmp.split(",")
- elif 'AT+QENG="servingcell"' in col:
- if not h3:
- csv_header += [
- "connection_state",
- "is_tdd",
- "mcc",
- "mnc",
- "cellID",
- "PCID",
- "earfcn",
- "freq_band_ind",
- "UL_bandwidth",
- "DL_bandwidth",
- "TAC",
- "RSRP",
- "RSRQ",
- "RSSI",
- "SINR",
- "CQI_1-30",
- "tx_power",
- "srxlev",
- "MCC",
- "MNC",
- "PCID",
- "RSRP",
- "SINR",
- "RSRQ",
- "ARFCN",
- "band",
- ]
- h3 = True
- if "NOCONN" in raw_columns[i + 1]:
- csv_line.append("NOCONN")
- csv_line += raw_columns[i + 2].replace('+QENG: "LTE",', "").split(",")
- csv_line += (
- raw_columns[i + 3].replace('+QENG:"NR5G-NSA",', "").split(",")
- )
- elif "SEARCH" in raw_columns[i + 1]:
- csv_line.append("SEARCH")
- csv_line += [""] * 25
- elif "OK" == raw_columns[i + 1]:
- csv_line.append("OK")
- csv_line += [""] * 25
- else:
- csv_line.append("undefined")
- csv_line += [""] * 25
-
-
- all_csv_lines.append(csv_line)
-
-
-
- outputfile = open(args.file.replace("txt", "csv"), "w")
- writer = csv.writer(outputfile, delimiter=",", lineterminator="\n", escapechar='\\')
- writer.writerow(csv_header)
- #print(all_csv_lines)
- for l in all_csv_lines:
- #print(l)
- writer.writerow(l)
-
- outputfile.close()
-
- outputfile = open(args.file.replace("txt", "csv"), "r")
- serial_df = pd.read_csv(outputfile,
- converters={"UL_bandwidth": convert_bandwidth, "DL_bandwidth": convert_bandwidth},
- )
- serial_df = serial_df.drop(columns=["MCC", "MNC"])
- serial_df["datetime"] = pd.to_datetime(
- serial_df["time"].apply(lambda x: datetime.datetime.fromtimestamp(x))
- )
- serial_df.to_csv(args.file.replace("txt", "csv"))
- outputfile.close()
-
-
-
- exit()
-
- delete_string = [
- 'AT+QNWCFG="nr5g_csi";',
- '+QNWCFG: "nr5g_csi"',
- 'AT+QENG="servingcell";+QENG: "servingcell",',
- "+QENG:",
- "AT+QENDC;+QENDC:",
- ]
-
- for d in delete_string:
- content = content.replace(d, ",")
- content = (
- content.replace(";", "")
- .replace(" ", "")
- .replace(",,,", ",")
- .replace('"', "")
- .replace("LTE,", "")
- .replace("NR5G-NSA,", "")
- )
-
- header = "time,mcs,ri,cqi,pmi,conn_state,is_tdd,MCC,MNC,cellID,PCID,earfcn,freq_band_ind,UL_bandwidth,DL_bandwidth,TAC,RSRP,RSRQ,RSSI,SINR,CQI,tx_power,srxlev,MCC,MNC,PCID,RSRP,SINR,RSRQ,ARFCN,band,endc_avl,plmn_info_list_r15_avl,endc_rstr,5G_basic\n"
- csv_path = args.file.replace("txt", "csv")
- print("Write to: {}".format(csv_path))
- csv_string = header
- for csv_line in content.split("\n"):
- if len(header.split(",")) == len(csv_line.split(",")):
- csv_string += csv_line + "\n"
- else:
- # print("{} found {}".format(len(header.split(",")), len(csv_line.split(","))))
- print("Could not interpret string: {}".format(csv_line))
- print(
- "Expect {} columns got {}".format(
- len(header.split(",")), len(csv_line.split(","))
- )
- )
-
- csv_string_io = StringIO(csv_string)
- serial_df = pd.read_csv(csv_string_io)
- serial_df = serial_df
- serial_df["datetime"] = pd.to_datetime(
- serial_df["time"].apply(lambda x: datetime.datetime.fromtimestamp(x))
- )
- serial_df.to_csv(csv_path)
- print(serial_df)
|