/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.jdbc.e2e;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.e2e.AbstractJdbcSinkIT;
import io.debezium.connector.jdbc.junit.TestHelper;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkType;
import io.debezium.connector.jdbc.junit.jupiter.WithPostgresExtension;
import io.debezium.connector.jdbc.junit.jupiter.e2e.ForSource;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipExtractNewRecordState;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipWhenSink;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipWhenSinks;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipWhenSource;
import io.debezium.connector.jdbc.junit.jupiter.e2e.SkipWhenSources;
import io.debezium.connector.jdbc.junit.jupiter.e2e.WithTemporalPrecisionMode;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.Source;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceConnectorOptions;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourcePipelineInvocationContextProvider;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceType;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.ValueBinder;
import io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy;
import io.debezium.connector.jdbc.naming.TableNamingStrategy;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.time.MicroDuration;
import io.debezium.util.HexConverter;
import io.debezium.util.Strings;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.TimeZone;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.fest.assertions.Assertions;
import org.fest.assertions.IntAssert;
import org.fest.assertions.ObjectAssert;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.utility.ThrowingFunction;

@ExtendWith(value={SourcePipelineInvocationContextProvider.class})
@SkipExtractNewRecordState
public abstract class AbstractJdbcSinkPipelineIT
extends AbstractJdbcSinkIT {
    private final TableNamingStrategy tableNamingStrategy = new DefaultTableNamingStrategy();
    private static final ZoneId SOURCE_ZONE_ID = TimeZone.getTimeZone(TestHelper.getSourceTimeZone()).toZoneId();
    private static final ZoneId SINK_ZONE_ID = TimeZone.getTimeZone(TestHelper.getSinkTimeZone()).toZoneId();

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE}, reason="No BIT data type support")
    public void testBitDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "bit", this.bitValues(source, "1", "0"), this.isBitCoercedToBoolean() ? List.of(Boolean.valueOf(true), Boolean.valueOf(false)) : List.of(Integer.valueOf(1), Integer.valueOf(0)), (SinkRecord record) -> {
            SourceConnectorOptions options = source.getOptions();
            switch (sink.getType()) {
                case ORACLE: {
                    this.assertColumn(sink, record, "id", this.getBooleanType(), 1);
                    this.assertColumn(sink, record, "data", this.getBooleanType(), 1);
                    break;
                }
                case POSTGRES: {
                    this.assertColumn(sink, record, "id", this.getBooleanType());
                    this.assertColumn(sink, record, "data", options.isColumnTypePropagated() ? "BIT" : this.getBooleanType());
                    break;
                }
                default: {
                    this.assertColumn(sink, record, "id", this.getBooleanType());
                    this.assertColumn(sink, record, "data", this.getBooleanType());
                }
            }
        }, (ResultSet rs, int index) -> this.isBitCoercedToBoolean() ? (Comparable<Boolean>)Boolean.valueOf(rs.getBoolean(index)) : (Comparable<Boolean>)Integer.valueOf(rs.getInt(index)));
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE, SourceType.SQLSERVER}, reason="No BIT(n) data type support")
    @SkipWhenSink(value={SinkType.ORACLE, SinkType.DB2}, reason="BIT(n) is sent as bytes, BLOB is not permitted in primary keys")
    public void testBitWithSizeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "bit(2)", this.bitValues(source, "10", "01"), List.of(Integer.valueOf(2), Integer.valueOf(1)), (SinkRecord record) -> {
            this.assertColumn(sink, record, "id", this.getBitsDataType(), 2);
            this.assertColumn(sink, record, "data", this.getBitsDataType(), 2);
        }, (ResultSet rs, int index) -> {
            switch (sink.getType()) {
                case POSTGRES: {
                    return Integer.parseInt(rs.getString(index), 2);
                }
                case SQLSERVER: {
                    return new BigInteger(rs.getBytes(index)).intValue();
                }
            }
            return rs.getInt(index);
        });
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE, SourceType.SQLSERVER}, reason="No BIT(n) data type support")
    @SkipWhenSink(value={SinkType.MYSQL, SinkType.POSTGRES, SinkType.SQLSERVER}, reason="BIT(n) is only applicable to non-key columns")
    public void testBitWithSizeDataTypeNotInKey(Source source, Sink sink) throws Exception {
        String tableName = source.randomTableName();
        this.registerSourceConnector(source, tableName);
        source.execute(String.format("CREATE TABLE %s (data bit(2))", tableName));
        source.streamTable(tableName);
        source.execute(String.format("INSERT INTO %s VALUES (%s)", tableName, Strings.join((CharSequence)",", this.bitValues(source, "01"))));
        Properties sinkProperties = this.getDefaultSinkConfig(sink);
        sinkProperties.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        sinkProperties.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        sinkProperties.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        this.startSink(source, sinkProperties, tableName);
        SinkRecord record = this.consumeSinkRecord();
        this.assertColumn(sink, record, "data", this.getBitsDataType());
        sink.assertRows(this.getSinkTable(record, sink), (ThrowingFunction<ResultSet, Void>)((ThrowingFunction)rs -> {
            Blob blob = rs.getBlob(1);
            Assertions.assertThat((byte[])blob.getBytes(1L, (int)blob.length())).isEqualTo(ByteBuffer.allocate(1).put((byte)1).array());
            return null;
        }));
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No BIT VARYING(n) data type support")
    @SkipWhenSink(value={SinkType.ORACLE, SinkType.DB2}, reason="BIT VARYING(n) is sent as bytes, BLOB is not permitted in primary keys")
    public void testBitVaryingDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "bit varying(2)", this.bitValues(source, "10", "01"), List.of(Integer.valueOf(2), Integer.valueOf(1)), (SinkRecord record) -> {
            SourceConnectorOptions options = source.getOptions();
            this.assertColumn(sink, record, "id", this.getBitsDataType(), 2);
            if (options.isColumnTypePropagated() && sink.getType() == SinkType.POSTGRES) {
                this.assertColumn(sink, record, "data", "VARBIT", 2);
            } else {
                this.assertColumn(sink, record, "data", this.getBitsDataType(), 2);
            }
        }, (ResultSet rs, int index) -> {
            switch (sink.getType()) {
                case POSTGRES: {
                    return Integer.parseInt(rs.getString(index), 2);
                }
                case SQLSERVER: {
                    return new BigInteger(rs.getBytes(index)).intValue();
                }
            }
            return rs.getInt(index);
        });
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No BIT VARYING(n) data type support")
    @SkipWhenSink(value={SinkType.MYSQL, SinkType.POSTGRES, SinkType.SQLSERVER}, reason="BIT VARYING(n) is only applicable to non-key columns")
    public void testBitVaryingDataTypeNotInKey(Source source, Sink sink) throws Exception {
        String tableName = source.randomTableName();
        this.registerSourceConnector(source, tableName);
        source.execute(String.format("CREATE TABLE %s (data bit varying(2))", tableName));
        source.streamTable(tableName);
        source.execute(String.format("INSERT INTO %s VALUES (%s)", tableName, Strings.join((CharSequence)",", this.bitValues(source, "01"))));
        Properties sinkProperties = this.getDefaultSinkConfig(sink);
        sinkProperties.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        sinkProperties.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        sinkProperties.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        this.startSink(source, sinkProperties, tableName);
        SinkRecord record = this.consumeSinkRecord();
        this.assertColumn(sink, record, "data", this.getBitsDataType());
        sink.assertRows(this.getSinkTable(record, sink), (ThrowingFunction<ResultSet, Void>)((ThrowingFunction)rs -> {
            Blob blob = rs.getBlob(1);
            Assertions.assertThat((byte[])blob.getBytes(1L, (int)blob.length())).isEqualTo(ByteBuffer.allocate(1).put((byte)1).array());
            return null;
        }));
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE, SourceType.SQLSERVER}, reason="No BOOLEAN data type support")
    public void testBooleanDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "boolean", List.of("true", "false"), List.of(Integer.valueOf(1), Integer.valueOf(0)), config -> this.applyJdbcSourceConverter(source, config, ".*id|.*data", null, null), record -> {
            if (source.getType().is(SourceType.MYSQL)) {
                this.assertColumn(sink, record, "id", this.getInt16Type());
                if (sink.getType().is(SinkType.MYSQL) && source.getOptions().isColumnTypePropagated()) {
                    this.assertColumn(sink, record, "data", this.getBooleanType());
                } else {
                    this.assertColumn(sink, record, "data", this.getInt16Type());
                }
            } else {
                this.assertColumn(sink, record, "id", this.getBooleanType());
                this.assertColumn(sink, record, "data", this.getBooleanType());
            }
        }, (rs, index) -> {
            switch (sink.getType()) {
                case DB2: 
                case POSTGRES: {
                    return rs.getBoolean(index) ? 1 : 0;
                }
            }
            return rs.getInt(index);
        });
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE}, reason="No TINYINT data type support")
    public void testTinyIntDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "tinyint", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            boolean columnTypePropagated = source.getOptions().isColumnTypePropagated();
            this.assertColumn(sink, record, "id", this.getInt16Type());
            this.assertColumn(sink, record, "data", columnTypePropagated ? this.getInt8Type() : this.getInt16Type());
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No TINYINT(n) data type support")
    public void testTinyIntWithSizeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "tinyint(2)", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            SourceConnectorOptions options = source.getOptions();
            boolean mysqlInt8 = SinkType.MYSQL.is(sink.getType()) && options.isColumnTypePropagated();
            this.assertColumn(sink, record, "id", this.getInt16Type());
            this.assertColumn(sink, record, "data", mysqlInt8 ? this.getInt8Type() : this.getInt16Type());
        }, ResultSet::getInt);
    }

    @TestTemplate
    public void testSmallIntDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "smallint", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            if (source.getType().is(SourceType.ORACLE)) {
                this.assertColumn(sink, record, "id", this.getDecimalType(), this.getMaxDecimalPrecision(), 0);
                this.assertColumn(sink, record, "data", this.getDecimalType(), this.getMaxDecimalPrecision(), 0);
            } else {
                this.assertColumn(sink, record, "id", this.getInt16Type());
                this.assertColumn(sink, record, "data", this.getInt16Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No SMALLINT(n) data type support")
    public void testSmallIntWithSizeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "smallint(2)", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            SourceConnectorOptions options = source.getOptions();
            this.assertColumn(sink, record, "id", this.getInt16Type());
            if (sink.getType().is(SinkType.ORACLE) && options.isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", this.getInt16Type(), 2);
            } else {
                this.assertColumn(sink, record, "data", this.getInt16Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No SMALLSERIAL data type support")
    public void testSmallSerialDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "smallserial", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            SourceConnectorOptions options = source.getOptions();
            this.assertColumn(sink, record, "id", this.getInt16Type());
            if (sink.getType().is(SinkType.POSTGRES) && options.isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "SMALLSERIAL");
            } else {
                this.assertColumn(sink, record, "data", this.getInt16Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No SERIAL data type support")
    public void testSerialDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "serial", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            SourceConnectorOptions options = source.getOptions();
            this.assertColumn(sink, record, "id", this.getInt32Type());
            if (sink.getType().is(SinkType.POSTGRES) && options.isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "SERIAL");
            } else {
                this.assertColumn(sink, record, "data", this.getInt32Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No BIGSERIAL data type support")
    public void testBigSerialDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "bigserial", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            SourceConnectorOptions options = source.getOptions();
            this.assertColumn(sink, record, "id", this.getInt64Type());
            if (sink.getType().is(SinkType.POSTGRES) && options.isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "BIGSERIAL");
            } else {
                this.assertColumn(sink, record, "data", this.getInt64Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No MEDIUMINT data type support")
    public void testMediumIntDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "mediumint", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            SourceConnectorOptions options = source.getOptions();
            this.assertColumn(sink, record, "id", this.getInt32Type());
            if (sink.getType().is(SinkType.MYSQL) && options.isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "MEDIUMINT");
            } else {
                this.assertColumn(sink, record, "data", this.getInt32Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No MEDIUMINT(n) data type support")
    public void testMediumIntWithSizeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "mediumint(2)", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            SourceConnectorOptions options = source.getOptions();
            this.assertColumn(sink, record, "id", this.getInt32Type());
            if (sink.getType().is(SinkType.MYSQL) && options.isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "MEDIUMINT");
            } else {
                this.assertColumn(sink, record, "data", this.getInt32Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    public void testIntDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "int", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            if (source.getType().is(SourceType.ORACLE)) {
                this.assertColumn(sink, record, "id", this.getDecimalType(), this.getMaxDecimalPrecision(), 0);
                this.assertColumn(sink, record, "data", this.getDecimalType(), this.getMaxDecimalPrecision(), 0);
            } else {
                this.assertColumn(sink, record, "id", this.getInt32Type());
                this.assertColumn(sink, record, "data", this.getInt32Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    public void testIntegerDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "integer", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            if (source.getType().is(SourceType.ORACLE)) {
                this.assertColumn(sink, record, "id", this.getDecimalType(), this.getMaxDecimalPrecision(), 0);
                this.assertColumn(sink, record, "data", this.getDecimalType(), this.getMaxDecimalPrecision(), 0);
            } else {
                this.assertColumn(sink, record, "id", this.getInt32Type());
                this.assertColumn(sink, record, "data", this.getInt32Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No INTEGER(n) data type support")
    public void testIntegerWithSizeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "integer(2)", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            this.assertColumn(sink, record, "id", this.getInt32Type());
            this.assertColumn(sink, record, "data", this.getInt32Type());
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE}, reason="No BIGINT data type support")
    public void testBigIntDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "bigint", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            this.assertColumn(sink, record, "id", this.getInt64Type());
            this.assertColumn(sink, record, "data", this.getInt64Type());
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No BIGINT(n) data type support")
    public void testBigIntWithSizeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "bigint(2)", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            this.assertColumn(sink, record, "id", this.getInt64Type());
            this.assertColumn(sink, record, "data", this.getInt64Type());
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No NUMBER data type support")
    public void testNumberDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "number", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            this.assertColumn(sink, record, "id", this.getVariableScaleDecimalType());
            this.assertColumn(sink, record, "data", this.getVariableScaleDecimalType());
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No NUMBER(n) data type support")
    public void testNumberWithPrecisionDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypes(source, sink, List.of("number(2)", "number(3)", "number(8)", "number(18)", "number(24)"), List.of(Integer.valueOf(10), Integer.valueOf(12), Integer.valueOf(14), Integer.valueOf(16), Integer.valueOf(18)), record -> {
            this.assertColumn(sink, record, "id0", this.getInt8Type());
            this.assertColumn(sink, record, "id1", this.getInt16Type());
            this.assertColumn(sink, record, "id2", this.getInt32Type());
            this.assertColumn(sink, record, "id3", this.getInt64Type());
            this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 0);
            this.assertColumn(sink, record, "data0", this.getInt8Type());
            this.assertColumn(sink, record, "data1", this.getInt16Type());
            this.assertColumn(sink, record, "data2", this.getInt32Type());
            this.assertColumn(sink, record, "data3", this.getInt64Type());
            this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 0);
        }, ResultSet::getInt);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No NUMBER(n,s) data type support")
    public void testNumberWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypes(source, sink, List.of("number(2,1)", "number(3,1)", "number(8,1)", "number(18,1)", "number(24,1)"), List.of(Double.valueOf(1.0), Double.valueOf(10.0), Double.valueOf(11.0), Double.valueOf(12.0), Double.valueOf(13.0)), record -> {
            this.assertColumn(sink, record, "id0", this.getDecimalType(), 2, 1);
            this.assertColumn(sink, record, "id1", this.getDecimalType(), 3, 1);
            this.assertColumn(sink, record, "id2", this.getDecimalType(), 8, 1);
            this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, 1);
            this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 1);
            this.assertColumn(sink, record, "data0", this.getDecimalType(), 2, 1);
            this.assertColumn(sink, record, "data1", this.getDecimalType(), 3, 1);
            this.assertColumn(sink, record, "data2", this.getDecimalType(), 8, 1);
            this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, 1);
            this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 1);
        }, ResultSet::getDouble);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No NUMBER(n,s) negative scale data type support")
    public void testNumberWithPrecisionAndNegativeScaleDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypes(source, sink, List.of("number(2,-1)", "number(3,-1)", "number(8,-1)", "number(18,-1)", "number(24,-3)"), List.of(Long.valueOf(1L), Long.valueOf(111L), Long.valueOf(11111111L), Long.valueOf(111111111111111111L), Long.valueOf(111111111111111111L)), List.of(Long.valueOf(0L), Long.valueOf(110L), Long.valueOf(11111110L), Long.valueOf(111111111111111110L), Long.valueOf(111111111111111000L)), record -> {
            boolean mysqlSink = sink.getType().is(SinkType.MYSQL);
            this.assertColumn(sink, record, "id0", mysqlSink ? this.getInt16Type() : this.getInt8Type());
            this.assertColumn(sink, record, "id1", this.getInt16Type());
            this.assertColumn(sink, record, "id2", this.getInt32Type());
            this.assertColumn(sink, record, "data0", mysqlSink ? this.getInt16Type() : this.getInt8Type());
            this.assertColumn(sink, record, "data1", this.getInt16Type());
            this.assertColumn(sink, record, "data2", this.getInt32Type());
            if (SinkType.ORACLE.is(sink.getType())) {
                this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, -1);
                this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, -3);
                this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, -1);
                this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, -3);
            } else {
                this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, 0);
                this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 0);
                this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, 0);
                this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 0);
            }
        }, ResultSet::getLong);
    }

    @TestTemplate
    public void testNumericDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "numeric", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            if (source.getType().is(SourceType.POSTGRES)) {
                this.assertColumn(sink, record, "id", this.getVariableScaleDecimalType());
                this.assertColumn(sink, record, "data", this.getVariableScaleDecimalType());
            } else {
                this.assertColumn(sink, record, "id", this.getDecimalType());
                this.assertColumn(sink, record, "data", this.getDecimalType());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    public void testNumericWithPrecisionDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypes(source, sink, List.of("numeric(2)", "numeric(3)", "numeric(8)", "numeric(18)", "numeric(24)"), List.of(Long.valueOf(10L), Long.valueOf(11L), Long.valueOf(12L), Long.valueOf(13L), Long.valueOf(14L)), record -> {
            if (SourceType.ORACLE.is(source.getType())) {
                this.assertColumn(sink, record, "id0", this.getInt8Type());
                this.assertColumn(sink, record, "id1", this.getInt16Type());
                this.assertColumn(sink, record, "id2", this.getInt32Type());
                this.assertColumn(sink, record, "id3", this.getInt64Type());
                this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 0);
                this.assertColumn(sink, record, "data0", this.getInt8Type());
                this.assertColumn(sink, record, "data1", this.getInt16Type());
                this.assertColumn(sink, record, "data2", this.getInt32Type());
                this.assertColumn(sink, record, "data3", this.getInt64Type());
                this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 0);
            } else {
                this.assertColumn(sink, record, "id0", this.getDecimalType(), 2, 0);
                this.assertColumn(sink, record, "id1", this.getDecimalType(), 3, 0);
                this.assertColumn(sink, record, "id2", this.getDecimalType(), 8, 0);
                this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, 0);
                this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 0);
                this.assertColumn(sink, record, "data0", this.getDecimalType(), 2, 0);
                this.assertColumn(sink, record, "data1", this.getDecimalType(), 3, 0);
                this.assertColumn(sink, record, "data2", this.getDecimalType(), 8, 0);
                this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, 0);
                this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 0);
            }
        }, ResultSet::getLong);
    }

    @TestTemplate
    public void testNumericWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypes(source, sink, List.of("numeric(2,1)", "numeric(3,1)", "numeric(8,1)", "numeric(18,1)", "numeric(24,1)"), List.of(Double.valueOf(1.0), Double.valueOf(10.0), Double.valueOf(11.0), Double.valueOf(12.0), Double.valueOf(13.0)), record -> {
            this.assertColumn(sink, record, "id0", this.getDecimalType(), 2, 1);
            this.assertColumn(sink, record, "id1", this.getDecimalType(), 3, 1);
            this.assertColumn(sink, record, "id2", this.getDecimalType(), 8, 1);
            this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, 1);
            this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 1);
            this.assertColumn(sink, record, "data0", this.getDecimalType(), 2, 1);
            this.assertColumn(sink, record, "data1", this.getDecimalType(), 3, 1);
            this.assertColumn(sink, record, "data2", this.getDecimalType(), 8, 1);
            this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, 1);
            this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 1);
        }, ResultSet::getDouble);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No NUMERIC(n,s) negative scale data type support")
    public void testNumericWithPrecisionAndNegativeScaleDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypes(source, sink, List.of("numeric(2,-1)", "numeric(3,-1)", "numeric(8,-1)", "numeric(18,-1)", "numeric(24,-3)"), List.of(Long.valueOf(1L), Long.valueOf(111L), Long.valueOf(11111111L), Long.valueOf(111111111111111111L), Long.valueOf(111111111111111111L)), List.of(Long.valueOf(0L), Long.valueOf(110L), Long.valueOf(11111110L), Long.valueOf(111111111111111110L), Long.valueOf(111111111111111000L)), record -> {
            boolean oracleSink = sink.getType().is(SinkType.ORACLE);
            boolean mysqlSink = sink.getType().is(SinkType.MYSQL);
            this.assertColumn(sink, record, "id0", mysqlSink ? this.getInt16Type() : this.getInt8Type());
            this.assertColumn(sink, record, "id1", this.getInt16Type());
            this.assertColumn(sink, record, "id2", this.getInt32Type());
            this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, oracleSink ? -1 : 0);
            this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, oracleSink ? -3 : 0);
            this.assertColumn(sink, record, "data0", mysqlSink ? this.getInt16Type() : this.getInt8Type());
            this.assertColumn(sink, record, "data1", this.getInt16Type());
            this.assertColumn(sink, record, "data2", this.getInt32Type());
            this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, oracleSink ? -1 : 0);
            this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, oracleSink ? -3 : 0);
        }, ResultSet::getLong);
    }

    @TestTemplate
    public void testDecimalDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "decimal", List.of(Integer.valueOf(10), Integer.valueOf(12)), record -> {
            if (source.getType().is(SourceType.POSTGRES)) {
                this.assertColumn(sink, record, "id", this.getVariableScaleDecimalType());
                this.assertColumn(sink, record, "data", this.getVariableScaleDecimalType());
            } else {
                this.assertColumn(sink, record, "id", this.getDecimalType());
                this.assertColumn(sink, record, "data", this.getDecimalType());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    public void testDecimalWithPrecisionDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypes(source, sink, List.of("decimal(2)", "decimal(3)", "decimal(8)", "decimal(18)", "decimal(24)"), List.of(Long.valueOf(10L), Long.valueOf(11L), Long.valueOf(12L), Long.valueOf(13L), Long.valueOf(14L)), record -> {
            if (SourceType.ORACLE.is(source.getType())) {
                this.assertColumn(sink, record, "id0", this.getInt8Type());
                this.assertColumn(sink, record, "id1", this.getInt16Type());
                this.assertColumn(sink, record, "id2", this.getInt32Type());
                this.assertColumn(sink, record, "id3", this.getInt64Type());
                this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 0);
                this.assertColumn(sink, record, "data0", this.getInt8Type());
                this.assertColumn(sink, record, "data1", this.getInt16Type());
                this.assertColumn(sink, record, "data2", this.getInt32Type());
                this.assertColumn(sink, record, "data3", this.getInt64Type());
                this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 0);
            } else {
                this.assertColumn(sink, record, "id0", this.getDecimalType(), 2, 0);
                this.assertColumn(sink, record, "id1", this.getDecimalType(), 3, 0);
                this.assertColumn(sink, record, "id2", this.getDecimalType(), 8, 0);
                this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, 0);
                this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 0);
                this.assertColumn(sink, record, "data0", this.getDecimalType(), 2, 0);
                this.assertColumn(sink, record, "data1", this.getDecimalType(), 3, 0);
                this.assertColumn(sink, record, "data2", this.getDecimalType(), 8, 0);
                this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, 0);
                this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 0);
            }
        }, ResultSet::getLong);
    }

    @TestTemplate
    public void testDecimalWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypes(source, sink, List.of("decimal(2,1)", "decimal(3,1)", "decimal(8,1)", "decimal(18,1)", "decimal(24,1)"), List.of(Double.valueOf(1.0), Double.valueOf(10.0), Double.valueOf(11.0), Double.valueOf(12.0), Double.valueOf(13.0)), record -> {
            this.assertColumn(sink, record, "id0", this.getDecimalType(), 2, 1);
            this.assertColumn(sink, record, "id1", this.getDecimalType(), 3, 1);
            this.assertColumn(sink, record, "id2", this.getDecimalType(), 8, 1);
            this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, 1);
            this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 1);
            this.assertColumn(sink, record, "data0", this.getDecimalType(), 2, 1);
            this.assertColumn(sink, record, "data1", this.getDecimalType(), 3, 1);
            this.assertColumn(sink, record, "data2", this.getDecimalType(), 8, 1);
            this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, 1);
            this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 1);
        }, ResultSet::getDouble);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No DECIMAL(n,s) negative scale data type support")
    public void testDecimalWithPrecisionAndNegativeScaleDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypes(source, sink, List.of("decimal(2,-1)", "decimal(3,-1)", "decimal(8,-1)", "decimal(18,-1)", "decimal(24,-3)"), List.of(Long.valueOf(1L), Long.valueOf(111L), Long.valueOf(11111111L), Long.valueOf(111111111111111111L), Long.valueOf(111111111111111111L)), List.of(Long.valueOf(0L), Long.valueOf(110L), Long.valueOf(11111110L), Long.valueOf(111111111111111110L), Long.valueOf(111111111111111000L)), record -> {
            boolean mysqlSink = sink.getType().is(SinkType.MYSQL);
            this.assertColumn(sink, record, "id0", mysqlSink ? this.getInt16Type() : this.getInt8Type());
            this.assertColumn(sink, record, "id1", this.getInt16Type());
            this.assertColumn(sink, record, "id2", this.getInt32Type());
            this.assertColumn(sink, record, "data0", mysqlSink ? this.getInt16Type() : this.getInt8Type());
            this.assertColumn(sink, record, "data1", this.getInt16Type());
            this.assertColumn(sink, record, "data2", this.getInt32Type());
            if (SinkType.ORACLE.is(sink.getType())) {
                this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, -1);
                this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, -3);
                this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, -1);
                this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, -3);
            } else {
                this.assertColumn(sink, record, "id3", this.getDecimalType(), 18, 0);
                this.assertColumn(sink, record, "id4", this.getDecimalType(), 24, 0);
                this.assertColumn(sink, record, "data3", this.getDecimalType(), 18, 0);
                this.assertColumn(sink, record, "data4", this.getDecimalType(), 24, 0);
            }
        }, ResultSet::getLong);
    }

    @TestTemplate
    public void testRealDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "real", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), (ConnectorConfiguration config) -> this.applyJdbcSourceConverter(source, config, null, ".*id|.*data", null), (SinkRecord record) -> {
            String expectedType = source.getType().is(SourceType.ORACLE, SourceType.MYSQL) ? this.getFloat64Type() : this.getFloat32Type();
            this.assertColumn(sink, record, "id", expectedType);
            this.assertColumn(sink, record, "data", expectedType);
        }, ResultSet::getFloat);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="Applies to MySQL JDBC custom converter")
    public void testRealDataTypeTreatAsFloat(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "real", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), (ConnectorConfiguration config) -> {
            this.applyJdbcSourceConverter(source, config, null, ".*id|.*data", null);
            config.with("jdbc-sink.treat.real.as.double", "false");
        }, (SinkRecord record) -> {
            String expectedType = source.getType().is(SourceType.ORACLE) ? this.getFloat64Type() : this.getFloat32Type();
            this.assertColumn(sink, record, "id", expectedType);
            this.assertColumn(sink, record, "data", expectedType);
        }, ResultSet::getFloat);
    }

    @TestTemplate
    @Disabled(value="Not supported by any of our current source connectors")
    public void testRealWithPrecisionDataType(Source source, Sink sink) throws Exception {
        throw new IllegalStateException("Not yet implemented");
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No REAL(p,s) data type support")
    public void testRealWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "real(4, 2)", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), (ConnectorConfiguration config) -> this.applyJdbcSourceConverter(source, config, null, ".*id|.*data", null), (SinkRecord record) -> {
            this.assertColumn(sink, record, "id", this.getFloat64Type());
            this.assertColumn(sink, record, "data", this.getFloat64Type());
        }, ResultSet::getFloat);
    }

    @TestTemplate
    public void testFloatDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "float", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), record -> {
            String expectedType = !source.getType().is(SourceType.MYSQL) ? this.getFloat64Type() : this.getFloat32Type();
            this.assertColumn(sink, record, "id", expectedType);
            this.assertColumn(sink, record, "data", expectedType);
        }, ResultSet::getFloat);
    }

    @TestTemplate
    public void testFloatWithPrecisionDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "float(8)", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), record -> {
            String expectedType = source.getType().is(SourceType.ORACLE) ? this.getFloat64Type() : this.getFloat32Type();
            this.assertColumn(sink, record, "id", expectedType);
            this.assertColumn(sink, record, "data", expectedType);
        }, ResultSet::getFloat);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No FLOAT(p,s) data type support")
    public void testFloatWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "float(4, 2)", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), record -> {
            this.assertColumn(sink, record, "id", this.getFloat64Type());
            this.assertColumn(sink, record, "data", this.getFloat64Type());
        }, ResultSet::getFloat);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No DOUBLE data type support")
    public void testDoubleDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "double", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), record -> {
            this.assertColumn(sink, record, "id", this.getFloat64Type());
            this.assertColumn(sink, record, "data", this.getFloat64Type());
        }, ResultSet::getFloat);
    }

    @TestTemplate
    @Disabled(value="Not supported by any of our currently supported source databases")
    public void testDoubleWithPrecisionDataType(Source source, Sink sink) throws Exception {
        throw new IllegalStateException("Not yet implemented");
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No DOUBLE(p,s) data type support")
    public void testDoubleWithPrecisionAndScaleDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "double(4, 2)", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), record -> {
            this.assertColumn(sink, record, "id", this.getFloat64Type());
            this.assertColumn(sink, record, "data", this.getFloat64Type());
        }, ResultSet::getFloat);
    }

    @TestTemplate
    public void testDoublePrecisionDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "double precision", List.of(Double.valueOf(3.14), Double.valueOf(3.14)), record -> {
            this.assertColumn(sink, record, "id", this.getFloat64Type());
            this.assertColumn(sink, record, "data", this.getFloat64Type());
        }, ResultSet::getDouble);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No BINARY_FLOAT data type support")
    public void testBinaryFloatDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "binary_float", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), record -> {
            this.assertColumn(sink, record, "id", this.getFloat32Type());
            this.assertColumn(sink, record, "data", this.getFloat32Type());
        }, ResultSet::getFloat);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No BINARY_DOUBLE data type support")
    public void testBinaryDoubleDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "binary_double", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), record -> {
            this.assertColumn(sink, record, "id", this.getFloat64Type());
            this.assertColumn(sink, record, "data", this.getFloat64Type());
        }, ResultSet::getFloat);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason="No SMALLMONEY data type support")
    public void testSmallMoneyDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "smallmoney", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), record -> {
            this.assertColumn(sink, record, "id", this.getDecimalType(), 10, 4);
            this.assertColumn(sink, record, "data", this.getDecimalType(), 10, 4);
        }, ResultSet::getFloat);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason="No MONEY data type support")
    public void testMoneyDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "money", List.of(Float.valueOf(3.14f), Float.valueOf(3.14f)), record -> {
            this.assertColumn(sink, record, "id", this.getDecimalType(), 19, 4);
            this.assertColumn(sink, record, "data", this.getDecimalType(), 19, 4);
        }, ResultSet::getFloat);
    }

    @TestTemplate
    public void testDecimalHandlingModeDouble(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "decimal", List.of(Float.valueOf(10.0f), Float.valueOf(12.0f)), (ConnectorConfiguration config) -> config.with("decimal.handling.mode", RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE.getValue()), (SinkRecord record) -> {
            this.assertColumn(sink, record, "id", this.getFloat64Type());
            this.assertColumn(sink, record, "data", this.getFloat64Type());
        }, ResultSet::getFloat);
    }

    @TestTemplate
    public void testDecimalHandlingModeString(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "decimal", List.of(Float.valueOf(10.0f), Float.valueOf(12.0f)), source.getType() == SourceType.POSTGRES ? List.of("10.0", "12.0") : List.of("10", "12"), config -> config.with("decimal.handling.mode", RelationalDatabaseConnectorConfig.DecimalHandlingMode.STRING.getValue()), record -> {
            switch (sink.getType()) {
                case MYSQL: {
                    this.assertColumn(sink, record, "id", "VARCHAR", 255);
                    this.assertColumn(sink, record, "data", "LONGTEXT", Integer.MAX_VALUE);
                    break;
                }
                case DB2: {
                    this.assertColumn(sink, record, "id", "VARCHAR", 512);
                    this.assertColumn(sink, record, "data", "CLOB");
                    break;
                }
                case ORACLE: {
                    this.assertColumn(sink, record, "id", "VARCHAR2", 4000);
                    this.assertColumn(sink, record, "data", "CLOB");
                    break;
                }
                case SQLSERVER: {
                    this.assertColumn(sink, record, "id", "VARCHAR", 900);
                    this.assertColumn(sink, record, "data", "VARCHAR", Integer.MAX_VALUE);
                    break;
                }
                case POSTGRES: {
                    this.assertColumn(sink, record, "id", "TEXT");
                    this.assertColumn(sink, record, "data", "TEXT");
                }
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    public void testCharDataType(Source source, Sink sink) throws Exception {
        this.assertCharDataType(source, sink, "char", false);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE}, reason="Awaiting the merging of DBZ-6221 upstream")
    public void testCharacterDataType(Source source, Sink sink) throws Exception {
        this.assertCharDataType(source, sink, "character", false);
    }

    @TestTemplate
    public void testCharWithLengthDataType(Source source, Sink sink) throws Exception {
        this.assertCharWithLengthDataType(source, sink, "char(5)", 5, false);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE}, reason="Awaiting the merging of DBZ-6221 upstream")
    public void testCharacterWithLengthDataType(Source source, Sink sink) throws Exception {
        this.assertCharWithLengthDataType(source, sink, "character(5)", 5, false);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES}, reason="NCHAR is treated as CHAR as PostgreSQL does not use nationalized types")
    public void testNationalizedCharDataType(Source source, Sink sink) throws Exception {
        this.assertCharDataType(source, sink, "nchar", true);
    }

    @TestTemplate
    @SkipWhenSources(value={@SkipWhenSource(value={SourceType.POSTGRES}, reason="NCHARACTER is treated as CHAR as PostgreSQL does not use nationalized types"), @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE}, reason="NCHARACTER not a supported data type")})
    public void testNationalizedCharacterDataType(Source source, Sink sink) throws Exception {
        this.assertCharDataType(source, sink, "ncharacter", true);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES}, reason="NCHAR(n) is treated as CHAR(n) as PostgreSQL does not use nationalized types")
    public void testNationalizedCharWithLengthDataType(Source source, Sink sink) throws Exception {
        this.assertCharWithLengthDataType(source, sink, "nchar(5)", 5, true);
    }

    @TestTemplate
    @SkipWhenSources(value={@SkipWhenSource(value={SourceType.POSTGRES}, reason="NCHAR(n) is treated as CHAR(n) as PostgreSQL does not use nationalized types"), @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE}, reason="NCHARACTER(n) not a supported data type")})
    public void testNationalizedCharacterWithLengthDataType(Source source, Sink sink) throws Exception {
        this.assertCharWithLengthDataType(source, sink, "ncharacter(5)", 5, true);
    }

    @TestTemplate
    public void testVarcharDataType(Source source, Sink sink) throws Exception {
        this.assertVarcharDataType(source, sink, "varchar(25)", 25, false);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No VARCHAR2(n) data type support")
    public void testVarchar2DataType(Source source, Sink sink) throws Exception {
        this.assertVarcharDataType(source, sink, "varchar2(25)", 25, false);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE}, reason="No NVARCHAR(n) data type support")
    public void testNVarcharDataType(Source source, Sink sink) throws Exception {
        this.assertVarcharDataType(source, sink, "nvarchar(25)", 25, true);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No NVARCHAR2(n) data type support")
    public void testNVarchar2DataType(Source source, Sink sink) throws Exception {
        this.assertVarcharDataType(source, sink, "nvarchar2(25)", 25, true);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE}, reason="Awaiting the merging of DBZ-6221 upstream")
    public void testCharacterVaryingDataType(Source source, Sink sink) throws Exception {
        this.assertVarcharDataType(source, sink, "character varying(25)", 25, false);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason="No NCHARACTER VARYING(n) data type support")
    public void testNCharacterVaryingDataType(Source source, Sink sink) throws Exception {
        this.assertVarcharDataType(source, sink, "ncharacter varying(25)", 25, true);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No TINYTEXT data type support")
    public void testTinyTextDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "tinytext", List.of("'hello world'"), List.of("hello world"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No MEDIUMTEXT data type support")
    public void testMediumTextDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "mediumtext", List.of("'hello world'"), List.of("hello world"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No MEDIUMTEXT data type support")
    public void testLongTextDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "longtext", List.of("'hello world'"), List.of("hello world"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE}, reason="No TEXT data type support")
    public void testTextDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "text", List.of("'hello world'"), List.of("hello world"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason="No NTEXT data type support")
    public void testNTextDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "ntext", List.of("'hello world'"), List.of("hello world"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No CLOB data type support")
    public void testClobDataType(Source source, Sink sink) throws Exception {
        String data = RandomStringUtils.randomAlphanumeric((int)65536);
        this.assertDataTypeNonKeyOnly(source, sink, "clob", (PreparedStatement ps, int index) -> {
            Clob clob = ps.getConnection().createClob();
            clob.setString(1L, data);
            ps.setClob(index, clob);
        }, List.of(data), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No CLOB data type support")
    public void testClobDataTypeWithUpsert(Source source, Sink sink) throws Exception {
        String data = RandomStringUtils.randomAlphanumeric((int)65536);
        this.assertDataTypeNonKeyOnly(source, sink, "clob", (PreparedStatement ps, int index) -> {
            Clob clob = ps.getConnection().createClob();
            clob.setString(1L, data);
            ps.setClob(index, clob);
        }, List.of(data), (ConnectorConfiguration config) -> {
            config.with("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
            config.with("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        }, (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No NCLOB data type support")
    public void testNClobDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "nclob", List.of("'hello world'"), List.of("hello world"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE}, reason="No BINARY(n) data type support")
    public void testBinaryDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "binary(15)", List.of(this.binaryValue(source, "binary(15)", "'hello world'")), List.of(this.byteArrayPadded("hello world", 15)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getBinaryType(source, "binary")), ResultSet::getBytes);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE}, reason="No VARBINARY(n) data type support")
    public void testVarBinaryDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "varbinary(15)", List.of(this.binaryValue(source, "varbinary(15)", "'hello world'")), List.of("hello world".getBytes(StandardCharsets.UTF_8)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getBinaryType(source, "varbinary")), ResultSet::getBytes);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No TINYBLOB data type support")
    public void testTinyBlobDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "tinyblob", List.of("'hello world'"), List.of("hello world".getBytes(StandardCharsets.UTF_8)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getBinaryType(source, "tinyblob")), ResultSet::getBytes);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No MEDIUMBLOB data type support")
    public void testMediumBlobDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "mediumblob", List.of("'hello world'"), List.of("hello world".getBytes(StandardCharsets.UTF_8)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getBinaryType(source, "mediumblob")), ResultSet::getBytes);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No LONGBLOB data type support")
    public void testLongBlobDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "longblob", List.of("'hello world'"), List.of("hello world".getBytes(StandardCharsets.UTF_8)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getBinaryType(source, "longblob")), ResultSet::getBytes);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No BLOB data type support")
    public void testBlobDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "blob", List.of(source.getType().is(SourceType.ORACLE) ? "UTL_RAW.CAST_TO_RAW('hello world')" : "'hello world'"), List.of("hello world".getBytes(StandardCharsets.UTF_8)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getBinaryType(source, "blob")), ResultSet::getBytes);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="Only a single source is needed")
    public void testBinaryHandlingModeBase64(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "varbinary(35)", List.of(this.binaryValue(source, "varbinary(35)", "'hello world'")), List.of(Base64.getEncoder().encodeToString("hello world".getBytes(StandardCharsets.UTF_8))), (ConnectorConfiguration config) -> config.with("binary.handling.mode", CommonConnectorConfig.BinaryHandlingMode.BASE64.getValue()), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true)), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="Only a single source is needed")
    public void testBinaryHandlingModeBase64UrlSafe(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "varbinary(35)", List.of(this.binaryValue(source, "varbinary(35)", "'hello world'")), List.of(Base64.getUrlEncoder().encodeToString("hello world".getBytes(StandardCharsets.UTF_8))), (ConnectorConfiguration config) -> config.with("binary.handling.mode", CommonConnectorConfig.BinaryHandlingMode.BASE64_URL_SAFE.getValue()), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true)), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="Only a single source is needed")
    public void testBinaryHandlingModeHex(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "varbinary(35)", List.of(this.binaryValue(source, "varbinary(35)", "'hello world'")), List.of(HexConverter.convertToHexString((byte[])"hello world".getBytes(StandardCharsets.UTF_8))), (ConnectorConfiguration config) -> config.with("binary.handling.mode", CommonConnectorConfig.BinaryHandlingMode.HEX.getValue()), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true)), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE, SourceType.SQLSERVER}, reason="No JSON data type support")
    public void testJsonDataType(Source source, Sink sink) throws Exception {
        String json = "{\"key\": \"value\"}";
        this.assertDataTypeNonKeyOnly(source, sink, "json", List.of(String.format("'%s'", "{\"key\": \"value\"}")), List.of(new ObjectMapper().readTree("{\"key\": \"value\"}")), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getJsonType(source)), (ResultSet rs, int index) -> new ObjectMapper().readTree(rs.getString(index)));
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No JSONB data type support")
    public void testJsonbDataType(Source source, Sink sink) throws Exception {
        String json = "{\"key\": \"value\"}";
        this.assertDataTypeNonKeyOnly(source, sink, "jsonb", List.of(String.format("'%s'", "{\"key\": \"value\"}")), List.of(new ObjectMapper().readTree("{\"key\": \"value\"}")), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getJsonbType(source)), (ResultSet rs, int index) -> new ObjectMapper().readTree(rs.getString(index)));
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE}, reason="No XML data type support")
    public void testXmlDataType(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "xml", List.of("'<doc>abc</doc>'"), List.of("<doc>abc</doc>"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getXmlType(source)), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No UUID data type support")
    public void testUuidDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "uuid", List.of("'77412aae-c023-11ed-afa1-0242ac120002'", "'ed338923-f8ac-404c-87e7-e1ba5a122a12'"), List.of("77412aae-c023-11ed-afa1-0242ac120002", "ed338923-f8ac-404c-87e7-e1ba5a122a12"), (SinkRecord record) -> {
            this.assertColumn(sink, record, "id", this.getUuidType(source, true));
            this.assertColumn(sink, record, "data", this.getUuidType(source, false));
        }, ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE, SourceType.SQLSERVER}, reason="No ENUM data type support")
    public void testEnumDataType(Source source, Sink sink) throws Exception {
        String enumDataType;
        if (SourceType.POSTGRES.is(source.getType())) {
            enumDataType = source.randomObjectName();
            source.execute(String.format("CREATE TYPE %s as ENUM ('apples', 'oranges')", enumDataType));
        } else {
            enumDataType = "enum('apples', 'oranges')";
        }
        this.assertDataType(source, sink, enumDataType, List.of("'apples'", "'oranges'"), List.of("apples", "oranges"), (SinkRecord record) -> {
            this.assertColumn(sink, record, "id", this.getEnumType(source, true));
            this.assertColumn(sink, record, "data", this.getEnumType(source, false));
        }, ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No SET data type support")
    public void testSetDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "set('apples','oranges')", List.of("'apples'", "'oranges'"), List.of("apples", "oranges"), (SinkRecord record) -> {
            this.assertColumn(sink, record, "id", this.getSetType(source, true));
            this.assertColumn(sink, record, "data", this.getSetType(source, false));
        }, ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No SET data type support")
    public void testYearDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "year", List.of(Integer.valueOf(1969), Integer.valueOf(2023)), record -> {
            if (SinkType.MYSQL.is(sink.getType())) {
                this.assertColumn(sink, record, "id", this.getYearType(), 4);
                this.assertColumn(sink, record, "data", this.getYearType(), 4);
            } else {
                this.assertColumn(sink, record, "id", this.getYearType());
                this.assertColumn(sink, record, "data", this.getYearType());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    @WithTemporalPrecisionMode
    public void testDateDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "date", List.of(this.dateValue(source, 3, 1, 2023), this.dateValue(source, 5, 10, 2021)), List.of(Date.valueOf("2023-03-01"), Date.valueOf("2021-05-10")), (SinkRecord record) -> {
            if (SourceType.ORACLE.is(source.getType())) {
                this.assertColumn(sink, record, "id", this.getTimestampType(source, true, 6));
                this.assertColumn(sink, record, "data", this.getTimestampType(source, false, 6));
            } else {
                this.assertColumn(sink, record, "id", this.getDateType());
                this.assertColumn(sink, record, "data", this.getDateType());
            }
        }, ResultSet::getDate);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE}, reason="No TIME data type support")
    @WithTemporalPrecisionMode
    public void testTimeDataType(Source source, Sink sink) throws Exception {
        boolean connect = source.getOptions().getTemporalPrecisionMode() == TemporalPrecisionMode.CONNECT;
        int nanoSeconds = connect ? 123000000 : 123456000;
        switch (source.getType()) {
            case MYSQL: {
                nanoSeconds = 0;
            }
        }
        switch (sink.getType()) {
            case MYSQL: {
                if (source.getType().is(SourceType.POSTGRES)) {
                    nanoSeconds = connect ? 123000000 : 123456000;
                    break;
                }
                if (source.getType().is(SourceType.SQLSERVER)) break;
                nanoSeconds = 0;
                break;
            }
            case DB2: {
                nanoSeconds = 0;
            }
        }
        this.assertDataType(source, sink, "time", List.of("'01:02:03.123456'", "'14:15:16.123456'"), List.of(OffsetTime.of(1, 2, 3, nanoSeconds, this.getCurrentSinkTimeOffset()), OffsetTime.of(14, 15, 16, nanoSeconds, this.getCurrentSinkTimeOffset())), (SinkRecord record) -> {
            this.assertColumn(sink, record, "id", this.getTimeType(source, true, 6));
            this.assertColumn(sink, record, "data", this.getTimeType(source, false, 6));
        }, this::getTimeAsOffsetTime);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.ORACLE}, reason="No TIME(n) data type support")
    @WithTemporalPrecisionMode
    public void testTimeWithPrecisionDataType(Source source, Sink sink) throws Exception {
        int nanoSeconds1;
        String ts0 = "'01:02:03.123456'";
        String ts1 = "'14:15:16.456789'";
        boolean isConnect = source.getOptions().getTemporalPrecisionMode() == TemporalPrecisionMode.CONNECT;
        int nanoSeconds0 = 123000000;
        int n = nanoSeconds1 = isConnect ? 456000000 : 456789000;
        if (sink.getType().is(SinkType.DB2)) {
            nanoSeconds0 = 0;
            nanoSeconds1 = 0;
        }
        List<OffsetTime> expectedValues = List.of(OffsetTime.of(1, 2, 3, nanoSeconds0, this.getCurrentSinkTimeOffset()), OffsetTime.of(14, 15, 16, nanoSeconds1, this.getCurrentSinkTimeOffset()), OffsetTime.of(1, 2, 3, nanoSeconds0, this.getCurrentSinkTimeOffset()), OffsetTime.of(14, 15, 16, nanoSeconds1, this.getCurrentSinkTimeOffset()));
        int time3Precision = sink.getType().is(SinkType.ORACLE) && !source.getOptions().isColumnTypePropagated() ? 6 : 3;
        this.assertDataTypes2(source, sink, List.of("time(3)", "time(6)"), List.of("'01:02:03.123456'", "'14:15:16.456789'"), expectedValues, record -> {
            this.assertColumn(sink, record, "id0", this.getTimeType(source, true, time3Precision));
            this.assertColumn(sink, record, "id1", this.getTimeType(source, true, 6));
            this.assertColumn(sink, record, "data0", this.getTimeType(source, false, time3Precision));
            this.assertColumn(sink, record, "data1", this.getTimeType(source, false, 6));
        }, this::getTimeAsOffsetTime);
    }

    @TestTemplate
    @SkipWhenSources(value={@SkipWhenSource(value={SourceType.ORACLE}, reason="No TIME(n) data type support"), @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES}, reason="Max TIME(n) precision is 6")})
    @WithTemporalPrecisionMode
    public void testNanoTimeDataType(Source source, Sink sink) throws Exception {
        int nanoSeconds;
        boolean connect = source.getOptions().getTemporalPrecisionMode() == TemporalPrecisionMode.CONNECT;
        int n = nanoSeconds = connect ? 456000000 : 456789000;
        if (sink.getType().is(SinkType.DB2)) {
            nanoSeconds = 0;
        }
        this.assertDataTypeNonKeyOnly(source, sink, "time(7)", List.of("'14:15:16.456789012'"), List.of(OffsetTime.of(14, 15, 16, nanoSeconds, this.getCurrentSinkTimeOffset())), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTimeType(source, false, 7)), this::getTimeAsOffsetTime);
    }

    @TestTemplate
    @SkipWhenSources(value={@SkipWhenSource(value={SourceType.SQLSERVER}, reason="TIMESTAMP is an internal type and isn't the same as TIMESTAMP(n)"), @SkipWhenSource(value={SourceType.MYSQL}, reason="MySQL emits timestamps as ZonedTimestamp types, tested separately")})
    @WithTemporalPrecisionMode
    public void testTimestampDataType(Source source, Sink sink) throws Exception {
        List<ZonedDateTime> timeValues = List.of(ZonedDateTime.of(2023, 5, 10, 16, 17, 18, 123456000, ZoneOffset.UTC), ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC));
        List<String> values = AbstractJdbcSinkPipelineIT.toTimestampStrings(source, timeValues);
        ArrayList<ZonedDateTime> expectedValues = new ArrayList<ZonedDateTime>();
        if (TemporalPrecisionMode.CONNECT == source.getOptions().getTemporalPrecisionMode()) {
            expectedValues.add(timeValues.get(0).with(ChronoField.NANO_OF_SECOND, 123000000L).withZoneSameLocal(SINK_ZONE_ID));
            expectedValues.add(timeValues.get(1).with(ChronoField.NANO_OF_SECOND, 456000000L).withZoneSameLocal(SINK_ZONE_ID));
        } else {
            expectedValues.addAll(timeValues.stream().map(v -> v.withZoneSameLocal(SINK_ZONE_ID)).collect(Collectors.toList()));
        }
        this.assertDataType(source, sink, "timestamp", values, expectedValues, (SinkRecord record) -> {
            this.assertColumn(sink, record, "id", this.getTimestampType(source, true, 6));
            this.assertColumn(sink, record, "data", this.getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSources(value={@SkipWhenSource(value={SourceType.SQLSERVER}, reason="No TIMESTAMP(n) data type support"), @SkipWhenSource(value={SourceType.MYSQL}, reason="MySQL emits timestamps as ZonedTimestamp types, tested separately")})
    @WithTemporalPrecisionMode
    public void testTimestampWithPrecisionDataType(Source source, Sink sink) throws Exception {
        ZonedDateTime timeValue = ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC);
        String value = AbstractJdbcSinkPipelineIT.toTimestampStrings(source, List.of(timeValue)).get(0);
        ArrayList<ZonedDateTime> expectedValues = new ArrayList<ZonedDateTime>();
        expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 500000000L).withZoneSameLocal(SINK_ZONE_ID));
        expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 460000000L).withZoneSameLocal(SINK_ZONE_ID));
        expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 457000000L).withZoneSameLocal(SINK_ZONE_ID));
        if (TemporalPrecisionMode.CONNECT == source.getOptions().getTemporalPrecisionMode()) {
            long nanos = 456000000L;
            expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 456000000L).withZoneSameLocal(SINK_ZONE_ID));
            expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 456000000L).withZoneSameLocal(SINK_ZONE_ID));
            expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 456000000L).withZoneSameLocal(SINK_ZONE_ID));
        } else {
            expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 456800000L).withZoneSameLocal(SINK_ZONE_ID));
            expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 456790000L).withZoneSameLocal(SINK_ZONE_ID));
            expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 456789000L).withZoneSameLocal(SINK_ZONE_ID));
        }
        this.assertDataTypes(source, sink, List.of("timestamp(1)", "timestamp(2)", "timestamp(3)", "timestamp(4)", "timestamp(5)", "timestamp(6)"), List.of(value, value, value, value, value, value), expectedValues, record -> {
            this.assertColumn(sink, record, "id0", this.getTimestampType(source, true, 1));
            this.assertColumn(sink, record, "id1", this.getTimestampType(source, true, 2));
            this.assertColumn(sink, record, "id2", this.getTimestampType(source, true, 3));
            this.assertColumn(sink, record, "id3", this.getTimestampType(source, true, 4));
            this.assertColumn(sink, record, "id4", this.getTimestampType(source, true, 5));
            this.assertColumn(sink, record, "id5", this.getTimestampType(source, true, 6));
            this.assertColumn(sink, record, "data0", this.getTimestampType(source, false, 1));
            this.assertColumn(sink, record, "data1", this.getTimestampType(source, false, 2));
            this.assertColumn(sink, record, "data2", this.getTimestampType(source, false, 3));
            this.assertColumn(sink, record, "data3", this.getTimestampType(source, false, 4));
            this.assertColumn(sink, record, "data4", this.getTimestampType(source, false, 5));
            this.assertColumn(sink, record, "data5", this.getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @ForSource(value={SourceType.MYSQL}, reason="MySQL emits TIMESTAMP(p) as ZonedTimestamp")
    @WithTemporalPrecisionMode
    public void testTimestampDataTypeAsZonedTimestampType(Source source, Sink sink) throws Exception {
        List<ZonedDateTime> timeValues = List.of(ZonedDateTime.of(2023, 5, 10, 16, 17, 18, 123456000, ZoneOffset.UTC), ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC));
        List<String> values = AbstractJdbcSinkPipelineIT.toTimestampStrings(source, timeValues.stream().map(v -> v.withZoneSameInstant(SOURCE_ZONE_ID)).collect(Collectors.toList()));
        List expectedValues = timeValues.stream().map(v -> v.with(ChronoField.NANO_OF_SECOND, 0L)).collect(Collectors.toList());
        this.assertDataTypesNonKeyOnly(source, sink, List.of("timestamp", "timestamp"), values, expectedValues, record -> {
            this.assertColumn(sink, record, "data0", this.getTimestampWithTimezoneType(source, false, 6));
            this.assertColumn(sink, record, "data1", this.getTimestampWithTimezoneType(source, false, 6));
        }, (rs, index) -> rs.getTimestamp(index).toInstant().atZone(ZoneOffset.UTC));
    }

    @TestTemplate
    @ForSource(value={SourceType.MYSQL}, reason="MySQL emits TIMESTAMP(p) as ZonedTimestamp")
    @WithTemporalPrecisionMode
    public void testTimestampWithPrecisionDataTypeAsZonedTimestampType(Source source, Sink sink) throws Exception {
        ZonedDateTime timeValue = ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC);
        String value = AbstractJdbcSinkPipelineIT.toTimestampStrings(source, List.of(timeValue.withZoneSameInstant(SOURCE_ZONE_ID))).get(0);
        ArrayList<ZonedDateTime> expectedValues = new ArrayList<ZonedDateTime>();
        expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 500000000L));
        expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 460000000L));
        expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 457000000L));
        expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 456800000L));
        expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 456790000L));
        expectedValues.add(timeValue.with(ChronoField.NANO_OF_SECOND, 456789000L));
        this.assertDataTypesNonKeyOnly(source, sink, List.of("timestamp(1)", "timestamp(2)", "timestamp(3)", "timestamp(4)", "timestamp(5)", "timestamp(6)"), List.of(value, value, value, value, value, value), expectedValues, record -> {
            this.assertColumn(sink, record, "data0", this.getTimestampWithTimezoneType(source, false, 1));
            this.assertColumn(sink, record, "data1", this.getTimestampWithTimezoneType(source, false, 2));
            this.assertColumn(sink, record, "data2", this.getTimestampWithTimezoneType(source, false, 3));
            this.assertColumn(sink, record, "data3", this.getTimestampWithTimezoneType(source, false, 4));
            this.assertColumn(sink, record, "data4", this.getTimestampWithTimezoneType(source, false, 5));
            this.assertColumn(sink, record, "data5", this.getTimestampWithTimezoneType(source, false, 6));
        }, (rs, index) -> rs.getTimestamp(index).toInstant().atZone(ZoneOffset.UTC));
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No TIMESTAMPTZ data type support")
    @WithTemporalPrecisionMode
    public void testTimestampTzDataType(Source source, Sink sink) throws Exception {
        ZonedDateTime timeValue = ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC);
        this.assertDataTypeNonKeyOnly(source, sink, "timestamptz", AbstractJdbcSinkPipelineIT.toTimestampWithTimeZoneStrings(source, List.of(timeValue)), List.of(timeValue.withZoneSameInstant(SINK_ZONE_ID)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTimestampWithTimezoneType(source, false, 6)), this::getTimestampWithTimeZoneAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.SQLSERVER}, reason="No TIMESTAMP(n) WITH TIME ZONE data type support")
    @WithTemporalPrecisionMode
    public void testTimestampWithTimeZoneDataType(Source source, Sink sink) throws Exception {
        ZonedDateTime timeValue = ZonedDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC);
        this.assertDataTypeNonKeyOnly(source, sink, "timestamp(6) with time zone", AbstractJdbcSinkPipelineIT.toTimestampWithTimeZoneStrings(source, List.of(timeValue)), List.of(timeValue.withZoneSameInstant(SINK_ZONE_ID)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTimestampWithTimezoneType(source, false, 6)), this::getTimestampWithTimeZoneAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No TIMESTAMP(n) WITH LOCAL TIME ZONE data type support")
    @WithTemporalPrecisionMode
    public void testTimestampWithLocalTimeZoneDataType(Source source, Sink sink) throws Exception {
        String value = "TO_TIMESTAMP('2022-12-31 14:15:16.456789', 'YYYY-MM-DD HH24:MI:SS.FF6')";
        this.assertDataTypeNonKeyOnly(source, sink, "timestamp(6) with local time zone", List.of("TO_TIMESTAMP('2022-12-31 14:15:16.456789', 'YYYY-MM-DD HH24:MI:SS.FF6')"), List.of(OffsetDateTime.of(2022, 12, 31, 14, 15, 16, 456789000, ZoneOffset.UTC).toLocalDateTime()), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTimestampWithTimezoneType(source, false, 6)), (ResultSet rs, int index) -> this.getTimestamp(rs, index).toLocalDateTime());
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No TIME(n) WITH TIME ZONE data type support")
    @SkipWhenSinks(value={@SkipWhenSink(value={SinkType.MYSQL}, reason="MySQL has no support for TIME(n) with TIME ZONE support"), @SkipWhenSink(value={SinkType.DB2}, reason="There is an issue with Daylight Savings Time")})
    @WithTemporalPrecisionMode
    public void testTimeWithTimeZoneDataType(Source source, Sink sink) throws Exception {
        String value = "'14:15:16.456789 -00:00'";
        if (SourceType.ORACLE.is(source.getType())) {
            value = String.format("TO_TIMESTAMP_TZ(%s,'HH24:MI:SS.FF6 TZH:TZM')", value);
        }
        int nanoSeconds = sink.getType().is(SinkType.DB2) ? 0 : 456789000;
        this.assertDataTypeNonKeyOnly(source, sink, "time(6) with time zone", List.of(value), List.of(OffsetTime.of(14, 15, 16, nanoSeconds, ZoneOffset.UTC)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTimeWithTimezoneType()), (ResultSet rs, int index) -> this.getTimestampWithTimeZoneAsZonedDateTime(rs, index).withZoneSameInstant(ZoneOffset.UTC).toOffsetDateTime().toOffsetTime());
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE}, reason="No DATETIME data type support")
    @WithTemporalPrecisionMode
    public void testDateTimeDataType(Source source, Sink sink) throws Exception {
        String value1 = "'2023-05-10 16:00:00.456'";
        String value2 = "'2023-01-10 16:00:00.456'";
        int precision = SourceType.MYSQL.is(source.getType()) && source.getOptions().isColumnTypePropagated() ? 6 : 3;
        int nanosOfSeconds = source.getType().is(SourceType.MYSQL) ? 0 : 457000000;
        this.assertDataTypesNonKeyOnly(source, sink, List.of("datetime", "datetime"), List.of(value1, value2), List.of(this.toZonedDateTimeAtSinkOffset(2023, 5, 10, 16, 0, 0, nanosOfSeconds), this.toZonedDateTimeAtSinkOffset(2023, 1, 10, 16, 0, 0, nanosOfSeconds)), record -> {
            this.assertColumn(sink, record, "data0", this.getTimestampType(source, false, precision));
            this.assertColumn(sink, record, "data1", this.getTimestampType(source, false, precision));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.POSTGRES, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No DATETIME(n) data type support")
    @WithTemporalPrecisionMode
    public void testDateTimeWithPrecisionDataType(Source source, Sink sink) throws Exception {
        String value = "'2023-03-01 14:15:16.456789'";
        List<String> typeNames = List.of("datetime(1)", "datetime(2)", "datetime(3)", "datetime(4)", "datetime(5)", "datetime(6)");
        List<String> values = List.of("'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'", "'2023-03-01 14:15:16.456789'");
        boolean connect = TemporalPrecisionMode.CONNECT.equals((Object)source.getOptions().getTemporalPrecisionMode());
        List<ZonedDateTime> expectedValues = List.of(this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 500000000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 460000000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 457000000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, connect ? 456000000 : 456800000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, connect ? 456000000 : 456790000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, connect ? 456000000 : 456789000));
        this.assertDataTypesNonKeyOnly(source, sink, typeNames, values, expectedValues, record -> {
            this.assertColumn(sink, record, "data0", this.getTimestampType(source, false, 1));
            this.assertColumn(sink, record, "data1", this.getTimestampType(source, false, 2));
            this.assertColumn(sink, record, "data2", this.getTimestampType(source, false, 3));
            this.assertColumn(sink, record, "data3", this.getTimestampType(source, false, 4));
            this.assertColumn(sink, record, "data4", this.getTimestampType(source, false, 5));
            this.assertColumn(sink, record, "data5", this.getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason="No DATETIME2 data type support")
    @WithTemporalPrecisionMode
    public void testDateTime2DataType(Source source, Sink sink) throws Exception {
        String value = "'2023-03-01 14:15:16.456789Z'";
        int nanosOfSeconds = 456789000;
        if (source.getOptions().getTemporalPrecisionMode() == TemporalPrecisionMode.CONNECT) {
            nanosOfSeconds = 456000000;
        }
        this.assertDataTypeNonKeyOnly(source, sink, "datetime2", List.of("'2023-03-01 14:15:16.456789Z'"), List.of(this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, nanosOfSeconds)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTimestampType(source, false, 6)), this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason="No DATETIME2(n) data type support")
    @WithTemporalPrecisionMode
    public void testDateTime2WithPrecisionDataType(Source source, Sink sink) throws Exception {
        String value = "'2023-03-01 14:15:16.456789123Z'";
        List<String> typeNames = List.of("datetime2(1)", "datetime2(2)", "datetime2(3)", "datetime2(4)", "datetime2(5)", "datetime2(6)", "datetime2(7)");
        List<String> values = List.of("'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'");
        int dateTime7NanoSeconds = 456789000;
        if (source.getOptions().isColumnTypePropagated() && SinkType.SQLSERVER.is(sink.getType())) {
            dateTime7NanoSeconds = 456789100;
        }
        boolean connect = TemporalPrecisionMode.CONNECT.equals((Object)source.getOptions().getTemporalPrecisionMode());
        List<ZonedDateTime> expectedValues = List.of(this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 500000000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 460000000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, 457000000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, connect ? 456000000 : 456800000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, connect ? 456000000 : 456790000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, connect ? 456000000 : 456789000), this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 16, connect ? 456000000 : dateTime7NanoSeconds));
        this.assertDataTypesNonKeyOnly(source, sink, typeNames, values, expectedValues, record -> {
            this.assertColumn(sink, record, "data0", this.getTimestampType(source, false, 1));
            this.assertColumn(sink, record, "data1", this.getTimestampType(source, false, 2));
            this.assertColumn(sink, record, "data2", this.getTimestampType(source, false, 3));
            this.assertColumn(sink, record, "data3", this.getTimestampType(source, false, 4));
            this.assertColumn(sink, record, "data4", this.getTimestampType(source, false, 5));
            this.assertColumn(sink, record, "data5", this.getTimestampType(source, false, 6));
            this.assertColumn(sink, record, "data6", this.getTimestampType(source, false, 6));
        }, this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason="No DATETIMEOFFSET data type support")
    @WithTemporalPrecisionMode
    public void testDateTimeOffsetDataType(Source source, Sink sink) throws Exception {
        String value = "'2023-03-01 14:15:16.456789Z'";
        this.assertDataTypeNonKeyOnly(source, sink, "datetimeoffset", List.of("'2023-03-01 14:15:16.456789Z'"), List.of(OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 456789000, ZoneOffset.UTC)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTimestampWithTimezoneType(source, false, 6)), (ResultSet rs, int index) -> this.getTimestamp(rs, index).toInstant().atOffset(ZoneOffset.UTC));
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason="No DATETIMEOFFSET(n) data type support")
    @WithTemporalPrecisionMode
    public void testDateTimeOffsetWithPrecisionDataType(Source source, Sink sink) throws Exception {
        String value = "'2023-03-01 14:15:16.456789123Z'";
        List<String> typeNames = List.of("datetimeoffset(1)", "datetimeoffset(2)", "datetimeoffset(3)", "datetimeoffset(4)", "datetimeoffset(5)", "datetimeoffset(6)", "datetimeoffset(7)");
        List<String> values = List.of("'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'", "'2023-03-01 14:15:16.456789123Z'");
        int precisionNanos7 = 456789000;
        if (sink.getType().is(SinkType.SQLSERVER) && source.getOptions().isColumnTypePropagated()) {
            precisionNanos7 = 456789100;
        }
        List<OffsetDateTime> expectedValues = List.of(OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 500000000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 460000000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 457000000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 456800000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 456790000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, 456789000, ZoneOffset.UTC), OffsetDateTime.of(2023, 3, 1, 14, 15, 16, precisionNanos7, ZoneOffset.UTC));
        this.assertDataTypesNonKeyOnly(source, sink, typeNames, values, expectedValues, record -> {
            this.assertColumn(sink, record, "data0", this.getTimestampWithTimezoneType(source, false, 1));
            this.assertColumn(sink, record, "data1", this.getTimestampWithTimezoneType(source, false, 2));
            this.assertColumn(sink, record, "data2", this.getTimestampWithTimezoneType(source, false, 3));
            this.assertColumn(sink, record, "data3", this.getTimestampWithTimezoneType(source, false, 4));
            this.assertColumn(sink, record, "data4", this.getTimestampWithTimezoneType(source, false, 5));
            this.assertColumn(sink, record, "data5", this.getTimestampWithTimezoneType(source, false, 6));
            this.assertColumn(sink, record, "data6", this.getTimestampWithTimezoneType(source, false, 6));
        }, (rs, index) -> this.getTimestamp(rs, index).toInstant().atOffset(ZoneOffset.UTC));
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.ORACLE}, reason="No SMALLDATETIME data type support")
    @WithTemporalPrecisionMode
    public void testSmallDateTimeDataType(Source source, Sink sink) throws Exception {
        String value = "'2023-03-01 14:15:16'";
        this.assertDataTypeNonKeyOnly(source, sink, "smalldatetime", List.of("'2023-03-01 14:15:16'"), List.of(this.toZonedDateTimeAtSinkOffset(2023, 3, 1, 14, 15, 0, 0)), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTimestampType(source, false, 6)), this::getTimestampAsZonedDateTime);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No INTERVAL data type support")
    public void testIntervalDataTypeIntervalHandlingModeNumeric(Source source, Sink sink) throws Exception {
        if (sink.getType().is(SinkType.POSTGRES)) {
            this.assertDataTypeNonKeyOnly(source, sink, "interval", List.of("'P1Y2M3DT4H5M6.78S'::INTERVAL"), List.of("10303:05:06"), (ConnectorConfiguration config) -> config.with("interval.handling.mode", "numeric"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getIntervalType(source, true)), ResultSet::getString);
        } else {
            this.assertDataTypeNonKeyOnly(source, sink, "interval", List.of("'P1Y2M3DT4H5M6.78S'::INTERVAL"), List.of(Long.valueOf(MicroDuration.durationMicros((int)1, (int)2, (int)3, (int)4, (int)5, (double)6.78, (Double)30.4375))), (ConnectorConfiguration config) -> config.with("interval.handling.mode", "numeric"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getIntervalType(source, true)), ResultSet::getLong);
        }
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No INTERVAL data type support")
    public void testIntervalDataTypeIntervalHandlingModeString(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "interval", List.of("'P1Y2M3DT4H5M6.78S'::INTERVAL"), List.of("P1Y2M3DT4H5M6.78S"), (ConnectorConfiguration config) -> config.with("interval.handling.mode", "string"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No INTERVAL DAY(m) TO SECOND data type support")
    public void testIntervalDayToSecondDataTypeIntervalHandlingModeNumeric(Source source, Sink sink) throws Exception {
        if (sink.getType().is(SinkType.POSTGRES)) {
            this.assertDataTypeNonKeyOnly(source, sink, "interval day to second", List.of("TO_DSINTERVAL('P10DT50H99M1000.365S')"), List.of("291:55:40"), (ConnectorConfiguration config) -> config.with("interval.handling.mode", "numeric"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getIntervalType(source, true)), ResultSet::getString);
        } else {
            this.assertDataTypeNonKeyOnly(source, sink, "interval day to second", List.of("TO_DSINTERVAL('P10DT50H99M1000.365S')"), List.of(Long.valueOf(1050940365000L)), (ConnectorConfiguration config) -> config.with("interval.handling.mode", "numeric"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getIntervalType(source, true)), ResultSet::getLong);
        }
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No INTERVAL DAY(m) TO SECOND data type support")
    public void testIntervalDayToSecondDataTypeIntervalHandlingModeString(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "interval day to second", List.of("TO_DSINTERVAL('P10DT50H99M1000.365S')"), List.of("P0Y0M12DT3H55M40.365S"), (ConnectorConfiguration config) -> config.with("interval.handling.mode", "string"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No INTERVAL YEAR(m) TO MONTH data type support")
    public void testIntervalYearToMonthDataTypeIntervalHandlingModeNumeric(Source source, Sink sink) throws Exception {
        if (sink.getType().is(SinkType.POSTGRES)) {
            this.assertDataTypeNonKeyOnly(source, sink, "interval year to month", List.of("INTERVAL '10-2' YEAR TO MONTH"), List.of("89121:00:00"), (ConnectorConfiguration config) -> config.with("interval.handling.mode", "numeric"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getIntervalType(source, true)), ResultSet::getString);
        } else {
            this.assertDataTypeNonKeyOnly(source, sink, "interval year to month", List.of("INTERVAL '10-2' YEAR TO MONTH"), List.of(Long.valueOf(320835600000000L)), (ConnectorConfiguration config) -> config.with("interval.handling.mode", "numeric"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getIntervalType(source, true)), ResultSet::getLong);
        }
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.POSTGRES, SourceType.SQLSERVER}, reason="No INTERVAL YEAR(m) TO MONTH data type support")
    public void testIntervalYearToMonthDataTypeIntervalHandlingModeString(Source source, Sink sink) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, "interval year to month", List.of("INTERVAL '10-2' YEAR TO MONTH"), List.of("P10Y2M0DT0H0M0S"), (ConnectorConfiguration config) -> config.with("interval.handling.mode", "string"), (SinkRecord record) -> this.assertColumn(sink, record, "data", this.getTextType()), ResultSet::getString);
    }

    @TestTemplate
    @SkipWhenSource(value={SourceType.MYSQL, SourceType.ORACLE, SourceType.SQLSERVER}, reason="No BYTEA data type support")
    @SkipWhenSink(value={SinkType.MYSQL, SinkType.ORACLE, SinkType.DB2}, reason="These data types are not allowed in the primary keys")
    public void testByteaDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "bytea", List.of("'hello'"), List.of("hello".getBytes(StandardCharsets.UTF_8)), (SinkRecord record) -> {
            this.assertColumn(sink, record, "id", this.getBinaryType(source, "bytea"));
            this.assertColumn(sink, record, "data", this.getBinaryType(source, "bytea"));
        }, ResultSet::getBytes);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The OID data type only applies to PostgreSQL")
    public void testOidDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "oid", List.of(Integer.valueOf(3802)), record -> {
            this.assertColumn(sink, record, "id", this.getInt64Type());
            if (source.getOptions().isColumnTypePropagated() && sink.getType().is(SinkType.POSTGRES)) {
                this.assertColumn(sink, record, "data", "OID");
            } else {
                this.assertColumn(sink, record, "data", this.getInt64Type());
            }
        }, ResultSet::getInt);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The LTREE data type only applies to PostgreSQL")
    @WithPostgresExtension(value="ltree")
    public void testLtreeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "ltree", List.of("'abc.xyz'"), List.of("abc.xyz"), config -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), record -> {
            if (sink.getType().is(SinkType.POSTGRES)) {
                this.assertColumn(sink, record, "id", "LTREE");
                this.assertColumn(sink, record, "data", "LTREE");
            } else {
                this.assertColumn(sink, record, "id", this.getStringType(source, true, false));
                this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The CITEXT data type only applies to PostgreSQL")
    @WithPostgresExtension(value="citext")
    public void testCaseInsensitiveDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "citext", List.of("'AbCd'"), List.of("AbCd"), config -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), record -> {
            this.assertColumn(sink, record, "id", this.getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "CITEXT");
            } else {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The INET data type only applies to PostgreSQL")
    public void testInetDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "inet", List.of("'192.168.1.0'"), List.of("192.168.1.0"), config -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), record -> {
            this.assertColumn(sink, record, "id", this.getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "INET");
            } else {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The INT4RANGE data type only applies to PostgreSQL")
    public void testInt4RangeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "int4range", List.of("'[1000,6000)'"), List.of("[1000,6000)"), config -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), record -> {
            this.assertColumn(sink, record, "id", this.getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "INT4RANGE");
            } else {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The INT8RANGE data type only applies to PostgreSQL")
    public void testInt8RangeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "int8range", List.of("'[1000000,6000000)'"), List.of("[1000000,6000000)"), config -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), record -> {
            this.assertColumn(sink, record, "id", this.getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "INT8RANGE");
            } else {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The NUMRANGE data type only applies to PostgreSQL")
    public void testNumrangeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "numrange", List.of("'[5.3,6.3)'"), List.of("[5.3,6.3)"), config -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), record -> {
            this.assertColumn(sink, record, "id", this.getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "NUMRANGE");
            } else {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The TSRANGE data type only applies to PostgreSQL")
    public void testTsrangeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "tsrange", List.of("'[2019-03-31 15:30:00,infinity)'"), List.of("[\"2019-03-31 15:30:00\",infinity)"), config -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), record -> {
            this.assertColumn(sink, record, "id", this.getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "TSRANGE");
            } else {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The TSTZRANGE data type only applies to PostgreSQL")
    public void testTstzrangeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "tstzrange", List.of("'[2017-06-05 11:29:12.549426+00,)'"), List.of("[\"2017-06-05 11:29:12.549426+00\",)"), config -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), record -> {
            this.assertColumn(sink, record, "id", this.getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "TSTZRANGE");
            } else {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The DATERANGE data type only applies to PostgreSQL")
    public void testDaterangeDataType(Source source, Sink sink) throws Exception {
        this.assertDataType(source, sink, "daterange", List.of("'[2019-03-31, infinity)'"), List.of("[2019-03-31,infinity)"), config -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), record -> {
            this.assertColumn(sink, record, "id", this.getStringType(source, true, false));
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "DATERANGE");
            } else {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, false, true));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The HSTORE data type only applies to PostgreSQL")
    @WithPostgresExtension(value="hstore")
    public void testHstoreDataType(Source source, Sink sink) throws Exception {
        String expectedValue = "{\"key\":\"val\"}";
        if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
            expectedValue = "\"key\"=>\"val\"";
        } else if (sink.getType().is(SinkType.MYSQL)) {
            expectedValue = "{\"key\": \"val\"}";
        }
        this.assertDataTypeNonKeyOnly(source, sink, "hstore", List.of("'\"key\" => \"val\"'::hstore"), List.of(expectedValue), (ConnectorConfiguration config) -> config.with("include.unknown.datatypes", Boolean.valueOf(true)), (SinkRecord record) -> {
            if (sink.getType().is(SinkType.POSTGRES) && source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", "HSTORE");
            } else {
                this.assertColumn(sink, record, "data", this.getJsonbType(source));
            }
        }, ResultSet::getString);
    }

    @TestTemplate
    @ForSource(value={SourceType.POSTGRES}, reason="The HSTORE data type only applies to PostgreSQL")
    @WithPostgresExtension(value="hstore")
    public void testHstoreWithMapModeDataType(Source source, Sink sink) throws Exception {
        String expectedValue = "{\"key\":\"val\"}";
        if (sink.getType().is(SinkType.POSTGRES)) {
            expectedValue = "\"key\"=>\"val\"";
        } else if (sink.getType().is(SinkType.MYSQL)) {
            expectedValue = "{\"key\": \"val\"}";
        }
        this.assertDataTypeNonKeyOnly(source, sink, "hstore", List.of("'\"key\" => \"val\"'::hstore"), List.of(expectedValue), (ConnectorConfiguration config) -> {
            config.with("include.unknown.datatypes", Boolean.valueOf(true));
            config.with("hstore.handling.mode", "map");
        }, (SinkRecord record) -> {
            if (sink.getType().is(SinkType.POSTGRES)) {
                this.assertColumn(sink, record, "data", "HSTORE");
            } else if (sink.getType().is(SinkType.MYSQL)) {
                this.assertColumn(sink, record, "data", this.getJsonType(source));
            } else {
                this.assertColumn(sink, record, "data", this.getTextType());
            }
        }, ResultSet::getString);
    }

    protected int getMaxDecimalPrecision() {
        return 38;
    }

    protected abstract String getBooleanType();

    protected abstract String getBitsDataType();

    protected abstract String getInt8Type();

    protected abstract String getInt16Type();

    protected abstract String getInt32Type();

    protected abstract String getInt64Type();

    protected abstract String getVariableScaleDecimalType();

    protected abstract String getDecimalType();

    protected abstract String getFloat32Type();

    protected abstract String getFloat64Type();

    protected abstract String getCharType(Source var1, boolean var2, boolean var3);

    protected String getStringType(Source source, boolean key, boolean nationalized) {
        return this.getStringType(source, key, nationalized, false);
    }

    protected abstract String getStringType(Source var1, boolean var2, boolean var3, boolean var4);

    protected abstract String getTextType(boolean var1);

    protected String getTextType() {
        return this.getTextType(false);
    }

    protected abstract String getBinaryType(Source var1, String var2);

    protected abstract String getJsonType(Source var1);

    protected String getJsonbType(Source source) {
        return this.getJsonType(source);
    }

    protected abstract String getXmlType(Source var1);

    protected abstract String getUuidType(Source var1, boolean var2);

    protected abstract String getEnumType(Source var1, boolean var2);

    protected abstract String getSetType(Source var1, boolean var2);

    protected abstract String getYearType();

    protected abstract String getDateType();

    protected abstract String getTimeType(Source var1, boolean var2, int var3);

    protected abstract String getTimeWithTimezoneType();

    protected abstract String getTimestampType(Source var1, boolean var2, int var3);

    protected abstract String getTimestampWithTimezoneType(Source var1, boolean var2, int var3);

    protected abstract String getIntervalType(Source var1, boolean var2);

    protected boolean isBitCoercedToBoolean() {
        return false;
    }

    private static List<String> toTimestampStrings(Source source, List<ZonedDateTime> values) {
        ArrayList<String> results = new ArrayList<String>();
        for (ZonedDateTime value : values) {
            String formattedValue = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS").format(value);
            if (source.getType().is(SourceType.ORACLE)) {
                results.add(String.format("TO_TIMESTAMP('%s', 'YYYY-MM-DD HH24:MI:SS.FF6')", formattedValue));
                continue;
            }
            results.add(String.format("'%s'", formattedValue));
        }
        return results;
    }

    private static List<String> toTimestampWithTimeZoneStrings(Source source, List<ZonedDateTime> values) {
        ArrayList<String> results = new ArrayList<String>();
        for (ZonedDateTime value : values) {
            String formattedValue = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXXXX").format(value);
            if (source.getType().is(SourceType.ORACLE)) {
                results.add(String.format("TO_TIMESTAMP_TZ('%s', 'YYYY-MM-DD HH24:MI:SS.FF6 TZH:TZM')", formattedValue));
                continue;
            }
            results.add(String.format("'%s'", formattedValue));
        }
        return results;
    }

    protected ZonedDateTime toZonedDateTimeAtSinkOffset(int year, int month, int day, int hour, int min, int sec, int nanos) {
        return LocalDate.of(year, month, day).atTime(hour, min, sec, nanos).atZone(SINK_ZONE_ID);
    }

    protected Timestamp getTimestamp(ResultSet rs, int index) throws SQLException {
        return rs.getTimestamp(index);
    }

    protected ZonedDateTime getTimestampWithTimeZoneAsZonedDateTime(ResultSet rs, int index) throws SQLException {
        LOGGER.trace("Timestamp from ResultSet " + this.getTimestamp(rs, index));
        LOGGER.trace("Timestamp to LocalDateTime " + this.getTimestamp(rs, index).toLocalDateTime());
        LOGGER.trace("Timestamp at Zone " + ZoneOffset.systemDefault() + " " + this.getTimestamp(rs, index).toLocalDateTime().atZone(ZoneOffset.systemDefault()));
        LOGGER.trace("Timestamp at Zone " + SINK_ZONE_ID + " " + this.getTimestamp(rs, index).toLocalDateTime().atZone(ZoneOffset.systemDefault()).withZoneSameInstant(SINK_ZONE_ID));
        return this.getTimestamp(rs, index).toLocalDateTime().atZone(ZoneOffset.systemDefault()).withZoneSameInstant(SINK_ZONE_ID);
    }

    protected ZonedDateTime getTimestampAsZonedDateTime(ResultSet rs, int index) throws SQLException {
        LOGGER.trace("Timestamp from ResultSet " + this.getTimestamp(rs, index));
        LOGGER.trace("Timestamp to LocalDateTime " + this.getTimestamp(rs, index).toLocalDateTime());
        LOGGER.trace("Timestamp at Zone " + SINK_ZONE_ID + " " + this.getTimestamp(rs, index).toLocalDateTime().atZone(SINK_ZONE_ID));
        return this.getTimestamp(rs, index).toLocalDateTime().atZone(SINK_ZONE_ID);
    }

    protected OffsetTime getTimeAsOffsetTime(ResultSet rs, int index) throws SQLException {
        LOGGER.trace(this.getTimestamp(rs, index) + " " + this.getTimestamp(rs, index).getNanos());
        return this.getTimestamp(rs, index).toLocalDateTime().toLocalTime().atOffset(this.getCurrentSinkTimeOffset());
    }

    protected ZoneOffset getCurrentSinkTimeOffset() {
        return this.getCurrentSinkTimeOffset(Instant.EPOCH);
    }

    protected ZoneOffset getCurrentSinkTimeOffset(Instant instant) {
        return instant.atZone(SINK_ZONE_ID).getOffset();
    }

    protected List<String> bitValues(Source source, String ... values) {
        switch (source.getType()) {
            case POSTGRES: {
                return Arrays.stream(values).map(v -> "'" + v + "'::bit" + (String)(v.length() > 1 ? "(" + v.length() + ")" : "")).collect(Collectors.toList());
            }
            case SQLSERVER: {
                if (values.length >= 1) {
                    ((IntAssert)Assertions.assertThat((int)values[0].length()).as("SQL Server bit type only supports 1 or 0.")).isEqualTo(1);
                }
                return Arrays.stream(values).collect(Collectors.toList());
            }
        }
        return Arrays.stream(values).map(v -> "b'" + v + "'").collect(Collectors.toList());
    }

    protected String charValue(Source source, Sink sink, int size, boolean key, String value) {
        if (SinkType.MYSQL.equals((Object)sink.getType())) {
            if (SourceType.MYSQL.equals((Object)source.getType())) {
                return value;
            }
            if (!source.getOptions().isColumnTypePropagated()) {
                return Strings.justifyLeft((String)value, (int)size, (char)' ');
            }
            return key ? Strings.justifyLeft((String)value, (int)size, (char)' ') : value;
        }
        if (SourceType.MYSQL.equals((Object)source.getType())) {
            if (source.getOptions().isColumnTypePropagated()) {
                return key ? value : Strings.justifyLeft((String)value, (int)size, (char)' ');
            }
            return value;
        }
        return Strings.justifyLeft((String)value, (int)size, (char)' ');
    }

    protected String binaryValue(Source source, String dataType, String value) {
        if (SourceType.SQLSERVER.equals((Object)source.getType())) {
            return String.format("CONVERT(%s, %s)", dataType, value);
        }
        return value;
    }

    protected byte[] byteArrayPadded(String value, int padding) {
        ByteBuffer buffer = ByteBuffer.allocate(padding);
        buffer.put(value.getBytes(StandardCharsets.UTF_8));
        return buffer.array();
    }

    protected String dateValue(Source source, int month, int day, int year) {
        if (SourceType.ORACLE.is(source.getType())) {
            return String.format("TO_DATE('%04d-%02d-%02d', 'YYYY-MM-DD')", year, month, day);
        }
        return String.format("'%04d-%02d-%02d'", year, month, day);
    }

    protected String pointValue(ResultSet rs, int index) throws SQLException {
        String[] parts;
        String result = rs.getString(index);
        if (!Strings.isNullOrEmpty((String)result) && result.startsWith("(") && result.endsWith(")") && (parts = (result = result.substring(1, result.length() - 1)).split(",")).length == 2) {
            result = String.format("(%.6f,%.6f)", Float.valueOf(Float.parseFloat(parts[0])), Float.valueOf(Float.parseFloat(parts[1])));
        }
        return result;
    }

    protected void registerSourceConnector(Source source, String tableName) {
        this.registerSourceConnector(source, null, tableName, null);
    }

    private String getSinkTable(SinkRecord record, Sink sink) {
        String sinkTableName = this.tableNamingStrategy.resolveTableName(this.getCurrentSinkConfig(), record);
        return sink.getType().is(SinkType.POSTGRES) ? sinkTableName.toLowerCase() : sinkTableName;
    }

    protected Properties getDefaultSinkConfig(Sink sink) {
        Properties sinkProperties = new Properties();
        sinkProperties.put("connection.url", sink.getJdbcUrl());
        sinkProperties.put("connection.username", sink.getUsername());
        sinkProperties.put("connection.password", sink.getPassword());
        sinkProperties.put("database.time_zone", TestHelper.getSinkTimeZone());
        return sinkProperties;
    }

    protected void assertColumn(Sink sink, SinkRecord record, String columnName, String columnType) {
        sink.assertColumn(this.getSinkTable(record, sink), columnName, columnType);
    }

    protected void assertColumn(Sink sink, SinkRecord record, String columnName, String columnType, int length) {
        sink.assertColumn(this.getSinkTable(record, sink), columnName, columnType, length);
    }

    protected void assertColumn(Sink sink, SinkRecord record, String columnName, String columnType, int precision, int scale) {
        sink.assertColumn(this.getSinkTable(record, sink), columnName, columnType, precision, scale);
    }

    protected <T> void assertDataType(Source source, Sink sink, String typeName, List<T> values, DataTypeColumnAssert columnAssert, ColumnReader<T> columnReader) throws Exception {
        this.assertDataType(source, sink, typeName, values, values, null, columnAssert, columnReader);
    }

    protected <T> void assertDataTypes(Source source, Sink sink, List<String> typeNames, List<T> values, DataTypeColumnAssert columnAssert, ColumnReader<T> columnReader) throws Exception {
        this.assertDataTypes(source, sink, typeNames, values, values, null, columnAssert, columnReader);
    }

    protected <T, U> void assertDataType(Source source, Sink sink, String typeName, List<T> values, List<U> expectedValues, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        this.assertDataType(source, sink, typeName, values, expectedValues, null, columnAssert, columnReader);
    }

    protected <T, U> void assertDataTypes(Source source, Sink sink, List<String> typeNames, List<T> values, List<U> expectedValues, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        this.assertDataTypes(source, sink, typeNames, values, expectedValues, null, columnAssert, columnReader);
    }

    protected <T, U> void assertDataTypes2(Source source, Sink sink, List<String> typeNames, List<T> values, List<U> expectedValues, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        this.assertDataTypes2(source, sink, typeNames, values, expectedValues, null, columnAssert, columnReader);
    }

    protected <T, U> void assertDataTypeNonKeyOnly(Source source, Sink sink, String typeName, List<T> values, List<U> expectedValues, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, typeName, values, expectedValues, null, columnAssert, columnReader);
    }

    protected <T, U> void assertDataTypeNonKeyOnly(Source source, Sink sink, String typeName, ValueBinder valueBinder, List<U> expectedValues, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        this.assertDataTypeNonKeyOnly(source, sink, typeName, valueBinder, expectedValues, null, columnAssert, columnReader);
    }

    protected <T, U> void assertDataTypesNonKeyOnly(Source source, Sink sink, List<String> typeNames, List<T> values, List<U> expectedValues, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        this.assertDataTypesNonKeyOnly(source, sink, typeNames, values, expectedValues, null, columnAssert, columnReader);
    }

    protected <T> void assertDataType(Source source, Sink sink, String typeName, List<T> values, ConfigurationAdjuster configAdjuster, DataTypeColumnAssert columnAssert, ColumnReader<T> columnReader) throws Exception {
        this.assertDataType(source, sink, typeName, values, values, configAdjuster, columnAssert, columnReader);
    }

    protected boolean skipDefaultValues(String typeName) {
        return Arrays.asList("smallserial", "serial", "bigserial", "json", "tinytext", "mediumtext", "longtext", "text", "tinyblob", "mediumblob", "longblob", "interval year to month").contains(typeName);
    }

    protected <T, U> void assertDataType(Source source, Sink sink, String typeName, List<T> values, List<U> expectedValues, ConfigurationAdjuster configAdjuster, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        String tableName = source.randomTableName();
        String createSql = !source.getOptions().useDefaultValues() || this.skipDefaultValues(typeName) ? String.format("CREATE TABLE %s (id %s, data %s, primary key(id))", tableName, typeName, typeName) : String.format("CREATE TABLE %s (id %s, data %s DEFAULT %s NOT NULL, primary key(id))", tableName, typeName, typeName, values.get(0));
        String insertSql = String.format("INSERT INTO %s VALUES (%s)", tableName, Strings.join((CharSequence)",", values));
        this.registerSourceConnector(source, Collections.singletonList(typeName), tableName, configAdjuster, createSql, insertSql);
        Properties sinkProperties = this.getDefaultSinkConfig(sink);
        sinkProperties.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        sinkProperties.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        sinkProperties.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        this.startSink(source, sinkProperties, tableName);
        this.consumeAndAssert(sink, columnAssert, expectedValues, columnReader);
    }

    protected <T, U> void assertDataTypeNonKeyOnly(Source source, Sink sink, String typeName, List<T> values, List<U> expectedValues, ConfigurationAdjuster configAdjuster, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        String insertSql;
        String createSql;
        String tableName = source.randomTableName();
        if (this.isLobTypeName(typeName)) {
            createSql = String.format("CREATE TABLE %s (data %s, id integer, primary key(id))", tableName, typeName);
            insertSql = String.format("INSERT INTO %s VALUES (%s, 1)", tableName, Strings.join((CharSequence)",", values));
        } else {
            createSql = !source.getOptions().useDefaultValues() || this.skipDefaultValues(typeName) ? String.format("CREATE TABLE %s (data %s NOT NULL)", tableName, typeName) : String.format("CREATE TABLE %s (data %s DEFAULT %s NOT NULL)", tableName, typeName, values.get(0));
            insertSql = String.format("INSERT INTO %s VALUES (%s)", tableName, Strings.join((CharSequence)",", values));
        }
        this.registerSourceConnector(source, Collections.singletonList(typeName), tableName, configAdjuster, createSql, insertSql);
        Properties sinkProperties = this.getDefaultSinkConfig(sink);
        sinkProperties.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        sinkProperties.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        sinkProperties.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        this.startSink(source, sinkProperties, tableName);
        this.consumeAndAssert(sink, columnAssert, expectedValues, columnReader);
    }

    protected <T, U> void assertDataTypeNonKeyOnly(Source source, Sink sink, String typeName, ValueBinder valueBinder, List<U> expectedValues, ConfigurationAdjuster configAdjuster, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        String tableName = source.randomTableName();
        String createSql = String.format("CREATE TABLE %s (data %s NOT NULL, id integer, primary key(id))", tableName, typeName);
        String insertSql = String.format("INSERT INTO %s VALUES (?, 1)", tableName);
        if (source.getOptions().useSnapshot()) {
            source.execute(createSql);
            source.streamTable(tableName);
            source.execute(insertSql, valueBinder);
            this.registerSourceConnector(source, Collections.singletonList(typeName), tableName, configAdjuster);
        } else {
            this.registerSourceConnector(source, Collections.singletonList(typeName), tableName, configAdjuster);
            source.execute(createSql);
            source.streamTable(tableName);
            source.execute(insertSql, valueBinder);
        }
        Properties sinkProperties = this.getDefaultSinkConfig(sink);
        sinkProperties.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        sinkProperties.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        sinkProperties.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        this.startSink(source, sinkProperties, tableName);
        this.consumeAndAssert(sink, columnAssert, expectedValues, columnReader);
    }

    protected <T, U> void assertDataTypesNonKeyOnly(Source source, Sink sink, List<String> typeNames, List<T> values, List<U> expectedValues, ConfigurationAdjuster configAdjuster, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        String tableName = source.randomTableName();
        String createSql = this.createTableFromTypes(source, tableName, false, typeNames, values);
        String insertSql = String.format("INSERT INTO %s VALUES (%s)", tableName, Strings.join((CharSequence)",", values));
        this.registerSourceConnector(source, typeNames, tableName, configAdjuster, createSql, insertSql);
        Properties sinkProperties = this.getDefaultSinkConfig(sink);
        sinkProperties.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        sinkProperties.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.NONE.getValue());
        sinkProperties.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.INSERT.getValue());
        this.startSink(source, sinkProperties, tableName);
        this.consumeAndAssert(sink, columnAssert, expectedValues, columnReader);
    }

    protected <T, U> void assertDataTypes(Source source, Sink sink, List<String> typeNames, List<T> values, List<U> expectedValues, ConfigurationAdjuster configAdjuster, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        String tableName = source.randomTableName();
        ArrayList<T> totalValues = new ArrayList<T>();
        for (int i = 0; i < 2; ++i) {
            totalValues.addAll(values);
        }
        String createSql = this.createTableFromTypes(source, tableName, true, typeNames, values);
        String insertSql = String.format("INSERT INTO %s VALUES (%s)", tableName, Strings.join((CharSequence)",", totalValues));
        this.registerSourceConnector(source, typeNames, tableName, configAdjuster, createSql, insertSql);
        Properties sinkProperties = this.getDefaultSinkConfig(sink);
        sinkProperties.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        sinkProperties.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        sinkProperties.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        this.startSink(source, sinkProperties, tableName);
        ArrayList<U> totalExpectedValues = new ArrayList<U>();
        for (int i = 0; i < 2; ++i) {
            totalExpectedValues.addAll(expectedValues);
        }
        this.consumeAndAssert(sink, columnAssert, totalExpectedValues, columnReader);
    }

    protected <T, U> void assertDataTypes2(Source source, Sink sink, List<String> typeNames, List<T> values, List<U> expectedValues, ConfigurationAdjuster configAdjuster, DataTypeColumnAssert columnAssert, ColumnReader<U> columnReader) throws Exception {
        String tableName = source.randomTableName();
        ArrayList<T> totalValues = new ArrayList<T>();
        for (int i = 0; i < 2; ++i) {
            totalValues.addAll(values);
        }
        String createSql = this.createTableFromTypes(source, tableName, true, typeNames, values);
        String insertSql = String.format("INSERT INTO %s VALUES (%s)", tableName, Strings.join((CharSequence)",", totalValues));
        this.registerSourceConnector(source, typeNames, tableName, configAdjuster, createSql, insertSql);
        Properties sinkProperties = this.getDefaultSinkConfig(sink);
        sinkProperties.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        sinkProperties.put("primary.key.mode", JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
        sinkProperties.put("insert.mode", JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
        this.startSink(source, sinkProperties, tableName);
        this.consumeAndAssert(sink, columnAssert, expectedValues, columnReader);
    }

    protected boolean isLobTypeName(String typeName) {
        return typeName.equalsIgnoreCase("CLOB") || typeName.equalsIgnoreCase("NCLOB") || typeName.equalsIgnoreCase("BLOB");
    }

    protected String createTableFromTypes(Source source, String tableName, boolean keys, List<String> typeNames, List<?> values) {
        int i;
        StringBuilder create = new StringBuilder("CREATE TABLE ").append(tableName).append(" (");
        if (keys) {
            for (i = 0; i < typeNames.size(); ++i) {
                create.append("id").append(i).append(" ").append(typeNames.get(i)).append(", ");
            }
        }
        for (i = 0; i < typeNames.size(); ++i) {
            create.append("data").append(i).append(" ").append(typeNames.get(i));
            if (i + 1 >= typeNames.size()) continue;
            if (source.getOptions().useDefaultValues()) {
                create.append(" DEFAULT ").append(values.get(i)).append(" NOT NULL");
            }
            create.append(", ");
        }
        if (keys) {
            create.append(", primary key (");
            for (i = 0; i < typeNames.size(); ++i) {
                create.append("id").append(i);
                if (i + 1 >= typeNames.size()) continue;
                create.append(", ");
            }
            create.append(")");
        }
        create.append(")");
        return create.toString();
    }

    protected void registerSourceConnector(Source source, List<String> typeNames, String tableName, ConfigurationAdjuster configAdjuster, String createSql, String insertSql) throws Exception {
        if (source.getOptions().useSnapshot()) {
            source.execute(createSql);
            source.streamTable(tableName);
            source.execute(insertSql);
            this.registerSourceConnector(source, typeNames, tableName, configAdjuster);
        } else {
            this.registerSourceConnector(source, typeNames, tableName, configAdjuster);
            source.execute(createSql);
            source.streamTable(tableName);
            source.execute(insertSql);
        }
        if (TestHelper.shouldQueryDatabaseState()) {
            source.queryContainerTable(tableName);
        }
    }

    protected void registerSourceConnector(Source source, List<String> typeName, String tableName, ConfigurationAdjuster configAdjuster) {
        ConnectorConfiguration sourceConfig = this.getSourceConnectorConfig(source, tableName);
        sourceConfig.with("decimal.handling.mode", RelationalDatabaseConnectorConfig.DecimalHandlingMode.PRECISE.getValue());
        sourceConfig.with("binary.handling.mode", CommonConnectorConfig.BinaryHandlingMode.BYTES.getValue());
        if (configAdjuster != null) {
            configAdjuster.adjust(sourceConfig);
        }
        if (SourceType.ORACLE == source.getType()) {
            sourceConfig.with("converters", "boolean").with("boolean.type", "io.debezium.connector.oracle.converters.NumberOneToBooleanConverter").with("boolean.selector", ".*");
            if (typeName != null && typeName.stream().anyMatch(p -> p.equalsIgnoreCase("CLOB") || p.equalsIgnoreCase("NCLOB") || p.equalsIgnoreCase("BLOB"))) {
                sourceConfig.with("lob.enabled", "true");
            }
        }
        source.registerSourceConnector(sourceConfig);
    }

    protected void applyJdbcSourceConverter(Source source, ConnectorConfiguration config, String booleanSelector, String realSelector, String stringSelector) {
        if (source.getType().is(SourceType.MYSQL)) {
            config.with("converters", "jdbc-sink");
            config.with("jdbc-sink.type", "io.debezium.connector.mysql.converters.JdbcSinkDataTypesConverter");
            if (!Strings.isNullOrEmpty((String)booleanSelector)) {
                config.with("jdbc-sink.selector.boolean", booleanSelector);
            }
            if (!Strings.isNullOrEmpty((String)realSelector)) {
                config.with("jdbc-sink.selector.real", realSelector);
            }
            if (!Strings.isNullOrEmpty((String)stringSelector)) {
                config.with("jdbc-sink.selector.string", stringSelector);
            }
        }
    }

    protected <U> void consumeAndAssert(Sink sink, DataTypeColumnAssert columnAssert, List<U> expectedValues, ColumnReader<U> columnReader) throws Exception {
        SinkRecord record = this.consumeSinkRecord();
        String tableName = this.getSinkTable(record, sink);
        if (TestHelper.shouldQueryDatabaseState()) {
            sink.queryContainerTable(tableName);
        }
        columnAssert.assertColumn(record);
        sink.assertRows(tableName, (ThrowingFunction<ResultSet, Void>)((ThrowingFunction)rs -> {
            for (int i = 0; i < expectedValues.size(); ++i) {
                String description = String.format("Column %s read failed.", rs.getMetaData().getColumnName(i + 1));
                ((ObjectAssert)Assertions.assertThat(columnReader.read((ResultSet)rs, i + 1)).as(description)).isEqualTo(expectedValues.get(i));
            }
            return null;
        }));
    }

    protected void assertCharDataType(Source source, Sink sink, String dataType, boolean nationalized) throws Exception {
        this.assertDataType(source, sink, this.getDataTypeWithCollation(source, dataType, nationalized), List.of("'a'", "'b'"), List.of("a", "b"), config -> this.applyJdbcSourceConverter(source, config, null, null, ".*.id|.*.data"), record -> {
            this.assertColumn(sink, record, "id", this.getCharType(source, true, nationalized));
            this.assertColumn(sink, record, "data", this.getCharType(source, false, nationalized));
        }, ResultSet::getString);
    }

    protected void assertCharWithLengthDataType(Source source, Sink sink, String dataType, int length, boolean nationalized) throws Exception {
        this.assertDataType(source, sink, this.getDataTypeWithCollation(source, dataType, nationalized), List.of("'a'", "'b'"), List.of(this.charValue(source, sink, length, true, "a"), this.charValue(source, sink, length, false, "b")), config -> this.applyJdbcSourceConverter(source, config, null, null, ".*.id|.*.data"), record -> {
            this.assertColumn(sink, record, "id", this.getCharType(source, true, nationalized));
            if (source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", this.getCharType(source, false, nationalized), length);
            } else {
                this.assertColumn(sink, record, "data", this.getCharType(source, false, nationalized));
            }
        }, ResultSet::getString);
    }

    protected void assertVarcharDataType(Source source, Sink sink, String dataType, int length, boolean nationalized) throws Exception {
        this.assertDataType(source, sink, this.getDataTypeWithCollation(source, dataType, nationalized), List.of("'abc'", "'hello world'"), List.of("abc", "hello world"), config -> this.applyJdbcSourceConverter(source, config, null, null, ".*.id|.*.data"), record -> {
            this.assertColumn(sink, record, "id", this.getStringType(source, true, nationalized));
            if (source.getOptions().isColumnTypePropagated()) {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, nationalized), length);
            } else {
                this.assertColumn(sink, record, "data", this.getStringType(source, false, nationalized));
            }
        }, ResultSet::getString);
    }

    protected String getDataTypeWithCollation(Source source, String dataType, boolean nationalized) {
        if (source.getType().is(SourceType.MYSQL) && !nationalized) {
            return String.format("%s collate latin1_general_cs", dataType);
        }
        if (source.getType().is(SourceType.MYSQL)) {
            return String.format("%s collate utf8mb3_general_ci", dataType);
        }
        return dataType;
    }

    @FunctionalInterface
    protected static interface DataTypeColumnAssert {
        public void assertColumn(SinkRecord var1);
    }

    @FunctionalInterface
    protected static interface ColumnReader<T> {
        public T read(ResultSet var1, int var2) throws Exception;
    }

    @FunctionalInterface
    protected static interface ConfigurationAdjuster {
        public void adjust(ConnectorConfiguration var1);
    }
}

