Source code for transphire.processworker

"""
    TranSPHIRE is supposed to help with the cryo-EM data collection
    Copyright (C) 2017 Markus Stabrin

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
import numpy as np
import json
import pexpect as pe
import time
import os
import shutil
import re
import glob
import copy as cp
from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot
import multiprocessing as mp
import multiprocessing.managers
import queue

from . import transphire_utils as tu


class MyManager(multiprocessing.managers.BaseManager):
    """Custom multiprocessing manager able to serve LIFO queues to workers."""


# Expose queue.LifoQueue through the manager under the 'LifoQueue' typeid.
MyManager.register('LifoQueue', queue.LifoQueue)
[docs]class ProcessWorker(QObject): """ Setup and start worker threads Inherits from: QObject Buttons: None Signals: sig_start - Connected to the run method to start the process (settings|object) sig_finished - Emitted, if run method finishes (No objects) sig_error - Emitted, if an error occured (text|str) sig_status - Emitted to change the status (text|str, device|str, color|str) sig_notification - Emitted to send a notification (text|str) sig_plot_ctf - Emitted to plot ctf information (ctf_name|str, ctf_settings|object, settings|object) sig_plot_motion - Emitted to plot motion information (motion_name|str, motion_settings|object, settings|object) sig_plot_picking - Emitted to plot picking information (picking_name|str, picking_settings|str, settings|object) """ sig_set_project_directory = pyqtSignal(str, str, str) sig_start = pyqtSignal(object, object) sig_finished = pyqtSignal() sig_error = pyqtSignal(str) sig_status = pyqtSignal(str, object, str, str) sig_notification = pyqtSignal(str) signal_plot = pyqtSignal(object) def __init__(self, password, content_process, mount_directory, parent=None): """ Initialize object variables. Arguments: password - Sudo password content_process - Pipeline content mount_directory - Folder containing the mount points parent - Parent widget (default None) Return: None """ super(ProcessWorker, self).__init__(parent) # Variables self.password = password self.content_process = content_process self.mount_directory = mount_directory self.stop = None self.abort = None self.settings = {} self.idx_number = 0 self.idx_values = 1 # Events self.sig_start.connect(self.run)
[docs] def emit_plot_signals(self, folder_list, monitor): settings_emit = [] for name_no_feedback, name, prog_type, folder in self.settings['plot_emit']: try: settings_emit.append([ name, name_no_feedback, folder, self.settings, self.settings['Copy'][prog_type], ]) except KeyError: pass self.signal_plot.emit(settings_emit)
[docs] @pyqtSlot(object, object) def run(self, settings, restart_dict): """ Start the process. Arguments: settings - Transphire settings Return: None """ # Set settings self.abort = False self.sig_error.emit('NEW SESSION: {}'.format('Monitor' if settings['Monitor'] else 'Start')) content_process = cp.deepcopy(self.content_process) settings['copy_software_meta'] = True if settings['Input']['Input frames extension'] in ('dm4'): settings['Output extension'] = 'mrc' else: settings['Output extension'] = settings['Input']['Input frames extension'] # Set paths settings['software_meta_tar'] = os.path.join( settings['tar_folder'], 'Software_meta.tar' ) picking_name = settings['Copy']['Picking'] if picking_name not in ('False', 'Later'): settings[picking_name]['--threshold_old'] = settings[picking_name]['--threshold'] value = settings[picking_name]['--weights'] try: if '|||' in value: external_log, local_key = value.split('|||') with open(settings[external_log], 'r') as read: log_data = json.load(read) try: set_value = log_data[settings['current_set']][picking_name][local_key]['new_file'] except KeyError: set_value = value else: set_value = value except TypeError: set_value = value settings[picking_name]['--weights_old'] = set_value self.settings = settings self.sig_set_project_directory.emit(self.settings['project_folder'], self.settings['log_folder'], self.settings['error_folder']) manager_lifo = MyManager() manager_lifo.start() manager = mp.Manager() typ_dict = {} share_dict = {} bad_dict = {} queue_dict = {} full_content = [] for entry in content_process: for process in entry: for key in process: if 'WIDGETS' in key: continue process[key][self.idx_values]['group'], process[key][self.idx_values]['aim'] = \ process[key][self.idx_values]['group'].split(';') process[key][self.idx_values]['aim'] = process[key][self.idx_values]['aim'].split(',') share_dict[key] = manager.list() bad_dict[key] = manager.list() queue_dict[key] = manager_lifo.LifoQueue() #queue_dict[key] = 
manager.Queue() typ_dict[key] = manager.dict({ 'file_number': 0, 'spot': False, 'lost_input_meta': False, 'lost_input_frames': False, 'lost_work': False, 'lost_backup': False, 'lost_hdd': False, 'full_transphire': False, 'full_work': False, 'full_backup': False, 'full_hdd': False, 'unknown_error': False, 'delay_error': False, 'is_error': False, 'queue_list_time': 0.0, 'tar_idx': 0, 'max_running': int(self.settings['Pipeline'][key]), 'running': 0, 'do_update_count': 0, 'queue_list': manager.list(), 'queue_list_lock': manager.Lock(), 'queue_lock': manager.Lock(), 'save_lock': manager.Lock(), 'count_lock': manager.Lock(), 'error_lock': manager.Lock(), 'bad_lock': manager.Lock(), 'share_lock': manager.Lock(), 'write_lock': manager.Lock(), 'spot_dict': manager.dict(self.fill_spot_dict()), 'settings_file': '{0}/updated_settings_{1}.txt'.format( self.settings['log_folder'], key ), 'number_file': '{0}/last_filenumber_{1}.txt'.format( self.settings['log_folder'], key ), 'save_file': '{0}/Queue_{1}'.format( self.settings['queue_folder'], key ), 'done_file': '{0}/Queue_{1}_done'.format( self.settings['queue_folder'], key ), 'feedback_lock_file': '{0}/Queue_{1}_feedback_lock'.format( self.settings['queue_folder'], key ), 'list_file': '{0}/Queue_{1}_list'.format( self.settings['queue_folder'], key ), 'error_file': '{0}/Queue_{1}_error'.format( self.settings['error_folder'], key ), }) full_content.append([key, process[key]]) # Queue communication dictionary queue_com = { 'log': manager.Queue(), 'status': manager.Queue(), 'notification': manager.Queue(), 'error': manager.Queue(), 'info': manager.Queue(), } # Set stop variable to the return value of the pre_check if settings['Monitor']: self.stop = False self.emit_plot_signals([], monitor=True) self.run_monitor( typ_dict=typ_dict, queue_com=queue_com, full_content=full_content, ) else: self.stop = bool(self.pre_check_programs()) self.run_process( typ_dict=typ_dict, queue_com=queue_com, share_dict=share_dict, bad_dict=bad_dict, 
queue_dict=queue_dict, content_process=content_process, full_content=full_content, manager=manager, restart_dict=restart_dict, ) self.sig_finished.emit()
[docs] def run_monitor( self, typ_dict, queue_com, full_content ): """ Run the TranSPHIRE monitor process. Arguments: typ_dict - Dictionary for the queue types queue_com - Dictionary for queue communication Returns: None """ def check_int(number): try: int(number) except: return False else: return True while True: if self.stop: text = 'Not monitoring' color = tu.get_color('white') else: text = 'Monitoring' color = tu.get_color('Running') for entry in full_content: name = entry[0] key = '_'.join([key for key in name.split('_') if not check_int(key)]) try: with open(typ_dict[key]['save_file']) as read: nr_do = len([line for line in read.readlines() if line.strip()]) except FileNotFoundError: nr_do = 0 try: with open(typ_dict[key]['done_file']) as read: nr_done = len([line for line in read.readlines() if line.strip()]) except FileNotFoundError: nr_done = 0 queue_com['status'].put([ text, [ nr_do, nr_done ], name, color ]) try: self.check_queue(queue_com=queue_com) except BrokenPipeError: pass if self.stop: break else: time.sleep(3)
    def run_process(self, typ_dict, queue_com, share_dict, bad_dict, queue_dict, content_process, full_content, manager, restart_dict):
        """
        Run the TranSPHIRE process.

        Arguments:
        typ_dict - Dictionary for the queue types
        queue_com - Dictionary for queue communication
        share_dict - Per-type shared lists of already handled files
        bad_dict - Per-type shared lists of bad files
        queue_dict - Per-type work queues
        content_process - Deep copy of the pipeline content
        full_content - List of [name, process settings] pairs
        manager - multiprocessing.Manager instance
        restart_dict - Dictionary describing which pipeline steps to restart

        Returns:
        None
        """
        # Set default values for folder list and thread list
        folder_list = ['stack_folder', 'meta_folder']
        use_threads_list = ['Meta', 'Find', 'Import']
        data_frame = tu.DataFrame(manager, self.settings['data_frame'])

        # Decide if one will use copy to hdd
        self.settings['Copy']['Copy_to_hdd'] = self.settings['Copy']['Copy to hdd']
        if self.settings['Copy']['Copy to hdd'] != 'False':
            if self.settings['Copy']['Copy to hdd'] == 'Later':
                pass
            else:
                # Every sub folder of the HDD folder must be a mount point;
                # a listable non-mount folder means a stale/failed mount.
                try:
                    for folder in glob.glob('{0}/*'.format(self.settings['copy_to_hdd_folder_feedback_0'])):
                        if not os.path.ismount(folder):
                            try:
                                os.listdir(folder)
                            except PermissionError:
                                # Not readable: treat as mounted for another user.
                                pass
                            else:
                                self.sig_error.emit(
                                    'HDD not mounted or not well unmounted!' +
                                    'Please remount if you want to use HDD'
                                    )
                                return None
                        else:
                            pass
                except FileNotFoundError:
                    self.sig_error.emit(
                        'HDD not mounted or not well unmounted!' +
                        'Please remount if you want to use HDD!'
                        )
                    return None
            use_threads_list.append('Copy_to_hdd')
        else:
            pass

        # Names of all '<name>_entries' settings that have a matching
        # '<name with spaces>' entry in the Copy settings.
        names = [
            entry.replace('_entries', '')
            for entry in self.settings['Copy']
            if entry.endswith('_entries') and entry.replace('_entries', '').replace('_', ' ') in self.settings['Copy']
            ]
        for entry in names:
            if 'copy_to_' in entry.lower():
                # Fill folder list and threads list
                for name in ['work', 'backup']:
                    short_name = 'Copy_to_{0}'.format(name)
                    long_name = 'Copy to {0}'.format(name)
                    folder_name = '{0}_folder_feedback_0'.format(short_name.lower())
                    user_name = '{0}_user'.format(short_name)
                    self.settings['Copy'][short_name] = \
                        self.settings['Copy'][long_name]
                    if self.settings['Copy'][long_name] != 'False':
                        if self.settings['Copy'][long_name] == 'Later':
                            pass
                        elif not os.path.ismount(self.settings[folder_name]):
                            try:
                                os.listdir(self.settings[folder_name])
                            except PermissionError:
                                # Not readable: assume it is mounted for another user.
                                pass
                            except FileNotFoundError:
                                self.sig_error.emit(
                                    '{0} folder {1} not mounted!'.format(
                                        name,
                                        self.settings['Copy'][long_name]
                                        )
                                    )
                                return None
                            except OSError as err:
                                # Kerberos-protected mounts raise OSError with
                                # 'Required key' when the ticket expired.
                                if 'Required key' in str(err):
                                    self.sig_error.emit(
                                        '\n'.join([
                                            '{0} folder {1} no longer mounted! '.format(
                                                name,
                                                self.settings['Copy'][long_name]
                                                ),
                                            'Use kinit to refresh the key'
                                            ])
                                        )
                                    return None
                                else:
                                    print(
                                        '\n'.join([
                                            'Unknown Error occured!',
                                            'Please contact the TranSPHIRE authors!'
                                            ])
                                        )
                                    raise
                            else:
                                # Folder is listable but not a mount point.
                                self.sig_error.emit(
                                    '{0} folder {1} not mounted!'.format(
                                        name,
                                        self.settings['Copy'][long_name]
                                        )
                                    )
                                return None
                        else:
                            pass
                        try:
                            self.settings[user_name] = self.settings[
                                'user_{0}'.format(
                                    self.settings['Copy'][long_name].replace(' ', '_')
                                    )
                                ]
                        except KeyError:
                            self.sig_error.emit('{0} needs remount! '.format(name))
                            return None
                        use_threads_list.append(short_name)
                    else:
                        pass
            elif 'ctf' == entry.lower():
                # Set CTF settings
                if self.settings['Copy']['CTF'] != 'False':
                    folder_list.append('ctf_folder_feedback_0')
                    use_threads_list.append('CTF')
                    ctf_name = self.settings['Copy']['CTF']
                    try:
                        if self.settings[ctf_name]['Use movies'] == 'True':
                            self.settings['Copy']['CTF_frames'] = 'True'
                            self.settings['Copy']['CTF_sum'] = 'False'
                        else:
                            self.settings['Copy']['CTF_frames'] = 'False'
                            self.settings['Copy']['CTF_sum'] = 'True'
                    except KeyError:
                        self.settings['Copy']['CTF_frames'] = 'False'
                        self.settings['Copy']['CTF_sum'] = 'True'
                else:
                    self.settings['Copy']['CTF_frames'] = 'False'
                    self.settings['Copy']['CTF_sum'] = 'False'
            else:
                if self.settings['Copy'][entry] != 'False':
                    folder_list.append('{0}_folder_feedback_0'.format(entry.lower()))
                    use_threads_list.append(entry)
                else:
                    pass

        # Fill different dictionarys with process information
        # One mutex + counter per GPU id ('0'..'9') and per GPU/sub-device
        # pair ('0_0'..'9_9').
        gpu_mutex_dict = dict([
            ('{0}_{1}'.format(idx, idx_2), [manager.RLock(), mp.Value('i', 0)])
            if idx_2 != -1 else
            (str(idx), [manager.RLock(), mp.Value('i', 0)])
            for idx in range(10)
            for idx_2 in range(-1, 10)
            ])

        # Shared dictionary
        shared_dict = {
            'share': share_dict,
            'bad': bad_dict,
            'queue': queue_dict,
            'global_update_lock': manager.Lock(),
            'translate_lock': manager.Lock(),
            'ctf_star_lock': manager.Lock(),
            'ctf_partres_lock': manager.Lock(),
            'motion_star_lock': manager.Lock(),
            'motion_star_relion3_lock': manager.Lock(),
            'motion_txt_lock': manager.Lock(),
            'gpu_lock': gpu_mutex_dict,
            'gpu_lock_lock': manager.Lock(),
            'data_frame_lock': manager.Lock(),
            'typ': typ_dict,
            }

        # Folders whose queue entries survive a restart.
        keep_list = [
            self.settings['set_folder_raw'],
            self.settings['tar_folder'],
            self.settings['meta_folder'],
            self.settings['stack_folder'],
            ]

        try:
            restart_feedback = restart_dict['feedback']
        except KeyError:
            restart_feedback = False

        if restart_feedback:
            # Feedback restart requested: back up the feedback results/state.
            try:
                shutil.move(
                    os.path.join(self.settings['project_folder'], '000_Feedback_results'),
                    self.settings['restart_backup_folder'],
                    )
            except FileNotFoundError:
                pass
            try:
                shutil.move(
                    self.settings['feedback_file'],
                    self.settings['restart_backup_folder'],
                    )
            except FileNotFoundError:
                pass
        else:
            keep_list.append('000_Feedback_results')
            # Resume the feedback counter from the previous session, if any.
            try:
                with open(self.settings['feedback_file'], 'r') as read:
                    content = read.read().strip()
                self.settings['do_feedback_loop'] = int(content)
            except FileNotFoundError:
                pass

        self.settings['is_superres'] = mp.Value('i', 2)
        self.settings['do_feedback_loop'] = mp.Value('i', self.settings['do_feedback_loop'])
        if self.settings['do_feedback_loop'].value == 0:
            status = 'Done'
            color = 'Finished'
        else:
            status = 'Running'
            color = 'Running'
        queue_com['status'].put([
            '{0:02d}|{1:02d}'.format(
                int(self.settings['Output']['Number of feedbacks']) - self.settings['do_feedback_loop'].value,
                int(self.settings['Output']['Number of feedbacks'])
                ),
            [status],
            'Feedbacks',
            tu.get_color(color)
            ])
        with open(self.settings['feedback_file'], 'w') as write:
            write.write(str(self.settings['do_feedback_loop'].value))

        # Fill process queues: first all non-Copy_to types, then Copy_to types.
        for entry in content_process:
            for process in entry:
                for key in process:
                    if 'WIDGETS' in key:
                        continue
                    elif process[key][1]['name'].startswith('Copy_to'):
                        continue
                    self.prefill_queue(
                        shared_dict=shared_dict,
                        entry=process[key][1],
                        restart_dict=restart_dict,
                        keep_list=keep_list,
                        )
        for entry in content_process:
            for process in entry:
                for key in process:
                    if 'WIDGETS' in key:
                        continue
                    elif not process[key][1]['name'].startswith('Copy_to'):
                        continue
                    self.prefill_queue(
                        shared_dict=shared_dict,
                        entry=process[key][1],
                        restart_dict=restart_dict,
                        keep_list=keep_list,
                        )

        queue_com['info'].put('Current settings saved to: {0}'.format(self.settings['set_folder']))
        self.check_queue(queue_com=queue_com)

        # Unlock the Class2d queue in case of a TranSPHIRE crash during 2D classification
        try:
            with open(shared_dict['typ']['Class2d']['feedback_lock_file'], 'r') as read:
                in_feedback = '1' in read.read()
            if in_feedback and shared_dict['queue']['Select2d'].empty() and shared_dict['queue']['Train2d'].empty():
                with open(shared_dict['typ']['Class2d']['feedback_lock_file'], 'w') as write:
                    write.write('0')
        except FileNotFoundError:
            pass

        self.emit_plot_signals(folder_list=folder_list, monitor=False)

        # Define file number and check if file already exists
        if self.settings['Output']['Rename micrographs'] == 'True':
            try:
                with open(shared_dict['typ']['Find']['number_file'], 'r') as read:
                    shared_dict['typ']['Find']['file_number'] = int(read.readline())
            except FileNotFoundError:
                # No number file yet: this is a fresh run.
                shared_dict['typ']['Find']['file_number'] = int(self.settings['Output']['Start number']) - 1
                message = '{0}: New run detected - Set start number to {1}\n'.format(
                    'Find',
                    self.settings['Output']['Start number'],
                    )
            else:
                if self.settings['Output']['Start number'] != '0' and \
                        int(self.settings['Output']['Start number']) > int(shared_dict['typ']['Find']['file_number']):
                    message = '{0}: Filenumber {1} provided and larger than last file_number {2}!\nContinue with {1}\n'.format(
                        'Find',
                        self.settings['Output']['Start number'],
                        shared_dict['typ']['Find']['file_number'],
                        )
                    shared_dict['typ']['Find']['file_number'] = int(self.settings['Output']['Start number']) - 1
                elif self.settings['Output']['Start number'] != '0' and \
                        int(self.settings['Output']['Start number']) <= int(shared_dict['typ']['Find']['file_number']):
                    self.stop = True
                    message = '{0}: Filenumber {1} provided and smaller equals the last file_number {2}!\nPlease adjust the Start number to a valid, i.e. not used yet, value or 0\n'.format(
                        'Find',
                        self.settings['Output']['Start number'],
                        shared_dict['typ']['Find']['file_number'],
                        )
                elif self.settings['Output']['Start number'] == '0':
                    message = '{0}: Continue run detected! Continue with file number: {1}'.format(
                        'Find',
                        shared_dict['typ']['Find']['file_number'],
                        )
                else:
                    self.stop = True
                    message = 'Unreachable code! Please contact the TranSPHIRE authors.'
            # NOTE(review): 'message' is only defined inside this branch, so
            # the puts must stay inside it - verify against upstream source.
            queue_com['error'].put(message)
            queue_com['notification'].put(message)

        # NOTE(review): a large commented-out legacy block handling file
        # number collisions was removed here.

        # Start threads
        thread_list = []
        use_threads_set = set(use_threads_list)
        for key, settings_content in full_content:
            content_settings = settings_content[self.idx_values]
            number = self.settings['Pipeline'][key]
            names = None
            if (key == 'Find' and number != '1') or (key == 'Meta' and number != '1'):
                self.stop = True
                message = 'Find and Meta are not allowed to have more than 1 process running!'
                queue_com['error'].put(message)
                queue_com['notification'].put(message)
                names = [key]
            elif number == '1':
                names = [key]
            else:
                names = ['{0}_{1}'.format(key, idx) for idx in range(int(number))]
            assert names is not None, (key, names, number)
            for name in names:
                thread_obj = ProcessThread(
                    shared_dict=shared_dict,
                    name=name,
                    content_settings=content_settings,
                    queue_com=queue_com,
                    settings=self.settings,
                    mount_directory=self.mount_directory,
                    password=self.password,
                    use_threads_set=use_threads_set,
                    stop=mp.Value('i', self.stop),
                    abort=mp.Value('i', self.abort),
                    has_finished=mp.Value('i', 0),
                    data_frame=data_frame,
                    parent=self,
                    )
                thread = mp.Process(target=self.run_in_parallel, args=(thread_obj,))
                thread.start()
                thread_list.append([thread, name, content_settings, thread_obj])
                self.check_queue(queue_com=queue_com)
                time.sleep(1)

        # Run until the user stops the processes
        if not self.stop:
            while True:
                try:
                    self.check_queue(queue_com=queue_com)
                except BrokenPipeError:
                    pass
                if self.stop:
                    break
                else:
                    pass
                time.sleep(3)
        else:
            self.check_queue(queue_com=queue_com)

        # Indicate to stop all processes
        for key, settings_content in full_content:
            typ = settings_content[self.idx_values]['name']
            size = shared_dict['queue'][typ].qsize()
            self.sig_status.emit(
                'Stopping',
                [size, shared_dict['typ'][typ]['file_number']],
                key,
                tu.get_color('Stopped')
                )
        self.check_queue(queue_com=queue_com)

        # Send the stop signals to all threads
        for _, _, _, thread_obj in thread_list:
            thread_obj.stop.value = True
            thread_obj.abort.value = self.abort
        for _, name, _, thread_obj in thread_list:
            queue_com['log'].put('Waiting for {0} to finish!'.format(name))
            while not thread_obj.has_finished.value:
                time.sleep(1)
                self.check_queue(queue_com=queue_com)
        queue_com['status'].put([
            '{0:02d}|{1:02d}'.format(
                int(self.settings['Output']['Number of feedbacks']) - self.settings['do_feedback_loop'].value,
                int(self.settings['Output']['Number of feedbacks'])
                ),
            ['Not running'],
            'Feedbacks',
            tu.get_color('white')
            ])
        queue_com['log'].put('All done!')
        self.check_queue(queue_com=queue_com)

        # Remember final queue sizes before joining the processes.
        final_sizes = []
        for key, settings_content in full_content:
            typ = settings_content[self.idx_values]['name']
            size = shared_dict['queue'][typ].qsize()
            final_sizes.append([key, size, typ])

        # Wait for all threads to finish
        for thread, name, _, thread_obj in thread_list:
            thread.join()
            del thread_obj
            time.sleep(0.1)

        for key, size, typ in final_sizes:
            self.sig_status.emit(
                '00|{0:02d}'.format(shared_dict['typ'][typ]['max_running']),
                [size, shared_dict['typ'][typ]['file_number']],
                key,
                tu.get_color('white')
                )
        time.sleep(1)
[docs] @staticmethod def run_in_parallel(thread_obj): thread_obj.run()
[docs] def pre_check_programs(self): """ Check, if all programs the user wants to use are available. Arguments: None Return: True, if programs exist, else False """ default_unique_types = tu.get_unique_types() error = False check_files = [] for entry in default_unique_types: if entry == 'Compress': continue check_files.append(['Path', self.settings['Copy'][entry]]) check_files.append(['Path', 'IMOD header']) try: if self.settings['Copy']['Train2d'] not in ('False', 'Later'): check_files.append(['Path', 'cryolo_gui.py']) check_files.append(['Path', 'cryolo_evaluation.py']) check_files.append(['Path', 'sp_pipe.py']) else: pass except KeyError: pass try: if self.settings['Copy']['Select2d'] not in ('False', 'Later'): check_files.append(['Path', 'sp_header.py']) else: pass except KeyError: pass try: if self.settings['Copy']['Class2d'] not in ('False', 'Later'): check_files.append(['Path', 'e2bdb.py']) check_files.append(['Path', 'mpirun']) else: pass except KeyError: pass try: if self.settings['Copy']['Auto3d'] not in ('False', 'Later'): check_files.append(['Path', 'sp_pipe.py']) else: pass except KeyError: pass try: if [entry for entry in map(lambda x: x not in ('False', 'Later'), ['Class2d', 'CTF', 'Extract', 'Picking', 'Select2d']) if entry]: check_files.append(['Path', 'e2proc2d.py']) else: pass except KeyError: pass if self.settings['Copy']['Compress'] == 'Compress cmd': check_files.append([self.settings['Copy']['Compress'], '--command_compress_path']) check_files.append([self.settings['Copy']['Compress'], '--command_uncompress_path']) else: pass for typ, name in check_files: if name != 'False' and name != 'Later': try: is_file = os.path.isfile(self.settings[typ][name]) except KeyError: self.sig_error.emit( '{0} path not valid or disabled (Advanced, Rare)! Please adjust it!'.format(name) ) error = True else: if shutil.which(self.settings[typ][name]) is not None: pass elif not is_file: self.sig_error.emit( '{0} path not valid or disabled (Advanced, Rare)! 
Please adjust it!'.format(name) ) error = True else: pass else: pass ctf_name = self.settings['Copy']['CTF'] motion_name = self.settings['Copy']['Motion'] try: if self.settings[ctf_name]['Use movies'] and motion_name in ('Later', 'False'): self.sig_error.emit('Cannot automatically adjust the pixel size for the CTF output files in case of binned micrographs. Remember to adjust it manually if necessary.') except KeyError: pass train2d_name = self.settings['Copy']['Train2d'] select2d_name = self.settings['Copy']['Select2d'] class2d_name = self.settings['Copy']['Class2d'] extract_name = self.settings['Copy']['Extract'] if int(self.settings['Output']['Number of feedbacks']) != 0: if 'Later' in (train2d_name, select2d_name, extract_name, class2d_name): self.sig_error.emit('Number of feedbacks provided and Extract, Class2d, Select2d, or Train2d set to Later. Remember that particle extraction and subsequent runs will stall until a Extract, Class2d, Select2d, or Train2d program is provided.') if 'False' in (train2d_name, select2d_name, extract_name, class2d_name): self.sig_error.emit('Number of feedbacks provided and Extract, Class2d, Select2d, or Train2d set to False. 
This is not possible, because the program will stall infinitely.') error = True auto3d_name = self.settings['Copy']['Auto3d'] if auto3d_name not in ('False', 'Later'): if self.settings['Copy']['Class2d'] in ('False') and \ not self.settings[auto3d_name]['input_volume']: self.sig_error.emit('An input volume needs to be provided to Auto3d if no 2D classification should be performed!') error = True if self.settings[auto3d_name]['Use SSH'] == 'True': if self.settings['Copy']['Copy to work'] != 'False' and self.settings['Copy']['Copy to work'] != 'Later': device_name = [ entry for entry in self.settings['Mount'][self.settings['Copy']['Copy to work']]['IP'].split('/') if entry.strip() ][0] ssh_command = 'ssh -o "StrictHostKeyChecking no" {0}@{1} ls'.format( self.settings[auto3d_name]['SSH username'], device_name ) child = pe.spawnu(ssh_command) try: expect_idx = child.expect( [ "{0}@{1}'s password:".format(self.settings[auto3d_name]['SSH username'], device_name), pe.EOF ], timeout=20 ) except pe.exceptions.TIMEOUT: self.sig_error.emit('TIMEOUT: SSH "ls" command failed! Username or Password in Auto3d might be wrong or Copy to work is not consistent!!') error = True if error: pass elif expect_idx == 0 and self.settings[auto3d_name]['Need SSH password'] == 'True': child.sendline(self.settings[auto3d_name]['SSH password']) elif expect_idx == 1 and self.settings[auto3d_name]['Need SSH password'] == 'True': self.sig_error.emit('SSH password provided, but not needed. Please check if passwordless authentification is enabled for this mount point.') error = True elif expect_idx == 0 and self.settings[auto3d_name]['Need SSH password'] == 'False': self.sig_error.emit('SSH password expected, but none provided!') error = True elif expect_idx == 1 and self.settings[auto3d_name]['Need SSH password'] == 'False': pass try: child.expect(pe.EOF, timeout=20) except pe.exceptions.TIMEOUT: self.sig_error.emit('TIMEOUT Password: SSH ls command failed! 
Username or Password in Auto3d might be wrong or Copy to work is not consistent!!') error = True else: self.sig_error.emit('"Copy to work" not specified for Auto3d ssh command.') error = True return error
[docs] def check_queue(self, queue_com): """ Check the content of the queues and react accordingly. Arguments: queue_com Return: None """ for key in queue_com: while not queue_com[key].empty(): if key == 'status': status, numbers, device, color = queue_com['status'].get() self.sig_status.emit(status, numbers, device, color) elif key == 'notification': notification = queue_com['notification'].get() self.sig_notification.emit(notification) elif key == 'error': error = queue_com['error'].get() self.sig_error.emit(error) elif key == 'info': error = queue_com['info'].get() self.sig_error.emit(error) elif key == 'log': log = queue_com['log'].get() try: with open(os.path.join(self.settings['log_folder'], 'sys_log.txt'), 'a+') as write: write.write('{0}\n'.format(log)) except FileNotFoundError: pass else: print( 'Processworker - check_queue:', key, ' Not known!', ' Unreachable code!', ' Please contact the TranSPHIRE authors' )
[docs] def fill_spot_dict(self): """ Fill the spot dictionary. Arguments: None Return: Spot dictionary """ dictionary = {} spot_file = self.settings['spot_file'] try: with open(spot_file, 'r') as read: lines = [line.rstrip() for line in read.readlines()] for line in lines: key, number = line.split('\t') dictionary[key] = number except FileNotFoundError: with open(spot_file, 'w') as write: write.write('') return dictionary
    def prefill_queue(self, shared_dict, entry, restart_dict, keep_list):
        """
        Prefill the queues for continue mode

        Arguments:
        shared_dict - Shared dictionary
        entry - Name of the queue process
        restart_dict - Dictionary describing which pipeline steps to restart
        keep_list - Folders whose queue entries survive a restart

        Return:
        None
        """
        key = entry['name']
        share = entry['group']
        shared_dict_typ = shared_dict['typ'][key]
        number_file = shared_dict_typ['number_file']
        settings_file = shared_dict_typ['settings_file']
        feedback_lock_file = shared_dict_typ['feedback_lock_file']
        save_file = shared_dict_typ['save_file']
        done_file = shared_dict_typ['done_file']
        list_file = shared_dict_typ['list_file']
        share_list = shared_dict['share'][share]
        queue = shared_dict['queue'][key]
        queue_list = shared_dict_typ['queue_list']

        if self.settings["Input"]["Software"] == "Just Stack":
            self.settings['copy_software_meta'] = False

        # check_state: 0 = continue, 1/2 = restart variants per restart_dict.
        check_state = 0
        try:
            check_state = restart_dict[key]
        except KeyError:
            pass

        # For Copy_to_* types, always (re-)enqueue the settings folder files.
        # NOTE(review): the walk loop below rebinds the 'entry' parameter.
        prepend_list = []
        if key.startswith('Copy_to'):
            for root, _, files in os.walk(self.settings['set_folder']):
                for entry in files:
                    prepend_list.append(os.path.join(root, entry))

        if check_state in (1, 2):
            # Restart: back up per-type state files/folders first.
            try:
                shutil.move(
                    feedback_lock_file,
                    self.settings['restart_backup_folder'],
                    )
            except FileNotFoundError:
                pass
            try:
                shutil.move(
                    self.settings['{}_folder_feedback_{}'.format(key.lower(), self.settings['do_feedback_loop'].value)],
                    self.settings['restart_backup_folder'],
                    )
            except FileNotFoundError:
                pass
            try:
                shutil.move(
                    self.settings['{}_folder_feedback_0'.format(key.lower())],
                    self.settings['restart_backup_folder'],
                    )
            except FileNotFoundError:
                pass
            except shutil.Error:
                pass
            try:
                shutil.move(
                    number_file,
                    self.settings['restart_backup_folder'],
                    )
            except FileNotFoundError:
                pass
            if restart_dict['feedback']:
                try:
                    shutil.move(
                        settings_file,
                        self.settings['restart_backup_folder'],
                        )
                except FileNotFoundError:
                    pass

            # These types re-enqueue their done entries even for state 1.
            special_cases = ('Extract', 'Train2d', 'CTF')
            lines = []
            if check_state == 2 or key in special_cases:
                for entry in (save_file, done_file):
                    if os.path.exists(entry):
                        with open(entry, 'r') as read:
                            lines.extend([line.strip() for line in read.readlines() if line.strip()])
                    else:
                        with open(entry, 'w'):
                            pass

            # Patterns of entries to drop depending on what is restarted.
            remove_patterns = []
            if check_state in (1, 2) and key == 'CTF':
                if restart_dict['Motion'] != 0:
                    remove_patterns = [
                        '^CTF_sum',
                        ]
            elif check_state == 1 and key == 'Extract':
                if restart_dict['Picking'] == 1 and restart_dict['CTF'] == 1:
                    # Reset Motion correction and therefore picking and CTF
                    remove_patterns = [
                        '.*',
                        ]
                elif restart_dict['Picking'] == 2 and restart_dict['CTF'] == 2:
                    # Reset Picking and CTF but not motion correction
                    remove_patterns = [
                        '.*\.box',
                        '.*partres.txt',
                        ]
                elif restart_dict['Picking'] == 2:
                    # Only reset picking
                    remove_patterns = [
                        '.*\.box',
                        ]
                elif restart_dict['Picking'] == 1 and restart_dict['is_ctf_movie']:
                    # Only reset picking
                    remove_patterns = [
                        '.*\.box',
                        ]
                elif restart_dict['CTF'] == 2:
                    # Only reset CTF
                    remove_patterns = [
                        '.*partres.txt',
                        ]
                elif restart_dict['Picking'] == 1:
                    assert False, 'Picking cannot be 1 without CTF be 1'
                elif restart_dict['CTF'] == 1:
                    assert False, 'Picking cannot be 1 without CTF be 1'
            elif check_state == 1 and key == 'Train2d':
                if restart_dict['Motion'] == 2:
                    remove_patterns = [
                        '.*'
                        ]
                else:
                    remove_patterns = [
                        '.*\.hdf'
                        ]
            for pattern in remove_patterns:
                lines = sorted([entry for entry in lines if re.search(pattern, entry) is None])

            # Rewrite the save file and refill the work queue.
            with open(save_file, 'w') as write:
                write.write('\n'.join(sorted(lines)) + '\n')
            for line in lines:
                # Entries may carry a 'meta|||file' prefix; only the file
                # part goes into the shared list.
                share_list.append(line.split('|||')[-1])
                queue.put(line)
            with open(done_file, 'w'):
                pass
            with open(list_file, 'w'):
                pass
        elif check_state == 0:
            # Continue mode: restore queue state from the on-disk files.
            try:
                keep_list.append(os.path.basename(self.settings['{}_folder_feedback_0'.format(key.lower())]))
            except KeyError:
                pass
            if os.path.exists(save_file):
                with open(save_file, 'r') as read:
                    lines = [line.strip() for line in read.readlines() if line.strip()]
                if key.startswith('Copy_to'):
                    if '000_Feedback_results' in keep_list:
                        keep_feedback = True
                    else:
                        keep_feedback = False
                    # Only keep entries located in the project folder or in
                    # one of the keep_list folders.
                    good_lines = []
                    for line in lines:
                        if os.path.dirname(line) == self.settings['project_folder']:
                            good_lines.append(line)
                        else:
                            for entry in keep_list:
                                if entry in line:
                                    if '000_Feedback_results' in line and not keep_feedback:
                                        pass
                                    else:
                                        good_lines.append(line)
                    lines = [entry for entry in np.unique(good_lines) if os.path.exists(entry)]
                with open(save_file, 'w') as write:
                    write.write('\n'.join(lines) + '\n')
                for line in lines:
                    # The software meta tar is already queued: do not copy twice.
                    if self.settings['software_meta_tar'] in line:
                        self.settings['copy_software_meta'] = False
                    else:
                        pass
                    share_list.append(line.split('|||')[-1])
                    queue.put(line)
            else:
                with open(save_file, 'w'):
                    pass
            if os.path.exists(done_file):
                with open(done_file, 'r') as read:
                    lines = [line.strip() for line in read.readlines() if line.strip()]
                shared_dict_typ['file_number'] = len(lines)
                for line in lines:
                    if self.settings['software_meta_tar'] in line:
                        self.settings['copy_software_meta'] = False
                    else:
                        pass
            else:
                with open(done_file, 'w'):
                    pass
            if os.path.exists(list_file):
                with open(list_file, 'r') as read:
                    lines = [line.strip() for line in read.readlines() if line.strip()]
                for line in lines:
                    if line:
                        queue_list.append(line)
                        if self.settings['software_meta_tar'] in line:
                            self.settings['copy_software_meta'] = False
                        else:
                            pass
            else:
                with open(list_file, 'w'):
                    pass

        # Tar index
        # Continue with the next free 6-digit tar index for this type.
        if not queue_list:
            tar_files = glob.glob(os.path.join(
                self.settings['tar_folder'],
                '{0}_*.tar'.format(key)
                ))
            if tar_files:
                shared_dict_typ['tar_idx'] = max([int(re.search('{0}_.*([0-9]{{6}})\.tar'.format(re.escape(key)), entry).group(1)) for entry in tar_files]) + 1
            else:
                shared_dict_typ['tar_idx'] = 0
        else:
            try:
                shared_dict_typ['tar_idx'] = max([int(re.search('{0}_.*([0-9]{{6}})\.tar'.format(re.escape(key)), entry).group(1)) for entry in queue_list if re.search('{0}_.*([0-9]{{6}})\.tar'.format(re.escape(key)), entry)])
            except ValueError:
                pass

        if prepend_list:
            with open(save_file, 'a') as append:
                append.write('\n'.join(prepend_list) + '\n')
            for entry in prepend_list:
                share_list.append(entry.split('|||')[-1])
                queue.put(entry)
from .processthread import ProcessThread