212 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			212 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
Tests compressed data parsing functionality for all
of the parsers defined in parsers.py
"""
 | |
| 
 | |
| import os
 | |
| from pathlib import Path
 | |
| import tarfile
 | |
| import zipfile
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from pandas import DataFrame
 | |
| import pandas._testing as tm
 | |
| 
 | |
# Module-wide marker: every test in this file ignores the DeprecationWarning
# whose message starts with "Passing a BlockManager to DataFrame".
pytestmark = pytest.mark.filterwarnings(
    "ignore:Passing a BlockManager to DataFrame:DeprecationWarning"
)
 | |
| 
 | |
| 
 | |
@pytest.fixture(params=[True, False])
def buffer(request):
    """
    Boolean fixture, parametrized over ``True`` and ``False``.

    Tests in this module use it to choose between handing ``read_csv`` an
    open file handle (``True``) or a filesystem path (``False``).
    """
    use_buffer = request.param
    return use_buffer
 | |
| 
 | |
| 
 | |
@pytest.fixture
def parser_and_data(all_parsers, csv1):
    """
    Bundle the parser under test with the ``csv1`` payload.

    Returns
    -------
    tuple
        ``(parser, data, expected)`` where ``data`` is the raw bytes of the
        ``csv1`` file and ``expected`` is ``parser.read_csv(csv1)``.
    """
    raw_bytes = Path(csv1).read_bytes()
    parsed = all_parsers.read_csv(csv1)
    return all_parsers, raw_bytes, parsed
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("compression", ["zip", "infer", "zip2"])
def test_zip(parser_and_data, compression):
    """Check that a single-member zip archive parses to the expected frame."""
    parser, data, expected = parser_and_data

    # "zip2" is a test-local marker, never passed to read_csv: it selects the
    # variant that reads from an open binary handle with compression="zip".
    read_from_handle = compression == "zip2"

    with tm.ensure_clean("test_file.zip") as zip_path:
        with zipfile.ZipFile(zip_path, mode="w") as archive:
            archive.writestr("test_file", data)

        if not read_from_handle:
            result = parser.read_csv(zip_path, compression=compression)
        else:
            with open(zip_path, "rb") as handle:
                result = parser.read_csv(handle, compression="zip")

        tm.assert_frame_equal(result, expected)
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("compression", ["zip", "infer"])
def test_zip_error_multiple_files(parser_and_data, compression):
    """Check that a zip archive holding two members raises ``ValueError``."""
    parser, data, _ = parser_and_data

    with tm.ensure_clean("combined_zip.zip") as zip_path:
        with zipfile.ZipFile(zip_path, mode="w") as archive:
            # Two members with identical content; only the count matters here.
            for member_name in ("test_file", "second_file"):
                archive.writestr(member_name, data)

        with pytest.raises(ValueError, match="Multiple files"):
            parser.read_csv(zip_path, compression=compression)
 | |
| 
 | |
| 
 | |
def test_zip_error_no_files(parser_and_data):
    """Check that a zip archive with no members raises ``ValueError``."""
    parser = parser_and_data[0]

    with tm.ensure_clean() as zip_path:
        # Open for writing and close immediately, adding no members.
        zipfile.ZipFile(zip_path, mode="w").close()

        with pytest.raises(ValueError, match="Zero files"):
            parser.read_csv(zip_path, compression="zip")
 | |
| 
 | |
| 
 | |
def test_zip_error_invalid_zip(parser_and_data):
    """
    Check that ``compression="zip"`` on a non-zip handle raises ``BadZipFile``.

    The temporary file is opened for reading without any zip data having
    been written to it by this test.
    """
    parser = parser_and_data[0]
    expected_msg = "File is not a zip file"

    with tm.ensure_clean() as path, open(path, "rb") as handle:
        with pytest.raises(zipfile.BadZipFile, match=expected_msg):
            parser.read_csv(handle, compression="zip")
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("filename", [None, "test.{ext}"])
def test_compression(
    request,
    parser_and_data,
    compression_only,
    buffer,
    filename,
    compression_to_extension,
):
    """
    Round-trip the sample CSV through each compression type.

    With a ``filename`` the compression is left to ``"infer"`` (the file
    carries the matching extension); without one it is passed explicitly.
    Both path and open-handle inputs are exercised via ``buffer``.
    """
    parser, data, expected = parser_and_data
    compress_type = compression_only

    ext = compression_to_extension[compress_type]
    if filename is not None:
        filename = filename.format(ext=ext)

    infer_from_name = bool(filename)
    if infer_from_name and buffer:
        # An open handle combined with "infer" is expected to fail.
        xfail_marker = pytest.mark.xfail(
            reason="Cannot deduce compression from buffer of compressed data."
        )
        request.applymarker(xfail_marker)

    compression = "infer" if infer_from_name else compress_type

    with tm.ensure_clean(filename=filename) as path:
        tm.write_to_compressed(compress_type, path, data)

        if not buffer:
            result = parser.read_csv(path, compression=compression)
        else:
            with open(path, "rb") as handle:
                result = parser.read_csv(handle, compression=compression)

        tm.assert_frame_equal(result, expected)
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("ext", [None, "gz", "bz2"])
def test_infer_compression(all_parsers, csv1, buffer, ext):
    """
    Check ``compression="infer"`` against the plain parse of ``csv1``.

    Without ``buffer`` the file ``csv1`` plus an optional ``.gz``/``.bz2``
    suffix is read by path; with ``buffer`` the plain ``csv1`` file is read
    through a text handle and ``ext`` plays no part.
    """
    # see gh-9770
    parser = all_parsers
    base_kwargs = {"index_col": 0, "parse_dates": True}

    expected = parser.read_csv(csv1, **base_kwargs)
    infer_kwargs = {**base_kwargs, "compression": "infer"}

    if buffer:
        with open(csv1, encoding="utf-8") as handle:
            result = parser.read_csv(handle, **infer_kwargs)
    else:
        suffix = f".{ext}" if ext else ""
        result = parser.read_csv(csv1 + suffix, **infer_kwargs)

    tm.assert_frame_equal(result, expected)
 | |
| 
 | |
| 
 | |
def test_compression_utf_encoding(all_parsers, csv_dir_path, utf_value, encoding_fmt):
    """Check that a tab-separated, UTF-encoded CSV inside a zip parses correctly."""
    # see gh-18071, gh-24130
    parser = all_parsers
    encoding = encoding_fmt.format(utf_value)
    archive_name = f"utf{utf_value}_ex_small.zip"
    path = os.path.join(csv_dir_path, archive_name)

    result = parser.read_csv(path, encoding=encoding, compression="zip", sep="\t")

    expected_columns = {
        "Country": ["Venezuela", "Venezuela"],
        "Twitter": ["Hugo Chávez Frías", "Henrique Capriles R."],
    }
    expected = DataFrame(expected_columns)

    tm.assert_frame_equal(result, expected)
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize("invalid_compression", ["sfark", "bz3", "zipper"])
def test_invalid_compression(all_parsers, invalid_compression):
    """Check that an unknown ``compression`` value raises ``ValueError``."""
    expected_msg = f"Unrecognized compression type: {invalid_compression}"

    with pytest.raises(ValueError, match=expected_msg):
        all_parsers.read_csv("test_file.zip", compression=invalid_compression)
 | |
| 
 | |
| 
 | |
def test_compression_tar_archive(all_parsers, csv_dir_path):
    """Check that ``tar_csv.tar.gz`` reads into a frame with the single column ``a``."""
    tar_path = os.path.join(csv_dir_path, "tar_csv.tar.gz")
    frame = all_parsers.read_csv(tar_path)
    assert list(frame.columns) == ["a"]
 | |
| 
 | |
| 
 | |
def test_ignore_compression_extension(all_parsers):
    """
    Check that ``compression=None`` reads a plain-text file named ``*.zip``.

    The ``.zip`` file is a text copy of an uncompressed CSV, so the read only
    succeeds if the extension is not used to pick a decompressor.
    """
    parser = all_parsers
    df = DataFrame({"a": [0, 1]})

    with tm.ensure_clean("test.csv") as path_csv:
        with tm.ensure_clean("test.csv.zip") as path_zip:
            # make sure to create un-compressed file with zip extension
            df.to_csv(path_csv, index=False)
            csv_text = Path(path_csv).read_text(encoding="utf-8")
            Path(path_zip).write_text(csv_text, encoding="utf-8")

            result = parser.read_csv(path_zip, compression=None)
            tm.assert_frame_equal(result, df)
 | |
| 
 | |
| 
 | |
def test_writes_tar_gz(all_parsers):
    """
    Round-trip a frame through ``to_csv``/``read_csv`` with a ``.tar.gz`` path.

    The written file is then reopened with ``tarfile`` in ``"r:gz"`` mode and
    its first member parsed, to confirm the on-disk format.
    """
    parser = all_parsers
    columns = {
        "Country": ["Venezuela", "Venezuela"],
        "Twitter": ["Hugo Chávez Frías", "Henrique Capriles R."],
    }
    data = DataFrame(columns)

    with tm.ensure_clean("test.tar.gz") as tar_path:
        data.to_csv(tar_path, index=False)

        # test that read_csv infers .tar.gz to gzip:
        inferred = parser.read_csv(tar_path)
        tm.assert_frame_equal(inferred, data)

        # test that file is indeed gzipped:
        with tarfile.open(tar_path, "r:gz") as tar:
            first_member = tar.getnames()[0]
            member_handle = tar.extractfile(first_member)
            result = parser.read_csv(member_handle, compression="infer")
            tm.assert_frame_equal(result, data)
 |