from __future__ import division, absolute_import, print_function import sys import gzip import os import threading from tempfile import NamedTemporaryFile import time import warnings import gc import io from io import BytesIO, StringIO from datetime import datetime import locale import re import numpy as np import numpy.ma as ma from numpy.lib._iotools import ConverterError, ConversionWarning from numpy.compat import asbytes, bytes, unicode, Path from numpy.ma.testutils import assert_equal from numpy.testing import ( run_module_suite, assert_warns, assert_, SkipTest, assert_raises_regex, assert_raises, assert_allclose, assert_array_equal, temppath, tempdir, dec, IS_PYPY, suppress_warnings ) class TextIO(BytesIO): """Helper IO class. Writes encode strings to bytes if needed, reads return bytes. This makes it easier to emulate files opened in binary mode without needing to explicitly convert strings to bytes in setting up the test data. """ def __init__(self, s=""): BytesIO.__init__(self, asbytes(s)) def write(self, s): BytesIO.write(self, asbytes(s)) def writelines(self, lines): BytesIO.writelines(self, [asbytes(s) for s in lines]) MAJVER, MINVER = sys.version_info[:2] IS_64BIT = sys.maxsize > 2**32 try: import bz2 HAS_BZ2 = True except ImportError: HAS_BZ2 = False try: import lzma HAS_LZMA = True except ImportError: HAS_LZMA = False def strptime(s, fmt=None): """ This function is available in the datetime module only from Python >= 2.5. """ if type(s) == bytes: s = s.decode("latin1") return datetime(*time.strptime(s, fmt)[:3]) class RoundtripTest(object): def roundtrip(self, save_func, *args, **kwargs): """ save_func : callable Function used to save arrays to file. file_on_disk : bool If true, store the file on disk, instead of in a string buffer. save_kwds : dict Parameters passed to `save_func`. load_kwds : dict Parameters passed to `numpy.load`. args : tuple of arrays Arrays stored to file. """ save_kwds = kwargs.get('save_kwds', {}) load_kwds = kwargs.get('load_kwds', {}) file_on_disk = kwargs.get('file_on_disk', False) if file_on_disk: target_file = NamedTemporaryFile(delete=False) load_file = target_file.name else: target_file = BytesIO() load_file = target_file try: arr = args save_func(target_file, *arr, **save_kwds) target_file.flush() target_file.seek(0) if sys.platform == 'win32' and not isinstance(target_file, BytesIO): target_file.close() arr_reloaded = np.load(load_file, **load_kwds) self.arr = arr self.arr_reloaded = arr_reloaded finally: if not isinstance(target_file, BytesIO): target_file.close() # holds an open file descriptor so it can't be deleted on win if 'arr_reloaded' in locals(): if not isinstance(arr_reloaded, np.lib.npyio.NpzFile): os.remove(target_file.name) def check_roundtrips(self, a): self.roundtrip(a) self.roundtrip(a, file_on_disk=True) self.roundtrip(np.asfortranarray(a)) self.roundtrip(np.asfortranarray(a), file_on_disk=True) if a.shape[0] > 1: # neither C nor Fortran contiguous for 2D arrays or more self.roundtrip(np.asfortranarray(a)[1:]) self.roundtrip(np.asfortranarray(a)[1:], file_on_disk=True) def test_array(self): a = np.array([], float) self.check_roundtrips(a) a = np.array([[1, 2], [3, 4]], float) self.check_roundtrips(a) a = np.array([[1, 2], [3, 4]], int) self.check_roundtrips(a) a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.csingle) self.check_roundtrips(a) a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.cdouble) self.check_roundtrips(a) def test_array_object(self): a = np.array([], object) self.check_roundtrips(a) a = np.array([[1, 2], [3, 4]], object) self.check_roundtrips(a) def test_1D(self): a = np.array([1, 2, 3, 4], int) self.roundtrip(a) @dec.knownfailureif(sys.platform == 'win32', "Fail on Win32") def test_mmap(self): a = np.array([[1, 2.5], [4, 7.3]]) self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'}) a = np.asfortranarray([[1, 2.5], [4, 7.3]]) self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'}) def test_record(self): a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) self.check_roundtrips(a) @dec.slow def test_format_2_0(self): dt = [(("%d" % i) * 100, float) for i in range(500)] a = np.ones(1000, dtype=dt) with warnings.catch_warnings(record=True): warnings.filterwarnings('always', '', UserWarning) self.check_roundtrips(a) class TestSaveLoad(RoundtripTest): def roundtrip(self, *args, **kwargs): RoundtripTest.roundtrip(self, np.save, *args, **kwargs) assert_equal(self.arr[0], self.arr_reloaded) assert_equal(self.arr[0].dtype, self.arr_reloaded.dtype) assert_equal(self.arr[0].flags.fnc, self.arr_reloaded.flags.fnc) class TestSavezLoad(RoundtripTest): def roundtrip(self, *args, **kwargs): RoundtripTest.roundtrip(self, np.savez, *args, **kwargs) try: for n, arr in enumerate(self.arr): reloaded = self.arr_reloaded['arr_%d' % n] assert_equal(arr, reloaded) assert_equal(arr.dtype, reloaded.dtype) assert_equal(arr.flags.fnc, reloaded.flags.fnc) finally: # delete tempfile, must be done here on windows if self.arr_reloaded.fid: self.arr_reloaded.fid.close() os.remove(self.arr_reloaded.fid.name) @dec.skipif(not IS_64BIT, "Works only with 64bit systems") @dec.slow def test_big_arrays(self): L = (1 << 31) + 100000 a = np.empty(L, dtype=np.uint8) with temppath(prefix="numpy_test_big_arrays_", suffix=".npz") as tmp: np.savez(tmp, a=a) del a npfile = np.load(tmp) a = npfile['a'] # Should succeed npfile.close() del a # Avoid pyflakes unused variable warning. def test_multiple_arrays(self): a = np.array([[1, 2], [3, 4]], float) b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex) self.roundtrip(a, b) def test_named_arrays(self): a = np.array([[1, 2], [3, 4]], float) b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex) c = BytesIO() np.savez(c, file_a=a, file_b=b) c.seek(0) l = np.load(c) assert_equal(a, l['file_a']) assert_equal(b, l['file_b']) def test_BagObj(self): a = np.array([[1, 2], [3, 4]], float) b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex) c = BytesIO() np.savez(c, file_a=a, file_b=b) c.seek(0) l = np.load(c) assert_equal(sorted(dir(l.f)), ['file_a','file_b']) assert_equal(a, l.f.file_a) assert_equal(b, l.f.file_b) def test_savez_filename_clashes(self): # Test that issue #852 is fixed # and savez functions in multithreaded environment def writer(error_list): with temppath(suffix='.npz') as tmp: arr = np.random.randn(500, 500) try: np.savez(tmp, arr=arr) except OSError as err: error_list.append(err) errors = [] threads = [threading.Thread(target=writer, args=(errors,)) for j in range(3)] for t in threads: t.start() for t in threads: t.join() if errors: raise AssertionError(errors) def test_not_closing_opened_fid(self): # Test that issue #2178 is fixed: # verify could seek on 'loaded' file with temppath(suffix='.npz') as tmp: with open(tmp, 'wb') as fp: np.savez(fp, data='LOVELY LOAD') with open(tmp, 'rb', 10000) as fp: fp.seek(0) assert_(not fp.closed) np.load(fp)['data'] # fp must not get closed by .load assert_(not fp.closed) fp.seek(0) assert_(not fp.closed) @dec.skipif(IS_PYPY, "context manager required on PyPy") def test_closing_fid(self): # Test that issue #1517 (too many opened files) remains closed # It might be a "weak" test since failed to get triggered on # e.g. Debian sid of 2012 Jul 05 but was reported to # trigger the failure on Ubuntu 10.04: # http://projects.scipy.org/numpy/ticket/1517#comment:2 with temppath(suffix='.npz') as tmp: np.savez(tmp, data='LOVELY LOAD') # We need to check if the garbage collector can properly close # numpy npz file returned by np.load when their reference count # goes to zero. Python 3 running in debug mode raises a # ResourceWarning when file closing is left to the garbage # collector, so we catch the warnings. Because ResourceWarning # is unknown in Python < 3.x, we take the easy way out and # catch all warnings. with suppress_warnings() as sup: sup.filter(Warning) # TODO: specify exact message for i in range(1, 1025): try: np.load(tmp)["data"] except Exception as e: msg = "Failed to load data from a file: %s" % e raise AssertionError(msg) def test_closing_zipfile_after_load(self): # Check that zipfile owns file and can close it. This needs to # pass a file name to load for the test. On windows failure will # cause a second error will be raised when the attempt to remove # the open file is made. prefix = 'numpy_test_closing_zipfile_after_load_' with temppath(suffix='.npz', prefix=prefix) as tmp: np.savez(tmp, lab='place holder') data = np.load(tmp) fp = data.zip.fp data.close() assert_(fp.closed) class TestSaveTxt(object): def test_array(self): a = np.array([[1, 2], [3, 4]], float) fmt = "%.18e" c = BytesIO() np.savetxt(c, a, fmt=fmt) c.seek(0) assert_equal(c.readlines(), [asbytes((fmt + ' ' + fmt + '\n') % (1, 2)), asbytes((fmt + ' ' + fmt + '\n') % (3, 4))]) a = np.array([[1, 2], [3, 4]], int) c = BytesIO() np.savetxt(c, a, fmt='%d') c.seek(0) assert_equal(c.readlines(), [b'1 2\n', b'3 4\n']) def test_1D(self): a = np.array([1, 2, 3, 4], int) c = BytesIO() np.savetxt(c, a, fmt='%d') c.seek(0) lines = c.readlines() assert_equal(lines, [b'1\n', b'2\n', b'3\n', b'4\n']) def test_0D_3D(self): c = BytesIO() assert_raises(ValueError, np.savetxt, c, np.array(1)) assert_raises(ValueError, np.savetxt, c, np.array([[[1], [2]]])) def test_record(self): a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) c = BytesIO() np.savetxt(c, a, fmt='%d') c.seek(0) assert_equal(c.readlines(), [b'1 2\n', b'3 4\n']) def test_delimiter(self): a = np.array([[1., 2.], [3., 4.]]) c = BytesIO() np.savetxt(c, a, delimiter=',', fmt='%d') c.seek(0) assert_equal(c.readlines(), [b'1,2\n', b'3,4\n']) def test_format(self): a = np.array([(1, 2), (3, 4)]) c = BytesIO() # Sequence of formats np.savetxt(c, a, fmt=['%02d', '%3.1f']) c.seek(0) assert_equal(c.readlines(), [b'01 2.0\n', b'03 4.0\n']) # A single multiformat string c = BytesIO() np.savetxt(c, a, fmt='%02d : %3.1f') c.seek(0) lines = c.readlines() assert_equal(lines, [b'01 : 2.0\n', b'03 : 4.0\n']) # Specify delimiter, should be overiden c = BytesIO() np.savetxt(c, a, fmt='%02d : %3.1f', delimiter=',') c.seek(0) lines = c.readlines() assert_equal(lines, [b'01 : 2.0\n', b'03 : 4.0\n']) # Bad fmt, should raise a ValueError c = BytesIO() assert_raises(ValueError, np.savetxt, c, a, fmt=99) def test_header_footer(self): # Test the functionality of the header and footer keyword argument. c = BytesIO() a = np.array([(1, 2), (3, 4)], dtype=int) test_header_footer = 'Test header / footer' # Test the header keyword argument np.savetxt(c, a, fmt='%1d', header=test_header_footer) c.seek(0) assert_equal(c.read(), asbytes('# ' + test_header_footer + '\n1 2\n3 4\n')) # Test the footer keyword argument c = BytesIO() np.savetxt(c, a, fmt='%1d', footer=test_header_footer) c.seek(0) assert_equal(c.read(), asbytes('1 2\n3 4\n# ' + test_header_footer + '\n')) # Test the commentstr keyword argument used on the header c = BytesIO() commentstr = '% ' np.savetxt(c, a, fmt='%1d', header=test_header_footer, comments=commentstr) c.seek(0) assert_equal(c.read(), asbytes(commentstr + test_header_footer + '\n' + '1 2\n3 4\n')) # Test the commentstr keyword argument used on the footer c = BytesIO() commentstr = '% ' np.savetxt(c, a, fmt='%1d', footer=test_header_footer, comments=commentstr) c.seek(0) assert_equal(c.read(), asbytes('1 2\n3 4\n' + commentstr + test_header_footer + '\n')) def test_file_roundtrip(self): with temppath() as name: a = np.array([(1, 2), (3, 4)]) np.savetxt(name, a) b = np.loadtxt(name) assert_array_equal(a, b) def test_complex_arrays(self): ncols = 2 nrows = 2 a = np.zeros((ncols, nrows), dtype=np.complex128) re = np.pi im = np.e a[:] = re + 1.0j * im # One format only c = BytesIO() np.savetxt(c, a, fmt=' %+.3e') c.seek(0) lines = c.readlines() assert_equal( lines, [b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n', b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n']) # One format for each real and imaginary part c = BytesIO() np.savetxt(c, a, fmt=' %+.3e' * 2 * ncols) c.seek(0) lines = c.readlines() assert_equal( lines, [b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n', b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n']) # One format for each complex number c = BytesIO() np.savetxt(c, a, fmt=['(%.3e%+.3ej)'] * ncols) c.seek(0) lines = c.readlines() assert_equal( lines, [b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n', b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n']) def test_custom_writer(self): class CustomWriter(list): def write(self, text): self.extend(text.split(b'\n')) w = CustomWriter() a = np.array([(1, 2), (3, 4)]) np.savetxt(w, a) b = np.loadtxt(w) assert_array_equal(a, b) def test_unicode(self): utf8 = b'\xcf\x96'.decode('UTF-8') a = np.array([utf8], dtype=np.unicode) with tempdir() as tmpdir: # set encoding as on windows it may not be unicode even on py3 np.savetxt(os.path.join(tmpdir, 'test.csv'), a, fmt=['%s'], encoding='UTF-8') def test_unicode_roundtrip(self): utf8 = b'\xcf\x96'.decode('UTF-8') a = np.array([utf8], dtype=np.unicode) # our gz wrapper support encoding suffixes = ['', '.gz'] # stdlib 2 versions do not support encoding if MAJVER > 2: if HAS_BZ2: suffixes.append('.bz2') if HAS_LZMA: suffixes.extend(['.xz', '.lzma']) with tempdir() as tmpdir: for suffix in suffixes: np.savetxt(os.path.join(tmpdir, 'test.csv' + suffix), a, fmt=['%s'], encoding='UTF-16-LE') b = np.loadtxt(os.path.join(tmpdir, 'test.csv' + suffix), encoding='UTF-16-LE', dtype=np.unicode) assert_array_equal(a, b) def test_unicode_bytestream(self): utf8 = b'\xcf\x96'.decode('UTF-8') a = np.array([utf8], dtype=np.unicode) s = BytesIO() np.savetxt(s, a, fmt=['%s'], encoding='UTF-8') s.seek(0) assert_equal(s.read().decode('UTF-8'), utf8 + '\n') def test_unicode_stringstream(self): utf8 = b'\xcf\x96'.decode('UTF-8') a = np.array([utf8], dtype=np.unicode) s = StringIO() np.savetxt(s, a, fmt=['%s'], encoding='UTF-8') s.seek(0) assert_equal(s.read(), utf8 + '\n') class LoadTxtBase(object): def check_compressed(self, fopen, suffixes): # Test that we can load data from a compressed file wanted = np.arange(6).reshape((2, 3)) linesep = ('\n', '\r\n', '\r') for sep in linesep: data = '0 1 2' + sep + '3 4 5' for suffix in suffixes: with temppath(suffix=suffix) as name: with fopen(name, mode='wt', encoding='UTF-32-LE') as f: f.write(data) res = self.loadfunc(name, encoding='UTF-32-LE') assert_array_equal(res, wanted) with fopen(name, "rt", encoding='UTF-32-LE') as f: res = self.loadfunc(f) assert_array_equal(res, wanted) # Python2 .open does not support encoding @dec.skipif(MAJVER == 2) def test_compressed_gzip(self): self.check_compressed(gzip.open, ('.gz',)) @dec.skipif(MAJVER == 2 or not HAS_BZ2) def test_compressed_gzip(self): self.check_compressed(bz2.open, ('.bz2',)) @dec.skipif(MAJVER == 2 or not HAS_LZMA) def test_compressed_gzip(self): self.check_compressed(lzma.open, ('.xz', '.lzma')) def test_encoding(self): with temppath() as path: with open(path, "wb") as f: f.write('0.\n1.\n2.'.encode("UTF-16")) x = self.loadfunc(path, encoding="UTF-16") assert_array_equal(x, [0., 1., 2.]) def test_stringload(self): # umlaute nonascii = b'\xc3\xb6\xc3\xbc\xc3\xb6'.decode("UTF-8") with temppath() as path: with open(path, "wb") as f: f.write(nonascii.encode("UTF-16")) x = self.loadfunc(path, encoding="UTF-16", dtype=np.unicode) assert_array_equal(x, nonascii) def test_binary_decode(self): utf16 = b'\xff\xfeh\x04 \x00i\x04 \x00j\x04' v = self.loadfunc(BytesIO(utf16), dtype=np.unicode, encoding='UTF-16') assert_array_equal(v, np.array(utf16.decode('UTF-16').split())) def test_converters_decode(self): # test converters that decode strings c = TextIO() c.write(b'\xcf\x96') c.seek(0) x = self.loadfunc(c, dtype=np.unicode, converters={0: lambda x: x.decode('UTF-8')}) a = np.array([b'\xcf\x96'.decode('UTF-8')]) assert_array_equal(x, a) def test_converters_nodecode(self): # test native string converters enabled by setting an encoding utf8 = b'\xcf\x96'.decode('UTF-8') with temppath() as path: with io.open(path, 'wt', encoding='UTF-8') as f: f.write(utf8) x = self.loadfunc(path, dtype=np.unicode, converters={0: lambda x: x + 't'}, encoding='UTF-8') a = np.array([utf8 + 't']) assert_array_equal(x, a) class TestLoadTxt(LoadTxtBase): loadfunc = staticmethod(np.loadtxt) def setUp(self): # lower chunksize for testing self.orig_chunk = np.lib.npyio._loadtxt_chunksize np.lib.npyio._loadtxt_chunksize = 1 def tearDown(self): np.lib.npyio._loadtxt_chunksize = self.orig_chunk def test_record(self): c = TextIO() c.write('1 2\n3 4') c.seek(0) x = np.loadtxt(c, dtype=[('x', np.int32), ('y', np.int32)]) a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) assert_array_equal(x, a) d = TextIO() d.write('M 64.0 75.0\nF 25.0 60.0') d.seek(0) mydescriptor = {'names': ('gender', 'age', 'weight'), 'formats': ('S1', 'i4', 'f4')} b = np.array([('M', 64.0, 75.0), ('F', 25.0, 60.0)], dtype=mydescriptor) y = np.loadtxt(d, dtype=mydescriptor) assert_array_equal(y, b) def test_array(self): c = TextIO() c.write('1 2\n3 4') c.seek(0) x = np.loadtxt(c, dtype=int) a = np.array([[1, 2], [3, 4]], int) assert_array_equal(x, a) c.seek(0) x = np.loadtxt(c, dtype=float) a = np.array([[1, 2], [3, 4]], float) assert_array_equal(x, a) def test_1D(self): c = TextIO() c.write('1\n2\n3\n4\n') c.seek(0) x = np.loadtxt(c, dtype=int) a = np.array([1, 2, 3, 4], int) assert_array_equal(x, a) c = TextIO() c.write('1,2,3,4\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',') a = np.array([1, 2, 3, 4], int) assert_array_equal(x, a) def test_missing(self): c = TextIO() c.write('1,2,3,,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', converters={3: lambda s: int(s or - 999)}) a = np.array([1, 2, 3, -999, 5], int) assert_array_equal(x, a) def test_converters_with_usecols(self): c = TextIO() c.write('1,2,3,,5\n6,7,8,9,10\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', converters={3: lambda s: int(s or - 999)}, usecols=(1, 3,)) a = np.array([[2, -999], [7, 9]], int) assert_array_equal(x, a) def test_comments_unicode(self): c = TextIO() c.write('# comment\n1,2,3,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', comments=u'#') a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) def test_comments_byte(self): c = TextIO() c.write('# comment\n1,2,3,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', comments=b'#') a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) def test_comments_multiple(self): c = TextIO() c.write('# comment\n1,2,3\n@ comment2\n4,5,6 // comment3') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', comments=['#', '@', '//']) a = np.array([[1, 2, 3], [4, 5, 6]], int) assert_array_equal(x, a) def test_comments_multi_chars(self): c = TextIO() c.write('/* comment\n1,2,3,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', comments='/*') a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) # Check that '/*' is not transformed to ['/', '*'] c = TextIO() c.write('*/ comment\n1,2,3,5\n') c.seek(0) assert_raises(ValueError, np.loadtxt, c, dtype=int, delimiter=',', comments='/*') def test_skiprows(self): c = TextIO() c.write('comment\n1,2,3,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', skiprows=1) a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) c = TextIO() c.write('# comment\n1,2,3,5\n') c.seek(0) x = np.loadtxt(c, dtype=int, delimiter=',', skiprows=1) a = np.array([1, 2, 3, 5], int) assert_array_equal(x, a) def test_usecols(self): a = np.array([[1, 2], [3, 4]], float) c = BytesIO() np.savetxt(c, a) c.seek(0) x = np.loadtxt(c, dtype=float, usecols=(1,)) assert_array_equal(x, a[:, 1]) a = np.array([[1, 2, 3], [3, 4, 5]], float) c = BytesIO() np.savetxt(c, a) c.seek(0) x = np.loadtxt(c, dtype=float, usecols=(1, 2)) assert_array_equal(x, a[:, 1:]) # Testing with arrays instead of tuples. c.seek(0) x = np.loadtxt(c, dtype=float, usecols=np.array([1, 2])) assert_array_equal(x, a[:, 1:]) # Testing with an integer instead of a sequence for int_type in [int, np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]: to_read = int_type(1) c.seek(0) x = np.loadtxt(c, dtype=float, usecols=to_read) assert_array_equal(x, a[:, 1]) # Testing with some crazy custom integer type class CrazyInt(object): def __index__(self): return 1 crazy_int = CrazyInt() c.seek(0) x = np.loadtxt(c, dtype=float, usecols=crazy_int) assert_array_equal(x, a[:, 1]) c.seek(0) x = np.loadtxt(c, dtype=float, usecols=(crazy_int,)) assert_array_equal(x, a[:, 1]) # Checking with dtypes defined converters. data = '''JOE 70.1 25.3 BOB 60.5 27.9 ''' c = TextIO(data) names = ['stid', 'temp'] dtypes = ['S4', 'f8'] arr = np.loadtxt(c, usecols=(0, 2), dtype=list(zip(names, dtypes))) assert_equal(arr['stid'], [b"JOE", b"BOB"]) assert_equal(arr['temp'], [25.3, 27.9]) # Testing non-ints in usecols c.seek(0) bogus_idx = 1.5 assert_raises_regex( TypeError, '^usecols must be.*%s' % type(bogus_idx), np.loadtxt, c, usecols=bogus_idx ) assert_raises_regex( TypeError, '^usecols must be.*%s' % type(bogus_idx), np.loadtxt, c, usecols=[0, bogus_idx, 0] ) def test_fancy_dtype(self): c = TextIO() c.write('1,2,3.0\n4,5,6.0\n') c.seek(0) dt = np.dtype([('x', int), ('y', [('t', int), ('s', float)])]) x = np.loadtxt(c, dtype=dt, delimiter=',') a = np.array([(1, (2, 3.0)), (4, (5, 6.0))], dt) assert_array_equal(x, a) def test_shaped_dtype(self): c = TextIO("aaaa 1.0 8.0 1 2 3 4 5 6") dt = np.dtype([('name', 'S4'), ('x', float), ('y', float), ('block', int, (2, 3))]) x = np.loadtxt(c, dtype=dt) a = np.array([('aaaa', 1.0, 8.0, [[1, 2, 3], [4, 5, 6]])], dtype=dt) assert_array_equal(x, a) def test_3d_shaped_dtype(self): c = TextIO("aaaa 1.0 8.0 1 2 3 4 5 6 7 8 9 10 11 12") dt = np.dtype([('name', 'S4'), ('x', float), ('y', float), ('block', int, (2, 2, 3))]) x = np.loadtxt(c, dtype=dt) a = np.array([('aaaa', 1.0, 8.0, [[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]])], dtype=dt) assert_array_equal(x, a) def test_str_dtype(self): # see gh-8033 c = ["str1", "str2"] for dt in (str, np.bytes_): a = np.array(["str1", "str2"], dtype=dt) x = np.loadtxt(c, dtype=dt) assert_array_equal(x, a) def test_empty_file(self): with suppress_warnings() as sup: sup.filter(message="loadtxt: Empty input file:") c = TextIO() x = np.loadtxt(c) assert_equal(x.shape, (0,)) x = np.loadtxt(c, dtype=np.int64) assert_equal(x.shape, (0,)) assert_(x.dtype == np.int64) def test_unused_converter(self): c = TextIO() c.writelines(['1 21\n', '3 42\n']) c.seek(0) data = np.loadtxt(c, usecols=(1,), converters={0: lambda s: int(s, 16)}) assert_array_equal(data, [21, 42]) c.seek(0) data = np.loadtxt(c, usecols=(1,), converters={1: lambda s: int(s, 16)}) assert_array_equal(data, [33, 66]) def test_dtype_with_object(self): # Test using an explicit dtype with an object data = """ 1; 2001-01-01 2; 2002-01-31 """ ndtype = [('idx', int), ('code', object)] func = lambda s: strptime(s.strip(), "%Y-%m-%d") converters = {1: func} test = np.loadtxt(TextIO(data), delimiter=";", dtype=ndtype, converters=converters) control = np.array( [(1, datetime(2001, 1, 1)), (2, datetime(2002, 1, 31))], dtype=ndtype) assert_equal(test, control) def test_uint64_type(self): tgt = (9223372043271415339, 9223372043271415853) c = TextIO() c.write("%s %s" % tgt) c.seek(0) res = np.loadtxt(c, dtype=np.uint64) assert_equal(res, tgt) def test_int64_type(self): tgt = (-9223372036854775807, 9223372036854775807) c = TextIO() c.write("%s %s" % tgt) c.seek(0) res = np.loadtxt(c, dtype=np.int64) assert_equal(res, tgt) def test_from_float_hex(self): # IEEE doubles and floats only, otherwise the float32 # conversion may fail. tgt = np.logspace(-10, 10, 5).astype(np.float32) tgt = np.hstack((tgt, -tgt)).astype(float) inp = '\n'.join(map(float.hex, tgt)) c = TextIO() c.write(inp) for dt in [float, np.float32]: c.seek(0) res = np.loadtxt(c, dtype=dt) assert_equal(res, tgt, err_msg="%s" % dt) def test_from_complex(self): tgt = (complex(1, 1), complex(1, -1)) c = TextIO() c.write("%s %s" % tgt) c.seek(0) res = np.loadtxt(c, dtype=complex) assert_equal(res, tgt) def test_universal_newline(self): with temppath() as name: with open(name, 'w') as f: f.write('1 21\r3 42\r') data = np.loadtxt(name) assert_array_equal(data, [[1, 21], [3, 42]]) def test_empty_field_after_tab(self): c = TextIO() c.write('1 \t2 \t3\tstart \n4\t5\t6\t \n7\t8\t9.5\t') c.seek(0) dt = {'names': ('x', 'y', 'z', 'comment'), 'formats': (' unicode upcast utf8 = u'\u03d6' latin1 = u'\xf6\xfc\xf6' # skip test if cannot encode utf8 test string with preferred # encoding. The preferred encoding is assumed to be the default # encoding of io.open. Will need to change this for PyTest, maybe # using pytest.mark.xfail(raises=***). try: import locale encoding = locale.getpreferredencoding() utf8.encode(encoding) except (UnicodeError, ImportError): raise SkipTest('Skipping test_utf8_file_nodtype_unicode, ' 'unable to encode utf8 in preferred encoding') with temppath() as path: with io.open(path, "wt") as f: f.write(u"norm1,norm2,norm3\n") f.write(u"norm1," + latin1 + u",norm3\n") f.write(u"test1,testNonethe" + utf8 + u",test3\n") with warnings.catch_warnings(record=True) as w: warnings.filterwarnings('always', '', np.VisibleDeprecationWarning) test = np.genfromtxt(path, dtype=None, comments=None, delimiter=',') # Check for warning when encoding not specified. assert_(w[0].category is np.VisibleDeprecationWarning) ctl = np.array([ ["norm1", "norm2", "norm3"], ["norm1", latin1, "norm3"], ["test1", "testNonethe" + utf8, "test3"]], dtype=np.unicode) assert_array_equal(test, ctl) def test_recfromtxt(self): # data = TextIO('A,B\n0,1\n2,3') kwargs = dict(delimiter=",", missing_values="N/A", names=True) test = np.recfromtxt(data, **kwargs) control = np.array([(0, 1), (2, 3)], dtype=[('A', int), ('B', int)]) assert_(isinstance(test, np.recarray)) assert_equal(test, control) # data = TextIO('A,B\n0,1\n2,N/A') test = np.recfromtxt(data, dtype=None, usemask=True, **kwargs) control = ma.array([(0, 1), (2, -1)], mask=[(False, False), (False, True)], dtype=[('A', int), ('B', int)]) assert_equal(test, control) assert_equal(test.mask, control.mask) assert_equal(test.A, [0, 2]) def test_recfromcsv(self): # data = TextIO('A,B\n0,1\n2,3') kwargs = dict(missing_values="N/A", names=True, case_sensitive=True) test = np.recfromcsv(data, dtype=None, **kwargs) control = np.array([(0, 1), (2, 3)], dtype=[('A', int), ('B', int)]) assert_(isinstance(test, np.recarray)) assert_equal(test, control) # data = TextIO('A,B\n0,1\n2,N/A') test = np.recfromcsv(data, dtype=None, usemask=True, **kwargs) control = ma.array([(0, 1), (2, -1)], mask=[(False, False), (False, True)], dtype=[('A', int), ('B', int)]) assert_equal(test, control) assert_equal(test.mask, control.mask) assert_equal(test.A, [0, 2]) # data = TextIO('A,B\n0,1\n2,3') test = np.recfromcsv(data, missing_values='N/A',) control = np.array([(0, 1), (2, 3)], dtype=[('a', int), ('b', int)]) assert_(isinstance(test, np.recarray)) assert_equal(test, control) # data = TextIO('A,B\n0,1\n2,3') dtype = [('a', int), ('b', float)] test = np.recfromcsv(data, missing_values='N/A', dtype=dtype) control = np.array([(0, 1), (2, 3)], dtype=dtype) assert_(isinstance(test, np.recarray)) assert_equal(test, control) #gh-10394 data = TextIO('color\n"red"\n"blue"') test = np.recfromcsv(data, converters={0: lambda x: x.strip(b'\"')}) control = np.array([('red',), ('blue',)], dtype=[('color', (bytes, 4))]) assert_equal(test.dtype, control.dtype) assert_equal(test, control) def test_max_rows(self): # Test the `max_rows` keyword argument. data = '1 2\n3 4\n5 6\n7 8\n9 10\n' txt = TextIO(data) a1 = np.genfromtxt(txt, max_rows=3) a2 = np.genfromtxt(txt) assert_equal(a1, [[1, 2], [3, 4], [5, 6]]) assert_equal(a2, [[7, 8], [9, 10]]) # max_rows must be at least 1. assert_raises(ValueError, np.genfromtxt, TextIO(data), max_rows=0) # An input with several invalid rows. data = '1 1\n2 2\n0 \n3 3\n4 4\n5 \n6 \n7 \n' test = np.genfromtxt(TextIO(data), max_rows=2) control = np.array([[1., 1.], [2., 2.]]) assert_equal(test, control) # Test keywords conflict assert_raises(ValueError, np.genfromtxt, TextIO(data), skip_footer=1, max_rows=4) # Test with invalid value assert_raises(ValueError, np.genfromtxt, TextIO(data), max_rows=4) # Test with invalid not raise with suppress_warnings() as sup: sup.filter(ConversionWarning) test = np.genfromtxt(TextIO(data), max_rows=4, invalid_raise=False) control = np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]]) assert_equal(test, control) test = np.genfromtxt(TextIO(data), max_rows=5, invalid_raise=False) control = np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]]) assert_equal(test, control) # Structured array with field names. data = 'a b\n#c d\n1 1\n2 2\n#0 \n3 3\n4 4\n5 5\n' # Test with header, names and comments txt = TextIO(data) test = np.genfromtxt(txt, skip_header=1, max_rows=3, names=True) control = np.array([(1.0, 1.0), (2.0, 2.0), (3.0, 3.0)], dtype=[('c', ' should convert to float # 2**34 = 17179869184 => should convert to int64 # 2**10 = 1024 => should convert to int (int32 on 32-bit systems, # int64 on 64-bit systems) data = TextIO('73786976294838206464 17179869184 1024') test = np.ndfromtxt(data, dtype=None) assert_equal(test.dtype.names, ['f0', 'f1', 'f2']) assert_(test.dtype['f0'] == float) assert_(test.dtype['f1'] == np.int64) assert_(test.dtype['f2'] == np.integer) assert_allclose(test['f0'], 73786976294838206464.) assert_equal(test['f1'], 17179869184) assert_equal(test['f2'], 1024) class TestPathUsage(object): # Test that pathlib.Path can be used @dec.skipif(Path is None, "No pathlib.Path") def test_loadtxt(self): with temppath(suffix='.txt') as path: path = Path(path) a = np.array([[1.1, 2], [3, 4]]) np.savetxt(path, a) x = np.loadtxt(path) assert_array_equal(x, a) @dec.skipif(Path is None, "No pathlib.Path") def test_save_load(self): # Test that pathlib.Path instances can be used with savez. with temppath(suffix='.npy') as path: path = Path(path) a = np.array([[1, 2], [3, 4]], int) np.save(path, a) data = np.load(path) assert_array_equal(data, a) @dec.skipif(Path is None, "No pathlib.Path") def test_savez_load(self): # Test that pathlib.Path instances can be used with savez. with temppath(suffix='.npz') as path: path = Path(path) np.savez(path, lab='place holder') with np.load(path) as data: assert_array_equal(data['lab'], 'place holder') @dec.skipif(Path is None, "No pathlib.Path") def test_savez_compressed_load(self): # Test that pathlib.Path instances can be used with savez. with temppath(suffix='.npz') as path: path = Path(path) np.savez_compressed(path, lab='place holder') data = np.load(path) assert_array_equal(data['lab'], 'place holder') data.close() @dec.skipif(Path is None, "No pathlib.Path") def test_genfromtxt(self): with temppath(suffix='.txt') as path: path = Path(path) a = np.array([(1, 2), (3, 4)]) np.savetxt(path, a) data = np.genfromtxt(path) assert_array_equal(a, data) @dec.skipif(Path is None, "No pathlib.Path") def test_ndfromtxt(self): # Test outputing a standard ndarray with temppath(suffix='.txt') as path: path = Path(path) with path.open('w') as f: f.write(u'1 2\n3 4') control = np.array([[1, 2], [3, 4]], dtype=int) test = np.ndfromtxt(path, dtype=int) assert_array_equal(test, control) @dec.skipif(Path is None, "No pathlib.Path") def test_mafromtxt(self): # From `test_fancy_dtype_alt` above with temppath(suffix='.txt') as path: path = Path(path) with path.open('w') as f: f.write(u'1,2,3.0\n4,5,6.0\n') test = np.mafromtxt(path, delimiter=',') control = ma.array([(1.0, 2.0, 3.0), (4.0, 5.0, 6.0)]) assert_equal(test, control) @dec.skipif(Path is None, "No pathlib.Path") def test_recfromtxt(self): with temppath(suffix='.txt') as path: path = Path(path) with path.open('w') as f: f.write(u'A,B\n0,1\n2,3') kwargs = dict(delimiter=",", missing_values="N/A", names=True) test = np.recfromtxt(path, **kwargs) control = np.array([(0, 1), (2, 3)], dtype=[('A', int), ('B', int)]) assert_(isinstance(test, np.recarray)) assert_equal(test, control) @dec.skipif(Path is None, "No pathlib.Path") def test_recfromcsv(self): with temppath(suffix='.txt') as path: path = Path(path) with path.open('w') as f: f.write(u'A,B\n0,1\n2,3') kwargs = dict(missing_values="N/A", names=True, case_sensitive=True) test = np.recfromcsv(path, dtype=None, **kwargs) control = np.array([(0, 1), (2, 3)], dtype=[('A', int), ('B', int)]) assert_(isinstance(test, np.recarray)) assert_equal(test, control) def test_gzip_load(): a = np.random.random((5, 5)) s = BytesIO() f = gzip.GzipFile(fileobj=s, mode="w") np.save(f, a) f.close() s.seek(0) f = gzip.GzipFile(fileobj=s, mode="r") assert_array_equal(np.load(f), a) def test_gzip_loadtxt(): # Thanks to another windows brokeness, we can't use # NamedTemporaryFile: a file created from this function cannot be # reopened by another open call. So we first put the gzipped string # of the test reference array, write it to a securely opened file, # which is then read from by the loadtxt function s = BytesIO() g = gzip.GzipFile(fileobj=s, mode='w') g.write(b'1 2 3\n') g.close() s.seek(0) with temppath(suffix='.gz') as name: with open(name, 'wb') as f: f.write(s.read()) res = np.loadtxt(name) s.close() assert_array_equal(res, [1, 2, 3]) def test_gzip_loadtxt_from_string(): s = BytesIO() f = gzip.GzipFile(fileobj=s, mode="w") f.write(b'1 2 3\n') f.close() s.seek(0) f = gzip.GzipFile(fileobj=s, mode="r") assert_array_equal(np.loadtxt(f), [1, 2, 3]) def test_npzfile_dict(): s = BytesIO() x = np.zeros((3, 3)) y = np.zeros((3, 3)) np.savez(s, x=x, y=y) s.seek(0) z = np.load(s) assert_('x' in z) assert_('y' in z) assert_('x' in z.keys()) assert_('y' in z.keys()) for f, a in z.items(): assert_(f in ['x', 'y']) assert_equal(a.shape, (3, 3)) assert_(len(z.items()) == 2) for f in z: assert_(f in ['x', 'y']) assert_('x' in z.keys()) def test_load_refcount(): # Check that objects returned by np.load are directly freed based on # their refcount, rather than needing the gc to collect them. f = BytesIO() np.savez(f, [1, 2, 3]) f.seek(0) assert_(gc.isenabled()) gc.disable() try: gc.collect() np.load(f) # gc.collect returns the number of unreachable objects in cycles that # were found -- we are checking that no cycles were created by np.load n_objects_in_cycles = gc.collect() finally: gc.enable() assert_equal(n_objects_in_cycles, 0) if __name__ == "__main__": run_module_suite()