/*
 * Decompiled with CFR 0.152.
 */
package cascading.tap.partition;

import cascading.flow.FlowProcess;
import cascading.operation.Filter;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.partition.Partition;
import cascading.tap.partition.PartitionTapFilter;
import cascading.tap.partition.PartitionTupleEntryIterator;
import cascading.tap.type.FileType;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterableChainIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import cascading.tuple.util.TupleViews;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BasePartitionTap<Config, Input, Output>
extends Tap<Config, Input, Output>
implements FileType<Config> {
    private static final Logger LOG = LoggerFactory.getLogger(BasePartitionTap.class);
    protected static final int OPEN_WRITES_THRESHOLD_DEFAULT = 300;
    protected Tap parent;
    protected Partition partition;
    protected final List<PartitionTapFilter> sourcePartitionFilters = new ArrayList<PartitionTapFilter>();
    protected boolean keepParentOnDelete = false;
    protected int openWritesThreshold = 300;
    private long openedCollectors = 0L;
    private final Map<String, TupleEntryCollector> collectors = new LinkedHashMap<String, TupleEntryCollector>(1000, 0.75f, true);

    protected abstract TupleEntrySchemeCollector createTupleEntrySchemeCollector(FlowProcess<? extends Config> var1, Tap var2, String var3, long var4) throws IOException;

    protected abstract TupleEntrySchemeIterator createTupleEntrySchemeIterator(FlowProcess<? extends Config> var1, Tap var2, String var3, Input var4) throws IOException;

    protected BasePartitionTap(Tap parent, Partition partition, int openWritesThreshold) {
        super(new PartitionScheme(parent.getScheme(), partition.getPartitionFields()), parent.getSinkMode());
        this.parent = parent;
        this.partition = partition;
        this.openWritesThreshold = openWritesThreshold;
    }

    protected BasePartitionTap(Tap parent, Partition partition, SinkMode sinkMode) {
        super(new PartitionScheme(parent.getScheme(), partition.getPartitionFields()), sinkMode);
        this.parent = parent;
        this.partition = partition;
    }

    protected BasePartitionTap(Tap parent, Partition partition, SinkMode sinkMode, boolean keepParentOnDelete, int openWritesThreshold) {
        super(new PartitionScheme(parent.getScheme(), partition.getPartitionFields()), sinkMode);
        this.parent = parent;
        this.partition = partition;
        this.keepParentOnDelete = keepParentOnDelete;
        this.openWritesThreshold = openWritesThreshold;
    }

    public Tap getParent() {
        return this.parent;
    }

    public Partition getPartition() {
        return this.partition;
    }

    public String[] getChildPartitionIdentifiers(FlowProcess<? extends Config> flowProcess, boolean fullyQualified) throws IOException {
        String[] childIdentifiers = this.castFileType().getChildIdentifiers(flowProcess.getConfig(), this.partition.getPathDepth(), fullyQualified);
        if (this.sourcePartitionFilters.isEmpty()) {
            return childIdentifiers;
        }
        return this.getFilteredPartitionIdentifiers(flowProcess, childIdentifiers);
    }

    protected String[] getFilteredPartitionIdentifiers(FlowProcess<? extends Config> flowProcess, String[] childIdentifiers) {
        Fields partitionFields = this.partition.getPartitionFields();
        TupleEntry partitionEntry = new TupleEntry(partitionFields, Tuple.size(partitionFields.size()));
        ArrayList<String> filteredIdentifiers = new ArrayList<String>(childIdentifiers.length);
        for (PartitionTapFilter filter : this.sourcePartitionFilters) {
            filter.prepare(flowProcess);
        }
        for (String childIdentifier : childIdentifiers) {
            this.partition.toTuple(childIdentifier.substring(this.parent.getFullIdentifier(flowProcess).length() + 1), partitionEntry);
            boolean isRemove = false;
            for (PartitionTapFilter filter : this.sourcePartitionFilters) {
                if (!filter.isRemove(flowProcess, partitionEntry)) continue;
                isRemove = true;
                break;
            }
            if (isRemove) continue;
            filteredIdentifiers.add(childIdentifier);
        }
        for (PartitionTapFilter filter : this.sourcePartitionFilters) {
            filter.cleanup(flowProcess);
        }
        return filteredIdentifiers.toArray(new String[filteredIdentifiers.size()]);
    }

    public void addSourcePartitionFilter(Fields argumentSelector, Filter filter) {
        Fields argumentFields = argumentSelector.isAll() ? this.partition.getPartitionFields() : this.partition.getPartitionFields().select(argumentSelector);
        this.sourcePartitionFilters.add(new PartitionTapFilter(argumentFields, filter));
    }

    @Override
    public String getIdentifier() {
        return this.parent.getIdentifier();
    }

    protected abstract String getCurrentIdentifier(FlowProcess<? extends Config> var1);

    public int getOpenWritesThreshold() {
        return this.openWritesThreshold;
    }

    @Override
    public TupleEntryCollector openForWrite(FlowProcess<? extends Config> flowProcess, Output output) throws IOException {
        return new PartitionCollector(flowProcess);
    }

    @Override
    public TupleEntryIterator openForRead(FlowProcess<? extends Config> flowProcess, Input input) throws IOException {
        return new PartitionIterator(flowProcess, input);
    }

    @Override
    public boolean createResource(Config conf) throws IOException {
        return this.parent.createResource(conf);
    }

    @Override
    public boolean deleteResource(Config conf) throws IOException {
        return this.keepParentOnDelete || this.parent.deleteResource(conf);
    }

    @Override
    public boolean prepareResourceForRead(Config conf) throws IOException {
        return this.parent.prepareResourceForRead(conf);
    }

    @Override
    public boolean prepareResourceForWrite(Config conf) throws IOException {
        return this.parent.prepareResourceForWrite(conf);
    }

    @Override
    public boolean commitResource(Config conf) throws IOException {
        return this.parent.commitResource(conf);
    }

    @Override
    public boolean rollbackResource(Config conf) throws IOException {
        return this.parent.rollbackResource(conf);
    }

    @Override
    public boolean resourceExists(Config conf) throws IOException {
        return this.parent.resourceExists(conf);
    }

    @Override
    public long getModifiedTime(Config conf) throws IOException {
        return this.parent.getModifiedTime(conf);
    }

    @Override
    public boolean isDirectory(FlowProcess<? extends Config> flowProcess) throws IOException {
        return this.castFileType().isDirectory(flowProcess);
    }

    @Override
    public boolean isDirectory(Config conf) throws IOException {
        return this.castFileType().isDirectory(conf);
    }

    @Override
    public String[] getChildIdentifiers(FlowProcess<? extends Config> flowProcess) throws IOException {
        return this.castFileType().getChildIdentifiers(flowProcess);
    }

    @Override
    public String[] getChildIdentifiers(Config conf) throws IOException {
        return this.castFileType().getChildIdentifiers(conf);
    }

    @Override
    public String[] getChildIdentifiers(FlowProcess<? extends Config> flowProcess, int depth, boolean fullyQualified) throws IOException {
        return this.castFileType().getChildIdentifiers(flowProcess, depth, fullyQualified);
    }

    @Override
    public String[] getChildIdentifiers(Config conf, int depth, boolean fullyQualified) throws IOException {
        return this.castFileType().getChildIdentifiers(conf, depth, fullyQualified);
    }

    @Override
    public long getSize(FlowProcess<? extends Config> flowProcess) throws IOException {
        return this.castFileType().getSize(flowProcess);
    }

    @Override
    public long getSize(Config conf) throws IOException {
        return this.castFileType().getSize(conf);
    }

    protected FileType<Config> castFileType() {
        if (this.parent instanceof FileType) {
            return (FileType)((Object)this.parent);
        }
        throw new UnsupportedOperationException("parent is not an implementation of " + FileType.class.getName() + ", is type: " + this.parent.getClass().getName());
    }

    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (object == null || this.getClass() != object.getClass()) {
            return false;
        }
        if (!super.equals(object)) {
            return false;
        }
        BasePartitionTap that = (BasePartitionTap)object;
        if (this.parent != null ? !this.parent.equals(that.parent) : that.parent != null) {
            return false;
        }
        if (this.partition != null ? !this.partition.equals(that.partition) : that.partition != null) {
            return false;
        }
        return !(this.partition != null ? !this.sourcePartitionFilters.equals(that.sourcePartitionFilters) : that.sourcePartitionFilters != null);
    }

    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + (this.parent != null ? this.parent.hashCode() : 0);
        result = 31 * result + (this.partition != null ? this.partition.hashCode() : 0);
        result = 31 * result + (this.sourcePartitionFilters != null ? this.sourcePartitionFilters.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "[\"" + this.parent + "\"][\"" + this.partition + "\"][\"" + this.sourcePartitionFilters + "\"]";
    }

    public static class PartitionScheme<Config, Input, Output>
    extends Scheme<Config, Input, Output, Void, Void> {
        private final Scheme scheme;
        private final Fields partitionFields;

        public PartitionScheme(Scheme scheme) {
            this.scheme = scheme;
            this.partitionFields = null;
        }

        public PartitionScheme(Scheme scheme, Fields partitionFields) {
            this.scheme = scheme;
            if (partitionFields == null || partitionFields.isAll()) {
                this.partitionFields = null;
            } else if (partitionFields.isDefined()) {
                this.partitionFields = partitionFields;
            } else {
                throw new IllegalArgumentException("partitionFields must be defined or the ALL substitution, got: " + partitionFields.printVerbose());
            }
        }

        @Override
        public Fields getSinkFields() {
            if (this.partitionFields == null || this.scheme.getSinkFields().isAll()) {
                return this.scheme.getSinkFields();
            }
            return Fields.merge(this.scheme.getSinkFields(), this.partitionFields);
        }

        @Override
        public void setSinkFields(Fields sinkFields) {
            this.scheme.setSinkFields(sinkFields);
        }

        @Override
        public Fields retrieveSourceFields(FlowProcess<? extends Config> flowProcess, Tap tap) {
            return this.scheme.retrieveSourceFields(flowProcess, tap);
        }

        @Override
        public Fields retrieveSinkFields(FlowProcess<? extends Config> flowProcess, Tap tap) {
            return this.scheme.retrieveSinkFields(flowProcess, tap);
        }

        @Override
        public Fields getSourceFields() {
            if (this.partitionFields == null || this.scheme.getSourceFields().isUnknown()) {
                return this.scheme.getSourceFields();
            }
            return Fields.merge(this.scheme.getSourceFields(), this.partitionFields);
        }

        @Override
        public void setSourceFields(Fields sourceFields) {
            this.scheme.setSourceFields(sourceFields);
        }

        @Override
        public int getNumSinkParts() {
            return this.scheme.getNumSinkParts();
        }

        @Override
        public void setNumSinkParts(int numSinkParts) {
            this.scheme.setNumSinkParts(numSinkParts);
        }

        @Override
        public void sourceConfInit(FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf) {
            this.scheme.sourceConfInit(flowProcess, tap, conf);
        }

        @Override
        public void sourcePrepare(FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall) throws IOException {
            this.scheme.sourcePrepare(flowProcess, sourceCall);
        }

        @Override
        public boolean source(FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall) throws IOException {
            throw new UnsupportedOperationException("should never be called");
        }

        @Override
        public void sourceCleanup(FlowProcess<? extends Config> flowProcess, SourceCall<Void, Input> sourceCall) throws IOException {
            this.scheme.sourceCleanup(flowProcess, sourceCall);
        }

        @Override
        public void sinkConfInit(FlowProcess<? extends Config> flowProcess, Tap<Config, Input, Output> tap, Config conf) {
            this.scheme.sinkConfInit(flowProcess, tap, conf);
        }

        @Override
        public void sinkPrepare(FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall) throws IOException {
            this.scheme.sinkPrepare(flowProcess, sinkCall);
        }

        @Override
        public void sink(FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall) throws IOException {
            throw new UnsupportedOperationException("should never be called");
        }

        @Override
        public void sinkCleanup(FlowProcess<? extends Config> flowProcess, SinkCall<Void, Output> sinkCall) throws IOException {
            this.scheme.sinkCleanup(flowProcess, sinkCall);
        }
    }

    public static enum Counters {
        Paths_Opened,
        Paths_Closed,
        Path_Purges;

    }

    public class PartitionCollector
    extends TupleEntryCollector {
        private final FlowProcess<? extends Config> flowProcess;
        private final Config conf;
        private final Fields parentFields;
        private final Fields partitionFields;
        private TupleEntry partitionEntry;
        private final Tuple partitionTuple;
        private final Tuple parentTuple;

        public PartitionCollector(FlowProcess<? extends Config> flowProcess) {
            super(Fields.asDeclaration(BasePartitionTap.this.getSinkFields()));
            this.flowProcess = flowProcess;
            this.conf = flowProcess.getConfigCopy();
            this.parentFields = BasePartitionTap.this.parent.getSinkFields();
            this.partitionFields = ((PartitionScheme)BasePartitionTap.this.getScheme()).partitionFields;
            this.partitionEntry = new TupleEntry(this.partitionFields);
            this.partitionTuple = TupleViews.createNarrow(BasePartitionTap.this.getSinkFields().getPos(this.partitionFields));
            this.parentTuple = TupleViews.createNarrow(BasePartitionTap.this.getSinkFields().getPos(this.parentFields));
            this.partitionEntry.setTuple(this.partitionTuple);
        }

        TupleEntryCollector getCollector(String path) {
            TupleEntryCollector collector = (TupleEntryCollector)BasePartitionTap.this.collectors.get(path);
            if (collector != null) {
                return collector;
            }
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("creating collector for parent: {}, path: {}", (Object)BasePartitionTap.this.parent.getFullIdentifier(this.conf), (Object)path);
                }
                collector = BasePartitionTap.this.createTupleEntrySchemeCollector(this.flowProcess, BasePartitionTap.this.parent, path, BasePartitionTap.this.openedCollectors);
                BasePartitionTap.this.openedCollectors++;
                this.flowProcess.increment(Counters.Paths_Opened, 1L);
            }
            catch (IOException exception) {
                throw new TapException("unable to open partition path: " + path, exception);
            }
            if (BasePartitionTap.this.collectors.size() > BasePartitionTap.this.openWritesThreshold) {
                this.purgeCollectors();
            }
            BasePartitionTap.this.collectors.put(path, collector);
            if (LOG.isInfoEnabled() && BasePartitionTap.this.collectors.size() % 100 == 0) {
                LOG.info("caching {} open Taps", (Object)BasePartitionTap.this.collectors.size());
            }
            return collector;
        }

        private void purgeCollectors() {
            int numToClose = Math.max(1, (int)((double)BasePartitionTap.this.openWritesThreshold * 0.1));
            if (LOG.isInfoEnabled()) {
                LOG.info("removing {} open Taps from cache of size {}", (Object)numToClose, (Object)BasePartitionTap.this.collectors.size());
            }
            HashMap<String, TupleEntryCollector> purge = new HashMap<String, TupleEntryCollector>();
            Set entries = BasePartitionTap.this.collectors.entrySet();
            for (Map.Entry entry : entries) {
                if (numToClose-- == 0) break;
                purge.put((String)entry.getKey(), (TupleEntryCollector)entry.getValue());
            }
            this.closeCollectors(purge, this::closeCollectorFor);
            BasePartitionTap.this.collectors.keySet().removeAll(purge.keySet());
            this.flowProcess.increment(Counters.Path_Purges, 1L);
        }

        protected void closeCollectors(Map<String, TupleEntryCollector> collectorMap, BiConsumer<String, TupleEntryCollector> closeCollectorFor) {
            collectorMap.forEach(closeCollectorFor);
        }

        @Override
        public void close() {
            super.close();
            try {
                this.closeCollectors(BasePartitionTap.this.collectors, this::closeCollectorFor);
            }
            finally {
                BasePartitionTap.this.collectors.clear();
            }
        }

        protected void closeCollectorFor(String path, TupleEntryCollector collector) {
            if (collector == null) {
                return;
            }
            try {
                collector.close();
                this.flowProcess.increment(Counters.Paths_Closed, 1L);
            }
            catch (Exception exception) {
                boolean failOnError = this.flowProcess.getBooleanProperty("cascading.tap.partition.failonclose", false);
                if (failOnError) {
                    LOG.error("exception while closing TupleEntryCollector {}", (Object)path, (Object)exception);
                    throw new TapException(exception);
                }
                LOG.warn("exception while closing TupleEntryCollector {}: {}", (Object)path, (Object)exception.getMessage());
            }
        }

        @Override
        protected void collect(TupleEntry tupleEntry) throws IOException {
            TupleViews.reset(this.partitionTuple, tupleEntry.getTuple());
            TupleViews.reset(this.parentTuple, tupleEntry.getTuple());
            String path = BasePartitionTap.this.partition.toPartition(this.partitionEntry);
            this.getCollector(path).add(this.parentTuple);
        }
    }

    private class PartitionIterator
    extends TupleEntryIterableChainIterator {
        public PartitionIterator(FlowProcess<? extends Config> flowProcess, Input input) throws IOException {
            super(BasePartitionTap.this.getSourceFields());
            ArrayList<Iterator<Tuple>> iterators = new ArrayList<Iterator<Tuple>>();
            if (input != null) {
                String identifier = BasePartitionTap.this.parent.getFullIdentifier(flowProcess);
                iterators.add(this.createPartitionEntryIterator(flowProcess, input, identifier, BasePartitionTap.this.getCurrentIdentifier(flowProcess)));
            } else {
                String[] childIdentifiers;
                for (String childIdentifier : childIdentifiers = BasePartitionTap.this.getChildPartitionIdentifiers(flowProcess, false)) {
                    iterators.add(this.createPartitionEntryIterator(flowProcess, null, BasePartitionTap.this.parent.getIdentifier(), childIdentifier));
                }
            }
            this.reset(iterators);
        }

        private PartitionTupleEntryIterator createPartitionEntryIterator(FlowProcess<? extends Config> flowProcess, Input input, String parentIdentifier, String childIdentifier) throws IOException {
            TupleEntrySchemeIterator schemeIterator = BasePartitionTap.this.createTupleEntrySchemeIterator(flowProcess, BasePartitionTap.this.parent, childIdentifier, input);
            return new PartitionTupleEntryIterator(BasePartitionTap.this.getSourceFields(), BasePartitionTap.this.partition, parentIdentifier, childIdentifier, schemeIterator);
        }
    }
}

