/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.XidEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import com.github.shyiko.mysql.binlog.network.ServerException;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.StopEventDataDeserializer;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
import java.io.IOException;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=5, minor=6, patch=5, reason="MySQL 5.5 does not support CURRENT_TIMESTAMP on DATETIME and only a single column can specify default CURRENT_TIMESTAMP, lifted in MySQL 5.6.5")
public class ReadBinLogIT
implements Testing {
    /** Logger used by this test and by the nested {@link TraceLifecycleListener}. */
    protected static final Logger LOGGER = LoggerFactory.getLogger(ReadBinLogIT.class);
    /** Default wait, in milliseconds (15 seconds), used when connecting the client and when waiting for events. */
    protected static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L);
    /** Wildcard sentinel handed out by {@link #any()}; {@code RowBuilder.deepEquals} treats it as matching any actual value. */
    private static final Serializable ANY_OBJECT = new AnyValue();
    /** Queue of events received from the binlog client; (re)created by every call to {@code startClient}. */
    private EventQueue counters;
    /** Binlog client under test; created in {@code startClient} and disconnected in {@link #afterEach()}. */
    private BinaryLogClient client;
    /** JDBC connection used to issue the SQL statements that produce binlog events. */
    private MySqlTestConnection conn;
    /** Every event seen by {@link #recordEvent(Event)}; all access synchronizes on the list itself. */
    private List<Event> events = new LinkedList<Event>();
    /** Configuration of {@link #conn}; supplies the host name and port for the binlog client. */
    private JdbcConfiguration config;
    // NOTE(review): the two constructor arguments are presumably a database-name prefix and a DDL script name — confirm against UniqueDatabase.
    /** Per-test database, created and initialized in {@link #beforeEach()}. */
    private final UniqueDatabase DATABASE = new UniqueDatabase("readbinlog_it", "readbinlog_test");
    // NOTE(review): presumably this rule is what evaluates the class-level @SkipWhenDatabaseVersion annotation — confirm.
    @Rule
    public SkipTestRule skipTest = new SkipTestRule();

    /**
     * Prepares the fixture for a test: forgets previously recorded events, creates and
     * initializes the test database, opens a JDBC connection to it and remembers that
     * connection's configuration for later use by the binlog client.
     */
    @Before
    public void beforeEach() throws TimeoutException, IOException, SQLException, InterruptedException {
        events.clear();
        DATABASE.createAndInitialize();

        // The field is assigned before connect() so that afterEach() can still close it if connect() fails.
        conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
        conn.connect();
        config = conn.config();
    }

    /**
     * Tears the fixture down: clears the recorded events, disconnects the binlog client (if any)
     * and closes the JDBC connection (if any). Both fields are reset to {@code null} even when
     * disconnecting or closing throws, and the connection is closed even if the disconnect fails.
     */
    @After
    public void afterEach() throws IOException, SQLException {
        events.clear();
        try {
            if (client != null) {
                client.disconnect();
            }
        } finally {
            client = null;
            // Runs regardless of the outcome of the disconnect above.
            try {
                if (conn != null) {
                    conn.close();
                }
            } finally {
                conn = null;
            }
        }
    }

    /**
     * Starts the binlog client without any pre-connect customization.
     * Equivalent to calling {@link #startClient(Consumer)} with {@code null}.
     */
    protected void startClient() throws IOException, TimeoutException, SQLException {
        startClient(null);
    }

    /**
     * Creates, configures and connects the binlog client, then (re)creates the {@code person}
     * table so that every test starts from the same schema.
     *
     * <p>The two QUERY events produced by the DROP/CREATE statements are consumed and the
     * event queue is reset before returning, so callers only see events caused by their own SQL.
     *
     * @param preConnect optional hook invoked with the fully configured client just before it
     *                   connects; may be {@code null}
     * @throws IOException      if the client cannot connect
     * @throws TimeoutException if connecting, or waiting for the two QUERY events, times out
     * @throws SQLException     if the table cannot be (re)created
     */
    protected void startClient(Consumer<BinaryLogClient> preConnect) throws IOException, TimeoutException, SQLException {
        counters = new EventQueue(DEFAULT_TIMEOUT, this::logConsumedEvent, this::logIgnoredEvent);

        BinaryLogClient binlog = new BinaryLogClient(config.getHostname(), config.getPort(), "replicator", "replpass");
        client = binlog;
        binlog.setServerId(binlog.getServerId() - 1L);
        binlog.setKeepAlive(false);
        binlog.setSSLMode(SSLMode.DISABLED);

        // Listener order matters: the queue is registered before the recorder.
        binlog.registerEventListener(counters);
        binlog.registerEventListener(this::recordEvent);
        binlog.registerLifecycleListener(new TraceLifecycleListener());

        EventDeserializer deserializer = new EventDeserializer();
        deserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
        binlog.setEventDeserializer(deserializer);

        if (preConnect != null) {
            preConnect.accept(binlog);
        }
        binlog.connect(DEFAULT_TIMEOUT);

        conn.execute(
                "DROP TABLE IF EXISTS person",
                "CREATE TABLE person (  name VARCHAR(255) primary key,  age INTEGER NULL DEFAULT 10,  createdAt DATETIME NULL DEFAULT CURRENT_TIMESTAMP,  updatedAt DATETIME NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP)");
        counters.consume(2, EventType.QUERY);
        counters.reset();
    }

    /**
     * Points the client at the binlog file name {@code invalid-mysql-binlog.filename.000001}
     * before connecting and expects a {@link ServerException}. Currently ignored.
     */
    @Ignore
    @Test(expected = ServerException.class)
    public void shouldFailToConnectToInvalidBinlogFile() throws Exception {
        Testing.Print.enable();
        startClient(binlog -> binlog.setBinlogFilename("invalid-mysql-binlog.filename.000001"));
    }

    /**
     * Starts reading from the binlog file {@code mysql-bin.000001} and drains every event that
     * arrives within 20 seconds. Makes no assertions; currently ignored.
     */
    @Ignore
    @Test
    public void shouldReadMultipleBinlogFiles() throws Exception {
        Testing.Print.enable();
        startClient(binlog -> binlog.setBinlogFilename("mysql-bin.000001"));
        counters.consumeAll(20L, TimeUnit.SECONDS);
    }

    /**
     * Inserts, renames and deletes a single row, verifying after each statement that exactly the
     * expected write / update / delete rows event was recorded. The two timestamp columns are
     * matched with the {@link #any()} wildcard.
     *
     * <p>The test returns immediately (and therefore passes) unless the system property
     * {@code database.ssl.mode} is unset or equal to {@code disabled}.
     */
    @Test
    public void shouldCaptureSingleWriteUpdateDeleteEvents() throws Exception {
        if (!"disabled".equals(System.getProperty("database.ssl.mode", "disabled"))) {
            return;
        }
        startClient();

        // INSERT
        conn.execute("INSERT INTO person(name,age) VALUES ('Georgia',30)");
        counters.consume(1, WriteRowsEventData.class);
        List<WriteRowsEventData> inserts = recordedEventData(WriteRowsEventData.class, 1);
        assertRows(inserts.get(0), rows().insertedRow("Georgia", 30, any(), any()));

        // UPDATE
        conn.execute("UPDATE person SET name = 'Maggie' WHERE name = 'Georgia'");
        counters.consume(1, UpdateRowsEventData.class);
        List<UpdateRowsEventData> updates = recordedEventData(UpdateRowsEventData.class, 1);
        assertRows(updates.get(0),
                rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any()));

        // DELETE
        conn.execute("DELETE FROM person WHERE name = 'Maggie'");
        counters.consume(1, DeleteRowsEventData.class);
        List<DeleteRowsEventData> deletes = recordedEventData(DeleteRowsEventData.class, 1);
        assertRows(deletes.get(0), rows().removedRow("Maggie", 30, any(), any()));
    }

    /**
     * Issues two INSERTs, two UPDATEs and two DELETEs (each pair in a single {@code execute} call)
     * and verifies that every pair yields one QUERY, one TABLE_MAP, two row events and one XID
     * event, with one row per row event. The queue is reset between the three phases.
     *
     * <p>The test returns immediately (and therefore passes) unless the system property
     * {@code database.ssl.mode} is unset or equal to {@code disabled}.
     */
    @Test
    public void shouldCaptureMultipleWriteUpdateDeleteEvents() throws Exception {
        if (!"disabled".equals(System.getProperty("database.ssl.mode", "disabled"))) {
            return;
        }
        startClient();

        // INSERT x2
        conn.execute(
                "INSERT INTO person(name,age) VALUES ('Georgia',30)",
                "INSERT INTO person(name,age) VALUES ('Janice',19)");
        counters.consume(1, QueryEventData.class);
        counters.consume(1, TableMapEventData.class);
        counters.consume(2, WriteRowsEventData.class);
        counters.consume(1, XidEventData.class);
        List<WriteRowsEventData> inserts = recordedEventData(WriteRowsEventData.class, 2);
        assertRows(inserts.get(0), rows().insertedRow("Georgia", 30, any(), any()));
        assertRows(inserts.get(1), rows().insertedRow("Janice", 19, any(), any()));
        counters.reset();

        // UPDATE x2
        conn.execute(
                "UPDATE person SET name = 'Maggie' WHERE name = 'Georgia'",
                "UPDATE person SET name = 'Jamie' WHERE name = 'Janice'");
        counters.consume(1, QueryEventData.class);
        counters.consume(1, TableMapEventData.class);
        counters.consume(2, UpdateRowsEventData.class);
        counters.consume(1, XidEventData.class);
        List<UpdateRowsEventData> updates = recordedEventData(UpdateRowsEventData.class, 2);
        assertRows(updates.get(0),
                rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any()));
        assertRows(updates.get(1),
                rows().changeRow("Janice", 19, any(), any()).to("Jamie", 19, any(), any()));
        counters.reset();

        // DELETE x2
        conn.execute(
                "DELETE FROM person WHERE name = 'Maggie'",
                "DELETE FROM person WHERE name = 'Jamie'");
        counters.consume(1, QueryEventData.class);
        counters.consume(1, TableMapEventData.class);
        counters.consume(2, DeleteRowsEventData.class);
        counters.consume(1, XidEventData.class);
        List<DeleteRowsEventData> deletes = recordedEventData(DeleteRowsEventData.class, 2);
        assertRows(deletes.get(0), rows().removedRow("Maggie", 30, any(), any()));
        assertRows(deletes.get(1), rows().removedRow("Jamie", 19, any(), any()));
    }

    /**
     * Uses single multi-row statements (one INSERT, one UPDATE, one DELETE, each touching two
     * rows) and verifies that each statement yields one QUERY, one TABLE_MAP, exactly one row
     * event carrying both rows, and one XID event. The queue is reset between the three phases.
     *
     * <p>The test returns immediately (and therefore passes) unless the system property
     * {@code database.ssl.mode} is unset or equal to {@code disabled}.
     */
    @Test
    public void shouldCaptureMultipleWriteUpdateDeletesInSingleEvents() throws Exception {
        if (!"disabled".equals(System.getProperty("database.ssl.mode", "disabled"))) {
            return;
        }
        startClient();

        // One INSERT, two rows
        conn.execute("INSERT INTO person(name,age) VALUES ('Georgia',30),('Janice',19)");
        counters.consume(1, QueryEventData.class);
        counters.consume(1, TableMapEventData.class);
        counters.consume(1, WriteRowsEventData.class);
        counters.consume(1, XidEventData.class);
        List<WriteRowsEventData> inserts = recordedEventData(WriteRowsEventData.class, 1);
        assertRows(inserts.get(0),
                rows().insertedRow("Georgia", 30, any(), any())
                      .insertedRow("Janice", 19, any(), any()));
        counters.reset();

        // One UPDATE, two rows
        conn.execute("UPDATE person SET name = CASE                           WHEN name = 'Georgia' THEN 'Maggie'                           WHEN name = 'Janice' THEN 'Jamie'                          END WHERE name IN ('Georgia','Janice')");
        counters.consume(1, QueryEventData.class);
        counters.consume(1, TableMapEventData.class);
        counters.consume(1, UpdateRowsEventData.class);
        counters.consume(1, XidEventData.class);
        List<UpdateRowsEventData> updates = recordedEventData(UpdateRowsEventData.class, 1);
        assertRows(updates.get(0),
                rows().changeRow("Georgia", 30, any(), any()).to("Maggie", 30, any(), any())
                      .changeRow("Janice", 19, any(), any()).to("Jamie", 19, any(), any()));
        counters.reset();

        // One DELETE, two rows
        conn.execute("DELETE FROM person WHERE name IN ('Maggie','Jamie')");
        counters.consume(1, QueryEventData.class);
        counters.consume(1, TableMapEventData.class);
        counters.consume(1, DeleteRowsEventData.class);
        counters.consume(1, XidEventData.class);
        List<DeleteRowsEventData> deletes = recordedEventData(DeleteRowsEventData.class, 1);
        assertRows(deletes.get(0),
                rows().removedRow("Maggie", 30, any(), any())
                      .removedRow("Jamie", 19, any(), any()));
    }

    /**
     * Reads {@code mysql-bin.000001} from position 4, drains events for 5 seconds and prints the
     * SQL of every recorded query event except {@code BEGIN} and {@code COMMIT} (compared
     * case-insensitively). Makes no assertions; currently ignored.
     */
    @Ignore
    @Test
    public void shouldCaptureQueryEventData() throws Exception {
        startClient(binlog -> {
            binlog.setBinlogFilename("mysql-bin.000001");
            binlog.setBinlogPosition(4L);
        });
        counters.consumeAll(5L, TimeUnit.SECONDS);

        // -1 disables the count assertion: any number of query events is acceptable here.
        List<QueryEventData> queryEvents = recordedEventData(QueryEventData.class, -1);
        for (QueryEventData queryEvent : queryEvents) {
            String sql = queryEvent.getSql();
            boolean transactionBoundary = sql.equalsIgnoreCase("BEGIN") || sql.equalsIgnoreCase("COMMIT");
            if (!transactionBoundary) {
                System.out.println(queryEvent.getSql());
            }
        }
    }

    /**
     * Placeholder: the body is empty, so this test issues no queries and makes no assertions.
     */
    @Test
    public void shouldQueryInformationSchema() throws Exception {
    }

    /**
     * Callback given to the {@link EventQueue} for events that matched a {@code consume} condition;
     * forwards a description of the event to {@code Testing.print}.
     *
     * @param event the event that was consumed
     */
    protected void logConsumedEvent(Event event) {
        String message = "Consumed event: " + event;
        Testing.print(message);
    }

    /**
     * Callback given to the {@link EventQueue} for events that did not match a {@code consume}
     * condition; forwards a description of the event to {@code Testing.print}.
     *
     * @param event the event that was skipped
     */
    protected void logIgnoredEvent(Event event) {
        String message = "Ignored event:  " + event;
        Testing.print(message);
    }

    /**
     * Event listener registered with the binlog client: appends the event to {@link #events}.
     * The append is done while holding the list's monitor, matching the locking used by
     * {@code recordedEventData}.
     *
     * @param event the event received from the client
     */
    protected void recordEvent(Event event) {
        synchronized (events) {
            events.add(event);
        }
    }

    /**
     * Returns the payloads of all recorded events whose data is an instance of {@code eventType},
     * in the order the events were recorded.
     *
     * <p>The snapshot is taken while holding the monitor of {@link #events}, so it is consistent
     * with concurrent calls to {@code recordEvent}.
     *
     * @param <T>           the event-data type to select
     * @param eventType     the class used to filter and cast each event's data
     * @param expectedCount the number of matching events required; any value below zero
     *                      (conventionally {@code -1}) disables the check
     * @return a new list with the matching event data
     * @throws AssertionError if {@code expectedCount} is non-negative and differs from the number found
     */
    protected <T extends EventData> List<T> recordedEventData(Class<T> eventType, int expectedCount) {
        final List<T> results;
        synchronized (events) {
            results = events.stream()
                    .map(Event::getData)
                    .filter(eventType::isInstance)
                    .map(eventType::cast)
                    .collect(Collectors.toList());
        }
        if (expectedCount > -1) {
            Assertions.assertThat(results.size()).isEqualTo(expectedCount);
        }
        return results;
    }

    /**
     * Asserts that {@code data} has as many elements as {@code expected} and that it contains
     * every expected value.
     *
     * @param data     the actual row values
     * @param expected the values the row must contain
     */
    protected void assertRow(Serializable[] data, Serializable ... expected) {
        Assertions.assertThat(data.length).isEqualTo(expected.length);

        // Pass the expected values as the varargs array itself rather than as a single element.
        Object[] expectedValues = expected;
        Assertions.assertThat((Object[]) data).contains(expectedValues);
    }

    /**
     * Asserts that a write event carries {@code numRowsInEvent} rows and that the values of all
     * rows, read row by row and column by column, equal {@code expectedValuesInRows} in order.
     *
     * @param eventData            the write rows event to inspect
     * @param numRowsInEvent       the number of rows the event must contain
     * @param expectedValuesInRows the flattened expected values of all rows
     */
    protected void assertRows(WriteRowsEventData eventData, int numRowsInEvent, Serializable ... expectedValuesInRows) {
        Assertions.assertThat(eventData.getRows().size()).isEqualTo(numRowsInEvent);

        // Single cursor that advances across all rows of the event.
        int cursor = 0;
        for (Serializable[] row : eventData.getRows()) {
            for (int column = 0; column < row.length; column++) {
                Assertions.assertThat((Object) row[column]).isEqualTo((Object) expectedValuesInRows[cursor++]);
            }
        }
    }

    /**
     * Returns the shared wildcard sentinel ({@code ANY_OBJECT}) for use in expected rows.
     *
     * @return the single {@code AnyValue} instance held by this class
     */
    protected Serializable any() {
        return ANY_OBJECT;
    }

    /**
     * Creates a new, empty {@link RowBuilder} for describing the rows an event is expected to carry.
     *
     * @return a fresh builder; never shared between calls
     */
    protected RowBuilder rows() {
        return new RowBuilder();
    }

    /**
     * Asserts that an update event carries exactly the before/after row pairs described by
     * {@code rows}, in any order. Each matched expectation is removed from the builder, so an
     * expectation cannot satisfy two actual rows.
     *
     * @param eventData the update rows event to inspect
     * @param rows      the expected changes; consumed by this call
     */
    protected void assertRows(UpdateRowsEventData eventData, RowBuilder rows) {
        Assertions.assertThat(eventData.getRows().size()).isEqualTo(rows.rows().size());
        for (Map.Entry<Serializable[], Serializable[]> change : eventData.getRows()) {
            // Key holds the values before the update, value the values after it.
            boolean matched = rows.findUpdatedRow(change.getKey(), change.getValue());
            if (!matched) {
                Assert.fail("Failed to find updated row: " + eventData);
            }
        }
    }

    /**
     * Asserts that a write event carries exactly the inserted rows described by {@code rows},
     * in any order. Each matched expectation is removed from the builder.
     *
     * @param eventData the write rows event to inspect
     * @param rows      the expected inserted rows; consumed by this call
     */
    protected void assertRows(WriteRowsEventData eventData, RowBuilder rows) {
        Assertions.assertThat(eventData.getRows().size()).isEqualTo(rows.rows().size());
        for (Serializable[] insertedRow : eventData.getRows()) {
            boolean matched = rows.findInsertedRow(insertedRow);
            if (!matched) {
                Assert.fail("Failed to find inserted row: " + eventData);
            }
        }
    }

    /**
     * Asserts that a delete event carries exactly the removed rows described by {@code rows},
     * in any order. Each matched expectation is removed from the builder.
     *
     * @param eventData the delete rows event to inspect
     * @param rows      the expected removed rows; consumed by this call
     */
    protected void assertRows(DeleteRowsEventData eventData, RowBuilder rows) {
        Assertions.assertThat(eventData.getRows().size()).isEqualTo(rows.rows().size());
        for (Serializable[] deletedRow : eventData.getRows()) {
            boolean matched = rows.findDeletedRow(deletedRow);
            if (!matched) {
                Assert.fail("Failed to find removed row: " + eventData);
            }
        }
    }

    /**
     * Buffers events delivered by a {@link BinaryLogClient} and lets a test wait until a given
     * number of events matching some condition has been seen.
     *
     * <p>Events are stored in a {@link ConcurrentLinkedQueue}: {@link #onEvent(Event)} appends,
     * the {@code consume*} methods poll. Polling is a busy loop bounded by a deadline; events
     * that are polled but do not match the condition are reported to the "ignored" callback and
     * dropped.
     */
    protected static class EventQueue
    implements BinaryLogClient.EventListener {
        private final ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();
        private final Consumer<Event> consumedEvents;
        private final Consumer<Event> ignoredEvents;
        private final long defaultTimeoutInMillis;

        /**
         * @param defaultTimeoutInMillis timeout used by the {@code consume} overloads that take no explicit timeout
         * @param consumedEvents         callback for every event that matched; {@code null} means "do nothing"
         * @param ignoredEvents          callback for every polled event that did not match; {@code null} means "do nothing"
         */
        public EventQueue(long defaultTimeoutInMillis, Consumer<Event> consumedEvents, Consumer<Event> ignoredEvents) {
            this.defaultTimeoutInMillis = defaultTimeoutInMillis;
            this.consumedEvents = consumedEvents != null ? consumedEvents : this::defaultEventHandler;
            this.ignoredEvents = ignoredEvents != null ? ignoredEvents : this::defaultEventHandler;
        }

        /** No-op callback substituted for a {@code null} consumer. */
        private void defaultEventHandler(Event event) {
        }

        /**
         * Listener callback: appends the event to the queue.
         *
         * @param event the event delivered by the client
         */
        public void onEvent(Event event) {
            boolean success = this.queue.offer(event);
            assert (success);
        }

        /**
         * Drains and reports every event that arrives until the given amount of time has elapsed.
         * Always runs for the full duration; it never throws {@link TimeoutException} itself.
         *
         * @param timeout how long to keep draining
         * @param unit    the unit of {@code timeout}
         */
        public void consumeAll(long timeout, TimeUnit unit) throws TimeoutException {
            long stopTime = System.currentTimeMillis() + unit.toMillis(timeout);
            while (System.currentTimeMillis() < stopTime) {
                Event nextEvent = this.queue.poll();
                if (nextEvent == null) {
                    continue;
                }
                Testing.print("Found event: " + nextEvent);
                this.consumedEvents.accept(nextEvent);
            }
        }

        /**
         * Waits, using the default timeout, for {@code eventCount} events that satisfy {@code condition}.
         *
         * @see #consume(int, long, Predicate)
         */
        public void consume(int eventCount, Predicate<Event> condition) throws TimeoutException {
            this.consume(eventCount, this.defaultTimeoutInMillis, condition);
        }

        /**
         * Polls the queue until {@code eventCount} events satisfying {@code condition} have been
         * seen or the timeout elapses. Non-matching events are passed to the "ignored" callback
         * and discarded.
         *
         * @param eventCount      number of matching events to wait for; {@code 0} returns immediately
         * @param timeoutInMillis maximum time to wait, in milliseconds
         * @param condition       test applied to each polled event
         * @throws IllegalArgumentException if {@code eventCount} is negative
         * @throws TimeoutException         if fewer than {@code eventCount} matching events arrived in time
         */
        public void consume(int eventCount, long timeoutInMillis, Predicate<Event> condition) throws TimeoutException {
            if (eventCount < 0) {
                throw new IllegalArgumentException("The eventCount may not be negative");
            }
            if (eventCount == 0) {
                return;
            }
            int eventsRemaining = eventCount;
            long stopTime = System.currentTimeMillis() + timeoutInMillis;
            while (eventsRemaining > 0 && System.currentTimeMillis() < stopTime) {
                Event nextEvent = this.queue.poll();
                if (nextEvent == null) {
                    continue;
                }
                if (condition.test(nextEvent)) {
                    --eventsRemaining;
                    this.consumedEvents.accept(nextEvent);
                } else {
                    this.ignoredEvents.accept(nextEvent);
                }
            }
            if (eventsRemaining > 0) {
                throw new TimeoutException("Received " + (eventCount - eventsRemaining) + " of " + eventCount + " in " + timeoutInMillis + "ms");
            }
        }

        /**
         * Waits, using the default timeout, for {@code eventCount} events whose header type is {@code type}.
         */
        public void consume(int eventCount, EventType type) throws TimeoutException {
            this.consume(eventCount, type, this.defaultTimeoutInMillis);
        }

        /**
         * Waits up to {@code timeoutMillis} for {@code eventCount} events whose header type is
         * {@code type}. Events without a header never match.
         *
         * @param eventCount    number of matching events to wait for
         * @param type          the required event type
         * @param timeoutMillis maximum time to wait, in milliseconds
         * @throws TimeoutException if too few matching events arrived in time
         */
        public void consume(int eventCount, EventType type, long timeoutMillis) throws TimeoutException {
            // Honor the caller's timeout; previously the default timeout was always used here.
            this.consume(eventCount, timeoutMillis, event -> {
                EventHeader header = event.getHeader();
                EventType eventType = header == null ? null : header.getEventType();
                return type.equals(eventType);
            });
        }

        /**
         * Waits, using the default timeout, for {@code eventCount} events whose data is exactly of
         * class {@code eventDataClass}.
         */
        public void consume(int eventCount, Class<? extends EventData> eventDataClass) throws TimeoutException {
            this.consume(eventCount, eventDataClass, this.defaultTimeoutInMillis);
        }

        /**
         * Waits up to {@code timeoutMillis} for {@code eventCount} events whose data is exactly of
         * class {@code eventDataClass} (subclasses do not match). Events without data never match.
         *
         * @param eventCount     number of matching events to wait for
         * @param eventDataClass the required class of the event data
         * @param timeoutMillis  maximum time to wait, in milliseconds
         * @throws TimeoutException if too few matching events arrived in time
         */
        public void consume(int eventCount, Class<? extends EventData> eventDataClass, long timeoutMillis) throws TimeoutException {
            // Honor the caller's timeout; previously the default timeout was always used here.
            this.consume(eventCount, timeoutMillis, event -> {
                EventData data = event.getData();
                return data != null && data.getClass().equals(eventDataClass);
            });
        }

        /** Discards every event currently buffered in the queue. */
        public void reset() {
            this.queue.clear();
        }
    }

    protected static class TraceLifecycleListener
    implements BinaryLogClient.LifecycleListener {
        protected TraceLifecycleListener() {
        }

        public void onDisconnect(BinaryLogClient client) {
            LOGGER.debug("Client disconnected");
        }

        public void onConnect(BinaryLogClient client) {
            LOGGER.debug("Client connected");
        }

        public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
            LOGGER.warn("Client communication failure", (Throwable)ex);
        }

        public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
            LOGGER.error("Client received event deserialization failure", (Throwable)ex);
        }
    }

    /**
     * Fluent builder for the rows a binlog event is expected to contain, plus the lookup methods
     * used to match actual rows against those expectations.
     *
     * <p>Each expectation is a {@link Row} with "from" (before) and "to" (after) values. The row
     * currently being built is held in {@code nextRow} and is moved into the list the next time
     * any builder or lookup method runs. Every successful {@code find*} call removes the matched
     * expectation, so one expectation can match at most one actual row.
     */
    public static class RowBuilder {
        private final List<Row> rows = new ArrayList<Row>();
        private Row nextRow = null;

        /**
         * Expects an inserted row: empty "from" values and the given "to" values.
         *
         * @param values the expected column values of the inserted row
         * @return this builder
         */
        public RowBuilder insertedRow(Serializable ... values) {
            this.maybeAddRow();
            return this.changeRow(new Serializable[0]).to(values);
        }

        /**
         * Expects a removed row: the given values are used as both "from" and "to" values.
         *
         * @param values the expected column values of the removed row
         * @return this builder
         */
        public RowBuilder removedRow(Serializable ... values) {
            this.maybeAddRow();
            return this.changeRow(values).to(values);
        }

        /**
         * Starts an expected update with the given "from" values; the returned object must be
         * completed with {@link UpdateBuilder#to(Serializable...)}.
         *
         * @param values the expected column values before the update
         * @return a continuation that records the "to" values and returns this builder
         */
        public UpdateBuilder changeRow(Serializable ... values) {
            this.maybeAddRow();
            this.nextRow = new Row();
            this.nextRow.fromValues = values;
            return new UpdateBuilder(){

                @Override
                public RowBuilder to(Serializable ... values) {
                    nextRow.toValues = values;
                    // Must be the enclosing builder: plain `this` is the anonymous UpdateBuilder, not a RowBuilder.
                    return RowBuilder.this;
                }
            };
        }

        /** Moves the row under construction, if any, into the list of expectations. */
        protected void maybeAddRow() {
            if (this.nextRow != null) {
                this.rows.add(this.nextRow);
                this.nextRow = null;
            }
        }

        /**
         * @return the live list of expectations not yet matched, including the row that was
         *         under construction
         */
        protected List<Row> rows() {
            this.maybeAddRow();
            return this.rows;
        }

        /**
         * Looks for an expectation whose "to" values match {@code values} and removes it.
         *
         * @param values the actual values of an inserted row
         * @return {@code true} if a matching expectation was found and removed
         */
        protected boolean findInsertedRow(Serializable[] values) {
            this.maybeAddRow();
            Iterator<Row> iter = this.rows.iterator();
            while (iter.hasNext()) {
                Row expectedRow = iter.next();
                if (!this.deepEquals(expectedRow.toValues, values)) {
                    continue;
                }
                iter.remove();
                return true;
            }
            return false;
        }

        /**
         * Looks for an expectation whose "from" values match {@code values} and removes it.
         *
         * @param values the actual values of a deleted row
         * @return {@code true} if a matching expectation was found and removed
         */
        protected boolean findDeletedRow(Serializable[] values) {
            this.maybeAddRow();
            Iterator<Row> iter = this.rows.iterator();
            while (iter.hasNext()) {
                Row expectedRow = iter.next();
                if (!this.deepEquals(expectedRow.fromValues, values)) {
                    continue;
                }
                iter.remove();
                return true;
            }
            return false;
        }

        /**
         * Looks for an expectation whose "from" and "to" values match the given old and new
         * values and removes it.
         *
         * @param oldValues the actual values before the update
         * @param newValues the actual values after the update
         * @return {@code true} if a matching expectation was found and removed
         */
        protected boolean findUpdatedRow(Serializable[] oldValues, Serializable[] newValues) {
            this.maybeAddRow();
            Iterator<Row> iter = this.rows.iterator();
            while (iter.hasNext()) {
                Row expectedRow = iter.next();
                if (!this.deepEquals(expectedRow.fromValues, oldValues) || !this.deepEquals(expectedRow.toValues, newValues)) {
                    continue;
                }
                iter.remove();
                return true;
            }
            return false;
        }

        /**
         * Compares expected and actual values position by position, treating every expected
         * {@code AnyValue} as a wildcard.
         *
         * <p>Note that a length mismatch is reported as a failed assertion rather than as
         * {@code false}.
         *
         * @param expectedValues the expected values, possibly containing wildcards
         * @param actualValues   the actual values; not modified
         * @return {@code true} if all non-wildcard positions are deeply equal
         * @throws AssertionError if the two arrays differ in length
         */
        protected boolean deepEquals(Serializable[] expectedValues, Serializable[] actualValues) {
            Assertions.assertThat(expectedValues.length).isEqualTo(actualValues.length);
            // Work on a copy so the wildcard can be written over the actual value at its position.
            Object[] actualValuesCopy = Arrays.copyOf(actualValues, actualValues.length);
            for (int i = 0; i != actualValuesCopy.length; ++i) {
                if (!(expectedValues[i] instanceof AnyValue)) {
                    continue;
                }
                actualValuesCopy[i] = expectedValues[i];
            }
            return Arrays.deepEquals(expectedValues, actualValuesCopy);
        }
    }

    /**
     * Second step of describing an expected update: returned by {@code RowBuilder.changeRow}
     * and completed by supplying the values the row is expected to have afterwards.
     */
    public static interface UpdateBuilder {
        /**
         * Records the expected "to" values of the row being described.
         *
         * @param var1 the expected column values after the change
         * @return the {@link RowBuilder} that created this continuation
         */
        public RowBuilder to(Serializable ... var1);
    }

    /**
     * Marker type for the wildcard value used in expected rows; {@code RowBuilder.deepEquals}
     * recognizes it with {@code instanceof}. It carries no state.
     */
    private static final class AnyValue
    implements Serializable {
        private static final long serialVersionUID = 1L;

        // Private: the only instance created in this file is the ANY_OBJECT constant of the enclosing class.
        private AnyValue() {
        }
    }

    /**
     * One expected row change, as assembled by {@link RowBuilder}.
     */
    public static class Row {
        /** Expected values before the change; an empty array for rows built with {@code insertedRow}. */
        public Serializable[] fromValues;
        /** Expected values after the change; the same array as {@code fromValues} for rows built with {@code removedRow}. */
        public Serializable[] toValues;
    }
}

