Files
mvn_flight/bin/asotr_kdi.py

208 lines
6.9 KiB
Python

import sys
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
import matplotlib.pyplot as plt
from matplotlib import dates
import pandas as pd
from datetime import datetime, timedelta
import os
import json
def cmd_parse(asotr_data):
decode_list = []
for elem in asotr_data.itertuples():
elem_msg = asotr.cmd_decode(elem.cmd.strip())
if elem_msg != '':
str_ = f'{elem.timestamp};{elem_msg}'
decode_list.append(str_)
return decode_list
def pow_temp_parse(elem, tstamp_format='%d.%m.%Y %H:%M:%S.%f'):
timestamp = pd.to_datetime(elem.timestamp, format=tstamp_format)
if "*1" not in elem.data:
return ('none', [])
l = []
data = elem.data[2:]
l = []
l.append(asotr.get_utc_seconds(elem.timestamp, tstamp_format))
l.append(timestamp)
if "status 3" in elem.cmd:
l.extend(list(map(float, data.split())))
return ('t', l)
elif "status 4" in elem.cmd:
l.extend(list(map(int, data.split())))
return ('p', l)
else:
return ('none', [])
def get_cmd_data(fname, tstamp_format='%d.%m.%Y %H:%M:%S.%f'):
t = []
p = []
asotr_data = pd.read_csv(fname, delimiter=';', names=['timestamp', 'cmd', 'data'])
cmd_list = cmd_parse(asotr_data)
for elem in asotr_data.itertuples():
data = pow_temp_parse(elem, tstamp_format)
if data[0] == 't':
t.append(data[1])
elif data[0] == 'p':
p.append(data[1])
temp = pd.DataFrame(t,
columns=['timestamp_sec', 'timestamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6'])
power = pd.DataFrame(p,
columns=['timestamp_sec', 'timestamp', 'ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6'])
return (cmd_list, temp, power)
def decode_collect_data(uart_server_log_path, asotr_kit):
files = os.listdir(uart_server_log_path)
files1 = [file for file in files if 'cmd_data' in file]
files1 = sorted(files1)
cmd_list_itog = []
temp_itog = []
pow_itog = []
cmd_list, temp, power = get_cmd_data(uart_server_log_path + files1[0])
cmd_list_itog.extend(cmd_list)
temp_itog = temp
pow_itog = power
print(files1[0])
for i, file in enumerate(files1):
if i > 0:
print(file)
cmd_list, temp, power = get_cmd_data(uart_server_log_path + file)
cmd_list_itog.extend(cmd_list)
temp_itog = pd.concat([temp_itog, temp], ignore_index=True)
pow_itog = pd.concat([pow_itog, power], ignore_index=True)
temp_itog = temp_itog.sort_values(by='timestamp_sec')
pow_itog = pow_itog.sort_values(by='timestamp_sec')
temp_itog['timestamp'] = temp_itog['timestamp'].dt.strftime('%d.%m.%Y %H:%M:%S.%f')
pow_itog['timestamp'] = pow_itog['timestamp'].dt.strftime('%d.%m.%Y %H:%M:%S.%f')
temp_itog.to_csv(f'./data/asotr0{asotr_kit}_data_T.csv', index=False, sep=';',
encoding='utf-8-sig', decimal='.')
pow_itog.to_csv(f'./data/asotr0{asotr_kit}_data_P.csv', index=False, sep=';',
encoding='utf-8-sig', decimal='.')
with open('./data/cmd_human.csv', 'w') as file:
for elem in cmd_list_itog:
file.write(f'{elem}\n')
def load_ps_cmd_decode(fname):
with open(fname, 'r') as file:
ps_cmd_decode = json.load(file)
return ps_cmd_decode
def get_ps_cmd_data(fname, ps_model):
decode = load_ps_cmd_decode('./decode_ps_cmd.json')
ps_data = pd.read_csv(fname, sep=';', names=['timestamp', 'cmd', 'cmd_answer'])
out = {'cmd_write': [], 'cmd_read': [], 'cmd_read_desc': []}
if ps_model == 'PSH-3630A':
for elem in decode:
if elem['ps_model'] == ps_model:
decode_cmd_write = elem['cmd_write']
decode_cmd_read = elem['cmd_read']
decode_cmd_read_desc = elem['cmd_read_desc']
break
cmd_write = []
cmd_read_desc = []
cmd_read = []
for row in ps_data.itertuples():
# check if current row is cmd_write:
cmd_str = row.cmd.split(' ')
timestamp_sec = asotr.get_utc_seconds(row.timestamp, '%d.%m.%Y %H:%M:%S.%f')
if (len(cmd_str) > 1):
elem = [timestamp_sec, row.timestamp,
decode_cmd_write[cmd_str[0]], cmd_str[1]]
cmd_write.append(elem)
elif (len(cmd_str) == 1):
elem = [timestamp_sec, row.timestamp,
decode_cmd_read[cmd_str[0]], row.cmd_answer.strip()]
cmd_read.append(elem)
elem = [timestamp_sec, row.timestamp,
decode_cmd_read_desc[cmd_str[0]], row.cmd_answer.strip()]
cmd_read_desc.append(elem)
out['cmd_write'] = cmd_write
out['cmd_read'] = cmd_read
out['cmd_read_desc'] = cmd_read_desc
return out
def convert_data_date_format(directory, fname_pattern, column_names,
old_date_fmt = "%Y.%m.%d %H:%M:%S.%f",
new_date_fmt = '%d.%m.%Y %H:%M:%S.%f'):
files = os.listdir(directory)
dateparse = lambda x: datetime.strptime(x, old_date_fmt)
for file in files:
fname = directory + file
if 'cmd_data' in fname:
print(fname)
df = pd.read_csv(fname, sep=';', decimal='.',
names=column_names,
parse_dates=['timestamp'], date_parser=dateparse)
df['timestamp'] = df['timestamp'].dt.strftime(new_date_fmt)
df.to_csv(fname, header=False, index=False, sep=';',
decimal='.', encoding='utf-8-sig')
def plot_data(fname, title, ox_dtime_format, events={}, hlines={}, ch='ch1'):
cmd_list, temp, power = get_cmd_data(fname)
fig = plt.figure(figsize=(10, 8), dpi=150)
ax1 = fig.add_subplot(2,1,1)
ax2 = fig.add_subplot(2,1,2, sharex=ax1)
ax1.plot(temp.timestamp, temp[ch])
if len(hlines) > 0:
for label, value in hlines.items():
ax1.axhline(y = value, color='b', linestyle='--')
ax1.text(temp.timestamp[len(temp.timestamp) - 1], value, label, va='bottom')
if len(events) > 0:
for time, event in events.items():
idx = asotr.find_best_time_idx(temp.timestamp, time, accuracy='seconds')
ax1.axvline(x = temp.timestamp[idx], color='r', linestyle='-.')
ax1.text(temp.timestamp[idx], max(temp[ch]) + 1, event, rotation=45, va='bottom')
ax2.plot(power.timestamp, power[ch])
date_formatter = dates.DateFormatter(ox_dtime_format)
ax1.xaxis.set_major_formatter(date_formatter)
ax1.set_xlabel('время')
ax1.set_ylabel('Температура, $^\circ$C')
ax1.grid(True)
ax2.xaxis.set_major_formatter(date_formatter)
ax2.set_ylabel('Мощность, %')
ax2.grid(True)
fig.suptitle(title)
plt.legend()
plt.tight_layout()
fig.savefig('plot_experim_data.png')
plt.show()