project reorganization: 1. executable files in bin directory now. 2. add recursive_unpack_targz.py for recursive unpacking specified in this script archives tar.gz with MVN data. 3. add asotr_unzip_plot.sh bash file for unpacking MVN data, collect asotr data into csv files and plot asotr MVN data. 4. add brd_wheel_1Hz_parser.py for demonstrate how to work with brd telemetry data

This commit is contained in:
Danila Gamkov
2025-06-06 10:54:25 +03:00
parent 2f37a7329b
commit b04009ad27
34 changed files with 2151 additions and 138 deletions

1
bin/.rustc_info.json Normal file
View File

@@ -0,0 +1 @@
{"rustc_fingerprint":2742313010855374649,"outputs":{"4614504638168534921":{"success":true,"status":"","code":0,"stdout":"rustc 1.83.0 (90b35a623 2024-11-26)\nbinary: rustc\ncommit-hash: 90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf\ncommit-date: 2024-11-26\nhost: x86_64-unknown-linux-gnu\nrelease: 1.83.0\nLLVM version: 19.1.1\n","stderr":""},"15729799797837862367":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/danila/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"unknown\"\nunix\n","stderr":""}},"successes":{}}

132
bin/.vimrc Normal file
View File

@@ -0,0 +1,132 @@
set tabstop=4
set softtabstop=4
set shiftwidth=4
set noexpandtab
set colorcolumn=90
highlight ColorColumnt ctermbg=darkgray
augroup project
autocmd!
autocmd BufRead,BufNewFile *.h,*.c set filetype=c.doxygen
augroup END
let &path.="src/include, src/source,"
" Включаем использование системного буфера
set clipboard=unnamedplus
" Работа с текстом
" Python использует 4 пробела для отступов
autocmd FileType python setlocal tabstop=4 shiftwidth=4
" Кодировка текста
set encoding=utf-8
set fileencoding=utf-8
set fileencodings=utf-8,cp1251,koi8-r,cp866
" Поиск по тексту
set hlsearch " подсвечивать результаты поиска
" Перемещение по тексту
" Когда достигаем границ строки, то перемещаемся на предыдующую/следующую
set whichwrap+=h,l,<,>,[,]
set number
" Настройки автодополнения
set completeopt=menu,menuone,noselect
" Разделение экрана
set splitbelow " разбивать вниз
set splitright " разбивать вправо
" сочетание клавиш
" Использование h, j, k, l для перемещения с зажатым Ctrl в режиме
" редактирования
inoremap <C-h> <Left>
inoremap <C-j> <Down>
inoremap <C-k> <Up>
inoremap <C-l> <Right>
let g:mapleader = "\<Space>"
" Переключение между вкладками
nnoremap <leader>t :tabnext<CR>
nnoremap <leader>T :tabprevious<CR>
" Список вкладок
nnoremap <leader>tl :tabs<CR>
" nnoremap <leader>tn :tabnew<CR>
nnoremap <leader>tc :tabclose<CR>
nnoremap <leader>to :tabonly<CR>
nnoremap <leader>tm :tabmove<CR>
" Редактировать файл в новой вкладке
nnoremap <leader>te :tabedit |
" Выбор вкладки
nnoremap <leader>1 1gt
nnoremap <leader>2 2gt
nnoremap <leader>3 3gt
nnoremap <leader>4 4gt
nnoremap <leader>5 5gt
nnoremap <leader>6 6gt
nnoremap <leader>7 7gt
nnoremap <leader>8 8gt
nnoremap <leader>9 9gt
nnoremap <leader>0 :tablast<CR>
" Разбиение окон
nnoremap <leader>s :split<CR>
nnoremap <leader>v :vsplit<CR>
" Выбор окна
nnoremap <C-h> <C-w>h
nnoremap <C-j> <C-w>j
nnoremap <C-k> <C-w>k
nnoremap <C-l> <C-w>l
" Размер окна
nnoremap <C-u> <C-w>+
nnoremap <C-d> <C-w>-
nnoremap <C-p> <C-w><
nnoremap <C-n> <C-w>>
" Vimspector
" nnoremap <leader><F2> <F10>
" nnoremap <leader>q <F11>
nmap <Leader><Right> <Plug>VimspectorStepOver
nmap <Leader><Down> <Plug>VimspectorStepInto
nmap <Leader><Up> <Plug>VimspectorStepOut
nmap <Leader><Tab> <Plug>VimspectorDisassemble
" Сделать окна одного размера
nnoremap <leader>= <C-w>=
" Переключения между буферами
" nnoremap <leader>b :bnext<CR>
" nnoremap <leader>B :bprevious<CR>
" nnoremap <leader>l :ls<CR>
" nnoremap <leader>d :bd<CR>
" " Скрыть/раскрыть блок кода
" nnoremap <leader>z za
" настройка плагинов
" настройки для отступов
" let g:indent_guides_enable_on_vim_startup = 1
" Настройки для разноцветной подсветки скобок
let g:rainbow_active = 1
" Настройки для vim-airline
let g:airline#extensions#tabline#enabled = 1
let g:airline#extensions#tabline#buffer_nr_show = 1
let g:airline#extensions#tabline#formatter = 'unique_tail'
let g:airline_powerline_fonts = 1
let g:airline_solarized_bg = 'luna'
let g:vimspector_enable_mappings = 'HUMAN'

585
bin/asotr.py Normal file
View File

@@ -0,0 +1,585 @@
import pandas as pd
from datetime import datetime, timedelta
from scipy import signal
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import numpy as np
import json
import pytz
from matplotlib import dates
"""
module asotr parses csv data from flight device ASOTR MVN
Danila Gamkov []
danila_gamkov@cosmos.ru
# License: IKI RAN
"""
__author__ = 'Danila Gamkov'
class IdxNotFound(Exception):
pass
class TimeIndexNotFound(Exception):
pass
fname_json_decode = './decode_asotr_cmd.json'
def convert_to_str(lst):
index = [i for i, x in enumerate(lst) if x == 1]
res = f"ch{index[0] + 1}"
for idx in index[1:]:
res += f"_{idx + 1}"
return res
def get_utc_seconds(timestamp_str, timestamp_format):
dt_obj = datetime.strptime(timestamp_str, timestamp_format)
utc_timezone = pytz.utc
dt_utc = dt_obj.replace(tzinfo=utc_timezone)
timestamp = int(dt_utc.timestamp())
return timestamp
def load_cmd_decode(fname):
with open(fname, 'r') as file:
data = json.load(file)
return data
def bitmask_to_num(data):
num = int(data)
res = []
d = bin(num)
d1 = d[::-1]
for i in range(num.bit_length()):
if d1[i] == '1':
res.append(i + 1)
return res
def flight_temperature_decode(cmd_string):
decode = load_cmd_decode(fname_json_decode)
asotr_kit = ''
temp = []
cmd = cmd_string.split(' ')
if len(cmd) == 8:
temp = [cmd[2], cmd[3], cmd[4], cmd[5], cmd[6], cmd[7]]
asotr_kit = cmd[0][1]
return (asotr_kit, temp)
def cmd_decode(cmd_string):
decode = load_cmd_decode(fname_json_decode)
asotr_kit = 0;
msg_decode = ''
out = ''
if 'OK' in cmd_string:
return out
cmd = cmd_string.split()
if len(cmd) > 5:
return out
if '1' in cmd[0]:
asotr_kit = 1
elif '2' in cmd[0]:
asotr_kit = 2
msg_ = f'{cmd[1]} {cmd[2]}'
msg_decode = decode[msg_]
if (len(cmd) == 4):
value = ''
if cmd[2] == '32':
value1 = bitmask_to_num(cmd[3])
if (len(value1) == 0):
value = 'запрет всех'
else:
value = ', '.join(map(str, value1))
elif (cmd[2] == '20' or cmd[2] == '21' or cmd[2] == '22'
or cmd[2] == '23' or cmd[2] == '24' or cmd[3] == '25'):
if cmd[3] == '0':
value = 'ПИД-регулирование'
elif cmd[3] == '1':
value = 'релейное регулирование'
elif cmd[3] == '2':
value = 'постоянная мощность'
else:
value = cmd[3]
out = f'АСОТР{asotr_kit}: {msg_decode} ({value})'
else:
if msg_decode != '':
out = f'АСОТР{asotr_kit}: {msg_decode}'
return out
def cmd_flight_parse(asotr_data):
decode_list = []
temperature_list = []
for elem in asotr_data.itertuples():
elem_msg = cmd_decode(elem.cmd_answer.strip())
if elem_msg != '':
str_ = f'{elem.timestamp};{elem_msg}'
decode_list.append(str_)
asotr_kit, temp = flight_temperature_decode(elem.cmd_answer.strip())
if len(temp) > 0:
timestamp = get_utc_seconds(elem.timestamp, '%d.%m.%Y %H:%M:%S.%f')
str_ = f'{timestamp};{elem.timestamp};{asotr_kit};{temp[0]};{temp[1]};{temp[2]};{temp[3]};{temp[4]};{temp[5]}'
temperature_list.append(str_)
return (decode_list, temperature_list)
# accuracy: 'seconds', 'minutes', 'hours'
def find_best_time_idx(time_arr, user_time, accuracy='minutes') -> int:
"""
finds best time index according specified user_time in time_arr and with specified accuracy
Args:
time_arr(pandas.core.series.Series[datetime]): timestamp array
user_time(string): time value, which index required find in time_arr (string in the following format: d.m.Y HH:MM:SS)
accuracy(string): specify accuracy which time index required find in time_arr(type 'seconds', 'minutes' or 'hours')
Returns:
int: index that has been found in time_arr (or -1 if index has not been found)
"""
tstamp = datetime.strptime(user_time, "%d.%m.%Y %H:%M:%S")
if accuracy == 'minutes':
delta = timedelta(minutes=30)
elif accuracy == 'hours':
delta = timedelta(hours=24)
elif accuracy == 'seconds':
delta = timedelta(seconds=30)
low = time_arr.idxmin()
high = time_arr.idxmax()
mid = time_arr.idxmin() + (time_arr.idxmax() - time_arr.idxmin()) // 2
if mid not in time_arr.index:
# print(f'mid not in time_arr: {mid}, {time_arr.index}, {time_arr.idxmin()}')
return -1
a = time_arr[mid]
while ((a < (tstamp - delta)) or (a > (tstamp + delta))) and low < high:
if tstamp > a:
low = mid + 1
else:
high = mid - 1
mid = low + (high - low) // 2
# print(f'mid: (low + high)/2: {mid}')
if mid not in time_arr.index:
# print(f'mid not in time_arr: {mid}')
return -1
a = time_arr[mid]
if low > high:
# print(f'low > high: {mid}')
mid = high
if mid > 30:
for j in range(mid-30, len(time_arr)):
# print(f'{time_arr[j]} < {tstamp}: {j}')
if time_arr[j] >= tstamp:
# print(f'{time_arr[j]} > {tstamp}: {j}')
return j
else:
for j in range(0, len(time_arr)):
# print(f'{time_arr[j]} < {tstamp}: {j}')
if time_arr[j] >= tstamp:
# print(f'{time_arr[j]} > {tstamp}: {j}')
return j
if mid <= low + 1 or mid >= high - 1:
return -1
return mid
def find_time_idx(data_list, keys_list, timestamp, accuracy):
out_dict = dict.fromkeys(keys_list, -1)
for i, elem in enumerate(data_list):
idx = find_best_time_idx(elem['timestamp'], timestamp, accuracy)
if idx != -1:
out_dict[keys_list[i]] = idx
else:
raise TimeIndexNotFound(f'index corresponding to time {timestamp} in times array not found!')
return out_dict
def get_cmd_data(fname):
asotr_data = pd.read_csv(fname, delimiter=';')
cmd_list, temperature_list = cmd_flight_parse(asotr_data)
return (cmd_list, temperature_list)
def get_data(path, asotr_kit, start_date, end_date, time_accuracy):
ch_signs = ["temp", "temp_set", "pow"]
fname_temp = "asotr" + asotr_kit + "_data_T.csv"
fname_tempSet = "asotr" + asotr_kit + "_data_TSET.csv"
fname_pow = "asotr" + asotr_kit + "_data_P.csv"
fname = [path + fname_temp, path + fname_tempSet, path + fname_pow]
dateparse = lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S.%f")
try:
data = [ pd.read_csv(fname[0], sep=";", parse_dates=["timestamp"], date_parser=dateparse),
pd.read_csv(fname[1], sep=";", parse_dates=["timestamp"], date_parser=dateparse),
pd.read_csv(fname[2], sep=";", parse_dates=["timestamp"], date_parser=dateparse),]
except FileNotFoundError:
print(f'Error opening file: one (or all) file not found in directory: \n{fname}')
return
except pd.errors.EmptyDataError:
print(f'Error opening file: one (or all) file is empty or have incorrect format. \nLook at the files: {fname}')
return
except pd.errors.ParserError:
print(f'Error parsing file: file have incorrect structure. \nLook at the files: {fname}')
return
except Exception as e:
print(f'Error parsing file: {e}. \nLook at the files: {fname}')
return
ch = [[], [], [], [], [], []]
data_dict = {
"temp": ch,
"temp_set": ch,
"pow": ch,
"time_temp": [],
"time_temp_set": [],
"time_pow": [],
}
idxb = dict.fromkeys(ch_signs, -1)
idxe = dict.fromkeys(ch_signs, -1)
try:
idxb = find_time_idx(data, ch_signs, start_date, time_accuracy)
idxe = find_time_idx(data, ch_signs, end_date, time_accuracy)
except Exception as e:
print(e)
return
data_dict["time_temp"] = data[0]["timestamp"][idxb["temp"] : idxe["temp"]]
data_dict["time_temp_set"] = data[1]["timestamp"][idxb["temp_set"] : idxe["temp_set"]]
data_dict["time_pow"] = data[2]["timestamp"][idxb["pow"] : idxe["pow"]]
col = ["ch1", "ch2", "ch3", "ch4", "ch5", "ch6"]
for j in range(len(ch_signs)):
data_dict[ch_signs[j]] = data[j][['ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6']][idxb[ch_signs[j]]:idxe[ch_signs[j]]]
raw_data = data
return (raw_data, data_dict)
# shift_flag - normalization of the offset of all samples of each period to the first period
# peaks: min, max
def find_periods(time, data, shift_flag, peaks='min'):
if peaks == 'min':
idx, _ = find_peaks(-data, distance=80)
else:
idx, _ = find_peaks(data, distance=80)
periods = []
periods_t = []
for i in range(1, len(idx)):
period_t = time.iloc[idx[i-1]:idx[i]]
period = data.iloc[idx[i-1]:idx[i]]
periods.append(period)
periods_t.append(period_t)
if shift_flag == True:
res = shift_data_(periods)
else:
res = periods
return (periods_t, res, idx)
# shift_flag - normalization of the offset of all samples of each period to the first period
def get_signal_profile_corr(time, data, pattern, shift_flag, peak_height):
period_cnts = len(pattern)
periods = []
periods_t = []
# find correlation between signal and pattern
correlation = signal.correlate(data, pattern, mode='same', method='fft')
normalized_correlation = correlation / max(abs(correlation))
# find correlation peaks
# peak_height = 0.7
peaks_indices = signal.find_peaks(normalized_correlation, height=peak_height)[0]
# separate and collect each finded period
for peak_idx in peaks_indices:
start_index = peak_idx - period_cnts // 2 # peak center
end_index = start_index + period_cnts
if 0 <= start_index < len(data) and 0 <= end_index < len(data):
period = data.iloc[start_index:end_index]
period_t = time.iloc[start_index:end_index]
periods.append(period)
periods_t.append(period_t)
if shift_flag == True:
res = shift_data_(periods)
else:
res = periods
return (periods_t, res)
def shift_data_(data):
first = [list_.iloc[0] for list_ in data]
delta = []
for i in range(1, len(first)):
delta.append(first[i] - first[0])
res = []
res.append(data[0])
for idx, elem in enumerate(data):
if idx > 0:
corr = elem - delta[idx-1]
res.append(corr)
return res
def get_peak_temp_forecast(cur_time, num_periods):
peaks_forecast = []
period = timedelta(hours=1, minutes=33, seconds=0, milliseconds=150)
time = cur_time
for i in range(num_periods):
time = time + period
peaks_forecast.append(time)
return peaks_forecast
def plot_signal_profile(time, data, pattern_t, pattern, method, shift_flag, peak_height=0.8):
if method == 'corr':
periods_t, periods = get_signal_profile_corr(time, data, pattern, shift_flag, peak_height)
print(f'Найдено {len(periods)} периодов.')
elif method == 'peaks':
periods_t, periods, peaks = find_periods(time, data, shift_flag, peaks='min')
print(f'Найдено {len(periods)} периодов.')
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6))
for idx, period in enumerate(periods):
ax1.plot(np.arange(len(period)), period)
ax1.grid(True)
ax2.plot(time, data)
ax2.grid(True)
plt.grid(True)
plt.show()
def insert_temp_data_from_flight_cmd(fname_cmd_temp, dir_asotr):
fname_asotr = [f'{dir_asotr}asotr01_data_T.csv', f'{dir_asotr}asotr02_data_T.csv']
df_cmd = pd.read_csv(fname_cmd_temp, sep=';')
df_asotr = []
df_cmd_temp = []
for i, fname in enumerate(fname_asotr):
df = pd.read_csv(fname, sep=';')
df_asotr.append(df)
df = df_cmd[df_cmd['asotr_kit'] == i + 1]
df = df.drop(['asotr_kit'], axis=1)
df_cmd_temp.append(df)
df_asotr_ = [
pd.concat(
[df_asotr[0], df_cmd_temp[0]], ignore_index=True).sort_values(by='timestamp_sec'),
pd.concat(
[df_asotr[1], df_cmd_temp[1]], ignore_index=True).sort_values(by='timestamp_sec')
]
return df_asotr_
def subtract_data(data1, data2):
init_shift = data2[0] - data1[0]
out = data2 - data1 - init_shift
return pd.Series(out)
# for transmit data: cmd_list, temp, power = get_cmd_data(fname)
def cut_data(data, time_begin, duration_sec, accuracy='seconds'):
time_format = "%d.%m.%Y %H:%M:%S";
delta = timedelta(seconds=duration_sec)
tstamp_begin = datetime.strptime(time_begin, time_format)
tstamp_end = tstamp_begin + delta
time_end = tstamp_end.strftime(time_format)
idx_begin = find_best_time_idx(data['timestamp'], time_begin, accuracy)
idx_end = find_best_time_idx(data['timestamp'], time_end, accuracy)
if idx_begin == -1 or idx_end == -1:
raise IdxNotFound(f"error finding array index corresponding to timestamp: check time_begin ({time_begin}) or time_end ({time_end})")
idx_end = idx_end - 1
out = data.loc[idx_begin : idx_end]
return out
def cut_norm_data(data, time_begin, duration_sec, channel='ch1',
interp={'method': 'cubic', 'order': 2}, accuracy='seconds'):
try:
data_period = cut_data(data, time_begin, duration_sec, accuracy)
except IdxNotFound as e:
print(f'{e}')
temp_norm = data_period[channel].values - data_period[channel].iloc[0]
time_l = list(data_period['timestamp'])
temp_l = list(temp_norm)
orig_data = pd.DataFrame({ 'timestamp': time_l, 'temp': temp_l })
interp_data = orig_data.set_index('timestamp')
interp_data = interp_data.resample('S').mean().interpolate(method=interp["method"],
order=interp["order"])
interp_data = interp_data.reset_index(names=['timestamp'])
return orig_data, interp_data
def get_step_response_diff(data, thermocycle_info, channel='ch1',
interp={'method': 'cubic', 'order': 2}, accuracy='seconds', cut_step_resp={}):
time_begin_orig = thermocycle_info['time_begin'][0]
time_begin_step = thermocycle_info['time_begin'][1]
duration_sec = thermocycle_info['duration_sec']
_, orig_interp_cycle = cut_norm_data(data, time_begin_orig, duration_sec, channel,
interp, accuracy)
_, step_interp_cycle = cut_norm_data(data, time_begin_step, duration_sec, channel,
interp, accuracy)
max_ = min(len(orig_interp_cycle), len(step_interp_cycle))
subtract_step = subtract_data(
orig_interp_cycle['temp'].iloc[0:max_].values,
step_interp_cycle['temp'].iloc[0:max_].values)
step_time = list(step_interp_cycle['timestamp'].iloc[0:max_])
step_temp = list(subtract_step)
step_response = pd.DataFrame({'timestamp': step_time, 'temp': step_temp})
if len(cut_step_resp) > 0:
time_begin = cut_step_resp['time_step_begin']
try:
step_response = cut_data(step_response, time_begin,
cut_step_resp['step_duration'], accuracy='seconds')
except IdxNotFound as e:
print(f'{e}')
first = step_response['temp'].iloc[0]
step_response['temp'] = step_response['temp'] - first
return (step_response, orig_interp_cycle, step_interp_cycle)
def plot_step_response_in_thermocycle(data_info, thermocycle_info, interp,
cut_step_resp, plot_info):
title = f'{plot_info["title"]}, канал {data_info["channel"][2]} АСОТР, {data_info["device"]} СПИН-X1-МВН, период опроса {data_info["period"]} ({thermocycle_info["date"]})'
step_resp, orig_interp_cycle, step_interp_cycle = get_step_response_diff(
data_info['data'], thermocycle_info, channel=data_info['channel'],
interp=interp, accuracy=data_info['find_accuracy'])
fig = plt.figure(figsize=(9, 6), dpi=200)
fig.suptitle(title, fontsize=plot_info['font'])
ax1 = fig.add_subplot(2,1,1)
ax2 = fig.add_subplot(2,1,2)
ax1.plot(step_resp['timestamp'], step_resp['temp'],
label='реакция на ' + thermocycle_info['type_ru'] + ' воздействие')
step_begin = cut_step_resp['time_step_begin']
idx = find_best_time_idx(step_interp_cycle.timestamp, step_begin,
accuracy=data_info['find_accuracy'])
ax1.axvline(x = step_interp_cycle.timestamp[idx], color='r', linestyle='-.',
label= thermocycle_info['type_ru'] + ' воздействие, начало')
date_formatter = dates.DateFormatter(plot_info['ox_dtime_format'])
ax1.xaxis.set_major_formatter(date_formatter)
ax1.legend(loc=plot_info["legend_pos"][0], fontsize=plot_info['font'])
ax1.grid(True)
ax1.tick_params(axis='both', width=1, labelsize=plot_info['font'])
ax1.set_ylabel(r'$\Delta$T, $^\circ$C', fontsize=plot_info['font'])
ax2.axvline(x = step_interp_cycle.timestamp[idx], color='r', linestyle='-.',
label= thermocycle_info['type_ru'] + ' воздействие, начало')
ax2.plot(orig_interp_cycle['timestamp'], orig_interp_cycle['temp'], '--',
label='термоцикл')
ax2.plot(step_interp_cycle['timestamp'], step_interp_cycle['temp'],
label='термоцикл с реакцией на ' + thermocycle_info['type_ru'] + ' воздействие')
ax2.xaxis.set_major_formatter(date_formatter)
ax2.legend(loc=plot_info["legend_pos"][1], fontsize=plot_info['font'],
fancybox=True, framealpha=0.4)
ax2.grid(True)
ax2.tick_params(axis='both', width=1, labelsize=plot_info['font'])
ax2.set_xlabel('Время, ЧЧ:MM:CC', fontsize=plot_info['font'])
ax2.set_ylabel(r'$T_{norm}$, $^\circ$C', fontsize=plot_info['font'])
fig.suptitle(title, fontsize=plot_info['font'])
plt.tight_layout()
fig.savefig(plot_info["name_fig"])
plt.show()
def plot_imp_response(data, data_info, plot_info, thermocycle_info):
title = f'{plot_info["title"]}, канал {data_info["channel"][2]} АСОТР КДИ СПИН-X, период опроса {data_info["period"]} ({thermocycle_info["date"]})'
fig = plt.figure(figsize=(11, 6), dpi=200)
fig.suptitle(title, fontsize=plot_info['font'])
ax1 = fig.add_subplot(1,1,1)
date_formatter = dates.DateFormatter(plot_info['ox_dtime_format'])
ax1.xaxis.set_major_formatter(date_formatter)
ax1.plot(data['timestamp'], data['temp'], '.', label='реакц. на импульсное воздействие')
ax1.xaxis.set_major_formatter(date_formatter)
ax1.legend(loc=plot_info["legend_pos"][0], fontsize=plot_info['font'],
fancybox=True, framealpha=0.4)
ax1.grid(True)
ax1.tick_params(axis='both', width=1, labelsize=plot_info['font'])
ax1.set_xlabel('время', fontsize=plot_info['font'])
ax1.set_ylabel(r'$t_{norm}$, $^\circ$C', fontsize=plot_info['font'])
fig.suptitle(title, fontsize=plot_info['font'])
plt.tight_layout()
fig.savefig(plot_info["name_fig"])
plt.show()
#timestamp as string format: dd:mm:YYYY HH:MM:SS
def insert_data_cyclo(base_time_str, fname, path):
time_format = "%d.%m.%Y %H:%M:%S"
cyclogram_file = path + fname
df = pd.read_excel(cyclogram_file)
base_time = pd.to_datetime(base_time_str, format='%d.%m.%Y %H:%M:%S')
df['timestamp'] = df.iloc[:, 0].apply(lambda x: base_time + timedelta(seconds=x))
df.iloc[:, 0] = df['timestamp'].dt.strftime(time_format)
df = df.drop(['timestamp'], axis=1)
fname = cyclogram_file.replace('.xls', '.csv')
df.to_csv(fname, index=False, sep=';', encoding='utf-8-sig')

BIN
bin/asotr_csv Executable file

Binary file not shown.

14
bin/asotr_unzip_plot.sh Executable file
View File

@@ -0,0 +1,14 @@
#! /bin/bash
if [ $# != 1 ]
then
echo "erorr use $0. Right use this script: "
echo "$0 path"
else
cp ../asotr_csv/target/release/asotr_csv ./
path_=$1
python3 recursive_unpack_targz.py ${path_}
./asotr_csv -d ${path_}
python3 plot_asotr_flight_all.py
fi

150
bin/brd_wheel_1Hz_parser.py Normal file
View File

@@ -0,0 +1,150 @@
import pandas as pd
import os
import re
from pathlib import Path
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
tstamp_s = '%d.%m.%Y %H:%M:%S.%f'
ox_dtime_format = '%d.%m.%Y %H:%M'
path_itog_brd_data = '../data/brd_data/'
class PathFileNotFound(Exception):
pass
def find_required_files(root_dir, pattern):
result = []
for dirpath, _, filenames in os.walk(root_dir):
for filename in filenames:
match = re.match(pattern, filename)
if match:
result.append(dirpath + '/' + filename)
if len(result) == 0:
raise PathFileNotFound(f'error: check that the path is correct ({root_dir}) or files pattern is correct ({pattern})')
return sorted(result)
def read_files_into_df(fname_list, column_list, dtype_columns={}):
data_itog = pd.DataFrame()
epoch_start = pd.Timestamp('2000-01-01')
for fname in fname_list:
data = pd.read_csv(fname, sep=r'\s+', dtype=str)
data = data.dropna()
data = data[column_list]
if 'TIME' in column_list:
# convert TIME value to human-readable timestamp (sinse epoch 01.01.2000)
time = data['TIME'].astype(float)
tstamp = epoch_start + pd.to_timedelta(time, unit='s')
timestamp = tstamp.dt.strftime(tstamp_s)
data['timestamp'] = timestamp
# clear dataframe rows where time value == 0
data['time'] = time
data_clear = data.query('time != 0.0')
data_itog = pd.concat([data_itog, data_clear], ignore_index=True)
return data_itog
def collect_tm_brd_files(root_dir_tm_data, column_list, column_list_itog):
patterns_tm = [r'mvn_tm_brd01_(.*)', r'mvn_tm_brd02_(.*)', r'mvn_tm_brd03_(.*)',
r'mvn_tm_brd04_(.*)']
for pattern in patterns_tm:
fname = path_itog_brd_data + pattern[:12] + '.csv'
try:
found_files = find_required_files(root_dir_tm_data, pattern)
data = read_files_into_df(found_files, column_list, dtype_columns={11: float})
except KeyError as e:
print(f'error in collect_tm_brd_files: the specified column name was not found in the data file (path: {root_dir_tm_data}) ({e})')
break
except Exception as e:
print(f'error in collect_tm_brd_files: {e}')
break
data.to_csv(fname, index=False, sep=';', columns=column_list_itog, encoding='utf-8-sig')
print('data saved: ' + fname)
def collect_tm_brd_wheel_data(root_dir_wheel_data, column_list, column_list_itog):
patterns_wheel = [r'mvn_wheel_brd01_(.*)', r'mvn_wheel_brd02_(.*)', r'mvn_wheel_brd03_(.*)',
r'mvn_wheel_brd04_(.*)']
for pattern in patterns_wheel:
fname = path_itog_brd_data + pattern[:15] + '.csv'
try:
found_files = find_required_files(root_dir_wheel_data, pattern)
data = read_files_into_df(found_files, column_list, dtype_columns={0: float, 1: int})
except KeyError as e:
print(f'error in collect_tm_brd_wheel_data: the specified column name was not found in the data file (path: {root_dir_tm_data}) ({e})')
break
except Exception as e:
print(f'error in collect_tm_brd_wheel_data: {e}')
break
mask = data['STATE'] == '0'
data = data[mask]
data.to_csv(fname, index=False, sep=';', columns=column_list_itog, encoding='utf-8-sig')
print('data saved: ' + fname)
## collect raw tm brd data into one file for each brd
root_dir_tm_data = '/home/danila/Danila/work/MVN/flight/brd_data/arch_for_MB/archive_tm_data_txt/'
column_list = ['TIME', 'PER_1Hz', 'ST_HV']
column_list_itog = ['TIME', 'timestamp', 'PER_1Hz', 'ST_HV']
collect_tm_brd_files(root_dir_tm_data, column_list, column_list_itog)
root_dir_wheel_data = '/home/danila/Danila/work/MVN/flight/brd_data/arch_for_MB/archive_wheel_data_txt/'
column_list = ['TIME', 'STATE']
column_list_itog = ['TIME', 'timestamp', 'STATE']
collect_tm_brd_wheel_data(root_dir_wheel_data, column_list, column_list_itog)
## plot 'evolution' 1 Hz from tm brd data
fname = path_itog_brd_data + 'mvn_tm_brd01.csv'
dateparse = lambda x: datetime.strptime(x, tstamp_s)
df = pd.read_csv(fname, sep=';', parse_dates=['timestamp'], date_parser=dateparse)
plt.plot(df['timestamp'], df['PER_1Hz'], '.')
plt.show()
border_clr_wheel = 2
fname = path_itog_brd_data + 'mvn_wheel_brd01.csv'
wheel_df = pd.read_csv(fname, sep=';')
wheel_df['TIME_diff'] = wheel_df['TIME'].diff()
median_tdiff = wheel_df['TIME_diff'].median()
wheel_df_clear = wheel_df[(wheel_df['TIME_diff'] > median_tdiff - border_clr_wheel) &
(wheel_df['TIME_diff'] < median_tdiff + border_clr_wheel)]
wheel_df_peaks = wheel_df[(wheel_df['TIME_diff'] <= median_tdiff - border_clr_wheel) |
(wheel_df['TIME_diff'] >= median_tdiff + border_clr_wheel)]
plt.plot(wheel_df_clear['TIME'], wheel_df_clear['TIME_diff'])
plt.show()
# df1 = df[df['TIME_diff'] < 30.6]
# print(df[df['TIME_diff'] > 30.6 or df['TIME_diff'] < 29.4] )
# for idx, row in df.iterrows():
# print(row['TIME'])

88
bin/decode_asotr_cmd.json Normal file
View File

@@ -0,0 +1,88 @@
{
"write 0": "маркер начала сектора",
"write 10": "Разрешение/блокировка работы мотора",
"write 11": "режим управления мотором",
"write 12": "направление вращения",
"write 14": "уставка защиты по току мотора кратковременная (мА)",
"write 15": "уставка защиты по току мотора среднему за 20с (мА)",
"write 16": "Время разгона мотора, секунд",
"write 17": "Время останова мотора, секунд",
"write 18": "Номинальный ток мотора( мА)",
"write 20": "СОТР1 режим управления каналом",
"write 21": "СОТР2 режим управления каналом",
"write 22": "СОТР3 режим управления каналом",
"write 23": "СОТР4 режим управления каналом",
"write 24": "СОТР5 режим управления каналом",
"write 25": "СОТР6 режим управления каналом",
"write 26": "СОТР1 уставка мощности в канале в %",
"write 27": "СОТР2 уставка мощности в канале в %",
"write 28": "СОТР3 уставка мощности в канале в %",
"write 29": "СОТР4 уставка мощности в канале в %",
"write 30": "СОТР5 уставка мощности в канале в %",
"write 31": "СОТР6 уставка мощности в канале в %",
"write 32": "Маска-разрешения работы каналов СОТР",
"write 50": "Уставка Kp ПИД-регулятора мотора",
"write 51": "Уставка Kd ПИД-регулятора мотора",
"write 52": "Уставка Ki ПИД-регулятора мотора",
"write 53": "заданная скорость вращения, об/мин",
"write 55": "СОТР1 - уставка температуры канала",
"write 56": "СОТР2 - уставка температуры канала",
"write 57": "СОТР3 - уставка температуры канала",
"write 58": "СОТР4 - уставка температуры канала",
"write 59": "СОТР5 - уставка температуры канала",
"write 60": "СОТР6 - уставка температуры канала",
"write 61": "СОТР1 - уставка Kp ПИД-регулятора канала",
"write 62": "СОТР2 - уставка Kp ПИД-регулятора канала",
"write 63": "СОТР3 - уставка Kp ПИД-регулятора канала",
"write 64": "СОТР4 - уставка Kp ПИД-регулятора канала",
"write 65": "СОТР5 - уставка Kp ПИД-регулятора канала",
"write 66": "СОТР6 - уставка Kp ПИД-регулятора канала",
"write 67": "СОТР1 - уставка Kd ПИД-регулятора канала",
"write 68": "СОТР2 - уставка Kd ПИД-регулятора канала",
"write 69": "СОТР3 - уставка Kd ПИД-регулятора канала",
"write 70": "СОТР4 - уставка Kd ПИД-регулятора канала",
"write 71": "СОТР5 - уставка Kd ПИД-регулятора канала",
"write 72": "СОТР6 - уставка Kd ПИД-регулятора канала",
"write 73": "СОТР1 - уставка Ki ПИД-регулятора канала",
"write 74": "СОТР2 - уставка Ki ПИД-регулятора канала",
"write 75": "СОТР3 - уставка Ki ПИД-регулятора канала",
"write 76": "СОТР4 - уставка Ki ПИД-регулятора канала",
"write 77": "СОТР5 - уставка Ki ПИД-регулятора канала",
"write 78": "СОТР6 - уставка Ki ПИД-регулятора канала",
"write 79": "СОТР1 - уставка гистерезиса релейн. регулятора",
"write 80": "СОТР2 - уставка гистерезиса релейн. регулятора",
"write 81": "СОТР3 - уставка гистерезиса релейн. регулятора",
"write 82": "СОТР4 - уставка гистерезиса релейн. регулятора",
"write 83": "СОТР5 - уставка гистерезиса релейн. регулятора",
"write 84": "СОТР6 - уставка гистерезиса релейн. регулятора",
"write 85": "СОТР1 - уставка Ro термодатчика канала",
"write 86": "СОТР2 - уставка Ro термодатчика канала",
"write 87": "СОТР3 - уставка Ro термодатчика канала",
"write 88": "СОТР4 - уставка Ro термодатчика канала",
"write 89": "СОТР5 - уставка Ro термодатчика канала",
"write 90": "СОТР6 - уставка Ro термодатчика канала",
"write 91": "СОТР1 - уставка Alpha термодатчика канала",
"write 92": "СОТР2 - уставка Alpha термодатчика канала",
"write 93": "СОТР3 - уставка Alpha термодатчика канала",
"write 94": "СОТР4 - уставка Alpha термодатчика канала",
"write 95": "СОТР5 - уставка Alpha термодатчика канала",
"write 96": "СОТР6 - уставка Alpha термодатчика канала",
"func 1": "Перезапуск процессора МУП",
"func 2": "Перезапуск МУП через питание",
"func 3": "Запись текущих уставок в сектор I памяти FLASH",
"func 4": "Запись текущих уставок в сектор J памяти FLASH",
"func 5": "Чтение уставок из сектора I памяти FLASH",
"func 6": "Чтение уставок из сектора J памяти FLASH",
"func 7": "Запуск мотора (разгон и поддержание скорости вращения)",
"func 8": "Останов мотора (торможение и остановка)",
"func 9": "Применить уставки СОТР из ОЗУ в алгоритме ПИД-регуляторов",
"status 1": "",
"status 2": "",
"status 3": "",
"status 4": "",
"status 5": "",
"status 6": "",
"status 7": "",
"status 8": "",
"status 9": ""
}

80
bin/decode_cmd_data.py Normal file
View File

@@ -0,0 +1,80 @@
import sys
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
import pandas as pd
from datetime import datetime, timedelta
path_data = '../data/asotr/'
fname_cmd_flight = '../data/cmd_asotr/all_flight_cmd_asotr.csv'
fname_cmd_temp = '../data/cmd_asotr/flight_cmd_temp.csv'
fname_cmd_human = '../data/cmd_asotr/cmd_human.csv'
timeformat = '%d.%m.%Y %H:%M:%S'
prev_days = 25
## get flight commands file (generated by mvn_log_viewer)
## Translate to human-readeble format and take temperatures from flight commands file
## save in cmd_human
cmd_list, temperature_list = asotr.get_cmd_data(fname_cmd_flight)
with open(fname_cmd_human, 'w') as file:
for elem in cmd_list:
file.write(f'{elem}\n')
## temperatures from flight commands file save to file flight_cmd_temp
with open(fname_cmd_temp, 'w') as file:
file.write(f'timestamp_sec;timestamp;asotr_kit;ch1;ch2;ch3;ch4;ch5;ch6\r\n')
for elem in temperature_list:
file.write(f'{elem}\n')
## insert temperatures from flight commands file to main asotr temperatures data files
df_asotr_ = asotr.insert_temp_data_from_flight_cmd(fname_cmd_temp, path_data)
end_date = ''
for i, data in enumerate(df_asotr_):
end_date = data['timestamp'].iloc[len(data) - 1][0:18]
data.to_csv(f'{path_data}asotr0{i+1}_data_T.csv', index=False, sep=';',
encoding='utf-8-sig', decimal='.')
delta_date = datetime.strptime(end_date, timeformat) - timedelta(days=prev_days)
start_date = delta_date.strftime(timeformat)
## form timestamp file where minimum of temperatures registered
for kit in range(1,3):
asotr_kit = f'0{kit}'
_, data_dict = asotr.get_data(path_data, asotr_kit, start_date, end_date, 'minutes')
min_temp_ch = []
for channel in range(1,7):
ch = f'ch{channel}'
data1 = data_dict['temp'][ch]
time1 = data_dict['time_temp']
periods_t, periods, _ = asotr.find_periods(time1, data1, shift_flag=False, peaks='min')
min_temp_period = []
for elem in periods_t:
min_temp_period.append(elem.iloc[0].strftime('%d.%m.%Y %H:%M:%S.%f')[:-3])
min_temp_ch.append(min_temp_period)
fname = f'{path_data}asotr{asotr_kit}_min_T.csv'
df = pd.DataFrame(min_temp_ch).transpose()
df.to_csv(fname, header=False, index=False, sep=';',
encoding='utf-8-sig', decimal='.')
df1 = pd.read_csv(fname, sep=';',
names=['ch1','ch2','ch3','ch4','ch5','ch6'])
df1.to_csv(fname, index=False, sep=';',
encoding='utf-8-sig', decimal='.')

164
bin/flight_temp_forecast.py Normal file
View File

@@ -0,0 +1,164 @@
import matplotlib.pyplot as plt
from matplotlib.widgets import Slider
import pandas as pd
import numpy as np
import sys
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
from datetime import datetime, timedelta
from matplotlib import dates
def get_raw_data(year, path_with_data, asotr_kit, data_borders):
if data_borders['flag'] == True:
start_date = data_borders['begin'] + " 00:00:00"
end_date = data_borders['end'] + " 23:59:59"
accuracy = 'minutes'
else:
start_date = '01.01.' + year + " 00:00:00"
end_date = '01.01.' + year + " 23:59:59"
accuracy = 'hours'
try:
data, data_dict_borders = asotr.get_data(path_with_data, asotr_kit,
start_date, end_date, accuracy)
ch_signs = ["temp", "temp_set", "pow"]
ch = [[], [], [], [], [], []]
data_dict = {
"temp": ch,
"temp_set": ch,
"pow": ch,
"time_temp": [],
"time_temp_set": [],
"time_pow": [],
}
data_dict["time_temp"] = data[0]["timestamp"]
data_dict["time_temp_set"] = data[1]["timestamp"]
data_dict["time_pow"] = data[2]["timestamp"]
col = ["ch1", "ch2", "ch3", "ch4", "ch5", "ch6"]
for j in range(len(ch_signs)):
data_dict[ch_signs[j]] = data[j][col]
except Exception as e:
print(f'exception: {e}')
raise
try:
fname_beta = path_with_data + 'beta_' + year + '.xlsx'
dateparse_beta = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
data_beta = pd.read_excel(fname_beta, sheet_name=0, usecols=[0,1,2], header=4,
names=['turn_num', 'beta_angle', 'timestamp'], parse_dates=['timestamp'],
date_parser=dateparse_beta)
except Exception as e:
print(f'exception: {e}')
raise
return (data_dict, data_dict_borders, data_beta)
def plot_asotr_borders(year, path_with_data, ch, asotr_kit, data_borders,
font=14, save_flag=True):
# get from files and prepare data
print_width = 20
print_height = 12
width = 1
plot_windows = 1
channels = list(map(int, ch))
plot_task = {"temp": 1, "temp_set": 1, "pow": 1}
ox_dtime_format = "%d.%m.%Y"
legend = [
"канал 1 (БРД1)",
"канал 2 (БРД2)",
"канал 3 (БРД3)",
"канал 4 (БРД4)",
"канал 5 (плита МУП МВН)",
"канал 6 (плита МУП МВН)",
]
legend_set = list(map(lambda x: x + " уставка", legend))
width = [1, 1, 1, 1, 1, 1]
width_set = [3, 3, 3, 3, 3, 3]
marker = ["-", "--", "-.", "-", "-", "--"]
width_arr = [1, 0.5, 0.2, 0.1, 1, 1]
try:
data_dict, data_dict_borders, data_beta = get_raw_data(year, path_with_data,
asotr_kit, data_borders)
except Exception as e:
print(f'{e}')
return
if plot_windows == 1:
fig, ax = plt.subplots(figsize=(print_width, print_height), dpi=200)
if plot_task["temp"] == 1:
for i in range(len(channels)):
if channels[i] == 1:
line, = ax.plot(data_dict_borders["time_temp"],
data_dict_borders['temp'].iloc[:,i],
'--',
linewidth=1,
label=legend[i],)
ax.plot(data_dict["time_temp"],
data_dict['temp'].iloc[:,i],
marker[i],
linewidth=width[i],
label=legend[i],)
ch = i
ax.tick_params(axis="both", width=1, labelsize=font)
ax.grid(visible=True, linestyle="dotted")
ax.set_ylabel("Температура, $^\circ$C", fontsize=font)
ax.set_xlabel("Время", fontsize=font)
ax.legend(fontsize=font)
date_formatter = dates.DateFormatter(ox_dtime_format)
ax.xaxis.set_major_formatter(date_formatter)
ax2 = ax.twinx()
ax2.plot(data_beta['timestamp'], data_beta['beta_angle'], marker[4],
color='r', linewidth=width[5], label='угол Бета')
ax2.set_ylabel('Угол Бета', fontsize=font)
ax2.tick_params(axis='y', width=1, labelsize=font)
ax2.legend(fontsize=font, loc='lower right')
plt.tight_layout()
def update(val):
shift_amount = val * pd.Timedelta(days=1)
shifted_timestamps = data_dict_borders['time_temp'] + shift_amount
scaled_values = data_dict_borders['temp'].iloc[:,ch] + 5
line.set_data(shifted_timestamps, scaled_values)
fig.canvas.draw_idle()
slider_ax = plt.axes([0.25, 0.05, 0.65, 0.03])
slider = Slider(slider_ax, 'Shift days', -100, 100, valinit=0)
slider.on_changed(update)
plt.show()
if save_flag == True:
pict_name = (f'../plots/reports/ASOTR{asotr_kit}_flight_T_P_{asotr.convert_to_str(channels)}_{data_borders["begin"][0:5].replace(".", "")}_{data_borders["end"][0:5].replace(".", "")}_{data_borders["end"][6:]}.png')
fig.savefig(pict_name)
ch = '100000'
year = '2025'
path_with_data = '../data/asotr/'
asotr_kit = '01'
data_borders = {'flag': True, 'begin': '15.03.2025', 'end': '01.05.2025'}
plot_asotr_borders(year, path_with_data, ch, asotr_kit, data_borders, font=6, save_flag=True)

41
bin/impulse_response.py Normal file
View File

@@ -0,0 +1,41 @@
import pandas as pd
import matplotlib.pyplot as plt
import sys
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
import matplotlib.pyplot as plt
from matplotlib import dates
import pandas as pd
from datetime import datetime
asotr_kit = 1
fname = f'../../python_cyclo/data/asotr0{asotr_kit}_data_T.csv'
dateparse = lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S.%f")
data = pd.read_csv(fname, sep=';', parse_dates=['timestamp'], date_parser=dateparse)
interp = {'method': 'polynomial', 'order': 2}
thermocycle_info = {'date': '14.03.2025', 'type': 'impuse'}
cut_step_resp = {'orig_time_step_begin': '14.03.2025 13:49:32', 'orig_step_duration': 20*60}
data_info = {'data': data, 'channel': 'ch1', 'period': '1 сек', 'find_accuracy': 'seconds'}
name = f'{thermocycle_info["type"]}_response_{thermocycle_info["date"].replace(".","")}'
plot_info = {'title': 'Реакция на импульсное воздействие',
'ox_dtime_format': "%H:%M:%S", 'legend_pos': ['upper right', 'lower left'],
'name_fig': f'../plots/response/{name}.png',
'font': 10}
tstamp_orig_begin = cut_step_resp['orig_time_step_begin']
_, interp_imp_resp = asotr.cut_norm_data(data_info['data'], tstamp_orig_begin,
cut_step_resp['orig_step_duration'], channel='ch1', interp=interp,
accuracy=data_info['find_accuracy'])
interp_imp_resp.to_csv(f'../data/asotr/response/asotr0{asotr_kit}_{name}.csv', index=False, sep=';',
encoding='utf-8-sig', decimal='.')
asotr.plot_imp_response(interp_imp_resp, data_info, plot_info, thermocycle_info)

View File

@@ -0,0 +1,184 @@
import matplotlib.pyplot as plt
from matplotlib import dates
import pandas as pd
from datetime import datetime
import sys
font = 6
print_width = 10
print_height = 6
width = 1
plot_windows = 2
channels = [1, 1, 1, 1, 1, 1]
asotr_kit = '01'
xborders=False
begin=0;
end=0;
path = '../data/asotr/'
fname_B = f'{path}beta_2025.xlsx'
fname = 'asotr' + asotr_kit + '_data_T.csv'
fname_pow = 'asotr' + asotr_kit + '_data_P.csv'
pict_name = '../plots/' + 'ASOTR' + asotr_kit + '_flight_T_P_all'
ox_dtime_format = '%Y.%m.%d %H:%M'
legend=['БРД1', 'БРД2', 'БРД3', 'БРД4', 'плита МУП МВН, датчик1', 'плита МУП МВН, датчик 2']
width=[1, 1, 1, 1, 1, 1]
marker = ['-', '-', '-', '-', '--', '-'];
width_arr = [1, 0.5, 0.2, 0.1, 1, 1]
dateparse = lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S.%f")
dparse_b = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
data_b = pd.read_excel(fname_B,
sheet_name=0,
usecols=[0,1,2],
header=4,
names=['turn_num', 'beta_angle', 'timestamp'],
parse_dates=['timestamp'],
date_parser=dparse_b)
fname = [path + fname, path + fname_pow]
data = [pd.read_csv(fname[0], sep=';', parse_dates=['timestamp'], date_parser=dateparse),
pd.read_csv(fname[1], sep=';', parse_dates=['timestamp'], date_parser=dateparse)]
ch= [[], [], [], [], [], []]
ch_signs = ["temp", "pow"]
data_dict = {"temp": ch, "pow": ch, "time": []}
data_dict["time"] = data[0]['timestamp']
col=['ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7']
for j in range(2):
for index, row, in data[j].iterrows():
for i in range(6):
ch[i].append(float(row[col[i]]))
data_dict[ch_signs[j]] = ch
ch= [[], [], [], [], [], []]
len_data = [len(data_dict['temp'][0]), len(data_dict['pow'][0])]
len_ = min(len_data)
if xborders == False:
begin = 0
end = len_ - 1
if plot_windows == 1:
fig, ax = plt.subplots(figsize=(print_width, print_height), dpi=200)
i = 0
for elem in data_dict['temp']:
if channels[i] == 1:
ax.plot(data_dict['time'][begin:end], elem[begin:end], marker[i], linewidth=width[i], label=legend[i])
i += 1
ax.tick_params(axis="both", width=1, labelsize=font)
ax.grid(visible=True, linestyle = 'dotted')
ax.set_ylabel('Температура, $^\circ$C', fontsize=font)
ax.set_xlabel('Время', fontsize=font)
ax.legend(fontsize=font)
date_formatter = dates.DateFormatter(ox_dtime_format)
ax.xaxis.set_major_formatter(date_formatter)
plt.tight_layout()
fig.savefig(pict_name)
plt.show()
elif plot_windows == 2:
fig = plt.figure(figsize=(print_width, print_height), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2, sharex=ax1)
i = 0
for elem in data_dict['temp']:
if channels[i] == 1:
ax1.plot(data_dict['time'][begin:end], elem[begin:end], marker[i], linewidth=width[i], label=legend[i])
i += 1
ax3 = ax1.twinx()
ax3.plot(data_b['timestamp'], data_b['beta_angle'], marker[4], color='r', linewidth=width[5], label='угол Бета')
ax3.set_ylabel('Угол Бета', fontsize=font)
ax3.tick_params(axis="y", width=1, labelsize=font)
ax3.legend(fontsize=font, loc='upper right')
i = 0
for elem in data_dict['pow']:
if channels[i] == 1:
ax2.plot(data_dict['time'][begin:end], elem[begin:end], marker[i], linewidth=width[i], label=legend[i])
i += 1
ax1.tick_params(axis="both", width=1, labelsize=font)
ax1.grid(visible=True, linestyle = 'dotted')
ax1.set_ylabel('Температура, $^\circ$C', fontsize=font)
ax1.set_xlabel('Время', fontsize=font)
ax1.legend(fontsize=font, loc='lower right')
date_formatter = dates.DateFormatter(ox_dtime_format)
ax1.xaxis.set_major_formatter(date_formatter)
ax2.tick_params(axis="both", width=1, labelsize=font)
ax2.grid(visible=True, linestyle = 'dotted')
ax2.set_ylabel('Мощность, %', fontsize=font)
ax2.set_xlabel('Время', fontsize=font)
ax2.legend(fontsize=font, loc='lower right')
date_formatter = dates.DateFormatter(ox_dtime_format)
ax2.xaxis.set_major_formatter(date_formatter)
plt.title('АСОТР ' + asotr_kit, fontsize=font)
plt.tight_layout()
fig.savefig(pict_name)
plt.show()
# asotr_kit2 = '02'
# fname2 = 'asotr' + asotr_kit2 + '_data_T.csv'
# fname_pow2 = 'asotr' + asotr_kit2 + '_data_P.csv'
# legend2=['2 БРД1', '2 БРД2', '2 БРД3', '2 БРД4', '2 плита МУП МВН, датчик1', '2 плита МУП МВН, датчик 2']
# fname2 = [path + fname2, path + fname_pow2]
# data2 = [pd.read_csv(fname2[0], sep=';', parse_dates=['timestamp'], date_parser=dateparse),
# pd.read_csv(fname2[1], sep=';', parse_dates=['timestamp'], date_parser=dateparse)]
# ch= [[], [], [], [], [], []]
# ch_signs = ["temp", "pow"]
# data_dict2 = {"temp": ch, "pow": ch, "time": []}
# data_dict2["time"] = data2[0]['timestamp']
# col=['ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7']
# for j in range(2):
# for index, row, in data2[j].iterrows():
# for i in range(6):
# ch[i].append(float(row[col[i]]))
# data_dict2[ch_signs[j]] = ch
# ch= [[], [], [], [], [], []]
# len_data2 = [len(data_dict2['temp'][0]), len(data_dict2['pow'][0])]
# len_2 = min(len_data2)
# if xborders == False:
# begin2 = 0
# end2 = len_2 - 1
# i = 0
# for elem in data_dict2['temp']:
# if channels[i] == 1:
# print('legend2: ' + legend2[i])
# ax1.plot(data_dict2['time'][begin2:end2], elem[begin2:end2], marker[i], linewidth=width[i], label=legend2[i])
# i += 1
# ax2.plot(pd.Series(data_dict2['temp'][0]) - pd.Series(data_dict['temp'][0]))

182
bin/plot_flight_borders.py Normal file
View File

@@ -0,0 +1,182 @@
import matplotlib.pyplot as plt
from matplotlib import dates
import argparse
import sys
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
import pandas as pd
def plot_asotr_borders(path_with_data, ch, asotr_kit, begin, end, font=14, cmd=0, show_flag=True):
print_width = 20
print_height = 12
width = 1
plot_windows = 2
channels = list(map(int, ch))
pict_name = (f'../plots/reports/ASOTR{asotr_kit}_flight_T_P_{asotr.convert_to_str(channels)}_{begin[0:5].replace(".", "")}_{end[0:5].replace(".", "")}_{end[6:]}.png')
plot_task = {"temp": 1, "temp_set": 1, "pow": 1}
ox_dtime_format = "%d.%m.%Y"
legend = [
"канал 1 (БРД1)",
"канал 2 (БРД2)",
"канал 3 (БРД3)",
"канал 4 (БРД4)",
"канал 5 (плита МУП МВН)",
"канал 6 (плита МУП МВН)",
]
legend_set = list(map(lambda x: x + " уставка", legend))
width = [1, 1, 1, 1, 1, 1]
width_set = [3, 3, 3, 3, 3, 3]
marker = ["-", "--", "-.", "-", "-", "--"]
width_arr = [1, 0.5, 0.2, 0.1, 1, 1]
# get from files and prepare data
start_date = begin.replace('_', ' ')
end_date = end.replace('_', ' ')
try:
data, data_dict = asotr.get_data(path_with_data, asotr_kit, start_date, end_date, 'minutes')
except Exception as e:
return
if plot_windows == 1:
fig, ax = plt.subplots(figsize=(print_width, print_height), dpi=200)
if plot_task["temp"] == 1:
for i in range(len(channels)):
if channels[i] == 1:
ax.plot(data_dict["time_temp"],
data_dict['temp'].iloc[:,i],
marker[i],
linewidth=width[i],
label=legend[i],)
ax.tick_params(axis="both", width=1, labelsize=font)
ax.grid(visible=True, linestyle="dotted")
ax.set_ylabel("Температура, $^\circ$C", fontsize=font)
ax.set_xlabel("Время", fontsize=font)
ax.legend(fontsize=font)
date_formatter = dates.DateFormatter(ox_dtime_format)
ax.xaxis.set_major_formatter(date_formatter)
plt.tight_layout()
fig.savefig(pict_name)
print(f'figure saved: {pict_name}')
if show_flag == True:
plt.show()
elif plot_windows == 2:
fig = plt.figure(figsize=(print_width, print_height), dpi=200)
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2, sharex=ax1)
if cmd == '1':
try:
cmd_human = pd.read_csv('../data/cmd_asotr/cmd_human.csv',
delimiter=';', names=['timestamp', 'cmd'])
except Exception as e:
print(f'Error parsing file: {e}')
return
max_temp = max(data_dict['temp'].iloc[:,1])
min_temp = min(data_dict['temp'].iloc[:,1])
# print(cmd_human)
step = 0
for i, row in cmd_human.iterrows():
row_time = row['timestamp'][0:len(row['timestamp']) - 4]
# print(row_time)
idx = asotr.find_best_time_idx(data_dict['time_temp'],
row_time, accuracy='minutes')
# print(idx)
if idx != -1:
ax1.axvline(x = data_dict['time_temp'][idx], color='r',
linestyle='-.')
ax1.text(data_dict['time_temp'][idx], max_temp - step, row['cmd'],
rotation=45, va='bottom', fontsize=font)
step += (max_temp - min_temp)/20
if plot_task["temp"] == 1:
for i in range(len(channels)):
if channels[i] == 1:
ax1.plot(data_dict["time_temp"],
data_dict['temp'].iloc[:,i],
marker[i],
linewidth=width[i],
label=legend[i],)
if plot_task["temp_set"] == 1:
for i in range(len(channels)):
if channels[i] == 1:
ax1.plot(data_dict["time_temp_set"],
data_dict['temp_set'].iloc[:,i],
marker[i],
linewidth=width_set[i],
label=legend_set[i],)
if plot_task["pow"] == 1:
for i in range(len(channels)):
if channels[i] == 1:
ax2.plot(data_dict["time_pow"],
data_dict['pow'].iloc[:,i],
marker[i],
linewidth=width[i],
label=legend[i],)
ax1.tick_params(axis="both", width=1, labelsize=font)
ax1.grid(visible=True, linestyle="dotted")
ax1.set_ylabel("Температура, $^\circ$C", fontsize=font)
ax1.set_xlabel("Время", fontsize=font)
ax1.legend(fontsize=font)
date_formatter = dates.DateFormatter(ox_dtime_format)
ax1.xaxis.set_major_formatter(date_formatter)
ax2.tick_params(axis="both", width=1, labelsize=font)
ax2.grid(visible=True, linestyle="dotted")
ax2.set_ylabel("Мощность, %", fontsize=font)
ax2.set_xlabel("Время", fontsize=font)
ax2.set_ylim(-5,105)
ax2.legend(fontsize=font)
date_formatter = dates.DateFormatter(ox_dtime_format)
ax2.xaxis.set_major_formatter(date_formatter)
title = (f'работа АСОТР{asotr_kit} в период с {start_date[0:10]} по {end_date[0:10]} г.')
fig.suptitle(title, fontsize=font)
plt.tight_layout()
fig.savefig(pict_name)
print(f'figure saved: {pict_name}')
if show_flag == True:
plt.show()
if __name__ == '__main__':
argparser = argparse.ArgumentParser("plot_flight_borders.py")
argparser.add_argument('-s', '--source', required=True,
help='type path with asotr csv data')
argparser.add_argument('-c', '--channel', required=True,
help='type channel (example: 000011)')
argparser.add_argument('-a', '--asotr', required=True,
help='type asotr kit (01 or 02)')
argparser.add_argument('-b', '--begin', required=True,
help='type begin date if dd.mm.YYYY format')
argparser.add_argument('-e', '--end', required=True,
help='type end date if dd.mm.YYYY format')
argparser.add_argument('-f', '--font', required=False,
help='type font size (from 1 to 30)')
argparser.add_argument('-d', '--cmd', required=False,
help='type display commands flag (0/1)')
argparser.add_argument('-p', '--plot', required=False,
help='display data in plot flag (0/1)')
args = argparser.parse_args()
plot_asotr_borders(args.source, args.channel, args.asotr, args.begin, args.end,
args.font, args.cmd, show_flag=args.plot)

21
bin/plot_flight_borders.sh Executable file
View File

@@ -0,0 +1,21 @@
#! /bin/bash
if [ $# != 2 ]
then
echo "erorr use $0. Right use this script: "
echo "$0 25.02.2025_00:00:00 10.03.2025_23:59:59"
else
path_csv_data=../data/asotr/
begin=$1
end=$2
python3 plot_flight_borders.py -s ${path_csv_data} -c 111100 -a 01 -b ${begin} -e ${end}
python3 plot_flight_borders.py -s ${path_csv_data} -c 001000 -a 01 -b ${begin} -e ${end}
python3 plot_flight_borders.py -s ${path_csv_data} -c 000011 -a 01 -b ${begin} -e ${end}
python3 plot_flight_borders.py -s ${path_csv_data} -c 111100 -a 02 -b ${begin} -e ${end}
python3 plot_flight_borders.py -s ${path_csv_data} -c 010100 -a 02 -b ${begin} -e ${end}
python3 plot_flight_borders.py -s ${path_csv_data} -c 010000 -a 02 -b ${begin} -e ${end}
python3 plot_flight_borders.py -s ${path_csv_data} -c 000100 -a 02 -b ${begin} -e ${end}
python3 plot_flight_borders.py -s ${path_csv_data} -c 000011 -a 02 -b ${begin} -e ${end}
fi

View File

@@ -0,0 +1,86 @@
import sys
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
import matplotlib.pyplot as plt
from matplotlib import dates
import numpy as np
from datetime import timedelta
path = '../data/asotr/'
pict_name = '../plots/periods_profile/periods_profile_10042025.png'
channel = 'ch1'
asotr_kit = '01'
start_date = '24.04.2025 22:30:00'
end_date = '25.04.2025 02:00:00'
# start_date = '06.01.2025 22:40:00'
# end_date = '21.01.2025 01:20:00'
shift = False
raw_data, data_dict = asotr.get_data(path, asotr_kit, start_date, end_date, 'minutes')
data1 = data_dict['temp'][channel]
time1 = data_dict['time_temp']
periods_t, periods, _ = asotr.find_periods(time1, data1, shift_flag=False, peaks='min')
_, _, peaks = asotr.find_periods(time1, data1, shift_flag=False, peaks='max')
peaks_forecast = asotr.get_peak_temp_forecast(time1.iloc[peaks[0]], 1000)
delta_sec = []
for idx, elem in enumerate(peaks):
if idx > 0:
print(f'peak____: {time1.iloc[elem]}')
print(f'forecast: {peaks_forecast[idx-1]}')
delta = time1.iloc[elem] - peaks_forecast[idx-1]
delta_sec.append(delta.total_seconds())
time_, periods_ = asotr.get_signal_profile_corr(time1, data1, periods[0], shift, peak_height=0.7)
print(f'Найдено {len(periods_)} периодов.')
ox_dtime_format = "%H:%M:%S"
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 10))
date_formatter = dates.DateFormatter(ox_dtime_format)
ax1.xaxis.set_major_formatter(date_formatter)
ax1.plot(time1, data1)
for elem in peaks:
ax1.axvline(x = time1.iloc[elem], color='r', linewidth=0.5)
ax1.set_title(f'Температура на орбите: АСОТР{asotr_kit}, канал {channel[2]}')
for idx, period in enumerate(periods_):
ax2.plot(np.arange(len(period)), period, label=f'период {idx}')
ax2.set_title('Профиль изменения температуры АСОТР по периоду')
delta = []
for elem in periods_:
delta1 = elem.values - periods[0].values
delta.append(delta1)
for idx, elem in enumerate(delta):
if idx == len(delta) - 1:
ax3.plot(elem, label=f'период {idx}', marker='|', linewidth=2)
elif idx == len(delta)//2:
ax3.plot(elem, label=f'период {idx}', marker='^', linewidth=2)
elif idx == 1:
ax3.plot(elem, label=f'период {idx}', marker='o', linewidth=2)
elif idx > 0:
ax3.plot(elem, label=f'период {idx}')
ax3.set_title(r'$\Delta$$T_i$ = $T_i$ - $T_1$')
ax1.set_ylabel('Температура, $^\circ$C')
ax2.set_ylabel('Температура, $^\circ$C')
ax3.set_ylabel(r'$\Delta$$T_i$, $^\circ$C')
ax3.set_xlabel("Время, мин.")
ax1.grid(True)
ax2.grid(True)
ax3.grid(True)
ax2.legend()
ax3.legend()
fig.savefig(pict_name)
plt.show()

View File

@@ -0,0 +1,19 @@
import sys
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
import pandas as pd
from datetime import datetime, timedelta
path = '../data/experiments/'
timestamp = '04.05.2025 00:42:00'
cyclogram_file = 'cyclogram_step_ident_ch3.xls'
asotr.insert_data_cyclo(timestamp, cyclogram_file, path)
timestamp = '04.05.2025 03:48:00'
cyclogram_file = 'cyclogram_imp_ident_ch3.xls'
asotr.insert_data_cyclo(timestamp, cyclogram_file, path)

View File

@@ -0,0 +1,37 @@
import os
import tarfile
def extract_tar_gz(filepath, extract_dir):
""" Unpack archive in specified directory """
try:
with tarfile.open(filepath, "r:gz") as tar:
tar.extractall(path=extract_dir)
print(f"[+] Extracted: {filepath}")
except Exception as e:
print(f"[!] Error extracting {filepath}: {e}")
def should_extract(archive_path):
""" check exist directory's name without .tag.gs """
dirname = os.path.splitext(os.path.splitext(archive_path)[0])[0]
list_ignore = ['brd.tar.gz', 'aux_data.tar.gz', 'uvi.tar.gz', 'aux.tar.gz']
if all(elem not in archive_path for elem in list_ignore):
return not os.path.isdir(dirname)
def walk_and_extract(start_dir):
""" recursive directory traversal with unpacking """
for root, _, files in os.walk(start_dir):
for file in files:
if file.endswith(".tar.gz"):
archive_path = os.path.join(root, file)
target_dir = os.path.splitext(os.path.splitext(archive_path)[0])[0]
if should_extract(archive_path):
extract_tar_gz(archive_path, target_dir)
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Usage: python recursive_unpack_targz.py /path/to/start/dir")
else:
walk_and_extract(sys.argv[1])

294
bin/step_response.py Normal file
View File

@@ -0,0 +1,294 @@
import pandas as pd
import matplotlib.pyplot as plt
import sys
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
import matplotlib.pyplot as plt
from matplotlib import dates
import pandas as pd
from datetime import datetime
asotr_kit = 1
# fname = f'../python_cyclo/data/asotr0{asotr_kit}_data_T.csv'
fname = f'../data/asotr/asotr0{asotr_kit}_data_T.csv'
dateparse = lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S.%f")
data = pd.read_csv(fname, sep=';', parse_dates=['timestamp'], date_parser=dateparse)
# date = '20.03.2025'
# period = '1 мин'
# time_begin_orig = date + ' 17:10:11'
# time_begin1 = date + ' 18:10:17'
# time_begin2 = date + ' 19:10:23'
# step_begin = time_begin2
# duration = 3600
# accuracy = 'seconds'
# name_fig = 'step_response_KDI_20242003.png'
# date = '21.03.2025'
# period = '1 мин'
# time_begin_orig = date + ' 14:00:11'
# time_begin1 = date + ' 15:00:16'
# time_begin2 = date + ' 16:00:16'
# step_begin = time_begin2
# duration = 3600
# accuracy = 'seconds'
# name_fig = 'step_response_KDI_20242103.png'
# date = '24.03.2025'
# period = '1 сек'
# time_begin_orig = date + ' 19:45:11'
# time_begin1 = date + ' 20:45:13'
# time_begin2 = date + ' 21:45:17'
# step_begin = time_begin2
# duration = 3600
# accuracy = 'seconds'
# name_fig = 'step_response_KDI_20242403.png'
# interp = {'method': 'polynomial', 'order': 1}
# thermocycle_info = {'date': '01.04.2025',
# 'time_begin': ['01.04.2025 16:27:00', '01.04.2025 18:00:00'],
# 'duration_sec': 92*60, 'type': 'step'}
# cut_step_resp = {'time_step_begin': '01.04.2025 18:53:21', 'step_duration': 25*60}
# data_info = {'data': data, 'device': 'KDI', 'channel': 'ch1', 'period': '1 мин',
# 'find_accuracy': 'seconds'}
# name = f'{thermocycle_info["type"]}_response_{data_info["device"]}_{thermocycle_info["date"].replace(".","")}'
# plot_info = {'title': 'Реакция на ступенчатое воздействие',
# 'ox_dtime_format': "%H:%M:%S", 'legend_pos': ['upper left', 'lower left'],
# 'name_fig': f'{name}.png', 'font': 10}
interp = {'method': 'polynomial', 'order': 1}
data_info_list = []
thermocycle_info_list = []
cut_step_resp_list = []
data_info = {'data': data, 'device': 'летный', 'channel': 'ch1', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '25.04.2025',
'time_begin': ['24.04.2025 22:46:32', '25.04.2025 00:19:33'],
'duration_sec': 92*60, 'type': 'step', 'type_ru': 'ступенчатое'}
cut_step_resp = {'time_step_begin': '25.04.2025 01:18:01', 'step_duration': 30*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch2', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '25.04.2025',
'time_begin': ['24.04.2025 22:46:32', '25.04.2025 00:19:33'],
'duration_sec': 92*60, 'type': 'step1_to2', 'type_ru': 'ступенчатое'}
cut_step_resp = {'time_step_begin': '25.04.2025 01:18:01', 'step_duration': 30*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch1', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '25.04.2025',
'time_begin': ['25.04.2025 01:52:34', '25.04.2025 03:25:34'],
'duration_sec': 92*60, 'type': 'impulse', 'type_ru': 'импульсное'}
cut_step_resp = {'time_step_begin': '25.04.2025 04:24:00', 'step_duration': 15*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch2', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '25.04.2025',
'time_begin': ['25.04.2025 01:52:34', '25.04.2025 03:25:34'],
'duration_sec': 92*60, 'type': 'impulse1_to2', 'type_ru': 'импульсное'}
cut_step_resp = {'time_step_begin': '25.04.2025 04:24:00', 'step_duration': 20*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch2', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '30.04.2025',
'time_begin': ['29.04.2025 22:02:54', '29.04.2025 23:35:54'],
'duration_sec': 93*60, 'type': 'step', 'type_ru': 'ступенчатое'}
cut_step_resp = {'time_step_begin': '30.04.2025 00:36:01', 'step_duration': 30*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch1', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '30.04.2025',
'time_begin': ['29.04.2025 22:02:54', '29.04.2025 23:35:54'],
'duration_sec': 93*60, 'type': 'step2_to1', 'type_ru': 'ступенчатое'}
cut_step_resp = {'time_step_begin': '30.04.2025 00:36:01', 'step_duration': 30*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch2', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '30.04.2025',
'time_begin': ['30.04.2025 01:09:55', '30.04.2025 02:41:54'],
'duration_sec': 93*60, 'type': 'impulse', 'type_ru': 'импульсное'}
cut_step_resp = {'time_step_begin': '30.04.2025 03:42:00', 'step_duration': 15*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch1', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '30.04.2025',
'time_begin': ['30.04.2025 01:09:55', '30.04.2025 02:41:54'],
'duration_sec': 93*60, 'type': 'impulse2_to1', 'type_ru': 'импульсное'}
cut_step_resp = {'time_step_begin': '30.04.2025 03:42:00', 'step_duration': 20*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch4', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '02.05.2025',
'time_begin': ['01.05.2025 22:05:30', '01.05.2025 23:38:40'],
'duration_sec': 93*60, 'type': 'step', 'type_ru': 'ступенчатое'}
cut_step_resp = {'time_step_begin': '02.05.2025 00:39:00', 'step_duration': 30*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch3', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '02.05.2025',
'time_begin': ['01.05.2025 22:05:30', '01.05.2025 23:38:40'],
'duration_sec': 93*60, 'type': 'step4_to3', 'type_ru': 'ступенчатое'}
cut_step_resp = {'time_step_begin': '02.05.2025 00:39:00', 'step_duration': 30*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch4', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '02.05.2025',
'time_begin': ['02.05.2025 01:12:30', '02.05.2025 02:46:02'],
'duration_sec': 93*60, 'type': 'impulse', 'type_ru': 'импульсное'}
cut_step_resp = {'time_step_begin': '02.05.2025 03:45:02', 'step_duration': 15*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch3', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '02.05.2025',
'time_begin': ['02.05.2025 01:12:30', '02.05.2025 02:46:02'],
'duration_sec': 93*60, 'type': 'impulse4_to3', 'type_ru': 'импульсное'}
cut_step_resp = {'time_step_begin': '02.05.2025 03:45:02', 'step_duration': 20*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch3', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '04.05.2025',
'time_begin': ['03.05.2025 22:12:11', '03.05.2025 23:45:10'],
'duration_sec': 93*60, 'type': 'step', 'type_ru': 'ступенчатое'}
cut_step_resp = {'time_step_begin': '04.05.2025 00:42:01', 'step_duration': 26*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch4', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '04.05.2025',
'time_begin': ['03.05.2025 22:12:11', '03.05.2025 23:45:10'],
'duration_sec': 93*60, 'type': 'step3_to4', 'type_ru': 'ступенчатое'}
cut_step_resp = {'time_step_begin': '04.05.2025 00:42:01', 'step_duration': 30*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch3', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '04.05.2025',
'time_begin': ['04.05.2025 01:19:10', '04.05.2025 02:52:11'],
'duration_sec': 93*60, 'type': 'impulse', 'type_ru': 'импульсное'}
cut_step_resp = {'time_step_begin': '04.05.2025 03:48:01', 'step_duration': 15*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
data_info = {'data': data, 'device': 'летный', 'channel': 'ch4', 'period': '1 мин',
'find_accuracy': 'seconds'}
thermocycle_info = {'date': '04.05.2025',
'time_begin': ['04.05.2025 01:19:10', '04.05.2025 02:52:11'],
'duration_sec': 93*60, 'type': 'impulse3_to4', 'type_ru': 'импульсное'}
cut_step_resp = {'time_step_begin': '04.05.2025 03:48:01', 'step_duration': 20*60}
data_info_list.append(data_info)
thermocycle_info_list.append(thermocycle_info)
cut_step_resp_list.append(cut_step_resp)
def get_step_response(data_info, thermocycle_info, cut_step_resp):
name = f'{data_info["channel"]}_{thermocycle_info["type"]}_response_{data_info["device"]}_{thermocycle_info["date"].replace(".","")}'
plot_info = {'title': 'Реакция на ' + thermocycle_info['type_ru'] + ' воздействие',
'ox_dtime_format': "%H:%M:%S", 'legend_pos': ['upper left', 'lower left'],
'name_fig': f'../plots/response/{name}.png', 'font': 10}
asotr.plot_step_response_in_thermocycle(data_info, thermocycle_info, interp,
cut_step_resp, plot_info)
step_resp_cut, _, _ = asotr.get_step_response_diff(data_info['data'], thermocycle_info,
channel=data_info['channel'], interp=interp, accuracy=data_info['find_accuracy'],
cut_step_resp=cut_step_resp)
max_ = len(step_resp_cut)
step_resp_cut.to_csv(f'../data/asotr/response/asotr0{asotr_kit}_{data_info["channel"]}_{thermocycle_info["type"]}_{thermocycle_info["date"].replace(".","")}.csv', index=False, sep=';', encoding='utf-8-sig', decimal='.')
title = f'{plot_info["title"]}, канал {data_info["channel"][2]} АСОТР, {data_info["device"]} СПИН-X1-МВН, период опроса {data_info["period"]} ({thermocycle_info["date"]})'
fig = plt.figure(figsize=(10, 6), dpi=200)
fig.suptitle(title, fontsize=plot_info['font'])
ax1 = fig.add_subplot(1,1,1)
ax1.plot(step_resp_cut['timestamp'].iloc[0:max_], step_resp_cut['temp'].iloc[0:max_], '-',
label='реакция на ' + thermocycle_info['type_ru'] + ' воздействие с термоциклом')
date_formatter = dates.DateFormatter(plot_info['ox_dtime_format'])
ax1.xaxis.set_major_formatter(date_formatter)
ax1.legend(loc=plot_info["legend_pos"][0], fontsize=plot_info['font'])
ax1.grid(True)
ax1.tick_params(axis='both', width=1, labelsize=plot_info['font'])
ax1.set_ylabel(r'$T_{norm}$, $^\circ$C', fontsize=plot_info['font'])
ax1.set_xlabel('Время, ЧЧ:MM:CC', fontsize=plot_info['font'])
plt.tight_layout()
fig.savefig(plot_info["name_fig"])
plt.show()
for i, elem in enumerate(data_info_list):
get_step_response(data_info_list[i], thermocycle_info_list[i], cut_step_resp_list[i])

110
bin/step_response_diff.py Normal file
View File

@@ -0,0 +1,110 @@
import pandas as pd
import matplotlib.pyplot as plt
import sys
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
import matplotlib.pyplot as plt
from matplotlib import dates
import pandas as pd
from datetime import datetime
asotr_kit = 1
fname = f'../../python_cyclo/data/asotr0{asotr_kit}_data_T.csv'
dateparse = lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S.%f")
data = pd.read_csv(fname, sep=';', parse_dates=['timestamp'], date_parser=dateparse)
# date = '20.03.2025'
# period = '1 мин'
# time_begin_orig = date + ' 17:10:11'
# time_begin1 = date + ' 18:10:17'
# time_begin2 = date + ' 19:10:23'
# step_begin = time_begin2
# duration = 3600
# accuracy = 'seconds'
# name_fig = 'step_response_KDI_20242003.png'
# date = '21.03.2025'
# period = '1 мин'
# time_begin_orig = date + ' 14:00:11'
# time_begin1 = date + ' 15:00:16'
# time_begin2 = date + ' 16:00:16'
# step_begin = time_begin2
# duration = 3600
# accuracy = 'seconds'
# name_fig = 'step_response_KDI_20242103.png'
# date = '24.03.2025'
# period = '1 сек'
# time_begin_orig = date + ' 19:45:11'
# time_begin1 = date + ' 20:45:13'
# time_begin2 = date + ' 21:45:17'
# step_begin = time_begin2
# duration = 3600
# accuracy = 'seconds'
# name_fig = 'step_response_KDI_20242403.png'
# interp = {'method': 'polynomial', 'order': 2}
# thermocycle_info = {'date': '24.03.2025', 'time_begin': ['20:45:00', '21:45:11'],
# 'duration_sec': 60*60, 'type': 'step'}
# cut_step_resp = {'time_step_begin': '21:45:11', 'step_duration': 60*60,
# 'orig_time_step_begin': '19:45:11', 'orig_step_duration': 25*60}
# data_info = {'data': data, 'device': 'KDI', 'channel': 'ch1', 'period': '1 мин',
# 'find_accuracy': 'seconds'}
# name = f'{thermocycle_info["type"]}_response_diff_{data_info["device"]}_{thermocycle_info["date"].replace(".","")}'
# plot_info = {'title': 'Реакция на ступенчатое воздействие',
# 'ox_dtime_format': "%H:%M:%S", 'legend_pos': ['upper left', 'lower left'],
# 'name_fig': f'{name}.png', 'font': 10}
interp = {'method': 'polynomial', 'order': 2}
thermocycle_info = {'date': '01.04.2025',
'time_begin': ['01.04.2025 16:27:00', '01.04.2025 18:00:00'],
'duration_sec': 92*60, 'type': 'step'}
cut_step_resp = {'time_step_begin': '01.04.2025 18:53:21', 'step_duration': 24*60,
'orig_time_step_begin': '01.04.2025 15:22:10', 'orig_step_duration': 24*60}
data_info = {'data': data, 'channel': 'ch1', 'period': '1 мин',
'find_accuracy': 'seconds'}
plot_info = {'title': 'Реакция на ступенч. воздейств.',
'ox_dtime_format': "%H:%M:%S", 'legend_pos': ['lower right', 'lower left'],
'name_fig': '../plots/response/step_response_diff_KDI_20240401.png', 'font': 10}
step_resp_cut, _, _ = asotr.get_step_response_diff(data_info['data'], thermocycle_info,
channel=data_info['channel'], interp=interp, accuracy=data_info['find_accuracy'],
cut_step_resp=cut_step_resp)
tstamp_orig_begin = cut_step_resp['orig_time_step_begin']
_, interp_step_resp = asotr.cut_norm_data(data_info['data'], tstamp_orig_begin,
cut_step_resp['orig_step_duration'], channel='ch1', interp=interp,
accuracy=data_info['find_accuracy'])
max_ = min(len(interp_step_resp), len(step_resp_cut))
step_resp_cut.to_csv(f'../data/asotr/response/asotr0{asotr_kit}_{thermocycle_info["type"]}_{thermocycle_info["date"].replace(".","")}.csv', index=False, sep=';', encoding='utf-8-sig', decimal='.')
title = f'{plot_info["title"]}, канал {data_info["channel"][2]} АСОТР КДИ СПИН-X1-МВН, период опроса {data_info["period"]} ({thermocycle_info["date"]})'
fig = plt.figure(figsize=(6, 6), dpi=200)
fig.suptitle(title, fontsize=plot_info['font'])
ax1 = fig.add_subplot(1,1,1)
ax1.plot(step_resp_cut['timestamp'].iloc[0:max_], step_resp_cut['temp'].iloc[0:max_], '--',
label='реакция на ступенчатое воздействие с термоциклом')
ax1.plot(step_resp_cut['timestamp'].iloc[0:max_], interp_step_resp['temp'].iloc[0:max_],
label='реакция на ступенчатое воздействие')
date_formatter = dates.DateFormatter(plot_info['ox_dtime_format'])
ax1.xaxis.set_major_formatter(date_formatter)
ax1.legend(loc=plot_info["legend_pos"][0], fontsize=plot_info['font'])
ax1.grid(True)
ax1.tick_params(axis='both', width=1, labelsize=plot_info['font'])
ax1.set_ylabel(r'$T_{norm}$, $^\circ$C', fontsize=plot_info['font'])
ax1.set_xlabel('Время', fontsize=plot_info['font'])
plt.tight_layout()
fig.savefig(plot_info["name_fig"])
plt.show()

View File

@@ -0,0 +1,86 @@
import sys
import statistics
from importlib import reload
sys.path.append('./')
import asotr
reload(asotr)
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from matplotlib import dates
from datetime import timedelta
path = '../data/asotr/'
channel = 'ch1'
asotr_kit = '01'
start_date = '25.04.2025 00:00:00'
end_date = '25.04.2025 08:00:00'
forecast_days = 20
timeformat = '%d.%m.%Y %H:%M:%S'
num_peaks_forecast = forecast_days * 20
shift = True
try:
raw_data, data_dict = asotr.get_data(path, asotr_kit, start_date, end_date, 'minutes')
except Exception as e:
sys.exit()
data1 = data_dict['temp'][channel]
time1 = data_dict['time_temp']
periods_t, periods, _ = asotr.find_periods(time1, data1, shift_flag=False, peaks='min')
_, _, peaks = asotr.find_periods(time1, data1, shift_flag=False, peaks='max')
peaks_forecast = asotr.get_peak_temp_forecast(time1.iloc[peaks[0]], num_peaks_forecast)
with open('../data/asotr/peaks_forecast.txt', 'w') as file:
for elem in peaks_forecast:
file.write(f'{str(elem)}\n')
delta_sec = []
for idx, elem in enumerate(peaks):
if idx > 0:
delta = time1.iloc[elem] - peaks_forecast[idx-1]
# print(delta)
delta_sec.append(delta.total_seconds())
delta_self_sec = []
delta_self_sec1 = []
for idx, elem in enumerate(periods_t):
delta1 = elem.iloc[len(elem)-1] - elem.iloc[0]
delta_self_sec.append(delta1.total_seconds())
for idx, elem in enumerate(delta_self_sec):
if idx > 0:
delta_self_sec1.append(delta_self_sec[idx] - delta_self_sec[idx - 1])
# print(delta_self_sec)
print(statistics.median(delta_self_sec))
ox_dtime_format = "%d.%m.%Y %H:%M"
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(8, 6))
date_formatter = dates.DateFormatter(ox_dtime_format)
ax1.xaxis.set_major_formatter(date_formatter)
ax1.plot(time1, data1)
for elem in peaks:
ax1.axvline(x = time1.iloc[elem], color='r', linewidth=0.5)
ax1.set_title(f'температура на орбите: АСОТР{asotr_kit}, канал {channel[2]}')
ax2.set_title('Разница по времени между временем i-го пика и i-м предсказанием пика')
ax2.set_ylabel(r'$\Delta$$t_{peak}$ = $timePeak_i$ - $timeForecast_i$, сек')
ax2.plot(delta_sec)
ax3.set_title('Разница по времени между первым и последующим периодами')
ax3.set_ylabel(r'$\Delta$$t_{period}$ = $period_i$ - $period_0$, сек')
ax3.plot(delta_self_sec1)
ax1.set_ylabel('Температура, град.')
ax1.grid(True)
ax2.grid(True)
ax3.grid(True)
plt.show()