/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.oracle;

import io.leoplatform.schema.ChangeEvent;
import io.leoplatform.schema.Field;
import io.leoplatform.schema.FieldType;
import io.leoplatform.schema.Op;
import io.leoplatform.schema.Source;
import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.changes.SchemaChangeQueue;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import oracle.jdbc.dcn.DatabaseChangeEvent;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.RowChangeDescription;
import oracle.jdbc.dcn.TableChangeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public final class OracleChangeWriter
implements DatabaseChangeListener {
    private static final Logger log = LoggerFactory.getLogger(OracleChangeWriter.class);
    private final SchemaChangeQueue changeQueue;
    private final BlockingQueue<DatabaseChangeEvent> payloads = new LinkedBlockingQueue<DatabaseChangeEvent>();
    private final AtomicBoolean running;
    private final Lock lock = new ReentrantLock();
    private final Condition changedRows = this.lock.newCondition();

    @Inject
    public OracleChangeWriter(SchemaChangeQueue changeQueue, ExecutorManager executorManager) {
        this.changeQueue = changeQueue;
        this.running = new AtomicBoolean(true);
        CompletableFuture.runAsync(this::asyncWriter, executorManager.get());
    }

    public void onDatabaseChangeNotification(DatabaseChangeEvent changeEvent) {
        log.info("Received database notification {}", (Object)changeEvent);
        if (this.running.get()) {
            this.lock.lock();
            try {
                this.payloads.put(changeEvent);
            }
            catch (InterruptedException i) {
                log.warn("Batch writer stopped unexpectedly");
                this.running.set(false);
            }
            finally {
                this.lock.unlock();
            }
            this.signalAll();
        }
    }

    private void asyncWriter() {
        while (this.running.get()) {
            LinkedList<DatabaseChangeEvent> toWrite = new LinkedList<DatabaseChangeEvent>();
            this.lock.lock();
            try {
                this.changedRows.await(500L, TimeUnit.MILLISECONDS);
                this.payloads.drainTo(toWrite);
            }
            catch (InterruptedException e) {
                log.warn("Oracle batch change writer stopped unexpectedly");
                this.running.set(false);
            }
            finally {
                this.lock.unlock();
            }
            if (toWrite.isEmpty()) continue;
            this.sendToChangeQueue(toWrite);
        }
    }

    private void sendToChangeQueue(Queue<DatabaseChangeEvent> toWrite) {
        this.toChangeEvents(toWrite).forEach(change -> {
            List rowIds = change.getFields().stream().map(Field::getValue).collect(Collectors.toList());
            log.info("Sending notification for {} {} changes", (Object)rowIds.size(), (Object)change.getName());
            log.debug("ROWIDs changed: {}", (Object)String.join((CharSequence)",", rowIds));
            this.changeQueue.add(change);
        });
    }

    void end() {
        this.running.set(false);
        this.signalAll();
        this.drainBuffer();
        this.changeQueue.end();
    }

    private void signalAll() {
        this.lock.lock();
        try {
            this.changedRows.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    private Collection<ChangeEvent> toChangeEvents(Queue<DatabaseChangeEvent> changeEvents) {
        return changeEvents.parallelStream().flatMap(this::tableChanges).map(this::rowChanges).flatMap(Collection::stream).collect(Collectors.toConcurrentMap(ChangeEvent::getName, Function.identity(), this::combineEvents)).values();
    }

    private ChangeEvent combineEvents(ChangeEvent c1, ChangeEvent c2) {
        List f = Stream.of(c1.getFields(), c2.getFields()).flatMap(Collection::stream).distinct().collect(Collectors.toList());
        return new ChangeEvent(c1.getSource(), c1.getOp(), c1.getName(), f);
    }

    private Stream<TableChangeDescription> tableChanges(DatabaseChangeEvent changeEvent) {
        return Optional.of(changeEvent).map(DatabaseChangeEvent::getTableChangeDescription).map(Stream::of).orElse(Stream.empty());
    }

    private Set<ChangeEvent> rowChanges(TableChangeDescription desc) {
        String table = this.tableName(desc);
        return this.rowChangeDescription(desc).stream().map(rowDesc -> {
            Field f = new Field("ROWID", FieldType.STRING, rowDesc.getRowid().stringValue());
            return new AbstractMap.SimpleImmutableEntry<Op, Field>(this.getOp((RowChangeDescription)rowDesc), f);
        }).map(e -> new ChangeEvent(Source.ORACLE, (Op)e.getKey(), table, Collections.singletonList(e.getValue()))).collect(Collectors.toSet());
    }

    private Op getOp(RowChangeDescription desc) {
        RowChangeDescription.RowOperation oracleOp = Optional.of(desc).map(RowChangeDescription::getRowOperation).orElse(RowChangeDescription.RowOperation.UPDATE);
        switch (oracleOp) {
            case INSERT: {
                return Op.INSERT;
            }
            case UPDATE: {
                return Op.UPDATE;
            }
            case DELETE: {
                return Op.DELETE;
            }
        }
        return Op.UPDATE;
    }

    private String tableName(TableChangeDescription desc) {
        return Optional.ofNullable(desc).map(TableChangeDescription::getTableName).orElseThrow(() -> new IllegalArgumentException("Missing table name in description " + desc));
    }

    private void drainBuffer() {
        LinkedList<DatabaseChangeEvent> toWrite = new LinkedList<DatabaseChangeEvent>();
        this.lock.lock();
        try {
            this.payloads.drainTo(toWrite);
        }
        finally {
            this.lock.unlock();
        }
        if (!toWrite.isEmpty()) {
            this.sendToChangeQueue(toWrite);
        }
    }

    private List<RowChangeDescription> rowChangeDescription(TableChangeDescription desc) {
        return Optional.ofNullable(desc).map(TableChangeDescription::getRowChangeDescription).map(Arrays::asList).orElse(Collections.emptyList());
    }
}

