#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2019 Chris Sewell
#
# This file is part of aiida-crystal17.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms and conditions
# of version 3 of the GNU Lesser General Public License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
"""A data type to store CRYSTAL17 basis sets."""
from __future__ import absolute_import
import hashlib
from io import StringIO
import os
import pathlib
from aiida.common.exceptions import (
MultipleObjectsError,
ParsingError,
UniquenessError,
ValidationError,
)
from aiida.orm import Data, Group, QueryBuilder, Str, User
import yaml
from aiida_crystal17.common import flatten_dict, unflatten_dict
from aiida_crystal17.common.atoms import SYMBOLS_R
from aiida_crystal17.parsers.raw.parse_bases import parse_bsets_stdin
class BasisSetFamily(Group):
    """An AiiDA ``Group`` subclass that collects ``BasisSetData`` nodes into a basis set family."""
def retrieve_basis_sets(files, stop_if_existing):
    """Look up, or prepare for creation, one ``BasisSetData`` per basis set file.

    Every file is parsed and the md5 of its parsed content is used to query
    the database for a ``BasisSetData`` node carrying the same ``md5``
    attribute.

    :param files: list of basis set file paths
    :param stop_if_existing: if True, raise a ``ValueError`` as soon as a file
        matches the md5 of a node that is already in the DB.
        If False, the existing node is reused instead.
    :return: a list of ``(basis, created)`` tuples, one per file and in the
        order of ``files``; ``created`` is False for nodes found in the DB
    :raises ValueError: if ``stop_if_existing`` is True and a match is found
    """
    results = []
    for path in files:
        _, text = parse_basis(path)
        query = QueryBuilder()
        query.append(
            BasisSetData, filters={"attributes.md5": {"==": md5_from_string(text)}}
        )
        match = query.first()

        if match is not None:
            if stop_if_existing:
                raise ValueError(
                    "A Basis Set with identical MD5 to "
                    " {} cannot be added with stop_if_existing"
                    "".format(path)
                )
            # the query result is a row; the node itself is its first entry
            results.append((match[0], False))
            continue

        # Nothing matched: obtain the node without storing it, so that the
        # caller can still check for one basis set per element beforehand.
        # NOTE: here "created" really has the meaning of "to be created".
        node, to_create = BasisSetData.get_or_create(
            path, use_first=True, store_basis=False
        )
        results.append((node, to_create))
    return results
def _read_basis_lines(basis_file):
    """Return ``(lines, name)`` read from a path or an open filelike object."""
    if isinstance(basis_file, str):
        basis_file = pathlib.Path(basis_file)
    if isinstance(basis_file, pathlib.Path):
        return basis_file.read_text().splitlines(), basis_file.name
    # filelike object: always read it from the start
    basis_file.seek(0)
    lines = basis_file.read().splitlines()
    # in-memory handles have no ``name`` attribute
    return lines, getattr(basis_file, "name", "StringIO")


def parse_basis(basis_file):
    """Get relevant information from the basis file.

    - The basis file must contain one basis set in the CRYSTAL17 format
    - blank lines and lines beginning '#' will be ignored
    - the file can also start with a fenced (with ---),
      yaml formatted header section
      (Note keys should not contain '.'s)

    Example

    ::

        # an ignored comment
        ---
        author: J Smith
        year: 1999
        ---
        8 2
        1 0 3  2.  0.
        1 1 3  6.  0.

    :param basis_file: path to a file (``str`` or ``pathlib.Path``),
        or an open filelike object
    :return: (metadata_dict, content_str), where the metadata holds the header
        data plus the keys ``atomic_number``, ``element``, ``basis_type``,
        ``num_shells`` and ``orbital_types``, and the content is the basis set
        data with a standardised first line
    :raises aiida.common.exceptions.ParsingError: if the header is not a
        mapping, is not closed or uses a protected key, if the file holds no
        basis set data or more than one basis set, or if the first data line
        is not of the form ``<atomic id> <number of shells>``
    """
    # keys that are set by this function, so may not come from the header
    protected_keys = {"atomic_number", "num_shells", "element", "basis_type", "content"}
    contentlines, basis_file_name = _read_basis_lines(basis_file)

    meta_data = {}
    yaml_lines = []
    content = []
    in_yaml = False
    # once True, a '---' line no longer opens/closes a header
    parsing_data = False

    for line in contentlines:
        stripped = line.strip()
        # ignore commented and blank lines
        if not stripped or stripped.startswith("#"):
            continue
        if stripped == "---" and not parsing_data:
            if not in_yaml:
                in_yaml = True
                continue
            # closing fence: the collected header lines can now be read
            head_data = yaml.safe_load("\n".join(yaml_lines)) or {}
            if not isinstance(head_data, dict):
                raise ParsingError(
                    "the header data could not be read for file: {}".format(
                        basis_file_name
                    )
                )
            forbidden = protected_keys.intersection(head_data)
            if forbidden:
                raise ParsingError(
                    "the header data contained a forbidden key(s) "
                    "{} for file: {}".format(sorted(forbidden), basis_file_name)
                )
            meta_data = head_data
            in_yaml = False
            parsing_data = True
            continue
        if in_yaml:
            # keep the unstripped line, since yaml indentation is significant
            yaml_lines.append(line)
            continue
        parsing_data = True
        content.append(stripped)

    if in_yaml:
        raise ParsingError(
            "the header section was not closed with '---' for file: {}".format(
                basis_file_name
            )
        )
    if not content:
        raise ParsingError(
            "no basis set data was found in file: {}".format(basis_file_name)
        )

    data = parse_bsets_stdin("\n".join(content), isolated=True)
    if not data:
        raise ParsingError(
            "no basis set could be parsed from file: {}".format(basis_file_name)
        )
    if len(data) > 1:
        raise ParsingError(
            "the basis set string contains more than one basis set: {}".format(
                list(data.keys())
            )
        )

    atomic_symbol = next(iter(data))
    shells = data[atomic_symbol]["bs"]
    meta_data["atomic_number"] = atomic_number = SYMBOLS_R[atomic_symbol]
    meta_data["element"] = atomic_symbol
    meta_data["basis_type"] = basis_type = data[atomic_symbol]["type"]
    meta_data["num_shells"] = num_shells = len(shells)
    meta_data["orbital_types"] = [shell["type"] for shell in shells]

    first_line = content[0].split()
    if len(first_line) != 2 or first_line[1] != str(num_shells):
        # report the offending first data line, not the last line that was read
        raise ParsingError(
            "The first line should contain only the atomic id and num shells ({}): '{}' for file {}".format(
                num_shells, content[0], basis_file_name
            )
        )

    # the input atomic number may be > 100, but we should standardise this in the stored file
    content[0] = "{0} {1}".format(
        atomic_number if basis_type == "all-electron" else 200 + atomic_number,
        num_shells,
    )
    return meta_data, "\n".join(content)
def md5_from_string(string, encoding="utf-8"):
    """Compute the hexadecimal md5 digest of a text string.

    :param string: the text to hash
    :param encoding: the encoding used to turn the text into bytes
    :return: the digest, as a string of hexadecimal digits
    """
    hasher = hashlib.md5()
    hasher.update(string.encode(encoding))
    return hasher.hexdigest()
class BasisSetData(Data):
    """A data type to store CRYSTAL17 basis sets.

    It is intended to work much like the UpfData type

    - The basis file must contain one basis set in the CRYSTAL17 format
    - lines beginning # will be ignored
    - the file can also start with a fenced,
      yaml formatted header section (starting/ending '---')
      (Note keys should not contain '.'s)
    - only the actual basis data (not commented lines or the header section)
      will be stored as a file and hashed

    Example file

    ::

        # an ignored comment
        ---
        author: J Smith
        year: 1999
        ---
        8 2
        1 0 3  2.  0.
        1 1 3  6.  0.

    """

    def __init__(self, filepath, **kwargs):
        """Read and store a file containing a single basis set.

        Parameters
        ----------
        filepath : str or filelike
            the basis set file, handed to :meth:`set_file`
        **kwargs
            forwarded unchanged to the parent constructor

        """
        super().__init__(**kwargs)
        self.set_file(filepath)

    @property
    def filename(self):
        """Return the name of the file stored."""
        return self.get_attribute("filename")

    @property
    def md5sum(self):
        """Return the md5 hash of the basis set.

        :return: the ``md5`` attribute, or None if it has not been set
        """
        return self.get_attribute("md5", None)

    def set_file(self, basis_file):
        """Pre-parse the file to store the attributes and content separately.

        The parsed metadata (flattened) and the md5 of the content are set as
        attributes; the content itself is written to the file repository.

        Parameters
        ----------
        basis_file : str or filelike

        Raises
        ------
        ValueError
            a file has already been set for this BasisSetData instance

        """
        if "filename" in self.attributes_keys():
            raise ValueError(
                "a file has already been set for this BasisSetData instance"
            )

        metadata, content = parse_basis(basis_file)
        md5sum = md5_from_string(content)

        # store the metadata and md5 in the database
        for key, val in flatten_dict(metadata).items():
            self.set_attribute(key, val)
        self.set_attribute("md5", md5sum)

        # store the rest of the file content as a file in the file repository
        if isinstance(basis_file, str):
            filename = os.path.basename(basis_file)
        elif hasattr(basis_file, "name"):
            filename = os.path.basename(basis_file.name)
        else:
            filename = "stringio.txt"
        self.put_object_from_filelike(StringIO(content), path=filename, mode="w")
        # set last: this attribute doubles as the "file already set" marker
        self.set_attribute("filename", filename)

    @classmethod
    def from_md5(cls, md5):
        """Return a list of all Basis Sets that match a given MD5 hash.

        Note that the hash has to be stored in the ``md5`` attribute,
        otherwise the basis will not be found.
        """
        qb = QueryBuilder()
        qb.append(cls, filters={"attributes.md5": {"==": md5}})
        return qb.all(flat=True)

    @property
    def metadata(self):
        """Return the attribute data as a nested dictionary.

        :return: metadata dict
        """
        return unflatten_dict(dict(self.attributes_items()))

    def open(self, mode="r"):  # pylint: disable=arguments-differ
        """Return an open file handle to the content of this data node.

        :param mode: the mode with which to open the file handle
        :return: a file handle opened with the given mode
        """
        return self._repository.open(self.filename, mode=mode)

    @property
    def content(self):
        """Return the content string for insertion into .d12 file.

        :return: content_str
        """
        # TODO dealing with caching? see get_array in ArrayData,
        # or are we doing this with the md5 hash
        with self.open() as handle:
            return handle.read()

    @property
    def element(self):
        """Return the element symbol associated with the basis set."""
        return self.get_attribute("element", None)

    def get_data(self):
        """Return the basis set content, parsed to a JSON format."""
        return parse_bsets_stdin(self.content, isolated=True)[self.element]

    @classmethod
    def get_or_create(cls, basis_file, use_first=False, store_basis=True):
        """Pass the same parameter of the init; if a file with the same md5 is found, that BasisSetData is returned.

        :param basis_file: an absolute filename on disk, or a filelike object
        :param use_first: if False (default), raise an exception if more than
            one basis set is found.
            If it is True, instead, use the first available basis set.
        :param bool store_basis: If false, the BasisSetData objects are not stored in the database.
        :return (basis, created): where basis is the BasisSetData object,
            and create is either True if the object was created,
            or False if the object was retrieved from the DB.
        :raises ValueError: if ``basis_file`` is a relative path string, or if
            several stored basis sets share the md5 and ``use_first`` is False
        """
        if isinstance(basis_file, str) and not os.path.isabs(basis_file):
            raise ValueError("basis_file must be an absolute path")

        _, content = parse_basis(basis_file)
        basissets = cls.from_md5(md5_from_string(content))

        if not basissets:
            instance = cls(filepath=basis_file)
            if store_basis:
                instance = instance.store()
            return instance, True

        if len(basissets) > 1 and not use_first:
            raise ValueError(
                "More than one copy of a basis set "
                "with the same MD5 has been found in the "
                "DB. pks={}".format(",".join([str(i.pk) for i in basissets]))
            )
        return basissets[0], False

    def store(self, with_transaction=True, use_cache=None):
        """Store a new node in the DB, also saving its repository directory
        and attributes, and reparsing the file so that the md5 and the element
        are correctly reset.

        After being called attributes cannot be
        changed anymore! Instead, extras can be changed only AFTER calling
        this store() function.

        :note: After successful storage, those links that are in the cache, and
            for which also the parent node is already stored, will be
            automatically stored. The others will remain unstored.

        :parameter with_transaction: if False, no transaction is used. This
            is meant to be used ONLY if the outer calling function has already
            a transaction open!
        :parameter use_cache: forwarded unchanged to the parent ``store``
        :return: this node if it was already stored,
            otherwise the result of the parent ``store``
        :raises aiida.common.exceptions.ValidationError:
            if no ``md5`` attribute has been set
        """
        if self.is_stored:
            return self

        if self.md5sum is None:
            raise ValidationError("No valid Basis Set was passed!")

        # reparse the stored file, so the attributes match its actual content
        with self.open("r") as handle:
            metadata, content = parse_basis(handle)
        md5sum = md5_from_string(content)

        for key, val in flatten_dict(metadata).items():
            self.set_attribute(key, val)
        self.set_attribute("md5", md5sum)

        return super().store(with_transaction=with_transaction, use_cache=use_cache)

    def _validate(self):
        """Check that the stored file and the node attributes are consistent.

        :raises aiida.common.exceptions.ValidationError: if the ``filename``,
            ``element`` or ``md5`` attribute is missing, if the repository
            does not contain exactly the named file, if that file cannot be
            parsed, or if the parsed element/md5 differ from the attributes
        """
        super()._validate()

        try:
            filename = self.filename
        except AttributeError as err:
            raise ValidationError("attribute 'filename' not set.") from err

        objects = self.list_object_names()
        if [filename] != objects:
            raise ValidationError(
                "The list of files in the folder does not "
                "match the 'filename' attribute. "
                "_filename='{}', content: {}".format(filename, objects)
            )

        try:
            with self.open("r") as handle:
                metadata, content = parse_basis(handle)
        except (ParsingError, IOError, NotImplementedError) as err:
            raise ValidationError(
                f"The file '{filename}' could not be 'parsed: {err}"
            ) from err
        md5 = md5_from_string(content)

        try:
            element = metadata["element"]
        except KeyError as err:
            raise ValidationError(
                "No 'element' could be parsed in the " "BasisSet file"
            ) from err

        try:
            attr_element = self.get_attribute("element")
        except AttributeError as err:
            raise ValidationError("attribute 'element' not set.") from err

        try:
            attr_md5 = self.get_attribute("md5")
        except AttributeError as err:
            raise ValidationError("attribute 'md5' not set.") from err

        if attr_element != element:
            raise ValidationError(
                "Attribute 'element' says '{}' but '{}' was "
                "parsed instead.".format(attr_element, element)
            )

        if attr_md5 != md5:
            raise ValidationError(
                "Attribute 'md5' says '{}' but '{}' was "
                "parsed instead.".format(attr_md5, md5)
            )

    def get_basis_family_names(self):
        """Get the list of all basiset family names to which the basis belongs."""
        query = QueryBuilder()
        query.append(BasisSetFamily, tag="group", project="label")
        query.append(
            self.__class__, filters={"id": {"==": self.id}}, with_group="group"
        )
        return query.all(flat=True)

    @classmethod
    def get_basis_group(cls, group_label):
        """Return the BasisFamily group with the given name."""
        return BasisSetFamily.get(label=group_label)

    @classmethod
    def get_basis_group_map(cls, group_name):
        """Get a mapping of elements to basissets in a basis set family.

        Parameters
        ----------
        group_name : str
            the group name of the basis set

        Returns
        -------
        dict
            a mapping of element to basis set

        Raises
        ------
        aiida.common.exceptions.MultipleObjectsError
            if there is more than one basis set for the same element

        """
        family_bases = {}
        family = cls.get_basis_group(group_name)
        for node in family.nodes:
            # nodes of any other type in the group are ignored
            if not isinstance(node, cls):
                continue
            if node.element in family_bases:
                raise MultipleObjectsError(
                    "More than one BasisSetData for element {} found in "
                    "family {}".format(node.element, group_name)
                )
            family_bases[node.element] = node
        return family_bases

    @classmethod
    def get_basis_groups(cls, filter_elements=None, user=None):
        """Return the groups of type BasisSetFamily, possibly with some filters.

        The query projects the whole group and orders the result by group id.

        :param filter_elements: A string or a list of strings.
            If present, only groups joined to a basis set whose element is
            in the list are returned. Default=None, meaning that
            all families are returned.
            NOTE(review): the filter is an ``in`` test, so it does not by
            itself require one basis for *every* listed element - confirm
            the intended semantics.
        :param user: if None (default), return the groups for all users.
            If defined, it should be either a user instance exposing an
            ``email`` attribute, or a string for the username
            (that is, the user email).
        """
        builder = QueryBuilder()
        builder.append(BasisSetFamily, tag="group", project="*")

        if user:
            # accept a user object as well as the plain email string
            email = getattr(user, "email", user)
            builder.append(User, filters={"email": {"==": email}}, with_group="group")

        if isinstance(filter_elements, str):
            filter_elements = [filter_elements]

        if filter_elements is not None:
            builder.append(
                cls,
                filters={"attributes.element": {"in": filter_elements}},
                with_group="group",
            )

        builder.order_by({BasisSetFamily: {"id": "asc"}})
        return builder.all(flat=True)

    @classmethod
    def get_basissets_from_structure(cls, structure, family_name, by_kind=False):
        """
        Given a family name (a BasisSetFamily group in the DB) and an AiiDA
        structure, return a dictionary associating each element or kind name
        (if ``by_kind=True``) with its BasisSetData object.

        :raise aiida.common.exceptions.MultipleObjectsError:
            if more than one Basis Set for the same element is found in the group.
        :raise aiida.common.exceptions.NotExistent:
            if no Basis Set for an element in the structure is found in the group.
        """
        from aiida.common.exceptions import NotExistent

        family_bases = cls.get_basis_group_map(family_name)

        basis_list = {}
        for kind in structure.kinds:
            symbol = kind.symbol
            if symbol not in family_bases:
                raise NotExistent(
                    "No BasisSetData for element {} found in family {}".format(
                        symbol, family_name
                    )
                )
            key = kind.name if by_kind else symbol
            basis_list[key] = family_bases[symbol]
        return basis_list

    @classmethod
    def get_basissets_by_kind(cls, structure, family_name):
        """
        Get a dictionary of {kind: basis} for all the kinds within the given
        structure using the given basis set family name.

        :param structure: The structure that will be used.
        :param family_name: the name of the group containing the basis sets
        :return: a dict mapping each kind name to its basis set, with kinds
            that share a basis set (same PK) placed next to each other
        """
        from collections import defaultdict

        # A dict {kind_name: basis_object}
        kind_basis_dict = cls.get_basissets_from_structure(
            structure, family_name, by_kind=True
        )

        # Group the kinds by basis, using the basis PK as the grouping key
        basis_by_pk = {}
        kinds_by_pk = defaultdict(list)
        for kindname, basis in kind_basis_dict.items():
            basis_by_pk[basis.pk] = basis
            kinds_by_pk[basis.pk].append(kindname)

        bases = {}
        for basis_pk, kinds in kinds_by_pk.items():
            for kind in kinds:
                bases[kind] = basis_by_pk[basis_pk]
        return bases

    # pylint: disable=too-many-locals,too-many-arguments
    @classmethod
    def upload_basisset_family(
        cls,
        folder,
        group_name,
        group_description,
        stop_if_existing=True,
        extension=".basis",
        dry_run=False,
    ):
        """
        Upload a set of Basis Set files in a given group.

        :param folder: a path (str or pathlib.Path) containing all Basis Set files to be added.
            Only files ending in set extension (case-sensitive) considered
        :param group_name: the name of the group to get or create.
        :param group_description: a string to be set as the group description.
            Overwrites previous descriptions, if the group was existing.
        :param stop_if_existing: if True, check for the md5 of the files and,
            if file already exists in the DB, raises a ValueError.
            If False, simply adds the existing BasisSetData node to the group.
        :param extension: the filename extension to look for
        :param dry_run: If True, the basis sets are not stored
            and not added to the group.
        :return: ``(number of files found, number of basis sets to be newly created)``
        :raises ValueError: if ``folder`` is not a directory, or if
            ``stop_if_existing`` is True and a file is already in the DB
        :raises aiida.common.exceptions.UniquenessError: if the group belongs
            to a different user, or if the upload would leave the group with
            more than one basis set for the same element
        """
        from collections import Counter

        if isinstance(folder, str):
            folder = pathlib.Path(folder)
        if not folder.is_dir():
            raise ValueError("folder must be a directory")

        paths = [
            p.resolve() for p in folder.glob("*{}".format(extension)) if p.is_file()
        ]

        automatic_user = User.objects.get_default()
        # NOTE(review): this call and the description assignment below also
        # run when ``dry_run`` is True; whether they write to the database
        # depends on the AiiDA Group API - confirm.
        group, group_created = BasisSetFamily.objects.get_or_create(
            label=group_name, user=automatic_user
        )

        if group.user.email != automatic_user.email:
            raise UniquenessError(
                "There is already a BasisSetFamily group with name {}"
                ", but it belongs to user {}, therefore you "
                "cannot modify it".format(group_name, group.user.email)
            )

        # Always update description, even if the group already existed
        group.description = group_description

        # NOTE: GROUP SAVED ONLY AFTER CHECKS OF UNICITY
        basis_and_created = retrieve_basis_sets(paths, stop_if_existing)

        # Collect (element, md5) pairs in a set: entries with the same MD5
        # would not be stored twice, so they must count only once
        elements = {(basis.element, basis.md5sum) for basis, _ in basis_and_created}

        # If group already exists, check also that I am not inserting more than
        # once the same element (skipping nodes that are not basis sets)
        if not group_created:
            elements.update(
                (node.element, node.md5sum)
                for node in group.nodes
                if isinstance(node, BasisSetData)
            )

        # check whether basissets are unique per element
        element_counts = Counter(name for name, _ in elements)
        duplicates = sorted(name for name, num in element_counts.items() if num > 1)
        if duplicates:
            raise UniquenessError(
                ("More than one Basis found for the elements: " "{}").format(
                    ", ".join(duplicates)
                )
            )

        if not dry_run:
            # At this point, save the group, if still unstored
            if group_created:
                group.store()
            # save the basis sets in the database
            for basisset, created in basis_and_created:
                if created:
                    basisset.store()
            # Add elements to the group all together
            group.add_nodes([basis for basis, _ in basis_and_created])

        nuploaded = sum(1 for _, created in basis_and_created if created)
        return len(paths), nuploaded