#!/usr/bin/env python
# coding: utf8
"""
Authors: Jérôme Kieffer, ESRF
email:jerome.kieffer@esrf.fr
CIF Binary Files (CBF) are 2D images written by the Pilatus detector and others.
They use a modified (simplified) byte-offset compression algorithm.
CIF is a library for manipulating Crystallographic Information Files and tries
to conform to the specification of the IUCr.
"""
# get ready for python3
from __future__ import with_statement, print_function
__author__ = "Jérôme Kieffer"
__contact__ = "jerome.kieffer@esrf.eu"
__license__ = "GPLv3+"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__version__ = ["Generated by CIF.py: Jan 2005 - April 2012",
"Written by Jerome Kieffer: Jerome.Kieffer@esrf.eu",
"On-line data analysis / ISDD ", "ESRF Grenoble (France)"]
import os, logging
logger = logging.getLogger("cbfimage")
import numpy
from .fabioimage import fabioimage
from .compression import decByteOffet_numpy, md5sum, compByteOffet_numpy
#import time
# Maps the textual element type found under the "X-Binary-Element-Type" header
# key onto the numpy scalar type used to hold the decoded pixels.
DATA_TYPES = dict(("signed %d-bit integer" % nbits, getattr(numpy, "int%d" % nbits))
                  for nbits in (8, 16, 32, 64))

# Header keys whose absence is reported (at debug level) after parsing a header.
MINIMUM_KEYS = [
    "X-Binary-Size-Fastest-Dimension",
    "ByteOrder",
    "Data type",
    "X dimension",
    "Y dimension",
    "Number of readouts",
]

# Four-byte marker that immediately precedes the compressed binary blob.
STARTER = "\x0c\x1a\x04\xd5"

# Only referenced by padding code that is currently disabled in cbfimage.write.
PADDING = 512
class cbfimage(fabioimage):
    """
    Read and write images in the Cif Binary File (CBF) data format.

    The textual CIF part of the file is delegated to a CIF instance kept in
    self.cif; the binary section is handled with the byte-offset helpers.
    """

    def __init__(self, data=None, header=None, fname=None):
        """
        Constructor of the class CIF Binary File reader.

        @param data: image data, forwarded to the fabioimage constructor
        @param header: header, forwarded to the fabioimage constructor
        @param fname: when not None, name of the file to read immediately
        @type fname: string
        """
        fabioimage.__init__(self, data, header)
        self.cif = CIF()
        if fname is not None:  # load the file
            self.read(fname)

    @staticmethod
    def checkData(data=None):
        """
        Make sure the data are of a signed integer type.

        @param data: array to check (anything exposing dtype and astype)
        @return: None if data is None, data itself when it already holds
                 signed integers, otherwise a copy converted with astype(int)
        """
        if data is None:
            return None
        # Passing the Python builtin ``int`` to issubdtype is interpreted
        # differently depending on the numpy version; name the abstract
        # numpy type explicitly so every signed integer width is accepted.
        if numpy.issubdtype(data.dtype, numpy.signedinteger):
            return data
        return data.astype(int)

    def _readheader(self, inStream):
        """
        Read in a header in some CBF format from a string representing binary stuff

        Fills self.header / self.header_keys from the CIF keys and from the
        "key: value" (or "key=value") lines that open the binary section.

        @param inStream: file containing the Cif Binary part.
        @type inStream: opened file.
        @raise Exception: if the CIF has no '_array_data.data' entry
        """
        self.cif.loadCIF(inStream, _bKeepComment=True)

        # backport contents of the CIF data to the headers
        for key in self.cif:
            value = self.cif[key]
            # The binary block is handled below; entries that are not strings
            # (the parsed "loop_" list) cannot be stripped and stay in self.cif.
            if key == "_array_data.data" or not hasattr(value, "strip"):
                continue
            self.header_keys.append(key)
            self.header[key] = value.strip(" \"\n\r\t")

        if "_array_data.data" not in self.cif:
            raise Exception("cbfimage: CBF file %s is corrupt, cannot find data block with '_array_data.data' key"
                            % getattr(self, "filename", None))

        inStream2 = self.cif["_array_data.data"]
        sep = "\r\n"
        iSepPos = inStream2.find(sep)
        if iSepPos < 0 or iSepPos > 80:
            sep = "\n"  # switch back to unix representation

        lines = inStream2.split(sep)
        for oneLine in lines[1:]:
            if len(oneLine) < 10:
                # a short (normally empty) line ends the textual header
                break
            if ":" in oneLine:
                key, val = oneLine.split(":", 1)
            elif "=" in oneLine:
                key, val = oneLine.split("=", 1)
            else:
                # neither separator: skip the line instead of crashing
                logger.debug("Skipping unparsable CBF header line: %r", oneLine)
                continue
            key = key.strip()
            self.header_keys.append(key)
            self.header[key] = val.strip(" \"\n\r\t")

        missing = [item for item in MINIMUM_KEYS if item not in self.header_keys]
        if len(missing) > 0:
            logger.debug("CBF file misses the keys " + " ".join(missing))

    def read(self, fname, frame=None):
        """
        Read in header into self.header and
        the data into self.data

        @param fname: name of the file to read
        @param frame: not used by this reader
        @return: self
        @raise IOError: if the dimensions are missing or the compression
                        scheme is not x-CBF_BYTE_OFFSET
        """
        self.filename = fname
        self.header = {}
        self.resetvals()

        # NOTE(review): the object returned by self._open is not closed here;
        # _open is defined outside this class and may hand back a stream owned
        # by the caller -- confirm before adding an explicit close().
        infile = self._open(fname, "rb")
        self._readheader(infile)

        # Compute image size
        try:
            self.dim1 = int(self.header['X-Binary-Size-Fastest-Dimension'])
            self.dim2 = int(self.header['X-Binary-Size-Second-Dimension'])
        except (KeyError, ValueError, TypeError):
            raise IOError("CBF file %s is corrupt, no dimensions in it" % fname)

        try:
            bytecode = DATA_TYPES[self.header['X-Binary-Element-Type']]
        except KeyError:
            bytecode = numpy.int32
            logger.warning("Defaulting type to int32")
        # size of one element in bytes, for the default type as well
        self.bpp = numpy.dtype(bytecode).itemsize

        if self.header.get("conversions") == "x-CBF_BYTE_OFFSET":
            raw = self._readbinary_byte_offset(self.cif["_array_data.data"])
            self.data = raw.astype(bytecode).reshape((self.dim2, self.dim1))
        else:
            raise IOError("Compression scheme not yet supported, please contact FABIO development team")

        self.bytecode = self.data.dtype.type
        self.resetvals()
        # ensure the PIL image is reset
        self.pilimage = None
        return self

    def _readbinary_byte_offset(self, inStream):
        """
        Read in a binary part of an x-CBF_BYTE_OFFSET compressed image

        @param inStream: the binary image (without any CIF decorators)
        @type inStream: python string.
        @return: a linear numpy array without shape and dtype set
        @rtype: numpy array
        @raise IOError: if the start marker is missing or the number of
                        decoded elements does not match dim1 * dim2
        """
        startPos = inStream.find(STARTER)
        if startPos < 0:
            # find() returned -1: without this check decoding would silently
            # start at offset 3 of the text header.
            raise IOError("CBF file %s is corrupt, binary start marker not found"
                          % getattr(self, "filename", None))
        startPos += len(STARTER)
        data = inStream[startPos: startPos + int(self.header["X-Binary-Size"])]
        size = self.dim1 * self.dim2
        try:
            import byte_offset
        except ImportError:
            logger.warning("Error in byte_offset part: Falling back to Numpy implementation")
            myData = decByteOffet_numpy(data, size=size)
        else:
            myData = byte_offset.analyseCython(data, size=size)

        # explicit check: an assert would vanish when running with -O
        if len(myData) != size:
            raise IOError("CBF file %s is corrupt, decoded %d elements where %d were expected"
                          % (getattr(self, "filename", None), len(myData), size))
        return myData

    def write(self, fname):
        """
        write the file in CBF format

        @param fname: name of the file
        @type: string
        @raise RuntimeError: if the image holds no data
        """
        if self.data is None:
            raise RuntimeError("CBF image contains no data")
        self.dim2, self.dim1 = self.data.shape

        # The blob is written without padding (see X-Binary-Size-Padding below).
        binary_blob = compByteOffet_numpy(self.data)

        dtype = "Unknown"
        for key, value in DATA_TYPES.items():
            if value == self.data.dtype:
                dtype = key

        binary_block = [
            "--CIF-BINARY-FORMAT-SECTION--",
            "Content-Type: application/octet-stream;",
            '     conversions="x-CBF_BYTE_OFFSET"',
            'Content-Transfer-Encoding: BINARY',
            "X-Binary-Size: %d" % (len(binary_blob)),
            "X-Binary-ID: 1",
            'X-Binary-Element-Type: "%s"' % (dtype),
            "X-Binary-Element-Byte-Order: LITTLE_ENDIAN",
            "Content-MD5: %s" % md5sum(binary_blob),
            "X-Binary-Number-of-Elements: %s" % (self.dim1 * self.dim2),
            "X-Binary-Size-Fastest-Dimension: %d" % self.dim1,
            "X-Binary-Size-Second-Dimension: %d" % self.dim2,
            "X-Binary-Size-Padding: %d" % 1,
            "",
            STARTER + binary_blob,
            "",
            "--CIF-BINARY-FORMAT-SECTION----",
        ]

        if "_array_data.header_contents" not in self.header:
            nonCifHeaders = []
        else:
            nonCifHeaders = [i.strip()[2:]
                             for i in self.header["_array_data.header_contents"].split("\n")
                             if i.find("# ") >= 0]

        for key in self.header:
            if key not in self.header_keys:
                self.header_keys.append(key)

        # keys regenerated in binary_block must not be duplicated as comments
        generated = ("X-Binary-", "Content-", "conversions", "filename")
        for key in self.header_keys:
            if key not in self.header:
                # stale entry of header_keys: nothing to write for it
                continue
            if key.startswith("_"):
                if key not in self.cif or self.cif[key] != self.header[key]:
                    self.cif[key] = self.header[key]
            elif not key.startswith(generated):
                nonCifHeaders.append("%s %s" % (key, self.header[key]))

        if len(nonCifHeaders) > 0:
            self.cif["_array_data.header_contents"] = "\r\n".join(["# %s" % i for i in nonCifHeaders])

        self.cif["_array_data.data"] = "\r\n".join(binary_block)
        self.cif.saveCIF(fname, linesep="\r\n", binary=True)
################################################################################
# CIF class
################################################################################
class CIF(dict):
    """
    This is the CIF class, it represents the CIF dictionary;
    and as a python dictionary thus inherits from the dict built in class.

    Keys set through item assignment are remembered in insertion order in
    self._ordered, which drives the order used by tostring(). Loops are
    stored under the "loop_" key as a list of [keys, rows] pairs, each row
    being a dictionary.
    """
    EOL = ["\r", "\n", "\r\n", "\n\r"]
    BLANK = [" ", "\t"] + EOL
    START_COMMENT = ["\"", "\'"]
    BINARY_MARKER = "--CIF-BINARY-FORMAT-SECTION--"

    def __init__(self, _strFilename=None):
        """
        Constructor of the class.

        @param _strFilename: the name of the file to open
        @type  _strFilename: filename (str) or file object
        """
        dict.__init__(self)
        self._ordered = []
        if _strFilename is not None:  # load the file
            self.loadCIF(_strFilename)

    def __setitem__(self, key, value):
        """Store the value and remember the first-insertion order of the key."""
        if key not in self._ordered:
            self._ordered.append(key)
        return dict.__setitem__(self, key, value)

    def pop(self, key, *default):
        """
        Remove key and return its value, like dict.pop.

        @param key: key to remove
        @param default: optional value returned when the key is absent
        @raise KeyError: if the key is absent and no default is given
        """
        if key in self._ordered:
            self._ordered.remove(key)
        return dict.pop(self, key, *default)

    def popitem(self, key=None):
        """
        Remove an entry and return it as a (key, value) pair.

        @param key: key to remove; when None behaves like dict.popitem()
        @return: (key, value)
        @raise KeyError: if the key is absent or the dictionary is empty
        """
        # dict.popitem() accepts no argument, so forwarding the key to it
        # could only raise TypeError; pop the requested key explicitly.
        if key is None:
            key, value = dict.popitem(self)
        else:
            value = dict.pop(self, key)
        if key in self._ordered:
            self._ordered.remove(key)
        return key, value

    def loadCIF(self, _strFilename, _bKeepComment=False):
        """Load the CIF file and populates the CIF dictionary into the object

        @param _strFilename: the name of the file to open, or an object with a read method
        @type  _strFilename: string or file-like object
        @param _bKeepComment: if True the whole content is parsed as is,
                              otherwise comments are removed first
        @type _bKeepComment: boolean
        @return: None
        @raise RuntimeError: if the file does not exist or the argument is neither a
                             string nor a readable object
        """
        try:
            string_types = (str, unicode)
        except NameError:  # Python 3 has no separate unicode type
            string_types = (str,)

        def parse(stream):
            if _bKeepComment:
                self._parseCIF(stream.read())
            else:
                self._parseCIF(CIF._readCIF(stream))

        if isinstance(_strFilename, string_types):
            if not os.path.isfile(_strFilename):
                raise RuntimeError("CIF.loadCIF: No such file to open: %s" % _strFilename)
            # the file is opened here, so it is closed here as well
            with open(_strFilename, "rb") as infile:
                parse(infile)
        elif hasattr(_strFilename, "read"):
            # stream provided by the caller: left open on purpose
            parse(_strFilename)
        else:
            raise RuntimeError("CIF.loadCIF: what is %s type %s" % (_strFilename, type(_strFilename)))

    readCIF = loadCIF

    @staticmethod
    def isAscii(_strIn):
        """
        Check if all characters in a string are ascii,

        @param _strIn: input string
        @type _strIn: python string
        @return: boolean
        @rtype: boolean
        """
        return all(ord(char) <= 127 for char in _strIn)

    @staticmethod
    def _readCIF(_instream):
        """
        - read the cif file
        - removes the comments

        Lines containing a "#" are cut at the "#" when they are pure ASCII
        and dropped otherwise; other lines are kept unchanged.

        @param _instream: the file containing the CIF data
        @type _instream: open file in read mode
        @return: a string containing the raw data
        @rtype: string
        @raise RuntimeError: if _instream has no readlines method
        """
        if not hasattr(_instream, "readlines"):
            raise RuntimeError("CIF._readCIF(instream): I expected instream to be an opened file, "
                               "here I got %s type %s" % (_instream, type(_instream)))
        kept = []
        for sLine in _instream.readlines():
            iPos = sLine.find("#")
            if iPos >= 0:
                if CIF.isAscii(sLine):
                    kept.append(sLine[:iPos] + os.linesep)
                if iPos > 80:
                    logger.warning("This line is too long and could cause problems in PreQuest: %s", sLine)
            else:
                kept.append(sLine)
                if len(sLine.strip()) > 80:
                    logger.warning("This line is too long and could cause problems in PreQuest: %s", sLine)
        return "".join(kept)

    def _parseCIF(self, sText):
        """
        - Parses the text of a CIF file
        - Cut it in fields
        - Find all the loops and process
        - Find all the keys and values

        @param sText: the content of the CIF - file
        @type sText: string
        @return: Nothing, the data are incorporated at the CIF object dictionary
        @rtype: None
        """
        # first of all : separate the cif file in fields
        lFields = CIF._splitCIF(sText.strip())

        # Then : look for loops
        loopidx = [i for i, field in enumerate(lFields) if field.lower() == "loop_"]
        if loopidx:
            loop = []
            looplen = []
            for i in loopidx:
                loopone, length, keys = CIF._analyseOneLoop(lFields, i)
                loop.append([keys, loopone])
                looplen.append(length)
            # drop the loops starting from the last one so that the indexes
            # of the loops still to be removed stay valid
            for start, length in reversed(list(zip(loopidx, looplen))):
                del lFields[start:start + length]
            self["loop_"] = loop

        # what remains is a flat sequence of "_key value" pairs
        for i in range(len(lFields) - 1):
            if len(lFields[i + 1]) == 0:
                lFields[i + 1] = "?"
            if lFields[i].startswith("_") and not lFields[i + 1].startswith("_"):
                self[lFields[i]] = lFields[i + 1]

    @staticmethod
    def _splitCIF(sText):
        """
        Separate the text in fields as defined in the CIF

        Handles single/double quoted strings, semicolon delimited text
        fields (including the CIF binary section) and plain words.

        @param sText: the content of the CIF - file
        @type sText: string
        @return: list of all the fields of the CIF
        @rtype: list
        """
        lFields = []
        # every branch below assumes the text starts with a non-blank character
        sText = sText.strip()
        while sText:
            first = sText[0]
            if first == "'" or first == '"':
                field, sText = CIF._cutQuotedField(sText, first)
            elif first == ";":
                field, sText = CIF._cutTextField(sText)
            else:
                field = sText.split(None, 1)[0]
                sText = sText[len(field):].strip()
            lFields.append(field)
        return lFields

    @staticmethod
    def _cutQuotedField(sText, quote):
        """Cut the quoted string opening sText; return (field, remaining text)."""
        last = len(sText) - 1
        idx = 0
        while True:
            found = sText.find(quote, idx + 1)
            if found >= 0:
                idx = found
            if idx >= last:
                # the quote closing the field is the end of the text
                return sText[1:-1].strip(), ""
            if sText[idx + 1] in CIF.BLANK:
                # a quote only closes the field when followed by a blank
                return sText[1:idx].strip(), sText[idx + 1:].strip()
            if found < 0:
                # no closing quote left: take the rest as the field rather
                # than searching forever
                return sText[1:].strip(), ""

    @staticmethod
    def _cutTextField(sText):
        """Cut the ';' delimited field opening sText; return (field, remaining text)."""
        idx = 0
        if sText[1:].strip().startswith(CIF.BINARY_MARKER):
            # binary section: jump past the closing marker so that any ";"
            # inside the binary blob is not taken for the end of the field
            end = sText.find(CIF.BINARY_MARKER, 32)
            if end >= 0:
                idx = end + len(CIF.BINARY_MARKER)
        while True:
            found = sText.find(";", idx + 1)
            if found < 0:
                # unterminated text field: take the rest as the field rather
                # than searching forever
                return sText[1:].strip(), ""
            idx = found
            if sText[idx - 1] in CIF.EOL:
                # the closing ";" must be the first character of its line
                return sText[1:idx - 1].strip(), sText[idx + 1:].strip()

    @staticmethod
    def _analyseOneLoop(lFields, iStart):
        """Processes one loop in the data extraction of the CIF file

        @param lFields: list of all the words contained in the cif file
        @type lFields: list
        @param iStart: the starting index corresponding to the "loop_" key
        @type iStart: integer
        @return: the list of loop dictionaries, the length of the data
            extracted from the lFields and the list of all the keys of the loop.
        @rtype: tuple
        """
        nFields = len(lFields)
        i = iStart + 1

        # the keys of the loop directly follow the "loop_" word
        keys = []
        while i < nFields and lFields[i].startswith("_"):
            keys.append(lFields[i])
            i += 1

        # then come the values, up to the next key or reserved word
        data = []
        while i < nFields:
            field = lFields[i]
            lowered = field.lower()
            if (not field or field.startswith("_")
                    or lowered in ("loop_", "stop_", "global_")
                    or lowered.startswith(("data_", "save_"))):
                break
            data.append(field)
            i += 1

        loop = []
        nKeys = len(keys)
        if len(data) < nKeys:
            # a single incomplete row: missing values are marked unknown
            padded = data + ["?"] * (nKeys - len(data))
            loop.append(dict(zip(keys, padded)))
        elif nKeys:
            # integer division: a trailing incomplete row is ignored
            for row in range(len(data) // nKeys):
                loop.append(dict(zip(keys, data[row * nKeys:(row + 1) * nKeys])))
        return loop, 1 + nKeys + len(data), keys

    #############################################################################################
    ########     everything needed to  write a cif file #########################################
    #############################################################################################

    def saveCIF(self, _strFilename="test.cif", linesep=os.linesep, binary=False):
        """Transforms the CIF object in string then write it into the given file

        Errors while opening or closing the file are printed, not raised.

        @param _strFilename: the of the file to be written
        @type _strFilename: string
        @param linesep: line separation used (to force compatibility with windows/unix)
        @param binary: Shall we write the data as binary (True only for imageCIF/CBF)
        """
        mode = "wb" if binary else "w"
        # serialise first so that a failure here does not truncate the file
        content = self.tostring(_strFilename, linesep)
        try:
            fFile = open(_strFilename, mode)
        except IOError:
            print("Error during the opening of file for write: %s" %
                  _strFilename)
            return
        try:
            fFile.write(content)
        finally:
            # always release the handle, even if the write failed
            try:
                fFile.close()
            except IOError:
                print("Error during the closing of file for write: %s" %
                      _strFilename)

    def tostring(self, _strFilename=None, linesep=os.linesep):
        """
        Converts a cif dictionnary to a string according to the CIF syntax

        @param _strFilename: the name of the filename to be appended in the header of the CIF file
        @type _strFilename: string
        @param linesep: string used to join the lines
        @return: a sting that corresponds to the content of the CIF - file.
        """
        lstStrCif = ["# " + i for i in __version__]
        if "_chemical_name_common" in self:
            words = self["_chemical_name_common"].split()
            t = words[0] if words else ""
        elif _strFilename is not None:
            t = os.path.splitext(os.path.split(str(_strFilename).strip())[1])[0]
        else:
            t = ""
        lstStrCif.append("data_%s" % (t))

        # keys stored without going through __setitem__ are appended sorted
        self._ordered += sorted(key for key in self.keys() if key not in self._ordered)
        # forget keys no longer present; the list is rebuilt because removing
        # entries while iterating over it would skip the following key
        for sKey in self._ordered:
            if sKey not in self:
                logger.debug("Skipping key %s from ordered list as no more present in dict", sKey)
        self._ordered = [sKey for sKey in self._ordered if sKey in self]

        for sKey in self._ordered:
            if sKey == "loop_":
                continue
            sValue = str(self[sKey])
            if sValue.find("\n") > -1:  # should add value between ;;
                lLine = [sKey, ";", sValue, ";", ""]
            else:
                if len(sValue.split()) > 1:  # should add value between ''
                    sValue = "'%s'" % sValue
                sLine = "%s %s" % (sKey, sValue)
                if len(sLine) > 80:
                    # too long for one line: the (still quoted) value goes
                    # on its own line
                    lLine = [str(sKey), sValue]
                else:
                    lLine = [sLine]
            lstStrCif += lLine

        if "loop_" in self:
            for loop in self["loop_"]:
                lstStrCif.append("loop_ ")
                lKeys = loop[0]
                llData = loop[1]
                lstStrCif += [" %s" % (sKey) for sKey in lKeys]
                for lData in llData:
                    sLine = " "
                    for key in lKeys:
                        sRawValue = str(lData[key])
                        if sRawValue.find("\n") > -1:  # should add value between ;;
                            lstStrCif += [sLine, ";", sRawValue, ";"]
                            sLine = " "
                        else:
                            if len(sRawValue.split()) > 1:  # should add value between ''
                                value = "'%s'" % (sRawValue)
                            else:
                                value = sRawValue
                            if len(sLine) + len(value) > 78:
                                lstStrCif += [sLine]
                                sLine = " " + value
                            else:
                                sLine += " " + value
                    lstStrCif.append(sLine)
                lstStrCif.append("")
        return linesep.join(lstStrCif)

    def exists(self, sKey):
        """
        Check if the key exists in the CIF and is non empty.

        @param sKey: CIF key
        @type sKey: string
        @return: True if the key exists in the CIF dictionary and is non empty
        @rtype: boolean
        """
        if sKey not in self:
            return False
        value = self[sKey]
        # "?" and "." are the CIF placeholders for unknown / inapplicable
        return len(value) >= 1 and value[0] not in ["?", "."]

    def existsInLoop(self, sKey):
        """
        Check if the key exists in one of the loops of the CIF dictionary.

        @param sKey: CIF key
        @type sKey: string
        @return: True if the key is one of the keys of a loop
        @rtype: boolean
        """
        if not self.exists("loop_"):
            return False
        return any(sKey in loop[0] for loop in self["loop_"])

    def loadCHIPLOT(self, _strFilename):
        """
        Load the powder diffraction CHIPLOT file and stores the
        pd_CIF dictionary in the object

        @param _strFilename: the name of the file to open
        @type  _strFilename: string
        @return: None, the entries are stored in the object itself
        @raise IOError: if the file does not exist
        @raise ValueError: if the number of points announced in the file does
                           not match the number of data lines read
        """
        if not os.path.isfile(_strFilename):
            errStr = "I cannot find the file %s" % _strFilename
            logger.error(errStr)
            raise IOError(errStr)
        with open(_strFilename, "r") as chiFile:
            lInFile = chiFile.readlines()

        self["_audit_creation_method"] = 'From 2-D detector using FIT2D and CIFfile'
        self["_pd_meas_scan_method"] = "fixed"
        self["_pd_spec_description"] = lInFile[0].strip() if lInFile else ""

        # the 4th line is read as the number of points
        try:
            iLenData = int(lInFile[3])
        except (ValueError, IndexError):
            iLenData = None

        # try to take the limits from the first and the last data lines
        try:
            f2ThetaMin = float(lInFile[4].split()[0])
            last = ""
            for sLine in lInFile[-20:]:
                if sLine.strip() != "":
                    last = sLine.strip()
            f2ThetaMax = float(last.split()[0])
            limitsOK = True
        except (ValueError, IndexError):
            limitsOK = False
            f2ThetaMin = 180.0
            f2ThetaMax = 0

        lOneLoop = []
        for sLine in lInFile[4:]:
            sCleaned = sLine.split("#")[0].strip()
            data = sCleaned.split()
            if len(data) == 2:
                if not limitsOK:
                    f2Theta = float(data[0])
                    if f2Theta < f2ThetaMin:
                        f2ThetaMin = f2Theta
                    if f2Theta > f2ThetaMax:
                        f2ThetaMax = f2Theta
                lOneLoop.append({"_pd_meas_intensity_total": data[1]})

        if not iLenData:
            iLenData = len(lOneLoop)
        if iLenData != len(lOneLoop):
            raise ValueError("CHIPLOT file %s announces %s points but contains %s"
                             % (_strFilename, iLenData, len(lOneLoop)))

        # The step is kept as a number until the sign has been checked: the
        # formatted string cannot be meaningfully compared with 0.
        if iLenData > 1:
            fInc = (f2ThetaMax - f2ThetaMin) / (iLenData - 1)
        else:
            fInc = 0.0
        if fInc < 0:
            # decreasing scan: report a positive step and swap the limits
            fInc = -fInc
            f2ThetaMin, f2ThetaMax = f2ThetaMax, f2ThetaMin

        self["_pd_meas_2theta_range_inc"] = "%.4f" % fInc
        self["_pd_meas_2theta_range_max"] = "%.4f" % f2ThetaMax
        self["_pd_meas_2theta_range_min"] = "%.4f" % f2ThetaMin
        self["_pd_meas_number_of_points"] = str(iLenData)
        self["loop_"] = [[["_pd_meas_intensity_total"], lOneLoop]]

    @staticmethod
    def LoopHasKey(loop, key):
        """Returns True if the key (string) exist in the array called loop"""
        return key in loop