# Source code for catkit.hub.folderreader

from .cathubsqlite import CathubSQLite
from .tools import get_bases
from .import ase_tools

from datetime import date
import numpy as np
import os
import copy
import json


class FolderReader:
    """
    Read reaction data from organized folders and write it to a local
    CathubSQLite database.

    Folders should be arranged with make_folders_template and are read in
    the order:

    level:

    0    folder_name
    1    |-- publication
    2        |-- dft_code
    3            |-- dft_functional
    4                |-- gas
    4                |-- metal1
    5                    |-- facet
    6                        |-- reaction

    Parameters
    ----------
    folder_name: str
        Forwarded to get_bases to resolve the base folders.
    debug: bool
        Default is False. Choose True if the folderreader should continue
        in spite of errors.
    strict: bool
        Default is True. Forwarded to ase_tools.check_traj for every
        trajectory file that is read.
    verbose: bool
        Default is False. Stored on the instance; not used by the methods
        of this class.
    update: bool
        Update data if already present in database file. Default is True.
    """

    # Keys of a publication record. Keys missing from publication.txt are
    # stored as None so every record has the same shape.
    _PUBLICATION_KEYS = ('title', 'authors', 'journal', 'volume', 'number',
                         'pages', 'year', 'publisher', 'doi', 'tags')

    def __init__(self, folder_name, debug=False, strict=True, verbose=False,
                 update=True):
        self.debug = debug
        self.strict = strict
        self.verbose = verbose
        self.update = update

        self.catbase, self.data_base, self.user, self.user_base \
            = get_bases(folder_name=folder_name)
        # Depth of the base folder; folder levels are counted relative to it.
        self.user_base_level = len(self.user_base.split("/"))

        self.pub_level = 1
        self.DFT_level = 2
        self.XC_level = 3
        self.reference_level = 4
        self.slab_level = 5
        self.reaction_level = 6
        self.final_level = 6

    def read(self, skip=None, goto_metal=None, goto_reaction=None):
        """
        Get reactions from folders.

        This is a generator: the folder tree below ``self.user_base`` is
        walked top-down and one dictionary of reaction key value pairs is
        yielded for every reaction folder that could be read.

        Parameters
        ----------
        skip: list of str
            list of folders not to read
        goto_metal: str
            Skip ahead to this metal
        goto_reaction: str
            Skip ahead to this reaction
        """
        # Copy, so neither the caller's list nor a shared default is kept.
        self.omit_folders = list(skip) if skip else []
        self.coverages = None

        for root, dirs, files in os.walk(self.user_base):
            # Prune user specified folders in place so os.walk skips them.
            for omit_folder in self.omit_folders:
                if omit_folder in dirs:
                    dirs.remove(omit_folder)

            level = len(root.split("/")) - self.user_base_level
            folder = root.split("/")[-1]

            if level == self.pub_level:
                self.read_pub(root)

            if level == self.DFT_level:
                self.DFT_code = read_name_from_folder(root)

            if level == self.XC_level:
                self.DFT_functional = read_name_from_folder(root)
                self.read_gas(root + '/gas/')

            if level == self.reference_level:
                if 'gas' in folder:
                    continue

                if goto_metal is not None:
                    if folder != goto_metal:
                        dirs[:] = []  # don't read any sub_dirs
                        continue
                    goto_metal = None
                self.read_bulk(root, files)

            if level == self.slab_level:
                self.read_slab(root, files)

            if level == self.reaction_level:
                if goto_reaction is not None:
                    if folder != goto_reaction:
                        dirs[:] = []  # don't read any sub_dirs
                        continue
                    goto_reaction = None
                self.read_reaction(root, files)

            if level == self.final_level:
                self.read_energies(root, files)
                if self.key_value_pairs_reaction is not None:
                    yield self.key_value_pairs_reaction

    def write(self, skip=None, goto_reaction=None, goto_metal=None):
        """
        Read all reactions and write them to the reaction database.

        A reaction that is already present (according to ``db.check``) is
        updated when ``self.update`` is set and left untouched otherwise.

        Parameters
        ----------
        skip: list of str
            list of folders not to read
        goto_reaction: str
            Skip ahead to this reaction
        goto_metal: str
            Skip ahead to this metal
        """
        reactions = self.read(skip=skip, goto_metal=goto_metal,
                              goto_reaction=goto_reaction)
        for key_values in reactions:
            with CathubSQLite(self.cathub_db) as db:
                row_id = db.check(
                    key_values['chemical_composition'],
                    key_values['reaction_energy'])
                if row_id is None:
                    row_id = db.write(key_values)
                    print('Written to reaction db row id = {}'
                          .format(row_id))
                elif self.update:
                    db.update(row_id, key_values)
                    print('Updated reaction db row id = {}'.format(row_id))
                else:
                    print('Already in reaction db with row id = {}'
                          .format(row_id))

    def write_publication(self, pub_data):
        """
        Write the publication to the database unless ``self.pub_id`` is
        already present.

        Returns
        -------
        The database row id of the publication.
        """
        with CathubSQLite(self.cathub_db) as db:
            pid = db.check_publication(self.pub_id)
            if pid is None:
                pid = db.write_publication(pub_data)
                print('Written to publications db row id = {}'.format(pid))
        return pid

    def read_pub(self, root):
        """
        Read publication.txt and energy_corrections.txt from a publication
        folder and register the publication in the database.

        Sets ``title``, ``authors``, ``year``, ``doi``, ``tags``,
        ``energy_corrections``, ``pub_id``, ``cathub_db`` and ``pid`` on the
        instance. If publication.txt is missing or lacks title, authors or
        year, title defaults to the folder name, authors to the current user
        and year to the current year.
        """
        try:
            with open(root + '/publication.txt', 'r') as pub_file:
                pub_data = json.load(pub_file)
            if 'url' in pub_data:
                del pub_data['url']
            self.title = pub_data['title']
            self.authors = pub_data['authors']
            self.year = pub_data['year']
            if 'doi' not in pub_data:
                pub_data.update({'doi': None})
                print('ERROR: No doi')
            self.doi = pub_data['doi']
            if 'tags' not in pub_data:
                pub_data.update({'tags': None})
                print('ERROR: No tags')
            self.tags = pub_data['tags']
        except Exception as e:
            print('ERROR: insufficient publication info {e}'.format(e=e))
            self.doi = None
            self.tags = None
            pub_data = {}

        # Give every record the full key set, whether it came from a
        # complete file, a partial file or no file at all.
        for key in self._PUBLICATION_KEYS:
            pub_data.setdefault(key, None)

        # Energy corrections are optional: a missing or malformed file
        # simply means no corrections.
        try:
            with open(root + '/energy_corrections.txt', 'r') as corr_file:
                self.energy_corrections = json.load(corr_file)
        except (OSError, ValueError):
            self.energy_corrections = {}

        if pub_data['title'] is None:
            self.title = root.split('/')[-1]
            pub_data.update({'title': self.title})
        if pub_data['authors'] is None:
            self.authors = [self.user]
            pub_data.update({'authors': self.authors})
        if pub_data['year'] is None:
            self.year = date.today().year
            pub_data.update({'year': self.year})

        # pub_id = <first word of first author><first word of title><year>
        first_author = self.authors[0].split(',')[0].split(' ')[0]
        first_title_word = self.title.split(' ')[0].split('_')[0]
        self.pub_id = first_author + first_title_word + str(self.year)

        self.cathub_db = '{}{}.db'.format(self.data_base, self.pub_id)
        pub_data.update({'pub_id': self.pub_id})
        self.pid = self.write_publication(pub_data)

    def _register_structure(self, traj, key_value_pairs):
        """
        Make sure the structure in ``traj`` is stored in ``self.cathub_db``.

        The structure is written when ase_tools.check_in_ase reports no ase
        id for it; otherwise its key value pairs are updated, but only when
        ``self.update`` is set.

        Returns
        -------
        The ase id of the structure.
        """
        row_id, ase_id = ase_tools.check_in_ase(traj, self.cathub_db)
        if ase_id is None:
            ase_id = ase_tools.write_ase(traj, self.cathub_db,
                                         self.user, **key_value_pairs)
        elif self.update:
            ase_tools.update_ase(self.cathub_db, row_id, **key_value_pairs)
        return ase_id

    @staticmethod
    def _find_traj(root, files, tag):
        """
        Return '<root>/<file>' for the first .traj file in ``files`` whose
        name contains ``tag``.

        Raises
        ------
        IndexError
            If there is no such file.
        """
        for f in files:
            if f.endswith('.traj') and tag in f:
                return '{}/{}'.format(root, f)
        raise IndexError(
            "No '{}' .traj file found in {}".format(tag, root))

    def read_gas(self, root):
        """
        Read all gas phase .traj files in ``root`` and store them in the
        database.

        Fills ``self.ase_ids_gas`` and ``self.traj_gas``, both keyed by the
        sorted chemical composition of the molecule. Reading stops at the
        first file that fails ase_tools.check_traj.
        """
        files = [f for f in os.listdir(root)
                 if os.path.isfile(root + '/' + f)]
        traj_files = ['{}/{}'.format(root, f)
                      for f in files if f.endswith('.traj')]

        self.ase_ids_gas = {}
        self.traj_gas = {}

        for traj in traj_files:
            if not ase_tools.check_traj(traj, self.strict, False):
                return
            chemical_composition = ''.join(sorted(
                ase_tools.get_chemical_formula(traj, mode='all')))
            chemical_composition_hill = ase_tools.get_chemical_formula(
                traj, mode='hill')
            key_value_pairs = {"name": chemical_composition_hill,
                               'state': 'gas',
                               'epot': ase_tools.get_energies([traj])}

            ase_id = self._register_structure(traj, key_value_pairs)

            self.ase_ids_gas.update({chemical_composition: ase_id})
            self.traj_gas.update({chemical_composition: traj})

    def read_bulk(self, root, files):
        """
        Read the bulk structure of a '<metal>_<crystal>' folder and store it
        in the database.

        Sets ``self.metal`` and ``self.crystal`` and resets ``self.ase_ids``.

        Raises
        ------
        ValueError
            If the folder name contains no '_'.
        IndexError
            If the folder contains no .traj file with 'bulk' in its name.
        """
        folder = root.split('/')[-1]
        if '_' not in folder:
            raise ValueError(
                'Folder should be named <metal>_<crystal> but found {}'
                .format(folder))
        self.metal, self.crystal = folder.split('_', 1)

        print('------------------------------------------------------')
        print(' Surface: {}'.format(self.metal))
        print('------------------------------------------------------')

        self.ase_ids = {}

        traj_bulk = self._find_traj(root, files, 'bulk')
        if not ase_tools.check_traj(traj_bulk, self.strict, False):
            return

        key_value_pairs = {"name": self.metal,
                           'state': 'bulk',
                           'epot': ase_tools.get_energies([traj_bulk])}

        ase_id = self._register_structure(traj_bulk, key_value_pairs)
        self.ase_ids.update({'bulk' + self.crystal: ase_id})

    def read_slab(self, root, files):
        """
        Read the empty slab of a facet folder and store it in the database.

        Sets ``self.facet``, ``self.ase_facet`` and ``self.empty_traj``.

        Raises
        ------
        IndexError
            If the folder contains no .traj file with 'empty' in its name.
        """
        self.facet = root.split('/')[-1]
        self.ase_facet = 'x'.join(self.facet)

        self.empty_traj = self._find_traj(root, files, 'empty')
        if not ase_tools.check_traj(self.empty_traj, self.strict, False):
            return

        key_value_pairs = {"name": self.metal,
                           'state': 'star',
                           'epot': ase_tools.get_energies([self.empty_traj]),
                           'species': ''}

        ase_id = self._register_structure(self.empty_traj, key_value_pairs)
        self.ase_ids.update({'star': ase_id})

    def read_reaction(self, root, files):
        """
        Parse the reaction from the folder name and match its gas phase
        species with the gas .traj files that were read by read_gas.

        Sets ``reaction``, ``sites``, ``reaction_atoms``, ``prefactors``,
        ``prefactors_TS``, ``states`` and ``traj_files`` on the instance.

        Raises
        ------
        AssertionError
            If a gas phase species of the reaction has no gas .traj file.
        """
        folder_name = root.split('/')[-1]

        self.reaction, self.sites = ase_tools.get_reaction_from_folder(
            folder_name)  # reaction dict

        print('----------- REACTION: {} --> {} --------------'
              .format('+'.join(self.reaction['reactants']),
                      '+'.join(self.reaction['products'])))

        self.reaction_atoms, self.prefactors, self.prefactors_TS, \
            self.states = ase_tools.get_reaction_atoms(self.reaction)

        # One empty slot per species; filled with .traj paths as the
        # structures are identified.
        self.traj_files = {
            'reactants': [''] * len(self.reaction_atoms['reactants']),
            'products': [''] * len(self.reaction_atoms['products'])}

        # Match reaction gas species with their traj file
        for key, mollist in self.reaction_atoms.items():
            for i, molecule in enumerate(mollist):
                if self.states[key][i] != 'gas':
                    continue
                if molecule not in self.ase_ids_gas:
                    raise AssertionError(
                        'No gas phase .traj file found for {} in reaction {}'
                        .format(molecule, folder_name))
                self.traj_files[key][i] = self.traj_gas[molecule]
                species = ase_tools.clear_prefactor(self.reaction[key][i])
                self.ase_ids.update({species: self.ase_ids_gas[molecule]})

    def _match_adsorbate(self, ads_atn):
        """
        Find the adsorbed ('star') reaction species whose atomic numbers,
        repeated 1 to 4 times, equal the sorted list ``ads_atn``.

        Returns
        -------
        (key, n, n_ads) for the first match, where ``key`` and ``n`` index
        ``self.reaction_atoms`` and ``n_ads`` is the number of repeats, or
        None when nothing matches.
        """
        for key, mollist in self.reaction_atoms.items():
            for n, molecule in enumerate(mollist):
                molecule_atn = ase_tools.get_numbers_from_formula(molecule)
                for n_ads in range(1, 5):
                    if ads_atn == sorted(molecule_atn * n_ads) and \
                            self.states[key][n] == 'star':
                        return key, n, n_ads
        return None

    def read_energies(self, root, files):
        """
        Identify the slab .traj files of a reaction folder, store them in
        the database and compute the reaction and activation energy.

        On success ``self.key_value_pairs_reaction`` holds the reaction
        entry; otherwise it is None.

        Raises
        ------
        AssertionError
            If the folder holds no slab .traj file (unless ``self.debug``).
        RuntimeError
            If the reaction energy cannot be computed (unless
            ``self.debug``).
        """
        self.key_value_pairs_reaction = None

        # Transition state entries are only added by this method.
        self.traj_files.pop('TS', None)
        self.traj_files.pop('TSempty', None)

        traj_slabs = [f for f in files
                      if f.endswith('.traj') and 'gas' not in f]
        if not traj_slabs:
            message = 'Need at least one file in {}!'.format(root)
            if self.debug:
                print(message)
                return
            raise AssertionError(message)

        ts_i = None
        tsempty_i = None
        for i, f in enumerate(traj_slabs):
            if 'empty' in f and 'TS' in f:
                tsempty_i = i
            elif 'TS' in f:
                ts_i = i
            if not ase_tools.check_traj('{}/{}'.format(root, f),
                                        self.strict, False):
                return

        traj_empty = self.empty_traj
        empty_atn = ase_tools.get_atomic_numbers(traj_empty)

        # Multipliers for the reaction prefactors; all start at 1.
        prefactor_scale = {key: [1 for _ in values]
                           for key, values in self.prefactors.items()}

        # NOTE: this dict is shared by all structures written below, so
        # keys set for one structure stay in place for the following ones.
        key_value_pairs = {
            'name': ase_tools.get_chemical_formula(traj_empty),
            'facet': self.ase_facet,
            'layers': ase_tools.get_n_layers(traj_empty),
            'state': 'star'}

        # Write empty slab to ASE
        for key, mollist in self.reaction_atoms.items():
            if '' in mollist:
                self.traj_files[key][mollist.index('')] = traj_empty
                key_value_pairs.update({'species': ''})
                ase_id = self._register_structure(traj_empty,
                                                  key_value_pairs)
                self.ase_ids.update({'star': ase_id})

        # Handle other slabs
        for i, f in enumerate(traj_slabs):
            traj = '{}/{}'.format(root, f)
            atns = ase_tools.get_atomic_numbers(traj)
            if not (np.array(atns) > 8).any() and \
                    (np.array(empty_atn) > 8).any():
                print("Only molecular species in traj file: {}".format(traj))
                continue

            # Get supercell size relative to empty slab
            supercell_factor = 1
            if len(atns) > len(empty_atn) * 2:  # different supercells
                supercell_factor = len(atns) // len(empty_atn)

            # Atomic numbers of adsorbate: whatever is left after removing
            # the atoms of the (repeated) empty slab.
            ads_atn = atns.copy()
            for atn in empty_atn * supercell_factor:
                ads_atn.remove(atn)
            ads_atn = sorted(ads_atn)

            key_value_pairs.update({'epot': ase_tools.get_energies([traj])})

            if i == ts_i:  # transition state
                self.traj_files.update({'TS': [traj]})
                self.prefactors.update({'TS': [1]})
                prefactor_scale.update({'TS': [1]})
                key_value_pairs.update({'species': 'TS'})
                ase_id = self._register_structure(traj, key_value_pairs)
                self.ase_ids.update({'TSstar': ase_id})
                continue

            if i == tsempty_i:  # empty slab for transition state
                self.traj_files.update({'TSempty': [traj]})
                self.prefactors.update({'TSempty': [1]})
                prefactor_scale.update({'TSempty': [1]})
                key_value_pairs.update({'species': ''})
                ase_id = self._register_structure(traj, key_value_pairs)
                self.ase_ids.update({'TSemptystar': ase_id})
                continue

            match = self._match_adsorbate(ads_atn)
            if match is not None:
                key, n, n_ads = match
                self.traj_files[key][n] = traj
                species = ase_tools.clear_prefactor(self.reaction[key][n])
                key_value_pairs.update(
                    {'species': ase_tools.clear_state(species),
                     'n': n_ads,
                     'site': self.sites[species]})
                ase_id = self._register_structure(traj, key_value_pairs)
                self.ase_ids.update({species: ase_id})

                # Several adsorbates on one slab: scale the gas species.
                # Only done for a real match; an unmatched slab carries no
                # meaningful adsorbate count.
                if n_ads > 1:
                    for scale_key, scales in prefactor_scale.items():
                        for mol_i in range(len(scales)):
                            if self.states[scale_key][mol_i] == 'gas':
                                scales[mol_i] = n_ads

            if supercell_factor > 1:
                for scale_key, scales in prefactor_scale.items():
                    for mol_i in range(len(scales)):
                        if self.reaction[scale_key][mol_i] == 'star':
                            scales[mol_i] *= supercell_factor

        surface_composition = self.metal
        chemical_composition = ase_tools.get_chemical_formula(traj_empty)

        prefactors_final = copy.deepcopy(self.prefactors)
        for key in self.prefactors:
            for i, prefactor in enumerate(self.prefactors[key]):
                prefactors_final[key][i] = prefactor * prefactor_scale[key][i]

        try:
            reaction_energy, activation_energy = \
                ase_tools.get_reaction_energy(
                    self.traj_files, self.reaction,
                    self.reaction_atoms,
                    self.states, prefactors_final,
                    self.prefactors_TS,
                    self.energy_corrections)
        except Exception:
            if not self.debug:
                raise RuntimeError(
                    'Reaction energy failed for files in: {}'.format(root))
            print('ERROR: reaction energy failed for files in: {}'
                  .format(root))
            # Without energies there is nothing to validate or store.
            return

        expr = -10 < reaction_energy < 10
        if not ase_tools.debug_assert(
                expr, 'reaction energy is wrong: {} eV: {}'
                .format(reaction_energy, root), self.debug):
            return

        expr = activation_energy is None \
            or reaction_energy < activation_energy < 5
        if not ase_tools.debug_assert(
                expr, 'activation energy is wrong: {} eV: {}'
                .format(activation_energy, root), self.debug):
            print(self.traj_files, prefactors_final, self.prefactors_TS)

        reaction_info = {'reactants': {}, 'products': {}}
        for key in ['reactants', 'products']:
            for i, r in enumerate(self.reaction[key]):
                r = ase_tools.clear_prefactor(r)
                reaction_info[key].update({r: self.prefactors[key][i]})

        self.key_value_pairs_reaction = {
            'chemical_composition': chemical_composition,
            'surface_composition': surface_composition,
            'facet': self.facet,
            'sites': self.sites,
            'coverages': self.coverages,
            'reactants': reaction_info['reactants'],
            'products': reaction_info['products'],
            'reaction_energy': reaction_energy,
            'activation_energy': activation_energy,
            'dft_code': self.DFT_code,
            'dft_functional': self.DFT_functional,
            'pub_id': self.pub_id,
            'doi': self.doi,
            'year': int(self.year),
            'ase_ids': self.ase_ids,
            'energy_corrections': self.energy_corrections,
            'username': self.user}
def read_name_from_folder(root):
    """
    Return the name of the last folder in the '/'-separated path ``root``.

    Trailing '/' characters are ignored, so 'a/b/' gives 'b'.
    """
    return root.rstrip('/').split('/')[-1]