Source code for catkit.hub.cathubsqlite

from ase.db.sqlite import SQLite3Database
import sqlite3
import json


init_commands = [
    """ CREATE TABLE publication (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pub_id text UNIQUE,
    title text,
    authors text,
    journal text,
    volume text,
    number text,
    pages text,
    year integer,
    publisher text,
    doi text,
    tags text
    );""",

    """CREATE TABLE publication_system (
    ase_id text REFERENCES systems(unique_id),
    pub_id text REFERENCES publication(pub_id),
    PRIMARY KEY (pub_id, ase_id)
    );""",

    """CREATE TABLE reaction (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    chemical_composition text,
    surface_composition text,
    facet text,
    sites text,
    coverages text,
    reactants text,
    products text,
    reaction_energy real,
    activation_energy real,
    dft_code text,
    dft_functional text,
    username text,
    pub_id text,
    FOREIGN KEY (pub_id) REFERENCES publication(pub_id)
    );""",

    """ CREATE TABLE reaction_system (
    name text,
    energy_correction real,
    ase_id text,
    id integer,
    FOREIGN KEY (ase_id) REFERENCES systems(unique_id),
    FOREIGN KEY (id) REFERENCES reaction(id)
    );"""]


[docs]class CathubSQLite: """Class for managing SQLite3 database for reaction energies, publications and atomic structures. Builds on top of the ASE database for atomic strucutres https://wiki.fysik.dtu.dk/ase/ase/db/db.html with four additional tables: publication: publication info publication_system: one-to-many mapping between publication table and systems table in ASE database reaction: reaction energies for surfaces reaction_system: mamy-to-many mapping between reaction table and systems table in ASE database Connect to a database object: db = CathubSQLite('yourdbfile.db') Set up a connection for several manipulations: with db as CathubSQLite('yourdbfile.db'): Do your work... Parameters ---------- filename : str name of database file """ def __init__(self, filename): assert filename.endswith('.db'), 'filename should have .db extension' self.filename = filename self.initialized = False self.default = 'NULL' self.connection = None def _connect(self): return sqlite3.connect(self.filename, timeout=600) def __enter__(self): """Set connection upon entry using with statement""" assert self.connection is None self.connection = self._connect() return self def __exit__(self, exc_type, exc_value, tb): """Commit changes upon exit""" if exc_type is None: self.connection.commit() else: self.connection.rollback() self.connection.close() self.connection = None def _initialize(self, con): """Set up tables in SQL""" if self.initialized: return SQLite3Database()._initialize(con) # ASE db initialization cur = con.execute( 'SELECT COUNT(*) FROM sqlite_master WHERE name="reaction"') if cur.fetchone()[0] == 0: # no reaction table for init_command in init_commands: con.execute(init_command) # Create tables con.commit() self.initialized = True
[docs] def read(self, id, table='reaction'): """ Return an entire row of a table Parameters --------- id: int row integer table: str 'reaction', 'publication', 'publication_system', 'reaction_system' """ con = self.connection or self._connect() self._initialize(con) cur = con.cursor() cur.execute('SELECT * FROM \n {} \n WHERE \n {}.id={}'.format( table, table, id)) row = cur.fetchall() if len(row) == 14: # Old schema row = row.insert(5, 'None') return row
[docs] def write_publication(self, values): """ Write publication info to db Parameters ---------- values: dict with entries {'pub_id': str (short name for publication), 'authors': list of str () 'journal': str, 'volume': str, 'number': str, 'pages': 'str' 'year': int, 'publisher': str, 'doi': str, 'tags': list of str} """ con = self.connection or self._connect() self._initialize(con) cur = con.cursor() values = (values['pub_id'], values['title'], json.dumps(values['authors']), values['journal'], values['volume'], values['number'], values['pages'], values['year'], values['publisher'], values['doi'], json.dumps(values['tags'])) q = self.default + ',' + ', '.join('?' * len(values)) cur.execute('INSERT OR IGNORE INTO publication VALUES ({})'.format(q), values) pid = self.get_last_id(cur, table='publication') return pid
[docs] def write(self, values, data=None): """ Write reaction info to db file Parameters ---------- values: dict The values dict can include: {'chemical_composition': str (chemical composition on empty slab) , 'surface_composition': str (reduced chemical composition or shortname), 'facet': str 'sites': dict adsorption sites of species. f.ex: {'OH': 'ontop', 'O': 'hollow'} 'coverages': dict coverage of adsorbates relative to the unit cell f.ex. {'OH': 0.25, 'O': 0.5}) 'reactants'/ 'products': dict keys with name of chemical species folloved by phase (gas, *) values are the prefactor in the reaction. For reaction H2Ogas -> 2Hstar + O star you would write: 'reactants': {OHstar: 1, Hstar: 2} 'products': {OHstar: 1, Hstar: 2} 'reaction_energy': float 'activation_energy': float 'dft_code': str 'dft_functional': str 'username': str 'pub_id': str Should match the pub_id of the corresponding publications } """ con = self.connection or self._connect() self._initialize(con) cur = con.cursor() pub_id = values['pub_id'] ase_ids = values['ase_ids'] energy_corrections = values['energy_corrections'] if ase_ids is not None: check_ase_ids(values, ase_ids) else: ase_ids = {} values = (values['chemical_composition'], values['surface_composition'], values['facet'], json.dumps(values['sites']), json.dumps(values['coverages']), json.dumps(values['reactants']), json.dumps(values['products']), values['reaction_energy'], values['activation_energy'], values['dft_code'], values['dft_functional'], values['username'], values['pub_id'] ) """ Write to reaction table""" q = self.default + ',' + ', '.join('?' * len(values)) cur.execute('INSERT INTO reaction VALUES ({})'.format(q), values) id = self.get_last_id(cur) reaction_structure_values = [] """ Write to publication_system and reaction_system tables""" for name, ase_id in ase_ids.items(): if name in energy_corrections: energy_correction = energy_corrections[name] else: energy_correction = 0 reaction_structure_values.append([name, energy_correction, ase_id, id]) insert_statement = """INSERT OR IGNORE INTO publication_system(ase_id, pub_id) VALUES (?, ?)""" cur.execute(insert_statement, [ase_id, pub_id]) cur.executemany('INSERT INTO reaction_system VALUES (?, ?, ?, ?)', reaction_structure_values) if self.connection is None: con.commit() con.close() return id
[docs] def update(self, id, values, key_names='all'): """ Update reaction info for a selected row Parameters ---------- id: int row integer values: dict See write() method for details key_names: list or 'all' list with name of columns to update. Should match the keys-value pairs in values. default is 'all' """ con = self.connection or self._connect() self._initialize(con) cur = con.cursor() pub_id = values['pub_id'] ase_ids = values['ase_ids'] energy_corrections = values['energy_corrections'] if ase_ids is not None: check_ase_ids(values, ase_ids) else: ase_ids = {} key_list, value_list = get_key_value_list(key_names, values) N_keys = len(key_list) value_strlist = get_value_strlist(value_list) execute_str = ', '.join('{}={}'.format(key_list[i], value_strlist[i]) for i in range(N_keys)) update_command = 'UPDATE reaction SET {} WHERE id = {};'\ .format(execute_str, id) cur.execute(update_command) delete_command = 'DELETE from reaction_system WHERE id = {}'.format(id) cur.execute(delete_command) reaction_structure_values = [] for name, ase_id in ase_ids.items(): reaction_structure_values.append([name, energy_corrections.get(name), ase_id, id]) insert_statement = """INSERT OR IGNORE INTO publication_system(ase_id, pub_id) VALUES (?, ?)""" cur.execute(insert_statement, [ase_id, pub_id]) cur.executemany('INSERT INTO reaction_system VALUES (?, ?, ?, ?)', reaction_structure_values) if self.connection is None: con.commit() con.close() return id
[docs] def get_last_id(self, cur, table='reaction'): """ Get the id of the last written row in table Parameters ---------- cur: database connection().cursor() object table: str 'reaction', 'publication', 'publication_system', 'reaction_system' Returns: id """ cur.execute("SELECT seq FROM sqlite_sequence WHERE name='{0}'" .format(table)) result = cur.fetchone() if result is not None: id = result[0] else: id = 0 return id
[docs] def check(self, chemical_composition, reaction_energy): """ Check if entry with same surface and energy is allready written to database file Parameters ---------- chemcial_composition: str reaction_energy: str Returns id or None """ con = self.connection or self._connect() self._initialize(con) cur = con.cursor() statement = """SELECT reaction.id FROM reaction WHERE reaction.chemical_composition=? and reaction.reaction_energy=?""" argument = [chemical_composition, reaction_energy] cur.execute(statement, argument) rows = cur.fetchall() if len(rows) > 0: id = rows[0][0] else: id = None return id
[docs] def check_reaction_on_surface(self, chemical_composition, reactants, products): """ Check if entry with same surface and reaction is allready written to database file Parameters ---------- chemcial_composition: str reactants: dict products: dict Returns id or None """ con = self.connection or self._connect() self._initialize(con) cur = con.cursor() statement = """SELECT reaction.id FROM reaction WHERE reaction.chemical_composition='{}' and reaction.reactants='{}' and reaction.products='{}';""".format(chemical_composition, json.dumps(reactants), json.dumps(products)) cur.execute(statement) rows = cur.fetchall() if len(rows) > 0: id = rows[0][0] else: id = None return id
[docs] def check_publication(self, pub_id): con = self.connection or self._connect() self._initialize(con) cur = con.cursor() statement = """ SELECT id FROM publication WHERE publication.pub_id=?""" argument = [pub_id] cur.execute(statement, argument) rows = cur.fetchall() if len(rows) > 0: id = rows[0][0] else: id = None return id
[docs] def check_publication_structure(self, pub_id, ase_id): con = self.connection or self._connect() self._initialize(con) cur = con.cursor() statement = """ SELECT id FROM publication_system WHERE publication.pub_id=? and publication.ase_id=?""" argument = [pub_id, ase_id] cur.execute(statement, argument) rows = cur.fetchall() if len(rows) > 0: id = rows[0][0] else: id = None return id
[docs]def check_ase_ids(values, ase_ids): ase_values = ase_ids.values() assert len(set(ase_values)) == len(ase_values), 'Duplicate ASE ids!' reaction_species = set(list(values['reactants'].keys()) + list(values['products'].keys())) n_split = 0 for spec in ase_ids.keys(): if '_' in spec: n_split += 1 assert len(reaction_species) <= len(ase_values) + n_split, \ 'ASE ids missing!' return
[docs]def get_key_value_str(values): key_str = """chemical_composition, surface_composition, facet, sites, reactants,products, reaction_energy, activation_energy, dft_code, dft_functional, publication, doi, year, ase_ids, user""" value_str = "'{}'".format(values[1]) for v in values[2:]: try: # Unicode in python2 - must be a better way of handling this. if isinstance(v, unicode): v = v.encode('ascii', 'ignore') except NameError: pass if isinstance(v, str) or isinstance(v, dict): value_str += ", '{}'".format(v) elif v is None or v == '': value_str += ", {}".format('NULL') else: value_str += ", {}".format(v) return key_str, value_str
[docs]def get_key_value_list(key_list, values, table='reaction'): total_keys = {'reaction': ['chemical_composition', 'surface_composition', 'facet', 'sites', 'coverages', 'reactants', 'products', 'reaction_energy', 'activation_energy', 'dft_code', 'dft_functional', 'username', 'pub_id'], 'publication': ['pub_id', 'title', 'authors', 'journal', 'volume', 'number', 'pages', 'year', 'publisher', 'doi', 'tags'], 'reaction_system': ['name', 'energy_correction', 'ase_id', 'reaction_id'], 'publication_system': ['ase_id, pub_id']} total_key_list = total_keys[table] if key_list == 'all': key_list = total_key_list else: for key in key_list: assert key in total_key_list value_list = [values[key] for key in key_list] return key_list, value_list
[docs]def get_value_strlist(value_list): value_strlist = [] for v in value_list: try: # Unicode in python2 - must be a better way of handling this. if isinstance(v, unicode): v = v.encode('ascii', 'ignore') except NameError: pass if isinstance(v, dict): v = json.dumps(v) value_strlist.append("'{}'".format(v)) elif isinstance(v, str): value_strlist.append("'{}'".format(v)) elif v is None or v == '': value_strlist.append("{}".format('NULL')) else: value_strlist.append("{}".format(v)) return value_strlist