diff --git a/dev-requirements.txt b/dev-requirements.txt index 9069d8f..2046010 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -8,6 +8,7 @@ nose>=1.3.7,<2.0.0 manage.py>=0.2.10,<0.3.0 pkutils>=3.0.0,<4.0.0 pylint>=2.5.0,<3.0.0 +pytest>=8,<9 responses>=0.9.0,<0.15.0 setuptools>=42.0.2 tox>=3.14.3,<4.0.0 diff --git a/tests/test_fntools.py b/tests/test_fntools.py index 45e5f0d..3c5348c 100644 --- a/tests/test_fntools.py +++ b/tests/test_fntools.py @@ -6,7 +6,6 @@ Provides main unit tests. """ -import nose.tools as nt import itertools as it import requests import responses @@ -14,6 +13,8 @@ from io import StringIO from operator import itemgetter +import pytest + from meza import fntools as ft, io, stats @@ -26,77 +27,77 @@ def setup_module(): class TestIterStringIO: def test_strip(self): - nt.assert_equal("2123.45", ft.strip("2,123.45")) + assert "2123.45" == ft.strip("2,123.45") parsed = ft.strip("2.123,45", thousand_sep=".", decimal_sep=",") - nt.assert_equal("2123.45", parsed) - nt.assert_equal("spam", ft.strip("spam")) + assert "2123.45" == parsed + assert "spam" == ft.strip("spam") def test_is_numeric(self): - nt.assert_true(ft.is_numeric("2,123.45")) - nt.assert_true(ft.is_numeric("2.123,45")) - nt.assert_true(ft.is_numeric("0.45")) - nt.assert_true(ft.is_numeric(1)) - nt.assert_true(ft.is_numeric("10e5")) - nt.assert_false(ft.is_numeric("spam")) - nt.assert_false(ft.is_numeric("02139")) - nt.assert_true(ft.is_numeric("02139", strip_zeros=True)) - nt.assert_false(ft.is_numeric("spam")) - nt.assert_false(ft.is_numeric(None)) - nt.assert_false(ft.is_numeric("")) + assert ft.is_numeric("2,123.45") + assert ft.is_numeric("2.123,45") + assert ft.is_numeric("0.45") + assert ft.is_numeric(1) + assert ft.is_numeric("10e5") + assert not ft.is_numeric("spam") + assert not ft.is_numeric("02139") + assert ft.is_numeric("02139", strip_zeros=True) + assert not ft.is_numeric("spam") + assert not ft.is_numeric(None) + assert not ft.is_numeric("") def test_is_numeric_currency_zero_value(self): """Regression test for https://github.com/reubano/meza/issues/36""" for sym in ft.CURRENCIES: - nt.assert_true(ft.is_numeric(f"0{sym}")) - nt.assert_true(ft.is_numeric(f"{sym}0")) + assert ft.is_numeric(f"0{sym}") + assert ft.is_numeric(f"{sym}0") def test_is_int(self): - nt.assert_false(ft.is_int("5/4/82")) + assert not ft.is_int("5/4/82") def test_is_bool(self): - nt.assert_true(ft.is_bool("y")) - nt.assert_true(ft.is_bool(1)) - nt.assert_true(ft.is_bool(False)) - nt.assert_true(ft.is_bool("false")) - nt.assert_true(ft.is_bool("n")) - nt.assert_true(ft.is_bool(0)) - nt.assert_false(ft.is_bool("")) - nt.assert_false(ft.is_bool(None)) + assert ft.is_bool("y") + assert ft.is_bool(1) + assert ft.is_bool(False) + assert ft.is_bool("false") + assert ft.is_bool("n") + assert ft.is_bool(0) + assert not ft.is_bool("") + assert not ft.is_bool(None) def test_is_null(self): - nt.assert_false(ft.is_null("")) - nt.assert_false(ft.is_null(" ")) - nt.assert_false(ft.is_null(False)) - nt.assert_false(ft.is_null("0")) - nt.assert_false(ft.is_null(0)) - nt.assert_true(ft.is_null("", blanks_as_nulls=True)) - nt.assert_true(ft.is_null(" ", blanks_as_nulls=True)) + assert not ft.is_null("") + assert not ft.is_null(" ") + assert not ft.is_null(False) + assert not ft.is_null("0") + assert not ft.is_null(0) + assert ft.is_null("", blanks_as_nulls=True) + assert ft.is_null(" ", blanks_as_nulls=True) def test_byte_array(self): content = "Iñtërnâtiônàližætiøn" expected = bytearray("Iñtërnâtiônàližætiøn".encode("utf-8")) - nt.assert_equal(expected, ft.byte(content)) - nt.assert_equal(expected, ft.byte(iter(content))) - nt.assert_equal(expected, ft.byte(list(content))) + assert expected == ft.byte(content) + assert expected == ft.byte(iter(content)) + assert expected == ft.byte(list(content)) def test_afterish(self): - nt.assert_equal(-1, ft.afterish("1001", ".")) - nt.assert_equal(3, ft.afterish("1,001")) - nt.assert_equal(3, ft.afterish("2,100,001.00")) - nt.assert_equal(2, ft.afterish("1,000.00", ".")) + assert -1 == ft.afterish("1001", ".") + assert 3 == ft.afterish("1,001") + assert 3 == ft.afterish("2,100,001.00") + assert 2 == ft.afterish("1,000.00", ".") - with nt.assert_raises(ValueError): + with pytest.raises(ValueError): ft.afterish("eggs", ".") def test_get_separators(self): expected = {"thousand_sep": ",", "decimal_sep": "."} - nt.assert_equal(expected, ft.get_separators("2,123.45")) + assert expected == ft.get_separators("2,123.45") expected = {"thousand_sep": ".", "decimal_sep": ","} - nt.assert_equal(expected, ft.get_separators("2.123,45")) + assert expected == ft.get_separators("2.123,45") - with nt.assert_raises(ValueError): + with pytest.raises(ValueError): ft.get_separators("spam") def test_fill(self): @@ -107,16 +108,16 @@ def test_fill(self): previous = {} current = next(records) expected = {"column_a": "1", "column_b": "27", "column_c": ""} - nt.assert_equal(expected, current) + assert expected == current length = len(current) filled = ft.fill(previous, current, value=0) previous = dict(it.islice(filled, length)) count = next(filled) - nt.assert_equal(count, {"column_a": 0, "column_b": 0, "column_c": 1}) + assert count == {"column_a": 0, "column_b": 0, "column_c": 1} expected = {"column_a": "1", "column_b": "27", "column_c": 0} - nt.assert_equal(expected, previous) + assert expected == previous current = next(records) @@ -126,13 +127,13 @@ def test_fill(self): "column_c": None, } - nt.assert_equal(expected, current) + assert expected == current kwargs = {"fill_key": "column_b", "count": count} filled = ft.fill(previous, current, **kwargs) previous = dict(it.islice(filled, length)) count = next(filled) - nt.assert_equal({"column_a": 1, "column_b": 0, "column_c": 2}, count) + assert {"column_a": 1, "column_b": 0, "column_c": 2} == count expected = { "column_a": "too short!", @@ -140,13 +141,13 @@ def test_fill(self): "column_c": "too short!", } - nt.assert_equal(expected, previous) + assert expected == previous @responses.activate def test_chunk(self): content = io.StringIO("Iñtërnâtiônàližætiøn") - nt.assert_equal("Iñtër", next(ft.chunk(content, 5))) - nt.assert_equal("nâtiônàližætiøn", next(ft.chunk(content))) + assert "Iñtër" == next(ft.chunk(content, 5)) + assert "nâtiônàližætiøn" == next(ft.chunk(content)) url = "http://google.com" body = '' @@ -157,8 +158,8 @@ def test_chunk(self): # The chunk size is the number of bytes it should read into # memory. This is not necessarily the length of each item returned # as decoding can take place. - nt.assert_equal(20, len(next(ft.chunk(r.iter_content, 20, 29, 200)))) - nt.assert_equal(55, len(next(ft.chunk(r.iter_content)))) + assert 20 == len(next(ft.chunk(r.iter_content, 20, 29, 200))) + assert 55 == len(next(ft.chunk(r.iter_content))) def test_combine(self): records = [{"a": 1, "b": 2, "c": 3}, {"b": 4, "c": 5, "d": 6}] @@ -166,37 +167,37 @@ def test_combine(self): # Combine all keys pred = lambda key: True x, y = records[0], records[1] - nt.assert_equal(1, ft.combine(x, y, "a", pred=pred, op=sum)) - nt.assert_equal(6, ft.combine(x, y, "b", pred=pred, op=sum)) - nt.assert_equal(8, ft.combine(x, y, "c", pred=pred, op=sum)) + assert 1 == ft.combine(x, y, "a", pred=pred, op=sum) + assert 6 == ft.combine(x, y, "b", pred=pred, op=sum) + assert 8 == ft.combine(x, y, "c", pred=pred, op=sum) fltrer = lambda x: x is not None first = lambda x: next(filter(fltrer, x)) kwargs = {"pred": pred, "op": first, "default": None} - nt.assert_equal(2, ft.combine(x, y, "b", **kwargs)) + assert 2 == ft.combine(x, y, "b", **kwargs) kwargs = {"pred": pred, "op": stats.mean, "default": None} - nt.assert_equal(1.0, ft.combine(x, y, "a", **kwargs)) - nt.assert_equal(3.0, ft.combine(x, y, "b", **kwargs)) + assert 1.0 == ft.combine(x, y, "a", **kwargs) + assert 3.0 == ft.combine(x, y, "b", **kwargs) # Only combine key 'b' pred = lambda key: key == "b" - nt.assert_equal(5, ft.combine(x, y, "c", pred=pred, op=sum)) + assert 5 == ft.combine(x, y, "c", pred=pred, op=sum) # Only combine keys that have the same value of 'b' pred = itemgetter("b") - nt.assert_equal(6, ft.combine(x, y, "b", pred=pred, op=sum)) - nt.assert_equal(5, ft.combine(x, y, "c", pred=pred, op=sum)) + assert 6 == ft.combine(x, y, "b", pred=pred, op=sum) + assert 5 == ft.combine(x, y, "c", pred=pred, op=sum) def test_op_everseen(self): content = [4, 6, 3, 8, 2, 1] expected = [4, 4, 3, 3, 2, 1] - nt.assert_equal(expected, list(ft.op_everseen(content, pad=True))) - nt.assert_equal([4, 6, 8], list(ft.op_everseen(content, op="gt"))) + assert expected == list(ft.op_everseen(content, pad=True)) + assert [4, 6, 8] == list(ft.op_everseen(content, op="gt")) def test_objectify(self): kwargs = {"one": "1", "two": "2"} kw = ft.Objectify(kwargs, func=int) - nt.assert_equal(kw.one, 1) - nt.assert_equal(kw["two"], 2) - nt.assert_equal(dict(kw), {"one": 1, "two": 2}) + assert kw.one == 1 + assert kw["two"] == 2 + assert dict(kw) == {"one": 1, "two": 2} diff --git a/tests/test_io.py b/tests/test_io.py index 17b67c9..b8e0b64 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -18,8 +18,8 @@ import requests import responses -import nose.tools as nt import pygogo as gogo +import pytest from meza import io, convert as cv, DATA_DIR @@ -44,67 +44,67 @@ def func(filepath): class TestIterStringIO: """Unit tests for IterStringIO""" - def __init__(self): + def setup_method(self): self.phrase = io.IterStringIO(iter("Hello World")) self.text = io.IterStringIO("line one\nline two\nline three\n") self.ints = io.IterStringIO("0123456789", 5) def test_lines(self): """Test for reading lines""" - nt.assert_equal(bytearray(b"Hello"), self.phrase.read(5)) + assert bytearray(b"Hello") == self.phrase.read(5) self.phrase.write(iter("ly person")) - nt.assert_equal(bytearray(b" Worldly"), self.phrase.read(8)) + assert bytearray(b" Worldly") == self.phrase.read(8) self.phrase.write(": Iñtërnâtiônàližætiøn") expected = bytearray(" person: Iñtërnâtiônàližætiøn".encode("utf-8")) - nt.assert_equal(expected, self.phrase.read()) + assert expected == self.phrase.read() - nt.assert_equal(bytearray(b"line one"), self.text.readline()) - nt.assert_equal(bytearray(b"line two"), next(self.text)) + assert bytearray(b"line one") == self.text.readline() + assert bytearray(b"line two") == next(self.text) self.text.seek(0) - nt.assert_equal(bytearray(b"line one"), next(self.text)) - nt.assert_equal(bytearray(b"line two"), next(self.text)) - nt.assert_equal(16, self.text.tell()) + assert bytearray(b"line one") == next(self.text) + assert bytearray(b"line two") == next(self.text) + assert 16 == self.text.tell() self.text.seek(0) lines = list(self.text.readlines()) - nt.assert_equal(bytearray(b"line three"), lines[2]) + assert bytearray(b"line three") == lines[2] def test_seeking(self): """Test for seeking a file""" - nt.assert_equal(bytearray(b"01234"), self.ints.read(5)) + assert bytearray(b"01234") == self.ints.read(5) self.ints.seek(0) - nt.assert_equal(bytearray(b"0"), self.ints.read(1)) - nt.assert_equal(bytearray(b"1"), self.ints.read(1)) - nt.assert_equal(bytearray(b"2"), self.ints.read(1)) + assert bytearray(b"0") == self.ints.read(1) + assert bytearray(b"1") == self.ints.read(1) + assert bytearray(b"2") == self.ints.read(1) self.ints.seek(3) - nt.assert_equal(bytearray(b"3"), self.ints.read(1)) + assert bytearray(b"3") == self.ints.read(1) self.ints.seek(6) - nt.assert_equal(bytearray(b"6"), self.ints.read(1)) + assert bytearray(b"6") == self.ints.read(1) self.ints.seek(3) - nt.assert_equal(bytearray(b"3"), self.ints.read(1)) + assert bytearray(b"3") == self.ints.read(1) self.ints.seek(3) - nt.assert_equal(bytearray(b"3"), self.ints.read(1)) + assert bytearray(b"3") == self.ints.read(1) self.ints.seek(4) - nt.assert_equal(bytearray(b"4"), self.ints.read(1)) + assert bytearray(b"4") == self.ints.read(1) self.ints.seek(6) - nt.assert_equal(bytearray(b"6"), self.ints.read(1)) + assert bytearray(b"6") == self.ints.read(1) self.ints.seek(0) - nt.assert_equal(bytearray(b"2"), self.ints.read(1)) + assert bytearray(b"2") == self.ints.read(1) class TestUnicodeReader: """Unit tests for unicode support""" - def __init__(self): + def setup_method(self): self.cls_initialized = False self.row1 = {"a": "1", "b": "2", "c": "3"} self.row2 = {"a": "4", "b": "5", "c": "©"} @@ -115,15 +115,15 @@ def test_utf8(self): """Test for reading utf-8 files""" filepath = p.join(io.DATA_DIR, "utf8.csv") records = io.read_csv(filepath, sanitize=True) - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row3, next(records)) + assert self.row1 == next(records) + assert self.row3 == next(records) def test_latin(self): """Test for reading latin-1 files""" filepath = p.join(io.DATA_DIR, "latin1.csv") records = io.read_csv(filepath, encoding="latin-1") - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row2, next(records)) + assert self.row1 == next(records) + assert self.row2 == next(records) def test_windows(self): """Test for reading windows-1252 files""" @@ -133,70 +133,70 @@ def test_windows(self): # 'Windows-1252', you have to open with 'mac-roman' in order # to properly read it records = io.read_csv(filepath, encoding="mac-roman") - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row4, next(records)) + assert self.row1 == next(records) + assert self.row4 == next(records) def test_iso(self): """Test for reading iso-8859-1 files""" filepath = p.join(io.DATA_DIR, "iso88591.csv") records = io.read_csv(filepath, encoding="iso-8859-1") - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row2, next(records)) + assert self.row1 == next(records) + assert self.row2 == next(records) def test_utf16_big(self): """Test for reading utf-16BE files""" filepath = p.join(io.DATA_DIR, "utf16_big.csv") records = io.read_csv(filepath, encoding="utf-16-be") - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row3, next(records)) + assert self.row1 == next(records) + assert self.row3 == next(records) def test_utf16_little(self): """Test for reading utf-16LE files""" filepath = p.join(io.DATA_DIR, "utf16_little.csv") records = io.read_csv(filepath, encoding="utf-16-le") - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row3, next(records)) + assert self.row1 == next(records) + assert self.row3 == next(records) def test_bytes_encoding_detection_latin(self): """Test for detecting the encoding of a latin-1 bytes file""" filepath = p.join(io.DATA_DIR, "latin1.csv") records = io.read_csv(filepath, mode="rb") - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row2, next(records)) + assert self.row1 == next(records) + assert self.row2 == next(records) def test_wrong_encoding_detection_latin(self): """Test for detecting the encoding of a latin-1 file opened in ascii""" filepath = p.join(io.DATA_DIR, "latin1.csv") records = io.read_csv(filepath, encoding="ascii") - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row2, next(records)) + assert self.row1 == next(records) + assert self.row2 == next(records) def test_bytes_encoding_detection_windows(self): """Test for detecting the encoding of a windows-1252 bytes file""" filepath = p.join(io.DATA_DIR, "windows1252.csv") records = io.read_csv(filepath, mode="rb") - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row4, next(records)) + assert self.row1 == next(records) + assert self.row4 == next(records) def test_wrong_encoding_detection_windows(self): """Test for detecting the encoding of a windows file opened in ascii""" filepath = p.join(io.DATA_DIR, "windows1252.csv") records = io.read_csv(filepath, encoding="ascii") - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row4, next(records)) + assert self.row1 == next(records) + assert self.row4 == next(records) def test_kwargs(self): """Test for passing kwargs while reading csv files""" filepath = p.join(io.DATA_DIR, "utf8.csv") kwargs = {"delimiter": ","} records = io.read_csv(filepath, **kwargs) - nt.assert_equal(self.row1, next(records)) + assert self.row1 == next(records) class TestInput: """Unit tests for reading files""" - def __init__(self): + def setup_method(self): self.cls_initialized = False self.sheet0 = { "sparse_data": "Iñtërnâtiônàližætiøn", @@ -235,39 +235,39 @@ def test_newline_json(self): # pylint: disable=R0201 filepath = p.join(io.DATA_DIR, "iris.csv") records = io.read_csv(filepath) json = cv.records2json(records, newline=True) - nt.assert_equal(expected, loads(next(json))) + assert expected == loads(next(json)) filepath = p.join(io.DATA_DIR, "newline.json") records = io.read_json(filepath, newline=True) - nt.assert_equal({"a": 2, "b": 3}, next(records)) + assert {"a": 2, "b": 3} == next(records) def test_xls(self): """Test for reading excel files""" filepath = p.join(io.DATA_DIR, "test.xlsx") records = io.read_xls(filepath, sanitize=True, sheet=0) - nt.assert_equal(self.sheet0, next(records)) + assert self.sheet0 == next(records) records = io.read_xls(filepath, sanitize=True, sheet=1) - nt.assert_equal(self.sheet1, next(records)) + assert self.sheet1 == next(records) kwargs = {"first_row": 1, "first_col": 1} records = io.read_xls(filepath, sanitize=True, sheet=2, **kwargs) - nt.assert_equal(self.sheet0, next(records)) + assert self.sheet0 == next(records) records = io.read_xls(filepath, sanitize=True, sheet=3, **kwargs) - nt.assert_equal(self.sheet1, next(records)) + assert self.sheet1 == next(records) def test_csv(self): """Test for reading csv files""" filepath = p.join(io.DATA_DIR, "no_header_row.csv") records = io.read_csv(filepath, has_header=False) expected = {"column_1": "1", "column_2": "2", "column_3": "3"} - nt.assert_equal(expected, next(records)) + assert expected == next(records) filepath = p.join(io.DATA_DIR, "test_bad.csv") kwargs = {"sanitize": True, "first_row": 1, "first_col": 1} records = io.read_csv(filepath, **kwargs) - nt.assert_equal(self.sheet0_alt, next(records)) + assert self.sheet0_alt == next(records) filepath = p.join(io.DATA_DIR, "fixed_w_header.txt") widths = [0, 18, 29, 33, 38, 50] @@ -281,7 +281,7 @@ def test_csv(self): "Timestamp": "04:14:001971-01-01T04:14:00", } - nt.assert_equal(expected, next(records)) + assert expected == next(records) def test_csv_last_row(self): """Test for reading csv files with last_row option""" @@ -295,16 +295,16 @@ def test_csv_last_row(self): } records = list(io.read_csv(filepath)) - nt.assert_equal(expected, records[0]) - nt.assert_equal(150, len(records)) + assert expected == records[0] + assert 150 == len(records) records = list(io.read_csv(filepath, last_row=10)) - nt.assert_equal(expected, records[0]) - nt.assert_equal(10, len(records)) + assert expected == records[0] + assert 10 == len(records) records = list(io.read_csv(filepath, last_row=-50)) - nt.assert_equal(expected, records[0]) - nt.assert_equal(100, len(records)) + assert expected == records[0] + assert 100 == len(records) def test_dbf(self): # pylint: disable=R0201 """Test for reading dbf files""" @@ -327,16 +327,16 @@ def test_dbf(self): # pylint: disable=R0201 "intptlon10": "-092.9323194", } - nt.assert_equal(expected, next(records)) + assert expected == next(records) def test_vertical_table(self): # pylint: disable=R0201 """Test for reading a vertical html table""" filepath = p.join(io.DATA_DIR, "vertical_table.html") records = io.read_html(filepath, vertical=True) - nt.assert_equal("See IBM products", next(records)["Products"]) + assert "See IBM products" == next(records)["Products"] records = io.read_html(filepath, vertical=True, table=2) - with nt.assert_raises(StopIteration): + with pytest.raises(StopIteration): next(records) def test_excel_html_export(self): # pylint: disable=R0201 @@ -351,13 +351,13 @@ def test_excel_html_export(self): # pylint: disable=R0201 "unicode_test": "Ādam", } - nt.assert_equal(expected, next(records)) + assert expected == next(records) def test_get_reader(self): # pylint: disable=R0201 """Test for reading a file via the reader selector""" - nt.assert_true(callable(io.get_reader("csv"))) + assert callable(io.get_reader("csv")) - with nt.assert_raises(KeyError): + with pytest.raises(KeyError): io.get_reader("") def test_opened_files(self): @@ -366,21 +366,21 @@ def test_opened_files(self): with open(filepath, encoding="utf-8") as f: records = io.read_csv(f, sanitize=True) # pylint: disable=W0212 - nt.assert_equal(self.sheet0_alt, next(records)) + assert self.sheet0_alt == next(records) f = open(filepath, encoding="utf-8") try: records = io.read_csv(f, sanitize=True) - nt.assert_equal(self.sheet0_alt, next(records)) + assert self.sheet0_alt == next(records) finally: f.close() - f = open(filepath, "rU", newline=None) + f = open(filepath, newline=None) try: records = io.read_csv(f, sanitize=True) - nt.assert_equal(self.sheet0_alt, next(records)) + assert self.sheet0_alt == next(records) finally: f.close() @@ -388,13 +388,13 @@ def test_opened_files(self): with open(filepath, "r+b") as f: records = io.read_xls(f, sanitize=True, sheet=0) - nt.assert_equal(self.sheet0, next(records)) + assert self.sheet0 == next(records) f = open(filepath, "r+b") try: records = io.read_xls(f, sanitize=True, sheet=0) - nt.assert_equal(self.sheet0, next(records)) + assert self.sheet0 == next(records) finally: f.close() @@ -403,15 +403,15 @@ def test_reencode(self): with open(file_, encoding="utf-16-be") as f: utf8_f = io.reencode(f, remove_BOM=True) - nt.assert_equal(b"a,b,c", next(utf8_f).strip()) - nt.assert_equal(b"1,2,3", next(utf8_f).strip()) - nt.assert_equal("4,5,ʤ", next(utf8_f).decode("utf-8")) + assert b"a,b,c" == next(utf8_f).strip() + assert b"1,2,3" == next(utf8_f).strip() + assert "4,5,ʤ" == next(utf8_f).decode("utf-8") class TestUrlopen: """Unit tests for reading files with urlopen""" - def __init__(self): + def setup_method(self): self.cls_initialized = False self.utf8_row = {"a": "4", "b": "5", "c": "ʤ"} self.latin_row = {"a": "4", "b": "5", "c": "©"} @@ -424,7 +424,7 @@ def test_urlopen_utf8(self): f = response.fp records = io.read_csv(f) row = next(it.islice(records, 1, 2)) - nt.assert_equal(self.utf8_row, row) + assert self.utf8_row == row def test_urlopen_latin1(self): """Test for reading latin-1 files""" @@ -434,20 +434,20 @@ def test_urlopen_latin1(self): f = response.fp records = io.read_csv(f, encoding="latin-1") row = next(it.islice(records, 1, 2)) - nt.assert_equal(self.latin_row, row) + assert self.latin_row == row # def test_urlopen_remote(self): # """Test for reading remote web files""" # filepath = 'https://opendata.co.ke...' # response = urlopen('file://{}'.format(filepath)) # records = io.read_csv(response.fp) - # nt.assert_equal({}, next(records)) + # assert_equal({}, next(records)) class TestBytes: """Unit tests for reading byte streams""" - def __init__(self): + def setup_method(self): self.cls_initialized = False self.row1 = {"a": "1", "b": "2", "c": "3"} @@ -465,20 +465,20 @@ def test_bytes_io(self): with open(p.join(io.DATA_DIR, "utf8.csv"), "rb") as f: b = BytesIO(f.read()) records = io.read_csv(b, sanitize=True) - nt.assert_equal(self.row1, next(records)) - nt.assert_equal(self.row2, next(records)) + assert self.row1 == next(records) + assert self.row2 == next(records) def test_bytes(self): """Test for reading bytes mode opened file""" with open(p.join(io.DATA_DIR, "test.csv"), "rb") as f: records = io.read_csv(f, sanitize=True) - nt.assert_equal(self.sheet0_alt, next(records)) + assert self.sheet0_alt == next(records) class TestGeoJSON: """Unit tests for reading GeoJSON""" - def __init__(self): + def setup_method(self): self.cls_initialized = False self.bbox = [ -70.0624999987871, @@ -507,12 +507,12 @@ def test_geojson(self): records = io.read_geojson(self.filepath) record = next(records) - nt.assert_equal(expected, record) + assert expected == record for record in records: - nt.assert_true("id" in record) - nt.assert_equal(record["lon"], record["lon"]) - nt.assert_equal(record["lat"], record["lat"]) + assert "id" in record + assert record["lon"] == record["lon"] + assert record["lat"] == record["lat"] def test_geojson_with_key(self): """Test for reading GeoJSON files with a key""" @@ -521,24 +521,24 @@ def test_geojson_with_key(self): f = cv.records2geojson(records, key="id") geojson = loads(f.read()) - nt.assert_equal("FeatureCollection", geojson["type"]) - nt.assert_true("crs" in geojson) - nt.assert_equal(self.bbox, geojson["bbox"]) - nt.assert_true(geojson["features"]) + assert "FeatureCollection" == geojson["type"] + assert "crs" in geojson + assert self.bbox == geojson["bbox"] + assert geojson["features"] for feature in geojson["features"]: - nt.assert_equal("Feature", feature["type"]) - nt.assert_true("id" in feature) - nt.assert_less_equal(2, len(feature["properties"])) + assert "Feature" == feature["type"] + assert "id" in feature + assert 2 <= len(feature["properties"]) geometry = feature["geometry"] if geometry["type"] == "Point": - nt.assert_equal(2, len(geometry["coordinates"])) + assert 2 == len(geometry["coordinates"]) elif geometry["type"] == "LineString": - nt.assert_equal(2, len(geometry["coordinates"][0])) + assert 2 == len(geometry["coordinates"][0]) elif geometry["type"] == "Polygon": - nt.assert_equal(2, len(geometry["coordinates"][0][0])) + assert 2 == len(geometry["coordinates"][0][0]) def test_geojson_with_crs(self): """Test for reading GeoJSON files with CRS""" @@ -546,9 +546,9 @@ def test_geojson_with_crs(self): f = cv.records2geojson(records, crs="EPSG:4269") geojson = loads(f.read()) - nt.assert_true("crs" in geojson) - nt.assert_equal("name", geojson["crs"]["type"]) - nt.assert_equal("EPSG:4269", geojson["crs"]["properties"]["name"]) + assert "crs" in geojson + assert "name" == geojson["crs"]["type"] + assert "EPSG:4269" == geojson["crs"]["properties"]["name"] class TestOutput: @@ -560,14 +560,17 @@ def test_write(self): # pylint: disable=R0201 url = "http://google.com" body = '' content1 = StringIO("Iñtërnâtiônàližætiøn") - nt.assert_equal(20, io.write(StringIO(), content1)) + assert 20 == io.write(StringIO(), content1) content1.seek(0) - nt.assert_equal(20, io.write(TemporaryFile(), content1)) + with TemporaryFile() as tf: + assert 20 == io.write(tf, content1) content2 = io.IterStringIO(iter("Hello World")) - nt.assert_equal(12, io.write(TemporaryFile(), content2, chunksize=2)) + with TemporaryFile() as tf: + assert 12 == io.write(tf, content2, chunksize=2) # pylint: disable=E1101 responses.add(responses.GET, url=url, body=body) r = requests.get(url, stream=True) # pylint: disable=C0103 - nt.assert_equal(55, io.write(TemporaryFile(), r.iter_content)) + with TemporaryFile() as tf: + assert 55 == io.write(tf, r.iter_content) diff --git a/tests/test_process.py b/tests/test_process.py index 09b36a7..cd25389 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -6,7 +6,6 @@ Provides main unit tests. """ -import nose.tools as nt import itertools as it from decimal import Decimal @@ -14,6 +13,8 @@ from operator import itemgetter, truediv, eq, is_not, contains from collections import defaultdict +import pytest + from meza import process as pr, stats, fntools as ft @@ -30,9 +31,9 @@ class Test: def test_typecast(self): records = [{"float": "1.5"}] types = [{"id": "float", "type": "bool"}] - nt.assert_equal({"float": False}, next(pr.type_cast(records, types))) + assert {"float": False} == next(pr.type_cast(records, types)) - with nt.assert_raises(ValueError): + with pytest.raises(ValueError): next(pr.type_cast(records, types, warn=True)) def test_detect_types(self): @@ -49,9 +50,9 @@ def test_detect_types(self): records = it.repeat(record) records, result = pr.detect_types(records) - nt.assert_equal(17, result["count"]) - nt.assert_equal(Decimal("0.95"), result["confidence"]) - nt.assert_true(result["accurate"]) + assert 17 == result["count"] + assert Decimal("0.95") == result["confidence"] + assert result["accurate"] expected = { "null": "null", @@ -64,23 +65,23 @@ def test_detect_types(self): "datetime": "datetime", } - nt.assert_equal(expected, {r["id"]: r["type"] for r in result["types"]}) - nt.assert_equal(record, next(records)) + assert expected == {r["id"]: r["type"] for r in result["types"]} + assert record == next(records) result = pr.detect_types(records, 0.99)[1] - nt.assert_equal(100, result["count"]) - nt.assert_equal(Decimal("0.97"), result["confidence"]) - nt.assert_false(result["accurate"]) + assert 100 == result["count"] + assert Decimal("0.97") == result["confidence"] + assert not result["accurate"] result = pr.detect_types([record, record])[1] - nt.assert_equal(2, result["count"]) - nt.assert_equal(Decimal("0.87"), result["confidence"]) - nt.assert_false(result["accurate"]) + assert 2 == result["count"] + assert Decimal("0.87") == result["confidence"] + assert not result["accurate"] def test_detect_types_datetimes_midnight(self): records = it.repeat({"foo": "2000-01-01 00:00:00"}) records, result = pr.detect_types(records) - nt.assert_equal(result["types"], [{"id": "foo", "type": "datetime"}]) + assert result["types"] == [{"id": "foo", "type": "datetime"}] def test_fillempty(self): records = [ @@ -99,60 +100,60 @@ def test_fillempty(self): more_values_1 = [values[0], new_value_1, values[2]] fields = ["a"] - nt.assert_equal(values, list(pr.fillempty(records, 0, fields=fields))) + assert values == list(pr.fillempty(records, 0, fields=fields)) filled = pr.fillempty(records, method="front") - nt.assert_equal(more_values_1, list(filled)) + assert more_values_1 == list(filled) new_value_2 = {"a": "1", "b": "27", "c": "17"} new_value_3 = {"a": "0", "b": "too short!", "c": "17"} more_values_2 = [new_value_2, new_value_3, values[2]] filled = pr.fillempty(records, method="back") - nt.assert_equal(more_values_2, list(filled)) + assert more_values_2 == list(filled) more_values_3 = [values[0], new_value_3, values[2]] filled = pr.fillempty(records, method="back", limit=1) - nt.assert_equal(more_values_3, list(filled)) + assert more_values_3 == list(filled) kwargs = {"method": "b", "fields": ["a"]} new_value_4 = {"a": "too short!", "b": "too short!", "c": None} more_values_4 = [values[0], new_value_4, values[2]] - nt.assert_equal(more_values_4, list(pr.fillempty(records, **kwargs))) + assert more_values_4 == list(pr.fillempty(records, **kwargs)) def test_merge(self): expected = {"a": 1, "b": 10, "c": 11} result = pr.merge([{"a": 1, "b": 2}, {"b": 10, "c": 11}]) - nt.assert_equal(expected, result) + assert expected == result # setup records = [{"a": 1, "b": 2, "c": 3}, {"b": 4, "c": 5, "d": 6}] # Combine all keys - expected = {u"a": 1, u"c": 8, u"b": 6, u"d": 6} + expected = {"a": 1, "c": 8, "b": 6, "d": 6} result = pr.merge(records, pred=bool, op=sum) - nt.assert_equal(expected, result) + assert expected == result first = lambda pair: next(filter(partial(is_not, None), pair)) kwargs = {"pred": bool, "op": first, "default": None} - expected = {u"a": 1, u"b": 2, u"c": 3, u"d": 6} + expected = {"a": 1, "b": 2, "c": 3, "d": 6} result = pr.merge(records, **kwargs) - nt.assert_equal(expected, result) + assert expected == result # This will only reliably give the expected result for 2 records kwargs = {"pred": bool, "op": stats.mean, "default": None} - expected = {u"a": 1, u"b": 3.0, u"c": 4.0, u"d": 6.0} + expected = {"a": 1, "b": 3.0, "c": 4.0, "d": 6.0} result = pr.merge(records, **kwargs) - nt.assert_equal(expected, result) + assert expected == result # Only combine key 'b' - expected = {u"a": 1, u"b": 6, u"c": 5, u"d": 6} + expected = {"a": 1, "b": 6, "c": 5, "d": 6} result = pr.merge(records, pred="b", op=sum) - nt.assert_equal(expected, result) + assert expected == result # Only combine keys that have the same value of 'b' - expected = {u"a": 1, u"b": 6, u"c": 5, u"d": 6} + expected = {"a": 1, "b": 6, "c": 5, "d": 6} result = pr.merge(records, pred=itemgetter("b"), op=sum) - nt.assert_equal(expected, result) + assert expected == result # This will reliably work for any number of records counted = defaultdict(int) @@ -167,24 +168,24 @@ def test_merge(self): for k in r.keys(): counted[k] += 1 - expected = {u"a": 3, u"b": 3, u"c": 2, u"d": 1} - nt.assert_equal(expected, counted) + expected = {"a": 3, "b": 3, "c": 2, "d": 1} + assert expected == counted summed = pr.merge(records, pred=bool, op=sum) - expected = {u"a": 6, u"b": 15, u"c": 2, u"d": 7} - nt.assert_equal(expected, summed) + expected = {"a": 6, "b": 15, "c": 2, "d": 7} + assert expected == summed kwargs = {"pred": bool, "op": ft.fpartial(truediv)} - expected = {u"a": 2.0, u"b": 5.0, u"c": 1.0, u"d": 7.0} + expected = {"a": 2.0, "b": 5.0, "c": 1.0, "d": 7.0} result = pr.merge([summed, counted], **kwargs) - nt.assert_equal(expected, result) + assert expected == result # This should also reliably work for any number of records op = ft.fpartial(ft.sum_and_count) kwargs = {"pred": bool, "op": op, "default": None} merged = pr.merge(records, **kwargs) result = {x: truediv(*y) for x, y in merged.items()} - nt.assert_equal(expected, result) + assert expected == result def test_unique(self): records = [ @@ -201,7 +202,7 @@ def test_unique(self): pred = lambda x: x["name"][0] result = next(it.islice(pr.unique(records, pred=pred), 3, 4))["name"] - nt.assert_equal("rob", result) + assert "rob" == result def test_cut(self): records = [ @@ -212,10 +213,10 @@ def test_cut(self): expected = {"field_1": 1, "field_3": "male"} result = next(pr.cut(records, ["field_2"], exclude=True)) - nt.assert_equal(expected, result) + assert expected == result result = next(pr.cut(records, ["field_2"])) - nt.assert_equal({"field_2": "bill"}, result) + assert {"field_2": "bill"} == result def test_grep(self): records = [ @@ -228,19 +229,19 @@ def test_grep(self): rules = [{"fields": ["day"], "pattern": partial(eq, 1)}] result = next(pr.grep(records, rules))["name"] - nt.assert_equal("bill", result) + assert "bill" == result rules = [{"pattern": partial(contains, {1, "rob"})}] result = next(pr.grep(records, rules))["name"] - nt.assert_equal("rob", result) + assert "rob" == result rules = [{"pattern": partial(contains, {1, "rob"})}] result = next(pr.grep(records, rules, any_match=True))["name"] - nt.assert_equal("bill", result) + assert "bill" == result rules = [{"fields": ["name"], "pattern": "o"}] result = next(pr.grep(records, rules, inverse=True))["name"] - nt.assert_equal("bill", result) + assert "bill" == result def test_pivot(self): records = [ @@ -265,4 +266,4 @@ def test_pivot(self): result = list(pr.pivot(records, "D", "C", dropna=False)) expected_set = set(tuple(sorted(r.items())) for r in expected) result_set = set(tuple(sorted(r.items())) for r in result) - nt.assert_equal(expected_set, result_set) + assert expected_set == result_set diff --git a/tox.ini b/tox.ini index d6ce0a5..5c9f6c5 100644 --- a/tox.ini +++ b/tox.ini @@ -8,8 +8,8 @@ setenv = PYTHONWARNINGS=all commands = - !style: manage test - style: manage lint + !style: pytest + style: flake8 deps = -r{toxinidir}/dev-requirements.txt