#!/usr/bin/env python
import re
import os
import json
import requests
import pprint
import ase.db

from catkit.hub.cathubsqlite import CathubSQLite

# Column names requested from each table/sub-table when a GraphQL query is
# assembled. Keys are table names; values are the field names in the order
# they are emitted into the query text.
all_columns = {
    'reactions': [
        'chemicalComposition',
        'surfaceComposition',
        'facet',
        'sites',
        'coverages',
        'reactants',
        'products',
        'Equation',
        'reactionEnergy',
        'activationEnergy',
        'dftCode',
        'dftFunctional',
        'username',
        'pubId',
    ],
    # Singular key: used as a sub-table name nested under a reaction node.
    # Note that 'number' precedes 'volume' here, unlike in 'publications'.
    'publication': [
        'pubId',
        'title',
        'authors',
        'journal',
        'number',
        'volume',
        'pages',
        'year',
        'publisher',
        'doi',
        'tags',
    ],
    'publications': [
        'pubId',
        'title',
        'authors',
        'journal',
        'volume',
        'number',
        'pages',
        'year',
        'publisher',
        'doi',
        'tags',
    ],
    'reactionSystems': [
        'name',
        'energyCorrection',
        'aseId',
    ],
    'publicationSystems': [
        'pubId',
        'aseId',
    ],
}

def query(table='reactions', columns=None, subtables=None, n_results=10,
          queries=None, print_output=False):
    """Build a GraphQL query for ``table`` and execute it.

    Parameters
    ----------
    table : str
        Name of the table to query.
    columns : list of str, optional
        Columns to request for each node. Defaults to
        ``['chemicalComposition', 'reactants', 'products']``.
    subtables : list of str, optional
        Sub-tables to nest under each node. Defaults to no sub-tables.
    n_results : int
        Value passed as the ``first:`` argument of the query.
    queries : dict, optional
        Extra ``key: value`` filter arguments. Defaults to no filters.
    print_output : bool
        Accepted for backward compatibility; currently not used by this
        function.

    Returns
    -------
    The decoded response returned by ``execute_graphQL``.
    """
    # ``None`` sentinels replace the former mutable default arguments so no
    # list/dict object is shared between calls.
    if columns is None:
        columns = ['chemicalComposition', 'reactants', 'products']
    if subtables is None:
        subtables = []
    if queries is None:
        queries = {}

    query_string = graphql_query(table=table,
                                 subtables=subtables,
                                 columns=columns,
                                 n_results=n_results,
                                 queries=queries)
    return execute_graphQL(query_string)
def execute_graphQL(query_string):
    """Send ``query_string`` to the GraphQL endpoint and return the reply.

    The query and the decoded response are echoed to stdout.

    Parameters
    ----------
    query_string : str
        Complete GraphQL query text.

    Returns
    -------
    The JSON-decoded body of the HTTP response.
    """
    # NOTE(review): the endpoint literal and the ``requests.post(root`` call
    # were missing from the text this function was recovered from (the
    # statement read ``data =, {...}).json()``). The URL below is believed to
    # be the public Catalysis-Hub GraphQL endpoint -- confirm before release.
    root = 'http://api.catalysis-hub.org/graphql'
    print('Connecting to database at {}'.format(root))
    print('')
    print('Executing query:')
    print('')
    print(query_string)
    print('')
    print('Getting data from server...')
    print('')
    # The query is sent as a form field named 'query'.
    data = requests.post(root, {'query': query_string}).json()
    print('Result:')
    print('')
    pprint.pprint(data)
    return data
def graphql_query(table='reactions', subtables=None, columns=None,
                  n_results=10, queries=None):
    """Return the text of a GraphQL query for ``table``.

    Parameters
    ----------
    table : str
        Name of the table to query.
    subtables : list of str, optional
        Sub-tables nested under each node; every column listed for the
        sub-table in ``all_columns`` is requested. Defaults to none.
    columns : list of str, optional
        Columns requested for each node; each name is passed through
        ``map_column_names`` first. Defaults to
        ``['chemicalComposition', 'reactants', 'products']``.
    n_results : int
        Value of the ``first:`` argument.
    queries : dict, optional
        Extra filter arguments. ``str`` values are emitted as quoted string
        literals, ``bool`` values as ``true``/``false``, anything else via
        ``str.format``. Defaults to no filters.

    Returns
    -------
    str
        The assembled query.
    """
    # ``None`` sentinels replace the former mutable default arguments.
    if subtables is None:
        subtables = []
    if columns is None:
        columns = ['chemicalComposition', 'reactants', 'products']
    if queries is None:
        queries = {}

    parts = ['{', '{}(first: {}'.format(table, n_results)]
    for key, value in queries.items():
        if isinstance(value, str):
            # json.dumps adds the surrounding quotes and escapes embedded
            # quotes, backslashes and control characters, so a value such as
            # 'say "hi"' can no longer terminate the literal early.
            rendered = json.dumps(value, ensure_ascii=False)
        elif isinstance(value, bool):
            # bool must be tested before the generic branch: Python would
            # render it as 'True'/'False', GraphQL expects lowercase.
            rendered = 'true' if value else 'false'
        else:
            rendered = value
        parts.append(', {}: {}'.format(key, rendered))
    parts.append(') {\n')
    parts.append(' totalCount\n edges {\n node { \n')
    for column in columns:
        parts.append(' {}\n'.format(map_column_names(column)))
    for subtable in subtables:
        parts.append(' {}'.format(subtable))
        parts.append('{\n')
        for column in all_columns[subtable]:
            parts.append(' {}\n'.format(column))
        parts.append(' }\n')
    parts.append(' }\n')   # closes "node"
    parts.append(' }\n')   # closes "edges"
    parts.append('}}')     # closes the table selection and the document
    return ''.join(parts)
def get_reactions(n_results=20, write_db=False, **kwargs):
    """Fetch reactions from the server, optionally storing them locally.

    Parameters
    ----------
    n_results : int
        Value passed as the ``first:`` argument of the query.
    write_db : bool
        When true, the ``reactionSystems`` and ``publication`` sub-tables
        are requested as well and every returned reaction is written to
        ``Reactions.db`` in the current working directory.
    **kwargs
        Filter arguments given as key/value pairs. Keys are translated with
        ``map_column_names``. ``distinct='True'`` / ``'true'`` becomes the
        boolean ``True``; any other value is sent as an ``int`` when it
        converts cleanly and as a string otherwise.

    Returns
    -------
    The decoded server response.
    """
    queries = {}
    for key, value in kwargs.items():
        key = map_column_names(key)
        if key == 'distinct' and value in ['True', 'true']:
            # Previously updated an undefined ``query_dict`` (NameError).
            queries[key] = True
            continue
        try:
            queries[key] = int(value)
        except (TypeError, ValueError, OverflowError):
            queries[key] = '{0}'.format(value)

    subtables = ['reactionSystems', 'publication'] if write_db else []

    data = query(table='reactions', subtables=subtables,
                 columns=all_columns['reactions'],
                 n_results=n_results, queries=queries)

    if not write_db:
        return data

    print('Writing result to Reactions.db')
    for edge in data['data']['reactions']['edges']:
        with CathubSQLite('Reactions.db') as db:
            row = edge['node']

            key_values = {}
            for key in all_columns['reactions']:
                v = row[key]
                # Decode JSON-encoded fields; anything that is not valid
                # JSON text is stored unchanged.
                try:
                    v = json.loads(v)
                except (TypeError, ValueError):
                    pass
                key_values[convert(key)] = v

            ase_ids = {}
            energy_corrections = {}
            for row_rs in row['reactionSystems']:
                if row_rs['name'] == 'N/A':
                    continue
                ase_ids[row_rs['name']] = row_rs['aseId']
                energy_corrections[row_rs['name']] = \
                    row_rs['energyCorrection']
            if not ase_ids:
                ase_ids = None
                energy_corrections = None
            key_values['ase_ids'] = ase_ids
            # Previously stored ``ase_ids`` under this key as well.
            key_values['energy_corrections'] = energy_corrections

            # publications
            pub_key_values = {}
            row_p = row['publication']
            for key in all_columns['publications']:
                pub_key_values[convert(key)] = row_p[key]
            db.write_publication(pub_key_values)

            # reactions and reaction_systems
            # NOTE(review): ``check`` is assumed to return the id of a
            # matching stored reaction, or None -- confirm in CathubSQLite.
            reaction_id = db.check(key_values['chemical_composition'],
                                   key_values['reaction_energy'])
            if reaction_id is None:
                reaction_id = db.write(key_values)
            else:
                db.update(reaction_id, key_values)

        # NOTE(review): nesting was reconstructed from whitespace-collapsed
        # text; this step is placed after the CathubSQLite context has been
        # left so both handles on Reactions.db are not open at once.
        if ase_ids is not None:
            # Ase structures
            with ase.db.connect('Reactions.db') as ase_db:
                for unique_id in ase_ids.values():
                    if ase_db.count('unique_id={}'.format(unique_id)) == 0:
                        atomsrow = get_atomsrow_by_id(unique_id)
                        ase_db.write(atomsrow)

    return data
def get_publications(**kwargs):
    """Fetch publications from the server.

    Parameters
    ----------
    **kwargs
        Filter arguments given as key/value pairs. Keys are translated with
        ``map_column_names``. ``distinct='True'`` / ``'true'`` becomes the
        boolean ``True``; any other value is sent as an ``int`` when it
        converts cleanly and as a string otherwise.

    Returns
    -------
    The decoded server response returned by ``query``.
    """
    queries = {}
    for key, value in kwargs.items():
        key = map_column_names(key)
        if key == 'distinct' and value in ['True', 'true']:
            # Previously updated an undefined ``query_dict`` (NameError).
            queries[key] = True
            continue
        try:
            queries[key] = int(value)
        except (TypeError, ValueError, OverflowError):
            queries[key] = '{0}'.format(value)

    # Previously referenced an undefined ``publication_columns``; the
    # publication column list is kept in ``all_columns``.
    return query(table='publications',
                 columns=all_columns['publications'],
                 queries=queries)
def get_ase_db():
    """Connect to the PostgreSQL ASE database as user ``catvisitor``.

    The password is read from the ``DB_PASSWORD`` environment variable.

    Returns
    -------
    The object returned by ``ase.db.connect``.
    """
    # NOTE(review): the URL below carries no host or database name; that
    # part looks to have been lost from this copy of the file -- confirm
    # against the deployed connection string.
    password = os.environ.get('DB_PASSWORD')
    url = 'postgresql://catvisitor:{}'.format(password)
    return ase.db.connect(url)
def get_atomsrow_by_id(unique_id):
    """Return the database row whose ``unique_id`` equals ``unique_id``.

    The row is looked up in the database returned by ``get_ase_db``.
    """
    return get_ase_db().get('unique_id={}'.format(unique_id))
# def get_atomsrow_by_ids(unique_ids): # db = get_ase_db() # con = db._connect() # cur = con.cursor()
def get_atoms_by_id(unique_id):
    """Return ``toatoms()`` of the row found by ``get_atomsrow_by_id``."""
    return get_atomsrow_by_id(unique_id).toatoms()
def map_column_names(column):
    """Translate a user-facing column alias to its server-side name.

    ``'surface'`` maps to ``'chemicalComposition'``; every other name is
    returned unchanged.
    """
    aliases = {'surface': 'chemicalComposition'}
    return aliases.get(column, column)
def convert(name):
    """Convert a camelCase identifier to snake_case.

    Two substitution passes insert underscores: first before a capital that
    starts a capitalised word, then between a lowercase letter or digit and
    a following capital. The result is lower-cased.
    """
    snake = name
    for pattern in ('(.)([A-Z][a-z]+)', '([a-z0-9])([A-Z])'):
        snake = re.sub(pattern, r'\1_\2', snake)
    return snake.lower()
if __name__ == '__main__':
    # Example run: request three columns for the first 10 reactions, passing
    # "~Pt" as the chemicalComposition filter. The response is bound to
    # ``result`` so the ``query`` function is not overwritten by its own
    # return value.
    result = query(table='reactions',
                   columns=['chemicalComposition', 'reactants', 'products'],
                   n_results=10,
                   queries={'chemicalComposition': "~Pt"})