package org.apache.beam.sdk.io;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.hadoop.fs.Path;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.FILESYSTEM)
/**
 * Abstract file sink whose writers receive a Hadoop {@link Path} to write to.
 *
 * <p>Each {@link Writer} writes one bundle to a temporary file inside a temporary directory that
 * is resolved against the base output directory. When all bundles are done,
 * {@link WriteOperation#finalize(Iterable)} computes final names with the sink's
 * {@link FileBasedSink.FilenamePolicy}, copies the temporary files there, and deletes the
 * temporary files and directory.
 *
 * @param <T> type of the elements written by this sink
 */
public abstract class HadoopFileBasedSink<T> implements Serializable, HasDisplayData {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopFileBasedSink.class);

    /** Shard number meaning "no shard assigned yet"; initial value of {@link Writer}'s shard. */
    private static final int UNKNOWN_SHARDNUM = -1;

    private final FileBasedSink.FilenamePolicy filenamePolicy;
    private final ValueProvider<ResourceId> baseOutputDirectoryProvider;

    /** Maps a resource to its containing directory via {@link ResourceId#getCurrentDirectory()}. */
    private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
        private ExtractDirectory() {
        }

        @Override
        public ResourceId apply(ResourceId input) {
            return input.getCurrentDirectory();
        }
    }

    /**
     * Driver-side half of a write: creates {@link Writer}s and, once they have all produced
     * results, moves their temporary files to the final output locations.
     *
     * @param <T> type of the elements written
     */
    public static abstract class WriteOperation<T> implements Serializable {
        /**
         * Hints handed to the filename policy when computing final names: a generic binary MIME
         * type and an empty filename suffix. Stateless, so one shared instance suffices.
         */
        private static final FileBasedSink.OutputFileHints OCTET_STREAM_HINTS =
                new FileBasedSink.OutputFileHints() {
                    @Nullable
                    @Override
                    public String getMimeType() {
                        return "application/octet-stream";
                    }

                    @Nullable
                    @Override
                    public String getSuggestedFilenameSuffix() {
                        return "";
                    }
                };

        protected final HadoopFileBasedSink<T> sink;

        /** Directory in which writers create their temporary per-bundle files. */
        protected final ValueProvider<ResourceId> tempDirectory;

        /** Whether writers must be opened with {@link Writer#openWindowed}. */
        @Experimental(Experimental.Kind.FILESYSTEM)
        protected boolean windowedWrites;

        /**
         * Builds a unique temporary sub-directory name of the form
         * {@code .temp-beam-<timestamp>-<counter>} under a given base directory.
         */
        private static class TemporaryDirectoryBuilder implements SerializableFunction<ResourceId, ResourceId> {
            private static final AtomicLong TEMP_COUNT = new AtomicLong(0);
            // Joda "dd" is day-of-month; the former "DD" (day-of-year) produced names such as
            // 2017-06-157_12-00-00.
            private static final DateTimeFormatter TEMPDIR_TIMESTAMP =
                    DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss");

            private final String timestamp;
            private final Long tempId;

            private TemporaryDirectoryBuilder() {
                // Captured once at construction so every apply() yields the same directory name.
                this.timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP);
                this.tempId = TEMP_COUNT.getAndIncrement();
            }

            @Override
            public ResourceId apply(ResourceId baseOutputDirectory) {
                return baseOutputDirectory.resolve(
                        String.format(".temp-beam-%s-%s", timestamp, tempId),
                        ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
            }
        }

        /**
         * Resolves {@code filename} as a file inside {@code tempDirectory}.
         *
         * @param tempDirectory directory to resolve against
         * @param filename name of the temporary file
         * @return the resolved file resource
         */
        @Experimental(Experimental.Kind.FILESYSTEM)
        protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, String filename) throws IOException {
            return tempDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        /**
         * Creates a write operation whose temporary directory is a freshly named sub-directory of
         * the sink's base output directory.
         */
        public WriteOperation(HadoopFileBasedSink<T> sink) {
            this(sink, ValueProvider.NestedValueProvider.of(
                    sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder()));
        }

        /** Creates a write operation that places temporary files in the given directory. */
        @Experimental(Experimental.Kind.FILESYSTEM)
        public WriteOperation(HadoopFileBasedSink<T> sink, ResourceId tempDirectory) {
            this(sink, ValueProvider.StaticValueProvider.of(tempDirectory));
        }

        private WriteOperation(HadoopFileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) {
            this.sink = sink;
            this.tempDirectory = tempDirectory;
            this.windowedWrites = false;
        }

        /** Creates a new {@link Writer} for one bundle. */
        public abstract Writer<T> createWriter() throws Exception;

        /** Sets whether writes are windowed; governs which {@code open*} method writers accept. */
        public void setWindowedWrites(boolean windowedWrites) {
            this.windowedWrites = windowedWrites;
        }

        /**
         * Moves all writer results to their final locations and cleans up temporaries.
         *
         * <p>For non-windowed writes every file matching the temporary-directory glob is
         * removed as well; for windowed writes only the files named in {@code writerResults}.
         *
         * @param writerResults results returned by {@link Writer#close()}
         */
        public void finalize(Iterable<FileBasedSink.FileResult<Void>> writerResults) throws Exception {
            Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
            copyToOutputFiles(outputFilenames);
            removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites);
        }

        /**
         * Computes the final destination of every temporary file.
         *
         * <p>Either all results must carry a shard number or none may. If none do, shards
         * {@code 0..n-1} are assigned in order of temporary filename.
         *
         * @param writerResults results returned by the writers
         * @return map from temporary file to final destination
         * @throws IllegalArgumentException if sharded and unsharded results are mixed
         * @throws IllegalStateException if the policy maps two files to the same destination
         */
        @Experimental(Experimental.Kind.FILESYSTEM)
        protected final Map<ResourceId, ResourceId> buildOutputFilenames(Iterable<FileBasedSink.FileResult<Void>> writerResults) {
            int numShards = Iterables.size(writerResults);
            FileBasedSink.FilenamePolicy policy = getSink().getFilenamePolicy();

            List<FileBasedSink.FileResult<Void>> shardedResults;
            if (allResultsHaveShards(writerResults)) {
                shardedResults = Lists.newArrayList(writerResults);
            } else {
                shardedResults = assignShardsByTempFilename(writerResults);
            }

            Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
            for (FileBasedSink.FileResult<Void> result : shardedResults) {
                Preconditions.checkArgument(
                        result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
                outputFilenames.put(
                        result.getTempFilename(),
                        result.getDestinationFile(
                                DynamicFileDestinations.constant(policy), numShards, OCTET_STREAM_HINTS));
            }

            int numDistinctDestinations = new HashSet<>(outputFilenames.values()).size();
            Preconditions.checkState(
                    numDistinctDestinations == outputFilenames.size(),
                    "Only generated %s distinct file names for %s files.",
                    numDistinctDestinations,
                    outputFilenames.size());
            return outputFilenames;
        }

        /** Returns whether all results carry a shard (true for empty input); rejects a mix. */
        private static boolean allResultsHaveShards(Iterable<FileBasedSink.FileResult<Void>> writerResults) {
            Boolean allSharded = null;
            for (FileBasedSink.FileResult<Void> result : writerResults) {
                boolean hasShard = result.getShard() != UNKNOWN_SHARDNUM;
                if (allSharded == null) {
                    allSharded = hasShard;
                } else {
                    Preconditions.checkArgument(
                            allSharded.booleanValue() == hasShard,
                            "Found a mix of files with and without shard number set: %s",
                            result);
                }
            }
            return allSharded == null || allSharded.booleanValue();
        }

        /** Sorts results by temporary filename and assigns shards 0..n-1 in that order. */
        private static List<FileBasedSink.FileResult<Void>> assignShardsByTempFilename(
                Iterable<FileBasedSink.FileResult<Void>> writerResults) {
            List<FileBasedSink.FileResult<Void>> sorted =
                    Ordering.from(new Comparator<FileBasedSink.FileResult<Void>>() {
                        @Override
                        public int compare(FileBasedSink.FileResult<Void> left, FileBasedSink.FileResult<Void> right) {
                            return left.getTempFilename().toString().compareTo(right.getTempFilename().toString());
                        }
                    }).sortedCopy(writerResults);
            List<FileBasedSink.FileResult<Void>> sharded = new ArrayList<>(sorted.size());
            for (int shard = 0; shard < sorted.size(); shard++) {
                sharded.add(sorted.get(shard).withShard(shard));
            }
            return sharded;
        }

        /**
         * Copies each temporary file (key) to its final destination (value). Missing source
         * files are ignored.
         */
        @VisibleForTesting
        @Experimental(Experimental.Kind.FILESYSTEM)
        final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException {
            int numFiles = filenames.size();
            if (numFiles <= 0) {
                HadoopFileBasedSink.LOG.info("No output files to write.");
                return;
            }
            HadoopFileBasedSink.LOG.debug("Copying {} files.", numFiles);
            List<ResourceId> sourceFiles = new ArrayList<>(numFiles);
            List<ResourceId> destinationFiles = new ArrayList<>(numFiles);
            for (Map.Entry<ResourceId, ResourceId> sourceToDestination : filenames.entrySet()) {
                sourceFiles.add(sourceToDestination.getKey());
                destinationFiles.add(sourceToDestination.getValue());
            }
            FileSystems.copy(sourceFiles, destinationFiles, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
        }

        /**
         * Deletes temporary files, then attempts to delete the temporary directory itself.
         *
         * @param knownFiles temporary files that are always deleted
         * @param includeGlobMatches if true, also delete everything matching
         *     {@code <tempDirectory>*}; a failed match is logged and otherwise ignored
         */
        @VisibleForTesting
        @Experimental(Experimental.Kind.FILESYSTEM)
        final void removeTemporaryFiles(Set<ResourceId> knownFiles, boolean includeGlobMatches) throws IOException {
            ResourceId tempDir = tempDirectory.get();
            HadoopFileBasedSink.LOG.debug("Removing temporary bundle output files in {}.", tempDir);

            Set<ResourceId> globMatches = new HashSet<>();
            if (includeGlobMatches) {
                try {
                    MatchResult singleMatch = Iterables.getOnlyElement(
                            FileSystems.match(Collections.singletonList(tempDir.toString() + "*")));
                    for (MatchResult.Metadata metadata : singleMatch.metadata()) {
                        globMatches.add(metadata.resourceId());
                    }
                } catch (Exception e) {
                    // Best effort: the known files below are still deleted.
                    HadoopFileBasedSink.LOG.warn("Failed to match temporary files under: [{}].", tempDir, e);
                }
            }

            Set<ResourceId> toDelete = new HashSet<>(globMatches);
            toDelete.addAll(knownFiles);
            HadoopFileBasedSink.LOG.debug(
                    "Removing {} temporary files found under {} ({} matched glob, {} known files)",
                    toDelete.size(), tempDir, globMatches.size(), toDelete.size() - globMatches.size());
            FileSystems.delete(toDelete, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);

            try {
                FileSystems.delete(Collections.singletonList(tempDir), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
            } catch (Exception e) {
                // Best effort: e.g. the directory may still contain files not deleted above.
                HadoopFileBasedSink.LOG.warn("Failed to remove temporary directory: [{}].", tempDir, e);
            }
        }

        /** Returns the sink this operation writes for. */
        public HadoopFileBasedSink<T> getSink() {
            return sink;
        }

        @Override
        public String toString() {
            String tempDirectoryDescription = tempDirectory.isAccessible()
                    ? tempDirectory.get().toString()
                    : tempDirectory.toString();
            return getClass().getSimpleName()
                    + "{tempDirectory=" + tempDirectoryDescription
                    + ", windowedWrites=" + windowedWrites
                    + '}';
        }
    }

    /**
     * Writes a single bundle to a temporary file and reports it as a {@link FileBasedSink.FileResult}.
     *
     * <p>Subclasses implement {@link #prepareWrite(Path)} (called from open with the temporary
     * file's Hadoop path), {@link #write(Object)}, and optionally {@link #finishWrite()} (called
     * from {@link #close()}).
     *
     * @param <T> type of the elements written
     */
    public static abstract class Writer<T> {
        private static final Logger LOG = LoggerFactory.getLogger(Writer.class);

        private final WriteOperation<T> writeOperation;
        private String id;
        @Nullable
        private BoundedWindow window;
        @Nullable
        private PaneInfo paneInfo;
        private int shard = UNKNOWN_SHARDNUM;

        /** Temporary file for this bundle; null until the writer is opened. */
        @Nullable
        private ResourceId outputFile;
        private Path path;

        public Writer(WriteOperation<T> writeOperation) {
            Preconditions.checkNotNull(writeOperation);
            this.writeOperation = writeOperation;
        }

        /** Prepares to write to {@code path}, the Hadoop path of this bundle's temporary file. */
        protected abstract void prepareWrite(Path path) throws Exception;

        /** Hook called by {@link #close()} before the result is built; no-op by default. */
        protected void finishWrite() throws Exception {
        }

        /**
         * Opens this writer for a windowed write.
         *
         * @throws IllegalStateException if the write operation is not windowed
         */
        public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard) throws Exception {
            if (!getWriteOperation().windowedWrites) {
                throw new IllegalStateException("openWindowed called a non-windowed sink.");
            }
            open(uId, window, paneInfo, shard);
        }

        /** Writes one element to the currently open temporary file. */
        public abstract void write(T value) throws Exception;

        /**
         * Opens this writer for a non-windowed write.
         *
         * @throws IllegalStateException if the write operation is windowed
         */
        public final void openUnwindowed(String uId, int shard) throws Exception {
            if (getWriteOperation().windowedWrites) {
                throw new IllegalStateException("openUnwindowed called a windowed sink.");
            }
            open(uId, null, null, shard);
        }

        /** Records bundle metadata, resolves the temporary file named {@code uId}, and prepares it. */
        private void open(String uId, @Nullable BoundedWindow window, @Nullable PaneInfo paneInfo, int shard) throws Exception {
            this.id = uId;
            this.window = window;
            this.paneInfo = paneInfo;
            this.shard = shard;
            ResourceId tempDir = getWriteOperation().tempDirectory.get();
            outputFile = tempDir.resolve(id, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
            Verify.verifyNotNull(outputFile, "FileSystems are not allowed to return null from resolve: %s", tempDir);
            LOG.debug("Opening {} for write", outputFile);
            path = new Path(outputFile.toString());
            LOG.debug("Preparing write to {}.", outputFile);
            prepareWrite(path);
            LOG.debug("Starting write of bundle {} to {}.", id, outputFile);
        }

        /** Deletes the temporary file, if one was opened; a missing file is ignored. */
        public final void cleanup() throws Exception {
            if (outputFile != null) {
                FileSystems.delete(Collections.singletonList(outputFile), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
            }
        }

        /**
         * Finishes the write and describes the temporary file that was produced.
         *
         * @throws IllegalStateException if the writer was never opened
         */
        public final FileBasedSink.FileResult<Void> close() throws Exception {
            Preconditions.checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile");
            LOG.debug("Finishing write to {}.", outputFile);
            finishWrite();
            FileBasedSink.FileResult<Void> result =
                    new FileBasedSink.FileResult<Void>(outputFile, shard, window, paneInfo, null);
            LOG.debug("Result for bundle {}: {}", id, outputFile);
            return result;
        }

        /** Returns the write operation that created this writer. */
        public WriteOperation<T> getWriteOperation() {
            return writeOperation;
        }
    }

    /**
     * @param baseOutputFile a resource whose containing directory becomes the base output
     *     directory (see {@link #getBaseOutputDirectoryProvider()})
     * @param filenamePolicy policy used to name the final output files
     */
    @Experimental(Experimental.Kind.FILESYSTEM)
    public HadoopFileBasedSink(ValueProvider<ResourceId> baseOutputFile, FileBasedSink.FilenamePolicy filenamePolicy) {
        this.baseOutputDirectoryProvider = ValueProvider.NestedValueProvider.of(baseOutputFile, new ExtractDirectory());
        this.filenamePolicy = filenamePolicy;
    }

    /** Returns the directory under which temporary directories are created by default. */
    @Experimental(Experimental.Kind.FILESYSTEM)
    public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
        return baseOutputDirectoryProvider;
    }

    /** Returns the policy used to name final output files. */
    @Experimental(Experimental.Kind.FILESYSTEM)
    public final FileBasedSink.FilenamePolicy getFilenamePolicy() {
        return filenamePolicy;
    }

    /** Validation hook; performs no checks by default. */
    public void validate(PipelineOptions pipelineOptions) {
    }

    /** Creates the {@link WriteOperation} that drives a write to this sink. */
    public abstract WriteOperation<T> createWriteOperation();

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        getFilenamePolicy().populateDisplayData(builder);
    }
}
