/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.storage.s3.history;

import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Loggings;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.Bucket;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

@NotThreadSafe
public class S3SchemaHistory
extends AbstractSchemaHistory {
    private static final Logger LOGGER = LoggerFactory.getLogger(S3SchemaHistory.class);
    public static final String ACCESS_KEY_ID_CONFIG = "s3.access.key.id";
    public static final String SECRET_ACCESS_KEY_CONFIG = "s3.secret.access.key";
    public static final String REGION_CONFIG = "schema.history.internal.s3.region.name";
    public static final String BUCKET_CONFIG = "schema.history.internal.s3.bucket.name";
    public static final String OBJECT_NAME_CONFIG = "schema.history.internal.s3.object.name";
    public static final String ENDPOINT_CONFIG = "schema.history.internal.s3.endpoint";
    public static final String OBJECT_CONTENT_TYPE = "text/plain";
    public static final Field ACCESS_KEY_ID = Field.create((String)"schema.history.internal.s3.access.key.id").withDisplayName("S3 access key id").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH);
    public static final Field SECRET_ACCESS_KEY = Field.create((String)"schema.history.internal.s3.secret.access.key").withDisplayName("S3 secret access key").withType(ConfigDef.Type.PASSWORD).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH);
    public static final Field REGION = Field.create((String)"schema.history.internal.s3.region.name").withDisplayName("S3 region").withWidth(ConfigDef.Width.LONG).withType(ConfigDef.Type.STRING).withImportance(ConfigDef.Importance.MEDIUM);
    public static final Field BUCKET = Field.create((String)"schema.history.internal.s3.bucket.name").withDisplayName("S3 bucket").withType(ConfigDef.Type.STRING).withImportance(ConfigDef.Importance.HIGH);
    public static final Field OBJECT_NAME = Field.create((String)"schema.history.internal.s3.object.name").withDisplayName("S3 Object name").withType(ConfigDef.Type.STRING).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the object under which the history is stored.");
    public static final Field ENDPOINT = Field.create((String)"schema.history.internal.s3.endpoint").withDisplayName("S3 endpoint").withType(ConfigDef.Type.STRING).withImportance(ConfigDef.Importance.LOW);
    public static final Field.Set ALL_FIELDS = Field.setOf((Field[])new Field[]{ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION, BUCKET, ENDPOINT});
    private final AtomicBoolean running = new AtomicBoolean();
    private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
    private final DocumentWriter documentWriter = DocumentWriter.defaultWriter();
    private final DocumentReader reader = DocumentReader.defaultReader();
    private String bucket = null;
    private String objectName = null;
    private Region region = null;
    private URI endpoint = null;
    private AwsCredentialsProvider credentialsProvider = null;
    private volatile S3Client client = null;
    private volatile List<HistoryRecord> records = new ArrayList<HistoryRecord>();

    public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
        super.configure(config, comparator, listener, useCatalogBeforeSchema);
        if (!config.validateAndRecord((Iterable)ALL_FIELDS, arg_0 -> ((Logger)LOGGER).error(arg_0))) {
            throw new SchemaHistoryException("Error configuring an instance of " + ((Object)((Object)this)).getClass().getSimpleName() + "; check the logs for details");
        }
        this.bucket = config.getString(BUCKET);
        if (this.bucket == null) {
            throw new DebeziumException(BUCKET + " is required to be set");
        }
        this.objectName = config.getString(OBJECT_NAME);
        if (this.objectName == null) {
            throw new DebeziumException(OBJECT_NAME + " is required to be set");
        }
        String regionName = config.getString(REGION);
        if (regionName == null) {
            throw new DebeziumException(REGION + " is required to be set");
        }
        this.region = Region.of((String)regionName);
        LOGGER.info("Database history will be stored in bucket '{}' under key '{}' using region '{}'", new Object[]{this.bucket, this.objectName, this.region});
        String uriString = config.getString(ENDPOINT);
        if (uriString != null) {
            LOGGER.info("Using explicitly configured endpoint " + uriString);
            this.endpoint = URI.create(uriString);
        }
        if (config.getString(ACCESS_KEY_ID) == null && config.getString(SECRET_ACCESS_KEY) == null) {
            LOGGER.info("DefaultCreadentialsProvider is used for authentication");
            this.credentialsProvider = DefaultCredentialsProvider.create();
        } else {
            LOGGER.info("StaticCredentialsProvider is used for authentication");
            AwsBasicCredentials credentials = AwsBasicCredentials.create((String)config.getString(ACCESS_KEY_ID), (String)config.getString(SECRET_ACCESS_KEY));
            this.credentialsProvider = StaticCredentialsProvider.create((AwsCredentials)credentials);
        }
    }

    public synchronized void start() {
        if (this.client == null) {
            S3ClientBuilder clientBuilder = (S3ClientBuilder)((S3ClientBuilder)S3Client.builder().credentialsProvider(this.credentialsProvider)).region(this.region);
            if (this.endpoint != null) {
                clientBuilder.endpointOverride(this.endpoint);
            }
            this.client = (S3Client)clientBuilder.build();
        }
        this.lock.write(() -> {
            if (this.running.compareAndSet(false, true)) {
                if (!this.storageExists()) {
                    this.initializeStorage();
                }
                InputStream objectInputStream = null;
                try {
                    GetObjectRequest request = (GetObjectRequest)GetObjectRequest.builder().bucket(this.bucket).key(this.objectName).responseContentType(OBJECT_CONTENT_TYPE).build();
                    objectInputStream = (InputStream)this.client.getObject(request, ResponseTransformer.toInputStream());
                }
                catch (NoSuchKeyException request) {
                }
                catch (S3Exception e) {
                    throw new SchemaHistoryException("Can't retrieve history object from S3", (Throwable)e);
                }
                if (objectInputStream != null) {
                    try (BufferedReader historyReader = new BufferedReader(new InputStreamReader(objectInputStream, StandardCharsets.UTF_8));){
                        String line;
                        while ((line = historyReader.readLine()) != null) {
                            if (line.isEmpty()) continue;
                            this.records.add(new HistoryRecord(this.reader.read(line)));
                        }
                    }
                    catch (IOException e) {
                        throw new SchemaHistoryException("Unable to read object content", (Throwable)e);
                    }
                }
            }
        });
        super.start();
    }

    public synchronized void stop() {
        if (this.running.compareAndSet(true, false) && this.client != null) {
            this.client.close();
        }
        super.stop();
    }

    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
        if (this.client == null) {
            throw new IllegalStateException("No S3 client is available. Ensure that 'start()' is called before storing database history records.");
        }
        if (record == null) {
            return;
        }
        LOGGER.trace("Storing record into database history: {}", (Object)record);
        this.lock.write(() -> {
            if (!this.running.get()) {
                throw new IllegalStateException("The history has been stopped and will not accept more records");
            }
            this.records.add(record);
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            try (BufferedWriter historyWriter = new BufferedWriter(new OutputStreamWriter((OutputStream)outputStream, StandardCharsets.UTF_8));){
                for (HistoryRecord r : this.records) {
                    String line = null;
                    line = this.documentWriter.write(r.document());
                    if (line == null) continue;
                    historyWriter.newLine();
                    historyWriter.append(line);
                }
            }
            catch (IOException e) {
                Loggings.logErrorAndTraceRecord((Logger)this.logger, (Object)record, (String)"Failed to convert record", (Throwable)e);
                throw new SchemaHistoryException("Failed to convert record", (Throwable)e);
            }
            try {
                PutObjectRequest request = (PutObjectRequest)PutObjectRequest.builder().bucket(this.bucket).key(this.objectName).contentType(OBJECT_CONTENT_TYPE).build();
                this.client.putObject(request, RequestBody.fromBytes((byte[])outputStream.toByteArray()));
            }
            catch (S3Exception e) {
                throw new SchemaHistoryException("Can not store record to S3", (Throwable)e);
            }
        });
    }

    protected void recoverRecords(Consumer<HistoryRecord> records) {
        this.lock.write(() -> this.records.forEach(records));
    }

    public boolean exists() {
        return !this.records.isEmpty();
    }

    public boolean storageExists() {
        boolean bucketExists = this.client.listBuckets().buckets().stream().map(Bucket::name).anyMatch(this.bucket::equals);
        if (bucketExists) {
            LOGGER.info("Bucket '{}' used to store database history exists", (Object)this.bucket);
        } else {
            LOGGER.info("Bucket '{}' used to store database history does not exist yet", (Object)this.bucket);
        }
        return bucketExists;
    }

    public void initializeStorage() {
        super.initializeStorage();
        LOGGER.info("Creating S3 bucket '{}' used to store database history", (Object)this.bucket);
        this.client.createBucket((CreateBucketRequest)CreateBucketRequest.builder().bucket(this.bucket).build());
    }

    public String toString() {
        return "S3";
    }
}

