return df
-@pytest.mark.parametrize(
- "obj, whitelist", zip((df_letters(), df_letters().floats),
- (df_whitelist, s_whitelist)))
-def test_groupby_whitelist(df_letters, obj, whitelist):
+@pytest.mark.parametrize("whitelist", [df_whitelist, s_whitelist])
+def test_groupby_whitelist(df_letters, whitelist):
df = df_letters
+ if whitelist == df_whitelist:
+ # dataframe
+ obj = df_letters
+ else:
+ obj = df_letters['floats']
+
# these are aliases so ok to have the alias __name__
alias = {'bfill': 'backfill',
return list(range(5))
-@pytest.fixture(params=[epoch_1960(),
- epoch_1960().to_pydatetime(),
- epoch_1960().to_datetime64(),
- str(epoch_1960())])
-def epochs(request):
- return request.param
+@pytest.fixture(params=['timestamp', 'pydatetime', 'datetime64', 'str_1960'])
+def epochs(epoch_1960, request):
+ """Timestamp at 1960-01-01 in various forms.
+
+ * pd.Timestamp
+ * datetime.datetime
+ * numpy.datetime64
+ * str
+ """
+    assert request.param in {'timestamp', 'pydatetime', 'datetime64',
+                             'str_1960'}
+ if request.param == 'timestamp':
+ return epoch_1960
+ elif request.param == 'pydatetime':
+ return epoch_1960.to_pydatetime()
+    elif request.param == 'datetime64':
+ return epoch_1960.to_datetime64()
+ else:
+ return str(epoch_1960)
@pytest.fixture
else:
return self.conn.cursor()
- def _load_iris_data(self, datapath):
+ @pytest.fixture(params=[('io', 'data', 'iris.csv')])
+ def load_iris_data(self, datapath, request):
import io
- iris_csv_file = datapath('io', 'data', 'iris.csv')
+ iris_csv_file = datapath(*request.param)
+
+ if not hasattr(self, 'conn'):
+ self.setup_connect()
self.drop_table('iris')
self._get_exec().execute(SQL_STRINGS['create_iris'][self.flavor])
flavor = 'sqlite'
mode = None
- @pytest.fixture(autouse=True)
- def setup_method(self, datapath):
+ def setup_connect(self):
self.conn = self.connect()
- self._load_iris_data(datapath)
+
+ @pytest.fixture(autouse=True)
+ def setup_method(self, load_iris_data):
+ self.load_test_data_and_sql()
+
+ def load_test_data_and_sql(self):
self._load_iris_view()
self._load_test1_data()
self._load_test2_data()
"""
@pytest.fixture(autouse=True)
- def setup_method(self, datapath):
- super(_EngineToConnMixin, self).setup_method(datapath)
+ def setup_method(self, load_iris_data):
+ super(_EngineToConnMixin, self).load_test_data_and_sql()
engine = self.conn
conn = engine.connect()
self.__tx = conn.begin()
msg = "{0} - can't connect to {1} server".format(cls, cls.flavor)
pytest.skip(msg)
- @pytest.fixture(autouse=True)
- def setup_method(self, datapath):
- self.setup_connect()
-
- self._load_iris_data(datapath)
+ def load_test_data_and_sql(self):
self._load_raw_sql()
self._load_test1_data()
+ @pytest.fixture(autouse=True)
+ def setup_method(self, load_iris_data):
+ self.load_test_data_and_sql()
+
@classmethod
def setup_import(cls):
# Skip this test if SQLAlchemy not available
def connect(cls):
return sqlite3.connect(':memory:')
- @pytest.fixture(autouse=True)
- def setup_method(self, datapath):
+ def setup_connect(self):
self.conn = self.connect()
- self.pandasSQL = sql.SQLiteDatabase(self.conn)
-
- self._load_iris_data(datapath)
+ def load_test_data_and_sql(self):
+ self.pandasSQL = sql.SQLiteDatabase(self.conn)
self._load_test1_data()
+ @pytest.fixture(autouse=True)
+ def setup_method(self, load_iris_data):
+ self.load_test_data_and_sql()
+
def test_read_sql(self):
self._read_sql_iris()
self.method = request.function
self.conn = sqlite3.connect(':memory:')
+ # In some test cases we may close db connection
+ # Re-open conn here so we can perform cleanup in teardown
+ yield
+ self.method = request.function
+ self.conn = sqlite3.connect(':memory:')
+
def test_basic(self):
frame = tm.makeTimeDataFrame()
self._check_roundtrip(frame)
with pytest.raises(Exception):
sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn)
- def test_execute_closed_connection(self, request, datapath):
+ def test_execute_closed_connection(self):
create_sql = """
CREATE TABLE test
(
with pytest.raises(Exception):
tquery("select * from test", con=self.conn)
- # Initialize connection again (needed for tearDown)
- self.setup_method(request, datapath)
-
def test_na_roundtrip(self):
pass
tm.assert_series_equal(idx.value_counts(normalize=True), exp)
+main_dtypes = [
+ 'datetime',
+ 'datetimetz',
+ 'timedelta',
+ 'int8',
+ 'int16',
+ 'int32',
+ 'int64',
+ 'float32',
+ 'float64',
+ 'uint8',
+ 'uint16',
+ 'uint32',
+ 'uint64'
+]
+
+
@pytest.fixture
def s_main_dtypes():
+ """A DataFrame with many dtypes
+
+ * datetime
+ * datetimetz
+ * timedelta
+ * [u]int{8,16,32,64}
+ * float{32,64}
+
+ The columns are the name of the dtype.
+ """
df = pd.DataFrame(
{'datetime': pd.to_datetime(['2003', '2002',
'2001', '2002',
return df
+@pytest.fixture(params=main_dtypes)
+def s_main_dtypes_split(request, s_main_dtypes):
+ """Each series in s_main_dtypes."""
+ return s_main_dtypes[request.param]
+
+
def assert_check_nselect_boundary(vals, dtype, method):
# helper function for 'test_boundary_{dtype}' tests
s = Series(vals, dtype=dtype)
with tm.assert_raises_regex(TypeError, msg):
method(arg)
- @pytest.mark.parametrize(
- "s",
- [v for k, v in s_main_dtypes().iteritems()])
- def test_nsmallest_nlargest(self, s):
+ def test_nsmallest_nlargest(self, s_main_dtypes_split):
        # float, int, datetime64 (use i8), timedelta64 (same),
# object that are numbers, object that are strings
+ s = s_main_dtypes_split
assert_series_equal(s.nsmallest(2), s.iloc[[2, 1]])
assert_series_equal(s.nsmallest(2, keep='last'), s.iloc[[2, 3]])