#!/usr/bin/env python3
"""Convert a semicolon-separated AT-command log into a CSV file.

Each input line is expected to look like ``<time>;<command>;<response>;...``.
The responses to three commands are extracted:

* ``AT+QNWCFG="nr5g_csi"``
* ``AT+QENDC``
* ``AT+QENG="servingcell"``

The parsed rows are written to a CSV file next to the input (same name,
``.csv`` suffix). The file is then reloaded with pandas to translate the
bandwidth columns, drop the ``MCC``/``MNC`` columns and add a ``datetime``
column, and finally written back to the same path.
"""
import csv
import datetime
from argparse import ArgumentParser
from pathlib import Path

import pandas as pd

# Bandwidth index -> bandwidth value (presumably MHz -- confirm against the
# modem's AT command manual).
_BANDWIDTH_BY_INDEX = {0: 1.4, 1: 3, 2: 5, 3: 10, 4: 15, 5: 20}

# Column names contributed by each command, added to the CSV header the first
# time the command is seen in the log.
_CSI_COLUMNS = ["mcs_PDSCH", "ri_PDSCH", "downlink_cqi", "pmi"]
_ENDC_COLUMNS = ["endc_avl", "plmn_info_list_r15_avl", "endc_rstr", "5G_basic"]
_SERVINGCELL_COLUMNS = [
    "connection_state",
    "is_tdd",
    "mcc",
    "mnc",
    "cellID",
    "PCID",
    "earfcn",
    "freq_band_ind",
    "UL_bandwidth",
    "DL_bandwidth",
    "TAC",
    "RSRP",
    "RSRQ",
    "RSSI",
    "SINR",
    "CQI_1-30",
    "tx_power",
    "srxlev",
    "MCC",
    "MNC",
    "PCID",
    "RSRP",
    "SINR",
    "RSRQ",
    "ARFCN",
    "band",
]

# Number of empty cells that pad a servingcell entry carrying no measurements
# (everything except the leading connection_state column).
_SERVINGCELL_PADDING = len(_SERVINGCELL_COLUMNS) - 1


def convert_bandwidth(value):
    """Translate a bandwidth index into its bandwidth value.

    Args:
        value: Anything ``int()`` accepts; indices 0-5 are meaningful.

    Returns:
        1.4, 3, 5, 10, 15 or 20 for the indices 0-5. Every other value,
        including input that cannot be converted to an integer (empty
        string, ``None``, text), yields 0.
    """
    try:
        index = int(value)
    except (TypeError, ValueError, OverflowError):
        # Unparseable cells (e.g. the empty padding cells) map to 0.
        return 0
    return _BANDWIDTH_BY_INDEX.get(index, 0)


def _csv_path_for(input_path):
    """Return *input_path* with its final suffix replaced by ``.csv``.

    Only the file extension is touched, so a "txt" elsewhere in the path
    (for example in a directory name) is left alone.
    """
    return str(Path(input_path).with_suffix(".csv"))


def _parse_servingcell(raw_columns, i):
    """Return the cells for the servingcell command found at index *i*.

    The field after the command decides the layout: for ``NOCONN`` the two
    following fields (LTE and NR5G-NSA responses) supply the values;
    otherwise only the state is recorded and the remaining cells stay empty.
    """
    status = raw_columns[i + 1]
    if "NOCONN" in status:
        cells = ["NOCONN"]
        cells += raw_columns[i + 2].replace('+QENG: "LTE",', "").split(",")
        cells += raw_columns[i + 3].replace('+QENG:"NR5G-NSA",', "").split(",")
        return cells

    if "SEARCH" in status:
        state = "SEARCH"
    elif status == "OK":
        state = "OK"
    else:
        state = "undefined"
    return [state] + [""] * _SERVINGCELL_PADDING


def _parse_log(content):
    """Parse the raw log text into a CSV header and a list of rows.

    The header grows in the order the commands are first encountered, and
    each row's cells are appended in the order the commands appear on that
    line. Columns therefore only line up if every line carries the same
    commands in the same order.

    Parsing stops at the first empty line. A command that is the last field
    of a line (no response field after it) raises ``IndexError``.

    Returns:
        A ``(header, rows)`` tuple of a list of column names and a list of
        rows, each row being a list of strings.
    """
    header = ["time"]
    rows = []
    seen_commands = set()

    def extend_header_once(command, columns):
        if command not in seen_commands:
            seen_commands.add(command)
            header.extend(columns)

    for line in content.split("\n"):
        if line == "":
            break
        raw_columns = line.split(";")
        row = [raw_columns[0]]
        for i, col in enumerate(raw_columns[1:], start=1):
            if 'AT+QNWCFG="nr5g_csi"' in col:
                extend_header_once("csi", _CSI_COLUMNS)
                response = raw_columns[i + 1].replace('+QNWCFG: "nr5g_csi",', "")
                row += response.split(",")
            elif "AT+QENDC" in col:
                extend_header_once("endc", _ENDC_COLUMNS)
                response = raw_columns[i + 1].replace("+QENDC: ", "")
                row += response.split(",")
            elif 'AT+QENG="servingcell"' in col:
                extend_header_once("servingcell", _SERVINGCELL_COLUMNS)
                row += _parse_servingcell(raw_columns, i)
        rows.append(row)
    return header, rows


def main():
    """Parse the log given by ``-f/--file`` and write the resulting CSV."""
    parser = ArgumentParser()
    parser.add_argument("-f", "--file", required=True, help="Input txt file.")
    args = parser.parse_args()

    with open(args.file, "r") as infile:
        content = infile.read()

    header, rows = _parse_log(content)
    csv_path = _csv_path_for(args.file)

    # newline="" lets the csv writer control line endings itself.
    with open(csv_path, "w", newline="") as outfile:
        writer = csv.writer(
            outfile, delimiter=",", lineterminator="\n", escapechar="\\"
        )
        writer.writerow(header)
        writer.writerows(rows)

    # Reload through pandas to post-process the columns, then overwrite the
    # intermediate CSV with the final result.
    serial_df = pd.read_csv(
        csv_path,
        converters={
            "UL_bandwidth": convert_bandwidth,
            "DL_bandwidth": convert_bandwidth,
        },
    )
    # The columns are absent when the log contains no servingcell query.
    serial_df = serial_df.drop(columns=["MCC", "MNC"], errors="ignore")
    # fromtimestamp() interprets the value as seconds since the epoch and
    # returns local time.
    serial_df["datetime"] = pd.to_datetime(
        serial_df["time"].apply(datetime.datetime.fromtimestamp)
    )
    serial_df.to_csv(csv_path)


if __name__ == "__main__":
    main()