Source code for dxchange.writer

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# #########################################################################
# Copyright (c) 2015, UChicago Argonne, LLC. All rights reserved.         #
#                                                                         #
# Copyright 2015. UChicago Argonne, LLC. This software was produced       #
# under U.S. Government contract DE-AC02-06CH11357 for Argonne National   #
# Laboratory (ANL), which is operated by UChicago Argonne, LLC for the    #
# U.S. Department of Energy. The U.S. Government has rights to use,       #
# reproduce, and distribute this software.  NEITHER THE GOVERNMENT NOR    #
# UChicago Argonne, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR        #
# ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE.  If software is     #
# modified to produce derivative works, such modified software should     #
# be clearly marked, so as not to confuse it with the version available   #
# from ANL.                                                               #
#                                                                         #
# Additionally, redistribution and use in source and binary forms, with   #
# or without modification, are permitted provided that the following      #
# conditions are met:                                                     #
#                                                                         #
#     * Redistributions of source code must retain the above copyright    #
#       notice, this list of conditions and the following disclaimer.     #
#                                                                         #
#     * Redistributions in binary form must reproduce the above copyright #
#       notice, this list of conditions and the following disclaimer in   #
#       the documentation and/or other materials provided with the        #
#       distribution.                                                     #
#                                                                         #
#     * Neither the name of UChicago Argonne, LLC, Argonne National       #
#       Laboratory, ANL, the U.S. Government, nor the names of its        #
#       contributors may be used to endorse or promote products derived   #
#       from this software without specific prior written permission.     #
#                                                                         #
# THIS SOFTWARE IS PROVIDED BY UChicago Argonne, LLC AND CONTRIBUTORS     #
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT       #
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS       #
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UChicago     #
# Argonne, LLC OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,        #
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,    #
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;        #
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER        #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT      #
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN       #
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE         #
# POSSIBILITY OF SUCH DAMAGE.                                             #
# #########################################################################

"""
Module for data exporting data files.
"""

import dxchange.dtype as dt
import numpy as np
import os
import h5py
import logging
import re
from itertools import cycle

__author__ = "Doga Gursoy, Francesco De Carlo"
__copyright__ = "Copyright (c) 2015-2016, UChicago Argonne, LLC."
__version__ = "0.1.0"
__docformat__ = "restructuredtext en"
__all__ = [
    "write_dxf",
    "write_hdf5",
    "write_netcdf4",
    "write_npy",
    "write_tiff",
    "write_tiff_stack",
    "write_vtr",
    "write_aps_1id_report",
]

logger = logging.getLogger(__name__)


def _check_import(modname):
    try:
        return __import__(modname)
    except ImportError:
        logger.warn("Warning: " + modname + " module not found")
        return None


dxfile = _check_import("dxfile")
netCDF4 = _check_import("netCDF4")


def get_body(fname):
    """
    Get file name after extension removed.
    """
    body = os.path.splitext(fname)[0]
    return body


def get_extension(fname):
    """
    Get file extension.
    """
    return "." + fname.split(".")[-1]


def remove_trailing_digits(text):
    digit_string = re.search(r"\d+$", text)
    if digit_string is not None:
        number_of_digits = len(digit_string.group())
        text = "".join(text[:-number_of_digits])
        return (text, number_of_digits)
    else:
        return (text, 0)


def _init_dirs(fname):
    """
    Initialize directories for saving output files.

    Parameters
    ----------
    fname : str
        Output file name.
    """
    dname = os.path.dirname(os.path.abspath(fname))
    if not os.path.exists(dname):
        os.makedirs(dname)


def _suggest_new_fname(fname, digit):
    """
    Suggest new string with an attached (or increased) value indexing
    at the end of a given string.

    For example if "myfile.tiff" exist, it will return "myfile-1.tiff".

    Parameters
    ----------
    fname : str
        Output file name.
    digit : int, optional
        Number of digits in indexing stacked files.

    Returns
    -------
    str
        Indexed new string.
    """
    if os.path.isfile(fname):
        body = get_body(fname)
        ext = get_extension(fname)
        indq = 1
        file_exist = False
        while not file_exist:
            fname = body + "-" + "{0:0={1}d}".format(indq, digit) + ext
            if not os.path.isfile(fname):
                file_exist = True
            else:
                indq += 1
    return fname


def _init_write(arr, fname, ext, dtype, overwrite):
    if not (isinstance(fname, str)):
        fname = "tmp/data" + ext
    else:
        if not fname.endswith(ext):
            fname = fname + ext
    fname = os.path.abspath(fname)
    if not overwrite:
        fname = _suggest_new_fname(fname, digit=1)
    _init_dirs(fname)
    if dtype is not None:
        arr = dt.as_dtype(arr, dtype)
    return fname, arr


def _write_hdf5_dataset(h5object, data, dname, appendaxis, maxshape):
    """
    Create a dataset and write data to a specific hdf5 object
    (file or group).

    Parameters
    ----------
    h5object: h5py object
        hdf5 object to write the dataset to.
    data : ndarray
        Array data to be saved.
    dname : str
        dataset name
    appendaxis : int
        Axis of dataset to which data will be appended.
        If given will create a resizable dataset.
    maxshape: tuple
        maxshape of resizable dataset.
        Only used if dataset has not been created.
    """
    if appendaxis is not None:
        if dname not in h5object:
            h5object.create_dataset(dname, data=data, maxshape=maxshape)
        else:
            size = h5object[dname].shape
            newsize = list(size)
            newsize[appendaxis] += data.shape[appendaxis]
            h5object[dname].resize(newsize)

            slices = 3 * [
                slice(None, None, None),
            ]
            slices[appendaxis] = slice(size[appendaxis], None, None)
            h5object[dname][tuple(slices)] = data
    else:
        h5object.create_dataset(dname, data=data)


[docs]def write_hdf5( data, fname="tmp/data.h5", gname="exchange", dname="data", dtype=None, overwrite=False, appendaxis=None, maxsize=None, ): """ Write data to hdf5 file in a specific group. Parameters ---------- data : ndarray Array data to be saved. fname : str File name to which the data is saved. ``.h5`` extension will be appended if it does not already have one. gname : str, optional Path to the group inside hdf5 file where data will be written. dname : str, optional Name for dataset where data will be written. dtype : data-type, optional By default, the data-type is inferred from the input data. overwrite: bool, optional if True, overwrites the existing file if the file exists. appendaxis: int, optional Axis where data is to be appended to. Must be given when creating a resizable dataset. maxsizee: int, optional Maximum size that the dataset can be resized to along the given axis. """ mode = "w" if overwrite else "a" if appendaxis is not None: overwrite = True # True if appending to file so fname is not changed maxshape = list(data.shape) maxshape[appendaxis] = maxsize else: maxshape = maxsize fname, data = _init_write(data, fname, ".h5", dtype, overwrite) with h5py.File(fname, mode=mode) as f: if "implements" not in f: f.create_dataset("implements", data=gname) if gname not in f: f.create_group(gname) _write_hdf5_dataset(f[gname], data, dname, appendaxis, maxshape)
[docs]def write_netcdf4( data, fname="tmp/data.nc", dname="VOLUME", dtype=None, overwrite=False, appendaxis=None, maxsize=None, ): """ Write data to netCDF file. Parameters ---------- data : ndarray Array data to be saved. fname : str File name to which the data is saved. """ mode = "w" if overwrite else "a" ncfile = netCDF4.Dataset(fname, mode="w", format="NETCDF3_64BIT") x_dim = ncfile.createDimension("NX", data.shape[0]) y_dim = ncfile.createDimension("NY", data.shape[1]) z_dim = ncfile.createDimension("NZ", data.shape[2]) d = ncfile.createVariable(dname, data.dtype, ("NX", "NY", "NZ")) d[:, :, :] = data ncfile.close()
[docs]def write_dxf(data, fname="tmp/data.h5", axes="theta:y:x", dtype=None, overwrite=False): """ Write data to a data exchange hdf5 file. Parameters ---------- data : ndarray Array data to be saved. fname : str File name to which the data is saved. ``.h5`` extension will be appended if it does not already have one. axes : str Attribute labels for the data array axes. dtype : data-type, optional By default, the data-type is inferred from the input data. overwrite: bool, optional if True, overwrites the existing file if the file exists. """ fname, data = _init_write(data, fname, ".h5", dtype, overwrite) f = dxfile.dxtomo.File(fname, mode="w") f.add_entry( dxfile.dxtomo.Entry.data( data={ "value": data, "units": "counts", "description": "transmission", "axes": axes, } ) ) f.close()
[docs]def write_npy(data, fname="tmp/data.npy", dtype=None, overwrite=False): """ Write data to a binary file in NumPy ``.npy`` format. Parameters ---------- data : ndarray Array data to be saved. fname : str File name to which the data is saved. ``.npy`` extension will be appended if it does not already have one. """ fname, data = _init_write(data, fname, ".npy", dtype, overwrite) np.save(fname, data)
[docs]def write_tiff(data, fname="tmp/data.tiff", dtype=None, overwrite=False): """ Write image data to a tiff file. Parameters ---------- data : ndarray Array data to be saved. fname : str File name to which the data is saved. ``.tiff`` extension will be appended if it does not already have one. dtype : data-type, optional By default, the data-type is inferred from the input data. overwrite: bool, optional if True, overwrites the existing file if the file exists. """ fname, data = _init_write(data, fname, ".tiff", dtype, overwrite) import tifffile tifffile.imwrite(fname, data)
[docs]def write_tiff_stack( data, fname="tmp/data.tiff", dtype=None, axis=0, digit=5, start=0, overwrite=False ): """ Write data to stack of tiff file. Parameters ---------- data : ndarray Array data to be saved. fname : str Base file name to which the data is saved. ``.tiff`` extension will be appended if it does not already have one. dtype : data-type, optional By default, the data-type is inferred from the input data. axis : int, optional Axis along which stacking is performed. start : int, optional First index of file in stack for saving. digit : int, optional Number of digits in indexing stacked files. overwrite: bool, optional if True, overwrites the existing file if the file exists. """ fname, data = _init_write(data, fname, ".tiff", dtype, True) body = get_body(fname) ext = get_extension(fname) _data = np.swapaxes(data, 0, axis) for m in range(start, start + data.shape[axis]): _fname = body + "_" + "{0:0={1}d}".format(m, digit) + ext if not overwrite: _fname = _suggest_new_fname(_fname, digit=1) write_tiff(_data[m - start], _fname, overwrite=overwrite)
[docs]def write_vtr(data, fname="tmp/data.vtr", down_sampling=(5, 5, 5)): """ Write the reconstructed data (img stackes) to vtr file (retangular grid) Parameters ---------- data : np.3darray reconstructed 3D image stacks with axis=0 as the omega fname : str file name of the output vtr file down_sampling : tuple down sampling steps along three axes Returns ------- None """ # vtk is only used here, therefore doing an in module import import vtk from vtk.util import numpy_support # convert to unit8 can significantly reduce the output vtr file # size, or just do a severe down-sampling data = ( _normalize_imgstacks( data[ :: down_sampling[0], :: down_sampling[1], :: down_sampling[2], ] ) * 255 ) # --init rectangular grid rGrid = vtk.vtkRectilinearGrid() coordArray = [ vtk.vtkDoubleArray(), vtk.vtkDoubleArray(), vtk.vtkDoubleArray(), ] coords = np.array([np.arange(data.shape[i]) for i in range(3)]) coords = [ 0.5 * np.array( [3.0 * coords[i][0] - coords[i][0 + int(len(coords[i]) > 1)]] + [coords[i][j - 1] + coords[i][j] for j in range(1, len(coords[i]))] + [3.0 * coords[i][-1] - coords[i][-1 - int(len(coords[i]) > 1)]] ) for i in range(3) ] grid = np.array(list(map(len, coords)), "i") rGrid.SetDimensions(*grid) for i, points in enumerate(coords): for point in points: coordArray[i].InsertNextValue(point) rGrid.SetXCoordinates(coordArray[0]) rGrid.SetYCoordinates(coordArray[1]) rGrid.SetZCoordinates(coordArray[2]) # vtk requires x to be the fast axis # NOTE: # Proper coordinate transformation is required to connect the # tomography data with other down-stream analysis (such as FF-HEDM # and NF-HEDM). imgstacks = np.swapaxes(data, 0, 2) VTKarray = numpy_support.numpy_to_vtk( num_array=imgstacks.flatten().astype(np.uint8), deep=True, array_type=vtk.VTK_UNSIGNED_CHAR, ) VTKarray.SetName("img") rGrid.GetCellData().AddArray(VTKarray) rGrid.Modified() if vtk.VTK_MAJOR_VERSION <= 5: rGrid.Update() # output to file writer = vtk.vtkXMLRectilinearGridWriter() writer.SetFileName(fname) writer.SetDataModeToBinary() writer.SetCompressorTypeToZLib() if vtk.VTK_MAJOR_VERSION <= 5: writer.SetInput(rGrid) else: writer.SetInputData(rGrid) writer.Write()
[docs]def write_aps_1id_report(df_scanmeta, reportfn): """ Generate report of beam conditions based on given DataFrame of the metadata Parameters ---------- df_scanmeta : pd.DataFrame DataFrame of the parsed metadata dxreader.read_aps_1id_metafile(log_file) reportfn : str Output report file name (include path) Returns ------- pd.DataFrame Updated Dataframe with added beam conditions """ import matplotlib.pyplot as plt # add calculation of four beam quality # -- Temporal Beam Stability df_scanmeta["TBS"] = df_scanmeta["IC-E3"] / df_scanmeta["IC-E3"].values[0] # -- Vertical Beam Stability df_scanmeta["VBS"] = df_scanmeta["IC-E1"] / df_scanmeta["IC-E2"] # -- Beam Loss at Slit df_scanmeta["BLS"] = (df_scanmeta["IC-E1"] + df_scanmeta["IC-E2"]) / df_scanmeta[ "IC-E3" ] # -- Beam Loss during Travel df_scanmeta["BLT"] = df_scanmeta["IC-E5"] / df_scanmeta["IC-E3"] # -- corresponding color code pltlbs = ["TBS", "VBS", "BLS", "BLT"] pltclrs = ["red", "blue", "magenta", "cyan"] # start plot fig = plt.figure(figsize=(8, 3)) ax = fig.add_subplot(111) lnclrs = cycle(["gray", "lime", "gray", "black"]) # -- plot one segment at a time for lb, clr in zip(pltlbs, pltclrs): addlabel = True for layerID in df_scanmeta["layerID"].unique(): tmpdf = df_scanmeta[df_scanmeta["layerID"] == layerID] for imgtype in tmpdf["type"].unique(): # plot the main curve currentSlice = tmpdf[tmpdf["type"] == imgtype] ax.plot( currentSlice["Date"], currentSlice[lb], linewidth=0.2, color=clr, label=lb if addlabel else "_nolegend_", alpha=0.5, ) addlabel = False # add the vertical guard tmpx = currentSlice["Date"].values tmpclr = next(lnclrs) for x in [tmpx[0], tmpx[-1]]: ax.plot( [x, x], [1e-4, 1e2], color=tmpclr, linewidth=0.05, linestyle="dashed", alpha=0.1, ) # -- set canvas property ax.set_yscale("log") plt.legend(loc=0) plt.ylim([0.9, 2.0]) # 10% as cut range plt.xticks(rotation=45) # -- save the figure (both pdf and png) plt.savefig( reportfn, transparent=True, bbox_inches="tight", pad_inches=0.1, ) # -- clear/close figure plt.close() return df_scanmeta
def _normalize_imgstacks(img): """ Normalize image stacks on a per layer base Parameters ---------- img : np.3darray img stacks to be normalized Returns ------- np.3darray normalized image stacks """ return img / np.amax( img.reshape( img.shape[0], img.shape[1] * img.shape[2], ), axis=1, ).reshape(img.shape[0], 1, 1)