# -*- coding: utf-8 -*-
"""
Basic outline of parsing sections:
::
<parse_pre_header>
***************************
<parse_calculation_header>
***************************
<parse_geometry_input>
* GEOMETRY EDITING
<parse_calculation_setup>
CRYSTAL - SCF - TYPE OF CALCULATION :
<parse_scf_section>
SCF ENDED
<parse_scf_final_energy>
OPTOPTOPTOPT
<parse_optimisation>
OPT END -
<parse_band_gaps>
FINAL OPTIMIZED GEOMETRY
<parse_final_geometry>
MULLIKEN POPULATION ANALYSIS
<parse_mulliken_analysis>
"""
from collections import namedtuple
import copy
from fnmatch import fnmatch
import re
import traceback
from jsonextended import edict
from aiida_crystal17.common.parsing import split_numbers, convert_units
try:
from distutils.util import strtobool
except ImportError:
from distutils import strtobool
ParsedSection = namedtuple(
'ParsedSection',
['next_lineno', 'data', 'parser_error', 'non_terminating_error'])
ParsedSection.__new__.__defaults__ = (None, ) * len(ParsedSection._fields)
# a mapping of known error messages to exit codes, in order of importance
KNOWN_ERRORS = (
("END OF DATA IN INPUT DECK", "ERROR_CRYSTAL_INPUT"),
("FORMAT ERROR IN INPUT DECK", "ERROR_CRYSTAL_INPUT"),
("GEOMETRY DATA FILE NOT FOUND", "ERROR_CRYSTAL_INPUT"),
("Wavefunction file can not be found",
"ERROR_WAVEFUNCTION_NOT_FOUND"), # restart error
("SCF ENDED - TOO MANY CYCLES", "UNCONVERGED_SCF"),
("SCF FAILED",
"UNCONVERGED_SCF"), # usually found after: SCF ENDED - TOO MANY CYCLES
("GEOMETRY OPTIMIZATION FAILED",
"UNCONVERGED_GEOMETRY"), # usually because run out of steps
("CONVERGENCE TESTS UNSATISFIED",
"UNCONVERGED_GEOMETRY"), # usually found after: OPT END - FAILED
("OPT END - FAILED", "UNCONVERGED_GEOMETRY"),
("BASIS SET LINEARLY DEPENDENT",
"BASIS_SET_LINEARLY_DEPENDENT"), # occurs during geometry optimisations
("SCF abnormal end", "ERROR_SCF_ABNORMAL_END"), # catch all error
("MPI_Abort", "ERROR_MPI_ABORT"))
[docs]def read_crystal_stdout(content):
output = {
"units": {
"conversion": "CODATA2014",
"energy": "eV",
"length": "angstrom",
"angle": "degrees"
},
"errors": [],
"warnings": [],
"parser_errors": [],
"parser_exceptions": []
}
# remove MPI statuses,
# which can get mixed with the start of the program stdout and corrupt the output
# TODO: removing these will affect the reporting of line numbers in errors
regex = re.compile('(\\s*PROCESS\\s*\\d+\\s*OF\\s*\\d+\\s*WORKING\\s*\n)+')
content = re.sub(regex, '\n', content)
lines = content.splitlines()
if not lines:
output["parser_errors"] += ["the file is empty"]
return assign_exit_code(output)
# make an initial parse to find all errors/warnings and start lines for sections
errors, run_warnings, parser_errors, telapse_seconds, start_lines = initial_parse(
lines)
output["errors"] += errors
output["warnings"] += run_warnings
output["parser_errors"] += errors
if telapse_seconds is not None:
output["execution_time_seconds"] = telapse_seconds
lineno = 0
# parse until the program header
outcome = parse_section(parse_pre_header, lines, lineno, output,
"non_program")
if outcome is None or outcome.parser_error is not None:
return assign_exit_code(output)
lineno = outcome.next_lineno
# parse the program header section
outcome = parse_section(parse_calculation_header, lines, lineno, output,
"header")
if outcome is None or outcome.parser_error is not None:
return assign_exit_code(output)
lineno = outcome.next_lineno
# parse the initial geometry input
outcome = parse_section(parse_geometry_input, lines, lineno, output,
"geometry_input")
if outcome is None or outcome.parser_error is not None:
return assign_exit_code(output)
lineno = outcome.next_lineno
# parse the calculation setup and initial geometry
outcome = parse_section(parse_calculation_setup, lines, lineno, output,
None)
if outcome is None or outcome.parser_error is not None:
return assign_exit_code(output)
lineno = outcome.next_lineno
# parse the initial SCF run
outcome = parse_section(parse_scf_section, lines, lineno, output,
("initial_scf", "cycles"))
if outcome is None or outcome.parser_error is not None:
return assign_exit_code(output)
lineno = outcome.next_lineno
# parse the final energy of the scf run
outcome = parse_section(parse_scf_final_energy, lines, lineno, output,
("initial_scf", "final_energy"))
# Note: we don't abort on error
if outcome is not None:
lineno = outcome.next_lineno
# TODO test these lines aren't in-between initial scf and opt:
# ("* GEOMETRY EDITING", "CRYSTAL - SCF - TYPE OF CALCULATION :", "SCF ENDED")
# Note: in a few runs I observed, scf maxcycle was reached, but then the scf started again!
# parse the optimisation (if present)
if "optimization" in start_lines:
outcome = parse_section(parse_optimisation, lines,
start_lines["optimization"], output,
"optimisation")
# Note: we don't abort on error
if outcome is not None:
lineno = outcome.next_lineno
if outcome is not None and outcome.parser_error is None:
# TODO do band gaps only com after optimisation?
outcome = parse_section(parse_band_gaps, lines, lineno, output,
"band_gaps")
# Note: we don't abort on error
if outcome is not None:
lineno = outcome.next_lineno
# parse the final optimized geometry (if present)
if "final_geometry" in start_lines:
outcome = parse_section(parse_final_geometry, lines,
start_lines["final_geometry"], output,
"final_geometry")
# Note: we don't abort on error
if outcome is not None:
lineno = outcome.next_lineno
if "mulliken" in start_lines:
outcome = parse_section(parse_mulliken_analysis, lines,
start_lines["mulliken"], output, "mulliken")
# Note: we don't abort on error
if outcome is not None:
lineno = outcome.next_lineno
return assign_exit_code(output)
[docs]def parse_section(func, lines, initial_lineno, output, key_name):
"""parse a section of the stdout file
Parameters
----------
func : callable
a function that returns a `ParsedSection` object
lines : list[str]
initial_lineno : int
output : dict
current output from the parser
key_name : str or list[str] or None
the key_name of output to assign the data to (if None directly update)
Returns
-------
ParsedSection or None
"""
try:
outcome = func(lines, initial_lineno)
except Exception as err:
traceback.print_exc()
output["parser_exceptions"].append(str(err))
return None
if outcome.data:
if key_name is None:
output.update(outcome.data)
else:
suboutput = output
if isinstance(key_name, (tuple, list)):
for key in key_name[:-1]:
suboutput = suboutput.setdefault(key, {})
key_name = key_name[-1]
suboutput[key_name] = outcome.data
if outcome.non_terminating_error is not None:
output["errors"].append(outcome.non_terminating_error)
if outcome.parser_error is not None:
output["parser_errors"].append(outcome.parser_error)
return outcome
[docs]def assign_exit_code(output):
exit_code = 0
if output["errors"]:
exit_code = "ERROR_CRYSTAL_RUN"
for known_error_msg, code_name in KNOWN_ERRORS:
found = False
for error_msg in output["errors"]:
if known_error_msg in error_msg:
found = True
break
if found:
exit_code = code_name
break
elif output["parser_errors"]:
exit_code = "ERROR_PARSING_STDOUT"
elif output["parser_exceptions"]:
exit_code = "ERROR_PARSING_STDOUT"
output["exit_code"] = exit_code
return output
[docs]def initial_parse(lines):
""" scan the file for errors, and find the final elapsed time value """
errors = []
warnings = []
parser_errors = []
mpi_abort = False
telapse_line = None
start_lines = {}
second_opt_line = False
# This is required since output looks like
# OPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPT
# STARTING GEOMETRY OPTIMIZATION - INFORMATION ON SCF MOVED TO SCFOUT.LOG
# GEOMETRY OPTIMIZATION INFORMATION STORED IN OPTINFO.DAT
# OPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPTOPT
for lineno, line in enumerate(lines):
if "WARNING" in line.upper():
warnings.append(line.strip())
elif "ERROR" in line:
errors.append(line.strip())
elif "SCF abnormal end" in line: # only present when run using runcry
errors.append(line.strip())
elif "MPI_Abort" in line:
# only record one mpi_abort event (to not clutter output)
if not mpi_abort:
errors.append(line.strip())
mpi_abort = True
elif "CONVERGENCE TESTS UNSATISFIED" in line.upper():
errors.append(line.strip())
elif "TELAPSE" in line:
telapse_line = lineno
# search for an optimisation
elif "OPTOPTOPTOPT" in line:
if "optimization" in start_lines:
if second_opt_line:
parser_errors.append(
"found two lines starting optimization section: "
"{0} and {1}".format(start_lines["optimization"],
lineno))
else:
second_opt_line = True
start_lines["optimization"] = lineno
elif "CONVERGENCE ON GRADIENTS SATISFIED AFTER THE FIRST OPTIMIZATION CYCLE" in line:
if "optimization" in start_lines:
if second_opt_line:
parser_errors.append(
"found two lines starting optimization section: "
"{0} and {1}".format(start_lines["optimization"],
lineno))
else:
second_opt_line = True
start_lines["optimization"] = lineno
# search for mulliken analysis
elif line.strip().startswith("MULLIKEN POPULATION ANALYSIS"):
# can have ALPHA+BETA ELECTRONS and ALPHA-BETA ELECTRONS (denoted in line above mulliken_starts)
start_lines.setdefault("mulliken", []).append(lineno)
# search for final geometry
elif "FINAL OPTIMIZED GEOMETRY" in line:
if "final_geometry" in start_lines:
parser_errors.append(
"found two lines starting 'FINAL OPTIMIZED GEOMETRY':"
" {0} and {1}".format(start_lines["final_geometry"],
lineno))
start_lines["final_geometry"] = lineno
total_seconds = None
if telapse_line:
total_seconds = int(
split_numbers(lines[telapse_line].split("TELAPSE")[1])[0])
# m, s = divmod(total_seconds, 60)
# h, m = divmod(m, 60)
# elapsed_time = "%d:%02d:%02d" % (h, m, s)
return errors, warnings, parser_errors, total_seconds, start_lines
[docs]def parse_calculation_setup(lines, initial_lineno):
""" parse initial setup data (starting after intital geometry input)
Parameters
----------
lines: list[str]
initial_lineno: int
Returns
-------
ParsedSection
"""
data = {"calculation": {"spin": False}, "initial_geometry": {}}
end_lineno = None
for i, line in enumerate(lines[initial_lineno:]):
curr_lineno = initial_lineno + i
line = line.strip()
if line.startswith("CRYSTAL - SCF - TYPE OF CALCULATION :"):
end_lineno = curr_lineno
break
elif line.startswith("TYPE OF CALCULATION :"):
data["calculation"]["type"] = line.replace("TYPE OF CALCULATION :",
"").strip().lower()
if "HAMILTONIAN" in lines[curr_lineno + 1]:
regex = r"\(EXCHANGE\)\[CORRELATION\] FUNCTIONAL:\((.*)\)\[(.*)\]"
string = lines[curr_lineno + 3].strip()
if re.match(regex, string):
data["calculation"]["functional"] = {
"exchange": re.search(regex, string).group(1),
"correlation": re.search(regex, string).group(2)
}
elif "SPIN POLARIZ" in line:
data["calculation"]["spin"] = True
parse_geometry_section(data["initial_geometry"], curr_lineno, line, lines)
parse_symmetry_section(data["initial_geometry"], curr_lineno, line, lines)
if end_lineno is None:
return ParsedSection(curr_lineno, data,
"couldn't find start of initial scf calculation")
regexes = {
'n_atoms':
re.compile(r"\sN. OF ATOMS PER CELL\s*(\d*)", re.DOTALL),
'n_shells':
re.compile(r"\sNUMBER OF SHELLS\s*(\d*)", re.DOTALL),
'n_ao':
re.compile(r"\sNUMBER OF AO\s*(\d*)", re.DOTALL),
'n_electrons':
re.compile(r"\sN. OF ELECTRONS PER CELL\s*(\d*)", re.DOTALL),
'n_core_el':
re.compile(r"\sCORE ELECTRONS PER CELL\s*(\d*)", re.DOTALL),
'n_symops':
re.compile(r"\sN. OF SYMMETRY OPERATORS\s*(\d*)", re.DOTALL),
'n_kpoints_ibz':
re.compile(r"\sNUMBER OF K POINTS IN THE IBZ\s*(\d*)", re.DOTALL),
'n_kpoints_gilat':
re.compile(r"\s NUMBER OF K POINTS\(GILAT NET\)\s*(\d*)", re.DOTALL),
}
content = "\n".join(lines[initial_lineno:end_lineno])
for name, regex in regexes.items():
num = regex.search(content)
if num is not None:
data['calculation'][name] = int(num.groups()[0])
return ParsedSection(curr_lineno, data, None)
[docs]def parse_geometry_section(dct, i, line, lines, startline=0):
""" update dict get geometry related variables
Parameters
----------
dct
i
line
lines
startline: int
Returns
-------
"""
if fnmatch(line, "LATTICE PARAMETERS*(*)"):
if not ("ANGSTROM" in line and "DEGREES" in line):
raise IOError(
"was expecting lattice parameters in angstroms and degrees on line:"
" {0}, got: {1}".format(startline + i, line))
for pattern, field, pattern2 in [
('PRIMITIVE*CELL*', "primitive_cell", "ATOMS IN THE ASYMMETRIC UNIT*"),
('CRYSTALLOGRAPHIC*CELL*', "crystallographic_cell",
"COORDINATES IN THE CRYSTALLOGRAPHIC CELL")
]:
if fnmatch(line, pattern):
if not fnmatch(lines[i + 1].strip(), "A*B*C*ALPHA*BETA*GAMMA"):
raise IOError("was expecting A B C ALPHA BETA GAMMA on line:"
" {0}, got: {1}".format(startline + i + 1,
lines[i + 1]))
dct[field] = edict.merge([
dct.get(field, {}),
{
"cell_parameters":
dict(
zip(['a', 'b', 'c', 'alpha', 'beta', 'gamma'],
split_numbers(lines[i + 2])))
}
])
if fnmatch(line, pattern2):
periodic = [True, True, True]
if not fnmatch(lines[i + 1].strip(), "ATOM*X/A*Y/B*Z/C"):
# for 2d (slab) can get z in angstrom (and similar for 1d)
if fnmatch(lines[i + 1].strip(), "ATOM*X/A*Y/B*Z(ANGSTROM)*"):
periodic = [True, True, False]
elif fnmatch(lines[i + 1].strip(),
"ATOM*X/A*Y(ANGSTROM)*Z(ANGSTROM)*"):
periodic = [True, False, False]
elif fnmatch(lines[i + 1].strip(),
"ATOM*X(ANGSTROM)*Y(ANGSTROM)*Z(ANGSTROM)*"):
periodic = [False, False, False]
cell_params = dict(
zip(['a', 'b', 'c', 'alpha', 'beta', 'gamma'],
[500., 500., 500., 90., 90., 90.]))
dct[field] = edict.merge(
[dct.get(field, {}), {
"cell_parameters": cell_params
}])
else:
raise IOError(
"was expecting ATOM X Y Z (in units of ANGSTROM or fractional) on line:"
" {0}, got: {1}".format(startline + i + 1,
lines[i + 1]))
if not all(periodic) and "cell_parameters" not in dct.get(
field, {}):
raise IOError(
"require cell parameters to have been set for non-periodic directions in line"
" #{0} : {1}".format(startline + i + 1, lines[i + 1]))
a, b, c, alpha, beta, gamma = [None] * 6
if not all(periodic):
cell = dct[field]["cell_parameters"]
a, b, c, alpha, beta, gamma = [
cell[p] for p in ["a", "b", "c", "alpha", "beta", "gamma"]
]
nextindx = i + 3
atom_data = {
'ids': [],
'assymetric': [],
'atomic_numbers': [],
'symbols': [],
"fcoords": []
}
atom_data["pbc"] = periodic
while lines[nextindx].strip(
) and not lines[nextindx].strip()[0].isalpha():
fields = lines[nextindx].strip().split()
atom_data['ids'].append(fields[0])
atom_data['assymetric'].append(bool(strtobool(fields[1])))
atom_data['atomic_numbers'].append(int(fields[2]))
atom_data['symbols'].append(fields[3].lower().capitalize())
if all(periodic):
atom_data['fcoords'].append(
[float(fields[4]),
float(fields[5]),
float(fields[6])])
elif periodic == [True, True, False
] and alpha == 90 and beta == 90:
atom_data['fcoords'].append([
float(fields[4]),
float(fields[5]),
float(fields[6]) / c
])
# TODO other periodic types (1D, 0D)
nextindx += 1
if not atom_data["fcoords"]:
atom_data.pop("fcoords")
dct[field] = edict.merge([dct.get(field, {}), atom_data])
# TODO These ccoords DON'T work with lattice parameters (at least for final run)
if fnmatch(line, "CARTESIAN COORDINATES - PRIMITIVE CELL*"):
if not fnmatch(lines[i + 2].strip(),
"*ATOM*X(ANGSTROM)*Y(ANGSTROM)*Z(ANGSTROM)"):
raise IOError(
"was expecting ATOM X(ANGSTROM) Y(ANGSTROM) Z(ANGSTROM) on line:"
" {0}, got: {1}".format(startline + i + 2, lines[i + 2]))
nextindx = i + 4
atom_data = {
'ids': [],
'atomic_numbers': [],
'symbols': [],
"ccoords": []
}
while lines[nextindx].strip(
) and not lines[nextindx].strip()[0].isalpha():
fields = lines[nextindx].strip().split()
atom_data['ids'].append(fields[0])
atom_data['atomic_numbers'].append(int(fields[1]))
atom_data['symbols'].append(fields[2].lower().capitalize())
atom_data['ccoords'].append(
[float(fields[3]),
float(fields[4]),
float(fields[5])])
nextindx += 1
dct["primitive_cell"] = edict.merge(
[dct.get("primitive_cell", {}), atom_data])
if fnmatch(line, "DIRECT LATTICE VECTORS CARTESIAN COMPONENTS*"):
if "ANGSTROM" not in line:
raise IOError("was expecting lattice vectors in angstroms on line:"
" {0}, got: {1}".format(startline + i, line))
if not fnmatch(lines[i + 1].strip(), "X*Y*Z"):
raise IOError("was expecting X Y Z on line:"
" {0}, got: {1}".format(startline + i + 1,
lines[i + 1]))
if "crystallographic_cell" not in dct:
dct["crystallographic_cell"] = {}
if "cell_vectors" in dct["crystallographic_cell"]:
raise IOError("found multiple cell vectors on line:"
" {0}, got: {1}".format(startline + i + 1,
lines[i + 1]))
vectors = {
"a": split_numbers(lines[i + 2]),
"b": split_numbers(lines[i + 3]),
"c": split_numbers(lines[i + 4])
}
dct["primitive_cell"]["cell_vectors"] = vectors
[docs]def parse_symmetry_section(dct, i, line, lines, startline=0):
""" update dict with symmetry related variables
Parameters
----------
dct
i
line
lines
startline: int
Returns
-------
"""
if fnmatch(line, "*SYMMOPS - TRANSLATORS IN FRACTIONAL UNITS*"):
nums = split_numbers(line)
if not len(nums) == 1:
raise IOError(
"was expecting a single number, representing the number of symmops, on this line:"
" {0}, got: {1}".format(startline + i, line))
nsymmops = int(nums[0])
if not fnmatch(
lines[i + 1],
"*MATRICES AND TRANSLATORS IN THE CRYSTALLOGRAPHIC REFERENCE FRAME*"
):
raise IOError(
"was expecting CRYSTALLOGRAPHIC REFERENCE FRAME on this line"
" {0}, got: {1}".format(startline + i + 1,
lines[i + 1].strip()))
if not fnmatch(lines[i + 2], "*V*INV*ROTATION MATRICES*TRANSLATORS*"):
raise IOError("was expecting symmetry headers on this line"
" {0}, got: {1}".format(startline + i + 2,
lines[i + 2].strip()))
symmops = []
for j in range(nsymmops):
values = split_numbers(lines[i + 3 + j])
if not len(values) == 14:
raise IOError(
"was expecting 14 values for symmetry data on this line"
" {0}, got: {1}".format(startline + i + 3 + j,
lines[i + 3 + j].strip()))
symmops.append(values[2:14])
dct["primitive_symmops"] = symmops
[docs]def parse_scf_section(lines, initial_lineno, final_lineno=None):
""" read scf data
Parameters
----------
lines: list[str]
initial_lineno: int
final_lineno: int or None
Returns
-------
ParsedSection
"""
scf = []
scf_cyc = None
last_cyc_num = None
for k, line in enumerate(lines[initial_lineno:]):
curr_lineno = k + initial_lineno
if "SCF ENDED" in line or (final_lineno is not None
and curr_lineno == final_lineno):
# add last scf cycle
if scf_cyc:
scf.append(scf_cyc)
if "CONVERGE" not in line:
return ParsedSection(curr_lineno, scf, None, line.strip())
else:
return ParsedSection(curr_lineno, scf, None)
line = line.strip()
if fnmatch(line, "CYC*"):
# start new cycle
if scf_cyc is not None:
scf.append(scf_cyc)
scf_cyc = {}
# check we are adding them in sequential order
cur_cyc_num = split_numbers(line)[0]
if last_cyc_num is not None:
if cur_cyc_num != last_cyc_num + 1:
return ParsedSection(
curr_lineno, scf,
"was expecting the SCF cyle number to be {0} in line {1}: {2}"
.format(int(last_cyc_num + 1), curr_lineno, line))
last_cyc_num = cur_cyc_num
if fnmatch(line, "*ETOT*"):
if not fnmatch(line, "*ETOT(AU)*"):
raise IOError("was expecting units in a.u. on line {0}, "
"got: {1}".format(curr_lineno, line))
# this is the initial energy of the configuration and so actually the energy of the previous run
if scf:
scf[-1]["energy"] = scf[-1].get("energy", {})
scf[-1]["energy"]["total"] = convert_units(
split_numbers(line)[1], "hartree", "eV")
elif scf_cyc is None:
continue
# The total magnetization is the integral of the magnetization in the cell:
# MT=∫ (nup-ndown) d3 r
#
# The absolute magnetization is the integral of the absolute value of the magnetization in the cell:
# MA=∫ |nup-ndown| d3 r
#
# In a simple ferromagnetic material they should be equal (except possibly for an overall sign).
# In simple antiferromagnets (like FeO) MT is zero and MA is twice the magnetization of each of the two atoms.
if line.startswith("CHARGE NORMALIZATION FACTOR"):
scf_cyc["CHARGE NORMALIZATION FACTOR".lower().replace(
" ", "_")] = split_numbers(line)[0]
if line.startswith("SUMMED SPIN DENSITY"):
scf_cyc["spin_density_total"] = split_numbers(line)[0]
if line.startswith("TOTAL ATOMIC CHARGES"):
scf_cyc["atomic_charges_peratom"] = []
j = curr_lineno + 1
while len(lines[j].strip().split()) == len(
split_numbers(lines[j])):
scf_cyc["atomic_charges_peratom"] += split_numbers(lines[j])
j += 1
if line.startswith("TOTAL ATOMIC SPINS"):
scf_cyc["spin_density_peratom"] = []
j = curr_lineno + 1
while len(lines[j].strip().split()) == len(
split_numbers(lines[j])):
scf_cyc["spin_density_peratom"] += split_numbers(lines[j])
j += 1
scf_cyc["spin_density_absolute"] = sum(
[abs(s) for s in split_numbers(lines[curr_lineno + 1])])
# add last scf cycle
if scf_cyc:
scf.append(scf_cyc)
return ParsedSection(
curr_lineno, scf,
"Did not find end of SCF section (starting on line {})".format(
initial_lineno))
[docs]def parse_scf_final_energy(lines, initial_lineno, final_lineno=None):
""" read post initial scf data
Parameters
----------
lines: list[str]
initial_lineno: int
Returns
-------
"""
scf_energy = {}
for i, line in enumerate(lines[initial_lineno:]):
if final_lineno is not None and i + initial_lineno == final_lineno:
return ParsedSection(final_lineno, scf_energy)
if line.strip().startswith("TTTTTTT") or line.strip().startswith(
"******"):
return ParsedSection(final_lineno, scf_energy)
if fnmatch(line.strip(), "TOTAL ENERGY*DE*"):
if not fnmatch(line.strip(), "TOTAL ENERGY*AU*DE*"):
raise IOError("was expecting units in a.u. on line:"
" {0}, got: {1}".format(initial_lineno + i,
line))
if "total_corrected" in scf_energy:
raise IOError("total corrected energy found twice, on line:"
" {0}, got: {1}".format(initial_lineno + i,
line))
scf_energy["total_corrected"] = convert_units(
split_numbers(line)[1], "hartree", "eV")
return ParsedSection(
final_lineno, scf_energy,
"Did not find end of Post SCF section (starting on line {})".format(
initial_lineno))
[docs]def parse_optimisation(lines, initial_lineno):
""" read geometric optimisation
Parameters
----------
lines: list[str]
initial_lineno: int
Returns
-------
ParsedSection
"""
if "CONVERGENCE ON GRADIENTS SATISFIED AFTER THE FIRST OPTIMIZATION CYCLE" in lines[
initial_lineno]:
for k, line in enumerate(lines[initial_lineno:]):
curr_lineno = initial_lineno + k
line = line.strip()
if "OPT END -" in line:
if not fnmatch(line, "*E(AU)*"):
raise IOError("was expecting units in a.u. on line:"
" {0}, got: {1}".format(curr_lineno, line))
data = [{
"energy": {
"total_corrected":
convert_units(
split_numbers(lines[-1])[0], "hartree", "eV")
}
}]
return ParsedSection(curr_lineno, data)
return ParsedSection(
curr_lineno, [],
"did not find 'OPT END', after optimisation start at line {}".
format(initial_lineno))
opt_cycles = []
opt_cyc = None
scf_start_no = None
failed_opt_step = False
for k, line in enumerate(lines[initial_lineno:]):
curr_lineno = initial_lineno + k
line = line.strip()
if "OPT END -" in line:
if opt_cyc and not failed_opt_step:
opt_cycles.append(opt_cyc)
return ParsedSection(curr_lineno, opt_cycles)
if fnmatch(line, "*OPTIMIZATION*POINT*"):
if opt_cyc is not None and not failed_opt_step:
opt_cycles.append(opt_cyc)
opt_cyc = {}
scf_start_no = None
failed_opt_step = False
elif opt_cyc is None:
continue
# when using ONELOG optimisation key word
if "CRYSTAL - SCF - TYPE OF CALCULATION :" in line:
if scf_start_no is not None:
return ParsedSection(
curr_lineno, opt_cycles,
"found two lines starting scf ('CRYSTAL - SCF - ') in opt step {0}:"
.format(len(opt_cycles)) + " {0} and {1}".format(
scf_start_no, curr_lineno))
scf_start_no = curr_lineno
elif "SCF ENDED" in line:
if "CONVERGE" not in line:
pass # errors.append(line.strip())
outcome = parse_scf_section(lines, scf_start_no + 1,
curr_lineno + 1)
# TODO test if error
opt_cyc["scf"] = outcome.data
parse_geometry_section(opt_cyc, curr_lineno, line, lines)
# TODO move to read_post_scf?
if fnmatch(line, "TOTAL ENERGY*DE*"):
if not fnmatch(line, "TOTAL ENERGY*AU*DE*AU*"):
return ParsedSection(
curr_lineno, opt_cycles,
"was expecting units in a.u. on line:"
" {0}, got: {1}".format(curr_lineno, line))
opt_cyc["energy"] = opt_cyc.get("energy", {})
opt_cyc["energy"]["total_corrected"] = convert_units(
split_numbers(line)[1], "hartree", "eV")
for param in [
"MAX GRADIENT", "RMS GRADIENT", "MAX DISPLAC", "RMS DISPLAC"
]:
if fnmatch(line, "{}*CONVERGED*".format(param)):
if "convergence" not in opt_cyc:
opt_cyc["convergence"] = {}
opt_cyc["convergence"][param.lower().replace(" ", "_")] = bool(
strtobool(line.split()[-1]))
if fnmatch(line,
"*SCF DID NOT CONVERGE. RETRYING WITH A SMALLER OPT STEP*"):
# TODO add failed optimisation steps with dummy energy and extra parameter?
# for now discard this optimisation step
failed_opt_step = True
if opt_cyc and not failed_opt_step:
opt_cycles.append(opt_cyc)
return ParsedSection(
curr_lineno, opt_cycles,
"did not find 'OPT END', after optimisation start at line {}".format(
initial_lineno))
[docs]def parse_final_geometry(lines, initial_lineno):
""" read final optimized geometry section
Parameters
----------
lines: list[str]
initial_lineno: int
Returns
-------
ParsedSection
"""
data = {}
for i, line in enumerate(lines[initial_lineno:]):
line = line.strip()
parse_geometry_section(data, initial_lineno + i, line, lines)
parse_symmetry_section(data, initial_lineno + i, line, lines)
# TODO handle exceptions
# TODO breaking line?
return ParsedSection(initial_lineno + i, data)
[docs]def parse_band_gaps(lines, initial_lineno):
""" read band gap information
Note: this is new for CRYSTAL17
Parameters
----------
lines: list[str]
initial_lineno: int
Returns
-------
ParsedSection
"""
band_gaps = {}
for k, line in enumerate(lines[initial_lineno:]):
curr_lineno = initial_lineno + k
line = line.strip()
# TODO breaking line?
# TODO use regex:
# re.compile(r"(DIRECT|INDIRECT) ENERGY BAND GAP:\s*([.\d]*)",
# re.DOTALL),
if "BAND GAP" in line:
if fnmatch(line.strip(), "ALPHA BAND GAP:*eV"):
bgvalue = split_numbers(line)[0]
bgtype = "alpha"
elif fnmatch(line.strip(), "BETA BAND GAP:*eV"):
bgvalue = split_numbers(line)[0]
bgtype = "beta"
elif fnmatch(line.strip(), "BAND GAP:*eV"):
bgvalue = split_numbers(line)[0]
bgtype = "all"
else:
return ParsedSection(
initial_lineno, band_gaps,
"found a band gap of unknown format at line {0}: {1}".
format(curr_lineno, line))
if bgtype in band_gaps:
return ParsedSection(
initial_lineno, band_gaps,
"band gap data already contains {0} value before line {1}: {2}"
.format(bgtype, curr_lineno, line))
band_gaps[bgtype] = bgvalue
return ParsedSection(initial_lineno, band_gaps)
[docs]def parse_mulliken_analysis(lines, mulliken_indices):
"""
Parameters
----------
lines: list[str]
mulliken_indices: list[int]
Returns
-------
ParsedSection
"""
mulliken = {}
for i, indx in enumerate(mulliken_indices):
name = lines[indx - 1].strip().lower()
if not (name == "ALPHA+BETA ELECTRONS".lower()
or name == "ALPHA-BETA ELECTRONS".lower()):
return ParsedSection(
mulliken_indices[0], mulliken,
"was expecting mulliken to be alpha+beta or alpha-beta on line:"
" {0}, got: {1}".format(indx - 1, lines[indx - 1]))
mulliken[name.replace(" ", "_")] = {
"ids": [],
"symbols": [],
"atomic_numbers": [],
"charges": []
}
if len(mulliken_indices) > i + 1:
searchlines = lines[indx + 1:mulliken_indices[i + 1]]
else:
searchlines = lines[indx + 1:]
charge_line = None
for j, line in enumerate(searchlines):
if fnmatch(line.strip(), "*ATOM*Z*CHARGE*SHELL*POPULATION*"):
charge_line = j + 2
break
if charge_line is None:
continue
while searchlines[charge_line].strip(
) and not searchlines[charge_line].strip()[0].isalpha():
fields = searchlines[charge_line].strip().split()
# shell population can wrap multiple lines, the one we want has the label in it
if len(fields) != len(split_numbers(searchlines[charge_line])):
mulliken[name.replace(" ", "_")]["ids"].append(int(fields[0]))
mulliken[name.replace(" ", "_")]["symbols"].append(
fields[1].lower().capitalize())
mulliken[name.replace(" ", "_")]["atomic_numbers"].append(
int(fields[2]))
mulliken[name.replace(" ", "_")]["charges"].append(
float(fields[3]))
charge_line += 1
return ParsedSection(mulliken_indices[0], mulliken)