Source code for pyxtal_ff.utilities

import os
import gc
import time
import shelve
import numpy as np
from ase import Atoms, units
from copy import deepcopy
from functools import partial
from torch.utils.data import Dataset
from monty.serialization import loadfn
from multiprocessing import Pool, cpu_count
from collections.abc import MutableSequence
from .base_potential import ZBL


[docs]class Database():#MutableSequence): def __init__(self, name): self.name = name self.database = shelve.open(self.name) self.msg = "Index must be an integer in the interval [0,len(self)]" self.length = len(list(self.database.keys())) def __len__(self): return self.length def __setitem__(self, index, value): if isinstance(index, int) and index >= 0: self.database[str(index)] = value else: raise IndexError(self.msg) def __getitem__(self, index): if isinstance(index, int) and index >= 0 and index < len(self): return self.database[str(index)] else: raise IndexError(self.msg) def __delitem__(self, index): if isinstance(index, int) and index >= 0: del(self.database[str(index)]) else: raise IndexError(self.msg)
[docs] def insert(self, index, value): """ Insert the value to the dictionary at index. """ if isinstance(index, int) and index >= 0: pass else: raise IndexError(self.msg) self[index] = value
[docs] def append(self, value): """ Append value to the end of the sequence. """ self.insert(len(self), value)
[docs] def close(self): self.database.close()
[docs] def store(self, structure_file, function, storage, ase_db=None): """ Map structures to descriptors and store them, including features, to database. If compute is False, print pre-computed descriptors message. """ if function['base_potential']: self.base_potential = ZBL(function['base_potential']['inner'], function['base_potential']['outer']) else: self.base_potential = None if storage: if os.path.isdir(structure_file): fmt = 'dat' elif structure_file.find('json') > 0: fmt = 'json' elif structure_file.find('xyz') > 0: fmt = 'xyz' elif structure_file.find('db') > 0: fmt = 'db' elif structure_file.find('traj') > 0: fmt = 'traj' else: fmt = 'vasp-out' # extract the structures and energy, forces, and stress information. if os.path.exists(structure_file): if fmt == 'json': data = parse_json(structure_file) elif fmt == 'vasp-out': data = parse_OUTCAR_comp(structure_file) elif fmt == 'xyz': data = parse_xyz(structure_file) elif fmt == 'db': data = parse_ase_db(structure_file) elif fmt == 'traj': data = parse_traj(structure_file) elif fmt == 'dat': data = parse_dat(structure_file) else: raise NotImplementedError('PyXtal_FF supports only json, vasp-out, and xyz formats') else: raise FileNotFoundError(structure_file + ' cannot be found from the given path.') print("{:d} structures have been loaded.".format(len(data))) self.add(function, data) if ase_db is not None and fmt != 'db': convert_to_ase_db(data, ase_db) print("save the structures to {:}.".format(ase_db)) else: print(f"Features and precomputed descriptors exist: {self.name}.dat\n")
[docs] def add(self, function, data): """ Add descriptors for all structures to database. """ print('Computing the descriptors...') _N = deepcopy(function['N']) _cpu = deepcopy(function['ncpu']) N1 = len(data) if _N is not None and _N < N1: lists = range(_N) else: lists = range(N1) if _cpu == 1: for i, index in enumerate(lists): d = self.compute(function, data[index]) self.append(d) self.length += 1 print('\r{:4d} out of {:4d}'.format(i+1, len(lists)), flush=True, end='') else: # we do it for the reduced lists _data _data = [data[item] for item in lists] failed_ids = [] with Pool(_cpu) as p: func = partial(self.compute, function) for i, d in enumerate(p.imap_unordered(func, _data)): try: self.append(d) self.length += 1 except: # track the failed structure ids failed_ids.append(i) #pass print('\r{:4d} out of {:4d}'.format(i+1, len(_data)), flush=True, end='') p.close() p.join() # compute the missing structures in parallel calcs for id in failed_ids: d = self.compute(function, _data[id]) self.append(d) self.length += 1 print(f"\nSaving descriptor-feature data to {self.name}.dat\n")
[docs] def compute(self, function, data): """ Compute descriptor for one structure to the database. """ if function['type'] in ['BehlerParrinello', 'ACSF']: from pyxtal_ff.descriptors.ACSF import ACSF d = ACSF(function['parameters'], function['Rc'], function['force'], function['stress'], function['cutoff'], False).calculate(data['structure']) elif function['type'] in ['wACSF', 'wacsf']: from pyxtal_ff.descriptors.ACSF import ACSF d = ACSF(function['parameters'], function['Rc'], function['force'], function['stress'], function['cutoff'], True).calculate(data['structure']) elif function['type'] in ['SO4', 'Bispectrum', 'bispectrum']: from pyxtal_ff.descriptors.SO4 import SO4_Bispectrum d = SO4_Bispectrum(function['parameters']['lmax'], function['Rc'], derivative=function['force'], stress=function['stress'], normalize_U=function['parameters']['normalize_U'], cutoff_function=function['cutoff']).calculate(data['structure']) elif function['type'] in ['SO3', 'SOAP', 'soap']: from pyxtal_ff.descriptors.SO3 import SO3 d = SO3(function['parameters']['nmax'], function['parameters']['lmax'], function['Rc'], alpha=function['parameters']['alpha'], derivative=function['force'], stress=function['stress']).calculate(data['structure']) elif function['type'] in ['EAD', 'ead']: from pyxtal_ff.descriptors.EAD import EAD d = EAD(function['parameters'], function['Rc'], function['force'], function['stress'], function['cutoff']).calculate(data['structure']) elif function['type'] in ['SNAP', 'snap']: from pyxtal_ff.descriptors.SNAP import SO4_Bispectrum d = SO4_Bispectrum(function['weights'], function['parameters']['lmax'], function['Rc'], derivative=function['force'], stress=function['stress'], normalize_U=function['parameters']['normalize_U'], cutoff_function=function['cutoff'], rfac0=function['parameters']['rfac']).calculate(data['structure']) else: msg = f"{function['type']} is not implemented" raise NotImplementedError(msg) if d['rdxdr'] is not None: N = d['x'].shape[0] L = d['x'].shape[1] rdxdr = np.zeros([N, L, 3, 3]) for _m in range(N): ids = np.where(d['seq'][:,0]==_m)[0] rdxdr[_m, :, :, :] += np.einsum('ijkl->jkl', d['rdxdr'][ids, :, :, :]) d['rdxdr'] = rdxdr.reshape([N, L, 9])[:, :, [0, 4, 8, 1, 2, 5]] #d['rdxdr'] = np.einsum('ijklm->iklm', d['rdxdr'])\ #.reshape([shp[0], shp[2], shp[3]*shp[4]])[:, :, [0, 4, 8, 1, 2, 5]] #need to change #print(len(data['structure'])) if self.base_potential: base_d = self.base_potential.calculate(data['structure']) else: base_d = {'energy': 0., 'force': 0., 'stress': 0.} #print(data['energy']) #print(base_d['energy']) d['energy'] = np.asarray(data['energy'] - base_d['energy']) d['force'] = np.asarray(data['force']) - base_d['force'] if data['stress'] is not None: d['stress'] = np.asarray(data['stress']) - base_d['stress'] / units.GPa else: d['stress'] = data['stress'] d['group'] = data['group'] return d
[docs]def compute_descriptor(function, structure): """ Compute descriptor for one structure. """ if function['type'] in ['BehlerParrinello', 'ACSF']: from pyxtal_ff.descriptors.ACSF import ACSF d = ACSF(function['parameters'], function['Rc'], function['force'], function['stress'], function['cutoff'], False).calculate(structure) elif function['type'] in ['wACSF', 'wacsf']: from pyxtal_ff.descriptors.ACSF import ACSF d = ACSF(function['parameters'], function['Rc'], function['force'], function['stress'], function['cutoff'], True).calculate(structure) elif function['type'] in ['SO4', 'Bispectrum', 'bispectrum']: from pyxtal_ff.descriptors.SO4 import SO4_Bispectrum d = SO4_Bispectrum(function['parameters']['lmax'], function['Rc'], derivative=True, stress=True, normalize_U=function['parameters']['normalize_U'], cutoff_function=function['cutoff']).calculate(structure) elif function['type'] in ['SO3', 'SOAP', 'soap']: from pyxtal_ff.descriptors.SO3 import SO3 d = SO3(function['parameters']['nmax'], function['parameters']['lmax'], function['Rc'], derivative=True, stress=True).calculate(structure) elif function['type'] in ['EAD', 'ead']: from pyxtal_ff.descriptors.EAD import EAD d = EAD(function['parameters'], function['Rc'], True, True, function['cutoff']).calculate(structure) elif function['type'] in ['SNAP', 'snap']: from pyxtal_ff.descriptors.SNAP import SO4_Bispectrum d = SO4_Bispectrum(function['weights'], function['parameters']['lmax'], function['Rc'], derivative=True, stress=True, normalize_U=function['parameters']['normalize_U'], cutoff_function=function['cutoff']).calculate(structure) else: msg = f"{function['type']} is not implemented" raise NotImplementedError(msg) if d['rdxdr'] is not None: N = d['x'].shape[0] L = d['x'].shape[1] rdxdr = np.zeros([N, L, 3, 3]) for _m in range(N): ids = np.where(d['seq'][:,0]==_m)[0] rdxdr[_m, :, :, :] += np.einsum('ijkl->jkl', d['rdxdr'][ids, :, :, :]) d['rdxdr'] = rdxdr.reshape([N, L, 9])[:, :, [0, 4, 8, 1, 2, 5]] return d
[docs]def parse_json(path, N=None, Random=False): """ Extract structures/energy/forces/stress information from json file. """ if os.path.isfile(path): structure_dict = loadfn(path) elif os.path.isdir(path): import glob cwd = os.getcwd() os.chdir(path) files = glob.glob('*.json') os.chdir(cwd) structure_dict = [] for file in files: fp = os.path.join(path, file) structure_dict += loadfn(fp) if N is None: N = len(structure_dict) elif Random and N < len(structure_dict): structure_dict = sample(structure_dict, N) data = [] for i, d in enumerate(structure_dict): if 'structure' in d: structure = Atoms(symbols=d['structure'].atomic_numbers, positions=d['structure'].cart_coords, cell=d['structure'].lattice._matrix, pbc=True) v = structure.get_volume() if 'data' in d: key = 'data' else: key = 'outputs' if 'energy_per_atom' in d[key]: energy = d[key]['energy_per_atom']*len(structure) else: energy = d[key]['energy'] force = d[key]['forces'] try: if d['tags'][0] == 'Strain': group = 'Elastic' else: group = 'NoElastic' except: if d['group'] == 'Elastic': group = 'Elastic' else: group = 'NoElastic' #group = d['group'] # vasp default output: XX YY ZZ XY YZ ZX # pyxtal_ff/lammps use: XX YY ZZ XY XZ YZ # Here we assume the sequence is lammps if d['group'] == 'Elastic' and 'Mo 3x3x3 cell' in d['description']: stress = None group = 'NoElastic' elif 'virial_stress' in d[key]: #kB to GPa s = [s/10 for s in d[key]['virial_stress']] if d['group'] == 'Ni3Mo' or d['element'] == 'Cu': #just tentative fix stress = [s[0], s[1], s[2], s[3], s[4], s[5]] else: stress = [s[0], s[1], s[2], s[3], s[5], s[4]] elif 'stress' in d[key]: #kB to GPa s = [s/10 for s in d[key]['stress']] if d['group'] == 'Ni3Mo': #to fix the issue stress = [s[0], s[1], s[2], s[3], s[4], s[5]] else: stress = [s[0], s[1], s[2], s[3], s[5], s[4]] else: stress = None data.append({'structure': structure, 'energy': energy, 'force': force, 'stress': stress, 'group': group}) else: # For PyXtal structure = Atoms(symbols=d['elements'], scaled_positions=d['coords'], cell=d['lattice'], pbc=True) data.append({'structure': structure, 'energy': d['energy'], 'force': d['force'], 'stress': None, 'group': 'random'}) if i == (N-1): break return data
[docs]def create_label(elements, hiddenlayers): label = '' for e in elements: label += e label += '-' for l in hiddenlayers: label += str(l) label += '-' label += '1' return label
[docs]def get_descriptors_parameters(symmetry, system): from itertools import combinations_with_replacement G = [] if 'G2' in symmetry: combo = list(combinations_with_replacement(system, 1)) if 'Rs' not in symmetry['G2']: Rs = [0.] else: Rs = symmetry['G2']['Rs'] for eta in symmetry['G2']['eta']: for rs in Rs: for element in combo: g = [element[0], 'nan', rs, eta, 'nan', 'nan'] G.append(g) if 'G4' in symmetry: combo = list(combinations_with_replacement(system, 2)) for zeta in symmetry['G4']['zeta']: for lamBda in symmetry['G4']['lambda']: for eta in symmetry['G4']['eta']: for p_ele in combo: g = [p_ele[0], p_ele[1], 'nan', eta, lamBda, zeta] G.append(g) return G
[docs]def parse_xyz(structure_file, N=1000000): """ Extract structures/enegy/force information of xyz file provided by the Cambridge group """ data = [] with open(structure_file, 'r') as f: lines = f.readlines() count = 0 while True: line = lines[count] symbols = [] number = int(line) coords = np.zeros([number, 3]) forces = np.zeros([number, 3]) infos = lines[count+1].split('=') for i, info in enumerate(infos): if info.find('dft_energy') >= 0 or info.find('DFT_energy') >= 0: energy = float(infos[i+1].split()[0]) elif info.find('config_type') == 0: group = infos[i+1].split()[0] elif info.find('Lattice') == 8: lat = [] tmp = infos[i+1].split() for num in tmp: num = num.replace('"','') if num.replace('.', '', 1).replace('-', '', 1).isdigit(): lat.append(float(num)) lat = np.array(lat).reshape([3,3]) elif info.find('virial') > 0: s = [] tmp = infos[i+1].split() for num in tmp: num = num.replace('"','') if num.replace('.', '', 1).replace('-', '', 1).isdigit(): s.append(float(num)) s = np.array(s).flatten() stress = np.array([s[0], s[4], s[8], s[1], s[2], s[5]]) elif info.find('Properties') >= 0: items = infos[i+1].split(':') counts = 0 for i in range(int(len(items)/3)): if items[i*3] == 'dft_force' or items[i*3] == 'DFT_force': count_f = counts elif items[i*3] == 'pos': count_p = counts counts += int(items[i*3+2]) break for i in range(number): infos = lines[count+2+i].split() symbols.append(infos[0]) coords[i, :] = np.array([float(num) for num in infos[count_p:count_p+3]]) forces[i, :] = np.array([float(num) for num in infos[count_f:count_f+3]]) structure = Atoms(symbols=symbols, positions=coords, cell=lat, pbc=True) data.append({'structure': structure, 'energy': energy, 'force': forces, 'stress': stress, 'group': group}) count = count + number + 2 if count >= len(lines) or len(data) == N: break return data
[docs]def parse_OUTCAR_comp(structure_file, N=1000000): """ Extract structures/enegy/force information from compressed OUTCAR file """ data = [] with open(structure_file, 'r') as f: lines = f.readlines() read_symbol = True read_composition = False read_structure = False symbols = [] count = 0 while True: line = lines[count] if read_symbol: if line.find('POTCAR') > 0: symbol = line.split()[2] if symbol in symbols: read_symbol = False read_composition = True else: symbols.append(symbol) elif read_composition: if line.find('ions per type') > 0: symbol_array = [] tmp = line.split('=')[1].split() numIons = [] for i in range(len(symbols)): for j in range(int(tmp[i])): symbol_array.append(symbols[i]) read_composition = False read_structure = True count += 4 # search for " energy without entropy=" line_number = 0 for tmp in range(24): if lines[count+len(symbol_array)+tmp].find("entropy") > 0: line_number = len(symbol_array) + tmp break if line_number == 0: raise ValueError("Cannot identify the line starts with energy without entropy") else: #read_structure lat_string = lines[count+1:count+4] lat = np.zeros([3,3]) for i, l in enumerate(lines[count+1:count+4]): lat[i,:] = np.fromstring(l, dtype=float, sep=' ')[:3] coor = np.zeros([len(symbol_array), 3]) force = np.zeros([len(symbol_array), 3]) for i, l in enumerate(lines[count+6:count+6+len(symbol_array)]): array = np.fromstring(l, dtype=float, sep=' ') coor[i,:] = array[:3] force[i,:] = array[3:] energy = float(lines[count+line_number-1].split()[-4]) structure = Atoms(symbols=symbol_array, positions=coor, cell=lat, pbc=True) data.append({'structure': structure, 'energy': energy, 'force': force, 'stress': None, 'group': 'random'}) count += line_number - 1 count += 1 if count >= len(lines) or len(data) == N: break return data
[docs]def convert_to_ase_db(data, db_path='test.db'): from ase.db import connect with connect(db_path) as db: for d in data: struc = d['structure'] d.pop('structure', None) db.write(struc, data=d)
[docs]def parse_ase_db(db_path, N=None, Random=False): from ase.db import connect data = [] with connect(db_path) as db: if N is None: N = len(db) for i, row in enumerate(db.select()): structure = db.get_atoms(row.id) if "dft_stress" in row.data: stress = row.data["dft_stress"] elif "stress" in row.data: stress = row.data["stress"] else: stress = None if "group" in row.data: group = row.data["group"] else: group = None if "dft_energy" in row.data: eng = row.data["dft_energy"] else: eng = row.data["energy"] if "dft_force" in row.data: force = row.data["dft_force"] else: force = row.data["force"] data.append({'structure': structure, 'energy': eng, 'force': force, 'group': group, 'stress': stress}) if i == (N-1): break return data
[docs]def parse_traj(structure_file): from ase.build import sort from ase.io.trajectory import Trajectory data = [] out_traj = Trajectory(structure_file) for traj in out_traj: structure = sort(traj) energy = traj.get_potential_energy() force = traj.get_forces() try: # PyXtal_FF: XX YY ZZ XY XZ YZ # ASE : xx yy zz yz xz xy # eV/A^3 to GPa stress = -(traj.get_stress()/units.GPa)[[0, 1, 2, 5, 4, 3]] except: stress = None xjson = {'structure':structure, 'energy':energy, 'force':force, 'stress':stress, 'group':'random'} data.append(xjson) return data
[docs]def parse_dat(structure_files): import glob data = [] files = glob.glob(structure_files+'*.dat') for file in files: file_str = file.split('/') lines = open(file, 'r').readlines() if file_str[-1] == 'struct_info_1.dat': group = 'random' elif file_str[-1] in ['struct_info_2.dat', 'struct_info_3.dat']: group = 'no_stress' elif file_str[-1] == 'struct_info_4.dat': group = 'with_stress' elif file_str[-1] == 'struct_info_5.dat': group = 'with_stress' for i, line in enumerate(lines): content = line.split() if len(content) == 0: if group == 'no_stress': stress = [0.]*6 structure = Atoms(symbols=atoms, positions=positions, cell=cell, pbc=True) xdata = {'structure': structure, 'energy': energy, 'force': forces, 'stress': stress, 'group': group} data.append(xdata) mode = None continue if content[0] == 'STRUCTURE': cell, atoms, positions, forces = [], [], [], [] continue elif content[0] == 'CELL': mode = 'stress' if content[1] == 'STRESS' else 'cell' continue elif content[0] == 'ATOMIC': mode = 'position' if content[1] == 'NAME' else 'force' continue elif content[0] == 'TOTAL': mode = 'energy' continue if mode == 'energy': energy = float(line) elif mode == 'cell': cel = list(map(float, content)) cell.append(cel) elif mode == 'position': pos = list(map(float, content[1:])) atoms.append(content[0]) positions.append(pos) elif mode == 'force': force = list(map(float, content[1:])) forces.append(force) elif mode == 'stress': # Negative? # Convert to kbar? stress = list(map(float, content))#[[0, 3, 5, 1, 2, 4]] #stress = [stress[0], stress[3], stress[5], stress[1], stress[2], stress[4]] stress = [stress[0]*0.1, stress[3]*0.1, stress[5]*0.1, stress[1]*0.1, stress[2]*0.1, stress[4]*0.1] structure = Atoms(symbols=atoms, positions=positions, cell=cell, pbc=True) xdata = {'structure': structure, 'energy': energy, 'force': forces, 'stress': stress, 'group': group} data.append(xdata) mode = None return data