import sys from importlib import reload sys.path.append('./') import asotr reload(asotr) import matplotlib.pyplot as plt from matplotlib import dates import pandas as pd from datetime import datetime, timedelta import os import json def cmd_parse(asotr_data): decode_list = [] for elem in asotr_data.itertuples(): elem_msg = asotr.cmd_decode(elem.cmd.strip()) if elem_msg != '': str_ = f'{elem.timestamp};{elem_msg}' decode_list.append(str_) return decode_list def pow_temp_parse(elem, tstamp_format='%d.%m.%Y %H:%M:%S.%f'): timestamp = pd.to_datetime(elem.timestamp, format=tstamp_format) if "*1" not in elem.data: return ('none', []) l = [] data = elem.data[2:] l = [] l.append(asotr.get_utc_seconds(elem.timestamp, tstamp_format)) l.append(timestamp) if "status 3" in elem.cmd: l.extend(list(map(float, data.split()))) return ('t', l) elif "status 4" in elem.cmd: l.extend(list(map(int, data.split()))) return ('p', l) else: return ('none', []) def get_cmd_data(fname, tstamp_format='%d.%m.%Y %H:%M:%S.%f'): t = [] p = [] asotr_data = pd.read_csv(fname, delimiter=';', names=['timestamp', 'cmd', 'data']) cmd_list = cmd_parse(asotr_data) for elem in asotr_data.itertuples(): data = pow_temp_parse(elem, tstamp_format) if data[0] == 't': t.append(data[1]) elif data[0] == 'p': p.append(data[1]) temp = pd.DataFrame(t, columns=['timestamp_sec', 'timestamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6']) power = pd.DataFrame(p, columns=['timestamp_sec', 'timestamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6']) return (cmd_list, temp, power) def decode_collect_data(uart_server_log_path, asotr_kit): files = os.listdir(uart_server_log_path) files1 = [file for file in files if 'cmd_data' in file] files1 = sorted(files1) cmd_list_itog = [] temp_itog = [] pow_itog = [] cmd_list, temp, power = get_cmd_data(uart_server_log_path + files1[0]) cmd_list_itog.extend(cmd_list) temp_itog = temp pow_itog = power print(files1[0]) for i, file in enumerate(files1): if i > 0: print(file) cmd_list, temp, power = get_cmd_data(uart_server_log_path + file) cmd_list_itog.extend(cmd_list) temp_itog = pd.concat([temp_itog, temp], ignore_index=True) pow_itog = pd.concat([pow_itog, power], ignore_index=True) temp_itog = temp_itog.sort_values(by='timestamp_sec') pow_itog = pow_itog.sort_values(by='timestamp_sec') temp_itog['timestamp'] = temp_itog['timestamp'].dt.strftime('%d.%m.%Y %H:%M:%S.%f') pow_itog['timestamp'] = pow_itog['timestamp'].dt.strftime('%d.%m.%Y %H:%M:%S.%f') temp_itog.to_csv(f'./data/asotr0{asotr_kit}_data_T.csv', index=False, sep=';', encoding='utf-8-sig', decimal='.') pow_itog.to_csv(f'./data/asotr0{asotr_kit}_data_P.csv', index=False, sep=';', encoding='utf-8-sig', decimal='.') with open('./data/cmd_human.csv', 'w') as file: for elem in cmd_list_itog: file.write(f'{elem}\n') def load_ps_cmd_decode(fname): with open(fname, 'r') as file: ps_cmd_decode = json.load(file) return ps_cmd_decode def get_ps_cmd_data(fname, ps_model): decode = load_ps_cmd_decode('./decode_ps_cmd.json') ps_data = pd.read_csv(fname, sep=';', names=['timestamp', 'cmd', 'cmd_answer']) out = {'cmd_write': [], 'cmd_read': [], 'cmd_read_desc': []} if ps_model == 'PSH-3630A': for elem in decode: if elem['ps_model'] == ps_model: decode_cmd_write = elem['cmd_write'] decode_cmd_read = elem['cmd_read'] decode_cmd_read_desc = elem['cmd_read_desc'] break cmd_write = [] cmd_read_desc = [] cmd_read = [] for row in ps_data.itertuples(): # check if current row is cmd_write: cmd_str = row.cmd.split(' ') timestamp_sec = asotr.get_utc_seconds(row.timestamp, '%d.%m.%Y %H:%M:%S.%f') if (len(cmd_str) > 1): elem = [timestamp_sec, row.timestamp, decode_cmd_write[cmd_str[0]], cmd_str[1]] cmd_write.append(elem) elif (len(cmd_str) == 1): elem = [timestamp_sec, row.timestamp, decode_cmd_read[cmd_str[0]], row.cmd_answer.strip()] cmd_read.append(elem) elem = [timestamp_sec, row.timestamp, decode_cmd_read_desc[cmd_str[0]], row.cmd_answer.strip()] cmd_read_desc.append(elem) out['cmd_write'] = cmd_write out['cmd_read'] = cmd_read out['cmd_read_desc'] = cmd_read_desc return out def convert_data_date_format(directory, fname_pattern, column_names, old_date_fmt = "%Y.%m.%d %H:%M:%S.%f", new_date_fmt = '%d.%m.%Y %H:%M:%S.%f'): files = os.listdir(directory) dateparse = lambda x: datetime.strptime(x, old_date_fmt) for file in files: fname = directory + file if 'cmd_data' in fname: print(fname) df = pd.read_csv(fname, sep=';', decimal='.', names=column_names, parse_dates=['timestamp'], date_parser=dateparse) df['timestamp'] = df['timestamp'].dt.strftime(new_date_fmt) df.to_csv(fname, header=False, index=False, sep=';', decimal='.', encoding='utf-8-sig') def plot_data(fname, title, ox_dtime_format, events={}, hlines={}, ch='ch1'): cmd_list, temp, power = get_cmd_data(fname) fig = plt.figure(figsize=(10, 8), dpi=150) ax1 = fig.add_subplot(2,1,1) ax2 = fig.add_subplot(2,1,2, sharex=ax1) ax1.plot(temp.timestamp, temp[ch]) if len(hlines) > 0: for label, value in hlines.items(): ax1.axhline(y = value, color='b', linestyle='--') ax1.text(temp.timestamp[len(temp.timestamp) - 1], value, label, va='bottom') if len(events) > 0: for time, event in events.items(): idx = asotr.find_best_time_idx(temp.timestamp, time, accuracy='seconds') ax1.axvline(x = temp.timestamp[idx], color='r', linestyle='-.') ax1.text(temp.timestamp[idx], max(temp[ch]) + 1, event, rotation=45, va='bottom') ax2.plot(power.timestamp, power[ch]) date_formatter = dates.DateFormatter(ox_dtime_format) ax1.xaxis.set_major_formatter(date_formatter) ax1.set_xlabel('время') ax1.set_ylabel('Температура, $^\circ$C') ax1.grid(True) ax2.xaxis.set_major_formatter(date_formatter) ax2.set_ylabel('Мощность, %') ax2.grid(True) fig.suptitle(title) plt.legend() plt.tight_layout() fig.savefig('plot_experim_data.png') plt.show()