package io.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.Files;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.InputRowParser;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.utils.Runnables;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.class */
public class ReplayableFirehoseFactory implements FirehoseFactory<InputRowParser> {
    private static final EmittingLogger log = new EmittingLogger(ReplayableFirehoseFactory.class);
    private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
    private static final int DEFAULT_MAX_TEMP_FILE_SIZE = 250000000;
    private static final int DEFAULT_READ_FIREHOSE_RETRIES = 3;
    private final FirehoseFactory delegateFactory;
    private final boolean reportParseExceptions;
    private final int maxTempFileSize;
    private final int readFirehoseRetries;
    private final ObjectMapper smileMapper;
    private ReplayableFirehose firehose;

    /* loaded from: input_file:io/druid/segment/realtime/firehose/ReplayableFirehoseFactory$ReplayableFirehose.class */
    public class ReplayableFirehose implements Firehose {
        private final List<String> dimensions;
        private JsonFactory jsonFactory;
        private JsonParser jsonParser;
        private Iterator<Row> it;
        private final List<File> files = new ArrayList();
        private int fileIndex = 0;

        public ReplayableFirehose(InputRowParser inputRowParser) throws IOException {
            this.jsonFactory = ReplayableFirehoseFactory.this.smileMapper.getFactory();
            if (this.jsonFactory instanceof SmileFactory) {
                this.jsonFactory = this.jsonFactory.enable(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES);
            }
            long j = 0;
            long j2 = 0;
            long j3 = 0;
            long j4 = 0;
            HashSet hashSet = new HashSet();
            File createTempDir = Files.createTempDir();
            createTempDir.deleteOnExit();
            long nanoTime = System.nanoTime();
            boolean z = false;
            do {
                deleteTempFiles();
                try {
                    Firehose connect = ReplayableFirehoseFactory.this.delegateFactory.connect(inputRowParser);
                    Throwable th = null;
                    while (connect.hasMore()) {
                        try {
                            File createTempFile = File.createTempFile("replayable-", null, createTempDir);
                            createTempFile.deleteOnExit();
                            this.files.add(createTempFile);
                            ReplayableFirehoseFactory.log.debug("Created file [%s]", new Object[]{createTempFile.getAbsolutePath()});
                            CountingOutputStream countingOutputStream = new CountingOutputStream(new FileOutputStream(createTempFile));
                            Throwable th2 = null;
                            try {
                                JsonGenerator createGenerator = this.jsonFactory.createGenerator(countingOutputStream);
                                Throwable th3 = null;
                                while (connect.hasMore() && countingOutputStream.getCount() < ReplayableFirehoseFactory.this.getMaxTempFileSize()) {
                                    try {
                                        try {
                                            InputRow nextRow = connect.nextRow();
                                            createGenerator.writeObject(nextRow);
                                            hashSet.addAll(nextRow.getDimensions());
                                            j++;
                                        } catch (ParseException e) {
                                            if (ReplayableFirehoseFactory.this.reportParseExceptions) {
                                                throw e;
                                            }
                                            j3++;
                                        }
                                    } catch (Throwable th4) {
                                        if (createGenerator != null) {
                                            if (0 != 0) {
                                                try {
                                                    createGenerator.close();
                                                } catch (Throwable th5) {
                                                    th3.addSuppressed(th5);
                                                }
                                            } else {
                                                createGenerator.close();
                                            }
                                        }
                                        throw th4;
                                    }
                                }
                                j2 += countingOutputStream.getCount();
                                if (createGenerator != null) {
                                    if (0 != 0) {
                                        try {
                                            createGenerator.close();
                                        } catch (Throwable th6) {
                                            th3.addSuppressed(th6);
                                        }
                                    } else {
                                        createGenerator.close();
                                    }
                                }
                                if (countingOutputStream != null) {
                                    if (0 != 0) {
                                        try {
                                            countingOutputStream.close();
                                        } catch (Throwable th7) {
                                            th2.addSuppressed(th7);
                                        }
                                    } else {
                                        countingOutputStream.close();
                                    }
                                }
                            } catch (Throwable th8) {
                                if (countingOutputStream != null) {
                                    if (0 != 0) {
                                        try {
                                            countingOutputStream.close();
                                        } catch (Throwable th9) {
                                            th2.addSuppressed(th9);
                                        }
                                    } else {
                                        countingOutputStream.close();
                                    }
                                }
                                throw th8;
                            }
                        } finally {
                        }
                    }
                    z = true;
                    if (connect != null) {
                        if (0 != 0) {
                            try {
                                connect.close();
                            } catch (Throwable th10) {
                                th.addSuppressed(th10);
                            }
                        } else {
                            connect.close();
                        }
                    }
                } catch (Exception e2) {
                    long j5 = j4 + 1;
                    j4 = j5;
                    if (j5 > ReplayableFirehoseFactory.this.readFirehoseRetries || (e2 instanceof ParseException)) {
                        ReplayableFirehoseFactory.log.error(e2, "Delegate firehose threw an exception, retries exhausted, aborting", new Object[0]);
                        Throwables.propagate(e2);
                    } else {
                        ReplayableFirehoseFactory.log.error(e2, "Delegate firehose threw an exception, retrying (%d of %d)", new Object[]{Long.valueOf(j4), Integer.valueOf(ReplayableFirehoseFactory.this.readFirehoseRetries)});
                    }
                }
            } while (!z);
            ReplayableFirehoseFactory.log.info("Finished reading from firehose in [%,dms], [%,d] events parsed, [%,d] bytes written, [%,d] events unparseable", new Object[]{Long.valueOf((System.nanoTime() - nanoTime) / 1000000), Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j3)});
            this.dimensions = Ordering.natural().immutableSortedCopy(hashSet);
            if (j != 0) {
                this.jsonParser = this.jsonFactory.createParser(this.files.get(this.fileIndex));
                this.it = this.jsonParser.readValuesAs(Row.class);
            } else {
                ReplayableFirehoseFactory.log.warn("Firehose contains no events!", new Object[0]);
                deleteTempFiles();
                this.it = Iterators.emptyIterator();
            }
        }

        public boolean hasMore() {
            if (this.it.hasNext()) {
                return true;
            }
            try {
                if (this.jsonParser != null) {
                    this.jsonParser.close();
                }
                int i = this.fileIndex + 1;
                this.fileIndex = i;
                if (i >= this.files.size() || this.files.get(this.fileIndex).length() == 0) {
                    return false;
                }
                this.jsonParser = this.jsonFactory.createParser(this.files.get(this.fileIndex));
                this.it = this.jsonParser.readValuesAs(Row.class);
                return true;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public InputRow nextRow() {
            return Rows.toCaseInsensitiveInputRow(this.it.next(), this.dimensions);
        }

        public Runnable commit() {
            return Runnables.getNoopRunnable();
        }

        public void close() throws IOException {
            if (this.jsonParser != null) {
                this.jsonParser.close();
            }
            this.it = Iterators.emptyIterator();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void rewind() throws IOException {
            close();
            if (this.files.isEmpty()) {
                return;
            }
            this.fileIndex = 0;
            this.jsonParser = this.jsonFactory.createParser(this.files.get(this.fileIndex));
            this.it = this.jsonParser.readValuesAs(Row.class);
        }

        private void deleteTempFiles() {
            for (File file : this.files) {
                ReplayableFirehoseFactory.log.debug("Deleting temp file: %s", new Object[]{file.getAbsolutePath()});
                file.delete();
            }
            this.files.clear();
        }
    }

    @JsonCreator
    public ReplayableFirehoseFactory(@JsonProperty("delegate") FirehoseFactory firehoseFactory, @JsonProperty("reportParseExceptions") Boolean bool, @JsonProperty("maxTempFileSize") Integer num, @JsonProperty("readFirehoseRetries") Integer num2, @Smile @JacksonInject ObjectMapper objectMapper) {
        Preconditions.checkNotNull(firehoseFactory, "delegate cannot be null");
        Preconditions.checkArgument(!(firehoseFactory instanceof ReplayableFirehoseFactory), "Refusing to wrap another ReplayableFirehoseFactory");
        this.delegateFactory = firehoseFactory;
        this.reportParseExceptions = bool == null ? false : bool.booleanValue();
        this.maxTempFileSize = num == null ? DEFAULT_MAX_TEMP_FILE_SIZE : num.intValue();
        this.readFirehoseRetries = num2 == null ? 3 : num2.intValue();
        this.smileMapper = objectMapper;
        log.info(toString(), new Object[0]);
    }

    public Firehose connect(InputRowParser inputRowParser) throws IOException {
        if (this.firehose == null) {
            this.firehose = new ReplayableFirehose(inputRowParser);
        } else {
            log.info("Rewinding and returning existing firehose", new Object[0]);
            this.firehose.rewind();
        }
        return this.firehose;
    }

    @JsonProperty("delegate")
    public FirehoseFactory getDelegateFactory() {
        return this.delegateFactory;
    }

    @JsonProperty("reportParseExceptions")
    public boolean isReportParseExceptions() {
        return this.reportParseExceptions;
    }

    @JsonProperty("maxTempFileSize")
    public int getMaxTempFileSize() {
        return this.maxTempFileSize;
    }

    @JsonProperty("readFirehoseRetries")
    public int getReadFirehoseRetries() {
        return this.readFirehoseRetries;
    }

    public String toString() {
        return "ReplayableFirehoseFactory{delegateFactory=" + this.delegateFactory + ", reportParseExceptions=" + this.reportParseExceptions + ", maxTempFileSize=" + this.maxTempFileSize + ", readFirehoseRetries=" + this.readFirehoseRetries + '}';
    }
}
