package io.trino.plugin.deltalake;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.plugin.deltalake.DeltaLakeTableHandle;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.TransactionLogParser;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.HiveStorageFormat;
import io.trino.plugin.hive.ReaderPageSource;
import io.trino.plugin.hive.RecordFileWriter;
import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
import io.trino.plugin.hive.util.CompressionConfigUtil;
import io.trino.plugin.hive.util.ConfigurationUtils;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.RowBlock;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.Utils;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.TypeManager;
import java.io.IOException;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.joda.time.DateTimeZone;

/* loaded from: input_file:io/trino/plugin/deltalake/DeltaLakeUpdatablePageSource.class */
/**
 * Page source used for row-level DELETE and UPDATE on a single Delta Lake Parquet data file.
 *
 * <p>Reading is delegated to a {@link DeltaLakePageSource} that additionally exposes the Parquet row
 * index as its last channel. Row indexes handed back through {@link #deleteRows(Block)} and
 * {@link #updateRows(Page, List)} are recorded in a {@link BitSet}; updated rows are also appended to
 * a writer targeting a new file next to the original one. {@link #finish()} then either reports that
 * the original file is fully removed, or copies the surviving rows into the new file and reports its
 * {@code DataFileInfo} as a JSON-encoded {@link DeltaLakeUpdateResult}.
 *
 * <p>NOTE(review): the writer for the new file is created eagerly in the constructor, but
 * {@link #close()}, {@link #abort()} and the "nothing to write" paths of {@link #finish()} never roll
 * it back. Whether that leaves a stray file behind depends on {@code DeltaLakeWriter}; confirm.
 */
public class DeltaLakeUpdatablePageSource implements UpdatablePageSource {
    private final int rowIdColumnIndex;
    private final List<DeltaLakeColumnHandle> queryColumns;
    private final DeltaLakeTableHandle tableHandle;
    private final Map<String, Optional<String>> partitionKeys;
    private final String path;
    private final long fileSize;
    private final ConnectorSession session;
    private final ExecutorService executorService;
    private final HdfsEnvironment hdfsEnvironment;
    private final HdfsEnvironment.HdfsContext hdfsContext;
    private final DateTimeZone parquetDateTimeZone;
    private final ParquetReaderOptions parquetReaderOptions;
    private final TypeManager typeManager;
    private final JsonCodec<DeltaLakeUpdateResult> updateResultJsonCodec;
    // One bit per row of the source file; a set bit means the row must not be copied to the new file.
    private final BitSet rowsToDelete;
    private final DeltaLakePageSource pageSourceDelegate;
    private final int totalRecordCount;
    // Non-partition columns of the table schema, in schema order; this is the layout of written pages.
    private final List<DeltaLakeColumnHandle> allDataColumns;
    private final DeltaLakeTableHandle.WriteType writeType;
    private final DeltaLakeWriter updatedFileWriter;
    // UPDATE only: delegate channels (in table schema order) of the columns present in queryColumns.
    private final Optional<int[]> queryColumnMapping;
    // UPDATE only: delegate channels (in table schema order) of the columns carried inside the row id.
    private final Optional<int[]> rowIdColumnMapping;
    private final Set<DeltaLakeColumnHandle> updatedColumns;

    /**
     * Creates the page source, opens the delegate reader and eagerly creates the writer for the
     * replacement file.
     *
     * @param tableHandle table being modified; must carry a write type (and, for UPDATE, the updated
     *        and row-id columns)
     * @param queryColumns columns the engine expects from {@link #getNextPage()}, including the row id column
     * @param partitionKeys partition column name to serialized partition value for this file
     * @param path location of the Parquet data file being read
     * @param fileSize size of the data file in bytes
     * @param fileModifiedTime forwarded unchanged to {@link DeltaLakePageSource}; presumably the file's
     *        modification time - TODO confirm
     * @param session session of the running query
     * @param executorService executor on which the finishing work runs
     * @param hdfsEnvironment HDFS environment used to obtain configurations and file systems
     * @param hdfsContext HDFS context used for reads
     * @param parquetDateTimeZone time zone handed to the Parquet reader
     * @param parquetReaderOptions base Parquet reader options
     * @param parquetPredicate predicate pushed into the delegate Parquet reader
     * @param typeManager type manager used for schema extraction and by the writer
     * @param updateResultJsonCodec codec used to serialize the result fragments
     * @throws TrinoException if the write type is unsupported, the Parquet footer cannot be read, or
     *         the writer cannot be created
     * @throws IllegalArgumentException if the row id column is missing from {@code queryColumns} or the
     *         file has more than {@code Integer.MAX_VALUE} rows
     */
    public DeltaLakeUpdatablePageSource(
            DeltaLakeTableHandle tableHandle,
            List<DeltaLakeColumnHandle> queryColumns,
            Map<String, Optional<String>> partitionKeys,
            String path,
            long fileSize,
            long fileModifiedTime,
            ConnectorSession session,
            ExecutorService executorService,
            HdfsEnvironment hdfsEnvironment,
            HdfsEnvironment.HdfsContext hdfsContext,
            DateTimeZone parquetDateTimeZone,
            ParquetReaderOptions parquetReaderOptions,
            TupleDomain<HiveColumnHandle> parquetPredicate,
            TypeManager typeManager,
            JsonCodec<DeltaLakeUpdateResult> updateResultJsonCodec) {
        this.tableHandle = Objects.requireNonNull(tableHandle, "tableHandle is null");
        this.queryColumns = Objects.requireNonNull(queryColumns, "queryColumns is null");
        this.partitionKeys = Objects.requireNonNull(partitionKeys, "partitionKeys is null");
        this.path = Objects.requireNonNull(path, "path is null");
        this.fileSize = fileSize;
        this.session = Objects.requireNonNull(session, "session is null");
        this.executorService = Objects.requireNonNull(executorService, "executorService is null");
        this.hdfsEnvironment = Objects.requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.hdfsContext = Objects.requireNonNull(hdfsContext, "hdfsContext is null");
        this.parquetDateTimeZone = Objects.requireNonNull(parquetDateTimeZone, "parquetDateTimeZone is null");
        this.parquetReaderOptions = Objects.requireNonNull(parquetReaderOptions, "parquetReaderOptions is null");
        this.typeManager = Objects.requireNonNull(typeManager, "typeManager is null");
        this.updateResultJsonCodec = Objects.requireNonNull(updateResultJsonCodec, "updateResultJsonCodec is null");

        List<ColumnMetadata> schema = DeltaLakeSchemaSupport.extractSchema(tableHandle.getMetadataEntry(), typeManager);
        List<DeltaLakeColumnHandle> allColumns = schema.stream()
                .map(column -> new DeltaLakeColumnHandle(
                        column.getName(),
                        column.getType(),
                        partitionKeys.containsKey(column.getName()) ? DeltaLakeColumnType.PARTITION_KEY : DeltaLakeColumnType.REGULAR))
                .collect(ImmutableList.toImmutableList());
        this.allDataColumns = allColumns.stream()
                .filter(column -> column.getColumnType() == DeltaLakeColumnType.REGULAR)
                .collect(ImmutableList.toImmutableList());
        this.writeType = tableHandle.getWriteType().orElseThrow();

        List<DeltaLakeColumnHandle> nonRowIdQueryColumns = queryColumns.stream()
                .filter(column -> !column.getName().equals(DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME))
                .collect(ImmutableList.toImmutableList());

        List<DeltaLakeColumnHandle> delegatedColumns;
        switch (this.writeType) {
            case UPDATE: {
                // UPDATE reads every table column so untouched values can be rewritten verbatim.
                delegatedColumns = allColumns;
                Set<DeltaLakeColumnHandle> queryColumnSet = ImmutableSet.copyOf(nonRowIdQueryColumns);
                Set<DeltaLakeColumnHandle> rowIdColumnSet = ImmutableSet.copyOf(tableHandle.getUpdateRowIdColumns().orElseThrow());
                this.queryColumnMapping = Optional.of(mapChannels(allColumns, queryColumnSet, nonRowIdQueryColumns.size()));
                this.rowIdColumnMapping = Optional.of(mapChannels(allColumns, rowIdColumnSet, rowIdColumnSet.size()));
                this.updatedColumns = ImmutableSet.copyOf(tableHandle.getUpdatedColumns().orElseThrow());
                break;
            }
            case DELETE:
                // DELETE only needs what the query asked for; channels line up one-to-one.
                delegatedColumns = nonRowIdQueryColumns;
                this.queryColumnMapping = Optional.empty();
                this.rowIdColumnMapping = Optional.empty();
                this.updatedColumns = ImmutableSet.of();
                break;
            default:
                throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Unsupported write type: " + this.writeType);
        }

        // Validate inputs and read the footer before acquiring the delegate reader and the writer, so
        // that a failure here does not leave either of them open.
        OptionalInt rowIdChannel = IntStream.range(0, queryColumns.size())
                .filter(channel -> queryColumns.get(channel).getName().equals(DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME))
                .findFirst();
        Preconditions.checkArgument(rowIdChannel.isPresent(), "RowId column was not provided during delete operation");
        this.rowIdColumnIndex = rowIdChannel.getAsInt();
        this.totalRecordCount = readRecordCount(path, hdfsEnvironment, hdfsContext);
        this.rowsToDelete = new BitSet(this.totalRecordCount);

        // The synthetic row index column is always the last delegate channel (see getRowIndexBlock).
        DeltaLakeColumnHandle rowIndexColumn = rowIndexColumn();
        List<DeltaLakeColumnHandle> columnsWithRowIndex = ImmutableList.<DeltaLakeColumnHandle>builder()
                .addAll(delegatedColumns)
                .add(rowIndexColumn)
                .build();
        List<HiveColumnHandle> parquetColumns = columnsWithRowIndex.stream()
                .filter(column -> column.getColumnType() == DeltaLakeColumnType.REGULAR || column == rowIndexColumn)
                .map(DeltaLakeColumnHandle::toHiveColumnHandle)
                .collect(ImmutableList.toImmutableList());
        this.pageSourceDelegate = new DeltaLakePageSource(
                columnsWithRowIndex,
                partitionKeys,
                createParquetPageSource(parquetPredicate, parquetColumns).get(),
                path,
                fileSize,
                fileModifiedTime);

        Path newFilePath = getPathForNewFile();
        try {
            this.updatedFileWriter = createWriter(newFilePath, schema, this.allDataColumns);
        } catch (IOException e) {
            TrinoException failure = new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, "Unable to create writer for location: " + newFilePath, e);
            try {
                // Do not strand the already opened reader when construction fails.
                this.pageSourceDelegate.close();
            } catch (RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Collects, in order of {@code columns}, the positions of the entries contained in {@code selected}.
     * The result has exactly {@code mappingLength} slots; unused trailing slots stay {@code 0}.
     */
    private static int[] mapChannels(List<DeltaLakeColumnHandle> columns, Set<DeltaLakeColumnHandle> selected, int mappingLength) {
        int[] mapping = new int[mappingLength];
        int next = 0;
        for (int channel = 0; channel < columns.size(); channel++) {
            if (selected.contains(columns.get(channel))) {
                mapping[next] = channel;
                next++;
            }
        }
        return mapping;
    }

    /**
     * Reads the total row count from the Parquet file at {@code path}, always closing the reader.
     *
     * @throws TrinoException with {@code DELTA_LAKE_BAD_DATA} if the file metadata cannot be read
     * @throws IllegalArgumentException if the file holds more than {@code Integer.MAX_VALUE} rows
     */
    private static int readRecordCount(String path, HdfsEnvironment hdfsEnvironment, HdfsEnvironment.HdfsContext hdfsContext) {
        Path parquetPath = new Path(path);
        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, hdfsEnvironment.getConfiguration(hdfsContext, parquetPath)))) {
            long recordCount = reader.getRecordCount();
            Preconditions.checkArgument(recordCount <= 2147483647L, "Deletes from files with more than Integer.MAX_VALUE rows is not supported");
            return Math.toIntExact(recordCount);
        } catch (IOException e) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA, "Unable to read parquet metadata for file: " + path, e);
        }
    }

    @Override
    public long getCompletedBytes() {
        return this.pageSourceDelegate.getCompletedBytes();
    }

    @Override
    public long getReadTimeNanos() {
        return this.pageSourceDelegate.getReadTimeNanos();
    }

    @Override
    public boolean isFinished() {
        return this.pageSourceDelegate.isFinished();
    }

    /** Returns the delegate's memory usage plus the bytes backing the deletion bit set. */
    @Override
    public long getMemoryUsage() {
        return this.pageSourceDelegate.getMemoryUsage() + (this.rowsToDelete.size() / 8);
    }

    /** Closes the delegate reader. The writer for the new file is not touched here. */
    @Override
    public void close() {
        this.pageSourceDelegate.close();
    }

    /**
     * Returns the next page laid out as {@code queryColumns}: the row id channel is synthesized from
     * the delegate page, every other channel is taken from the delegate (through
     * {@code queryColumnMapping} for UPDATE, positionally for DELETE).
     *
     * @return the next page, or {@code null} when the delegate has none available
     */
    @Override
    public Page getNextPage() {
        Page delegatePage = this.pageSourceDelegate.getNextPage();
        if (delegatePage == null) {
            return null;
        }
        Block[] blocks = new Block[this.queryColumns.size()];
        // Counts the non-row-id query columns emitted so far.
        int dataColumnOrdinal = 0;
        for (int channel = 0; channel < blocks.length; channel++) {
            if (channel == this.rowIdColumnIndex) {
                blocks[channel] = getRowIdBlock(delegatePage);
                continue;
            }
            int delegateChannel = this.queryColumnMapping.isPresent()
                    ? this.queryColumnMapping.get()[dataColumnOrdinal]
                    : dataColumnOrdinal;
            blocks[channel] = delegatePage.getBlock(delegateChannel);
            dataColumnOrdinal++;
        }
        return new Page(delegatePage.getPositionCount(), blocks);
    }

    /**
     * Builds the synthetic column that makes the Parquet reader emit each row's index in the file.
     * The handle is only ever compared by identity and converted via {@code toHiveColumnHandle()}.
     */
    private static DeltaLakeColumnHandle rowIndexColumn() {
        return new DeltaLakeColumnHandle("$delta$dummy_row_index", BigintType.BIGINT, DeltaLakeColumnType.SYNTHESIZED) {
            @Override
            public HiveColumnHandle toHiveColumnHandle() {
                return ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN;
            }
        };
    }

    /** Produces the row id block for the current write type. */
    private Block getRowIdBlock(Page page) {
        switch (this.writeType) {
            case UPDATE:
                return getUpdateRowIdBlock(page);
            case DELETE:
                return getRowIndexBlock(page);
            default:
                throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Unsupported write type: " + this.writeType);
        }
    }

    /**
     * Builds the UPDATE row id: a row of {@code (rowIndex, row(rowIdColumns...))}, or just
     * {@code (rowIndex)} when no row-id columns were requested.
     */
    private Block getUpdateRowIdBlock(Page page) {
        int[] rowIdChannels = this.rowIdColumnMapping.orElseThrow();
        Block rowIndexBlock = getRowIndexBlock(page);
        if (rowIdChannels.length == 0) {
            return RowBlock.fromFieldBlocks(page.getPositionCount(), Optional.empty(), new Block[] {rowIndexBlock});
        }
        Block[] carriedColumns = new Block[rowIdChannels.length];
        for (int i = 0; i < rowIdChannels.length; i++) {
            carriedColumns[i] = page.getBlock(rowIdChannels[i]);
        }
        Block carriedRow = RowBlock.fromFieldBlocks(page.getPositionCount(), Optional.empty(), carriedColumns);
        return RowBlock.fromFieldBlocks(page.getPositionCount(), Optional.empty(), new Block[] {rowIndexBlock, carriedRow});
    }

    /** The row index is always the last channel of a delegate page. */
    private Block getRowIndexBlock(Page page) {
        return page.getBlock(page.getChannelCount() - 1);
    }

    /**
     * Marks every row index contained in {@code block} as deleted.
     *
     * @param block BIGINT block of row indexes as produced by {@link #getNextPage()} for DELETE
     */
    @Override
    public void deleteRows(Block block) {
        for (int position = 0; position < block.getPositionCount(); position++) {
            this.rowsToDelete.set(Math.toIntExact(BigintType.BIGINT.getLong(block, position)));
        }
    }

    /**
     * Marks the original rows as deleted and appends their updated versions to the new file.
     *
     * @param page page holding the new values and the row id block
     * @param channels channels in {@code page} of the updated column values, followed by the channel
     *        of the row id block as the last element
     */
    @Override
    public void updateRows(Page page, List<Integer> channels) {
        int rowIdChannel = channels.get(channels.size() - 1).intValue();
        List<Integer> updatedValueChannels = channels.subList(0, channels.size() - 1);
        Block rowIdBlock = page.getBlock(rowIdChannel);
        for (int position = 0; position < rowIdBlock.getPositionCount(); position++) {
            // Field 0 of each row id is the row's index in the source file.
            this.rowsToDelete.set(Math.toIntExact(rowIdBlock.getObject(position, Block.class).getLong(0, 0)));
        }

        Block[] outputBlocks = new Block[this.allDataColumns.size()];
        int nextUpdatedValue = 0;
        int nextCarriedValue = 0;
        for (int column = 0; column < outputBlocks.length; column++) {
            if (this.updatedColumns.contains(this.allDataColumns.get(column))) {
                outputBlocks[column] = page.getBlock(updatedValueChannels.get(nextUpdatedValue).intValue());
                nextUpdatedValue++;
            } else {
                // Untouched values travel inside the row id: child 1 is the row of carried columns.
                outputBlocks[column] = rowIdBlock.getChildren().get(1).getChildren().get(nextCarriedValue);
                nextCarriedValue++;
            }
        }
        this.updatedFileWriter.appendRows(new Page(page.getPositionCount(), outputBlocks));
    }

    /**
     * Completes the operation for the current write type.
     *
     * @return future yielding zero or one JSON-serialized {@link DeltaLakeUpdateResult}
     */
    @Override
    public CompletableFuture<Collection<Slice>> finish() {
        switch (this.writeType) {
            case UPDATE:
                return finishUpdate();
            case DELETE:
                return finishDelete();
            default:
                throw new TrinoException(StandardErrorCode.NOT_SUPPORTED, "Unsupported write type: " + this.writeType);
        }
    }

    private CompletableFuture<Collection<Slice>> finishUpdate() {
        if (this.rowsToDelete.isEmpty()) {
            // No row of this file was updated, so there is nothing to report.
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        return CompletableFuture.supplyAsync(this::doFinishUpdate, this.executorService);
    }

    /**
     * Finalizes the new file for UPDATE: if some original rows survive they are copied over first,
     * otherwise the file already holding only updated rows is committed as is.
     */
    private Collection<Slice> doFinishUpdate() {
        String relativePath = relativeSourcePath();
        try {
            if (hasRetainedRows()) {
                return serializeResult(relativePath, Optional.of(copyParquetPageSource(this.updatedFileWriter)));
            }
            this.updatedFileWriter.commit();
            return serializeResult(relativePath, Optional.of(this.updatedFileWriter.getDataFileInfo()));
        } catch (IOException e) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, "Unable to write new Parquet file for UPDATE operation", e);
        }
    }

    private CompletableFuture<Collection<Slice>> finishDelete() {
        if (this.rowsToDelete.isEmpty()) {
            // No row of this file was deleted, so there is nothing to report.
            return CompletableFuture.completedFuture(ImmutableList.of());
        }
        return CompletableFuture.supplyAsync(this::doFinishDelete, this.executorService);
    }

    /**
     * Finalizes DELETE: reports the file as removed without replacement when every row was deleted,
     * otherwise copies the surviving rows into the new file and reports it.
     */
    private Collection<Slice> doFinishDelete() {
        try {
            String relativePath = relativeSourcePath();
            if (!hasRetainedRows()) {
                return serializeResult(relativePath, Optional.empty());
            }
            return serializeResult(relativePath, Optional.of(copyParquetPageSource(this.updatedFileWriter)));
        } catch (IOException e) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, "Unable to write new Parquet file for DELETE operation", e);
        }
    }

    /** Path of the source data file relative to the table location. */
    private String relativeSourcePath() {
        return new Path(this.tableHandle.getLocation()).toUri().relativize(new Path(this.path).toUri()).toString();
    }

    /**
     * Tells whether at least one row of the source file is not marked for deletion.
     * {@link BitSet#nextClearBit(int)} always returns an index, so only the upper bound is checked.
     */
    private boolean hasRetainedRows() {
        return this.rowsToDelete.nextClearBit(0) < this.totalRecordCount;
    }

    /** Wraps the JSON form of a {@link DeltaLakeUpdateResult} into a single-element collection. */
    private Collection<Slice> serializeResult(String relativePath, Optional<DataFileInfo> newFile) {
        return ImmutableList.of(Slices.utf8Slice(this.updateResultJsonCodec.toJson(new DeltaLakeUpdateResult(relativePath, newFile))));
    }

    /** New file lives beside the source file and is named {@code <queryId>_<random UUID>}. */
    private Path getPathForNewFile() {
        return new Path(new Path(this.path).getParent(), this.session.getQueryId() + "_" + UUID.randomUUID());
    }

    /**
     * Re-reads the whole source file and appends every row not marked in {@code rowsToDelete} to
     * {@code deltaLakeWriter}, then commits the writer.
     *
     * <p>On any {@link Exception} the writer is rolled back (rollback failures are attached as
     * suppressed) and the exception is rethrown. The writer is committed only after the copy has
     * completed and the reader has been closed, so a partially copied file is never committed.
     *
     * @return file info of the committed file
     * @throws IOException if reading, closing or committing fails
     */
    private DataFileInfo copyParquetPageSource(DeltaLakeWriter deltaLakeWriter) throws IOException {
        List<HiveColumnHandle> dataColumns = this.allDataColumns.stream()
                .map(DeltaLakeColumnHandle::toHiveColumnHandle)
                .collect(ImmutableList.toImmutableList());
        try (ConnectorPageSource source = createParquetPageSource(TupleDomain.all(), dataColumns).get()) {
            // Index in the source file of the first row of the page being processed.
            int pageStart = 0;
            while (!source.isFinished()) {
                Page page = source.getNextPage();
                if (page == null) {
                    continue;
                }
                int positionCount = page.getPositionCount();
                int nextDeleted = this.rowsToDelete.nextSetBit(pageStart);
                if (nextDeleted != -1 && nextDeleted < pageStart + positionCount) {
                    // At least one row of this page is deleted: keep only the surviving positions.
                    int[] retained = new int[positionCount];
                    int retainedCount = 0;
                    for (int position = 0; position < positionCount; position++) {
                        if (!this.rowsToDelete.get(pageStart + position)) {
                            retained[retainedCount] = position;
                            retainedCount++;
                        }
                    }
                    page = page.getPositions(retained, 0, retainedCount);
                }
                deltaLakeWriter.appendRows(page);
                pageStart += positionCount;
            }
        } catch (Exception e) {
            try {
                deltaLakeWriter.rollback();
            } catch (Exception rollbackFailure) {
                if (e != rollbackFailure) {
                    e.addSuppressed(rollbackFailure);
                }
            }
            throw e;
        }
        deltaLakeWriter.commit();
        return deltaLakeWriter.getDataFileInfo();
    }

    /**
     * Opens a Parquet page source over the entire source file for the given columns, applying the
     * session's max read block size and column index settings on top of the base reader options.
     */
    private ReaderPageSource createParquetPageSource(TupleDomain<HiveColumnHandle> tupleDomain, List<HiveColumnHandle> list) {
        Path sourcePath = new Path(this.path);
        ParquetReaderOptions readerOptions = this.parquetReaderOptions
                .withMaxReadBlockSize(DeltaLakeSessionProperties.getParquetMaxReadBlockSize(this.session))
                .withUseColumnIndex(DeltaLakeSessionProperties.isParquetUseColumnIndex(this.session));
        return ParquetPageSourceFactory.createPageSource(
                sourcePath,
                0L,
                this.fileSize,
                this.fileSize,
                list,
                tupleDomain,
                true,
                this.hdfsEnvironment,
                this.hdfsEnvironment.getConfiguration(this.hdfsContext, sourcePath),
                this.session.getIdentity(),
                this.parquetDateTimeZone,
                new FileFormatDataSourceStats(),
                readerOptions);
    }

    /**
     * Creates a SNAPPY-compressed Parquet writer at {@code path} for the given data columns. The
     * partition values recorded for the file are derived from the schema columns that are partition
     * keys of this file.
     *
     * @param path location of the file to create
     * @param list full table schema
     * @param list2 columns written to the file, in write order
     */
    private DeltaLakeWriter createWriter(Path path, List<ColumnMetadata> list, List<DeltaLakeColumnHandle> list2) throws IOException {
        Configuration configuration = this.hdfsEnvironment.getConfiguration(new HdfsEnvironment.HdfsContext(this.session), path);
        CompressionConfigUtil.configureCompression(configuration, HiveCompressionCodec.SNAPPY);

        List<String> columnNames = list2.stream()
                .map(DeltaLakeColumnHandle::getName)
                .collect(ImmutableList.toImmutableList());
        RecordFileWriter recordFileWriter = new RecordFileWriter(
                path,
                columnNames,
                StorageFormat.fromHiveStorageFormat(HiveStorageFormat.PARQUET),
                DeltaLakePageSink.buildSchemaProperties(
                        columnNames,
                        list2.stream().map(DeltaLakeColumnHandle::getType).collect(ImmutableList.toImmutableList())),
                HiveStorageFormat.PARQUET.getEstimatedWriterMemoryUsage(),
                ConfigurationUtils.toJobConf(configuration),
                this.typeManager,
                DateTimeZone.UTC,
                this.session);

        Path tableLocation = new Path(this.tableHandle.getLocation());
        Path relativeFilePath = new Path(tableLocation.toUri().relativize(path.toUri()));
        List<ColumnMetadata> partitionColumns = list.stream()
                .filter(column -> this.partitionKeys.containsKey(column.getName()))
                .collect(ImmutableList.toImmutableList());
        return new DeltaLakeWriter(
                this.hdfsEnvironment.getFileSystem(this.hdfsContext, path),
                recordFileWriter,
                tableLocation,
                relativeFilePath.toString(),
                getPartitionValues(partitionColumns),
                new DeltaLakeWriterStats(),
                list2);
    }

    /**
     * Converts this file's serialized partition values into the string form produced by
     * {@code DeltaLakePageSink.createPartitionValues}, by building a one-row page of the
     * deserialized values.
     */
    private List<String> getPartitionValues(List<ColumnMetadata> list) {
        Block[] valueBlocks = new Block[list.size()];
        for (int i = 0; i < valueBlocks.length; i++) {
            ColumnMetadata column = list.get(i);
            DeltaLakeColumnHandle partitionColumn = new DeltaLakeColumnHandle(column.getName(), column.getType(), DeltaLakeColumnType.PARTITION_KEY);
            Object nativeValue = TransactionLogParser.deserializePartitionValue(partitionColumn, this.partitionKeys.get(column.getName()));
            valueBlocks[i] = Utils.nativeValueToBlock(column.getType(), nativeValue);
        }
        return DeltaLakePageSink.createPartitionValues(
                list.stream().map(ColumnMetadata::getType).collect(ImmutableList.toImmutableList()),
                new Page(1, valueBlocks),
                0);
    }

    /** Aborts the operation by closing the delegate reader. The writer is not rolled back here. */
    @Override
    public void abort() {
        this.pageSourceDelegate.close();
    }
}
