Robotics
code
nlsefouh committed on
Commit
aac5fad
·
verified ·
1 Parent(s): 534897b

Upload 8 files

Browse files
Files changed (8) hide show
  1. __init__.py +8 -13
  2. cp2k.py +65 -0
  3. i_pi.py +47 -0
  4. mdtraj.py +77 -0
  5. n2p2.py +65 -0
  6. runner.py +134 -0
  7. utilities.py +336 -0
  8. xyz.py +94 -0
__init__.py CHANGED
@@ -1,14 +1,9 @@
1
- # __init__.py
2
 
3
- from importlib import resources
4
- try:
5
- import tomllib
6
- except ModuleNotFoundError:
7
- import tomli as tomllib
8
-
9
- # Version of the package
10
- __version__ = "1.0.0"
11
-
12
- # Read URL of the feed from config file
13
- _cfg = tomllib.loads(resources.read_text("reader", "config.toml"))
14
- URL = _cfg["feed"]["url"]
 
1
+ """Input and output functionality to serve the rest of the package."""
2
 
3
+ from .utilities import *
4
+ from .cp2k import *
5
+ from .mdtraj import *
6
+ from .i_pi import *
7
+ from . import xyz
8
+ from . import runner
9
+ from . import n2p2
 
 
 
 
 
cp2k.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Functions for CP2K-specific input/output."""
2
+
3
+ __all__ = ['add_energy_cp2k_comment', 'read_frames_cp2k']
4
+
5
+ from itertools import repeat
6
+
7
+ from .utilities import Frame, merge_frames, read_frames
8
+
9
+
10
def add_energy_cp2k_comment(frames):
    """Parse CP2K energy and inject it into frames.

    For each frame in `frames`, try to extract a CP2K-formatted potential energy
    from the comment string and inject it back into the frame. The comment is
    treated as a comma-separated list of `key = value` pairs and the value of the
    first pair whose key is `E` is used. Energy from CP2K is in Hartree, so no
    conversion is needed.

    A comment that has no `E` key at all is passed through unchanged, the frame
    is yielded with its energy still unset.

    Arguments:
        frames: iterable of frames with `energy` and `comment` attributes

    Yields:
        the same frame objects, with `energy` filled in where it was found

    Raises:
        ValueError: if a frame already has an energy, or if the `E` entry
            has no value or its value is not a number
    """

    for frame in frames:

        if frame.energy is not None:
            raise ValueError('Energy already present.')

        for pair in frame.comment.split(','):
            items = pair.split('=')
            if items[0].strip() != 'E':
                continue
            # only the conversion itself can fail, keep the original error as the cause
            try:
                frame.energy = float(items[1])
            except (IndexError, ValueError) as err:
                raise ValueError('No CP2K energy found in comment line.') from err
            break

        yield frame
33
+
34
+
35
def read_frames_cp2k(fn_positions, cell=None, fn_forces=None, read_energy: bool = True, force_unit=1.0):
    """Read data specifically produced by CP2K.

    Positions are read from an XYZ trajectory and optionally combined, frame by frame,
    with energies parsed from the XYZ comment lines, a constant cell and forces from
    a second XYZ file.

    Arguments:
        fn_positions: position trajectory file name, XYZ format
        cell: a constant cell to use in all frames, optional
        fn_forces: forces file name, XYZ format, optional
        read_energy: whether to read energies from comments in `fn_positions`
        force_unit: passed as `unit` to the XYZ reader of the forces file

    Yields:
        one merged `Frame` object per frame of the position trajectory
    """

    # the position trajectory drives the merge, the XYZ reader default unit (angstrom) is what CP2K writes
    primary = read_frames(fn_positions, fformat='xyz')
    if read_energy:
        primary = add_energy_cp2k_comment(primary)

    # everything else is merged into the position frames
    extras = []

    # the same cell-only frame is offered for every position frame
    if cell is not None:
        extras.append(repeat(Frame(cell=cell)))

    # forces from a second XYZ file, atomic units are expected from CP2K per default
    if fn_forces is not None:
        extras.append(read_frames(fn_forces, fformat='xyz', name_data='forces', unit=force_unit))

    yield from merge_frames(primary, *extras)
i_pi.py ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Read data specifically produced by i-PI"""
2
+
3
+ __all__ = ['read_frames_i_pi']
4
+
5
+ import itertools
6
+
7
+ from .utilities import Frame, merge_frames, read_frames
8
+
9
+
10
def read_frames_i_pi(fn_positions, cell=None, fn_forces=None, fn_energies=None, column_energy=4):
    """Read data specifically produced by i-PI.

    We assume typically used units - angstrom for positions, atomic units for forces and energies.
    i-PI can save data in any units, but we do not attempt to be fully general here. The strides
    of all files are assumed to be the same. For other units or strides, compose the frames by hand
    or write a custom reader function.

    Arguments:
        fn_positions: position trajectory file name, XYZ format
        cell: a constant cell to use in all frames, optional
        fn_forces: forces file name, XYZ format, optional
        fn_energies: energies file name, n2p2 energy format, optional
        column_energy: passed as `column` to the `N2P2_E` reader of the energies file

    Yields:
        one merged `Frame` object per frame of the position trajectory
    """

    # the position trajectory drives the merge, the XYZ reader default unit is angstrom
    primary = read_frames(fn_positions, fformat='xyz')

    # everything else is merged into the position frames
    extras = []

    # the same cell-only frame is offered for every position frame
    if cell is not None:
        extras.append(itertools.repeat(Frame(cell=cell)))

    # forces from a second XYZ file, taken as they are (atomic units expected)
    if fn_forces is not None:
        extras.append(read_frames(fn_forces, fformat='xyz', name_data='forces', unit=1.0))

    # energies from a separate file, atomic units expected
    if fn_energies is not None:
        extras.append(read_frames(fn_energies, fformat='N2P2_E', column=column_energy))

    yield from merge_frames(primary, *extras)
mdtraj.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Read data using MDTraj."""
2
+
3
+ __all__ = [
4
+ 'read_frames_mdtraj',
5
+ ]
6
+
7
+ try:
8
+ import mdtraj
9
+ except ImportError:
10
+ mdtraj = None
11
+
12
+ from .utilities import Frame
13
+ from ..constants import nm
14
+
15
+
16
def read_frames_mdtraj(fn_in, top=None, names_atoms='type', name_data='positions', unit=nm, unit_cell=nm, chunk=100):
    """Read data from a file using the MDTraj package.

    The trajectory is loaded chunk by chunk and every frame of every chunk is turned
    into one `Frame`. Atom names are determined once, from the topology of the first chunk.

    Arguments:
        fn_in: name of trajectory file to read, passed to `mdtraj.iterload`
        top: MDTraj topology, passed to `mdtraj.iterload`
        names_atoms: which atom names to use, 'type' or 'element'
        name_data: what quantity to take the data as, 'positions' or 'forces'
        unit: unit to scale data by, multiplicative factor in atomic units
        unit_cell: unit to scale cell by, multiplicative factor in atomic units
        chunk: size of one trajectory chunk, passed to `mdtraj.iterload`

    Yields:
        One AML `Frame` object at a time

    Raises:
        ValueError: for an unsupported `name_data` or `names_atoms`
    """

    # open the trajectory for iteration
    trajectory = mdtraj.iterload(fn_in, top=top, chunk=chunk)

    # atom names are filled in from the first chunk
    names = None

    if name_data not in ('positions', 'forces'):
        raise ValueError(f'Unsupported `name_data`: {name_data}. Expected "positions" or "forces".')

    for piece in trajectory:

        # the iterator itself has no topology information, each chunk does
        if names is None:
            if names_atoms == 'element':
                names = [atom.element.symbol for atom in piece.topology.atoms]
            elif names_atoms == 'type':
                names = [atom.name for atom in piece.topology.atoms]
            else:
                raise ValueError(f'Expected "type" or "element" for `name_atoms`, got {names_atoms}.')

        for i_frame in range(len(piece)):

            # atomic data of this frame
            data = piece.xyz[i_frame, :, :] * unit

            # cell of this frame, if the trajectory has any
            if piece.unitcell_vectors is None:
                cell = None
            else:
                cell = piece.unitcell_vectors[i_frame, ...] * unit_cell

            # the atomic data goes in under the requested name
            yield Frame(names=names, cell=cell, **{name_data: data})


# without MDTraj the reader cannot work, so take it out of the module and of its public names
if mdtraj is None:
    del read_frames_mdtraj
    __all__.remove('read_frames_mdtraj')
n2p2.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Functions to read and write n2p2 data files."""
2
+
3
+ __all__ = [
4
+ 'read_epre_n2p2',
5
+ 'read_fpre_n2p2'
6
+ ]
7
+
8
+ import numpy as np
9
+
10
+ from .utilities import Frame, register_io
11
+
12
+
13
@register_io('N2P2_E', 'read')
def read_epre_n2p2(f_in, column=3):
    """Read one predicted energy from an open n2p2 energy prediction file.

    Lines that contain `#` are treated as comments and skipped, blank lines
    are skipped as well.

    Arguments:
        f_in: open file with energy predictions
        column: index of the whitespace-separated column that holds the energy

    Returns:
        `Frame` with only the energy set, or `None` if there is no more data
    """

    # `readline` returns an empty string at the end of the file, which ends the loop
    for line in iter(f_in.readline, ''):
        if '#' in line or not line.strip():
            continue
        return Frame(energy=float(line.split()[column]))

    # no more data in the file, possibly after trailing comments
    return None
28
+
29
+
30
@register_io('N2P2_F', 'read')
def read_fpre_n2p2(f_in):
    """Read the predicted forces of one configuration from an open n2p2 force prediction file.

    Each data line contributes one force component, taken from its fourth
    whitespace-separated column. The first column identifies the configuration and
    consecutive lines with the same value are gathered into one frame. Lines that
    contain `#` are treated as comments and blank lines are skipped.

    Arguments:
        f_in: open file with force predictions, needs to support `tell` and `seek`

    Returns:
        `Frame` with only the forces set, as an array with three columns,
        or `None` if there is no more data
    """

    # find the first data line, `readline` returns an empty string at the end of the file
    for line in iter(f_in.readline, ''):
        if '#' not in line and line.strip():
            break
    else:
        # no more data in the file, possibly after trailing comments
        return None

    items = line.split()
    config = items[0]
    forces = [float(items[3])]

    while True:
        # remember where this line starts so that it can be handed to the next call
        last_pos = f_in.tell()
        line = f_in.readline()
        # no more data in the file
        if not line:
            break
        items = line.split()
        # blank lines carry no data
        if not items:
            continue
        # stop if config changes and leave that line for the next frame
        if items[0] != config:
            f_in.seek(last_pos)
            break
        forces.append(float(items[3]))

    forces = np.array(forces)
    forces = forces.reshape((len(forces)//3, 3))
    return Frame(forces=forces)
runner.py ADDED
@@ -0,0 +1,134 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Functions to read and write RuNNer data files."""
2
+
3
+ __all__ = [
4
+ 'write_frame_runner',
5
+ 'read_frame_runner',
6
+ ]
7
+
8
+ import numpy as np
9
+
10
+ from .utilities import Frame, register_io
11
+
12
+
13
@register_io('RuNNer', 'read', 'data')  # noqa: C901
def read_frame_runner(f_in):
    """Read one frame of the RuNNer format from an open file.

    A frame starts with a `begin` line and ends with an `end` line. In between, every line
    starts with a tag: `comment`, `lattice`, `atom`, `energy` or `charge`. The total charge
    is read over and not stored. Blank lines inside a frame are ignored.

    Arguments:
        f_in: open file in the RuNNer format

    Returns:
        `Frame` instance or `None` if there is no more data

    Raises:
        ValueError: if the frame does not start with `begin`, is not closed by `end`,
            contains an unknown tag, has no atoms, or has cell data that is not 3x3
    """

    # For reference, in n2p2, this is implemented in `Structure::readFromFile`, found somewhere here:
    # https://github.com/CompPhysVienna/n2p2/blob/master/src/libnnp/Structure.cpp#L84

    # read first line to examine it
    line_begin = f_in.readline()

    # no more data in the file
    if not line_begin:
        return None

    # there is some data, frame should start with 'begin'
    if line_begin.strip() != 'begin':
        raise ValueError(f'Expected "begin" at the start of a frame, got: {line_begin.strip()!r}')

    comment = None
    cell = []
    names = []
    positions = []
    forces = []
    energy = None

    for line in f_in:
        items = line.split()

        # blank lines carry no data
        if not items:
            continue

        tag = items[0]

        if tag == 'comment':
            comment = " ".join(items[1:])

        elif tag == 'lattice':
            cell.append([float(item) for item in items[1:]])

        elif tag == 'atom':
            positions.append([float(item) for item in items[1:4]])
            names.append(items[4])
            forces.append([float(item) for item in items[7:10]])
            # items[5] is atomic charge, only RuNNer itself (potentially) deals with that
            # items[6] is atomic energy - not really used by anyone

        elif tag == 'energy':
            energy = float(items[1])

        elif tag == 'charge':
            pass

        elif tag == 'end':
            break

        else:
            raise ValueError('Unexpected data in file.')

    else:
        # the file ended before the frame was closed, do not pass on partial data
        raise ValueError('Unexpected end of file, frame is not closed by "end".')

    if len(names) == 0:
        raise ValueError('No atomic data.')
    cell = np.array(cell)
    if cell.shape != (3, 3) and len(cell) != 0:
        raise ValueError('Wrong cell data.')
    if len(cell) == 0:
        cell = None
    positions = np.array(positions)
    forces = np.array(forces)

    return Frame(names=names, positions=positions, comment=comment, cell=cell, energy=energy, forces=forces)
88
+
89
+
90
@register_io('RuNNer', 'write', 'data')
def write_frame_runner(f_out, frame):
    """Write a single frame into an open file in the RuNNer format.

    "cell" and "lattice" is the same data, we just use the terminology of the file format here.

    Note that atomic charges, atomic energies, and total charge are currently not supported
    and zeros will be written in the file for these. Zeros are also written for missing forces
    and for a missing energy.

    Arguments:
        f_out: open file to write to
        frame: frame to write, needs to have atom names and positions

    Raises:
        ValueError: if the frame has no atom names or no positions
    """

    # Check that required data is in the frame:
    if (frame.names is None) or (frame.positions is None):
        raise ValueError('Frame does not contain required properties - atom names and positions.')

    number = '{:13.6f}'
    line_lattice = 'lattice ' + '{:16.6f}' * 3 + '\n'
    line_atom = 'atom ' + number * 3 + '{:^6s}' + number * 5 + '\n'
    line_energy = 'energy ' + number + '\n'
    line_charge = 'charge ' + number + '\n'

    f_out.write('begin\n')

    if frame.comment is not None:
        f_out.write('comment ' + frame.comment + '\n')

    if frame.cell is not None:
        for vector in frame.cell:
            f_out.write(line_lattice.format(*vector))

    # one line per atom: position, name, two zero placeholders and the force
    has_forces = frame.forces is not None
    for index, name in enumerate(frame.names):
        force = frame.forces[index] if has_forces else (0.0, 0.0, 0.0)
        f_out.write(line_atom.format(*frame.positions[index], name, 0.0, 0.0, *force))

    energy = 0.0 if frame.energy is None else frame.energy
    f_out.write(line_energy.format(energy))

    f_out.write(line_charge.format(0.0))

    f_out.write('end\n')
utilities.py ADDED
@@ -0,0 +1,336 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Input and output utilities.
2
+
3
+ The central concept in the I/O infrastructure is a "frame" - a dataclass that represents one atomic
4
+ configuration that contains data of different kinds. Atomic units are used in the frame object itself,
5
+ unless explicitly stated otherwise. Units specified by the file format are used in the files themselves.
6
+ """
7
+
8
+ __all__ = [
9
+ 'AnyPath',
10
+ 'get_fn_test',
11
+ 'Frame',
12
+ 'open_safe',
13
+ 'working_directory',
14
+ 'temporary_directory',
15
+ 'to_file',
16
+ 'from_file',
17
+ 'read_frames',
18
+ 'write_frames',
19
+ 'merge_frames',
20
+ ]
21
+
22
+ import os
23
+ import shutil
24
+ from collections import defaultdict
25
+ from contextlib import contextmanager
26
+ from dataclasses import dataclass
27
+ from pathlib import Path
28
+ from typing import Optional, Sequence, Union
29
+
30
+ import numpy as np
31
+
32
+ from ..utilities import AMLIOError
33
+
34
+
35
+ # functions that are registered to read and write frames
36
+ formats = defaultdict(dict)
37
+
38
+
39
+ # mapping of file extensions to file formats
40
+ ext2fmt = dict()
41
+
42
+
43
+ AnyPath = Union[str, Path]
44
+
45
+
46
def get_fn_test(filename):
    """Get absolute file names of test data.

    The test data directory is located relative to this source file.

    Arguments:
        filename: name of file in the test data directory, no path

    Returns:
        resolved `Path` of the file, whether it exists or not
    """
    dir_package = Path(__file__).parent.parent
    dir_data = dir_package / '../tests/data'
    return (dir_data / filename).resolve()
54
+
55
+
56
def register_io(fformat: str, operation: str, extension: Union[str, None] = None):
    """Decorator to register an I/O operation for a specific file format.

    Optionally, the function can also register a file name extension for automatic
    detection of file format from file name.

    The decorated function is stored in the module-level `formats` registry and handed
    back unchanged, so that it stays available under its own name as well.

    Arguments:
        fformat: name of file format
        operation: I/O operation - "read" or "write"
        extension: file name extension or `None`

    Raises:
        ValueError: for an unknown operation, or if the extension is already
            registered for a different file format
    """

    if operation not in ('read', 'write'):
        raise ValueError('Unrecognized operation. Allowed values: "read", "write".')

    def decorator(function):
        # check for a conflict first, a rejected registration must not leave partial entries behind
        if (extension is not None) and (ext2fmt.get(extension, fformat) != fformat):
            raise ValueError(f'Attempted to register the same file extension ({extension}) twice.')

        formats[fformat][operation] = function
        if extension is not None:
            formats[fformat]['extension'] = extension
            ext2fmt[extension] = fformat

        # without this, the decorated name would be rebound to `None`
        return function

    return decorator
77
+
78
+
79
@dataclass(eq=False)
class Frame:
    """All possible data of a single frame.

    A container that carries data between data structures and I/O routines. Every field
    defaults to `None`, which means that this kind of data is not set/available. No comparison
    operator is generated, at least for now, as comparing NumPy arrays is more involved.
    """

    # slots do not work correctly with dataclass
    # Here is an alternative: https://pypi.org/project/dataslots/
    # Here is some context: https://github.com/ericvsmith/dataclasses/issues/28
    # __slots__ = ['names', 'positions', 'cell', 'comment', 'energy', 'forces']

    names: Optional[Sequence] = None
    positions: Optional[np.ndarray] = None
    cell: Optional[np.ndarray] = None
    comment: Optional[str] = None
    energy: Optional[float] = None
    forces: Optional[np.ndarray] = None

    def update(self, other: 'Frame', force: bool = False):
        """Update this frame with data from another.

        Data that is set in `other` is copied over if it is missing here, or always if
        `force` is true. Atom names are never copied, they are only checked.

        Arguments:
            other: another frame
            force: whether to overwrite data

        Raises:
            ValueError: if `other` has atom names and they differ from the names of this frame
        """

        # atom names of the other frame, if it has any, must agree with ours
        names_other = other.names
        if (names_other is not None) and (self.names != names_other):
            raise ValueError('Inconsistent atom names.')

        # take over all that we can
        for attr in ('positions', 'cell', 'comment', 'energy', 'forces'):
            incoming = getattr(other, attr)
            if incoming is None:
                continue
            if force or (getattr(self, attr) is None):
                setattr(self, attr, incoming)
119
+
120
+
121
def open_safe(filename, mode='r', buffering=-1, verbose=False):
    """A wrapper around `open` which saves backup files.

    When a file is opened for writing and `filename` already exists, the existing file
    is first renamed to `#<name>#<i>#` in the same directory, with the lowest `i` that
    is still free, so that we do not overwrite any data.

    Arguments:
        filename: name of file to open
        mode: file open mode, has to start with 'r', 'a' or 'w'
        buffering: passed through to `open`
        verbose: whether to print to standard output what backup was performed

    Returns:
        an open file

    Raises:
        NotImplementedError: for any other file open mode
    """

    kind = mode[0]

    if kind == 'w':
        # writing, move an existing file out of the way first

        filename = Path(filename)

        # look for the first backup name that is not taken yet
        counter = 0
        target = filename
        while target.exists():
            target = target.with_name(f'#{filename.name:s}#{counter:d}#')
            counter += 1

        # the target only differs if the file exists
        if target != filename:
            filename.rename(target)
            if verbose:
                print(f'Backup performed: {filename} -> {target}\n')

    elif kind not in ('r', 'a'):
        # did not expect that, more work needed
        raise NotImplementedError(f'Unsupported file open mode: {mode:s}.')

    # read or append need no protection, there is no danger of overwritten files
    return open(filename, mode, buffering)
163
+
164
+
165
@contextmanager
def working_directory(directory):
    """Change working directory within the context.

    The working directory that was current on entry is restored on exit, also when
    the body of the context raises.

    This is not available in the standard library [1] but can be useful, especially for testing.
    The old fixture in pytest (`tmpdir`) used py.path [2] which has `as_cwd`, but this is legacy
    code now and not recommended [3].

    [1] https://bugs.python.org/issue25625
    [2] https://py.readthedocs.io/en/latest/path.html
    [3] https://docs.pytest.org/en/latest/how-to/tmpdir.html

    Arguments:
        directory: directory to change to
    """

    # remember where we are now
    dir_before = Path.cwd()

    # go to the new directory and always come back
    try:
        os.chdir(directory)
        yield
    finally:
        os.chdir(dir_before)
190
+
191
+
192
@contextmanager
def temporary_directory(directory: AnyPath, parents: bool = False, keep: bool = False):
    """Create a temporary directory.

    The directory is created on entering the context and handed over as a `Path`.
    It is removed with everything in it upon exiting the context, unless the user asks to keep it.

    Arguments:
        directory: directory to create
        parents: whether to create parents as well
        keep: whether to keep directory after exiting context

    Raises:
        AMLIOError: if `directory` already exists
    """

    path = Path(directory)

    # catch conflict early, a nicer error message
    if path.exists():
        raise AMLIOError(f'Unable to create directory, already exists: {path.absolute()}')

    path.mkdir(parents=parents)

    # hand over the directory and clean up afterwards if needed
    try:
        yield path
    finally:
        if not keep:
            shutil.rmtree(path)
219
+
220
+
221
def from_file(fn_in, binary=False):
    """Read the contents of a file into a variable.

    By default, the file will be read as a text file, resulting in a string.
    If `binary` is true, it will be read as a binary file, resulting in bytes.

    Arguments:
        fn_in: name of file to read
        binary: whether to read the file as binary

    Returns:
        the whole contents of the file
    """

    mode = 'rb' if binary else 'r'
    with open(fn_in, mode) as handle:
        return handle.read()
234
+
235
+
236
def to_file(data, fn_out, binary=False, verbose=False):
    """Write a variable to a file.

    The provided `data` would typically be a string, or bytes if `binary` is true.
    The output file name is protected against overwriting and if `verbose` is true,
    backup file creation will be reported.

    Arguments:
        data: what to write
        fn_out: name of file to write to
        binary: whether to open the file as binary
        verbose: passed to `open_safe`
    """

    mode = 'wb' if binary else 'w'
    with open_safe(fn_out, mode, verbose=verbose) as handle:
        handle.write(data)
249
+
250
+
251
def get_io_operation(fn, fformat, operation):
    """Select I/O function for given file format.

    If no file format is given, it is looked up from the extension of the file name
    among the extensions registered with `register_io`.

    Arguments:
        fn: name of file to operate on
        fformat: name of file format, or `None` to detect it from `fn`
        operation: I/O operation - "read" or "write"

    Returns:
        function to read or write one frame

    Raises:
        ValueError: for an unknown operation, or if the file format does not
            support the operation
        KeyError: if the file format is to be detected and the extension is not registered
    """

    if operation not in ('read', 'write'):
        raise ValueError('Unrecognized operation. Allowed values: "read", "write".')

    # automatically pick a file format
    if fformat is None:
        extension = Path(fn).suffix[1:]
        try:
            fformat = ext2fmt[extension]
        except KeyError:
            raise KeyError(f'Extension "{extension:s}" not registered for file format detection.') from None

    # `formats` creates missing entries when indexed, use `get` so that asking
    # for an unknown format does not add an empty one to the registry
    try:
        return formats.get(fformat, {})[operation]
    except KeyError:
        msg = f'File format "{fformat:s}" not supported for operation "{operation:s}".'
        raise ValueError(msg) from None
280
+
281
+
282
def read_frames(fn_in, fformat=None, **kwargs):
    """Iterate over a trajectory file, returning all data for each frame.

    Arguments:
        fn_in: name of file to read
        fformat: format of the file, or `None` to detect it from the file name
        **kwargs: passed on to the function that reads one frame

    Yields:
        one frame at a time, until the reader reports no more data by returning `None`
    """

    reader = get_io_operation(fn_in, fformat, 'read')

    # File formats read using MDTraj must be opened differently.
    # Maybe there is a more elegant way to do that though.
    with open(fn_in) as f_in:
        frame = reader(f_in, **kwargs)
        while frame is not None:
            yield frame
            frame = reader(f_in, **kwargs)
295
+
296
+
297
def write_frames(fn_out, frames, fformat=None):
    """Write frames to file.

    The format of the file is given by `fformat` or inferred from the file
    extension if `fformat` is `None`. The file is opened with `open_safe`,
    so an existing file of the same name is backed up rather than overwritten.

    Arguments:
        fn_out: name of output file
        frames: iterator over `Frame` objects
        fformat: format of the file, or `None`
    """

    writer = get_io_operation(fn_out, fformat, 'write')

    # one call of the format-specific writer per frame
    with open_safe(fn_out, 'w') as handle:
        for one_frame in frames:
            writer(handle, one_frame)
316
+
317
+
318
def merge_frames(frames, *frames_others, force: bool = False):
    """Merge frames from multiple sources.

    Every frame from `frames` is updated, in place, with the next frame from each
    of the other sources and then yielded. The length of the result will be determined
    by the length of `frames`, the other sources should be at least as long as that.

    Arguments:
        frames: iterable over `Frame` objects
        frames_others: more iterables over `Frame` instances
        force: whether to overwrite data

    Yields:
        `Frame` objects

    Raises:
        RuntimeError: if one of the other sources runs out of frames before `frames` does
    """

    # accept any iterable, not only iterators
    sources = [iter(frames_extra) for frames_extra in frames_others]

    for frame in frames:
        for source in sources:
            # a `StopIteration` escaping from a generator is turned into an opaque
            # `RuntimeError` by Python, raise that with a useful message instead
            try:
                frame_extra = next(source)
            except StopIteration:
                raise RuntimeError('An additional source of frames is shorter than the main one.') from None
            frame.update(frame_extra, force=force)
        yield frame
xyz.py ADDED
@@ -0,0 +1,94 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Functions to read and write XYZ files."""
2
+
3
+ __all__ = [
4
+ 'write_frame_xyz',
5
+ 'read_frame_xyz',
6
+ ]
7
+
8
+ import numpy as np
9
+
10
+ from ..constants import angstrom
11
+ from .utilities import Frame, register_io
12
+
13
+
14
@register_io('xyz', 'read', 'xyz')
def read_frame_xyz(f_in, name_data='positions', unit=angstrom):
    """Read one frame of XYZ format from an open file.

    A frame consists of a line with the number of atoms, a comment line, and one line
    per atom with the atom name followed by three numbers. Blank lines before the
    number of atoms, such as trailing blank lines at the end of a file, are skipped.

    Arguments:
        f_in: open file in XYZ format
        name_data: what quantity to take the XYZ data as, "positions" or "forces"
        unit: unit to scale data by, multiplicative factor in atomic units

    Returns:
        `Frame` object or `None` if there is no more data

    Raises:
        ValueError: for an unsupported `name_data`, if the frame does not start with
            a number of atoms, or if it has fewer atom lines than announced
    """

    # check the request before anything is consumed from the file
    if name_data not in ('positions', 'forces'):
        raise ValueError(f'Unsupported `name_data`: {name_data}. Expected "positions" or "forces".')

    # find the first line with content, `readline` returns an empty string at the end of the file
    for line_begin in iter(f_in.readline, ''):
        if line_begin.strip():
            break
    else:
        # no more data in the file
        return None

    # there is some data, frame should begin with natoms
    try:
        natoms = int(line_begin)
    except ValueError as err:
        msg = f'Expected number of atoms at the start of an XYZ frame, got: {line_begin.strip()!r}'
        raise ValueError(msg) from err

    # read comment line
    comment = f_in.readline().rstrip()

    names = []
    data = []
    for _ in range(natoms):
        line = f_in.readline()
        # a blank line or the end of the file before all atoms were read
        if line.strip() == '':
            raise ValueError('Unexpected data in file.')
        items = line.split()
        names.append(items[0])
        data.append([float(item) for item in items[1:4]])
    data = np.array(data) * unit

    # the same numbers are either positions or forces
    positions = data if name_data == 'positions' else None
    forces = data if name_data == 'forces' else None

    return Frame(names=names, positions=positions, comment=comment, energy=None, forces=forces)
66
+
67
+
68
@register_io('xyz', 'write', 'xyz')
def write_frame_xyz(f_out, frame, unit=angstrom):
    """Print a single frame into an open XYZ file.

    This is currently hard-coded to write positions, if we ever need to write forces
    or something else, it needs generalizing.

    Arguments:
        f_out: open file to write to
        frame: frame to write, needs to have atom names and positions
        unit: unit to write positions in, they are divided by this factor

    Raises:
        ValueError: if the frame has no atom names or no positions
    """

    # Check that required things are in frame:
    if (frame.names is None) or (frame.positions is None):
        raise ValueError('Frame does not contain required properties.')

    number = '{:13.6f}'
    line_atom = '{:6s} ' + number * 3 + '\n'

    # header: number of atoms, then the comment line, which is empty without a comment
    f_out.write(f'{len(frame.names):d}\n')
    if frame.comment is None:
        f_out.write('\n')
    else:
        f_out.write(f'{frame.comment:s}\n')

    # positions in the units of the file
    scaled = frame.positions / unit

    # one line per atom
    for index, name in enumerate(frame.names):
        f_out.write(line_atom.format(name, *scaled[index]))