208 lines
6.9 KiB
Python
208 lines
6.9 KiB
Python
import sys
|
|
from importlib import reload
|
|
sys.path.append('./')
|
|
import asotr
|
|
reload(asotr)
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib import dates
|
|
import pandas as pd
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
import json
|
|
|
|
def cmd_parse(asotr_data):
|
|
decode_list = []
|
|
|
|
for elem in asotr_data.itertuples():
|
|
elem_msg = asotr.cmd_decode(elem.cmd.strip())
|
|
if elem_msg != '':
|
|
str_ = f'{elem.timestamp};{elem_msg}'
|
|
decode_list.append(str_)
|
|
|
|
return decode_list
|
|
|
|
def pow_temp_parse(elem, tstamp_format='%d.%m.%Y %H:%M:%S.%f'):
|
|
timestamp = pd.to_datetime(elem.timestamp, format=tstamp_format)
|
|
if "*1" not in elem.data:
|
|
return ('none', [])
|
|
l = []
|
|
|
|
data = elem.data[2:]
|
|
l = []
|
|
l.append(asotr.get_utc_seconds(elem.timestamp, tstamp_format))
|
|
l.append(timestamp)
|
|
|
|
if "status 3" in elem.cmd:
|
|
l.extend(list(map(float, data.split())))
|
|
return ('t', l)
|
|
elif "status 4" in elem.cmd:
|
|
l.extend(list(map(int, data.split())))
|
|
return ('p', l)
|
|
else:
|
|
return ('none', [])
|
|
|
|
def get_cmd_data(fname, tstamp_format='%d.%m.%Y %H:%M:%S.%f'):
|
|
t = []
|
|
p = []
|
|
asotr_data = pd.read_csv(fname, delimiter=';', names=['timestamp', 'cmd', 'data'])
|
|
|
|
cmd_list = cmd_parse(asotr_data)
|
|
|
|
for elem in asotr_data.itertuples():
|
|
data = pow_temp_parse(elem, tstamp_format)
|
|
if data[0] == 't':
|
|
t.append(data[1])
|
|
elif data[0] == 'p':
|
|
p.append(data[1])
|
|
temp = pd.DataFrame(t,
|
|
columns=['timestamp_sec', 'timestamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6'])
|
|
power = pd.DataFrame(p,
|
|
columns=['timestamp_sec', 'timestamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6'])
|
|
return (cmd_list, temp, power)
|
|
|
|
|
|
def decode_collect_data(uart_server_log_path, asotr_kit):
|
|
files = os.listdir(uart_server_log_path)
|
|
files1 = [file for file in files if 'cmd_data' in file]
|
|
files1 = sorted(files1)
|
|
|
|
cmd_list_itog = []
|
|
temp_itog = []
|
|
pow_itog = []
|
|
|
|
cmd_list, temp, power = get_cmd_data(uart_server_log_path + files1[0])
|
|
cmd_list_itog.extend(cmd_list)
|
|
temp_itog = temp
|
|
pow_itog = power
|
|
print(files1[0])
|
|
|
|
for i, file in enumerate(files1):
|
|
if i > 0:
|
|
print(file)
|
|
cmd_list, temp, power = get_cmd_data(uart_server_log_path + file)
|
|
cmd_list_itog.extend(cmd_list)
|
|
temp_itog = pd.concat([temp_itog, temp], ignore_index=True)
|
|
pow_itog = pd.concat([pow_itog, power], ignore_index=True)
|
|
|
|
temp_itog = temp_itog.sort_values(by='timestamp_sec')
|
|
pow_itog = pow_itog.sort_values(by='timestamp_sec')
|
|
|
|
temp_itog['timestamp'] = temp_itog['timestamp'].dt.strftime('%d.%m.%Y %H:%M:%S.%f')
|
|
pow_itog['timestamp'] = pow_itog['timestamp'].dt.strftime('%d.%m.%Y %H:%M:%S.%f')
|
|
|
|
temp_itog.to_csv(f'./data/asotr0{asotr_kit}_data_T.csv', index=False, sep=';',
|
|
encoding='utf-8-sig', decimal='.')
|
|
pow_itog.to_csv(f'./data/asotr0{asotr_kit}_data_P.csv', index=False, sep=';',
|
|
encoding='utf-8-sig', decimal='.')
|
|
|
|
with open('./data/cmd_human.csv', 'w') as file:
|
|
for elem in cmd_list_itog:
|
|
file.write(f'{elem}\n')
|
|
|
|
|
|
def load_ps_cmd_decode(fname):
|
|
with open(fname, 'r') as file:
|
|
ps_cmd_decode = json.load(file)
|
|
return ps_cmd_decode
|
|
|
|
def get_ps_cmd_data(fname, ps_model):
|
|
decode = load_ps_cmd_decode('./decode_ps_cmd.json')
|
|
ps_data = pd.read_csv(fname, sep=';', names=['timestamp', 'cmd', 'cmd_answer'])
|
|
out = {'cmd_write': [], 'cmd_read': [], 'cmd_read_desc': []}
|
|
|
|
if ps_model == 'PSH-3630A':
|
|
for elem in decode:
|
|
if elem['ps_model'] == ps_model:
|
|
decode_cmd_write = elem['cmd_write']
|
|
decode_cmd_read = elem['cmd_read']
|
|
decode_cmd_read_desc = elem['cmd_read_desc']
|
|
break
|
|
|
|
cmd_write = []
|
|
cmd_read_desc = []
|
|
cmd_read = []
|
|
for row in ps_data.itertuples():
|
|
# check if current row is cmd_write:
|
|
cmd_str = row.cmd.split(' ')
|
|
|
|
timestamp_sec = asotr.get_utc_seconds(row.timestamp, '%d.%m.%Y %H:%M:%S.%f')
|
|
if (len(cmd_str) > 1):
|
|
elem = [timestamp_sec, row.timestamp,
|
|
decode_cmd_write[cmd_str[0]], cmd_str[1]]
|
|
cmd_write.append(elem)
|
|
elif (len(cmd_str) == 1):
|
|
elem = [timestamp_sec, row.timestamp,
|
|
decode_cmd_read[cmd_str[0]], row.cmd_answer.strip()]
|
|
cmd_read.append(elem)
|
|
elem = [timestamp_sec, row.timestamp,
|
|
decode_cmd_read_desc[cmd_str[0]], row.cmd_answer.strip()]
|
|
cmd_read_desc.append(elem)
|
|
|
|
out['cmd_write'] = cmd_write
|
|
out['cmd_read'] = cmd_read
|
|
out['cmd_read_desc'] = cmd_read_desc
|
|
|
|
return out
|
|
|
|
def convert_data_date_format(directory, fname_pattern, column_names,
|
|
old_date_fmt = "%Y.%m.%d %H:%M:%S.%f",
|
|
new_date_fmt = '%d.%m.%Y %H:%M:%S.%f'):
|
|
|
|
files = os.listdir(directory)
|
|
dateparse = lambda x: datetime.strptime(x, old_date_fmt)
|
|
|
|
for file in files:
|
|
fname = directory + file
|
|
if 'cmd_data' in fname:
|
|
print(fname)
|
|
df = pd.read_csv(fname, sep=';', decimal='.',
|
|
names=column_names,
|
|
parse_dates=['timestamp'], date_parser=dateparse)
|
|
|
|
df['timestamp'] = df['timestamp'].dt.strftime(new_date_fmt)
|
|
|
|
df.to_csv(fname, header=False, index=False, sep=';',
|
|
decimal='.', encoding='utf-8-sig')
|
|
|
|
|
|
def plot_data(fname, title, ox_dtime_format, events={}, hlines={}, ch='ch1'):
|
|
cmd_list, temp, power = get_cmd_data(fname)
|
|
|
|
fig = plt.figure(figsize=(10, 8), dpi=150)
|
|
ax1 = fig.add_subplot(2,1,1)
|
|
ax2 = fig.add_subplot(2,1,2, sharex=ax1)
|
|
|
|
ax1.plot(temp.timestamp, temp[ch])
|
|
|
|
if len(hlines) > 0:
|
|
for label, value in hlines.items():
|
|
ax1.axhline(y = value, color='b', linestyle='--')
|
|
ax1.text(temp.timestamp[len(temp.timestamp) - 1], value, label, va='bottom')
|
|
|
|
if len(events) > 0:
|
|
for time, event in events.items():
|
|
idx = asotr.find_best_time_idx(temp.timestamp, time, accuracy='seconds')
|
|
ax1.axvline(x = temp.timestamp[idx], color='r', linestyle='-.')
|
|
ax1.text(temp.timestamp[idx], max(temp[ch]) + 1, event, rotation=45, va='bottom')
|
|
|
|
ax2.plot(power.timestamp, power[ch])
|
|
|
|
date_formatter = dates.DateFormatter(ox_dtime_format)
|
|
|
|
ax1.xaxis.set_major_formatter(date_formatter)
|
|
ax1.set_xlabel('время')
|
|
ax1.set_ylabel('Температура, $^\circ$C')
|
|
ax1.grid(True)
|
|
|
|
ax2.xaxis.set_major_formatter(date_formatter)
|
|
ax2.set_ylabel('Мощность, %')
|
|
ax2.grid(True)
|
|
|
|
fig.suptitle(title)
|
|
plt.legend()
|
|
plt.tight_layout()
|
|
fig.savefig('plot_experim_data.png')
|
|
plt.show()
|
|
|
|
|