package io.trino.plugin.deltalake.transactionlog.writer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoInputStream;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.spi.connector.ConnectorSession;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/* loaded from: input_file:io/trino/plugin/deltalake/transactionlog/writer/S3NativeTransactionLogSynchronizer.class */
public class S3NativeTransactionLogSynchronizer implements TransactionLogSynchronizer {
    private static final String LOCK_DIRECTORY = "_sb_lock";
    private static final String LOCK_INFIX = "sb-lock_";
    private final TrinoFileSystemFactory fileSystemFactory;
    private final JsonCodec<LockFileContents> lockFileContentsJsonCodec;
    public static final Logger LOG = Logger.get(S3NativeTransactionLogSynchronizer.class);
    private static final Duration EXPIRATION_DURATION = Duration.of(5, ChronoUnit.MINUTES);
    private static final Pattern LOCK_FILENAME_PATTERN = Pattern.compile("(.*)\\.sb-lock_.*");

    /* loaded from: input_file:io/trino/plugin/deltalake/transactionlog/writer/S3NativeTransactionLogSynchronizer$LockFileContents.class */
    public static class LockFileContents {
        private final String clusterId;
        private final String owningQuery;
        private final long expirationEpochMillis;

        @JsonCreator
        public LockFileContents(@JsonProperty("clusterId") String str, @JsonProperty("owningQuery") String str2, @JsonProperty("expirationEpochMillis") long j) {
            this.clusterId = (String) Objects.requireNonNull(str, "clusterId is null");
            this.owningQuery = (String) Objects.requireNonNull(str2, "owningQuery is null");
            this.expirationEpochMillis = j;
        }

        @JsonProperty
        public String getClusterId() {
            return this.clusterId;
        }

        @JsonProperty
        public String getOwningQuery() {
            return this.owningQuery;
        }

        @JsonProperty
        public long getExpirationEpochMillis() {
            return this.expirationEpochMillis;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/deltalake/transactionlog/writer/S3NativeTransactionLogSynchronizer$LockInfo.class */
    public static class LockInfo {
        private final String lockFilename;
        private final String entryFilename;
        private final LockFileContents contents;

        public LockInfo(String str, LockFileContents lockFileContents) {
            this.lockFilename = (String) Objects.requireNonNull(str, "lockFilename is null");
            this.entryFilename = S3NativeTransactionLogSynchronizer.parseEntryFilename(str);
            this.contents = (LockFileContents) Objects.requireNonNull(lockFileContents, "contents is null");
        }

        public String getLockFilename() {
            return this.lockFilename;
        }

        public String getEntryFilename() {
            return this.entryFilename;
        }

        public String getClusterId() {
            return this.contents.getClusterId();
        }

        public String getOwningQuery() {
            return this.contents.getOwningQuery();
        }

        public Instant getExpirationTime() {
            return Instant.ofEpochMilli(this.contents.getExpirationEpochMillis());
        }
    }

    @Inject
    public S3NativeTransactionLogSynchronizer(TrinoFileSystemFactory trinoFileSystemFactory, JsonCodec<LockFileContents> jsonCodec) {
        this.fileSystemFactory = (TrinoFileSystemFactory) Objects.requireNonNull(trinoFileSystemFactory, "fileSystemFactory is null");
        this.lockFileContentsJsonCodec = (JsonCodec) Objects.requireNonNull(jsonCodec, "lockFileContentesCodec is null");
    }

    @Override // io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer
    public boolean isUnsafe() {
        return true;
    }

    @Override // io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer
    public void write(ConnectorSession connectorSession, String str, Location location, byte[] bArr) {
        TrinoFileSystem create = this.fileSystemFactory.create(connectorSession);
        Location appendPath = location.parentDirectory().appendPath(LOCK_DIRECTORY);
        String fileName = location.fileName();
        Optional empty = Optional.empty();
        try {
            try {
                if (create.newInputFile(location).exists()) {
                    throw new TransactionConflictException("Target file already exists: " + location);
                }
                List<LockInfo> listLockInfos = listLockInfos(create, appendPath);
                Optional empty2 = Optional.empty();
                for (LockInfo lockInfo : listLockInfos) {
                    if (lockInfo.getExpirationTime().isBefore(Instant.now())) {
                        deleteLock(create, appendPath, lockInfo);
                    } else if (!lockInfo.getEntryFilename().equals(fileName)) {
                        continue;
                    } else {
                        if (empty2.isPresent()) {
                            throw new IllegalStateException(String.format("Multiple live locks found for: %s; lock1: %s; lock2: %s", location, ((LockInfo) empty2.get()).getLockFilename(), lockInfo.getLockFilename()));
                        }
                        empty2 = Optional.of(lockInfo);
                    }
                }
                empty2.ifPresent(lockInfo2 -> {
                    throw new TransactionConflictException(String.format("Transaction log locked(1); lockingCluster=%s; lockingQuery=%s; expires=%s", lockInfo2.getClusterId(), lockInfo2.getOwningQuery(), lockInfo2.getExpirationTime()));
                });
                Optional of = Optional.of(writeNewLockInfo(create, appendPath, fileName, str, connectorSession.getQueryId()));
                List<LockInfo> listLockInfos2 = listLockInfos(create, appendPath);
                String lockFilename = ((LockInfo) of.get()).getLockFilename();
                Optional<LockInfo> findFirst = listLockInfos2.stream().filter(lockInfo3 -> {
                    return lockInfo3.getEntryFilename().equals(fileName);
                }).filter(lockInfo4 -> {
                    return !lockInfo4.getLockFilename().equals(lockFilename);
                }).findFirst();
                if (findFirst.isPresent()) {
                    throw new TransactionConflictException(String.format("Transaction log locked(2); lockingCluster=%s; lockingQuery=%s; expires=%s", findFirst.get().getClusterId(), findFirst.get().getOwningQuery(), findFirst.get().getExpirationTime()));
                }
                if (create.newInputFile(location).exists()) {
                    throw new TransactionConflictException("Target file was created during locking: " + location);
                }
                OutputStream create2 = create.newOutputFile(location).create();
                try {
                    create2.write(bArr);
                    if (create2 != null) {
                        create2.close();
                    }
                    if (of.isPresent()) {
                        try {
                            deleteLock(create, appendPath, (LockInfo) of.get());
                        } catch (IOException e) {
                            LOG.warn(e, "Could not delete lockfile %s", new Object[]{((LockInfo) of.get()).lockFilename});
                        }
                    }
                } catch (Throwable th) {
                    if (create2 != null) {
                        try {
                            create2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (IOException e2) {
                throw new UncheckedIOException("Internal error while writing " + location, e2);
            }
        } catch (Throwable th3) {
            if (empty.isPresent()) {
                try {
                    deleteLock(create, appendPath, (LockInfo) empty.get());
                } catch (IOException e3) {
                    LOG.warn(e3, "Could not delete lockfile %s", new Object[]{((LockInfo) empty.get()).lockFilename});
                }
            }
            throw th3;
        }
    }

    private LockInfo writeNewLockInfo(TrinoFileSystem trinoFileSystem, Location location, String str, String str2, String str3) throws IOException {
        String str4 = str + ".sb-lock_" + str3;
        LockFileContents lockFileContents = new LockFileContents(str2, str3, Instant.now().plus((TemporalAmount) EXPIRATION_DURATION).toEpochMilli());
        TrinoOutputFile newOutputFile = trinoFileSystem.newOutputFile(location.appendPath(str4));
        byte[] jsonBytes = this.lockFileContentsJsonCodec.toJsonBytes(lockFileContents);
        OutputStream create = newOutputFile.create();
        try {
            create.write(jsonBytes);
            if (create != null) {
                create.close();
            }
            return new LockInfo(str4, lockFileContents);
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static void deleteLock(TrinoFileSystem trinoFileSystem, Location location, LockInfo lockInfo) throws IOException {
        trinoFileSystem.deleteFile(location.appendPath(lockInfo.getLockFilename()));
    }

    private List<LockInfo> listLockInfos(TrinoFileSystem trinoFileSystem, Location location) throws IOException {
        FileIterator listFiles = trinoFileSystem.listFiles(location);
        ImmutableList.Builder builder = ImmutableList.builder();
        while (listFiles.hasNext()) {
            FileEntry next = listFiles.next();
            String fileName = next.location().fileName();
            if (LOCK_FILENAME_PATTERN.matcher(fileName).matches()) {
                Optional<LockInfo> parseLockFile = parseLockFile(trinoFileSystem.newInputFile(next.location()), fileName);
                Objects.requireNonNull(builder);
                parseLockFile.ifPresent((v1) -> {
                    r1.add(v1);
                });
            }
        }
        return builder.build();
    }

    private Optional<LockInfo> parseLockFile(TrinoInputFile trinoInputFile, String str) throws IOException {
        byte[] bArr = null;
        try {
            TrinoInputStream newStream = trinoInputFile.newStream();
            try {
                bArr = newStream.readAllBytes();
                Optional<LockInfo> of = Optional.of(new LockInfo(str, (LockFileContents) this.lockFileContentsJsonCodec.fromJson(bArr)));
                if (newStream != null) {
                    newStream.close();
                }
                return of;
            } finally {
            }
        } catch (FileNotFoundException e) {
            return Optional.empty();
        } catch (IllegalArgumentException e2) {
            String str2 = null;
            if (bArr != null) {
                str2 = Base64.getEncoder().encodeToString(bArr);
            }
            LOG.warn(e2, "Could not parse lock file: %s; contents=%s", new Object[]{trinoInputFile.location(), str2});
            return Optional.empty();
        }
    }

    public static String parseEntryFilename(String str) {
        Matcher matcher = LOCK_FILENAME_PATTERN.matcher(str);
        if (matcher.matches()) {
            return matcher.group(1);
        }
        throw new IllegalArgumentException("Lock filename " + str + " does not match expected pattern");
    }
}
