| """
 | |
| Read a SAS XPort format file into a Pandas DataFrame.
 | |
| 
 | |
| Based on code from Jack Cushman (github.com/jcushman/xport).
 | |
| 
 | |
| The file format is defined here:
 | |
| 
 | |
| https://support.sas.com/content/dam/SAS/support/en/technical-papers/record-layout-of-a-sas-version-5-or-6-data-set-in-sas-transport-xport-format.pdf
 | |
| """
 | |
| from __future__ import annotations
 | |
| 
 | |
| from collections import abc
 | |
| from datetime import datetime
 | |
| import struct
 | |
| from typing import TYPE_CHECKING
 | |
| import warnings
 | |
| 
 | |
| import numpy as np
 | |
| 
 | |
| from pandas.util._decorators import Appender
 | |
| from pandas.util._exceptions import find_stack_level
 | |
| 
 | |
| import pandas as pd
 | |
| 
 | |
| from pandas.io.common import get_handle
 | |
| from pandas.io.sas.sasreader import ReaderBase
 | |
| 
 | |
| if TYPE_CHECKING:
 | |
|     from pandas._typing import (
 | |
|         CompressionOptions,
 | |
|         DatetimeNaTType,
 | |
|         FilePath,
 | |
|         ReadBuffer,
 | |
|     )
 | |
| _correct_line1 = (
 | |
|     "HEADER RECORD*******LIBRARY HEADER RECORD!!!!!!!"
 | |
|     "000000000000000000000000000000  "
 | |
| )
 | |
| _correct_header1 = (
 | |
|     "HEADER RECORD*******MEMBER  HEADER RECORD!!!!!!!000000000000000001600000000"
 | |
| )
 | |
| _correct_header2 = (
 | |
|     "HEADER RECORD*******DSCRPTR HEADER RECORD!!!!!!!"
 | |
|     "000000000000000000000000000000  "
 | |
| )
 | |
| _correct_obs_header = (
 | |
|     "HEADER RECORD*******OBS     HEADER RECORD!!!!!!!"
 | |
|     "000000000000000000000000000000  "
 | |
| )
 | |
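# Keys naming the fields of the 140-byte NAMESTR descriptor record from the
# XPORT spec; they line up one-to-one with the struct pattern
# ">hhhh8s40s8shhh2s8shhl52s" unpacked in _read_header ("field_length" is the
# spec's nlng, "num_decimals" its nfd).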
_fieldkeys = [
    "ntype",
    "nhfun",
    "field_length",
    "nvar0",
    "name",
    "label",
    "nform",
    "nfl",
    "num_decimals",
    "nfj",
    "nfill",
    "niform",
    "nifl",
    "nifd",
    "npos",
    "_",
]


_base_params_doc = """\
Parameters
----------
filepath_or_buffer : str or file-like object
    Path to SAS file or object implementing binary read method."""

_params2_doc = """\
index : identifier of index column
    Identifier of column that should be used as index of the DataFrame.
encoding : str
    Encoding for text data.
chunksize : int
    Read the file `chunksize` lines at a time; returns an iterator."""

_format_params_doc = """\
format : str
    File format, only `xport` is currently supported."""

_iterator_doc = """\
iterator : bool, default False
    Return XportReader object for reading file incrementally."""


_read_sas_doc = f"""Read a SAS file into a DataFrame.

{_base_params_doc}
{_format_params_doc}
{_params2_doc}
{_iterator_doc}

Returns
-------
DataFrame or XportReader

Examples
--------
Read a SAS Xport file:

>>> df = pd.read_sas('filename.XPT')

Read an Xport file in 10,000-line chunks:

>>> itr = pd.read_sas('filename.XPT', chunksize=10000)
>>> for chunk in itr:
...     do_something(chunk)

"""

_xport_reader_doc = f"""\
Class for reading SAS Xport files.

{_base_params_doc}
{_params2_doc}

Attributes
----------
member_info : list
    Contains information about the file.
fields : list
    Contains information about the variables in the file.
"""

_read_method_doc = """\
Read observations from SAS Xport file, returning them as a DataFrame.

Parameters
----------
nrows : int
    Number of rows to read from data file; if None, read whole
    file.

Returns
-------
A DataFrame.
"""


def _parse_date(datestr: str) -> DatetimeNaTType:
    """Given a date in xport format, return Python date."""
    try:
        # e.g. "16FEB11:10:07:55"
        return datetime.strptime(datestr, "%d%b%y:%H:%M:%S")
    except ValueError:
        return pd.NaT
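
# Illustrative example (comment only, not executed): header timestamps look
# like "16FEB11:10:07:55", and anything malformed maps to NaT:
#
#     >>> _parse_date("16FEB11:10:07:55")
#     datetime.datetime(2011, 2, 16, 10, 7, 55)
#     >>> _parse_date("bad value")
#     NaT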


def _split_line(s: str, parts):
    """
    Parameters
    ----------
    s: str
        Fixed-length string to split
    parts: list of (name, length) pairs
        Used to break up string; name '_' will be filtered from output.

    Returns
    -------
    Dict of name:contents of string at given location.
    """
    out = {}
    start = 0
    for name, length in parts:
        out[name] = s[start : start + length].strip()
        start += length
    del out["_"]
    return out
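
# Illustrative example (comment only): pieces named "_" are padding and are
# dropped from the result:
#
#     >>> _split_line("AAAABB  XX", [("a", 4), ("b", 4), ("_", 2)])
#     {'a': 'AAAA', 'b': 'BB'}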


def _handle_truncated_float_vec(vec, nbytes):
    # This feature is not well documented, but some SAS XPORT files
    # have 2-7 byte "truncated" floats.  To read these truncated
    # floats, pad them with zeros on the right to make 8 byte floats.
    #
    # References:
    # https://github.com/jcushman/xport/pull/3
    # The R "foreign" library

    if nbytes != 8:
        vec1 = np.zeros(len(vec), np.dtype("S8"))
        dtype = np.dtype(f"S{nbytes},S{8 - nbytes}")
        vec2 = vec1.view(dtype=dtype)
        vec2["f0"] = vec
        return vec2

    return vec
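
# Illustrative note: for nbytes=2 the view dtype is ("S2", "S6"); writing the
# truncated values into "f0" leaves "f1" zeroed, so each value is right-padded
# with NUL bytes to a full 8-byte IBM double before conversion below.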


def _parse_float_vec(vec):
    """
    Parse a vector of float values representing IBM 8 byte floats into
    native 8 byte floats.
    """
    dtype = np.dtype(">u4,>u4")
    vec1 = vec.view(dtype=dtype)
    xport1 = vec1["f0"]
    xport2 = vec1["f1"]

    # Start by setting first half of ieee number to first half of IBM
    # number sans exponent
    ieee1 = xport1 & 0x00FFFFFF

    # The fraction bit to the left of the binary point in the ieee
    # format was set and the number was shifted 0, 1, 2, or 3
    # places. This will tell us how to adjust the ibm exponent to be a
    # power of 2 ieee exponent and how to shift the fraction bits to
    # restore the correct magnitude.
    shift = np.zeros(len(vec), dtype=np.uint8)
    shift[np.where(xport1 & 0x00200000)] = 1
    shift[np.where(xport1 & 0x00400000)] = 2
    shift[np.where(xport1 & 0x00800000)] = 3

    # shift the ieee number down the correct number of places then
    # set the second half of the ieee number to be the second half
    # of the ibm number shifted appropriately, ored with the bits
    # from the first half that would have been shifted in if we
    # could shift a double. All we are worried about are the low
    # order 3 bits of the first half since we're only shifting by
    # 1, 2, or 3.
    ieee1 >>= shift
    ieee2 = (xport2 >> shift) | ((xport1 & 0x00000007) << (29 + (3 - shift)))

    # clear the 1 bit to the left of the binary point
    ieee1 &= 0xFFEFFFFF

    # set the exponent of the ieee number to be the actual exponent
    # plus the shift count + 1023. Or this into the first half of the
    # ieee number. The ibm exponent is excess 64 but is adjusted by 65
    # since during conversion to ibm format the exponent is
    # incremented by 1 and the fraction bits left 4 positions to the
    # right of the radix point.  (had to add >> 24 because C treats &
    # 0x7f as 0x7f000000 and Python doesn't)
    ieee1 |= ((((((xport1 >> 24) & 0x7F) - 65) << 2) + shift + 1023) << 20) | (
        xport1 & 0x80000000
    )

    ieee = np.empty((len(ieee1),), dtype=">u4,>u4")
    ieee["f0"] = ieee1
    ieee["f1"] = ieee2
    ieee = ieee.view(dtype=">f8")
    ieee = ieee.astype("f8")

    return ieee
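
# Worked example (comment only): the IBM double 0x4110000000000000 encodes
# sign 0, excess-64 exponent 0x41 (so 16**1) and hex fraction 0.1 (1/16),
# giving 16 * 1/16 = 1.0:
#
#     >>> padded = _handle_truncated_float_vec(np.array([b"\x41\x10"], "S2"), 2)
#     >>> _parse_float_vec(padded)
#     array([1.])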


class XportReader(ReaderBase, abc.Iterator):
    __doc__ = _xport_reader_doc

    def __init__(
        self,
        filepath_or_buffer: FilePath | ReadBuffer[bytes],
        index=None,
        encoding: str | None = "ISO-8859-1",
        chunksize: int | None = None,
        compression: CompressionOptions = "infer",
    ) -> None:
        self._encoding = encoding
        self._lines_read = 0
        self._index = index
        self._chunksize = chunksize

        self.handles = get_handle(
            filepath_or_buffer,
            "rb",
            encoding=encoding,
            is_text=False,
            compression=compression,
        )
        self.filepath_or_buffer = self.handles.handle

        try:
            self._read_header()
        except Exception:
            self.close()
            raise
    def close(self) -> None:
        self.handles.close()

    def _get_row(self):
        return self.filepath_or_buffer.read(80).decode()

    def _read_header(self) -> None:
        self.filepath_or_buffer.seek(0)

        # read file header
        line1 = self._get_row()
        if line1 != _correct_line1:
            if "**COMPRESSED**" in line1:
                # this was created with the PROC CPORT method and can't be read
                # https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/movefile/p1bm6aqp3fw4uin1hucwh718f6kp.htm
                raise ValueError(
                    "Header record indicates a CPORT file, which is not readable."
                )
            raise ValueError("Header record is not an XPORT file.")

        line2 = self._get_row()
        fif = [["prefix", 24], ["version", 8], ["OS", 8], ["_", 24], ["created", 16]]
        file_info = _split_line(line2, fif)
        if file_info["prefix"] != "SAS     SAS     SASLIB":
            raise ValueError("Header record has invalid prefix.")
        file_info["created"] = _parse_date(file_info["created"])
        self.file_info = file_info

        line3 = self._get_row()
        file_info["modified"] = _parse_date(line3[:16])

        # read member header
        header1 = self._get_row()
        header2 = self._get_row()
        headflag1 = header1.startswith(_correct_header1)
        headflag2 = header2 == _correct_header2
        if not (headflag1 and headflag2):
            raise ValueError("Member header not found")
        # usually 140, could be 135
        fieldnamelength = int(header1[-5:-2])

        # member info
        mem = [
            ["prefix", 8],
            ["set_name", 8],
            ["sasdata", 8],
            ["version", 8],
            ["OS", 8],
            ["_", 24],
            ["created", 16],
        ]
        member_info = _split_line(self._get_row(), mem)
        mem = [["modified", 16], ["_", 16], ["label", 40], ["type", 8]]
        member_info.update(_split_line(self._get_row(), mem))
        member_info["modified"] = _parse_date(member_info["modified"])
        member_info["created"] = _parse_date(member_info["created"])
        self.member_info = member_info

        # read field names
        types = {1: "numeric", 2: "char"}
        fieldcount = int(self._get_row()[54:58])
        datalength = fieldnamelength * fieldcount
        # round up to nearest 80
        if datalength % 80:
            datalength += 80 - datalength % 80
        fielddata = self.filepath_or_buffer.read(datalength)
        fields = []
        obs_length = 0
        while len(fielddata) >= fieldnamelength:
            # pull data for one field
            fieldbytes, fielddata = (
                fielddata[:fieldnamelength],
                fielddata[fieldnamelength:],
            )

            # rest at end gets ignored, so if field is short, pad out
            # to match struct pattern below
            fieldbytes = fieldbytes.ljust(140)

            fieldstruct = struct.unpack(">hhhh8s40s8shhh2s8shhl52s", fieldbytes)
            field = dict(zip(_fieldkeys, fieldstruct))
            del field["_"]
            field["ntype"] = types[field["ntype"]]
            fl = field["field_length"]
            if field["ntype"] == "numeric" and ((fl < 2) or (fl > 8)):
                msg = f"Floating field width {fl} is not between 2 and 8."
                raise TypeError(msg)

            for k, v in field.items():
                try:
                    field[k] = v.strip()
                except AttributeError:
                    pass

            obs_length += field["field_length"]
            fields += [field]

        header = self._get_row()
        if not header == _correct_obs_header:
            raise ValueError("Observation header not found.")

        self.fields = fields
        self.record_length = obs_length
        self.record_start = self.filepath_or_buffer.tell()

        self.nobs = self._record_count()
        self.columns = [x["name"].decode() for x in self.fields]

        # Set up the dtype.
        dtypel = [
            ("s" + str(i), "S" + str(field["field_length"]))
            for i, field in enumerate(self.fields)
        ]
        dtype = np.dtype(dtypel)
        self._dtype = dtype
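
    # Illustrative note: for two fields of widths 8 and 20 the dtype built in
    # _read_header is np.dtype([("s0", "S8"), ("s1", "S20")]), letting read()
    # slice each record into raw byte fields with one np.frombuffer call.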

    def __next__(self) -> pd.DataFrame:
        return self.read(nrows=self._chunksize or 1)

    def _record_count(self) -> int:
        """
        Get number of records in file.

        This is maybe suboptimal because we have to seek to the end of
        the file.

        Side effect: moves the file position back to record_start.
        """
        self.filepath_or_buffer.seek(0, 2)
        total_records_length = self.filepath_or_buffer.tell() - self.record_start

        if total_records_length % 80 != 0:
            warnings.warn(
                "xport file may be corrupted.",
                stacklevel=find_stack_level(),
            )

        if self.record_length > 80:
            self.filepath_or_buffer.seek(self.record_start)
            return total_records_length // self.record_length

        self.filepath_or_buffer.seek(-80, 2)
        last_card_bytes = self.filepath_or_buffer.read(80)
        last_card = np.frombuffer(last_card_bytes, dtype=np.uint64)

        # 8 byte blank: 2314885530818453536 == 0x2020202020202020,
        # i.e. eight ASCII spaces
        ix = np.flatnonzero(last_card == 2314885530818453536)

        if len(ix) == 0:
            tail_pad = 0
        else:
            tail_pad = 8 * len(ix)

        self.filepath_or_buffer.seek(self.record_start)

        return (total_records_length - tail_pad) // self.record_length

    def get_chunk(self, size: int | None = None) -> pd.DataFrame:
        """
        Read lines from an Xport file and return as a DataFrame.

        Parameters
        ----------
        size : int, defaults to None
            Number of lines to read.  If None, reads whole file.

        Returns
        -------
        DataFrame
        """
        if size is None:
            size = self._chunksize
        return self.read(nrows=size)

    def _missing_double(self, vec):
        v = vec.view(dtype="u1,u1,u2,u4")
        miss = (v["f1"] == 0) & (v["f2"] == 0) & (v["f3"] == 0)
        miss1 = (
            ((v["f0"] >= 0x41) & (v["f0"] <= 0x5A))
            | (v["f0"] == 0x5F)
            | (v["f0"] == 0x2E)
        )
        miss &= miss1
        return miss
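
    # Illustrative note: SAS missing values are 8-byte fields whose trailing
    # seven bytes are zero and whose first byte is "." (0x2E), "_" (0x5F), or
    # "A"-"Z" (0x41-0x5A, the special missings .A-.Z); _missing_double flags
    # exactly those bit patterns so read() can set them to NaN.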

    @Appender(_read_method_doc)
    def read(self, nrows: int | None = None) -> pd.DataFrame:
        if nrows is None:
            nrows = self.nobs

        read_lines = min(nrows, self.nobs - self._lines_read)
        read_len = read_lines * self.record_length
        if read_len <= 0:
            self.close()
            raise StopIteration
        raw = self.filepath_or_buffer.read(read_len)
        data = np.frombuffer(raw, dtype=self._dtype, count=read_lines)

        df_data = {}
        for j, x in enumerate(self.columns):
            vec = data["s" + str(j)]
            ntype = self.fields[j]["ntype"]
            if ntype == "numeric":
                vec = _handle_truncated_float_vec(vec, self.fields[j]["field_length"])
                miss = self._missing_double(vec)
                v = _parse_float_vec(vec)
                v[miss] = np.nan
            elif self.fields[j]["ntype"] == "char":
                v = [y.rstrip() for y in vec]

                if self._encoding is not None:
                    v = [y.decode(self._encoding) for y in v]

            df_data.update({x: v})
        df = pd.DataFrame(df_data)

        if self._index is None:
            df.index = pd.Index(range(self._lines_read, self._lines_read + read_lines))
        else:
            df = df.set_index(self._index)

        self._lines_read += read_lines

        return df
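
# Illustrative usage sketch (comment only; "data.xpt" is a hypothetical file,
# and user code would normally call pd.read_sas, which dispatches here for
# XPORT files):
#
#     with XportReader("data.xpt", chunksize=10000) as reader:
#         for chunk in reader:
#             ...  # process each DataFrame chunk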