import pandas as pd from datetime import datetime, timedelta from scipy import signal from scipy.signal import find_peaks import matplotlib.pyplot as plt import numpy as np import json import pytz from matplotlib import dates """ module asotr parses csv data from flight device ASOTR MVN Danila Gamkov [] danila_gamkov@cosmos.ru # License: IKI RAN """ __author__ = 'Danila Gamkov' fname_json_decode = './decode_asotr_cmd.json' def get_utc_seconds(timestamp_str, timestamp_format): dt_obj = datetime.strptime(timestamp_str, timestamp_format) utc_timezone = pytz.utc dt_utc = dt_obj.replace(tzinfo=utc_timezone) timestamp = int(dt_utc.timestamp()) return timestamp def load_cmd_decode(fname): with open(fname, 'r') as file: data = json.load(file) return data def bitmask_to_num(data): num = int(data) res = [] d = bin(num) d1 = d[::-1] for i in range(num.bit_length()): if d1[i] == '1': res.append(i + 1) return res def flight_temperature_decode(cmd_string): decode = load_cmd_decode(fname_json_decode) asotr_kit = '' temp = [] cmd = cmd_string.split(' ') if len(cmd) == 8: temp = [cmd[2], cmd[3], cmd[4], cmd[5], cmd[6], cmd[7]] asotr_kit = cmd[0][1] return (asotr_kit, temp) def cmd_decode(cmd_string): decode = load_cmd_decode(fname_json_decode) asotr_kit = 0; msg_decode = '' out = '' if 'OK' in cmd_string: return out cmd = cmd_string.split(' ') if len(cmd) > 5: return out if cmd[1] == '': return out if '1' in cmd[0]: asotr_kit = 1 elif '2' in cmd[0]: asotr_kit = 2 msg_ = f'{cmd[1]} {cmd[2]}' msg_decode = decode[msg_] if (len(cmd) == 4): value = '' if cmd[2] == '32': value1 = bitmask_to_num(cmd[3]) if (len(value1) == 0): value = 'запрет всех' else: value = ', '.join(map(str, value1)) elif (cmd[2] == '20' or cmd[2] == '21' or cmd[2] == '22' or cmd[2] == '23' or cmd[2] == '24' or cmd[3] == '25'): if cmd[3] == '0': value = 'ПИД-регулирование' elif cmd[3] == '1': value = 'релейное регулирование' elif cmd[3] == '2': value = 'постоянная мощность' else: value = cmd[3] out = f'АСОТР{asotr_kit}: {msg_decode} ({value})' else: if msg_decode != '': out = f'АСОТР{asotr_kit}: {msg_decode}' return out def cmd_flight_parse(asotr_data): decode_list = [] temperature_list = [] for elem in asotr_data.itertuples(): elem_msg = cmd_decode(elem.cmd_answer.strip()) if elem_msg != '': str_ = f'{elem.timestamp};{elem_msg}' decode_list.append(str_) asotr_kit, temp = flight_temperature_decode(elem.cmd_answer.strip()) if len(temp) > 0: timestamp = get_utc_seconds(elem.timestamp, '%d.%m.%Y %H:%M:%S.%f') str_ = f'{timestamp};{elem.timestamp};{asotr_kit};{temp[0]};{temp[1]};{temp[2]};{temp[3]};{temp[4]};{temp[5]}' temperature_list.append(str_) return (decode_list, temperature_list) # accuracy: 'seconds', 'minutes', 'hours' def find_best_time_idx(time_arr, user_time, accuracy='minutes') -> int: """ finds best time index according specified user_time in time_arr and with specified accuracy Args: time_arr(pandas.core.series.Series[datetime]): timestamp array user_time(string): time value, which index required find in time_arr (string in the following format: d.m.Y HH:MM:SS) accuracy(string): specify accuracy which time index required find in time_arr(type 'seconds', 'minutes' or 'hours') Returns: int: index that has been found in time_arr (or -1 if index has not been found) """ tstamp = datetime.strptime(user_time, "%d.%m.%Y %H:%M:%S") if accuracy == 'minutes': delta = timedelta(minutes=30) elif accuracy == 'hours': delta = timedelta(hours=24) elif accuracy == 'seconds': delta = timedelta(seconds=30) low = 0 high = len(time_arr) - 1 mid = len(time_arr) // 2 if mid not in time_arr.index: # print(f'mid not in time_arr: {mid}') return -1 a = time_arr[mid] while ((a < (tstamp - delta)) or (a > (tstamp + delta))) and low < high: if tstamp > a: low = mid + 1 else: high = mid - 1 mid = (low + high) // 2 # print(f'mid: (low + high)/2: {mid}') if mid not in time_arr.index: # print(f'mid not in time_arr: {mid}') return -1 a = time_arr[mid] if low > high: # print(f'low > high: {mid}') mid = high if mid > 30: for j in range(mid-30, len(time_arr)): # print(f'{time_arr[j]} < {tstamp}: {j}') if time_arr[j] >= tstamp: # print(f'{time_arr[j]} > {tstamp}: {j}') return j else: for j in range(0, len(time_arr)): # print(f'{time_arr[j]} < {tstamp}: {j}') if time_arr[j] >= tstamp: # print(f'{time_arr[j]} > {tstamp}: {j}') return j return mid def find_time_idx(data_list, keys_list, timestamp, accuracy): out_dict = dict.fromkeys(keys_list, -1) for i, elem in enumerate(data_list): idx = find_best_time_idx(elem['timestamp'], timestamp, accuracy) if idx != -1: out_dict[keys_list[i]] = idx else: raise TimeIndexNotFound(f'index corresponding to time {timestamp} in times array not found!') return out_dict def get_cmd_data(fname): asotr_data = pd.read_csv(fname, delimiter=';') cmd_list, temperature_list = cmd_flight_parse(asotr_data) return (cmd_list, temperature_list) def get_data(path, asotr_kit, start_date, end_date, time_accuracy): ch_signs = ["temp", "temp_set", "pow"] fname_temp = "asotr" + asotr_kit + "_data_T.csv" fname_tempSet = "asotr" + asotr_kit + "_data_TSET.csv" fname_pow = "asotr" + asotr_kit + "_data_P.csv" fname = [path + fname_temp, path + fname_tempSet, path + fname_pow] dateparse = lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S.%f") try: data = [ pd.read_csv(fname[0], sep=";", parse_dates=["timestamp"], date_parser=dateparse), pd.read_csv(fname[1], sep=";", parse_dates=["timestamp"], date_parser=dateparse), pd.read_csv(fname[2], sep=";", parse_dates=["timestamp"], date_parser=dateparse),] except FileNotFoundError: print(f'Error opening file: one (or all) file not found in directory: \n{fname}') return except pd.errors.EmptyDataError: print(f'Error opening file: one (or all) file is empty or have incorrect format. \nLook at the files: {fname}') return except pd.errors.ParserError: print(f'Error parsing file: file have incorrect structure. \nLook at the files: {fname}') return except Exception as e: print(f'Error parsing file: {e}. \nLook at the files: {fname}') return ch = [[], [], [], [], [], []] data_dict = { "temp": ch, "temp_set": ch, "pow": ch, "time_temp": [], "time_temp_set": [], "time_pow": [], } idxb = dict.fromkeys(ch_signs, -1) idxe = dict.fromkeys(ch_signs, -1) try: idxb = find_time_idx(data, ch_signs, start_date, time_accuracy) idxe = find_time_idx(data, ch_signs, end_date, time_accuracy) except Exception as e: print(e) return data_dict["time_temp"] = data[0]["timestamp"][idxb["temp"] : idxe["temp"]] data_dict["time_temp_set"] = data[1]["timestamp"][idxb["temp_set"] : idxe["temp_set"]] data_dict["time_pow"] = data[2]["timestamp"][idxb["pow"] : idxe["pow"]] col = ["ch1", "ch2", "ch3", "ch4", "ch5", "ch6"] for j in range(len(ch_signs)): data_dict[ch_signs[j]] = data[j][['ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6']][idxb[ch_signs[j]]:idxe[ch_signs[j]]] raw_data = data return (raw_data, data_dict) # shift_flag - normalization of the offset of all samples of each period to the first period # peaks: min, max def find_periods(time, data, shift_flag, peaks='min'): if peaks == 'min': idx, _ = find_peaks(-data, distance=80) else: idx, _ = find_peaks(data, distance=80) periods = [] periods_t = [] for i in range(1, len(idx)): period_t = time.iloc[idx[i-1]:idx[i]] period = data.iloc[idx[i-1]:idx[i]] periods.append(period) periods_t.append(period_t) if shift_flag == True: res = shift_data_(periods) else: res = periods return (periods_t, res, idx) # shift_flag - normalization of the offset of all samples of each period to the first period def get_signal_profile_corr(time, data, pattern, shift_flag, peak_height): period_cnts = len(pattern) periods = [] periods_t = [] # find correlation between signal and pattern correlation = signal.correlate(data, pattern, mode='same', method='fft') normalized_correlation = correlation / max(abs(correlation)) # find correlation peaks # peak_height = 0.7 peaks_indices = signal.find_peaks(normalized_correlation, height=peak_height)[0] # separate and collect each finded period for peak_idx in peaks_indices: start_index = peak_idx - period_cnts // 2 # peak center end_index = start_index + period_cnts if 0 <= start_index < len(data) and 0 <= end_index < len(data): period = data.iloc[start_index:end_index] period_t = time.iloc[start_index:end_index] periods.append(period) periods_t.append(period_t) if shift_flag == True: res = shift_data_(periods) else: res = periods return (periods_t, res) def shift_data_(data): first = [list_.iloc[0] for list_ in data] delta = [] for i in range(1, len(first)): delta.append(first[i] - first[0]) res = [] res.append(data[0]) for idx, elem in enumerate(data): if idx > 0: corr = elem - delta[idx-1] res.append(corr) return res def get_peak_temp_forecast(cur_time, num_periods): peaks_forecast = [] period = timedelta(hours=1, minutes=33, seconds=0, milliseconds=150) time = cur_time for i in range(num_periods): time = time + period peaks_forecast.append(time) return peaks_forecast def plot_signal_profile(time, data, pattern_t, pattern, method, shift_flag, peak_height=0.8): if method == 'corr': periods_t, periods = get_signal_profile_corr(time, data, pattern, shift_flag, peak_height) print(f'Найдено {len(periods)} периодов.') elif method == 'peaks': periods_t, periods, peaks = find_periods(time, data, shift_flag, peaks='min') print(f'Найдено {len(periods)} периодов.') fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6)) for idx, period in enumerate(periods): ax1.plot(np.arange(len(period)), period) ax1.grid(True) ax2.plot(time, data) ax2.grid(True) plt.grid(True) plt.show() def insert_temp_data_from_flight_cmd(fname_cmd_temp, dir_asotr): fname_asotr = [f'{dir_asotr}asotr01_data_T.csv', f'{dir_asotr}asotr02_data_T.csv'] df_cmd = pd.read_csv(fname_cmd_temp, sep=';') df_asotr = [] df_cmd_temp = [] for i, fname in enumerate(fname_asotr): df = pd.read_csv(fname, sep=';') df_asotr.append(df) df = df_cmd[df_cmd['asotr_kit'] == i + 1] df = df.drop(['asotr_kit'], axis=1) df_cmd_temp.append(df) df_asotr_ = [ pd.concat( [df_asotr[0], df_cmd_temp[0]], ignore_index=True).sort_values(by='timestamp_sec'), pd.concat( [df_asotr[1], df_cmd_temp[1]], ignore_index=True).sort_values(by='timestamp_sec') ] return df_asotr_ def subtract_data(data1, data2): init_shift = data2[0] - data1[0] out = data2 - data1 - init_shift return pd.Series(out) # for transmit data: cmd_list, temp, power = get_cmd_data(fname) def cut_data(data, time_begin, duration_sec, accuracy='seconds'): time_format = "%d.%m.%Y %H:%M:%S"; delta = timedelta(seconds=duration_sec) tstamp_begin = datetime.strptime(time_begin, time_format) tstamp_end = tstamp_begin + delta time_end = tstamp_end.strftime(time_format) idx_begin = find_best_time_idx(data['timestamp'], time_begin, accuracy) idx_end = find_best_time_idx(data['timestamp'], time_end, accuracy) out = data.loc[idx_begin : idx_end] return out def cut_norm_data(data, time_begin, duration_sec, channel='ch1', interp={'method': 'cubic', 'order': 2}, accuracy='seconds'): data_period = cut_data(data, time_begin, duration_sec, accuracy) temp_norm = data_period[channel].values - data_period[channel].iloc[0] time_l = list(data_period['timestamp']) temp_l = list(temp_norm) orig_data = pd.DataFrame({ 'timestamp': time_l, 'temp': temp_l }) interp_data = orig_data.set_index('timestamp') interp_data = interp_data.resample('S').mean().interpolate(method=interp["method"], order=interp["order"]) interp_data = interp_data.reset_index(names=['timestamp']) return orig_data, interp_data def get_step_response_diff(data, thermocycle_info, channel='ch1', interp={'method': 'cubic', 'order': 2}, accuracy='seconds', cut_step_resp={}): date = thermocycle_info['date'] time_begin_orig = date + ' ' + thermocycle_info['time_begin'][0] time_begin_step = date + ' ' + thermocycle_info['time_begin'][1] duration_sec = thermocycle_info['duration_sec'] _, orig_interp_cycle = cut_norm_data(data, time_begin_orig, duration_sec, channel, interp, accuracy) _, step_interp_cycle = cut_norm_data(data, time_begin_step, duration_sec, channel, interp, accuracy) max_ = min(len(orig_interp_cycle), len(step_interp_cycle)) subtract_step = subtract_data( orig_interp_cycle['temp'].iloc[0:max_].values, step_interp_cycle['temp'].iloc[0:max_].values) step_time = list(step_interp_cycle['timestamp'].iloc[0:max_]) step_temp = list(subtract_step) step_response = pd.DataFrame({'timestamp': step_time, 'temp': step_temp}) if len(cut_step_resp) > 0: time_begin = date + ' ' + cut_step_resp['time_step_begin'] step_response = cut_data(step_response, time_begin, cut_step_resp['step_duration'], accuracy='seconds') first = step_response['temp'].iloc[0] step_response['temp'] = step_response['temp'] - first return (step_response, orig_interp_cycle, step_interp_cycle) def plot_step_response_in_thermocycle(data_info, thermocycle_info, interp, cut_step_resp, plot_info): title = f'{plot_info["title"]}, канал {data_info["channel"][2]} АСОТР КДИ СПИН-X, период опроса {data_info["period"]} ({thermocycle_info["date"]})' step_resp, orig_interp_cycle, step_interp_cycle = get_step_response_diff( data_info['data'], thermocycle_info, channel=data_info['channel'], interp=interp, accuracy=data_info['find_accuracy']) fig = plt.figure(figsize=(8, 6), dpi=200) fig.suptitle(title, fontsize=plot_info['font']) ax1 = fig.add_subplot(2,1,1) ax2 = fig.add_subplot(2,1,2) ax1.plot(step_resp['timestamp'], step_resp['temp'], label='реакция на ступенчатое воздействие') step_begin = thermocycle_info['date'] + ' ' + cut_step_resp['time_step_begin'] idx = find_best_time_idx(step_interp_cycle.timestamp, step_begin, accuracy=data_info['find_accuracy']) ax1.axvline(x = step_interp_cycle.timestamp[idx], color='r', linestyle='-.', label='момент подачи ступенчатого воздействия') date_formatter = dates.DateFormatter(plot_info['ox_dtime_format']) ax1.xaxis.set_major_formatter(date_formatter) ax1.legend(loc=plot_info["legend_pos"][0], fontsize=plot_info['font']) ax1.grid(True) ax1.tick_params(axis='both', width=1, labelsize=plot_info['font']) ax1.set_ylabel(r'$\Delta$T, $^\circ$C', fontsize=plot_info['font']) ax2.axvline(x = step_interp_cycle.timestamp[idx], color='r', linestyle='-.', label='момент подачи ступенчатого воздействия') ax2.plot(orig_interp_cycle['timestamp'], orig_interp_cycle['temp'], '--', label='термоцикл') ax2.plot(step_interp_cycle['timestamp'], step_interp_cycle['temp'], label='термоцикл с реакцией на ступенчатое воздействие') ax2.xaxis.set_major_formatter(date_formatter) ax2.legend(loc=plot_info["legend_pos"][1], fontsize=plot_info['font'], fancybox=True, framealpha=0.4) ax2.grid(True) ax2.tick_params(axis='both', width=1, labelsize=plot_info['font']) ax2.set_xlabel('время', fontsize=plot_info['font']) ax2.set_ylabel(r'$T_{norm}$, $^\circ$C', fontsize=plot_info['font']) fig.suptitle(title, fontsize=plot_info['font']) plt.tight_layout() fig.savefig(plot_info["name_fig"]) plt.show() def plot_imp_response(data, data_info, plot_info, thermocycle_info): title = f'{plot_info["title"]}, канал {data_info["channel"][2]} АСОТР КДИ СПИН-X, период опроса {data_info["period"]} ({thermocycle_info["date"]})' fig = plt.figure(figsize=(11, 6), dpi=200) fig.suptitle(title, fontsize=plot_info['font']) ax1 = fig.add_subplot(1,1,1) date_formatter = dates.DateFormatter(plot_info['ox_dtime_format']) ax1.xaxis.set_major_formatter(date_formatter) ax1.plot(data['timestamp'], data['temp'], '.', label='реакц. на импульсное воздействие') ax1.xaxis.set_major_formatter(date_formatter) ax1.legend(loc=plot_info["legend_pos"][0], fontsize=plot_info['font'], fancybox=True, framealpha=0.4) ax1.grid(True) ax1.tick_params(axis='both', width=1, labelsize=plot_info['font']) ax1.set_xlabel('время', fontsize=plot_info['font']) ax1.set_ylabel(r'$t_{norm}$, $^\circ$C', fontsize=plot_info['font']) fig.suptitle(title, fontsize=plot_info['font']) plt.tight_layout() fig.savefig(plot_info["name_fig"]) plt.show() #timestamp as string format: dd:mm:YYYY HH:MM:SS def insert_data_cyclo(base_time_str, fname, path): time_format = "%d.%m.%Y %H:%M:%S" cyclogram_file = path + fname df = pd.read_excel(cyclogram_file) base_time = pd.to_datetime(base_time_str, format='%d.%m.%Y %H:%M:%S') df['timestamp'] = df.iloc[:, 0].apply(lambda x: base_time + timedelta(seconds=x)) df.iloc[:, 0] = df['timestamp'].dt.strftime(time_format) df = df.drop(['timestamp'], axis=1) fname = cyclogram_file.replace('.xls', '.csv') df.to_csv(fname, index=False, sep=';', encoding='utf-8-sig')