"""SQL io tests The SQL tests are broken down in different classes: - `PandasSQLTest`: base class with common methods for all test classes - Tests for the public API (only tests with sqlite3) - `_TestSQLApi` base class - `TestSQLApi`: test the public API with sqlalchemy engine - `TestSQLiteFallbackApi`: test the public API with a sqlite DBAPI connection - Tests for the different SQL flavors (flavor specific type conversions) - Tests for the sqlalchemy mode: `_TestSQLAlchemy` is the base class with common methods, `_TestSQLAlchemyConn` tests the API with a SQLAlchemy Connection object. The different tested flavors (sqlite3, MySQL, PostgreSQL) derive from the base class - Tests for the fallback mode (`TestSQLiteFallback`) """ from __future__ import print_function from warnings import catch_warnings import pytest import sqlite3 import csv import os import warnings import numpy as np import pandas as pd from datetime import datetime, date, time from pandas.core.dtypes.common import ( is_object_dtype, is_datetime64_dtype, is_datetime64tz_dtype) from pandas import DataFrame, Series, Index, MultiIndex, isna, concat from pandas import date_range, to_datetime, to_timedelta, Timestamp import pandas.compat as compat from pandas.compat import range, lrange, string_types, PY36 from pandas.core.tools.datetimes import format as date_format import pandas.io.sql as sql from pandas.io.sql import read_sql_table, read_sql_query import pandas.util.testing as tm try: import sqlalchemy import sqlalchemy.schema import sqlalchemy.sql.sqltypes as sqltypes from sqlalchemy.ext import declarative from sqlalchemy.orm import session as sa_session SQLALCHEMY_INSTALLED = True except ImportError: SQLALCHEMY_INSTALLED = False SQL_STRINGS = { 'create_iris': { 'sqlite': """CREATE TABLE iris ( "SepalLength" REAL, "SepalWidth" REAL, "PetalLength" REAL, "PetalWidth" REAL, "Name" TEXT )""", 'mysql': """CREATE TABLE iris ( `SepalLength` DOUBLE, `SepalWidth` DOUBLE, `PetalLength` DOUBLE, `PetalWidth` DOUBLE, `Name` VARCHAR(200) )""", 'postgresql': """CREATE TABLE iris ( "SepalLength" DOUBLE PRECISION, "SepalWidth" DOUBLE PRECISION, "PetalLength" DOUBLE PRECISION, "PetalWidth" DOUBLE PRECISION, "Name" VARCHAR(200) )""" }, 'insert_iris': { 'sqlite': """INSERT INTO iris VALUES(?, ?, ?, ?, ?)""", 'mysql': """INSERT INTO iris VALUES(%s, %s, %s, %s, "%s");""", 'postgresql': """INSERT INTO iris VALUES(%s, %s, %s, %s, %s);""" }, 'create_test_types': { 'sqlite': """CREATE TABLE types_test_data ( "TextCol" TEXT, "DateCol" TEXT, "IntDateCol" INTEGER, "IntDateOnlyCol" INTEGER, "FloatCol" REAL, "IntCol" INTEGER, "BoolCol" INTEGER, "IntColWithNull" INTEGER, "BoolColWithNull" INTEGER )""", 'mysql': """CREATE TABLE types_test_data ( `TextCol` TEXT, `DateCol` DATETIME, `IntDateCol` INTEGER, `IntDateOnlyCol` INTEGER, `FloatCol` DOUBLE, `IntCol` INTEGER, `BoolCol` BOOLEAN, `IntColWithNull` INTEGER, `BoolColWithNull` BOOLEAN )""", 'postgresql': """CREATE TABLE types_test_data ( "TextCol" TEXT, "DateCol" TIMESTAMP, "DateColWithTz" TIMESTAMP WITH TIME ZONE, "IntDateCol" INTEGER, "IntDateOnlyCol" INTEGER, "FloatCol" DOUBLE PRECISION, "IntCol" INTEGER, "BoolCol" BOOLEAN, "IntColWithNull" INTEGER, "BoolColWithNull" BOOLEAN )""" }, 'insert_test_types': { 'sqlite': { 'query': """ INSERT INTO types_test_data VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?) """, 'fields': ( 'TextCol', 'DateCol', 'IntDateCol', 'IntDateOnlyCol', 'FloatCol', 'IntCol', 'BoolCol', 'IntColWithNull', 'BoolColWithNull' ) }, 'mysql': { 'query': """ INSERT INTO types_test_data VALUES("%s", %s, %s, %s, %s, %s, %s, %s, %s) """, 'fields': ( 'TextCol', 'DateCol', 'IntDateCol', 'IntDateOnlyCol', 'FloatCol', 'IntCol', 'BoolCol', 'IntColWithNull', 'BoolColWithNull' ) }, 'postgresql': { 'query': """ INSERT INTO types_test_data VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, 'fields': ( 'TextCol', 'DateCol', 'DateColWithTz', 'IntDateCol', 'IntDateOnlyCol', 'FloatCol', 'IntCol', 'BoolCol', 'IntColWithNull', 'BoolColWithNull' ) }, }, 'read_parameters': { 'sqlite': "SELECT * FROM iris WHERE Name=? AND SepalLength=?", 'mysql': 'SELECT * FROM iris WHERE `Name`="%s" AND `SepalLength`=%s', 'postgresql': 'SELECT * FROM iris WHERE "Name"=%s AND "SepalLength"=%s' }, 'read_named_parameters': { 'sqlite': """ SELECT * FROM iris WHERE Name=:name AND SepalLength=:length """, 'mysql': """ SELECT * FROM iris WHERE `Name`="%(name)s" AND `SepalLength`=%(length)s """, 'postgresql': """ SELECT * FROM iris WHERE "Name"=%(name)s AND "SepalLength"=%(length)s """ }, 'create_view': { 'sqlite': """ CREATE VIEW iris_view AS SELECT * FROM iris """ } } class MixInBase(object): def teardown_method(self, method): for tbl in self._get_all_tables(): self.drop_table(tbl) self._close_conn() class MySQLMixIn(MixInBase): def drop_table(self, table_name): cur = self.conn.cursor() cur.execute("DROP TABLE IF EXISTS %s" % sql._get_valid_mysql_name(table_name)) self.conn.commit() def _get_all_tables(self): cur = self.conn.cursor() cur.execute('SHOW TABLES') return [table[0] for table in cur.fetchall()] def _close_conn(self): from pymysql.err import Error try: self.conn.close() except Error: pass class SQLiteMixIn(MixInBase): def drop_table(self, table_name): self.conn.execute("DROP TABLE IF EXISTS %s" % sql._get_valid_sqlite_name(table_name)) self.conn.commit() def _get_all_tables(self): c = self.conn.execute( "SELECT name FROM sqlite_master WHERE type='table'") return [table[0] for table in c.fetchall()] def _close_conn(self): self.conn.close() class SQLAlchemyMixIn(MixInBase): def drop_table(self, table_name): sql.SQLDatabase(self.conn).drop_table(table_name) def _get_all_tables(self): meta = sqlalchemy.schema.MetaData(bind=self.conn) meta.reflect() table_list = meta.tables.keys() return table_list def _close_conn(self): pass class PandasSQLTest(object): """ Base class with common private methods for SQLAlchemy and fallback cases. """ def _get_exec(self): if hasattr(self.conn, 'execute'): return self.conn else: return self.conn.cursor() def _load_iris_data(self): import io iris_csv_file = os.path.join(tm.get_data_path(), 'iris.csv') self.drop_table('iris') self._get_exec().execute(SQL_STRINGS['create_iris'][self.flavor]) with io.open(iris_csv_file, mode='r', newline=None) as iris_csv: r = csv.reader(iris_csv) next(r) # skip header row ins = SQL_STRINGS['insert_iris'][self.flavor] for row in r: self._get_exec().execute(ins, row) def _load_iris_view(self): self.drop_table('iris_view') self._get_exec().execute(SQL_STRINGS['create_view'][self.flavor]) def _check_iris_loaded_frame(self, iris_frame): pytype = iris_frame.dtypes[0].type row = iris_frame.iloc[0] assert issubclass(pytype, np.floating) tm.equalContents(row.values, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa']) def _load_test1_data(self): columns = ['index', 'A', 'B', 'C', 'D'] data = [( '2000-01-03 00:00:00', 0.980268513777, 3.68573087906, -0.364216805298, -1.15973806169), ('2000-01-04 00:00:00', 1.04791624281, - 0.0412318367011, -0.16181208307, 0.212549316967), ('2000-01-05 00:00:00', 0.498580885705, 0.731167677815, -0.537677223318, 1.34627041952), ('2000-01-06 00:00:00', 1.12020151869, 1.56762092543, 0.00364077397681, 0.67525259227)] self.test_frame1 = DataFrame(data, columns=columns) def _load_test2_data(self): df = DataFrame(dict(A=[4, 1, 3, 6], B=['asd', 'gsq', 'ylt', 'jkl'], C=[1.1, 3.1, 6.9, 5.3], D=[False, True, True, False], E=['1990-11-22', '1991-10-26', '1993-11-26', '1995-12-12'])) df['E'] = to_datetime(df['E']) self.test_frame2 = df def _load_test3_data(self): columns = ['index', 'A', 'B'] data = [( '2000-01-03 00:00:00', 2 ** 31 - 1, -1.987670), ('2000-01-04 00:00:00', -29, -0.0412318367011), ('2000-01-05 00:00:00', 20000, 0.731167677815), ('2000-01-06 00:00:00', -290867, 1.56762092543)] self.test_frame3 = DataFrame(data, columns=columns) def _load_raw_sql(self): self.drop_table('types_test_data') self._get_exec().execute(SQL_STRINGS['create_test_types'][self.flavor]) ins = SQL_STRINGS['insert_test_types'][self.flavor] data = [ { 'TextCol': 'first', 'DateCol': '2000-01-03 00:00:00', 'DateColWithTz': '2000-01-01 00:00:00-08:00', 'IntDateCol': 535852800, 'IntDateOnlyCol': 20101010, 'FloatCol': 10.10, 'IntCol': 1, 'BoolCol': False, 'IntColWithNull': 1, 'BoolColWithNull': False, }, { 'TextCol': 'first', 'DateCol': '2000-01-04 00:00:00', 'DateColWithTz': '2000-06-01 00:00:00-07:00', 'IntDateCol': 1356998400, 'IntDateOnlyCol': 20101212, 'FloatCol': 10.10, 'IntCol': 1, 'BoolCol': False, 'IntColWithNull': None, 'BoolColWithNull': None, }, ] for d in data: self._get_exec().execute( ins['query'], [d[field] for field in ins['fields']] ) def _count_rows(self, table_name): result = self._get_exec().execute( "SELECT count(*) AS count_1 FROM %s" % table_name).fetchone() return result[0] def _read_sql_iris(self): iris_frame = self.pandasSQL.read_query("SELECT * FROM iris") self._check_iris_loaded_frame(iris_frame) def _read_sql_iris_parameter(self): query = SQL_STRINGS['read_parameters'][self.flavor] params = ['Iris-setosa', 5.1] iris_frame = self.pandasSQL.read_query(query, params=params) self._check_iris_loaded_frame(iris_frame) def _read_sql_iris_named_parameter(self): query = SQL_STRINGS['read_named_parameters'][self.flavor] params = {'name': 'Iris-setosa', 'length': 5.1} iris_frame = self.pandasSQL.read_query(query, params=params) self._check_iris_loaded_frame(iris_frame) def _to_sql(self): self.drop_table('test_frame1') self.pandasSQL.to_sql(self.test_frame1, 'test_frame1') assert self.pandasSQL.has_table('test_frame1') # Nuke table self.drop_table('test_frame1') def _to_sql_empty(self): self.drop_table('test_frame1') self.pandasSQL.to_sql(self.test_frame1.iloc[:0], 'test_frame1') def _to_sql_fail(self): self.drop_table('test_frame1') self.pandasSQL.to_sql( self.test_frame1, 'test_frame1', if_exists='fail') assert self.pandasSQL.has_table('test_frame1') pytest.raises(ValueError, self.pandasSQL.to_sql, self.test_frame1, 'test_frame1', if_exists='fail') self.drop_table('test_frame1') def _to_sql_replace(self): self.drop_table('test_frame1') self.pandasSQL.to_sql( self.test_frame1, 'test_frame1', if_exists='fail') # Add to table again self.pandasSQL.to_sql( self.test_frame1, 'test_frame1', if_exists='replace') assert self.pandasSQL.has_table('test_frame1') num_entries = len(self.test_frame1) num_rows = self._count_rows('test_frame1') assert num_rows == num_entries self.drop_table('test_frame1') def _to_sql_append(self): # Nuke table just in case self.drop_table('test_frame1') self.pandasSQL.to_sql( self.test_frame1, 'test_frame1', if_exists='fail') # Add to table again self.pandasSQL.to_sql( self.test_frame1, 'test_frame1', if_exists='append') assert self.pandasSQL.has_table('test_frame1') num_entries = 2 * len(self.test_frame1) num_rows = self._count_rows('test_frame1') assert num_rows == num_entries self.drop_table('test_frame1') def _roundtrip(self): self.drop_table('test_frame_roundtrip') self.pandasSQL.to_sql(self.test_frame1, 'test_frame_roundtrip') result = self.pandasSQL.read_query( 'SELECT * FROM test_frame_roundtrip') result.set_index('level_0', inplace=True) # result.index.astype(int) result.index.name = None tm.assert_frame_equal(result, self.test_frame1) def _execute_sql(self): # drop_sql = "DROP TABLE IF EXISTS test" # should already be done iris_results = self.pandasSQL.execute("SELECT * FROM iris") row = iris_results.fetchone() tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa']) def _to_sql_save_index(self): df = DataFrame.from_records([(1, 2.1, 'line1'), (2, 1.5, 'line2')], columns=['A', 'B', 'C'], index=['A']) self.pandasSQL.to_sql(df, 'test_to_sql_saves_index') ix_cols = self._get_index_columns('test_to_sql_saves_index') assert ix_cols == [['A', ], ] def _transaction_test(self): self.pandasSQL.execute("CREATE TABLE test_trans (A INT, B TEXT)") ins_sql = "INSERT INTO test_trans (A,B) VALUES (1, 'blah')" # Make sure when transaction is rolled back, no rows get inserted try: with self.pandasSQL.run_transaction() as trans: trans.execute(ins_sql) raise Exception('error') except: # ignore raised exception pass res = self.pandasSQL.read_query('SELECT * FROM test_trans') assert len(res) == 0 # Make sure when transaction is committed, rows do get inserted with self.pandasSQL.run_transaction() as trans: trans.execute(ins_sql) res2 = self.pandasSQL.read_query('SELECT * FROM test_trans') assert len(res2) == 1 # ----------------------------------------------------------------------------- # -- Testing the public API class _TestSQLApi(PandasSQLTest): """ Base class to test the public API. From this two classes are derived to run these tests for both the sqlalchemy mode (`TestSQLApi`) and the fallback mode (`TestSQLiteFallbackApi`). These tests are run with sqlite3. Specific tests for the different sql flavours are included in `_TestSQLAlchemy`. Notes: flavor can always be passed even in SQLAlchemy mode, should be correctly ignored. we don't use drop_table because that isn't part of the public api """ flavor = 'sqlite' mode = None def setup_method(self, method): self.conn = self.connect() self._load_iris_data() self._load_iris_view() self._load_test1_data() self._load_test2_data() self._load_test3_data() self._load_raw_sql() def test_read_sql_iris(self): iris_frame = sql.read_sql_query( "SELECT * FROM iris", self.conn) self._check_iris_loaded_frame(iris_frame) def test_read_sql_view(self): iris_frame = sql.read_sql_query( "SELECT * FROM iris_view", self.conn) self._check_iris_loaded_frame(iris_frame) def test_to_sql(self): sql.to_sql(self.test_frame1, 'test_frame1', self.conn) assert sql.has_table('test_frame1', self.conn) def test_to_sql_fail(self): sql.to_sql(self.test_frame1, 'test_frame2', self.conn, if_exists='fail') assert sql.has_table('test_frame2', self.conn) pytest.raises(ValueError, sql.to_sql, self.test_frame1, 'test_frame2', self.conn, if_exists='fail') def test_to_sql_replace(self): sql.to_sql(self.test_frame1, 'test_frame3', self.conn, if_exists='fail') # Add to table again sql.to_sql(self.test_frame1, 'test_frame3', self.conn, if_exists='replace') assert sql.has_table('test_frame3', self.conn) num_entries = len(self.test_frame1) num_rows = self._count_rows('test_frame3') assert num_rows == num_entries def test_to_sql_append(self): sql.to_sql(self.test_frame1, 'test_frame4', self.conn, if_exists='fail') # Add to table again sql.to_sql(self.test_frame1, 'test_frame4', self.conn, if_exists='append') assert sql.has_table('test_frame4', self.conn) num_entries = 2 * len(self.test_frame1) num_rows = self._count_rows('test_frame4') assert num_rows == num_entries def test_to_sql_type_mapping(self): sql.to_sql(self.test_frame3, 'test_frame5', self.conn, index=False) result = sql.read_sql("SELECT * FROM test_frame5", self.conn) tm.assert_frame_equal(self.test_frame3, result) def test_to_sql_series(self): s = Series(np.arange(5, dtype='int64'), name='series') sql.to_sql(s, "test_series", self.conn, index=False) s2 = sql.read_sql_query("SELECT * FROM test_series", self.conn) tm.assert_frame_equal(s.to_frame(), s2) def test_to_sql_panel(self): with catch_warnings(record=True): panel = tm.makePanel() pytest.raises(NotImplementedError, sql.to_sql, panel, 'test_panel', self.conn) def test_roundtrip(self): sql.to_sql(self.test_frame1, 'test_frame_roundtrip', con=self.conn) result = sql.read_sql_query( 'SELECT * FROM test_frame_roundtrip', con=self.conn) # HACK! result.index = self.test_frame1.index result.set_index('level_0', inplace=True) result.index.astype(int) result.index.name = None tm.assert_frame_equal(result, self.test_frame1) def test_roundtrip_chunksize(self): sql.to_sql(self.test_frame1, 'test_frame_roundtrip', con=self.conn, index=False, chunksize=2) result = sql.read_sql_query( 'SELECT * FROM test_frame_roundtrip', con=self.conn) tm.assert_frame_equal(result, self.test_frame1) def test_execute_sql(self): # drop_sql = "DROP TABLE IF EXISTS test" # should already be done iris_results = sql.execute("SELECT * FROM iris", con=self.conn) row = iris_results.fetchone() tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa']) def test_date_parsing(self): # Test date parsing in read_sql # No Parsing df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn) assert not issubclass(df.DateCol.dtype.type, np.datetime64) df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn, parse_dates=['DateCol']) assert issubclass(df.DateCol.dtype.type, np.datetime64) assert df.DateCol.tolist() == [ pd.Timestamp(2000, 1, 3, 0, 0, 0), pd.Timestamp(2000, 1, 4, 0, 0, 0) ] df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn, parse_dates={'DateCol': '%Y-%m-%d %H:%M:%S'}) assert issubclass(df.DateCol.dtype.type, np.datetime64) assert df.DateCol.tolist() == [ pd.Timestamp(2000, 1, 3, 0, 0, 0), pd.Timestamp(2000, 1, 4, 0, 0, 0) ] df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn, parse_dates=['IntDateCol']) assert issubclass(df.IntDateCol.dtype.type, np.datetime64) assert df.IntDateCol.tolist() == [ pd.Timestamp(1986, 12, 25, 0, 0, 0), pd.Timestamp(2013, 1, 1, 0, 0, 0) ] df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn, parse_dates={'IntDateCol': 's'}) assert issubclass(df.IntDateCol.dtype.type, np.datetime64) assert df.IntDateCol.tolist() == [ pd.Timestamp(1986, 12, 25, 0, 0, 0), pd.Timestamp(2013, 1, 1, 0, 0, 0) ] df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn, parse_dates={'IntDateOnlyCol': '%Y%m%d'}) assert issubclass(df.IntDateOnlyCol.dtype.type, np.datetime64) assert df.IntDateOnlyCol.tolist() == [ pd.Timestamp('2010-10-10'), pd.Timestamp('2010-12-12') ] def test_date_and_index(self): # Test case where same column appears in parse_date and index_col df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn, index_col='DateCol', parse_dates=['DateCol', 'IntDateCol']) assert issubclass(df.index.dtype.type, np.datetime64) assert issubclass(df.IntDateCol.dtype.type, np.datetime64) def test_timedelta(self): # see #6921 df = to_timedelta( Series(['00:00:01', '00:00:03'], name='foo')).to_frame() with tm.assert_produces_warning(UserWarning): df.to_sql('test_timedelta', self.conn) result = sql.read_sql_query('SELECT * FROM test_timedelta', self.conn) tm.assert_series_equal(result['foo'], df['foo'].astype('int64')) def test_complex(self): df = DataFrame({'a': [1 + 1j, 2j]}) # Complex data type should raise error pytest.raises(ValueError, df.to_sql, 'test_complex', self.conn) def test_to_sql_index_label(self): temp_frame = DataFrame({'col1': range(4)}) # no index name, defaults to 'index' sql.to_sql(temp_frame, 'test_index_label', self.conn) frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[0] == 'index' # specifying index_label sql.to_sql(temp_frame, 'test_index_label', self.conn, if_exists='replace', index_label='other_label') frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[0] == "other_label" # using the index name temp_frame.index.name = 'index_name' sql.to_sql(temp_frame, 'test_index_label', self.conn, if_exists='replace') frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[0] == "index_name" # has index name, but specifying index_label sql.to_sql(temp_frame, 'test_index_label', self.conn, if_exists='replace', index_label='other_label') frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[0] == "other_label" # index name is integer temp_frame.index.name = 0 sql.to_sql(temp_frame, 'test_index_label', self.conn, if_exists='replace') frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[0] == "0" temp_frame.index.name = None sql.to_sql(temp_frame, 'test_index_label', self.conn, if_exists='replace', index_label=0) frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[0] == "0" def test_to_sql_index_label_multiindex(self): temp_frame = DataFrame({'col1': range(4)}, index=MultiIndex.from_product( [('A0', 'A1'), ('B0', 'B1')])) # no index name, defaults to 'level_0' and 'level_1' sql.to_sql(temp_frame, 'test_index_label', self.conn) frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[0] == 'level_0' assert frame.columns[1] == 'level_1' # specifying index_label sql.to_sql(temp_frame, 'test_index_label', self.conn, if_exists='replace', index_label=['A', 'B']) frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[:2].tolist() == ['A', 'B'] # using the index name temp_frame.index.names = ['A', 'B'] sql.to_sql(temp_frame, 'test_index_label', self.conn, if_exists='replace') frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[:2].tolist() == ['A', 'B'] # has index name, but specifying index_label sql.to_sql(temp_frame, 'test_index_label', self.conn, if_exists='replace', index_label=['C', 'D']) frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn) assert frame.columns[:2].tolist() == ['C', 'D'] # wrong length of index_label pytest.raises(ValueError, sql.to_sql, temp_frame, 'test_index_label', self.conn, if_exists='replace', index_label='C') def test_multiindex_roundtrip(self): df = DataFrame.from_records([(1, 2.1, 'line1'), (2, 1.5, 'line2')], columns=['A', 'B', 'C'], index=['A', 'B']) df.to_sql('test_multiindex_roundtrip', self.conn) result = sql.read_sql_query('SELECT * FROM test_multiindex_roundtrip', self.conn, index_col=['A', 'B']) tm.assert_frame_equal(df, result, check_index_type=True) def test_integer_col_names(self): df = DataFrame([[1, 2], [3, 4]], columns=[0, 1]) sql.to_sql(df, "test_frame_integer_col_names", self.conn, if_exists='replace') def test_get_schema(self): create_sql = sql.get_schema(self.test_frame1, 'test', con=self.conn) assert 'CREATE' in create_sql def test_get_schema_dtypes(self): float_frame = DataFrame({'a': [1.1, 1.2], 'b': [2.1, 2.2]}) dtype = sqlalchemy.Integer if self.mode == 'sqlalchemy' else 'INTEGER' create_sql = sql.get_schema(float_frame, 'test', con=self.conn, dtype={'b': dtype}) assert 'CREATE' in create_sql assert 'INTEGER' in create_sql def test_get_schema_keys(self): frame = DataFrame({'Col1': [1.1, 1.2], 'Col2': [2.1, 2.2]}) create_sql = sql.get_schema(frame, 'test', con=self.conn, keys='Col1') constraint_sentence = 'CONSTRAINT test_pk PRIMARY KEY ("Col1")' assert constraint_sentence in create_sql # multiple columns as key (GH10385) create_sql = sql.get_schema(self.test_frame1, 'test', con=self.conn, keys=['A', 'B']) constraint_sentence = 'CONSTRAINT test_pk PRIMARY KEY ("A", "B")' assert constraint_sentence in create_sql def test_chunksize_read(self): df = DataFrame(np.random.randn(22, 5), columns=list('abcde')) df.to_sql('test_chunksize', self.conn, index=False) # reading the query in one time res1 = sql.read_sql_query("select * from test_chunksize", self.conn) # reading the query in chunks with read_sql_query res2 = DataFrame() i = 0 sizes = [5, 5, 5, 5, 2] for chunk in sql.read_sql_query("select * from test_chunksize", self.conn, chunksize=5): res2 = concat([res2, chunk], ignore_index=True) assert len(chunk) == sizes[i] i += 1 tm.assert_frame_equal(res1, res2) # reading the query in chunks with read_sql_query if self.mode == 'sqlalchemy': res3 = DataFrame() i = 0 sizes = [5, 5, 5, 5, 2] for chunk in sql.read_sql_table("test_chunksize", self.conn, chunksize=5): res3 = concat([res3, chunk], ignore_index=True) assert len(chunk) == sizes[i] i += 1 tm.assert_frame_equal(res1, res3) def test_categorical(self): # GH8624 # test that categorical gets written correctly as dense column df = DataFrame( {'person_id': [1, 2, 3], 'person_name': ['John P. Doe', 'Jane Dove', 'John P. Doe']}) df2 = df.copy() df2['person_name'] = df2['person_name'].astype('category') df2.to_sql('test_categorical', self.conn, index=False) res = sql.read_sql_query('SELECT * FROM test_categorical', self.conn) tm.assert_frame_equal(res, df) def test_unicode_column_name(self): # GH 11431 df = DataFrame([[1, 2], [3, 4]], columns=[u'\xe9', u'b']) df.to_sql('test_unicode', self.conn, index=False) def test_escaped_table_name(self): # GH 13206 df = DataFrame({'A': [0, 1, 2], 'B': [0.2, np.nan, 5.6]}) df.to_sql('d1187b08-4943-4c8d-a7f6', self.conn, index=False) res = sql.read_sql_query('SELECT * FROM `d1187b08-4943-4c8d-a7f6`', self.conn) tm.assert_frame_equal(res, df) @pytest.mark.single class TestSQLApi(SQLAlchemyMixIn, _TestSQLApi): """ Test the public API as it would be used directly Tests for `read_sql_table` are included here, as this is specific for the sqlalchemy mode. """ flavor = 'sqlite' mode = 'sqlalchemy' def connect(self): if SQLALCHEMY_INSTALLED: return sqlalchemy.create_engine('sqlite:///:memory:') else: pytest.skip('SQLAlchemy not installed') def test_read_table_columns(self): # test columns argument in read_table sql.to_sql(self.test_frame1, 'test_frame', self.conn) cols = ['A', 'B'] result = sql.read_sql_table('test_frame', self.conn, columns=cols) assert result.columns.tolist() == cols def test_read_table_index_col(self): # test columns argument in read_table sql.to_sql(self.test_frame1, 'test_frame', self.conn) result = sql.read_sql_table('test_frame', self.conn, index_col="index") assert result.index.names == ["index"] result = sql.read_sql_table( 'test_frame', self.conn, index_col=["A", "B"]) assert result.index.names == ["A", "B"] result = sql.read_sql_table('test_frame', self.conn, index_col=["A", "B"], columns=["C", "D"]) assert result.index.names == ["A", "B"] assert result.columns.tolist() == ["C", "D"] def test_read_sql_delegate(self): iris_frame1 = sql.read_sql_query( "SELECT * FROM iris", self.conn) iris_frame2 = sql.read_sql( "SELECT * FROM iris", self.conn) tm.assert_frame_equal(iris_frame1, iris_frame2) iris_frame1 = sql.read_sql_table('iris', self.conn) iris_frame2 = sql.read_sql('iris', self.conn) tm.assert_frame_equal(iris_frame1, iris_frame2) def test_not_reflect_all_tables(self): # create invalid table qry = """CREATE TABLE invalid (x INTEGER, y UNKNOWN);""" self.conn.execute(qry) qry = """CREATE TABLE other_table (x INTEGER, y INTEGER);""" self.conn.execute(qry) with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Trigger a warning. sql.read_sql_table('other_table', self.conn) sql.read_sql_query('SELECT * FROM other_table', self.conn) # Verify some things assert len(w) == 0 def test_warning_case_insensitive_table_name(self): # see gh-7815 # # We can't test that this warning is triggered, a the database # configuration would have to be altered. But here we test that # the warning is certainly NOT triggered in a normal case. with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # This should not trigger a Warning self.test_frame1.to_sql('CaseSensitive', self.conn) # Verify some things assert len(w) == 0 def _get_index_columns(self, tbl_name): from sqlalchemy.engine import reflection insp = reflection.Inspector.from_engine(self.conn) ixs = insp.get_indexes('test_index_saved') ixs = [i['column_names'] for i in ixs] return ixs def test_sqlalchemy_type_mapping(self): # Test Timestamp objects (no datetime64 because of timezone) (GH9085) df = DataFrame({'time': to_datetime(['201412120154', '201412110254'], utc=True)}) db = sql.SQLDatabase(self.conn) table = sql.SQLTable("test_type", db, frame=df) assert isinstance(table.table.c['time'].type, sqltypes.DateTime) def test_database_uri_string(self): # Test read_sql and .to_sql method with a database URI (GH10654) test_frame1 = self.test_frame1 # db_uri = 'sqlite:///:memory:' # raises # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) near # "iris": syntax error [SQL: 'iris'] with tm.ensure_clean() as name: db_uri = 'sqlite:///' + name table = 'iris' test_frame1.to_sql(table, db_uri, if_exists='replace', index=False) test_frame2 = sql.read_sql(table, db_uri) test_frame3 = sql.read_sql_table(table, db_uri) query = 'SELECT * FROM iris' test_frame4 = sql.read_sql_query(query, db_uri) tm.assert_frame_equal(test_frame1, test_frame2) tm.assert_frame_equal(test_frame1, test_frame3) tm.assert_frame_equal(test_frame1, test_frame4) # using driver that will not be installed on Travis to trigger error # in sqlalchemy.create_engine -> test passing of this error to user try: # the rest of this test depends on pg8000's being absent import pg8000 # noqa pytest.skip("pg8000 is installed") except ImportError: pass db_uri = "postgresql+pg8000://user:pass@host/dbname" with tm.assert_raises_regex(ImportError, "pg8000"): sql.read_sql("select * from table", db_uri) def _make_iris_table_metadata(self): sa = sqlalchemy metadata = sa.MetaData() iris = sa.Table('iris', metadata, sa.Column('SepalLength', sa.REAL), sa.Column('SepalWidth', sa.REAL), sa.Column('PetalLength', sa.REAL), sa.Column('PetalWidth', sa.REAL), sa.Column('Name', sa.TEXT) ) return iris def test_query_by_text_obj(self): # WIP : GH10846 name_text = sqlalchemy.text('select * from iris where name=:name') iris_df = sql.read_sql(name_text, self.conn, params={ 'name': 'Iris-versicolor'}) all_names = set(iris_df['Name']) assert all_names == set(['Iris-versicolor']) def test_query_by_select_obj(self): # WIP : GH10846 iris = self._make_iris_table_metadata() name_select = sqlalchemy.select([iris]).where( iris.c.Name == sqlalchemy.bindparam('name')) iris_df = sql.read_sql(name_select, self.conn, params={'name': 'Iris-setosa'}) all_names = set(iris_df['Name']) assert all_names == set(['Iris-setosa']) class _EngineToConnMixin(object): """ A mixin that causes setup_connect to create a conn rather than an engine. """ def setup_method(self, method): super(_EngineToConnMixin, self).setup_method(method) engine = self.conn conn = engine.connect() self.__tx = conn.begin() self.pandasSQL = sql.SQLDatabase(conn) self.__engine = engine self.conn = conn def teardown_method(self, method): self.__tx.rollback() self.conn.close() self.conn = self.__engine self.pandasSQL = sql.SQLDatabase(self.__engine) super(_EngineToConnMixin, self).teardown_method(method) @pytest.mark.single class TestSQLApiConn(_EngineToConnMixin, TestSQLApi): pass @pytest.mark.single class TestSQLiteFallbackApi(SQLiteMixIn, _TestSQLApi): """ Test the public sqlite connection fallback API """ flavor = 'sqlite' mode = 'fallback' def connect(self, database=":memory:"): return sqlite3.connect(database) def test_sql_open_close(self): # Test if the IO in the database still work if the connection closed # between the writing and reading (as in many real situations). with tm.ensure_clean() as name: conn = self.connect(name) sql.to_sql(self.test_frame3, "test_frame3_legacy", conn, index=False) conn.close() conn = self.connect(name) result = sql.read_sql_query("SELECT * FROM test_frame3_legacy;", conn) conn.close() tm.assert_frame_equal(self.test_frame3, result) def test_con_string_import_error(self): if not SQLALCHEMY_INSTALLED: conn = 'mysql://root@localhost/pandas_nosetest' pytest.raises(ImportError, sql.read_sql, "SELECT * FROM iris", conn) else: pytest.skip('SQLAlchemy is installed') def test_read_sql_delegate(self): iris_frame1 = sql.read_sql_query("SELECT * FROM iris", self.conn) iris_frame2 = sql.read_sql("SELECT * FROM iris", self.conn) tm.assert_frame_equal(iris_frame1, iris_frame2) pytest.raises(sql.DatabaseError, sql.read_sql, 'iris', self.conn) def test_safe_names_warning(self): # GH 6798 df = DataFrame([[1, 2], [3, 4]], columns=['a', 'b ']) # has a space # warns on create table with spaces in names with tm.assert_produces_warning(): sql.to_sql(df, "test_frame3_legacy", self.conn, index=False) def test_get_schema2(self): # without providing a connection object (available for backwards comp) create_sql = sql.get_schema(self.test_frame1, 'test') assert 'CREATE' in create_sql def _get_sqlite_column_type(self, schema, column): for col in schema.split('\n'): if col.split()[0].strip('""') == column: return col.split()[1] raise ValueError('Column %s not found' % (column)) def test_sqlite_type_mapping(self): # Test Timestamp objects (no datetime64 because of timezone) (GH9085) df = DataFrame({'time': to_datetime(['201412120154', '201412110254'], utc=True)}) db = sql.SQLiteDatabase(self.conn) table = sql.SQLiteTable("test_type", db, frame=df) schema = table.sql_schema() assert self._get_sqlite_column_type(schema, 'time') == "TIMESTAMP" # ----------------------------------------------------------------------------- # -- Database flavor specific tests class _TestSQLAlchemy(SQLAlchemyMixIn, PandasSQLTest): """ Base class for testing the sqlalchemy backend. Subclasses for specific database types are created below. Tests that deviate for each flavor are overwritten there. """ flavor = None @classmethod def setup_class(cls): cls.setup_import() cls.setup_driver() # test connection try: conn = cls.connect() conn.connect() except sqlalchemy.exc.OperationalError: msg = "{0} - can't connect to {1} server".format(cls, cls.flavor) pytest.skip(msg) def setup_method(self, method): self.setup_connect() self._load_iris_data() self._load_raw_sql() self._load_test1_data() @classmethod def setup_import(cls): # Skip this test if SQLAlchemy not available if not SQLALCHEMY_INSTALLED: pytest.skip('SQLAlchemy not installed') @classmethod def setup_driver(cls): raise NotImplementedError() @classmethod def connect(cls): raise NotImplementedError() def setup_connect(self): try: self.conn = self.connect() self.pandasSQL = sql.SQLDatabase(self.conn) # to test if connection can be made: self.conn.connect() except sqlalchemy.exc.OperationalError: pytest.skip( "Can't connect to {0} server".format(self.flavor)) def test_aread_sql(self): self._read_sql_iris() def test_read_sql_parameter(self): self._read_sql_iris_parameter() def test_read_sql_named_parameter(self): self._read_sql_iris_named_parameter() def test_to_sql(self): self._to_sql() def test_to_sql_empty(self): self._to_sql_empty() def test_to_sql_fail(self): self._to_sql_fail() def test_to_sql_replace(self): self._to_sql_replace() def test_to_sql_append(self): self._to_sql_append() def test_create_table(self): temp_conn = self.connect() temp_frame = DataFrame( {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]}) pandasSQL = sql.SQLDatabase(temp_conn) pandasSQL.to_sql(temp_frame, 'temp_frame') assert temp_conn.has_table('temp_frame') def test_drop_table(self): temp_conn = self.connect() temp_frame = DataFrame( {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]}) pandasSQL = sql.SQLDatabase(temp_conn) pandasSQL.to_sql(temp_frame, 'temp_frame') assert temp_conn.has_table('temp_frame') pandasSQL.drop_table('temp_frame') assert not temp_conn.has_table('temp_frame') def test_roundtrip(self): self._roundtrip() def test_execute_sql(self): self._execute_sql() def test_read_table(self): iris_frame = sql.read_sql_table("iris", con=self.conn) self._check_iris_loaded_frame(iris_frame) def test_read_table_columns(self): iris_frame = sql.read_sql_table( "iris", con=self.conn, columns=['SepalLength', 'SepalLength']) tm.equalContents( iris_frame.columns.values, ['SepalLength', 'SepalLength']) def test_read_table_absent(self): pytest.raises( ValueError, sql.read_sql_table, "this_doesnt_exist", con=self.conn) def test_default_type_conversion(self): df = sql.read_sql_table("types_test_data", self.conn) assert issubclass(df.FloatCol.dtype.type, np.floating) assert issubclass(df.IntCol.dtype.type, np.integer) assert issubclass(df.BoolCol.dtype.type, np.bool_) # Int column with NA values stays as float assert issubclass(df.IntColWithNull.dtype.type, np.floating) # Bool column with NA values becomes object assert issubclass(df.BoolColWithNull.dtype.type, np.object) def test_bigint(self): # int64 should be converted to BigInteger, GH7433 df = DataFrame(data={'i64': [2**62]}) df.to_sql('test_bigint', self.conn, index=False) result = sql.read_sql_table('test_bigint', self.conn) tm.assert_frame_equal(df, result) def test_default_date_load(self): df = sql.read_sql_table("types_test_data", self.conn) # IMPORTANT - sqlite has no native date type, so shouldn't parse, but # MySQL SHOULD be converted. assert issubclass(df.DateCol.dtype.type, np.datetime64) def test_datetime_with_timezone(self): # edge case that converts postgresql datetime with time zone types # to datetime64[ns,psycopg2.tz.FixedOffsetTimezone..], which is ok # but should be more natural, so coerce to datetime64[ns] for now def check(col): # check that a column is either datetime64[ns] # or datetime64[ns, UTC] if is_datetime64_dtype(col.dtype): # "2000-01-01 00:00:00-08:00" should convert to # "2000-01-01 08:00:00" assert col[0] == Timestamp('2000-01-01 08:00:00') # "2000-06-01 00:00:00-07:00" should convert to # "2000-06-01 07:00:00" assert col[1] == Timestamp('2000-06-01 07:00:00') elif is_datetime64tz_dtype(col.dtype): assert str(col.dt.tz) == 'UTC' # "2000-01-01 00:00:00-08:00" should convert to # "2000-01-01 08:00:00" # "2000-06-01 00:00:00-07:00" should convert to # "2000-06-01 07:00:00" # GH 6415 expected_data = [Timestamp('2000-01-01 08:00:00', tz='UTC'), Timestamp('2000-06-01 07:00:00', tz='UTC')] expected = Series(expected_data, name=col.name) tm.assert_series_equal(col, expected) else: raise AssertionError("DateCol loaded with incorrect type " "-> {0}".format(col.dtype)) # GH11216 df = pd.read_sql_query("select * from types_test_data", self.conn) if not hasattr(df, 'DateColWithTz'): pytest.skip("no column with datetime with time zone") # this is parsed on Travis (linux), but not on macosx for some reason # even with the same versions of psycopg2 & sqlalchemy, possibly a # Postgrsql server version difference col = df.DateColWithTz assert (is_object_dtype(col.dtype) or is_datetime64_dtype(col.dtype) or is_datetime64tz_dtype(col.dtype)) df = pd.read_sql_query("select * from types_test_data", self.conn, parse_dates=['DateColWithTz']) if not hasattr(df, 'DateColWithTz'): pytest.skip("no column with datetime with time zone") col = df.DateColWithTz assert is_datetime64tz_dtype(col.dtype) assert str(col.dt.tz) == 'UTC' check(df.DateColWithTz) df = pd.concat(list(pd.read_sql_query("select * from types_test_data", self.conn, chunksize=1)), ignore_index=True) col = df.DateColWithTz assert is_datetime64tz_dtype(col.dtype) assert str(col.dt.tz) == 'UTC' expected = sql.read_sql_table("types_test_data", self.conn) col = expected.DateColWithTz assert is_datetime64tz_dtype(col.dtype) tm.assert_series_equal(df.DateColWithTz, expected.DateColWithTz) # xref #7139 # this might or might not be converted depending on the postgres driver df = sql.read_sql_table("types_test_data", self.conn) check(df.DateColWithTz) def test_date_parsing(self): # No Parsing df = sql.read_sql_table("types_test_data", self.conn) df = sql.read_sql_table("types_test_data", self.conn, parse_dates=['DateCol']) assert issubclass(df.DateCol.dtype.type, np.datetime64) df = sql.read_sql_table("types_test_data", self.conn, parse_dates={'DateCol': '%Y-%m-%d %H:%M:%S'}) assert issubclass(df.DateCol.dtype.type, np.datetime64) df = sql.read_sql_table("types_test_data", self.conn, parse_dates={ 'DateCol': {'format': '%Y-%m-%d %H:%M:%S'}}) assert issubclass(df.DateCol.dtype.type, np.datetime64) df = sql.read_sql_table( "types_test_data", self.conn, parse_dates=['IntDateCol']) assert issubclass(df.IntDateCol.dtype.type, np.datetime64) df = sql.read_sql_table( "types_test_data", self.conn, parse_dates={'IntDateCol': 's'}) assert issubclass(df.IntDateCol.dtype.type, np.datetime64) df = sql.read_sql_table("types_test_data", self.conn, parse_dates={'IntDateCol': {'unit': 's'}}) assert issubclass(df.IntDateCol.dtype.type, np.datetime64) def test_datetime(self): df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3), 'B': np.arange(3.0)}) df.to_sql('test_datetime', self.conn) # with read_table -> type information from schema used result = sql.read_sql_table('test_datetime', self.conn) result = result.drop('index', axis=1) tm.assert_frame_equal(result, df) # with read_sql -> no type information -> sqlite has no native result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn) result = result.drop('index', axis=1) if self.flavor == 'sqlite': assert isinstance(result.loc[0, 'A'], string_types) result['A'] = to_datetime(result['A']) tm.assert_frame_equal(result, df) else: tm.assert_frame_equal(result, df) def test_datetime_NaT(self): df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3), 'B': np.arange(3.0)}) df.loc[1, 'A'] = np.nan df.to_sql('test_datetime', self.conn, index=False) # with read_table -> type information from schema used result = sql.read_sql_table('test_datetime', self.conn) tm.assert_frame_equal(result, df) # with read_sql -> no type information -> sqlite has no native result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn) if self.flavor == 'sqlite': assert isinstance(result.loc[0, 'A'], string_types) result['A'] = to_datetime(result['A'], errors='coerce') tm.assert_frame_equal(result, df) else: tm.assert_frame_equal(result, df) def test_datetime_date(self): # test support for datetime.date df = DataFrame([date(2014, 1, 1), date(2014, 1, 2)], columns=["a"]) df.to_sql('test_date', self.conn, index=False) res = read_sql_table('test_date', self.conn) result = res['a'] expected = to_datetime(df['a']) # comes back as datetime64 tm.assert_series_equal(result, expected) def test_datetime_time(self): # test support for datetime.time df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"]) df.to_sql('test_time', self.conn, index=False) res = read_sql_table('test_time', self.conn) tm.assert_frame_equal(res, df) # GH8341 # first, use the fallback to have the sqlite adapter put in place sqlite_conn = TestSQLiteFallback.connect() sql.to_sql(df, "test_time2", sqlite_conn, index=False) res = sql.read_sql_query("SELECT * FROM test_time2", sqlite_conn) ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f")) tm.assert_frame_equal(ref, res) # check if adapter is in place # then test if sqlalchemy is unaffected by the sqlite adapter sql.to_sql(df, "test_time3", self.conn, index=False) if self.flavor == 'sqlite': res = sql.read_sql_query("SELECT * FROM test_time3", self.conn) ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f")) tm.assert_frame_equal(ref, res) res = sql.read_sql_table("test_time3", self.conn) tm.assert_frame_equal(df, res) def test_mixed_dtype_insert(self): # see GH6509 s1 = Series(2**25 + 1, dtype=np.int32) s2 = Series(0.0, dtype=np.float32) df = DataFrame({'s1': s1, 's2': s2}) # write and read again df.to_sql("test_read_write", self.conn, index=False) df2 = sql.read_sql_table("test_read_write", self.conn) tm.assert_frame_equal(df, df2, check_dtype=False, check_exact=True) def test_nan_numeric(self): # NaNs in numeric float column df = DataFrame({'A': [0, 1, 2], 'B': [0.2, np.nan, 5.6]}) df.to_sql('test_nan', self.conn, index=False) # with read_table result = sql.read_sql_table('test_nan', self.conn) tm.assert_frame_equal(result, df) # with read_sql result = sql.read_sql_query('SELECT * FROM test_nan', self.conn) tm.assert_frame_equal(result, df) def test_nan_fullcolumn(self): # full NaN column (numeric float column) df = DataFrame({'A': [0, 1, 2], 'B': [np.nan, np.nan, np.nan]}) df.to_sql('test_nan', self.conn, index=False) # with read_table result = sql.read_sql_table('test_nan', self.conn) tm.assert_frame_equal(result, df) # with read_sql -> not type info from table -> stays None df['B'] = df['B'].astype('object') df['B'] = None result = sql.read_sql_query('SELECT * FROM test_nan', self.conn) tm.assert_frame_equal(result, df) def test_nan_string(self): # NaNs in string column df = DataFrame({'A': [0, 1, 2], 'B': ['a', 'b', np.nan]}) df.to_sql('test_nan', self.conn, index=False) # NaNs are coming back as None df.loc[2, 'B'] = None # with read_table result = sql.read_sql_table('test_nan', self.conn) tm.assert_frame_equal(result, df) # with read_sql result = sql.read_sql_query('SELECT * FROM test_nan', self.conn) tm.assert_frame_equal(result, df) def _get_index_columns(self, tbl_name): from sqlalchemy.engine import reflection insp = reflection.Inspector.from_engine(self.conn) ixs = insp.get_indexes(tbl_name) ixs = [i['column_names'] for i in ixs] return ixs def test_to_sql_save_index(self): self._to_sql_save_index() def test_transactions(self): self._transaction_test() def test_get_schema_create_table(self): # Use a dataframe without a bool column, since MySQL converts bool to # TINYINT (which read_sql_table returns as an int and causes a dtype # mismatch) self._load_test3_data() tbl = 'test_get_schema_create_table' create_sql = sql.get_schema(self.test_frame3, tbl, con=self.conn) blank_test_df = self.test_frame3.iloc[:0] self.drop_table(tbl) self.conn.execute(create_sql) returned_df = sql.read_sql_table(tbl, self.conn) tm.assert_frame_equal(returned_df, blank_test_df, check_index_type=False) self.drop_table(tbl) def test_dtype(self): cols = ['A', 'B'] data = [(0.8, True), (0.9, None)] df = DataFrame(data, columns=cols) df.to_sql('dtype_test', self.conn) df.to_sql('dtype_test2', self.conn, dtype={'B': sqlalchemy.TEXT}) meta = sqlalchemy.schema.MetaData(bind=self.conn) meta.reflect() sqltype = meta.tables['dtype_test2'].columns['B'].type assert isinstance(sqltype, sqlalchemy.TEXT) pytest.raises(ValueError, df.to_sql, 'error', self.conn, dtype={'B': str}) # GH9083 df.to_sql('dtype_test3', self.conn, dtype={'B': sqlalchemy.String(10)}) meta.reflect() sqltype = meta.tables['dtype_test3'].columns['B'].type assert isinstance(sqltype, sqlalchemy.String) assert sqltype.length == 10 # single dtype df.to_sql('single_dtype_test', self.conn, dtype=sqlalchemy.TEXT) meta = sqlalchemy.schema.MetaData(bind=self.conn) meta.reflect() sqltypea = meta.tables['single_dtype_test'].columns['A'].type sqltypeb = meta.tables['single_dtype_test'].columns['B'].type assert isinstance(sqltypea, sqlalchemy.TEXT) assert isinstance(sqltypeb, sqlalchemy.TEXT) def test_notna_dtype(self): cols = {'Bool': Series([True, None]), 'Date': Series([datetime(2012, 5, 1), None]), 'Int': Series([1, None], dtype='object'), 'Float': Series([1.1, None]) } df = DataFrame(cols) tbl = 'notna_dtype_test' df.to_sql(tbl, self.conn) returned_df = sql.read_sql_table(tbl, self.conn) # noqa meta = sqlalchemy.schema.MetaData(bind=self.conn) meta.reflect() if self.flavor == 'mysql': my_type = sqltypes.Integer else: my_type = sqltypes.Boolean col_dict = meta.tables[tbl].columns assert isinstance(col_dict['Bool'].type, my_type) assert isinstance(col_dict['Date'].type, sqltypes.DateTime) assert isinstance(col_dict['Int'].type, sqltypes.Integer) assert isinstance(col_dict['Float'].type, sqltypes.Float) def test_double_precision(self): V = 1.23456789101112131415 df = DataFrame({'f32': Series([V, ], dtype='float32'), 'f64': Series([V, ], dtype='float64'), 'f64_as_f32': Series([V, ], dtype='float64'), 'i32': Series([5, ], dtype='int32'), 'i64': Series([5, ], dtype='int64'), }) df.to_sql('test_dtypes', self.conn, index=False, if_exists='replace', dtype={'f64_as_f32': sqlalchemy.Float(precision=23)}) res = sql.read_sql_table('test_dtypes', self.conn) # check precision of float64 assert (np.round(df['f64'].iloc[0], 14) == np.round(res['f64'].iloc[0], 14)) # check sql types meta = sqlalchemy.schema.MetaData(bind=self.conn) meta.reflect() col_dict = meta.tables['test_dtypes'].columns assert str(col_dict['f32'].type) == str(col_dict['f64_as_f32'].type) assert isinstance(col_dict['f32'].type, sqltypes.Float) assert isinstance(col_dict['f64'].type, sqltypes.Float) assert isinstance(col_dict['i32'].type, sqltypes.Integer) assert isinstance(col_dict['i64'].type, sqltypes.BigInteger) def test_connectable_issue_example(self): # This tests the example raised in issue # https://github.com/pandas-dev/pandas/issues/10104 def foo(connection): query = 'SELECT test_foo_data FROM test_foo_data' return sql.read_sql_query(query, con=connection) def bar(connection, data): data.to_sql(name='test_foo_data', con=connection, if_exists='append') def main(connectable): with connectable.connect() as conn: with conn.begin(): foo_data = conn.run_callable(foo) conn.run_callable(bar, foo_data) DataFrame({'test_foo_data': [0, 1, 2]}).to_sql( 'test_foo_data', self.conn) main(self.conn) def test_temporary_table(self): test_data = u'Hello, World!' expected = DataFrame({'spam': [test_data]}) Base = declarative.declarative_base() class Temporary(Base): __tablename__ = 'temp_test' __table_args__ = {'prefixes': ['TEMPORARY']} id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False) Session = sa_session.sessionmaker(bind=self.conn) session = Session() with session.transaction: conn = session.connection() Temporary.__table__.create(conn) session.add(Temporary(spam=test_data)) session.flush() df = sql.read_sql_query( sql=sqlalchemy.select([Temporary.spam]), con=conn, ) tm.assert_frame_equal(df, expected) class _TestSQLAlchemyConn(_EngineToConnMixin, _TestSQLAlchemy): def test_transactions(self): pytest.skip( "Nested transactions rollbacks don't work with Pandas") class _TestSQLiteAlchemy(object): """ Test the sqlalchemy backend against an in-memory sqlite database. """ flavor = 'sqlite' @classmethod def connect(cls): return sqlalchemy.create_engine('sqlite:///:memory:') @classmethod def setup_driver(cls): # sqlite3 is built-in cls.driver = None def test_default_type_conversion(self): df = sql.read_sql_table("types_test_data", self.conn) assert issubclass(df.FloatCol.dtype.type, np.floating) assert issubclass(df.IntCol.dtype.type, np.integer) # sqlite has no boolean type, so integer type is returned assert issubclass(df.BoolCol.dtype.type, np.integer) # Int column with NA values stays as float assert issubclass(df.IntColWithNull.dtype.type, np.floating) # Non-native Bool column with NA values stays as float assert issubclass(df.BoolColWithNull.dtype.type, np.floating) def test_default_date_load(self): df = sql.read_sql_table("types_test_data", self.conn) # IMPORTANT - sqlite has no native date type, so shouldn't parse, but assert not issubclass(df.DateCol.dtype.type, np.datetime64) def test_bigint_warning(self): # test no warning for BIGINT (to support int64) is raised (GH7433) df = DataFrame({'a': [1, 2]}, dtype='int64') df.to_sql('test_bigintwarning', self.conn, index=False) with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") sql.read_sql_table('test_bigintwarning', self.conn) assert len(w) == 0 class _TestMySQLAlchemy(object): """ Test the sqlalchemy backend against an MySQL database. """ flavor = 'mysql' @classmethod def connect(cls): url = 'mysql+{driver}://root@localhost/pandas_nosetest' return sqlalchemy.create_engine(url.format(driver=cls.driver), connect_args=cls.connect_args) @classmethod def setup_driver(cls): try: import pymysql # noqa cls.driver = 'pymysql' from pymysql.constants import CLIENT cls.connect_args = {'client_flag': CLIENT.MULTI_STATEMENTS} except ImportError: pytest.skip('pymysql not installed') def test_default_type_conversion(self): df = sql.read_sql_table("types_test_data", self.conn) assert issubclass(df.FloatCol.dtype.type, np.floating) assert issubclass(df.IntCol.dtype.type, np.integer) # MySQL has no real BOOL type (it's an alias for TINYINT) assert issubclass(df.BoolCol.dtype.type, np.integer) # Int column with NA values stays as float assert issubclass(df.IntColWithNull.dtype.type, np.floating) # Bool column with NA = int column with NA values => becomes float assert issubclass(df.BoolColWithNull.dtype.type, np.floating) def test_read_procedure(self): # see GH7324. Although it is more an api test, it is added to the # mysql tests as sqlite does not have stored procedures df = DataFrame({'a': [1, 2, 3], 'b': [0.1, 0.2, 0.3]}) df.to_sql('test_procedure', self.conn, index=False) proc = """DROP PROCEDURE IF EXISTS get_testdb; CREATE PROCEDURE get_testdb () BEGIN SELECT * FROM test_procedure; END""" connection = self.conn.connect() trans = connection.begin() try: r1 = connection.execute(proc) # noqa trans.commit() except: trans.rollback() raise res1 = sql.read_sql_query("CALL get_testdb();", self.conn) tm.assert_frame_equal(df, res1) # test delegation to read_sql_query res2 = sql.read_sql("CALL get_testdb();", self.conn) tm.assert_frame_equal(df, res2) class _TestPostgreSQLAlchemy(object): """ Test the sqlalchemy backend against an PostgreSQL database. """ flavor = 'postgresql' @classmethod def connect(cls): url = 'postgresql+{driver}://postgres@localhost/pandas_nosetest' return sqlalchemy.create_engine(url.format(driver=cls.driver)) @classmethod def setup_driver(cls): try: import psycopg2 # noqa cls.driver = 'psycopg2' except ImportError: pytest.skip('psycopg2 not installed') def test_schema_support(self): # only test this for postgresql (schema's not supported in # mysql/sqlite) df = DataFrame({'col1': [1, 2], 'col2': [ 0.1, 0.2], 'col3': ['a', 'n']}) # create a schema self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;") self.conn.execute("CREATE SCHEMA other;") # write dataframe to different schema's df.to_sql('test_schema_public', self.conn, index=False) df.to_sql('test_schema_public_explicit', self.conn, index=False, schema='public') df.to_sql('test_schema_other', self.conn, index=False, schema='other') # read dataframes back in res1 = sql.read_sql_table('test_schema_public', self.conn) tm.assert_frame_equal(df, res1) res2 = sql.read_sql_table('test_schema_public_explicit', self.conn) tm.assert_frame_equal(df, res2) res3 = sql.read_sql_table('test_schema_public_explicit', self.conn, schema='public') tm.assert_frame_equal(df, res3) res4 = sql.read_sql_table('test_schema_other', self.conn, schema='other') tm.assert_frame_equal(df, res4) pytest.raises(ValueError, sql.read_sql_table, 'test_schema_other', self.conn, schema='public') # different if_exists options # create a schema self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;") self.conn.execute("CREATE SCHEMA other;") # write dataframe with different if_exists options df.to_sql('test_schema_other', self.conn, schema='other', index=False) df.to_sql('test_schema_other', self.conn, schema='other', index=False, if_exists='replace') df.to_sql('test_schema_other', self.conn, schema='other', index=False, if_exists='append') res = sql.read_sql_table( 'test_schema_other', self.conn, schema='other') tm.assert_frame_equal(concat([df, df], ignore_index=True), res) # specifying schema in user-provided meta # The schema won't be applied on another Connection # because of transactional schemas if isinstance(self.conn, sqlalchemy.engine.Engine): engine2 = self.connect() meta = sqlalchemy.MetaData(engine2, schema='other') pdsql = sql.SQLDatabase(engine2, meta=meta) pdsql.to_sql(df, 'test_schema_other2', index=False) pdsql.to_sql(df, 'test_schema_other2', index=False, if_exists='replace') pdsql.to_sql(df, 'test_schema_other2', index=False, if_exists='append') res1 = sql.read_sql_table( 'test_schema_other2', self.conn, schema='other') res2 = pdsql.read_table('test_schema_other2') tm.assert_frame_equal(res1, res2) @pytest.mark.single class TestMySQLAlchemy(_TestMySQLAlchemy, _TestSQLAlchemy): pass @pytest.mark.single class TestMySQLAlchemyConn(_TestMySQLAlchemy, _TestSQLAlchemyConn): pass @pytest.mark.single class TestPostgreSQLAlchemy(_TestPostgreSQLAlchemy, _TestSQLAlchemy): pass @pytest.mark.single class TestPostgreSQLAlchemyConn(_TestPostgreSQLAlchemy, _TestSQLAlchemyConn): pass @pytest.mark.single class TestSQLiteAlchemy(_TestSQLiteAlchemy, _TestSQLAlchemy): pass @pytest.mark.single class TestSQLiteAlchemyConn(_TestSQLiteAlchemy, _TestSQLAlchemyConn): pass # ----------------------------------------------------------------------------- # -- Test Sqlite / MySQL fallback @pytest.mark.single class TestSQLiteFallback(SQLiteMixIn, PandasSQLTest): """ Test the fallback mode against an in-memory sqlite database. """ flavor = 'sqlite' @classmethod def connect(cls): return sqlite3.connect(':memory:') def setup_method(self, method): self.conn = self.connect() self.pandasSQL = sql.SQLiteDatabase(self.conn) self._load_iris_data() self._load_test1_data() def test_read_sql(self): self._read_sql_iris() def test_read_sql_parameter(self): self._read_sql_iris_parameter() def test_read_sql_named_parameter(self): self._read_sql_iris_named_parameter() def test_to_sql(self): self._to_sql() def test_to_sql_empty(self): self._to_sql_empty() def test_to_sql_fail(self): self._to_sql_fail() def test_to_sql_replace(self): self._to_sql_replace() def test_to_sql_append(self): self._to_sql_append() def test_create_and_drop_table(self): temp_frame = DataFrame( {'one': [1., 2., 3., 4.], 'two': [4., 3., 2., 1.]}) self.pandasSQL.to_sql(temp_frame, 'drop_test_frame') assert self.pandasSQL.has_table('drop_test_frame') self.pandasSQL.drop_table('drop_test_frame') assert not self.pandasSQL.has_table('drop_test_frame') def test_roundtrip(self): self._roundtrip() def test_execute_sql(self): self._execute_sql() def test_datetime_date(self): # test support for datetime.date df = DataFrame([date(2014, 1, 1), date(2014, 1, 2)], columns=["a"]) df.to_sql('test_date', self.conn, index=False) res = read_sql_query('SELECT * FROM test_date', self.conn) if self.flavor == 'sqlite': # comes back as strings tm.assert_frame_equal(res, df.astype(str)) elif self.flavor == 'mysql': tm.assert_frame_equal(res, df) def test_datetime_time(self): # test support for datetime.time, GH #8341 df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"]) df.to_sql('test_time', self.conn, index=False) res = read_sql_query('SELECT * FROM test_time', self.conn) if self.flavor == 'sqlite': # comes back as strings expected = df.applymap(lambda _: _.strftime("%H:%M:%S.%f")) tm.assert_frame_equal(res, expected) def _get_index_columns(self, tbl_name): ixs = sql.read_sql_query( "SELECT * FROM sqlite_master WHERE type = 'index' " + "AND tbl_name = '%s'" % tbl_name, self.conn) ix_cols = [] for ix_name in ixs.name: ix_info = sql.read_sql_query( "PRAGMA index_info(%s)" % ix_name, self.conn) ix_cols.append(ix_info.name.tolist()) return ix_cols def test_to_sql_save_index(self): self._to_sql_save_index() def test_transactions(self): if PY36: pytest.skip("not working on python > 3.5") self._transaction_test() def _get_sqlite_column_type(self, table, column): recs = self.conn.execute('PRAGMA table_info(%s)' % table) for cid, name, ctype, not_null, default, pk in recs: if name == column: return ctype raise ValueError('Table %s, column %s not found' % (table, column)) def test_dtype(self): if self.flavor == 'mysql': pytest.skip('Not applicable to MySQL legacy') cols = ['A', 'B'] data = [(0.8, True), (0.9, None)] df = DataFrame(data, columns=cols) df.to_sql('dtype_test', self.conn) df.to_sql('dtype_test2', self.conn, dtype={'B': 'STRING'}) # sqlite stores Boolean values as INTEGER assert self._get_sqlite_column_type( 'dtype_test', 'B') == 'INTEGER' assert self._get_sqlite_column_type( 'dtype_test2', 'B') == 'STRING' pytest.raises(ValueError, df.to_sql, 'error', self.conn, dtype={'B': bool}) # single dtype df.to_sql('single_dtype_test', self.conn, dtype='STRING') assert self._get_sqlite_column_type( 'single_dtype_test', 'A') == 'STRING' assert self._get_sqlite_column_type( 'single_dtype_test', 'B') == 'STRING' def test_notna_dtype(self): if self.flavor == 'mysql': pytest.skip('Not applicable to MySQL legacy') cols = {'Bool': Series([True, None]), 'Date': Series([datetime(2012, 5, 1), None]), 'Int': Series([1, None], dtype='object'), 'Float': Series([1.1, None]) } df = DataFrame(cols) tbl = 'notna_dtype_test' df.to_sql(tbl, self.conn) assert self._get_sqlite_column_type(tbl, 'Bool') == 'INTEGER' assert self._get_sqlite_column_type(tbl, 'Date') == 'TIMESTAMP' assert self._get_sqlite_column_type(tbl, 'Int') == 'INTEGER' assert self._get_sqlite_column_type(tbl, 'Float') == 'REAL' def test_illegal_names(self): # For sqlite, these should work fine df = DataFrame([[1, 2], [3, 4]], columns=['a', 'b']) # Raise error on blank pytest.raises(ValueError, df.to_sql, "", self.conn) for ndx, weird_name in enumerate( ['test_weird_name]', 'test_weird_name[', 'test_weird_name`', 'test_weird_name"', 'test_weird_name\'', '_b.test_weird_name_01-30', '"_b.test_weird_name_01-30"', '99beginswithnumber', '12345', u'\xe9']): df.to_sql(weird_name, self.conn) sql.table_exists(weird_name, self.conn) df2 = DataFrame([[1, 2], [3, 4]], columns=['a', weird_name]) c_tbl = 'test_weird_col_name%d' % ndx df2.to_sql(c_tbl, self.conn) sql.table_exists(c_tbl, self.conn) # ----------------------------------------------------------------------------- # -- Old tests from 0.13.1 (before refactor using sqlalchemy) _formatters = { datetime: lambda dt: "'%s'" % date_format(dt), str: lambda x: "'%s'" % x, np.str_: lambda x: "'%s'" % x, compat.text_type: lambda x: "'%s'" % x, compat.binary_type: lambda x: "'%s'" % x, float: lambda x: "%.8f" % x, int: lambda x: "%s" % x, type(None): lambda x: "NULL", np.float64: lambda x: "%.10f" % x, bool: lambda x: "'%s'" % x, } def format_query(sql, *args): """ """ processed_args = [] for arg in args: if isinstance(arg, float) and isna(arg): arg = None formatter = _formatters[type(arg)] processed_args.append(formatter(arg)) return sql % tuple(processed_args) def tquery(query, con=None, cur=None): """Replace removed sql.tquery function""" res = sql.execute(query, con=con, cur=cur).fetchall() if res is None: return None else: return list(res) def _skip_if_no_pymysql(): try: import pymysql # noqa except ImportError: pytest.skip('pymysql not installed, skipping') @pytest.mark.single class TestXSQLite(SQLiteMixIn): def setup_method(self, method): self.method = method self.conn = sqlite3.connect(':memory:') def test_basic(self): frame = tm.makeTimeDataFrame() self._check_roundtrip(frame) def test_write_row_by_row(self): frame = tm.makeTimeDataFrame() frame.iloc[0, 0] = np.nan create_sql = sql.get_schema(frame, 'test') cur = self.conn.cursor() cur.execute(create_sql) cur = self.conn.cursor() ins = "INSERT INTO test VALUES (%s, %s, %s, %s)" for idx, row in frame.iterrows(): fmt_sql = format_query(ins, *row) tquery(fmt_sql, cur=cur) self.conn.commit() result = sql.read_sql("select * from test", con=self.conn) result.index = frame.index tm.assert_frame_equal(result, frame, check_less_precise=True) def test_execute(self): frame = tm.makeTimeDataFrame() create_sql = sql.get_schema(frame, 'test') cur = self.conn.cursor() cur.execute(create_sql) ins = "INSERT INTO test VALUES (?, ?, ?, ?)" row = frame.iloc[0] sql.execute(ins, self.conn, params=tuple(row)) self.conn.commit() result = sql.read_sql("select * from test", self.conn) result.index = frame.index[:1] tm.assert_frame_equal(result, frame[:1]) def test_schema(self): frame = tm.makeTimeDataFrame() create_sql = sql.get_schema(frame, 'test') lines = create_sql.splitlines() for l in lines: tokens = l.split(' ') if len(tokens) == 2 and tokens[0] == 'A': assert tokens[1] == 'DATETIME' frame = tm.makeTimeDataFrame() create_sql = sql.get_schema(frame, 'test', keys=['A', 'B']) lines = create_sql.splitlines() assert 'PRIMARY KEY ("A", "B")' in create_sql cur = self.conn.cursor() cur.execute(create_sql) @tm.capture_stdout def test_execute_fail(self): create_sql = """ CREATE TABLE test ( a TEXT, b TEXT, c REAL, PRIMARY KEY (a, b) ); """ cur = self.conn.cursor() cur.execute(create_sql) sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn) sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.conn) with pytest.raises(Exception): sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn) @tm.capture_stdout def test_execute_closed_connection(self): create_sql = """ CREATE TABLE test ( a TEXT, b TEXT, c REAL, PRIMARY KEY (a, b) ); """ cur = self.conn.cursor() cur.execute(create_sql) sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn) self.conn.close() with pytest.raises(Exception): tquery("select * from test", con=self.conn) # Initialize connection again (needed for tearDown) self.setup_method(self.method) def test_na_roundtrip(self): pass def _check_roundtrip(self, frame): sql.to_sql(frame, name='test_table', con=self.conn, index=False) result = sql.read_sql("select * from test_table", self.conn) # HACK! Change this once indexes are handled properly. result.index = frame.index expected = frame tm.assert_frame_equal(result, expected) frame['txt'] = ['a'] * len(frame) frame2 = frame.copy() frame2['Idx'] = Index(lrange(len(frame2))) + 10 sql.to_sql(frame2, name='test_table2', con=self.conn, index=False) result = sql.read_sql("select * from test_table2", self.conn, index_col='Idx') expected = frame.copy() expected.index = Index(lrange(len(frame2))) + 10 expected.index.name = 'Idx' tm.assert_frame_equal(expected, result) def test_keyword_as_column_names(self): df = DataFrame({'From': np.ones(5)}) sql.to_sql(df, con=self.conn, name='testkeywords', index=False) def test_onecolumn_of_integer(self): # GH 3628 # a column_of_integers dataframe should transfer well to sql mono_df = DataFrame([1, 2], columns=['c0']) sql.to_sql(mono_df, con=self.conn, name='mono_df', index=False) # computing the sum via sql con_x = self.conn the_sum = sum(my_c0[0] for my_c0 in con_x.execute("select * from mono_df")) # it should not fail, and gives 3 ( Issue #3628 ) assert the_sum == 3 result = sql.read_sql("select * from mono_df", con_x) tm.assert_frame_equal(result, mono_df) def test_if_exists(self): df_if_exists_1 = DataFrame({'col1': [1, 2], 'col2': ['A', 'B']}) df_if_exists_2 = DataFrame( {'col1': [3, 4, 5], 'col2': ['C', 'D', 'E']}) table_name = 'table_if_exists' sql_select = "SELECT * FROM %s" % table_name def clean_up(test_table_to_drop): """ Drops tables created from individual tests so no dependencies arise from sequential tests """ self.drop_table(test_table_to_drop) # test if invalid value for if_exists raises appropriate error pytest.raises(ValueError, sql.to_sql, frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='notvalidvalue') clean_up(table_name) # test if_exists='fail' sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='fail') pytest.raises(ValueError, sql.to_sql, frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='fail') # test if_exists='replace' sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='replace', index=False) assert tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B')] sql.to_sql(frame=df_if_exists_2, con=self.conn, name=table_name, if_exists='replace', index=False) assert (tquery(sql_select, con=self.conn) == [(3, 'C'), (4, 'D'), (5, 'E')]) clean_up(table_name) # test if_exists='append' sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='fail', index=False) assert tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B')] sql.to_sql(frame=df_if_exists_2, con=self.conn, name=table_name, if_exists='append', index=False) assert (tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E')]) clean_up(table_name) @pytest.mark.single @pytest.mark.skip(reason="gh-13611: there is no support for MySQL " "if SQLAlchemy is not installed") class TestXMySQL(MySQLMixIn): @classmethod def setup_class(cls): _skip_if_no_pymysql() # test connection import pymysql try: # Try Travis defaults. # No real user should allow root access with a blank password. pymysql.connect(host='localhost', user='root', passwd='', db='pandas_nosetest') except: pass else: return try: pymysql.connect(read_default_group='pandas') except pymysql.ProgrammingError: pytest.skip( "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ") except pymysql.Error: pytest.skip( "Cannot connect to database. " "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ") def setup_method(self, method): _skip_if_no_pymysql() import pymysql try: # Try Travis defaults. # No real user should allow root access with a blank password. self.conn = pymysql.connect(host='localhost', user='root', passwd='', db='pandas_nosetest') except: pass else: return try: self.conn = pymysql.connect(read_default_group='pandas') except pymysql.ProgrammingError: pytest.skip( "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ") except pymysql.Error: pytest.skip( "Cannot connect to database. " "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ") self.method = method def test_basic(self): _skip_if_no_pymysql() frame = tm.makeTimeDataFrame() self._check_roundtrip(frame) def test_write_row_by_row(self): _skip_if_no_pymysql() frame = tm.makeTimeDataFrame() frame.iloc[0, 0] = np.nan drop_sql = "DROP TABLE IF EXISTS test" create_sql = sql.get_schema(frame, 'test') cur = self.conn.cursor() cur.execute(drop_sql) cur.execute(create_sql) ins = "INSERT INTO test VALUES (%s, %s, %s, %s)" for idx, row in frame.iterrows(): fmt_sql = format_query(ins, *row) tquery(fmt_sql, cur=cur) self.conn.commit() result = sql.read_sql("select * from test", con=self.conn) result.index = frame.index tm.assert_frame_equal(result, frame, check_less_precise=True) def test_chunksize_read_type(self): _skip_if_no_pymysql() frame = tm.makeTimeDataFrame() frame.index.name = "index" drop_sql = "DROP TABLE IF EXISTS test" cur = self.conn.cursor() cur.execute(drop_sql) sql.to_sql(frame, name='test', con=self.conn) query = "select * from test" chunksize = 5 chunk_gen = pd.read_sql_query(sql=query, con=self.conn, chunksize=chunksize, index_col="index") chunk_df = next(chunk_gen) tm.assert_frame_equal(frame[:chunksize], chunk_df) def test_execute(self): _skip_if_no_pymysql() frame = tm.makeTimeDataFrame() drop_sql = "DROP TABLE IF EXISTS test" create_sql = sql.get_schema(frame, 'test') cur = self.conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Unknown table.*") cur.execute(drop_sql) cur.execute(create_sql) ins = "INSERT INTO test VALUES (%s, %s, %s, %s)" row = frame.iloc[0].values.tolist() sql.execute(ins, self.conn, params=tuple(row)) self.conn.commit() result = sql.read_sql("select * from test", self.conn) result.index = frame.index[:1] tm.assert_frame_equal(result, frame[:1]) def test_schema(self): _skip_if_no_pymysql() frame = tm.makeTimeDataFrame() create_sql = sql.get_schema(frame, 'test') lines = create_sql.splitlines() for l in lines: tokens = l.split(' ') if len(tokens) == 2 and tokens[0] == 'A': assert tokens[1] == 'DATETIME' frame = tm.makeTimeDataFrame() drop_sql = "DROP TABLE IF EXISTS test" create_sql = sql.get_schema(frame, 'test', keys=['A', 'B']) lines = create_sql.splitlines() assert 'PRIMARY KEY (`A`, `B`)' in create_sql cur = self.conn.cursor() cur.execute(drop_sql) cur.execute(create_sql) @tm.capture_stdout def test_execute_fail(self): _skip_if_no_pymysql() drop_sql = "DROP TABLE IF EXISTS test" create_sql = """ CREATE TABLE test ( a TEXT, b TEXT, c REAL, PRIMARY KEY (a(5), b(5)) ); """ cur = self.conn.cursor() cur.execute(drop_sql) cur.execute(create_sql) sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn) sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.conn) with pytest.raises(Exception): sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn) @tm.capture_stdout def test_execute_closed_connection(self): _skip_if_no_pymysql() drop_sql = "DROP TABLE IF EXISTS test" create_sql = """ CREATE TABLE test ( a TEXT, b TEXT, c REAL, PRIMARY KEY (a(5), b(5)) ); """ cur = self.conn.cursor() cur.execute(drop_sql) cur.execute(create_sql) sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn) self.conn.close() with pytest.raises(Exception): tquery("select * from test", con=self.conn) # Initialize connection again (needed for tearDown) self.setup_method(self.method) def test_na_roundtrip(self): _skip_if_no_pymysql() pass def _check_roundtrip(self, frame): _skip_if_no_pymysql() drop_sql = "DROP TABLE IF EXISTS test_table" cur = self.conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Unknown table.*") cur.execute(drop_sql) sql.to_sql(frame, name='test_table', con=self.conn, index=False) result = sql.read_sql("select * from test_table", self.conn) # HACK! Change this once indexes are handled properly. result.index = frame.index result.index.name = frame.index.name expected = frame tm.assert_frame_equal(result, expected) frame['txt'] = ['a'] * len(frame) frame2 = frame.copy() index = Index(lrange(len(frame2))) + 10 frame2['Idx'] = index drop_sql = "DROP TABLE IF EXISTS test_table2" cur = self.conn.cursor() with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Unknown table.*") cur.execute(drop_sql) sql.to_sql(frame2, name='test_table2', con=self.conn, index=False) result = sql.read_sql("select * from test_table2", self.conn, index_col='Idx') expected = frame.copy() # HACK! Change this once indexes are handled properly. expected.index = index expected.index.names = result.index.names tm.assert_frame_equal(expected, result) def test_keyword_as_column_names(self): _skip_if_no_pymysql() df = DataFrame({'From': np.ones(5)}) sql.to_sql(df, con=self.conn, name='testkeywords', if_exists='replace', index=False) def test_if_exists(self): _skip_if_no_pymysql() df_if_exists_1 = DataFrame({'col1': [1, 2], 'col2': ['A', 'B']}) df_if_exists_2 = DataFrame( {'col1': [3, 4, 5], 'col2': ['C', 'D', 'E']}) table_name = 'table_if_exists' sql_select = "SELECT * FROM %s" % table_name def clean_up(test_table_to_drop): """ Drops tables created from individual tests so no dependencies arise from sequential tests """ self.drop_table(test_table_to_drop) # test if invalid value for if_exists raises appropriate error pytest.raises(ValueError, sql.to_sql, frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='notvalidvalue') clean_up(table_name) # test if_exists='fail' sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='fail', index=False) pytest.raises(ValueError, sql.to_sql, frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='fail') # test if_exists='replace' sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='replace', index=False) assert tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B')] sql.to_sql(frame=df_if_exists_2, con=self.conn, name=table_name, if_exists='replace', index=False) assert (tquery(sql_select, con=self.conn) == [(3, 'C'), (4, 'D'), (5, 'E')]) clean_up(table_name) # test if_exists='append' sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name, if_exists='fail', index=False) assert tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B')] sql.to_sql(frame=df_if_exists_2, con=self.conn, name=table_name, if_exists='append', index=False) assert (tquery(sql_select, con=self.conn) == [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E')]) clean_up(table_name)