Source code for catkit.flow.qeio

from .fwio import array_to_list
import os
import contextlib
import json
import re
import hashlib
import msgpack
import numpy as np
from ase import Atoms
from ase.calculators.singlepoint import SinglePointCalculator as SPC
from ase.db import connect
from ase.units import Ry, Bohr
from ase.io import read, write
# 1.889726 is the atomic unit of length per Angstrom (aul/A)
aul = 1.889726


[docs]def geometry_hash(atoms): """ A hash based strictly on the geometry features of an atoms object: positions, cell, and symbols. This is intended for planewave basis set calculations, so pbc is not considered. Each element is sorted in the algorithem to help prevent new hashs for identical geometries. """ atoms = atoms.copy() atoms.wrap() pos = atoms.get_positions() # Sort the cell array by magnitude of z, y, x coordinates, in that order cell = np.array(sorted(atoms.get_cell(), key=lambda x: (x[2], x[1], x[0]))) # Flatten the array and return a string of numbers only # We only consider position changes up to 3 decimal places cell_hash = np.array_str(np.ndarray.flatten(cell.round(3))) cell_hash = ''.join(cell_hash.strip('[]').split()).replace('.', '') # Sort the atoms positions similarly, but store the sorting order pos = atoms.get_positions() srt = [i for i, _ in sorted(enumerate(pos), key=lambda x: (x[1][2], x[1][1], x[1][0]))] pos_hash = np.array_str(np.ndarray.flatten(pos[srt].round(3))) pos_hash = ''.join(pos_hash.strip('[]').split()).replace('.', '') # Create a symbols hash in the same fashion conserving position sort order sym = np.array(atoms.get_atomic_numbers())[srt] sym_hash = np.array_str(np.ndarray.flatten(sym)) sym_hash = ''.join(sym_hash.strip('[]').split()) # Assemble a master hash and convert it through an md5 master_hash = cell_hash + pos_hash + sym_hash md5 = hashlib.md5(master_hash.encode('utf-8')) _hash = md5.hexdigest() return _hash
[docs]def write_to_db(path, db_name='master.db', keys={}, traj=False, pdos=False): """ This function is used to write an ASE database entry for each atoms image inside a QE directory. """ db = connect(os.path.join(os.getcwd(), db_name)) # Change to the directory with cd(path): # Get the atoms object from the current directory if traj: images = read('output.traj', ':') else: images = [read('output.traj')] for i, atoms in enumerate(images): # Get keys-value-pairs from directory name. path = [x for x in os.getcwd().split('/') if '=' in x] keys.update({ 'traj': i, 'rtraj': len(images) - i - 1}) for key_value in path: key = key_value.split('=')[0] value = key_value.split('=')[1] # Try to recognize characters and convert to # specific data types for easy access later. if '.' in value: value = float(value) elif value.isdigit(): value = int(value) elif value == 'False': value = bool(False) elif value == 'True': value = bool(True) else: value = str(value) # Add directory keys keys[key] = value # Collect the psppath from the pw.inp file with open('pw.inp', 'r') as f: line = f.readline() while 'pseudo_dir' not in line: line = f.readline() psppath = re.split("[']", line)[1] atoms.info['psppath'] = psppath data = { 'path': os.getcwd(), 'parameters': atoms.info, 'geometry_hash': geometry_hash(atoms)} if pdos and i == 0: if os.path.exists('dos.msg'): with open('dos.msg') as f: dos_decrypt = msgpack.load(f) dos_decrypt = list(dos_decrypt) array_to_list(dos_decrypt) data['dos'] = json.dumps(dos_decrypt, encoding='utf-8') # Generate/add-to the db file db.write(atoms, key_value_pairs=keys, data=data)
[docs]@contextlib.contextmanager def cd(path): """Does path management: if the path doesn't exists, create it otherwise, move into it until the indentation is broken. e.g. with cd('the/path/is/real'): 'do things in the new path' Parameters ---------- path : str Directory path to create and change into. """ cwd = os.getcwd() try: if not os.path.exists(path): os.makedirs(path) os.chdir(path) yield finally: os.chdir(cwd)
[docs]def attach_results(f, atoms, write_file=True): """ Return the TS corrected energy for a scf instance in a log file and attach them to the given atoms obejct. Will also attach the forces and stress if applicable. """ energy, forces, stress = None, None, None line = f.readline() while '! total energy' not in line: line = f.readline() energy = float(line.split()[-2]) * Ry # Correct for non-zero temperature smearing for i in range(80): line = f.readline() if ' smearing contrib.' in line: energy -= 0.5 * float(line.split()[-2]) * Ry # Collect the forces on the atoms if 'Forces acting on atoms (Ry/au):' in line: for _ in range(4): line = f.readline() if 'atom' in line: break forces = [] for _ in range(len(atoms)): forces += [line.split()[-3:]] line = f.readline() forces = np.array(forces, dtype=float) / Ry * aul # If forces were located, attempt to find stress for i in range(10): line = f.readline() if 'total stress' in line: stress = [] for _ in range(3): line = f.readline() stress += [line.split()[-3:]] stress = np.array(stress, dtype=float) / Ry * Bohr ** 3 break # attach the calculator calc = SPC( atoms=atoms, energy=energy, forces=forces, stress=stress) atoms.set_calculator(calc) return atoms
[docs]def log_to_atoms(log_file='log', ent=-1, out_file=None): """ Parse a QE log file for atoms trajectory and return a list of atoms objects representative of the relaxation path. NOTE: trajectory information is only returned for calculations run with BFGS internal to QE. """ images = [] with open(log_file) as f: line = f.readline() # Flag to read trajectory 'ent' only with os.popen( 'grep -n Giannozzi ' + log_file + ' 2>/dev/null', 'r') as p: n = int(p.readlines()[ent].split()[0].strip(':')) for i in range(n): line = f.readline() # Read lines one at a time while line: line = f.readline() # Signifies a new trajectory # Clear any existing values from previous runs if '(npk)' in line: # Look for an input trajectory in the same file and use it # Convenient for conserving constraints, tags, and atoms info in_file = os.path.join( '/'.join(log_file.split('/')[:-1]), 'input.traj') if os.path.exists(in_file): # If it does exist, read it in as the initial configuration atoms = read(in_file) atoms.wrap() natoms = len(atoms) pos = atoms.get_positions() # Skip past the geometry information while 'site n.' not in line: line = f.readline() # Otherwise, collect from the data else: atoms = None # Example properties ###################### # bravais-lattice index = 0 # lattice parameter (alat) = 1.8897 a.u. # unit-cell volume = 3209.1777 (a.u.)^3 # number of atoms/cell = 5 # number of atomic types = 2 # number of electrons = 45.00 # number of Kohn-Sham states= 55 # kinetic-energy cutoff = 36.7493 Ry # charge density cutoff = 367.4932 Ry # convergence threshold = 7.3E-08 # mixing beta = 0.1000 # number of iterations used = 8 plain mixing # Exchange-correlation = BEEF ( 1 4 27 13 2) # nstep = 50 # Collect potentially relevent properties # The elif can be omitted if order is assured elif 'number of atoms/cell =' in line: natoms = int(line.split()[-1]) # Collect cell dimensions elif 'celldm(1)' in line: alat = float(line.split()[1]) / aul elif 'crystal' in line: cell = [] for _ in range(3): line = f.readline() cell += [[float(x) for x in line.split()[3:6]]] cell = np.array(cell) * alat # Collect positions, symbols, and number of atoms elif 'site n.' in line: pos, syms = [], [] for _ in range(natoms): line = f.readline() pos += [line.split()[-4:-1]] syms += [line.split()[1].strip('0123456789')] pos = np.array(pos, dtype=float) * alat # Setup the atoms object atoms = Atoms(syms, pos, cell=cell, pbc=(1, 1, 1)) # This should be the last piece of information elif 'number of k points=' in line: atoms = attach_results(f, atoms) # Add atom to images images = [atoms] # Only atomic positions and energies need to be collected now # until the calculation ends while 'JOB DONE.' not in line and line: line = f.readline() # A duplicate of the coordinates printed previously if 'Begin final coordinates' in line: break if 'ATOMIC_POSITIONS' in line: atoms = atoms.copy() coord = line.split('(')[-1] for i in range(natoms): line = f.readline() pos[i][:] = line.split()[1:4] # It's possible to recover constraints here, # but not yet implemented If there are 7 # characters in the line, we have constraints # if len(line.split()) == 7: # cons += [line.split()[-3:]] # else: # cons += [[1] * 3] # cons = np.array(cons, dtype=float) if coord == 'alat)': atoms.set_positions(pos * alat) elif coord == 'bohr)': atoms.set_positions(pos * Bohr) elif coord == 'angstrom)': atoms.set_positions(pos) else: atoms.set_scaled_positions(pos) atoms = attach_results(f, atoms) images += [atoms] if out_file: write(out_file, images) return images