/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.oracle;

import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.LoadingStream;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.payload.EventPayload;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import oracle.jdbc.dcn.DatabaseChangeEvent;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.RowChangeDescription;
import oracle.jdbc.dcn.TableChangeDescription;
import oracle.sql.ROWID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public final class OracleChangeWriter
implements DatabaseChangeListener {
    private static final Logger log = LoggerFactory.getLogger(OracleChangeWriter.class);
    private final LoadingStream stream;
    private final ExecutorManager executorManager;
    private final BlockingQueue<DatabaseChangeEvent> payloads = new LinkedBlockingQueue<DatabaseChangeEvent>();
    private final Queue<CompletableFuture<Void>> pendingWrites = new LinkedList<CompletableFuture<Void>>();
    private final AtomicBoolean running;
    private final Lock lock = new ReentrantLock();
    private final Condition changedRows = this.lock.newCondition();

    @Inject
    public OracleChangeWriter(LoadingStream stream, ExecutorManager executorManager) {
        this.stream = stream;
        this.executorManager = executorManager;
        this.running = new AtomicBoolean(true);
        CompletableFuture.runAsync(this::asyncWriter, executorManager.get());
    }

    public void onDatabaseChangeNotification(DatabaseChangeEvent changeEvent) {
        log.info("Received database notification {}", (Object)changeEvent);
        if (this.running.get()) {
            this.lock.lock();
            try {
                this.payloads.put(changeEvent);
                this.changedRows.signalAll();
            }
            catch (InterruptedException i) {
                log.warn("Batch writer stopped unexpectedly");
                this.running.set(false);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void asyncWriter() {
        while (this.running.get()) {
            this.lock.lock();
            try {
                this.changedRows.await(200L, TimeUnit.MILLISECONDS);
                LinkedList toWrite = new LinkedList();
                this.payloads.drainTo(toWrite);
                if (toWrite.isEmpty()) continue;
                Executor e = this.executorManager.get();
                CompletionStage cf = CompletableFuture.runAsync(() -> this.sendToBus(toWrite), e).thenRunAsync(this::removeCompleted, e);
                this.pendingWrites.add((CompletableFuture<Void>)cf);
            }
            catch (InterruptedException e) {
                log.warn("Oracle batch change writer stopped unexpectedly");
                this.running.set(false);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    private void sendToBus(Queue<DatabaseChangeEvent> toWrite) {
        Map<String, Set<String>> tableChanges = this.toTableChanges(toWrite);
        tableChanges.forEach((t, r) -> {
            log.info("Sending rows for {}", t);
            log.info(String.join((CharSequence)",", r));
        });
        Optional.of(tableChanges).filter(c -> !c.isEmpty()).map(Map::entrySet).map(Collection::stream).map(this::reduceEntries).map(JsonObjectBuilder::build).map(this::toPayload).ifPresent(arg_0 -> ((LoadingStream)this.stream).load(arg_0));
    }

    CompletableFuture<StreamStats> end() {
        this.running.set(false);
        this.lock.lock();
        try {
            this.changedRows.signalAll();
        }
        finally {
            this.lock.unlock();
        }
        this.completePendingTasks();
        return this.stream.end();
    }

    private Map<String, Set<String>> toTableChanges(Queue<DatabaseChangeEvent> toWrite) {
        LinkedHashMap<String, Set<String>> changes = new LinkedHashMap<String, Set<String>>();
        toWrite.forEach(e -> this.rowsChanged((DatabaseChangeEvent)e).forEach((key, value) -> changes.merge((String)key, (Set<String>)value, (tbl, rows) -> Stream.of(rows, value).flatMap(Collection::stream).collect(Collectors.toSet()))));
        return changes;
    }

    private JsonObjectBuilder reduceEntries(Stream<Map.Entry<String, Set<String>>> s) {
        return s.reduce(Json.createObjectBuilder(), (b, c) -> b.add((String)c.getKey(), Json.createArrayBuilder((Collection)((Collection)c.getValue()))), JsonObjectBuilder::addAll);
    }

    private Map<String, Set<String>> rowsChanged(DatabaseChangeEvent changeEvent) {
        return this.tableChanges(changeEvent).parallelStream().map(this::rowChanges).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (r1, r2) -> Stream.of(r1, r2).flatMap(Collection::stream).collect(Collectors.toSet())));
    }

    private List<TableChangeDescription> tableChanges(DatabaseChangeEvent changeEvent) {
        return Optional.ofNullable(changeEvent).map(DatabaseChangeEvent::getTableChangeDescription).map(Arrays::asList).orElse(Collections.emptyList());
    }

    private Map.Entry<String, Set<String>> rowChanges(TableChangeDescription desc) {
        String table = this.tableName(desc);
        Set rowIds = this.rowChangeDescription(desc).stream().map(RowChangeDescription::getRowid).map(ROWID::stringValue).collect(Collectors.toSet());
        return new AbstractMap.SimpleImmutableEntry<String, Set<String>>(table, rowIds);
    }

    private EventPayload toPayload(JsonObject jsonObject) {
        return () -> jsonObject;
    }

    private String tableName(TableChangeDescription desc) {
        return Optional.ofNullable(desc).map(TableChangeDescription::getTableName).orElseThrow(() -> new IllegalArgumentException("Missing table name in description " + desc));
    }

    private void removeCompleted() {
        this.lock.lock();
        try {
            this.pendingWrites.removeIf(CompletableFuture::isDone);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void completePendingTasks() {
        this.removeCompleted();
        this.lock.lock();
        try {
            long inFlight = this.pendingWrites.parallelStream().filter(w -> !w.isDone()).count();
            log.info("Waiting for {} Oracle writer task{} to complete", (Object)inFlight, (Object)(inFlight == 1L ? "" : "s"));
        }
        finally {
            this.lock.unlock();
        }
        while (!this.pendingWrites.isEmpty()) {
            this.lock.lock();
            try {
                this.changedRows.await(100L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException i) {
                log.warn("Stopped with incomplete pending Oracle writer tasks");
                this.pendingWrites.clear();
            }
            finally {
                this.lock.unlock();
            }
            this.removeCompleted();
        }
    }

    private List<RowChangeDescription> rowChangeDescription(TableChangeDescription desc) {
        return Optional.ofNullable(desc).map(TableChangeDescription::getRowChangeDescription).map(Arrays::asList).orElse(Collections.emptyList());
    }
}

