/*
 * Decompiled with CFR 0.152.
 */
package cascading.tap;

import cascading.flow.FlowProcess;
import cascading.scheme.NullScheme;
import cascading.scheme.Scheme;
import cascading.tap.CompositeTap;
import cascading.tap.SinkTap;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.util.Util;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A sink-only composite tap that forwards every written tuple to each of its child taps.
 *
 * <p>Resource operations ({@link #createResource}, {@link #deleteResource},
 * {@link #commitResource}, {@link #rollbackResource}, {@link #resourceExists}) are delegated
 * to the children in array order and stop at the first child that reports {@code false}.
 *
 * <p>The identifier of this tap is a generated placeholder, not a real location; the child
 * taps carry the actual identifiers.
 *
 * @param <Child>  the child tap type
 * @param <Config> the configuration type handed to the children
 * @param <Output> the output type accepted by {@link #openForWrite}
 */
public class MultiSinkTap<Child extends Tap, Config, Output>
extends SinkTap<Config, Output>
implements CompositeTap<Child> {
    private static final Logger LOG = LoggerFactory.getLogger(MultiSinkTap.class);

    /** Child taps every tuple is written to; the array is stored as given (not copied). */
    private final Child[] taps;

    /** Placeholder identifier built from {@code Util.createUniqueID()}. */
    private final String tempPath = "__multisink_placeholder_" + Util.createUniqueID();

    /**
     * Per-child configuration differences captured by {@link #sinkConfInit}, indexed like
     * {@link #getTaps()}. Stays {@code null} until {@code sinkConfInit} has been called.
     */
    private List<Map<String, String>> childConfigs;

    /**
     * Creates a tap that writes to all of the given children.
     *
     * @param taps the child taps to write to
     */
    @ConstructorProperties(value={"taps"})
    public MultiSinkTap(Child ... taps) {
        this.taps = taps;
    }

    /**
     * Returns the child taps backing this composite.
     *
     * @return the child tap array (the internal array, not a copy)
     */
    protected Child[] getTaps() {
        return this.taps;
    }

    /**
     * Returns an iterator over the child taps, in array order.
     *
     * @return iterator over the children returned by {@link #getTaps()}
     */
    @Override
    public Iterator<Child> getChildTaps() {
        return Arrays.asList(this.getTaps()).iterator();
    }

    /**
     * Returns how many child taps this composite wraps.
     *
     * @return the length of the array returned by {@link #getTaps()}
     */
    @Override
    public long getNumChildTaps() {
        return this.getTaps().length;
    }

    /**
     * Returns the generated placeholder identifier of this composite.
     *
     * @return the placeholder path created when this instance was constructed
     */
    @Override
    public String getIdentifier() {
        return this.tempPath;
    }

    /**
     * Presents the incoming sink fields to every child tap.
     *
     * @param flowProcess the current flow process
     * @param fields      the fields being presented
     */
    @Override
    public void presentSinkFields(FlowProcess<? extends Config> flowProcess, Fields fields) {
        for (Tap child : this.getTaps()) {
            child.presentSinkFields(flowProcess, fields);
        }
    }

    /**
     * Opens every child tap for writing and returns a collector that fans tuples out to all
     * of them.
     *
     * @param flowProcess the current flow process
     * @param output      not used by this implementation; children are opened with a
     *                    {@code null} output
     * @return a collector writing to every child tap
     * @throws IOException if a child tap fails to open
     */
    @Override
    public TupleEntryCollector openForWrite(FlowProcess<? extends Config> flowProcess, Output output) throws IOException {
        return new MultiSinkCollector(flowProcess, this.getTaps());
    }

    /**
     * Lets every child initialise a private copy of the configuration and records what each
     * child changed, so the changes can be re-applied when the child is opened for writing.
     *
     * @param flowProcess the current flow process
     * @param conf        the configuration to initialise from; children only see copies
     */
    @Override
    public void sinkConfInit(FlowProcess<? extends Config> flowProcess, Config conf) {
        this.bridge(flowProcess, conf);
    }

    /**
     * Raw-typed implementation of {@link #sinkConfInit}: for each child, copies {@code conf},
     * runs the child's {@code sinkConfInit} on the copy and stores the diff between
     * {@code conf} and the copy in {@link #childConfigs}.
     */
    private void bridge(FlowProcess flowProcess, Object conf) {
        Tap[] children = this.getTaps();
        List<Map<String, String>> diffs = new ArrayList<Map<String, String>>(children.length);
        // Publish the (initially empty) list up front, exactly as before, so a failure in a
        // child leaves the diffs collected so far in place.
        this.childConfigs = diffs;
        for (Tap child : children) {
            Object childConfig = flowProcess.copyConfig(conf);
            child.sinkConfInit(flowProcess, childConfig);
            diffs.add(flowProcess.diffConfigIntoMap(conf, childConfig));
        }
    }

    /**
     * Creates the resource of every child, stopping at the first child that reports failure.
     *
     * @param conf the configuration passed to each child
     * @return {@code true} if every child returned {@code true}
     * @throws IOException if a child throws
     */
    @Override
    public boolean createResource(Config conf) throws IOException {
        for (Tap tap : this.getTaps()) {
            if (!tap.createResource(conf)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Deletes the resource of every child, stopping at the first child that reports failure.
     *
     * @param conf the configuration passed to each child
     * @return {@code true} if every child returned {@code true}
     * @throws IOException if a child throws
     */
    @Override
    public boolean deleteResource(Config conf) throws IOException {
        for (Tap tap : this.getTaps()) {
            if (!tap.deleteResource(conf)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Commits the resource of every child, stopping at the first child that reports failure.
     *
     * @param conf the configuration passed to each child
     * @return {@code true} if every child returned {@code true}
     * @throws IOException if a child throws
     */
    @Override
    public boolean commitResource(Config conf) throws IOException {
        for (Tap tap : this.getTaps()) {
            if (!tap.commitResource(conf)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Rolls back the resource of every child, stopping at the first child that reports failure.
     *
     * @param conf the configuration passed to each child
     * @return {@code true} if every child returned {@code true}
     * @throws IOException if a child throws
     */
    @Override
    public boolean rollbackResource(Config conf) throws IOException {
        for (Tap tap : this.getTaps()) {
            if (!tap.rollbackResource(conf)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reports whether the resources of all children exist.
     *
     * @param conf the configuration passed to each child
     * @return {@code true} only if every child reports that its resource exists
     * @throws IOException if a child throws
     */
    @Override
    public boolean resourceExists(Config conf) throws IOException {
        for (Tap tap : this.getTaps()) {
            if (!tap.resourceExists(conf)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the most recent modification time reported by any child.
     *
     * @param conf the configuration passed to each child
     * @return the maximum of the children's modified times, or {@code 0} when there are no
     *         children
     * @throws IOException if a child throws
     */
    @Override
    public long getModifiedTime(Config conf) throws IOException {
        Child[] children = this.getTaps();
        // Guard the children[0] access below: without it an empty composite fails with
        // ArrayIndexOutOfBoundsException instead of answering the question.
        if (children == null || children.length == 0) {
            return 0L;
        }
        long modified = children[0].getModifiedTime(conf);
        for (int i = 1; i < children.length; ++i) {
            modified = Math.max(children[i].getModifiedTime(conf), modified);
        }
        return modified;
    }

    /**
     * Returns the scheme of this composite, deriving and caching it on first use.
     *
     * <p>If a scheme is already set it is returned. Otherwise, when all children declare the
     * same sink fields, the scheme of the first child is adopted; when they differ, a
     * {@link NullScheme} over the merge of all distinct sink field declarations is used.
     *
     * @return the cached or newly derived scheme
     */
    @Override
    public Scheme getScheme() {
        Scheme existing = super.getScheme();
        if (existing != null) {
            return existing;
        }
        // NOTE(review): a HashSet is used to de-duplicate, so when the children disagree the
        // order in which their fields are merged follows hash iteration order, not child order.
        HashSet<Fields> distinctFields = new HashSet<Fields>();
        for (Tap child : this.getTaps()) {
            distinctFields.add(child.getSinkFields());
        }
        if (distinctFields.size() == 1) {
            this.setScheme(this.getTaps()[0].getScheme());
        } else {
            Fields allFields = Fields.merge(distinctFields.toArray(new Fields[distinctFields.size()]));
            this.setScheme(new NullScheme(allFields, allFields));
        }
        return super.getScheme();
    }

    @Override
    public String toString() {
        return "MultiSinkTap[" + (this.taps == null ? "none" : Arrays.asList(this.taps)) + ']';
    }

    /**
     * Two instances are equal when the superclass considers them equal and their child tap
     * arrays are element-wise equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MultiSinkTap)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        MultiSinkTap<?, ?, ?> that = (MultiSinkTap<?, ?, ?>)o;
        return Arrays.equals(this.taps, that.taps);
    }

    @Override
    public int hashCode() {
        // Arrays.hashCode returns 0 for a null array, so no separate null check is needed.
        return 31 * super.hashCode() + Arrays.hashCode(this.taps);
    }

    /**
     * Collector that owns one child collector per child tap and forwards every tuple to all
     * of them.
     */
    private class MultiSinkCollector
    extends TupleEntryCollector {
        /** One open collector per child tap; {@code null} once this collector is closed. */
        TupleEntryCollector[] collectors;

        /**
         * Opens every given tap for writing.
         *
         * <p>If {@code sinkConfInit} was called on the enclosing tap, the configuration
         * differences recorded for child {@code i} are merged into a copy of the flow
         * process configuration before that child is opened.
         *
         * @param flowProcess the current flow process
         * @param taps        the child taps to open
         * @throws IOException if a child tap fails to open; collectors opened before the
         *                     failure are closed again before the exception propagates
         */
        public <C extends Config> MultiSinkCollector(FlowProcess<C> flowProcess, Tap<Config, ?, ?> ... taps) throws IOException {
            super(Fields.asDeclaration(MultiSinkTap.this.getSinkFields()));
            this.collectors = new TupleEntryCollector[taps.length];
            C conf = flowProcess.getConfigCopy();
            boolean allOpened = false;
            try {
                for (int i = 0; i < taps.length; ++i) {
                    C mergedConf = MultiSinkTap.this.childConfigs == null ? conf : flowProcess.mergeMapIntoConfig(conf, MultiSinkTap.this.childConfigs.get(i));
                    Tap<Config, ?, ?> tap = taps[i];
                    LOG.info("opening for write: {}", tap.toString());
                    this.collectors[i] = tap.openForWrite(flowProcess.copyWith(mergedConf), null);
                }
                allOpened = true;
            }
            finally {
                // The caller never receives this object when construction fails, so it could
                // not close the children that were already opened; release them here.
                if (!allOpened) {
                    this.closeChildCollectors();
                }
            }
        }

        /**
         * Forwards the tuple to every child collector, in child order.
         */
        @Override
        protected void collect(TupleEntry tupleEntry) throws IOException {
            // Iterate the collectors actually opened rather than the outer taps field, whose
            // length may differ from what getTaps() supplied to the constructor.
            for (TupleEntryCollector collector : this.collectors) {
                collector.add(tupleEntry);
            }
        }

        /**
         * Closes this collector and every child collector. Failures while closing a child are
         * logged and do not prevent the remaining children from being closed. Calling this
         * method again after the children were released is a no-op for the children.
         */
        @Override
        public void close() {
            try {
                super.close();
            }
            finally {
                this.closeChildCollectors();
            }
        }

        /**
         * Closes every child collector that was opened and drops the references to them.
         */
        private void closeChildCollectors() {
            TupleEntryCollector[] open = this.collectors;
            // Clear the field first so the references are dropped even if a close fails hard,
            // and so a repeated call finds nothing left to do.
            this.collectors = null;
            if (open == null) {
                return;
            }
            for (TupleEntryCollector collector : open) {
                if (collector == null) {
                    // Slot was never filled because opening failed part-way through.
                    continue;
                }
                try {
                    collector.close();
                }
                catch (Exception exception) {
                    LOG.warn("exception closing TupleEntryCollector", exception);
                }
            }
        }
    }
}

