package io.deephaven.server.arrow;

import com.google.rpc.Code;
import dagger.assisted.Assisted;
import dagger.assisted.AssistedFactory;
import dagger.assisted.AssistedInject;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.SingletonLivenessManager;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
import io.deephaven.extensions.barrage.BarrageStreamGeneratorImpl;
import io.deephaven.extensions.barrage.BarrageSubscriptionOptions;
import io.deephaven.extensions.barrage.table.BarrageTable;
import io.deephaven.extensions.barrage.util.ArrowToTableConverter;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.server.barrage.BarrageMessageProducer;
import io.deephaven.server.hierarchicaltable.HierarchicalTableView;
import io.deephaven.server.hierarchicaltable.HierarchicalTableViewSubscription;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.util.SafeCloseable;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Iterator;
import java.util.Queue;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.Schema;
import org.apache.arrow.flight.impl.Flight;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/deephaven/server/arrow/ArrowFlightUtil.class */
public class ArrowFlightUtil {
    /** Logger used by the Flight handlers declared in this class. */
    private static final Logger log = LoggerFactory.getLogger(ArrowFlightUtil.class);
    /**
     * Fallback minimum update interval, in milliseconds, read once from the {@code barrage.minUpdateInterval}
     * configuration property with a default of 1000. It is applied when a subscription request carries no options
     * or an interval of zero.
     */
    public static final int DEFAULT_MIN_UPDATE_INTERVAL_MS = Configuration.getInstance().getIntegerWithDefault("barrage.minUpdateInterval", 1000);

    /* loaded from: input_file:io/deephaven/server/arrow/ArrowFlightUtil$DoExchangeMarshaller.class */
    public static class DoExchangeMarshaller extends SingletonLivenessManager implements StreamObserver<InputStream>, Closeable {
        /** Session that owns this exchange; this marshaller registers itself as an on-close callback of it. */
        private final SessionState session;
        /** Outbound observer, obtained by adapting the raw gRPC response observer in the constructor. */
        private final StreamObserver<BarrageStreamGeneratorImpl.View> listener;
        /** Resolves the ticket carried by a request into an export object. */
        private final TicketRouter ticketRouter;
        /** Builds the schema view and the snapshot messages sent for snapshot requests. */
        private final BarrageStreamGenerator.Factory<BarrageStreamGeneratorImpl.View> streamGeneratorFactory;
        /** Creates the operation handed to {@code QueryTable.getResult} to obtain a message producer. */
        private final BarrageMessageProducer.Operation.Factory<BarrageStreamGeneratorImpl.View> bmpOperationFactory;
        /** Creates subscriptions for hierarchical table views. */
        private final HierarchicalTableViewSubscription.Factory htvsFactory;
        /** Converts a wire-level subscription request into subscription options. */
        private final BarrageMessageProducer.Adapter<BarrageSubscriptionRequest, BarrageSubscriptionOptions> subscriptionOptAdapter;
        /** Converts a wire-level snapshot request into snapshot options. */
        private final BarrageMessageProducer.Adapter<BarrageSnapshotRequest, BarrageSnapshotOptions> snapshotOptAdapter;
        /** Transforms throwables before they are rethrown or reported to the client. */
        private final SessionService.ErrorTransformer errorTransformer;
        /** Set to true by {@link #close()}; later close calls become no-ops. */
        private boolean isClosed = false;
        /** True until a message without app metadata has been received; only one such message is tolerated. */
        private boolean isFirstMsg = true;
        /** Handler selected by the first message that carries app metadata; null until then. */
        private Handler requestHandler = null;
        /** Prefix used in log lines and error messages to identify this marshaller instance. */
        private final String myPrefix = "DoExchangeMarshaller{" + Integer.toHexString(System.identityHashCode(this)) + "}: ";

        /**
         * Assisted-injection factory for {@link DoExchangeMarshaller}: callers supply the session and the raw
         * response observer; the remaining constructor arguments are injected.
         */
        @AssistedFactory
        /* loaded from: input_file:io/deephaven/server/arrow/ArrowFlightUtil$DoExchangeMarshaller$Factory.class */
        public interface Factory {
            /**
             * Creates a marshaller for one exchange stream.
             *
             * @param sessionState the session the exchange runs in
             * @param streamObserver the raw observer that responses are written to
             * @return a new marshaller bound to the given session and observer
             */
            DoExchangeMarshaller openExchange(SessionState sessionState, StreamObserver<InputStream> streamObserver);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/deephaven/server/arrow/ArrowFlightUtil$DoExchangeMarshaller$Handler.class */
        /**
         * Strategy for processing the messages of one exchange stream. The marshaller picks an implementation from the
         * type of the first message that carries app metadata and forwards every later message to it.
         */
        public interface Handler extends Closeable {
            /**
             * Processes one parsed client message.
             *
             * @param messageInfo the parsed message; implementations in this file read its {@code app_metadata}
             */
            void handleMessage(@NotNull BarrageProtoUtil.MessageInfo messageInfo);
        }

        /**
         * Handler for exchanges whose first request is a snapshot request: resolves the requested ticket and, once
         * the export is available, sends the table schema followed by a snapshot and then completes the stream.
         */
        public class SnapshotRequestHandler implements Handler {
            public SnapshotRequestHandler() {
            }

            /**
             * Validates that the message is a snapshot request (message type 7) and queues the snapshot work behind the
             * resolved export.
             *
             * @param messageInfo the parsed client message
             */
            @Override // io.deephaven.server.arrow.ArrowFlightUtil.DoExchangeMarshaller.Handler
            public void handleMessage(@NotNull BarrageProtoUtil.MessageInfo messageInfo) {
                final boolean isSnapshotRequest = messageInfo.app_metadata.msgType() == 7;
                if (!isSnapshotRequest) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Request type cannot be changed after initialization, expected BarrageSnapshotRequest metadata");
                }
                synchronized (DoExchangeMarshaller.this) {
                    final BarrageSnapshotRequest request =
                            BarrageSnapshotRequest.getRootAsBarrageSnapshotRequest(messageInfo.app_metadata.msgPayloadAsByteBuffer());
                    final SessionState.ExportObject<?> tableExport =
                            ticketRouter.resolve(session, request.ticketAsByteBuffer(), "ticket");
                    final BarragePerformanceLog.SnapshotMetricsHelper metrics = new BarragePerformanceLog.SnapshotMetricsHelper();
                    final long queuedAtNanos = System.nanoTime();
                    session.nonExport()
                            .require(tableExport)
                            .onError((StreamObserver<?>) listener)
                            .submit(() -> {
                                // Time spent waiting for the export to resolve and the task to be scheduled.
                                metrics.queueNanos = System.nanoTime() - queuedAtNanos;
                                BaseTable table = (BaseTable) tableExport.get();
                                metrics.tableId = Integer.toHexString(System.identityHashCode(table));
                                metrics.tableKey = BarragePerformanceLog.getKeyFor(table);

                                // The schema goes out first so the client can decode the snapshot that follows.
                                listener.onNext((BarrageStreamGeneratorImpl.View) streamGeneratorFactory.getSchemaView(
                                        builder -> BarrageUtil.makeTableSchemaPayload(builder, snapshotOptAdapter.adapt(request), table.getDefinition(), table.getAttributes())));

                                // Absent column / viewport vectors are passed on as null.
                                final BitSet columns = request.columnsVector() == null
                                        ? null
                                        : BitSet.valueOf(request.columnsAsByteBuffer());
                                final RowSet viewport = request.viewportVector() == null
                                        ? null
                                        : BarrageProtoUtil.toRowSet(request.viewportAsByteBuffer());
                                final boolean reverse = request.reverseViewport();
                                BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, columns, viewport, reverse, snapshotOptAdapter.adapt(request), listener, metrics);
                                listener.onCompleted();
                            });
                }
            }

            /** Nothing to release: this handler keeps no state between messages. */
            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() {
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/deephaven/server/arrow/ArrowFlightUtil$DoExchangeMarshaller$SubscriptionRequestHandler.class */
        public class SubscriptionRequestHandler implements Handler {
            /** Message producer backing the subscription when the export is a {@code QueryTable}; null otherwise. */
            private BarrageMessageProducer<BarrageStreamGeneratorImpl.View> bmp;
            /** Subscription used when the export is a {@code HierarchicalTableView}; null otherwise. */
            private HierarchicalTableViewSubscription htvs;
            /** Requests received before the export resolved; null before the first request and after resolution. */
            private Queue<BarrageSubscriptionRequest> preExportSubscriptions;
            /** Pending export task that runs {@code onExportResolved}; cancelled by {@link #close()} if still set. */
            private SessionState.ExportObject<?> onExportResolvedContinuation;

            /** Creates a handler with no subscription target yet; the first handled message selects it. */
            public SubscriptionRequestHandler() {
            }

            /**
             * Handles one subscription request (message type 5).
             *
             * <p>If a subscription target already exists the request is applied immediately. Otherwise the first
             * request triggers resolution of its ticket and is queued; any further requests that arrive before the
             * export resolves are queued behind it and replayed by {@code onExportResolved}.
             *
             * @param messageInfo the parsed client message
             */
            @Override // io.deephaven.server.arrow.ArrowFlightUtil.DoExchangeMarshaller.Handler
            public void handleMessage(@NotNull BarrageProtoUtil.MessageInfo messageInfo) {
                if (messageInfo.app_metadata.msgType() != 5) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Request type cannot be changed after initialization, expected BarrageSubscriptionRequest metadata");
                }
                if (messageInfo.app_metadata.msgPayloadVector() == null) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Subscription request not supplied");
                }
                synchronized (DoExchangeMarshaller.this) {
                    final BarrageSubscriptionRequest request = BarrageSubscriptionRequest.getRootAsBarrageSubscriptionRequest(messageInfo.app_metadata.msgPayloadAsByteBuffer());
                    if (this.bmp != null || this.htvs != null) {
                        // Already subscribed: this is a viewport/column update.
                        apply(request);
                        return;
                    }
                    if (DoExchangeMarshaller.this.isClosed) {
                        return;
                    }
                    if (this.preExportSubscriptions != null) {
                        // The export is still resolving; replay this request once it is ready.
                        this.preExportSubscriptions.add(request);
                        return;
                    }
                    if (request.ticketVector() == null) {
                        GrpcUtil.safelyError(DoExchangeMarshaller.this.listener, Code.INVALID_ARGUMENT, "Ticket not specified.");
                        return;
                    }
                    this.preExportSubscriptions = new ArrayDeque<>();
                    this.preExportSubscriptions.add(request);
                    final SessionState.ExportObject<Object> parent = DoExchangeMarshaller.this.ticketRouter.resolve(DoExchangeMarshaller.this.session, request.ticketAsByteBuffer(), "ticket");
                    synchronized (this) {
                        // Errors from the export are routed to the marshaller's own onError handling.
                        this.onExportResolvedContinuation = DoExchangeMarshaller.this.session.nonExport()
                                .require(parent)
                                .onErrorHandler(DoExchangeMarshaller.this::onError)
                                .submit(() -> {
                                    onExportResolved(parent);
                                });
                    }
                }
            }

            /**
             * Runs once the subscribed export is available: creates the subscription target for the export, applies
             * the initial request, then replays every request that was queued while the export was resolving.
             *
             * <p>If the marshaller was closed in the meantime the queue is dropped and nothing else happens. If the
             * export is neither a {@code QueryTable} nor a {@code HierarchicalTableView}, the client is sent a
             * FAILED_PRECONDITION error.
             *
             * @param exportObject the resolved export the first request pointed at
             */
            private synchronized void onExportResolved(SessionState.ExportObject<Object> exportObject) {
                this.onExportResolvedContinuation = null;
                if (DoExchangeMarshaller.this.isClosed) {
                    this.preExportSubscriptions = null;
                    return;
                }
                // The queue always holds the request that triggered resolution.
                final BarrageSubscriptionRequest initialRequest = this.preExportSubscriptions.remove();
                final io.deephaven.barrage.flatbuf.BarrageSubscriptionOptions subscriptionOptions = initialRequest.subscriptionOptions();
                // Missing options or a zero interval fall back to the configured default.
                final long minUpdateIntervalMs = (subscriptionOptions == null || subscriptionOptions.minUpdateIntervalMs() == 0) ? ArrowFlightUtil.DEFAULT_MIN_UPDATE_INTERVAL_MS : subscriptionOptions.minUpdateIntervalMs();
                final Object export = exportObject.get();
                if (export instanceof QueryTable) {
                    final QueryTable table = (QueryTable) export;
                    // try-with-resources guarantees the execution-context scope is closed even if setup throws.
                    try (SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(table.getUpdateGraph()).open()) {
                        @SuppressWarnings("unchecked")
                        final BarrageMessageProducer<BarrageStreamGeneratorImpl.View> producer =
                                (BarrageMessageProducer<BarrageStreamGeneratorImpl.View>) table.getResult(DoExchangeMarshaller.this.bmpOperationFactory.create(table, minUpdateIntervalMs));
                        this.bmp = producer;
                        if (this.bmp.isRefreshing()) {
                            DoExchangeMarshaller.this.manage(this.bmp);
                        }
                    }
                } else if (export instanceof HierarchicalTableView) {
                    final HierarchicalTableView hierarchicalTableView = (HierarchicalTableView) export;
                    try (SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(hierarchicalTableView.getHierarchicalTable().getSource().getUpdateGraph()).open()) {
                        this.htvs = DoExchangeMarshaller.this.htvsFactory.create(hierarchicalTableView, DoExchangeMarshaller.this.listener, DoExchangeMarshaller.this.subscriptionOptAdapter.adapt(initialRequest), minUpdateIntervalMs);
                        if (hierarchicalTableView.getHierarchicalTable().getSource().isRefreshing()) {
                            DoExchangeMarshaller.this.manage(this.htvs);
                        }
                    }
                } else {
                    GrpcUtil.safelyError(DoExchangeMarshaller.this.listener, Code.FAILED_PRECONDITION, "Ticket (" + ExportTicketHelper.toReadableString(initialRequest.ticketAsByteBuffer(), "ticket") + ") is not a subscribable table.");
                    return;
                }
                ArrowFlightUtil.log.debug().append(DoExchangeMarshaller.this.myPrefix).append("processing initial subscription").endl();
                // Absent column / viewport vectors are passed on as null.
                final BitSet columns = initialRequest.columnsVector() != null ? BitSet.valueOf(initialRequest.columnsAsByteBuffer()) : null;
                final RowSet viewport = initialRequest.viewportVector() != null ? BarrageProtoUtil.toRowSet(initialRequest.viewportAsByteBuffer()) : null;
                final boolean reverseViewport = initialRequest.reverseViewport();
                if (this.bmp != null) {
                    this.bmp.addSubscription(DoExchangeMarshaller.this.listener, DoExchangeMarshaller.this.subscriptionOptAdapter.adapt(initialRequest), columns, viewport, reverseViewport);
                } else if (this.htvs != null) {
                    this.htvs.setViewport(columns, viewport, reverseViewport);
                } else {
                    Assert.statementNeverExecuted();
                }
                // Replay, in arrival order, the updates received while the export was resolving.
                for (final BarrageSubscriptionRequest queued : this.preExportSubscriptions) {
                    apply(queued);
                }
                this.preExportSubscriptions = null;
            }

            /**
             * Applies a viewport/column update to the existing subscription target.
             *
             * @param barrageSubscriptionRequest the update to apply
             * @throws RuntimeException a NOT_FOUND status exception when there is no subscription target, or when the
             *         message producer reports that it has no subscription for this listener
             */
            private void apply(BarrageSubscriptionRequest barrageSubscriptionRequest) {
                // Decode the request first; absent vectors are passed on as null.
                BitSet columns = null;
                if (barrageSubscriptionRequest.columnsVector() != null) {
                    columns = BitSet.valueOf(barrageSubscriptionRequest.columnsAsByteBuffer());
                }
                RowSet viewport = null;
                if (barrageSubscriptionRequest.viewportVector() != null) {
                    viewport = BarrageProtoUtil.toRowSet(barrageSubscriptionRequest.viewportAsByteBuffer());
                }
                final boolean reverse = barrageSubscriptionRequest.reverseViewport();

                if (this.bmp != null) {
                    if (this.bmp.updateSubscription(DoExchangeMarshaller.this.listener, viewport, columns, reverse)) {
                        return;
                    }
                } else if (this.htvs != null) {
                    this.htvs.setViewport(columns, viewport, reverse);
                    return;
                }
                throw Exceptions.statusRuntimeException(Code.NOT_FOUND, "Subscription was not found.");
            }

            /**
             * Tears down the subscription: removes the listener from the message producer, or completes the
             * hierarchical subscription, or (when neither exists) completes the listener directly. Any pending export
             * continuation is cancelled and queued requests are discarded.
             */
            @Override // java.io.Closeable, java.lang.AutoCloseable
            public synchronized void close() {
                if (this.bmp == null && this.htvs == null) {
                    // Nothing was ever subscribed, so the stream is finished here.
                    GrpcUtil.safelyComplete(DoExchangeMarshaller.this.listener);
                } else if (this.bmp != null) {
                    this.bmp.removeSubscription(DoExchangeMarshaller.this.listener);
                    this.bmp = null;
                } else {
                    this.htvs.completed();
                    this.htvs = null;
                }

                final SessionState.ExportObject<?> pending = this.onExportResolvedContinuation;
                if (pending != null) {
                    pending.cancel();
                    this.onExportResolvedContinuation = null;
                }

                this.preExportSubscriptions = null;
            }
        }

        /**
         * Creates a marshaller for one exchange stream, registers it as an on-close callback of the session and,
         * when the response observer is a {@code ServerCallStreamObserver}, installs {@link #onCancel()} as its cancel
         * handler.
         *
         * @param ticketRouter resolves request tickets
         * @param factory builds schema and snapshot messages
         * @param factory2 creates message-producer operations
         * @param factory3 creates hierarchical table view subscriptions
         * @param adapter converts the raw response observer into the typed listener
         * @param adapter2 converts subscription requests into options
         * @param adapter3 converts snapshot requests into options
         * @param errorTransformer transforms throwables before they are reported
         * @param sessionState the session the exchange runs in
         * @param streamObserver the raw response observer
         */
        @AssistedInject
        public DoExchangeMarshaller(TicketRouter ticketRouter, BarrageStreamGenerator.Factory<BarrageStreamGeneratorImpl.View> factory, BarrageMessageProducer.Operation.Factory<BarrageStreamGeneratorImpl.View> factory2, HierarchicalTableViewSubscription.Factory factory3, BarrageMessageProducer.Adapter<StreamObserver<InputStream>, StreamObserver<BarrageStreamGeneratorImpl.View>> adapter, BarrageMessageProducer.Adapter<BarrageSubscriptionRequest, BarrageSubscriptionOptions> adapter2, BarrageMessageProducer.Adapter<BarrageSnapshotRequest, BarrageSnapshotOptions> adapter3, SessionService.ErrorTransformer errorTransformer, @Assisted SessionState sessionState, @Assisted StreamObserver<InputStream> streamObserver) {
            // Injected collaborators.
            ticketRouter_init:
            {
                this.ticketRouter = ticketRouter;
                this.streamGeneratorFactory = factory;
                this.bmpOperationFactory = factory2;
                this.htvsFactory = factory3;
                this.subscriptionOptAdapter = adapter2;
                this.snapshotOptAdapter = adapter3;
            }

            // Per-call state.
            this.session = sessionState;
            this.listener = adapter.adapt(streamObserver);
            this.errorTransformer = errorTransformer;

            // Session expiry closes this marshaller; client cancellation goes through onCancel.
            this.session.addOnCloseCallback(this);
            if (!(streamObserver instanceof ServerCallStreamObserver)) {
                return;
            }
            ((ServerCallStreamObserver) streamObserver).setOnCancelHandler(this::onCancel);
        }

        /**
         * Parses one inbound message and dispatches it to the request handler.
         *
         * <p>The handler is chosen from the type of the first message that carries app metadata: type 5 selects the
         * subscription handler, type 7 the snapshot handler. A single leading message without metadata is tolerated
         * and ignored; a second one is rejected.
         *
         * @param inputStream the raw message bytes
         */
        public void onNext(InputStream inputStream) {
            try {
                final BarrageProtoUtil.MessageInfo message = BarrageProtoUtil.parseProtoMessage(inputStream);
                synchronized (this) {
                    if (this.requestHandler == null) {
                        if (message.app_metadata == null) {
                            if (this.isFirstMsg) {
                                this.isFirstMsg = false;
                                return;
                            }
                            throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, this.myPrefix + "failed to receive Barrage request metadata");
                        }

                        final int requestType = message.app_metadata.msgType();
                        if (requestType == 5) {
                            this.requestHandler = new SubscriptionRequestHandler();
                        } else if (requestType == 7) {
                            this.requestHandler = new SnapshotRequestHandler();
                        } else {
                            throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, this.myPrefix + "received a message with unhandled BarrageMessageType");
                        }
                    }
                    this.requestHandler.handleMessage(message);
                }
            } catch (IOException e) {
                throw this.errorTransformer.transform(e);
            }
        }

        /** Cancel hook installed on the response observer: logs the cancellation and attempts to close. */
        public void onCancel() {
            ArrowFlightUtil.log.debug()
                    .append(this.myPrefix)
                    .append("cancel requested")
                    .endl();
            tryClose();
        }

        /**
         * Reports a failure to the client after passing it through the error transformer, then attempts to close.
         *
         * @param th the failure to report
         */
        public void onError(Throwable th) {
            GrpcUtil.safelyError(
                    this.listener,
                    this.errorTransformer.transform(th));
            tryClose();
        }

        /** Called when the client half-closes the stream: logs the event and attempts to close. */
        public void onCompleted() {
            ArrowFlightUtil.log.debug()
                    .append(this.myPrefix)
                    .append("client stream closed subscription")
                    .endl();
            tryClose();
        }

        /**
         * Closes the exchange exactly once: marks the marshaller closed, closes the active request handler (if any)
         * and calls {@code release()}. Repeated calls are no-ops. An {@code IOException} from the handler is rethrown
         * after passing through the error transformer.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            synchronized (this) {
                if (!this.isClosed) {
                    this.isClosed = true;
                    try {
                        final Handler activeHandler = this.requestHandler;
                        if (activeHandler != null) {
                            activeHandler.close();
                        }
                        release();
                    } catch (IOException e) {
                        throw this.errorTransformer.transform(e);
                    }
                }
            }
        }

        /**
         * Closes this marshaller only if it could still be removed from the session's on-close callbacks, so that the
         * close is not performed when the removal reports false.
         */
        private void tryClose() {
            final boolean wasRegistered = this.session.removeOnCloseCallback(this);
            if (!wasRegistered) {
                return;
            }
            close();
        }
    }

    /* loaded from: input_file:io/deephaven/server/arrow/ArrowFlightUtil$DoPutObserver.class */
    public static class DoPutObserver extends ArrowToTableConverter implements StreamObserver<InputStream>, Closeable {
        /** Session the upload belongs to; this observer registers itself as an on-close callback of it. */
        private final SessionState session;
        /** Publishes the uploaded table under the flight descriptor sent by the client. */
        private final TicketRouter ticketRouter;
        /** Transforms I/O failures raised while parsing messages before they are rethrown. */
        private final SessionService.ErrorTransformer errorTransformer;
        /** Response observer used to acknowledge batches and report completion or errors. */
        private final StreamObserver<Flight.PutResult> observer;
        /** Builder for the result export; set when the first flight descriptor arrives, cleared when consumed. */
        private SessionState.ExportBuilder<Table> resultExportBuilder;
        /** First flight descriptor received; later descriptors must equal it. */
        private Flight.FlightDescriptor flightDescriptor;

        /**
         * Creates an observer for one DoPut stream, registers it as an on-close callback of the session and, when the
         * response observer is a {@code ServerCallStreamObserver}, installs the cancel handler.
         *
         * @param sessionState the session the upload runs in
         * @param ticketRouter publishes the resulting table
         * @param errorTransformer transforms parsing failures before they are rethrown
         * @param streamObserver the response observer
         */
        public DoPutObserver(SessionState sessionState, TicketRouter ticketRouter, SessionService.ErrorTransformer errorTransformer, StreamObserver<Flight.PutResult> streamObserver) {
            this.observer = streamObserver;
            this.errorTransformer = errorTransformer;
            this.ticketRouter = ticketRouter;
            this.session = sessionState;

            this.session.addOnCloseCallback(this);

            if (!(streamObserver instanceof ServerCallStreamObserver)) {
                return;
            }
            ((ServerCallStreamObserver) streamObserver).setOnCancelHandler(this::onCancel);
        }

        /**
         * Processes one inbound DoPut message.
         *
         * <p>The first flight descriptor starts publication of the result; later descriptors must equal it. App
         * metadata of type 4 replaces the parsing options. A schema header (type 1) is parsed into the result table,
         * a record-batch header (type 3) is appended to it and acknowledged with an empty {@code PutResult}; any other
         * header type is rejected. Messages without a header are otherwise ignored.
         *
         * @param inputStream the raw message bytes
         * @throws RuntimeException an INVALID_ARGUMENT status exception when a descriptor does not match the first
         *         one, when the header type is unsupported, or when a record batch arrives while no schema-derived
         *         result table exists
         */
        public void onNext(InputStream inputStream) {
            try {
                final BarrageProtoUtil.MessageInfo message = BarrageProtoUtil.parseProtoMessage(inputStream);
                if (message.descriptor != null) {
                    if (this.flightDescriptor == null) {
                        this.flightDescriptor = message.descriptor;
                        this.resultExportBuilder = this.ticketRouter.publish(this.session, message.descriptor, "Flight.Descriptor", (Runnable) null).onError((StreamObserver<?>) this.observer);
                    } else if (!this.flightDescriptor.equals(message.descriptor)) {
                        throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "additional flight descriptor sent does not match original descriptor");
                    }
                }
                // NOTE(review): 4 is presumably the serialization-options message type — confirm against
                // BarrageMessageType.
                if (message.app_metadata != null && message.app_metadata.msgType() == 4) {
                    this.options = BarrageSubscriptionOptions.of(BarrageSubscriptionRequest.getRootAsBarrageSubscriptionRequest(message.app_metadata.msgPayloadAsByteBuffer()));
                }
                if (message.header == null) {
                    return;
                }
                if (message.header.headerType() == 1) {
                    parseSchema((Schema) message.header.header(new Schema()));
                    return;
                }
                if (message.header.headerType() != 3) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Only schema/record-batch messages supported, instead got " + MessageHeader.name(message.header.headerType()));
                }
                // A record batch can only be decoded against a result table; without one (no schema received yet,
                // or the table was dropped by cancel/error) fail with a client error instead of a
                // NullPointerException.
                if (this.resultTable == null) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Result flight schema must be provided before record batches");
                }
                final BarrageMessage batch = createBarrageMessage(message, this.resultTable.getColumnSources().size());
                // Rows are appended contiguously after everything read so far.
                batch.rowsAdded = RowSetFactory.fromRange(this.totalRowsRead, (this.totalRowsRead + batch.length) - 1);
                batch.rowsIncluded = batch.rowsAdded.copy();
                batch.modColumnData = BarrageMessage.ZERO_MOD_COLUMNS;
                this.totalRowsRead += batch.length;
                this.resultTable.handleBarrageMessage(batch);
                GrpcUtil.safelyOnNext(this.observer, Flight.PutResult.getDefaultInstance());
            } catch (IOException e) {
                throw this.errorTransformer.transform(e);
            }
        }

        /**
         * Cancel hook: drops the partially built result table, fails the pending export with a CANCELLED status and
         * removes this observer from the session's on-close callbacks.
         */
        private void onCancel() {
            final BarrageTable partialTable = this.resultTable;
            if (partialTable != null) {
                partialTable.dropReference();
                this.resultTable = null;
            }

            final SessionState.ExportBuilder<Table> pendingExport = this.resultExportBuilder;
            if (pendingExport != null) {
                // Submitting a task that throws marks the export as failed.
                pendingExport.submit(() -> {
                    throw Exceptions.statusRuntimeException(Code.CANCELLED, "cancelled");
                });
                this.resultExportBuilder = null;
            }

            this.session.removeOnCloseCallback(this);
        }

        /**
         * Handles a client-side stream failure: drops the partially built result table, fails the pending export with
         * the given cause and removes this observer from the session's on-close callbacks.
         *
         * @param th the failure reported on the inbound stream
         */
        public void onError(Throwable th) {
            final BarrageTable partialTable = this.resultTable;
            if (partialTable != null) {
                partialTable.dropReference();
                this.resultTable = null;
            }

            final SessionState.ExportBuilder<Table> pendingExport = this.resultExportBuilder;
            if (pendingExport != null) {
                // Submitting a task that throws marks the export as failed with the original cause attached.
                pendingExport.submit(() -> {
                    throw new UncheckedDeephavenException(th);
                });
                this.resultExportBuilder = null;
            }

            this.session.removeOnCloseCallback(this);
        }

        /**
         * Finishes the upload once the client has sent everything: hands the result table to the export, seals it
         * and, on success, submits the export and completes the response stream.
         *
         * <p>If the export can no longer manage the table, or sealing fails, the client receives a DATA_LOSS error.
         * In every outcome handled here the observer is removed from the session's on-close callbacks.
         *
         * @throws RuntimeException an INVALID_ARGUMENT status exception when no flight descriptor or no schema was
         *         ever received
         */
        public void onCompleted() {
            if (this.resultExportBuilder == null) {
                throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Result flight descriptor never provided");
            }
            if (this.resultTable == null) {
                throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Result flight schema never provided");
            }

            // Take ownership of both pieces of state so later callbacks cannot touch them again.
            final BarrageTable uploaded = this.resultTable;
            this.resultTable = null;
            final SessionState.ExportBuilder<Table> publisher = this.resultExportBuilder;
            this.resultExportBuilder = null;

            if (!publisher.getExport().tryManage(uploaded)) {
                GrpcUtil.safelyError(this.observer, Code.DATA_LOSS, "Result export already destroyed");
                uploaded.dropReference();
                this.session.removeOnCloseCallback(this);
                return;
            }

            // The export now manages the table, so the reference held by this observer can be dropped.
            uploaded.dropReference();
            uploaded.sealTable(() -> {
                publisher.submit(() -> {
                    GrpcUtil.safelyComplete(this.observer);
                    this.session.removeOnCloseCallback(this);
                    return uploaded;
                });
            }, () -> {
                GrpcUtil.safelyError(this.observer, Code.DATA_LOSS, "Do put could not be sealed");
                this.session.removeOnCloseCallback(this);
            });
        }

        /**
         * Reports an UNAUTHENTICATED "Session expired" error to the client. This observer is registered as an
         * on-close callback of the session in its constructor.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            final StreamObserver<Flight.PutResult> client = this.observer;
            GrpcUtil.safelyError(client, Code.UNAUTHENTICATED, "Session expired");
        }
    }

    /**
     * Serves a DoGet request: resolves the ticket and, once the export is available, sends the table schema followed
     * by a full snapshot (no column or viewport restriction) using the default snapshot options, then completes the
     * stream.
     *
     * @param factory builds the schema view and snapshot messages
     * @param sessionState the session the request runs in
     * @param ticketRouter resolves the ticket into an export
     * @param ticket the ticket identifying the table to send
     * @param streamObserver the raw response observer; export errors are also reported to it
     */
    public static void DoGetCustom(BarrageStreamGenerator.Factory<BarrageStreamGeneratorImpl.View> factory, SessionState sessionState, TicketRouter ticketRouter, Flight.Ticket ticket, StreamObserver<InputStream> streamObserver) {
        final SessionState.ExportObject<?> tableExport = ticketRouter.resolve(sessionState, ticket, "request");
        final BarragePerformanceLog.SnapshotMetricsHelper metrics = new BarragePerformanceLog.SnapshotMetricsHelper();
        final long queuedAtNanos = System.nanoTime();
        sessionState.nonExport()
                .require(tableExport)
                .onError(streamObserver)
                .submit(() -> {
                    // Time spent waiting for the export to resolve and the task to be scheduled.
                    metrics.queueNanos = System.nanoTime() - queuedAtNanos;
                    BaseTable table = (BaseTable) tableExport.get();
                    metrics.tableId = Integer.toHexString(System.identityHashCode(table));
                    metrics.tableKey = BarragePerformanceLog.getKeyFor(table);

                    StreamObserver<BarrageStreamGeneratorImpl.View> typedListener = ArrowModule.provideListenerAdapter().adapt(streamObserver);
                    // The schema goes out first so the client can decode the snapshot that follows.
                    typedListener.onNext((BarrageStreamGeneratorImpl.View) factory.getSchemaView(
                            builder -> BarrageUtil.makeTableSchemaPayload(builder, BarrageUtil.DEFAULT_SNAPSHOT_DESER_OPTIONS, table.getDefinition(), table.getAttributes())));
                    BarrageUtil.createAndSendSnapshot(factory, table, (BitSet) null, (RowSet) null, false, BarrageUtil.DEFAULT_SNAPSHOT_DESER_OPTIONS, typedListener, metrics);
                    typedListener.onCompleted();
                });
    }
}
