asotr_flight/asotr.py
2025-04-23 19:05:48 +03:00

520 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
from datetime import datetime, timedelta
from scipy import signal
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import numpy as np
import json
import pytz
from matplotlib import dates
fname_json_decode = '/home/danila/Danila/work/MVN/Soft/PID/python/decode_asotr_cmd.json'
def get_utc_seconds(timestamp_str, timestamp_format):
dt_obj = datetime.strptime(timestamp_str, timestamp_format)
utc_timezone = pytz.utc
dt_utc = dt_obj.replace(tzinfo=utc_timezone)
timestamp = int(dt_utc.timestamp())
return timestamp
def load_cmd_decode(fname):
with open(fname, 'r') as file:
data = json.load(file)
return data
def bitmask_to_num(data):
num = int(data)
res = []
d = bin(num)
d1 = d[::-1]
for i in range(num.bit_length()):
if d1[i] == '1':
res.append(i + 1)
return res
def flight_temperature_decode(cmd_string):
decode = load_cmd_decode(fname_json_decode)
asotr_kit = ''
temp = []
cmd = cmd_string.split(' ')
if len(cmd) == 8:
temp = [cmd[2], cmd[3], cmd[4], cmd[5], cmd[6], cmd[7]]
asotr_kit = cmd[0][1]
return (asotr_kit, temp)
def cmd_decode(cmd_string):
decode = load_cmd_decode(fname_json_decode)
asotr_kit = 0;
msg_decode = ''
out = ''
if 'OK' in cmd_string:
return out
cmd = cmd_string.split(' ')
if len(cmd) > 5:
return out
if cmd[1] == '':
return out
if '1' in cmd[0]:
asotr_kit = 1
elif '2' in cmd[0]:
asotr_kit = 2
msg_ = f'{cmd[1]} {cmd[2]}'
msg_decode = decode[msg_]
if (len(cmd) == 4):
value = ''
if cmd[2] == '32':
value1 = bitmask_to_num(cmd[3])
if (len(value1) == 0):
value = 'запрет всех'
else:
value = ', '.join(map(str, value1))
elif (cmd[2] == '20' or cmd[2] == '21' or cmd[2] == '22'
or cmd[2] == '23' or cmd[2] == '24' or cmd[3] == '25'):
if cmd[3] == '0':
value = 'ПИД-регулирование'
elif cmd[3] == '1':
value = 'релейное регулирование'
elif cmd[3] == '2':
value = 'постоянная мощность'
else:
value = cmd[3]
out = f'АСОТР{asotr_kit}: {msg_decode} ({value})'
else:
if msg_decode != '':
out = f'АСОТР{asotr_kit}: {msg_decode}'
return out
def cmd_flight_parse(asotr_data):
decode_list = []
temperature_list = []
for elem in asotr_data.itertuples():
elem_msg = cmd_decode(elem.cmd_answer.strip())
if elem_msg != '':
str_ = f'{elem.timestamp};{elem_msg}'
decode_list.append(str_)
asotr_kit, temp = flight_temperature_decode(elem.cmd_answer.strip())
if len(temp) > 0:
timestamp = get_utc_seconds(elem.timestamp, '%d.%m.%Y %H:%M:%S.%f')
str_ = f'{timestamp};{elem.timestamp};{asotr_kit};{temp[0]};{temp[1]};{temp[2]};{temp[3]};{temp[4]};{temp[5]}'
temperature_list.append(str_)
return (decode_list, temperature_list)
# accuracy: minutes, hours
def find_best_time_idx(time_arr, user_time, accuracy='minutes'):
tstamp = datetime.strptime(user_time, "%d.%m.%Y %H:%M:%S")
if accuracy == 'minutes':
delta = timedelta(minutes=30)
elif accuracy == 'hours':
delta = timedelta(hours=24)
elif accuracy == 'seconds':
delta = timedelta(seconds=30)
low = 0
high = len(time_arr) - 1
mid = len(time_arr) // 2
if mid not in time_arr.index:
# print(f'mid not in time_arr: {mid}')
return -1
a = time_arr[mid]
while ((a < (tstamp - delta)) or (a > (tstamp + delta))) and low < high:
if tstamp > a:
low = mid + 1
else:
high = mid - 1
mid = (low + high) // 2
# print(f'mid: (low + high)/2: {mid}')
if mid not in time_arr.index:
# print(f'mid not in time_arr: {mid}')
return -1
a = time_arr[mid]
if low > high:
# print(f'low > high: {mid}')
mid = high
if mid > 30:
for j in range(mid-30, len(time_arr)):
# print(f'{time_arr[j]} < {tstamp}: {j}')
if time_arr[j] >= tstamp:
# print(f'{time_arr[j]} > {tstamp}: {j}')
return j
else:
for j in range(0, len(time_arr)):
# print(f'{time_arr[j]} < {tstamp}: {j}')
if time_arr[j] >= tstamp:
# print(f'{time_arr[j]} > {tstamp}: {j}')
return j
return mid
# raise ValueError("data not found!")
def find_time_idx(data_list, keys_list, timestamp, accuracy):
out_dict = dict.fromkeys(keys_list, -1)
for i, elem in enumerate(data_list):
out_dict[keys_list[i]] = find_best_time_idx(elem['timestamp'], timestamp, accuracy)
return out_dict
def get_cmd_data(fname):
asotr_data = pd.read_csv(fname, delimiter=';')
cmd_list, temperature_list = cmd_flight_parse(asotr_data)
return (cmd_list, temperature_list)
def get_data(path, asotr_kit, start_date, end_date, time_accuracy):
ch_signs = ["temp", "temp_set", "pow"]
fname_temp = "asotr" + asotr_kit + "_data_T.csv"
fname_tempSet = "asotr" + asotr_kit + "_data_TSET.csv"
fname_pow = "asotr" + asotr_kit + "_data_P.csv"
fname = [path + fname_temp, path + fname_tempSet, path + fname_pow]
dateparse = lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S.%f")
data = [ pd.read_csv(fname[0], sep=";", parse_dates=["timestamp"], date_parser=dateparse),
pd.read_csv(fname[1], sep=";", parse_dates=["timestamp"], date_parser=dateparse),
pd.read_csv(fname[2], sep=";", parse_dates=["timestamp"], date_parser=dateparse),]
ch = [[], [], [], [], [], []]
data_dict = {
"temp": ch,
"temp_set": ch,
"pow": ch,
"time_temp": [],
"time_temp_set": [],
"time_pow": [],
}
idxb = dict.fromkeys(ch_signs, -1)
idxe = dict.fromkeys(ch_signs, -1)
idxb = find_time_idx(data, ch_signs, start_date, time_accuracy)
idxe = find_time_idx(data, ch_signs, end_date, time_accuracy)
data_dict["time_temp"] = data[0]["timestamp"][idxb["temp"] : idxe["temp"]]
data_dict["time_temp_set"] = data[1]["timestamp"][idxb["temp_set"] : idxe["temp_set"]]
data_dict["time_pow"] = data[2]["timestamp"][idxb["pow"] : idxe["pow"]]
col = ["ch1", "ch2", "ch3", "ch4", "ch5", "ch6"]
for j in range(len(ch_signs)):
data_dict[ch_signs[j]] = data[j][['ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6']][idxb[ch_signs[j]]:idxe[ch_signs[j]]]
raw_data = data
return (raw_data, data_dict)
# shift_flag - normalization of the offset of all samples of each period to the first period
# peaks: min, max
def find_periods(time, data, shift_flag, peaks='min'):
if peaks == 'min':
idx, _ = find_peaks(-data, distance=80)
else:
idx, _ = find_peaks(data, distance=80)
periods = []
periods_t = []
for i in range(1, len(idx)):
period_t = time.iloc[idx[i-1]:idx[i]]
period = data.iloc[idx[i-1]:idx[i]]
periods.append(period)
periods_t.append(period_t)
if shift_flag == True:
res = shift_data_(periods)
else:
res = periods
return (periods_t, res, idx)
# shift_flag - normalization of the offset of all samples of each period to the first period
def get_signal_profile_corr(time, data, pattern, shift_flag, peak_height):
period_cnts = len(pattern)
periods = []
periods_t = []
# find correlation between signal and pattern
correlation = signal.correlate(data, pattern, mode='same', method='fft')
normalized_correlation = correlation / max(abs(correlation))
# find correlation peaks
# peak_height = 0.7
peaks_indices = signal.find_peaks(normalized_correlation, height=peak_height)[0]
# separate and collect each finded period
for peak_idx in peaks_indices:
start_index = peak_idx - period_cnts // 2 # peak center
end_index = start_index + period_cnts
if 0 <= start_index < len(data) and 0 <= end_index < len(data):
period = data.iloc[start_index:end_index]
period_t = time.iloc[start_index:end_index]
periods.append(period)
periods_t.append(period_t)
if shift_flag == True:
res = shift_data_(periods)
else:
res = periods
return (periods_t, res)
def shift_data_(data):
first = [list_.iloc[0] for list_ in data]
delta = []
for i in range(1, len(first)):
delta.append(first[i] - first[0])
res = []
res.append(data[0])
for idx, elem in enumerate(data):
if idx > 0:
corr = elem - delta[idx-1]
res.append(corr)
return res
def get_peak_temp_forecast(cur_time, num_periods):
peaks_forecast = []
period = timedelta(hours=1, minutes=33, seconds=0, milliseconds=150)
time = cur_time
for i in range(num_periods):
time = time + period
peaks_forecast.append(time)
return peaks_forecast
def plot_signal_profile(time, data, pattern_t, pattern, method, shift_flag, peak_height=0.8):
if method == 'corr':
periods_t, periods = get_signal_profile_corr(time, data, pattern, shift_flag, peak_height)
print(f'Найдено {len(periods)} периодов.')
elif method == 'peaks':
periods_t, periods, peaks = find_periods(time, data, shift_flag, peaks='min')
print(f'Найдено {len(periods)} периодов.')
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6))
for idx, period in enumerate(periods):
ax1.plot(np.arange(len(period)), period)
ax1.grid(True)
ax2.plot(time, data)
ax2.grid(True)
plt.grid(True)
plt.show()
def insert_temp_data_from_flight_cmd(fname_cmd_temp):
dir_asotr = '/home/danila/Danila/work/MVN/Soft/asotr_csv/data/'
fname_asotr = [f'{dir_asotr}asotr01_data_T.csv', f'{dir_asotr}asotr02_data_T.csv']
df_cmd = pd.read_csv(fname_cmd_temp, sep=';')
df_asotr = []
df_cmd_temp = []
for i, fname in enumerate(fname_asotr):
df = pd.read_csv(fname, sep=';')
df_asotr.append(df)
df = df_cmd[df_cmd['asotr_kit'] == i + 1]
df = df.drop(['asotr_kit'], axis=1)
df_cmd_temp.append(df)
df_asotr_ = [
pd.concat(
[df_asotr[0], df_cmd_temp[0]], ignore_index=True).sort_values(by='timestamp_sec'),
pd.concat(
[df_asotr[1], df_cmd_temp[1]], ignore_index=True).sort_values(by='timestamp_sec')
]
return df_asotr_
def subtract_data(data1, data2):
init_shift = data2[0] - data1[0]
out = data2 - data1 - init_shift
return pd.Series(out)
# for transmit data: cmd_list, temp, power = get_cmd_data(fname)
def cut_data(data, time_begin, duration_sec, accuracy='seconds'):
time_format = "%d.%m.%Y %H:%M:%S";
delta = timedelta(seconds=duration_sec)
tstamp_begin = datetime.strptime(time_begin, time_format)
tstamp_end = tstamp_begin + delta
time_end = tstamp_end.strftime(time_format)
idx_begin = find_best_time_idx(data['timestamp'], time_begin, accuracy)
idx_end = find_best_time_idx(data['timestamp'], time_end, accuracy)
out = data.loc[idx_begin : idx_end]
return out
def cut_norm_data(data, time_begin, duration_sec, channel='ch1',
interp={'method': 'cubic', 'order': 2}, accuracy='seconds'):
data_period = cut_data(data, time_begin, duration_sec, accuracy)
temp_norm = data_period[channel].values - data_period[channel].iloc[0]
time_l = list(data_period['timestamp'])
temp_l = list(temp_norm)
orig_data = pd.DataFrame({ 'timestamp': time_l, 'temp': temp_l })
interp_data = orig_data.set_index('timestamp')
interp_data = interp_data.resample('S').mean().interpolate(method=interp["method"],
order=interp["order"])
interp_data = interp_data.reset_index(names=['timestamp'])
return orig_data, interp_data
def get_step_response_diff(data, thermocycle_info, channel='ch1',
interp={'method': 'cubic', 'order': 2}, accuracy='seconds', cut_step_resp={}):
date = thermocycle_info['date']
time_begin_orig = date + ' ' + thermocycle_info['time_begin'][0]
time_begin_step = date + ' ' + thermocycle_info['time_begin'][1]
duration_sec = thermocycle_info['duration_sec']
_, orig_interp_cycle = cut_norm_data(data, time_begin_orig, duration_sec, channel,
interp, accuracy)
_, step_interp_cycle = cut_norm_data(data, time_begin_step, duration_sec, channel,
interp, accuracy)
max_ = min(len(orig_interp_cycle), len(step_interp_cycle))
subtract_step = subtract_data(
orig_interp_cycle['temp'].iloc[0:max_].values,
step_interp_cycle['temp'].iloc[0:max_].values)
step_time = list(step_interp_cycle['timestamp'].iloc[0:max_])
step_temp = list(subtract_step)
step_response = pd.DataFrame({'timestamp': step_time, 'temp': step_temp})
if len(cut_step_resp) > 0:
time_begin = date + ' ' + cut_step_resp['time_step_begin']
step_response = cut_data(step_response, time_begin,
cut_step_resp['step_duration'], accuracy='seconds')
first = step_response['temp'].iloc[0]
step_response['temp'] = step_response['temp'] - first
return (step_response, orig_interp_cycle, step_interp_cycle)
def plot_step_response_in_thermocycle(data_info, thermocycle_info, interp,
cut_step_resp, plot_info):
title = f'{plot_info["title"]}, канал {data_info["channel"][2]} АСОТР КДИ СПИН-X, период опроса {data_info["period"]} ({thermocycle_info["date"]})'
step_resp, orig_interp_cycle, step_interp_cycle = get_step_response_diff(
data_info['data'], thermocycle_info, channel=data_info['channel'],
interp=interp, accuracy=data_info['find_accuracy'])
fig = plt.figure(figsize=(8, 6), dpi=200)
fig.suptitle(title, fontsize=plot_info['font'])
ax1 = fig.add_subplot(2,1,1)
ax2 = fig.add_subplot(2,1,2)
ax1.plot(step_resp['timestamp'], step_resp['temp'],
label='реакция на ступенчатое воздействие')
step_begin = thermocycle_info['date'] + ' ' + cut_step_resp['time_step_begin']
idx = find_best_time_idx(step_interp_cycle.timestamp, step_begin,
accuracy=data_info['find_accuracy'])
ax1.axvline(x = step_interp_cycle.timestamp[idx], color='r', linestyle='-.',
label='момент подачи ступенчатого воздействия')
date_formatter = dates.DateFormatter(plot_info['ox_dtime_format'])
ax1.xaxis.set_major_formatter(date_formatter)
ax1.legend(loc=plot_info["legend_pos"][0], fontsize=plot_info['font'])
ax1.grid(True)
ax1.tick_params(axis='both', width=1, labelsize=plot_info['font'])
ax1.set_ylabel(r'$\Delta$T, $^\circ$C', fontsize=plot_info['font'])
ax2.axvline(x = step_interp_cycle.timestamp[idx], color='r', linestyle='-.',
label='момент подачи ступенчатого воздействия')
ax2.plot(orig_interp_cycle['timestamp'], orig_interp_cycle['temp'], '--',
label='термоцикл')
ax2.plot(step_interp_cycle['timestamp'], step_interp_cycle['temp'],
label='термоцикл с реакцией на ступенчатое воздействие')
ax2.xaxis.set_major_formatter(date_formatter)
ax2.legend(loc=plot_info["legend_pos"][1], fontsize=plot_info['font'],
fancybox=True, framealpha=0.4)
ax2.grid(True)
ax2.tick_params(axis='both', width=1, labelsize=plot_info['font'])
ax2.set_xlabel('время', fontsize=plot_info['font'])
ax2.set_ylabel(r'$T_{norm}$, $^\circ$C', fontsize=plot_info['font'])
fig.suptitle(title, fontsize=plot_info['font'])
plt.tight_layout()
fig.savefig(plot_info["name_fig"])
plt.show()
def plot_imp_response(data, data_info, plot_info, thermocycle_info):
title = f'{plot_info["title"]}, канал {data_info["channel"][2]} АСОТР КДИ СПИН-X, период опроса {data_info["period"]} ({thermocycle_info["date"]})'
fig = plt.figure(figsize=(11, 6), dpi=200)
fig.suptitle(title, fontsize=plot_info['font'])
ax1 = fig.add_subplot(1,1,1)
date_formatter = dates.DateFormatter(plot_info['ox_dtime_format'])
ax1.xaxis.set_major_formatter(date_formatter)
ax1.plot(data['timestamp'], data['temp'], '.', label='реакц. на импульсное воздействие')
ax1.xaxis.set_major_formatter(date_formatter)
ax1.legend(loc=plot_info["legend_pos"][0], fontsize=plot_info['font'],
fancybox=True, framealpha=0.4)
ax1.grid(True)
ax1.tick_params(axis='both', width=1, labelsize=plot_info['font'])
ax1.set_xlabel('время', fontsize=plot_info['font'])
ax1.set_ylabel(r'$t_{norm}$, $^\circ$C', fontsize=plot_info['font'])
fig.suptitle(title, fontsize=plot_info['font'])
plt.tight_layout()
fig.savefig(plot_info["name_fig"])
plt.show()
#timestamp as string format: dd:mm:YYYY HH:MM:SS
def insert_data_cyclo(base_time_str, fname):
path = '/home/danila/Danila/work/MVN/flight/experiments/'
time_format = "%d.%m.%Y %H:%M:%S"
cyclogram_file = path + fname
df = pd.read_excel(cyclogram_file)
base_time = pd.to_datetime(base_time_str)
df['timestamp'] = df.iloc[:, 0].apply(lambda x: base_time + timedelta(seconds=x))
df.iloc[:, 0] = df['timestamp'].dt.strftime(time_format)
df = df.drop(['timestamp'], axis=1)
fname = cyclogram_file.replace('.xls', '.csv')
df.to_csv(fname, index=False, sep=';', encoding='utf-8-sig')