Source code for mrcz.ioDM

# -*- coding: utf-8 -*-

from __future__ import division, print_function, absolute_import, unicode_literals
import numpy as np

# Executor for writing compressed blocks to disk
try:
    from concurrent.futures import ThreadPoolExecutor
except ImportError as e:
    if sys.version_info > (3,0):
        raise ImportError('Get the backport for `concurrent.futures` for Py2.7 as `pip install futures`')
    raise e
from mrcz.__version__ import __version__

_asyncExecutor = ThreadPoolExecutor(max_workers = 1)

class DMImage(object):
    """
    This structure-like class is for storing 2D image or 3D image stack 
    information, since Digital Micrograph likes to have more than one in a 
    file.  Stores Calibrations, Data, and ImageTags.  
    
    The NumPy array reference is stored in `imageData` and the meta-data is 
    stored in a dictionary `imageInfo`.  
    """
    def __init__( self ):
        self.imageInfo = {} # Calibrations, tags, all in a dictionary
        self.shape = [] # necessary for unraveling
        self.imageData = np.zeros(1)
    

[docs]class readDM4(object): """ A fast DM4 file reader to strip the data out of the large movie-mode files generated by K2 detectors along with important tags. Due to the emphasis on speed it's not a general file parser with dynamic allocation, so if Gatan changes the format a lot it will break the script. Parameters ---------- filename: str the verbose: bool whether to output debugging info or not. Example ------- dm4struct = readDM4( filename, verbose=False) dm4struct holds the (multiple) images in the DM4. Typically the zeroth image is the thumbnail. To convert to MRCZ:: imageData = dm4struct.im[1].imageData is a NumPy array containing image intensities:: imageInfo = dm4struct.im[1].imageInfo is a dict containing meta-data. Note ---- Chris Boothroyd provides some information on the structure of .DM4 files here: http://www.er-c.org/cbb/info/dmformat/ The directory structure of the file is tracked by passing the path of the parent to each tag parser. Gatan likes to have empty tag directories, so they are auto-numbered when present. One can examine file structure by setting ``verbose=True``, if you need to add some more tags to the dictionary generated by the parseTag function, for example. """ def __init__(self, filename, verbose=False): self.f = open(filename, 'rb') if verbose: print("Opening DM4 file: %s" % filename) # Instatiate class fields self._unnamedIndices = np.zeros( 20, dtype='int' ) # maximum number of nested tags self.im = [] # list of DMImage class objects self.verbose = verbose # version = struct.unpack( '>I', f.read(4) ) version = np.fromfile( self.f, dtype='>i4', count=1 )[0] if( version != 4): print( "Warning: Only DM4 is supported, will probably crash spectacularly" ) np.fromfile( self.f, dtype='>i8', count=1 )[0] # rootlen byteord = np.fromfile( self.f, dtype='>i4', count=1 )[0] # 1 = little endian, generally it always is if byteord != 1: print( "Error: only little endian data ordering supported at present" ) return np.fromfile( self.f, dtype='i1', count=1 )[0] # 1 = sorted np.fromfile( self.f, dtype='i1', count=1 )[0] # 1 = open structure ntags_root = np.fromfile( self.f, dtype='>i8', count=1 )[0] # number of tags if self.verbose: print( "DM version %d, byte order %d" %(version, byteord) ) # Now the tags start for J in np.arange( 0, ntags_root ): tag_type = np.fromfile( self.f, dtype='i1', count=1 )[0] # 20 = tag directory, 21 = tag base, 0 == EOF #print( "DEBUG: root tag: %d" % tag_type ) if( tag_type == 20 ): loc_tagstart = self.f.tell() tag_namelen = np.fromfile( self.f, dtype='>i2', count=1 )[0] if tag_namelen > 0: tag_name = self.f.read( tag_namelen ) else: tag_name = b"" tag_fieldlen = np.fromfile( self.f, dtype='>i8', count=1 )[0] #print( "rootTag: tag_name: %s has length %d and field length %d" %(tag_name, tag_namelen, tag_fieldlen )) if tag_name == b"ImageList": self.f.seek( loc_tagstart ) # So each image is an unnamed tag directory, composed of /ImageData and /ImageTags and /UniqueID self.parseTagDir( b"/" ) # We currently do not care about any tag but ImageList break else: # Go to next tag/tagdir self.f.seek( self.f.tell() + tag_fieldlen ) elif( tag_type == 21 ): # throw away tags in root because we don't care for them self.discardTag() elif( tag_type == 0 ): break # EOF pass self.f.close() # Unravel images for mage in self.im: # FYI: Gatan stores image stacks [z,y,x] which is opposite to how they store the dimensions (but is Numpy and C convention) mage.shape = np.flipud( np.array( mage.shape ) ) # This y-flip [:,::-1,...] puts us on the same origin-standard for data as MRC # Gatan uses a different standard for the origin location inside DM4 files. mage.imageData = np.reshape( mage.imageData, mage.shape, order='C' )[:,::-1,...] # Image data is now compatible with MRCs saved in GMS, IMOD, EMAN2, etc. # Clean up anything else
[docs] def parseTag( self, parent ): """ Parse a tag at the given file handle location """ tag_namelen = np.fromfile( self.f, dtype='>i2', count=1 )[0] if tag_namelen > 0: # tag_name = np.fromfile( self.f, dtype='S%d'% tag_namelen, count=1 )[0] tag_name = self.f.read( tag_namelen ) else: tag_name = b"" tag_fieldlen = np.fromfile( self.f, dtype='>i8', count=1 )[0] # print( "parseTag: tag_name: %s has length %d and field length %d" %(tag_name, tag_namelen, tag_fieldlen )) loc_tag = self.f.tell() # Save location so we can seek to end of tag later self.f.read(4) # Throw away %%%% seperator tag_ninfo = np.fromfile( self.f, dtype='>i8', count=1 )[0] if self.verbose: print( "Found tag: %s%s with %d elements" % (parent, tag_name, tag_ninfo) ) split_tag_name = (parent + tag_name).split( b'/' ) # Skip ImageList imageIndex = np.int64( split_tag_name[2] ) # Check to see if self.im[imageIndex] exists try: self.im[imageIndex] except IndexError: self.im.append( DMImage() ) if split_tag_name[3] == b'ImageData': dimCount = 0 if split_tag_name[4] == b'Calibrations': # No calibrations saved at present # /ImageList/1/ImageData/Calibrations/Brightness/Origin # /ImageList/1/ImageData/Calibrations/Brightness/Scale # /ImageList/1/ImageData/Calibrations/Brightness/Units # /ImageList/1/ImageData/Calibrations/Dimension/[0-2]/Origin # /ImageList/1/ImageData/Calibrations/Dimension/[0-2]/Scale # /ImageList/1/ImageData/Calibrations/Dimension/[0-2]/Units if split_tag_name[5] == b'Brightness': if split_tag_name[6] == b'Origin': self.im[imageIndex].imageInfo['IntensityOrigin'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[6] == b'Scale': self.im[imageIndex].imageInfo['IntensityScale'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[6] == b'Units': # unicode_array = self.retrieveTagData( f, tag_ninfo ) # self.im[imageIndex].imageInfo['IntensityUnits'] = "".join([chr(item) for item in unicode_array]) self.im[imageIndex].imageInfo['IntensityUnits'] = self.retrieveTagData( tag_ninfo ).astype(np.uint8).tostring().decode('utf-8') elif split_tag_name[5] == b'Dimension': # 0 = x, 1 = y, 2 = z for split_tag_name[6] #print( split_tag_name[6] ) #print( np.frombuffer( split_tag_name[6], dtype='>i8' ) ) #print( np.frombuffer( split_tag_name[6], dtype='<i8' ) ) if np.int64( split_tag_name[6] ) == 0: currDim = 'X' elif np.int64( split_tag_name[6] ) == 1: currDim = 'Y' elif np.int64( split_tag_name[6]) == 2: currDim = 'Z' else: raise ValueError( "DM4Import: Unknown dimension " + str(split_tag_name[6]) + " at location " + hex(self.f.tell()) ) # Units is stored as uint16 array, which is actually unicode? if split_tag_name[7] == b'Origin': self.im[imageIndex].imageInfo['Dim'+currDim+'Origin'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[7] == b'Scale': self.im[imageIndex].imageInfo['Dim'+currDim+'Scale'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[7] == b'Units': # Sometimes we have wierd characters in the Units tag try: self.im[imageIndex].imageInfo['Dim'+currDim+'Units']= self.retrieveTagData( tag_ninfo ).astype(np.int16).tostring().decode('utf-8') except: try: # "ISO-8859-1" # 'windows-1252' self.im[imageIndex].imageInfo['Dim'+currDim+'Units']= self.retrieveTagData( tag_ninfo ).astype(np.int16).tostring().decode("ISO-8859-1") except: pass pass elif split_tag_name[4] == b'Data': if self.verbose: print( "Found image %d"%imageIndex + " at offset : " + str(self.f.tell() ) ) self.im[imageIndex].imageData = self.retrieveTagData( tag_ninfo) pass elif split_tag_name[4] == b'Dimensions': self.im[imageIndex].shape.append( self.retrieveTagData( tag_ninfo ) ) dimCount += 1 pass pass elif split_tag_name[3] == b'ImageTags': if split_tag_name[4] == b'Acquisition': ### Find image flips ### # Here is where we can save any tags that we care for, like image rotations # /ImageList/1/ImageTags/Acquisition/Frame/Area/Transform/Transform List/0/Transpose/Horizontal Flip if split_tag_name[-1] == b"Horizontal Flip" and split_tag_name[-4] == b'Device': # So annoyingly there are several of these tags, only some of which are the true flips. self.im[imageIndex].imageInfo['HorzFlip'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[-1] == b"Vertical Flip" and split_tag_name[-4] == b'Device': self.im[imageIndex].imageInfo['VertFlip'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[-1] == b"Diagonal Flip" and split_tag_name[-4] == b'Device': self.im[imageIndex].imageInfo['DiagFlip'] = self.retrieveTagData( tag_ninfo ) # Pixel size seems to be complex, not sure if I'm retrieving it correctly (only important for non-square pixel detectors) elif split_tag_name[-1] == b"Pixel Size (um)" and split_tag_name[-3] == b'Device': self.im[imageIndex].imageInfo['DetectorPixelSize'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[-1] == b"Exposure" and split_tag_name[-2] == b'Detector': self.im[imageIndex].imageInfo['Exposure'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[-1] == b"hbin" and split_tag_name[-2] == b'Detector': self.im[imageIndex].imageInfo['HorzBin'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[-1] == b"vbin" and split_tag_name[-2] == b'Detector': self.im[imageIndex].imageInfo['VertBin'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[4] == b'Calibration': if split_tag_name[5] == b'Dose Rate': if split_tag_name[6] == b'Calibration': if not 'DoseRateCalibration' in self.im[imageIndex].imageInfo: self.im[imageIndex].imageInfo['DoseRateCalibration'] = [] try: self.im[imageIndex].imageInfo['DoseRateCalibration'].append( self.retrieveTagData( tag_ninfo ) ) except: print( "FAILED TO IMPORT DOSERATECALIBRATION" ) elif split_tag_name[4] == b'Microscope Info': if split_tag_name[5] == b'Actual Magnification': self.im[imageIndex].imageInfo['ActualMag'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[5] == b'Indicated Magnification': self.im[imageIndex].imageInfo['NominalMag'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[5] == b'Cs(mm)': self.im[imageIndex].imageInfo['C3'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[5] == b'Voltage': self.im[imageIndex].imageInfo['Voltage'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[5] == b'Mode': self.im[imageIndex].imageInfo['MicroscopeMode'] = self.retrieveTagData( tag_ninfo ).astype(np.uint8).tostring().decode('ascii') pass elif split_tag_name[4] == b'EELS Spectrometer': if split_tag_name[5] == b'Aperture index': self.im[imageIndex].imageInfo['EELSApertureIndex'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[5] == b'Aperture label': self.im[imageIndex].imageInfo['EELSApertureLabel'] = self.retrieveTagData( tag_ninfo ).astype(np.uint8).tostring().decode('ascii') elif split_tag_name[5] == b'Energy loss (eV)': self.im[imageIndex].imageInfo['EELSEnergyLoss'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[5] == b'Slit inserted': self.im[imageIndex].imageInfo['EELSSlitIn'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[5] == b'Slit width (eV)': self.im[imageIndex].imageInfo['EELSSlitWidth'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[4] == b"Latitude-S": if split_tag_name[5] == b"Distance To Focus Position": self.im[imageIndex].imageInfo['LatSDistanceToFocus'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[5] == b"Intended Defocus": self.im[imageIndex].imageInfo['LatSIntendedDefocus'] = self.retrieveTagData( tag_ninfo ) elif split_tag_name[5] == b"Beam Diameter (nm)": self.im[imageIndex].imageInfo['LatSBeamDiameter'] = self.retrieveTagData( tag_ninfo ) # Go to next location self.f.seek( loc_tag + tag_fieldlen ) return
def getTagDType( self, bytecode ): if bytecode == 2: return 'int16' elif bytecode == 3: return 'int32' elif bytecode == 4: return 'uint16' elif bytecode == 5: return 'uint32' elif bytecode == 6: return 'float32' elif bytecode == 7: return 'float64' elif bytecode == 8: return 'bool' elif bytecode == 11: return 'int64' else: return ''
[docs] def retrieveTagData( self, tag_ninfo): """ Get the actual data from the tag, which is then stored in one of the dicts or imageData """ # Here is basically where we should check tag_name to see if it's important... tag_loc = self.f.tell() tag_infos = np.fromfile( self.f, dtype='>i8', count=tag_ninfo ) for K in np.arange(0, tag_infos.size): if tag_infos[K] == 15: # struct # Structs are organized really strangely in the file, they are essentially in the tag header so # our reading doesn't work properly with them. In essence we need to go back to where # we think the struct should be. # print( "Correct struct ??? location: " + str( tag_loc + 8*K + 16 ) ) self.f.seek( tag_loc + 8*K + 16 ) struct_fieldcount = np.fromfile( self.f, dtype='>i8', count=1 ) struct_fielddtype = [] struct_fieldval = [] for J in np.arange(0,struct_fieldcount): np.fromfile( self.f, dtype='>i8', count=1 ) # Zero struct_fielddtype.append( self.getTagDType( np.fromfile( self.f, dtype='>i8', count=1 ) ) ) # print( str(J) + ": field dtype = " + str( struct_fielddtype[J] ) ) for J in np.arange(0,struct_fieldcount): struct_fieldval.append( np.fromfile(self.f, dtype=struct_fielddtype[J], count=1)[0] ) # print( str(J) + ": field val = " + str( struct_fieldval[J] ) ) return struct_fieldval elif tag_infos[K] == 20: # array if self.verbose: print( "Found array" ) try: K += 1 # Warning: sometimes arrays are empty array_dtype = self.getTagDType(tag_infos[K]) K += 1 array_ncount = tag_infos[K] if self.verbose: print( "Array dtype: " + array_dtype + ", array ncount: " + str(array_ncount) ) if( array_ncount > 0 ): return np.fromfile(self.f, dtype=array_dtype, count=array_ncount) except IndexError: pass pass elif tag_infos[K] == 9: # string tag_char = self.f.read(1) if self.verbose: print( "Found char: " + tag_char ) pass elif tag_infos[K] == 18: # string # str_len = if self.verbose: print( "FIXME Found string" ) pass else: # singleton tag_dtype = self.getTagDType( tag_infos[K] ) if tag_dtype != '': tag_data = np.fromfile( self.f, dtype = tag_dtype, count=1 )[0] # if self.verbose: print( "Singleton: " + tag_dtype + ": " + str(tag_data) ): if self.verbose: print( "Singleton: " + tag_dtype + ": " + str(tag_data) ) return tag_data pass
[docs] def discardTag(self): """ Quickly parse to the end of tag that we don't care about its information """ tag_namelen = np.fromfile( self.f, dtype='>i2', count=1 )[0] self.f.seek( self.f.tell() + tag_namelen ) tag_fieldlen = np.fromfile( self.f, dtype='>i8', count=1 )[0] self.f.seek( self.f.tell() + tag_fieldlen ) return
[docs] def parseTagDir(self, parent): """ Parse a tag directory at the given file handle location """ try: tag_namelen = np.fromfile( self.f, dtype='>i2', count=1 )[0] if tag_namelen > 0: tag_name = self.f.read( tag_namelen ) # tag_name = np.fromfile( self.f, dtype='S%d'%tag_namelen, count=1 )[0] else: tag_name = b"" tag_fieldlen = np.fromfile( self.f, dtype='>i8', count=1 )[0] # print( "parseTagDir: tag_name: %s has length %d and field length %d" %(tag_name, tag_namelen, tag_fieldlen )) except IndexError: if self.verbose: print( "Caught IndexError, trying to recover position in file" ) # f.seek( f.tell() ) return self.f # Handle empty tag_name by giving it an auto-generated number if not bool(tag_name): if( self.verbose ) : print( "Found empty tag" ) tag_depth = int( parent.count( b'/') - 1 ) # This is kind of mental gymnastics to maintain portablity between Python 2 and 3 tag_name = bytearray( str(self._unnamedIndices[ tag_depth ]).encode('ascii') ) # print( "Gymnastics " + str(self._unnamedIndices[ tag_depth ]) + " -> " + str(tag_name) ) self._unnamedIndices[ tag_depth ] += 1 # Reset all higher indices self._unnamedIndices[ tag_depth+1:] = 0 loc_tagdir = self.f.tell() self.f.read( 2 ) # Throw away sorted and closed ntags_dir = np.fromfile( self.f, dtype='>i8', count=1 )[0] if self.verbose: print( "Found tag dir " + str(parent) + str(tag_name) + " with " + str(ntags_dir) + " tags" ) # So typically ImageList has two empty name tag directories, of which one is various stuff and the other is the data for I in np.arange(0, ntags_dir ): try: subtag_type = np.fromfile( self.f, dtype='i1', count=1 )[0] # 20 = tag directory, 21 = tag base, 0 == EOF except IndexError: if self.verbose: print( "Caught IndexError, trying to recover" ) break if( subtag_type == 20 ): self.parseTagDir( bytes(parent) + bytes(tag_name) + b"/" ) elif( subtag_type == 21 ): self.parseTag( bytes(parent) + bytes(tag_name) + b"/" ) elif( subtag_type == 0 ): break # EOF # Go to next tag in root directory self.f.seek( loc_tagdir + tag_fieldlen ) return
def asyncReadDM4(*args, **kwargs): ''' Calls `readDM4` in a separate thread and executes it in the background. Parameters ---------- Valid arguments are as for `readDM4()`. Returns ------- future A ``concurrent.futures.Future()`` object. Calling ``future.result()`` will halt until the read is finished and returns the image and meta-data as per a normal call to `readMRC`. Example ------- worker = asyncReadMRC( 'someones_file.mrc' ) # Do some work mrcImage, mrcMeta = worker.result() ''' return _asyncExecutor.submit(readDM4, *args, **kwargs)