|
|
|
@@ -0,0 +1,191 @@ |
|
|
|
#!/usr/bin/env python3 |
|
|
|
import csv |
|
|
|
import datetime |
|
|
|
from argparse import ArgumentParser |
|
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
|
|
|
def convert_bandwidth(value): |
|
|
|
try: |
|
|
|
value = int(value) |
|
|
|
except: |
|
|
|
value = -1 |
|
|
|
if value == 0: |
|
|
|
return 1.4 |
|
|
|
elif value == 1: |
|
|
|
return 3 |
|
|
|
elif value == 2: |
|
|
|
return 5 |
|
|
|
elif value == 3: |
|
|
|
return 10 |
|
|
|
elif value == 4: |
|
|
|
return 15 |
|
|
|
elif value == 5: |
|
|
|
return 20 |
|
|
|
else: |
|
|
|
return 0 |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
|
|
parser = ArgumentParser() |
|
|
|
parser.add_argument("-f", "--file", required=True, help="Input txt file.") |
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
file = open(args.file, "r") |
|
|
|
content = file.read() |
|
|
|
file.close() |
|
|
|
|
|
|
|
all_csv_lines = list() |
|
|
|
csv_header = ["time"] |
|
|
|
|
|
|
|
h1 = False |
|
|
|
h2 = False |
|
|
|
h3 = False |
|
|
|
|
|
|
|
for line in content.split("\n"): |
|
|
|
if line == "" or line == "\n": |
|
|
|
break |
|
|
|
raw_columns = line.split(";") |
|
|
|
csv_line = list() |
|
|
|
csv_line.append(raw_columns[0]) |
|
|
|
for i in range(1, len(raw_columns)): |
|
|
|
col = raw_columns[i] |
|
|
|
|
|
|
|
if 'AT+QNWCFG="nr5g_csi"' in col: |
|
|
|
if not h1: |
|
|
|
csv_header += ["mcs_PDSCH", "ri_PDSCH", "downlink_cqi", "pmi"] |
|
|
|
h1 = True |
|
|
|
tmp = raw_columns[i + 1].replace('+QNWCFG: "nr5g_csi",', "") |
|
|
|
csv_line += tmp.split(",") |
|
|
|
elif "AT+QENDC" in col: |
|
|
|
if not h2: |
|
|
|
csv_header += [ |
|
|
|
"endc_avl", |
|
|
|
"plmn_info_list_r15_avl", |
|
|
|
"endc_rstr", |
|
|
|
"5G_basic", |
|
|
|
] |
|
|
|
h2 = True |
|
|
|
tmp = raw_columns[i + 1].replace("+QENDC: ", "") |
|
|
|
csv_line += tmp.split(",") |
|
|
|
elif 'AT+QENG="servingcell"' in col: |
|
|
|
if not h3: |
|
|
|
csv_header += [ |
|
|
|
"connection_state", |
|
|
|
"is_tdd", |
|
|
|
"mcc", |
|
|
|
"mnc", |
|
|
|
"cellID", |
|
|
|
"PCID", |
|
|
|
"earfcn", |
|
|
|
"freq_band_ind", |
|
|
|
"UL_bandwidth", |
|
|
|
"DL_bandwidth", |
|
|
|
"TAC", |
|
|
|
"RSRP", |
|
|
|
"RSRQ", |
|
|
|
"RSSI", |
|
|
|
"SINR", |
|
|
|
"CQI_1-30", |
|
|
|
"tx_power", |
|
|
|
"srxlev", |
|
|
|
"MCC", |
|
|
|
"MNC", |
|
|
|
"PCID", |
|
|
|
"RSRP", |
|
|
|
"SINR", |
|
|
|
"RSRQ", |
|
|
|
"ARFCN", |
|
|
|
"band", |
|
|
|
] |
|
|
|
h3 = True |
|
|
|
if "NOCONN" in raw_columns[i + 1]: |
|
|
|
csv_line.append("NOCONN") |
|
|
|
csv_line += raw_columns[i + 2].replace('+QENG: "LTE",', "").split(",") |
|
|
|
csv_line += ( |
|
|
|
raw_columns[i + 3].replace('+QENG:"NR5G-NSA",', "").split(",") |
|
|
|
) |
|
|
|
elif "SEARCH" in raw_columns[i + 1]: |
|
|
|
csv_line.append("SEARCH") |
|
|
|
csv_line += [""] * 25 |
|
|
|
elif "OK" == raw_columns[i + 1]: |
|
|
|
csv_line.append("OK") |
|
|
|
csv_line += [""] * 25 |
|
|
|
else: |
|
|
|
csv_line.append("undefined") |
|
|
|
csv_line += [""] * 25 |
|
|
|
|
|
|
|
|
|
|
|
all_csv_lines.append(csv_line) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
outputfile = open(args.file.replace("txt", "csv"), "w") |
|
|
|
writer = csv.writer(outputfile, delimiter=",", lineterminator="\n", escapechar='\\') |
|
|
|
writer.writerow(csv_header) |
|
|
|
#print(all_csv_lines) |
|
|
|
for l in all_csv_lines: |
|
|
|
#print(l) |
|
|
|
writer.writerow(l) |
|
|
|
|
|
|
|
outputfile.close() |
|
|
|
|
|
|
|
outputfile = open(args.file.replace("txt", "csv"), "r") |
|
|
|
serial_df = pd.read_csv(outputfile, |
|
|
|
converters={"UL_bandwidth": convert_bandwidth, "DL_bandwidth": convert_bandwidth}, |
|
|
|
) |
|
|
|
serial_df = serial_df.drop(columns=["MCC", "MNC"]) |
|
|
|
serial_df["datetime"] = pd.to_datetime( |
|
|
|
serial_df["time"].apply(lambda x: datetime.datetime.fromtimestamp(x)) |
|
|
|
) |
|
|
|
serial_df.to_csv(args.file.replace("txt", "csv")) |
|
|
|
outputfile.close() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
exit() |
|
|
|
|
|
|
|
delete_string = [ |
|
|
|
'AT+QNWCFG="nr5g_csi";', |
|
|
|
'+QNWCFG: "nr5g_csi"', |
|
|
|
'AT+QENG="servingcell";+QENG: "servingcell",', |
|
|
|
"+QENG:", |
|
|
|
"AT+QENDC;+QENDC:", |
|
|
|
] |
|
|
|
|
|
|
|
for d in delete_string: |
|
|
|
content = content.replace(d, ",") |
|
|
|
content = ( |
|
|
|
content.replace(";", "") |
|
|
|
.replace(" ", "") |
|
|
|
.replace(",,,", ",") |
|
|
|
.replace('"', "") |
|
|
|
.replace("LTE,", "") |
|
|
|
.replace("NR5G-NSA,", "") |
|
|
|
) |
|
|
|
|
|
|
|
header = "time,mcs,ri,cqi,pmi,conn_state,is_tdd,MCC,MNC,cellID,PCID,earfcn,freq_band_ind,UL_bandwidth,DL_bandwidth,TAC,RSRP,RSRQ,RSSI,SINR,CQI,tx_power,srxlev,MCC,MNC,PCID,RSRP,SINR,RSRQ,ARFCN,band,endc_avl,plmn_info_list_r15_avl,endc_rstr,5G_basic\n" |
|
|
|
csv_path = args.file.replace("txt", "csv") |
|
|
|
print("Write to: {}".format(csv_path)) |
|
|
|
csv_string = header |
|
|
|
for csv_line in content.split("\n"): |
|
|
|
if len(header.split(",")) == len(csv_line.split(",")): |
|
|
|
csv_string += csv_line + "\n" |
|
|
|
else: |
|
|
|
# print("{} found {}".format(len(header.split(",")), len(csv_line.split(",")))) |
|
|
|
print("Could not interpret string: {}".format(csv_line)) |
|
|
|
print( |
|
|
|
"Expect {} columns got {}".format( |
|
|
|
len(header.split(",")), len(csv_line.split(",")) |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
csv_string_io = StringIO(csv_string) |
|
|
|
serial_df = pd.read_csv(csv_string_io) |
|
|
|
serial_df = serial_df |
|
|
|
serial_df["datetime"] = pd.to_datetime( |
|
|
|
serial_df["time"].apply(lambda x: datetime.datetime.fromtimestamp(x)) |
|
|
|
) |
|
|
|
serial_df.to_csv(csv_path) |
|
|
|
print(serial_df) |