-import os, re
-import unittest
+import re
+import pytest
+from pathlib import Path
import numpy as np
import pandas as pd
import tarfile
import zipfile

from hydrodiy.io import csv

-class CsvTestCase(unittest.TestCase):
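+# Directory containing this test file and its companion data files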
+FTEST = Path(__file__).resolve().parent

-    def setUp(self):
-        print('\t=> CsvTestCase')
-        source_file = os.path.abspath(__file__)
-        self.ftest = os.path.dirname(source_file)

+def test_read_csv1():
+    fcsv = FTEST / "states_centroids.csv.gz"
+    data, comment = csv.read_csv(fcsv)
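+    # Expected content of the "state" column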
+    st = pd.Series(["ACT", "NSW", "NT", "QLD", "SA",
+                    "TAS", "VIC", "WA"])
+    assert (all(data["state"]==st))

-    def test_read_csv1(self):
-        fcsv = '%s/states_centroids.csv.gz'%self.ftest
-        data, comment = csv.read_csv(fcsv)
-        st = pd.Series(['ACT', 'NSW', 'NT', 'QLD', 'SA',
-                        'TAS', 'VIC', 'WA'])
-        self.assertTrue(all(data['state']==st))

+def test_read_csv_names():
+    fcsv = FTEST / "states_centroids.csv.gz"
+    cc = ["C{0}".format(k) for k in range(1, 8)]
+    data, comment = csv.read_csv(fcsv, names=cc)
+    assert (list(data.columns)==cc)

-    def test_read_csv_names(self):
-        fcsv = '%s/states_centroids.csv.gz'%self.ftest
-        cc = ['C{0}'.format(k) for k in range(1, 8)]
-        data, comment = csv.read_csv(fcsv, names=cc)
-        self.assertEqual(list(data.columns), cc)

+def test_read_csv_names_noheader():
+    fcsv = FTEST / "states_centroids_noheader.csv"
+    cc = ["C{0}".format(k) for k in range(1, 8)]
+    data, comment = csv.read_csv(fcsv, has_colnames=False, names=cc)
+    assert (list(data.columns)==cc)

-    def test_read_csv_names_noheader(self):
-        fcsv = '%s/states_centroids_noheader.csv'%self.ftest
-        cc = ['C{0}'.format(k) for k in range(1, 8)]
-        data, comment = csv.read_csv(fcsv, has_colnames=False, names=cc)
-        self.assertEqual(list(data.columns), cc)

+def test_read_csv_noheader():
+    fcsv = FTEST / "states_centroids_noheader.csv"
+    data, comment = csv.read_csv(fcsv, has_colnames=False)
+    st = pd.Series(["ACT", "NSW", "NT", "QLD", "SA",
+                    "TAS", "VIC", "WA"])
+    assert (all(data[0]==st))

-    def test_read_csv_noheader(self):
-        fcsv = '%s/states_centroids_noheader.csv'%self.ftest
-        data, comment = csv.read_csv(fcsv, has_colnames=False)
-        st = pd.Series(['ACT', 'NSW', 'NT', 'QLD', 'SA',
-                        'TAS', 'VIC', 'WA'])
-        self.assertTrue(all(data[0]==st))

+def test_read_csv3():
+    fcsv = FTEST / "multiindex.csv"
+    data, comment = csv.read_csv(fcsv)

-    def test_read_csv3(self):
-        fcsv = '%s/multiindex.csv'%self.ftest
-        data, comment = csv.read_csv(fcsv)
+    cols = ["metric", "runoff_rank",
+            "logsinh-likelihood", "logsinh-shapirotest",
+            "yeojohnson-likelihood", "yeojohnson-shapirotest"]
+
+    assert (all(data.columns==cols))

-        cols =['metric', 'runoff_rank',
-            'logsinh-likelihood', 'logsinh-shapirotest',
-            'yeojohnson-likelihood', 'yeojohnson-shapirotest']

-        self.assertTrue(all(data.columns==cols))
+def test_read_csv4():
+    fcsv = FTEST / "climate.csv"
+    data, comment = csv.read_csv(fcsv,
+                                 parse_dates=[""], index_col=0)

+    assert (len(comment) == 8)
+    assert (comment["written_on"] == "2014-08-12 12:41")

-    def test_read_csv4(self):
-        fcsv = '%s/climate.csv'%self.ftest
+    d = data.index[0]
+    try:
+        assert (isinstance(d, pd.tslib.Timestamp))
+    except AttributeError:
+        # To handle new versions of pandas
+        assert (isinstance(d, pd.Timestamp))

-        data, comment = csv.read_csv(fcsv,
-                parse_dates=[''], index_col=0)

-        self.assertTrue(len(comment) == 8)
-        self.assertTrue(comment['written_on'] == '2014-08-12 12:41')
+def test_read_csv5():
+    fcsv = FTEST / "207004_monthly_total_01.csv"
+    data, comment = csv.read_csv(fcsv,
+                                 parse_dates=True, index_col=0)

-        d = data.index[0]
-        try:
-            self.assertTrue(isinstance(d, pd.tslib.Timestamp))
-        except AttributeError:
-            # To handle new versions of pandas
-            self.assertTrue(isinstance(d, pd.Timestamp))

+def test_read_csv_latin():
+    """ Test read_csv with latin_1 encoding """
+    fcsv = FTEST / "latin_1.zip"
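+    # Reading without an explicit encoding should fail for this file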
+    with pytest.raises(UnicodeDecodeError):
+        data, comment = csv.read_csv(fcsv)
+
+    data, comment = csv.read_csv(fcsv,
+                                 encoding="latin_1")
+    assert (np.allclose(data.iloc[:, 1:4].values, -99))

-    def test_read_csv5(self):
-        fcsv = '%s/207004_monthly_total_01.csv'%self.ftest
-        data, comment = csv.read_csv(fcsv,
-                parse_dates=True, index_col=0)

+def test_write_csv1():
+    nval = 100
+    nc = 5
+    idx = pd.date_range("1990-01-01", periods=nval, freq="D")
+    df1 = pd.DataFrame(np.random.normal(size=(nval, nc)), index=idx)
+
+    fcsv1 = FTEST / "testwrite1.csv"
+    csv.write_csv(df1, fcsv1, "Random data",
+                  Path(__file__),
+                  write_index=True)
+
+    fcsv2 = FTEST / "testwrite2.csv"
+    csv.write_csv(df1, fcsv2, "Random data",
+                  Path(__file__),
+                  float_format=None,
+                  write_index=True)

-    def test_read_csv_latin(self):
-        ''' Test read_csv with latin_1 encoding '''
-        fcsv = '%s/latin_1.zip'%self.ftest
-        try:
-            data, comment = csv.read_csv(fcsv)
-        except UnicodeDecodeError as err:
-            pass
+    df1exp, comment = csv.read_csv(fcsv1,
+                                   parse_dates=[""], index_col=0)

-        data, comment = csv.read_csv(fcsv,
-                encoding='latin_1')
-        self.assertTrue(np.allclose(data.iloc[:, 1:4].values, -99))
+    df2exp, comment = csv.read_csv(fcsv2,
+                                   parse_dates=[""], index_col=0)

+    assert (int(comment["nrow"]) == nval)
+    assert (int(comment["ncol"]) == nc)

-    def test_write_csv1(self):
-        nval = 100
-        nc = 5
-        idx = pd.date_range('1990-01-01', periods=nval, freq='D')
-        df1 = pd.DataFrame(np.random.normal(size=(nval, nc)), index=idx)
+    d = df1exp.index[0]
+    try:
+        assert (isinstance(d, pd.tslib.Timestamp))
+    except AttributeError:
+        # To handle new versions of Pandas
+        assert (isinstance(d, pd.Timestamp))

-        fcsv1 = '%s/testwrite1.csv'%self.ftest
-        csv.write_csv(df1, fcsv1, 'Random data',
-                os.path.abspath(__file__),
-                write_index=True)
+    assert (np.allclose(np.round(df1.values, 5), df1exp))
+    assert (np.allclose(df1, df2exp))

-        fcsv2 = '%s/testwrite2.csv'%self.ftest
-        csv.write_csv(df1, fcsv2, 'Random data',
-                os.path.abspath(__file__),
-                float_format=None,
-                write_index=True)
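+    # Clean up the zip files produced by write_csv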
+    for f in [fcsv1, fcsv2]:
+        fz = f.parent / f"{f.stem}.zip"
+        fz.unlink()

-        df1exp, comment = csv.read_csv(fcsv1,
-                parse_dates=[''], index_col=0)

-        df2exp, comment = csv.read_csv(fcsv2,
-                parse_dates=[''], index_col=0)
+def test_write_csv2():
+    nval = 100
+    nc = 5
+    idx = pd.date_range("1990-01-01", periods=nval, freq="D")
+    df1 = pd.DataFrame(np.random.normal(size=(nval, nc)), index=idx)

-        self.assertTrue(int(comment['nrow']) == nval)
-        self.assertTrue(int(comment['ncol']) == nc)
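+    # Comments passed as a dict should round-trip through write_csv/read_csv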
+    comment1 = {"co1":"comment", "co2":"comment 2"}
+    fcsv = FTEST / "testwrite.csv"
+    csv.write_csv(df1, fcsv, comment1,
+                  author="toto",
+                  source_file=Path(__file__),
+                  write_index=True)

-        d = df1exp.index[0]
-        try:
-            self.assertTrue(isinstance(d, pd.tslib.Timestamp))
-        except AttributeError:
-            # To handle new versions of Pandas
-            self.assertTrue(isinstance(d, pd.Timestamp))
+    df2, comment2 = csv.read_csv(fcsv,
+                                 parse_dates=[""], index_col=0)

-        self.assertTrue(np.allclose(np.round(df1.values, 5), df1exp))
-        self.assertTrue(np.allclose(df1, df2exp))
+    assert (comment1["co1"] == comment2["co1"])
+    assert (comment1["co2"] == comment2["co2"])
+    assert ("toto" == comment2["author"])
+    assert (str(Path(__file__)) == comment2["source_file"])

-        os.remove(re.sub('csv', 'zip', fcsv1))
-        os.remove(re.sub('csv', 'zip', fcsv2))
+    fz = fcsv.parent / f"{fcsv.stem}.zip"
+    fz.unlink()


-    def test_write_csv2(self):
-        nval = 100
-        nc = 5
-        idx = pd.date_range('1990-01-01', periods=nval, freq='D')
-        df1 = pd.DataFrame(np.random.normal(size=(nval, nc)), index=idx)
+def test_write_csv3():
+    nval = 100
+    idx = pd.date_range("1990-01-01", periods=nval, freq="D")
+    ds1 = pd.Series(np.random.normal(size=nval), index=idx)

-        comment1 = {'co1':'comment', 'co2':'comment 2'}
-        fcsv = '%s/testwrite.csv'%self.ftest
-        csv.write_csv(df1, fcsv, comment1,
-                author='toto',
-                source_file=os.path.abspath(__file__),
-                write_index=True)
+    fcsv1 = FTEST / "testwrite3.csv"
+    csv.write_csv(ds1, fcsv1, "Random data",
+                  Path(__file__),
+                  write_index=True)

-        df2, comment2 = csv.read_csv(fcsv,
-                parse_dates=[''], index_col=0)
+    ds1exp, comment = csv.read_csv(fcsv1,
+                                   parse_dates=[""], index_col=0)
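+    # read_csv returns a DataFrame, so squeeze it back to a Series before comparing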
+    ds1exp = ds1exp.squeeze()

-        self.assertTrue(comment1['co1'] == comment2['co1'])
-        self.assertTrue(comment1['co2'] == comment2['co2'])
-        self.assertTrue('toto' == comment2['author'])
-        self.assertTrue(os.path.abspath(__file__) == comment2['source_file'])
+    assert (np.allclose(ds1.round(5), ds1exp))

-        os.remove(re.sub('csv', 'zip', fcsv))
+    fz = fcsv1.parent / f"{fcsv1.stem}.zip"
+    fz.unlink()


-    def test_write_csv3(self):
-        nval = 100
-        idx = pd.date_range('1990-01-01', periods=nval, freq='D')
-        ds1 = pd.Series(np.random.normal(size=nval), index=idx)
-
-        fcsv1 = '%s/testwrite3.csv'%self.ftest
-        csv.write_csv(ds1, fcsv1, 'Random data',
-                os.path.abspath(__file__),
-                write_index=True)
-
-        ds1exp, comment = csv.read_csv(fcsv1,
-                parse_dates=[''], index_col=0)
-        ds1exp = ds1exp.squeeze()
+def test_read_write_zip():
+    # Generate data
+    df = {}
+    for i in range(4):
+        df["test_{0:02d}/test_{0}.csv".format(i)] = \
+            pd.DataFrame(np.random.normal(size=(100, 4)))

-        self.assertTrue(np.allclose(ds1.round(5), ds1exp))
+    # Write data to archive
+    farc = FTEST / "test_archive.zip"
+    with zipfile.ZipFile(farc, "w") as arc:
+        for k in df:
+            # Add file to tar with a directory structure
+            csv.write_csv(df[k],
+                          filename=k,
+                          comment="test "+str(i),
+                          archive=arc,
+                          float_format="%0.20f",
+                          source_file=Path(__file__))
+
+    # Read it back and compare
+    with zipfile.ZipFile(farc, "r") as arc:
+        for k in df:
+            df2, _ = csv.read_csv(k, archive=arc)
+            assert (np.allclose(df[k].values, df2.values))

-        os.remove(re.sub('csv', 'zip', fcsv1))
-
-
-    def test_read_write_zip(self):
-        # Generate data
-        df = {}
-        for i in range(4):
-            df['test_{0:02d}/test_{0}.csv'.format(i)] = \
-                pd.DataFrame(np.random.normal(size=(100, 4)))
-
-        # Write data to archive
-        farc = os.path.join(self.ftest, 'test_archive.zip')
-        with zipfile.ZipFile(farc, 'w') as arc:
-            for k in df:
-                # Add file to tar with a directory structure
-                csv.write_csv(df[k],
-                    filename=k,
-                    comment='test '+str(i),
-                    archive=arc,
-                    float_format='%0.20f',
-                    source_file=os.path.abspath(__file__))
-
-        # Read it back and compare
-        with zipfile.ZipFile(farc, 'r') as arc:
-            for k in df:
-                df2, _ = csv.read_csv(k, archive=arc)
-                self.assertTrue(np.allclose(df[k].values, df2.values))
-
-        os.remove(farc)
+    farc.unlink()

-if __name__ == "__main__":
-    unittest.main()