/*
 * Decompiled with CFR 0.152.
 */
package cascading.local.tap.aws.s3;

import cascading.flow.FlowProcess;
import cascading.local.tap.aws.s3.S3Checkpointer;
import cascading.local.tap.aws.s3.S3Iterable;
import cascading.property.PropertyUtil;
import cascading.scheme.FileFormat;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.type.FileType;
import cascading.tap.type.TapWith;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import cascading.util.CloseableIterator;
import cascading.util.Util;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.SdkClientException;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.transfer.PersistableTransfer;
import com.amazonaws.services.s3.transfer.Transfer;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.internal.S3ProgressListener;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import com.google.common.io.ByteSource;
import com.google.common.io.FileBackedOutputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3Tap
extends Tap<Properties, InputStream, OutputStream>
implements FileType<Properties>,
TapWith<Properties, InputStream, OutputStream> {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(S3Tap.class);
    /** Token that {@code resolveKey} replaces with the zero-padded ({@code %05d}) partition sequence number. */
    public static final String SEQUENCE_TOKEN = "{sequence}";
    /** Content type that {@code isDirectory} compares (case-insensitively) against the object's content type. */
    public static final String MIME_DIRECTORY = "application/x-directory";
    /** Delimiter passed by the convenience constructors that take a key but no delimiter; also the path prefix used by {@code makeURI} and stripped by {@code cleanKey}. */
    public static final String DEFAULT_DELIMITER = "/";
    /** Client supplied at construction; when {@code null}, {@code getS3Client} builds one from the given properties. */
    AmazonS3 s3Client;
    /** Bucket this tap addresses; the constructors reject a null or empty value. */
    String bucketName;
    /** Object key (or key prefix) inside the bucket; may be {@code null} ({@code resourceExists} then checks the bucket itself). */
    String key;
    /** Optional predicate handed to {@code S3Iterable.withFilter}; the URI constructors derive it from the URI query treated as a glob. */
    Predicate<String> filter;
    /** Initialized to "/"; the bucket/key constructor overwrites it with its argument, the URI constructor leaves it untouched. */
    String delimiter = "/";
    /** Optional checkpointer consulted by {@code getMarker}, {@code setLastMarker} and {@code commitMarker}; all three are no-ops when it is {@code null}. */
    S3Checkpointer checkpointer;
    /** Lazily fetched object metadata, cached by {@code getObjectMetadata}. */
    private transient ObjectMetadata objectMetadata;

    /**
     * Builds an {@code s3} URI for the given bucket and key prefix, with no glob query.
     *
     * @param bucketName the bucket, used as the URI authority; may not be {@code null}
     * @param keyPrefix  the key prefix, used as the URI path; may be {@code null}
     * @return the resulting URI
     */
    public static URI makeURI(String bucketName, String keyPrefix) {
        final String noGlob = null;
        return makeURI(bucketName, keyPrefix, noGlob);
    }

    /**
     * Builds an {@code s3} URI with the bucket as authority, the key prefix as path and the glob as query.
     * <p>
     * A {@code null} prefix becomes the bare delimiter, and a prefix lacking a leading delimiter gets one.
     *
     * @param bucketName the bucket; may not be {@code null}
     * @param keyPrefix  the key prefix; may be {@code null}
     * @param glob       the glob placed in the query part; may be {@code null}
     * @return the resulting URI
     * @throws IllegalArgumentException if {@code bucketName} is {@code null} or the parts do not form a valid URI
     */
    public static URI makeURI(String bucketName, String keyPrefix, String glob) {
        if (bucketName == null) {
            throw new IllegalArgumentException("bucketName may not be null");
        }
        final String path;
        if (keyPrefix == null) {
            path = DEFAULT_DELIMITER;
        } else {
            path = keyPrefix.startsWith(DEFAULT_DELIMITER) ? keyPrefix : DEFAULT_DELIMITER + keyPrefix;
        }
        try {
            return new URI("s3", bucketName, path, glob, null);
        }
        catch (URISyntaxException syntaxFailure) {
            throw new IllegalArgumentException(syntaxFailure.getMessage(), syntaxFailure);
        }
    }

    // ---- Convenience constructors: bucket/key only, always SinkMode.KEEP ----

    /**
     * Tap over a bucket; passes {@code null} for key, delimiter and filter.
     * NOTE(review): the delimiter is {@code null} here, whereas the key-taking overloads pass DEFAULT_DELIMITER - confirm this is intended.
     */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName) {
        this(scheme, bucketName, null, null, null, SinkMode.KEEP);
    }

    /** Tap over a bucket and key; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, String key) {
        this(scheme, bucketName, key, DEFAULT_DELIMITER, SinkMode.KEEP);
    }

    /** Tap over a bucket and key with an explicit delimiter; passes {@code null} for client and checkpointer. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, String key, String delimiter) {
        this(scheme, null, null, bucketName, key, delimiter, SinkMode.KEEP);
    }

    /** Tap over a bucket with a key filter; passes {@code null} for the key. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, Predicate<String> filter) {
        this(scheme, bucketName, null, filter, SinkMode.KEEP);
    }

    /** Tap over a bucket and key with a key filter; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, String key, Predicate<String> filter) {
        this(scheme, bucketName, key, DEFAULT_DELIMITER, filter, SinkMode.KEEP);
    }

    /** Tap over a bucket and key with explicit delimiter and filter; passes {@code null} for client and checkpointer. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, String key, String delimiter, Predicate<String> filter) {
        this(scheme, null, null, bucketName, key, delimiter, filter, SinkMode.KEEP);
    }

    // ---- Convenience constructors: caller-supplied client, always SinkMode.KEEP ----

    /** Tap over a bucket using the given client; passes {@code null} for the key. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, String bucketName) {
        this(scheme, s3Client, bucketName, null, SinkMode.KEEP);
    }

    /** Tap over a bucket and key using the given client; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, String bucketName, String key) {
        this(scheme, s3Client, bucketName, key, DEFAULT_DELIMITER, SinkMode.KEEP);
    }

    /** Tap over a bucket and key using the given client and delimiter; passes {@code null} for the filter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, String bucketName, String key, String delimiter) {
        this(scheme, s3Client, bucketName, key, delimiter, null, SinkMode.KEEP);
    }

    /** Tap over a bucket and key using the given client, delimiter and filter; passes {@code null} for the checkpointer. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, String bucketName, String key, String delimiter, Predicate<String> filter) {
        this(scheme, s3Client, null, bucketName, key, delimiter, filter, SinkMode.KEEP);
    }

    // ---- Convenience constructors: caller-supplied checkpointer, always SinkMode.KEEP ----

    /**
     * Checkpointed tap over a bucket; passes {@code null} for key, delimiter and filter.
     * NOTE(review): the delimiter is {@code null} here, whereas the key-taking overloads pass DEFAULT_DELIMITER - confirm this is intended.
     */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName) {
        this(scheme, checkpointer, bucketName, null, null, null, SinkMode.KEEP);
    }

    /** Checkpointed tap over a bucket and key; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, String key) {
        this(scheme, checkpointer, bucketName, key, DEFAULT_DELIMITER, SinkMode.KEEP);
    }

    /** Checkpointed tap over a bucket and key with an explicit delimiter; passes {@code null} for the client. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, String key, String delimiter) {
        this(scheme, null, checkpointer, bucketName, key, delimiter, SinkMode.KEEP);
    }

    /** Checkpointed tap over a bucket with a key filter; passes {@code null} for the key. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, Predicate<String> filter) {
        this(scheme, checkpointer, bucketName, null, filter, SinkMode.KEEP);
    }

    /** Checkpointed tap over a bucket and key with a key filter; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, String key, Predicate<String> filter) {
        this(scheme, checkpointer, bucketName, key, DEFAULT_DELIMITER, filter, SinkMode.KEEP);
    }

    /** Checkpointed tap over a bucket and key with explicit delimiter and filter; passes {@code null} for the client. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, String key, String delimiter, Predicate<String> filter) {
        this(scheme, null, checkpointer, bucketName, key, delimiter, filter, SinkMode.KEEP);
    }

    // ---- Convenience constructors: caller-supplied client and checkpointer, always SinkMode.KEEP ----

    /** Checkpointed tap over a bucket using the given client; passes {@code null} for the key. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, String bucketName) {
        this(scheme, s3Client, checkpointer, bucketName, null, SinkMode.KEEP);
    }

    /** Checkpointed tap over a bucket and key using the given client; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, String bucketName, String key) {
        this(scheme, s3Client, checkpointer, bucketName, key, DEFAULT_DELIMITER, SinkMode.KEEP);
    }

    /** Checkpointed tap over a bucket and key using the given client and delimiter; passes {@code null} for the filter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, String bucketName, String key, String delimiter) {
        this(scheme, s3Client, checkpointer, bucketName, key, delimiter, null, SinkMode.KEEP);
    }

    /** Fully specified tap except for the sink mode, which is {@link SinkMode#KEEP}. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, String bucketName, String key, String delimiter, Predicate<String> filter) {
        this(scheme, s3Client, checkpointer, bucketName, key, delimiter, filter, SinkMode.KEEP);
    }

    // ---- Convenience constructors: bucket/key only, explicit SinkMode ----

    /**
     * Tap over a bucket with the given sink mode; passes {@code null} for key, delimiter and filter.
     * NOTE(review): the delimiter is {@code null} here, whereas the key-taking overloads pass DEFAULT_DELIMITER - confirm this is intended.
     */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, SinkMode sinkMode) {
        this(scheme, bucketName, null, null, null, sinkMode);
    }

    /** Tap over a bucket and key with the given sink mode; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, String key, SinkMode sinkMode) {
        this(scheme, bucketName, key, DEFAULT_DELIMITER, sinkMode);
    }

    /** Tap over a bucket and key with explicit delimiter and sink mode; passes {@code null} for client and checkpointer. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, String key, String delimiter, SinkMode sinkMode) {
        this(scheme, null, null, bucketName, key, delimiter, sinkMode);
    }

    /** Tap over a bucket with a key filter and the given sink mode; passes {@code null} for the key. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, Predicate<String> filter, SinkMode sinkMode) {
        this(scheme, bucketName, null, filter, sinkMode);
    }

    /** Tap over a bucket and key with a key filter and the given sink mode; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, String key, Predicate<String> filter, SinkMode sinkMode) {
        this(scheme, bucketName, key, DEFAULT_DELIMITER, filter, sinkMode);
    }

    /** Tap over a bucket and key with explicit delimiter, filter and sink mode; passes {@code null} for client and checkpointer. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, String bucketName, String key, String delimiter, Predicate<String> filter, SinkMode sinkMode) {
        this(scheme, null, null, bucketName, key, delimiter, filter, sinkMode);
    }

    // ---- Convenience constructors: caller-supplied client, explicit SinkMode ----

    /** Tap over a bucket using the given client and sink mode; passes {@code null} for the key. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, String bucketName, SinkMode sinkMode) {
        this(scheme, s3Client, bucketName, null, sinkMode);
    }

    /** Tap over a bucket and key using the given client and sink mode; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, String bucketName, String key, SinkMode sinkMode) {
        this(scheme, s3Client, bucketName, key, DEFAULT_DELIMITER, sinkMode);
    }

    /** Tap over a bucket and key using the given client, delimiter and sink mode; passes {@code null} for the filter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, String bucketName, String key, String delimiter, SinkMode sinkMode) {
        this(scheme, s3Client, bucketName, key, delimiter, null, sinkMode);
    }

    /** Tap over a bucket and key using the given client, delimiter, filter and sink mode; passes {@code null} for the checkpointer. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, String bucketName, String key, String delimiter, Predicate<String> filter, SinkMode sinkMode) {
        this(scheme, s3Client, null, bucketName, key, delimiter, filter, sinkMode);
    }

    // ---- Convenience constructors: caller-supplied checkpointer, explicit SinkMode ----

    /**
     * Checkpointed tap over a bucket with the given sink mode; passes {@code null} for key, delimiter and filter.
     * NOTE(review): the delimiter is {@code null} here, whereas the key-taking overloads pass DEFAULT_DELIMITER - confirm this is intended.
     */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, SinkMode sinkMode) {
        this(scheme, checkpointer, bucketName, null, null, null, sinkMode);
    }

    /** Checkpointed tap over a bucket and key with the given sink mode; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, String key, SinkMode sinkMode) {
        this(scheme, checkpointer, bucketName, key, DEFAULT_DELIMITER, sinkMode);
    }

    /** Checkpointed tap over a bucket and key with explicit delimiter and sink mode; passes {@code null} for the client. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, String key, String delimiter, SinkMode sinkMode) {
        this(scheme, null, checkpointer, bucketName, key, delimiter, sinkMode);
    }

    /** Checkpointed tap over a bucket with a key filter and the given sink mode; passes {@code null} for the key. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, Predicate<String> filter, SinkMode sinkMode) {
        this(scheme, checkpointer, bucketName, null, filter, sinkMode);
    }

    /** Checkpointed tap over a bucket and key with a key filter and the given sink mode; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, String key, Predicate<String> filter, SinkMode sinkMode) {
        this(scheme, checkpointer, bucketName, key, DEFAULT_DELIMITER, filter, sinkMode);
    }

    /** Checkpointed tap over a bucket and key with explicit delimiter, filter and sink mode; passes {@code null} for the client. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, String bucketName, String key, String delimiter, Predicate<String> filter, SinkMode sinkMode) {
        this(scheme, null, checkpointer, bucketName, key, delimiter, filter, sinkMode);
    }

    // ---- Convenience constructors: caller-supplied client and checkpointer, explicit SinkMode ----

    /** Checkpointed tap over a bucket using the given client and sink mode; passes {@code null} for the key. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, String bucketName, SinkMode sinkMode) {
        this(scheme, s3Client, checkpointer, bucketName, null, sinkMode);
    }

    /** Checkpointed tap over a bucket and key using the given client and sink mode; passes DEFAULT_DELIMITER as the delimiter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, String bucketName, String key, SinkMode sinkMode) {
        this(scheme, s3Client, checkpointer, bucketName, key, DEFAULT_DELIMITER, sinkMode);
    }

    /** Checkpointed tap over a bucket and key using the given client, delimiter and sink mode; passes {@code null} for the filter. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, String bucketName, String key, String delimiter, SinkMode sinkMode) {
        this(scheme, s3Client, checkpointer, bucketName, key, delimiter, null, sinkMode);
    }

    /**
     * Canonical bucket/key constructor that every bucket/key convenience overload ends up in.
     *
     * @param scheme       the scheme used to read and write the stream
     * @param s3Client     the client to use, or {@code null} to have one built on demand
     * @param checkpointer the checkpointer to use, or {@code null} for none
     * @param bucketName   the bucket; may not be null or empty
     * @param key          the object key or key prefix; may be {@code null}
     * @param delimiter    the key delimiter, stored as given
     * @param filter       an optional key filter
     * @param sinkMode     the sink mode handed to the super class
     * @throws IllegalArgumentException if {@code bucketName} is null or empty
     */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, String bucketName, String key, String delimiter, Predicate<String> filter, SinkMode sinkMode) {
        super(scheme, sinkMode);
        // Reject an unusable bucket name before any state is recorded.
        if (Util.isEmpty(bucketName)) {
            throw new IllegalArgumentException("bucket name may not be null or empty");
        }
        this.bucketName = bucketName;
        this.key = key;
        this.delimiter = delimiter;
        this.filter = filter;
        this.s3Client = s3Client;
        this.checkpointer = checkpointer;
    }

    // ---- Convenience constructors: s3:// URI identifier ----

    /** Tap for the given URI; passes {@code null} for client and checkpointer, and {@link SinkMode#KEEP}. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, URI identifier) {
        this(scheme, null, null, identifier, SinkMode.KEEP);
    }

    /** Tap for the given URI using the given client; passes {@code null} for the checkpointer, and {@link SinkMode#KEEP}. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, URI identifier) {
        this(scheme, s3Client, null, identifier, SinkMode.KEEP);
    }

    /** Checkpointed tap for the given URI; passes {@code null} for the client, and {@link SinkMode#KEEP}. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, URI identifier) {
        this(scheme, null, checkpointer, identifier, SinkMode.KEEP);
    }

    /** Checkpointed tap for the given URI using the given client, with {@link SinkMode#KEEP}. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, URI identifier) {
        this(scheme, s3Client, checkpointer, identifier, SinkMode.KEEP);
    }

    /** Tap for the given URI and sink mode; passes {@code null} for client and checkpointer. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, URI identifier, SinkMode sinkMode) {
        this(scheme, null, null, identifier, sinkMode);
    }

    /** Tap for the given URI and sink mode using the given client; passes {@code null} for the checkpointer. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, URI identifier, SinkMode sinkMode) {
        this(scheme, s3Client, null, identifier, sinkMode);
    }

    /** Checkpointed tap for the given URI and sink mode; passes {@code null} for the client. */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, S3Checkpointer checkpointer, URI identifier, SinkMode sinkMode) {
        this(scheme, null, checkpointer, identifier, sinkMode);
    }

    /**
     * Canonical URI constructor that every URI convenience overload ends up in.
     * <p>
     * The bucket is taken from the URI authority, the key from the normalized URI path, and the
     * filter from the URI query interpreted as a glob. The delimiter keeps its field default.
     *
     * @param scheme       the scheme used to read and write the stream
     * @param s3Client     the client to use, or {@code null} to have one built on demand
     * @param checkpointer the checkpointer to use, or {@code null} for none
     * @param identifier   an {@code s3} URI; may not be {@code null}
     * @param sinkMode     the sink mode handed to the super class
     * @throws IllegalArgumentException if the identifier is null, has no {@code s3} scheme, or
     *                                  yields a null or empty bucket name
     */
    public S3Tap(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, S3Checkpointer checkpointer, URI identifier, SinkMode sinkMode) {
        super(scheme, sinkMode);
        this.s3Client = s3Client;
        this.checkpointer = checkpointer;
        if (identifier == null) {
            throw new IllegalArgumentException("identifier may not be null");
        }
        // Constant-first comparison: getScheme() is null for a scheme-less (relative) URI,
        // which must be reported as an illegal argument rather than a NullPointerException.
        if (!"s3".equalsIgnoreCase(identifier.getScheme())) {
            throw new IllegalArgumentException("identifier does not have s3 scheme");
        }
        this.bucketName = this.getBucketNameFor(identifier);
        if (Util.isEmpty((String)this.bucketName)) {
            throw new IllegalArgumentException("bucket name may not be null or empty: " + identifier);
        }
        this.key = this.cleanKey(identifier);
        if (identifier.getQuery() != null) {
            this.filter = S3Tap.globPredicate(identifier.getQuery());
        }
    }

    /**
     * Extracts the bucket name from the URI authority, dropping anything up to and including
     * the first {@code @}.
     *
     * @param identifier the URI to inspect
     * @return the authority, or the part of it after the first {@code @}
     * @throws IllegalArgumentException if the URI has no authority
     */
    protected String getBucketNameFor(URI identifier) {
        final String authority = identifier.getAuthority();
        if (Util.isEmpty(authority)) {
            throw new IllegalArgumentException("identifier must have an authority: " + identifier);
        }
        final int at = authority.indexOf('@');
        return at == -1 ? authority : authority.substring(at + 1);
    }

    /**
     * Turns a glob into a predicate that accepts strings fully matching the glob's regex form.
     *
     * @param glob the glob expression
     * @return a predicate backed by a pattern compiled once
     */
    private static Predicate<String> globPredicate(String glob) {
        final Pattern compiled = Pattern.compile(getRegexForGlob(glob));
        return candidate -> compiled.matcher(candidate).matches();
    }

    /**
     * Converts a glob to a regular expression by reflectively invoking
     * {@code sun.nio.fs.Globs.toUnixRegexPattern(String)}.
     * NOTE(review): this depends on a JDK-internal class - confirm it is reachable on the target runtime.
     *
     * @param glob the glob expression
     * @return the regular expression returned by the JDK helper
     */
    private static String getRegexForGlob(String glob) {
        final Object[] arguments = new Object[]{glob};
        final Class[] parameterTypes = new Class[]{String.class};
        return (String)Util.invokeStaticMethod("sun.nio.fs.Globs", "toUnixRegexPattern", arguments, parameterTypes);
    }

    /**
     * Returns a tap with this tap's client, bucket, key, delimiter, filter and sink mode
     * but the given scheme.
     * NOTE(review): the checkpointer is not passed to {@code create} - confirm that is intended.
     *
     * @param scheme the scheme for the new tap
     * @return the tap produced by {@code create}
     */
    public TapWith<Properties, InputStream, OutputStream> withScheme(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme) {
        final SinkMode currentSinkMode = getSinkMode();
        return create(scheme, s3Client, getBucketName(), getKey(), getDelimiter(), getFilter(), currentSinkMode);
    }

    /**
     * Returns a tap addressing a child of this tap.
     * <ul>
     * <li>An identifier starting with {@code s3://} is used as the URI verbatim.</li>
     * <li>An identifier starting with this tap's bucket name is handed to {@code makeURI} as the
     * bucket argument with a null key prefix.</li>
     * <li>Anything else is appended to this tap's key, inserting the delimiter when the
     * identifier does not already start with it.</li>
     * </ul>
     * A {@code null} key is treated as an empty prefix and a {@code null} delimiter as
     * {@link #DEFAULT_DELIMITER}, so bucket-level taps produce a usable child key instead of
     * one prefixed with the text "null" or a {@link NullPointerException}.
     *
     * @param identifier the child identifier
     * @return the tap produced by {@code create}
     */
    public TapWith<Properties, InputStream, OutputStream> withChildIdentifier(String identifier) {
        URI uri;
        if (identifier.startsWith("s3://")) {
            uri = URI.create(identifier);
        } else if (identifier.startsWith(this.getBucketName())) {
            uri = S3Tap.makeURI(identifier, null);
        } else {
            String parentKey = this.getKey() == null ? "" : this.getKey();
            String separator = this.delimiter == null ? DEFAULT_DELIMITER : this.delimiter;
            String child = identifier.startsWith(separator) ? identifier : separator + identifier;
            uri = S3Tap.makeURI(this.getBucketName(), parentKey + child);
        }
        return this.create(this.getScheme(), this.s3Client, uri, this.getSinkMode());
    }

    /**
     * Factory hook used by {@code withChildIdentifier}; builds a new tap from a URI.
     *
     * @param scheme     the scheme for the new tap
     * @param s3Client   the client for the new tap
     * @param identifier the URI for the new tap
     * @param sinkMode   the sink mode for the new tap
     * @return a new {@code S3Tap}
     */
    protected TapWith<Properties, InputStream, OutputStream> create(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, URI identifier, SinkMode sinkMode) {
        final S3Tap created = new S3Tap(scheme, s3Client, identifier, sinkMode);
        return created;
    }

    /**
     * Returns a tap with this tap's scheme, client, bucket, key, delimiter and filter
     * but the given sink mode.
     * NOTE(review): the checkpointer is not passed to {@code create} - confirm that is intended.
     *
     * @param sinkMode the sink mode for the new tap
     * @return the tap produced by {@code create}
     */
    public TapWith<Properties, InputStream, OutputStream> withSinkMode(SinkMode sinkMode) {
        final Scheme<Properties, InputStream, OutputStream, ?, ?> currentScheme = getScheme();
        return create(currentScheme, s3Client, getBucketName(), getKey(), getDelimiter(), getFilter(), sinkMode);
    }

    /**
     * Factory hook used by {@code withScheme} and {@code withSinkMode}; builds a new tap from
     * explicit bucket/key parts.
     *
     * @return a new {@code S3Tap} constructed from the given arguments
     */
    protected TapWith<Properties, InputStream, OutputStream> create(Scheme<Properties, InputStream, OutputStream, ?, ?> scheme, AmazonS3 s3Client, String bucketName, String key, String delimiter, Predicate<String> filter, SinkMode sinkMode) {
        final S3Tap created = new S3Tap(scheme, s3Client, bucketName, key, delimiter, filter, sinkMode);
        return created;
    }

    /**
     * Derives the object key from a URI: the normalized path with one leading delimiter removed.
     *
     * @param identifier the URI to read the path from
     * @return the path without its leading delimiter, if it had one
     */
    protected String cleanKey(URI identifier) {
        final String normalizedPath = identifier.normalize().getPath();
        return normalizedPath.startsWith(DEFAULT_DELIMITER) ? normalizedPath.substring(1) : normalizedPath;
    }

    /**
     * Returns the client supplied at construction, or builds a new one from the given properties.
     * <p>
     * Properties read: {@code cascading.tap.aws.s3.endpoint}, {@code cascading.tap.aws.s3.region}
     * (default {@code us-east-1}), {@code cascading.tap.aws.s3.proxy.host},
     * {@code cascading.tap.aws.s3.proxy.port} (default -1) and
     * {@code cascading.tap.aws.s3.path_style_access} (default {@code false}).
     * A built client is not stored on this tap.
     *
     * @param properties the configuration; may be {@code null}, in which case an unconfigured
     *                   standard builder is used
     * @return the client to use
     */
    protected AmazonS3 getS3Client(Properties properties) {
        if (this.s3Client != null) {
            return this.s3Client;
        }
        final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
        if (properties == null) {
            return (AmazonS3)builder.build();
        }
        final String endpoint = properties.getProperty("cascading.tap.aws.s3.endpoint");
        final String region = properties.getProperty("cascading.tap.aws.s3.region", "us-east-1");
        if (properties.containsKey("cascading.tap.aws.s3.proxy.host")) {
            final String proxyHost = properties.getProperty("cascading.tap.aws.s3.proxy.host");
            final int proxyPort = PropertyUtil.getIntProperty((Map)properties, "cascading.tap.aws.s3.proxy.port", -1);
            builder.withClientConfiguration(new ClientConfiguration().withProxyHost(proxyHost).withProxyPort(proxyPort));
        }
        // An explicit endpoint carries the region with it; otherwise only the region is set.
        if (endpoint == null) {
            builder.setRegion(region);
        } else {
            builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region));
        }
        final String pathStyle = properties.getProperty("cascading.tap.aws.s3.path_style_access", "false");
        if (Boolean.parseBoolean(pathStyle)) {
            builder.enablePathStyleAccess();
        }
        return (AmazonS3)builder.build();
    }

    /** Returns the checkpointer given at construction, or {@code null} if there is none. */
    public S3Checkpointer getCheckpointer() {
        return this.checkpointer;
    }

    /** Returns the bucket this tap addresses. */
    public String getBucketName() {
        return this.bucketName;
    }

    /** Returns the object key or key prefix; may be {@code null}. */
    public String getKey() {
        return this.key;
    }

    /**
     * Returns the last key the checkpointer recorded for this bucket.
     *
     * @return the recorded key, or {@code null} when there is no checkpointer
     */
    protected String getMarker() {
        if (this.checkpointer == null) {
            return null;
        }
        return this.checkpointer.getLastKey(getBucketName());
    }

    /**
     * Records the given key with the checkpointer for this bucket; does nothing without a checkpointer.
     *
     * @param marker the key to record
     */
    protected void setLastMarker(String marker) {
        if (this.checkpointer == null) {
            return;
        }
        this.checkpointer.setLastKey(getBucketName(), marker);
    }

    /** Asks the checkpointer to commit; does nothing without a checkpointer. */
    protected void commitMarker() {
        if (this.checkpointer == null) {
            return;
        }
        this.checkpointer.commit();
    }

    /** Returns the key filter, or {@code null} if there is none. */
    public Predicate<String> getFilter() {
        return this.filter;
    }

    /** Returns the key delimiter as stored on this tap. */
    public String getDelimiter() {
        return this.delimiter;
    }

    /** Returns the {@code s3://bucket/key} string built by {@code makeStringIdentifier} from the bucket and key. */
    public String getIdentifier() {
        return S3Tap.makeStringIdentifier(this.getBucketName(), this.getKey());
    }

    /** Returns the same value as {@link #getIdentifier()}; the configuration is not consulted. */
    public String getFullIdentifier(Properties conf) {
        return this.getIdentifier();
    }

    /**
     * Deletes the object at this tap's bucket and key.
     *
     * @param conf the configuration used to obtain the client
     * @return {@code true} when the delete call returns normally
     * @throws AmazonS3Exception rethrown after passing through {@code handleException}
     */
    public boolean deleteResource(Properties conf) throws IOException {
        final AmazonS3 client = getS3Client(conf);
        try {
            client.deleteObject(getBucketName(), getKey());
            return true;
        }
        catch (AmazonS3Exception failure) {
            throw handleException(client, failure);
        }
    }

    /**
     * Creates the resource by putting an empty string as the object content at this tap's bucket and key.
     *
     * @param conf the configuration used to obtain the client
     * @return {@code true} when the put call returns normally
     * @throws AmazonS3Exception rethrown after passing through {@code handleException}
     */
    public boolean createResource(Properties conf) throws IOException {
        final AmazonS3 client = getS3Client(conf);
        try {
            client.putObject(getBucketName(), getKey(), "");
            return true;
        }
        catch (AmazonS3Exception failure) {
            throw handleException(client, failure);
        }
    }

    /**
     * Returns the metadata of the object at this tap's bucket and key, fetching it on first use
     * and caching it afterwards.
     * <p>
     * The client is resolved once and reused on the error path, so a failure does not cause a
     * second client to be built merely to report the region.
     *
     * @param conf the configuration used to obtain the client
     * @return the cached or freshly fetched metadata
     * @throws AmazonS3Exception rethrown after passing through {@code handleException}
     */
    protected ObjectMetadata getObjectMetadata(Properties conf) {
        if (this.objectMetadata != null) {
            return this.objectMetadata;
        }
        AmazonS3 client = this.getS3Client(conf);
        try {
            this.objectMetadata = client.getObjectMetadata(this.getBucketName(), this.getKey());
            return this.objectMetadata;
        }
        catch (AmazonS3Exception exception) {
            throw this.handleException(client, exception);
        }
    }

    /**
     * Opens an iterator over the tuples of every object listed under this tap's bucket and key.
     * <p>
     * Objects are listed through {@code S3Iterable}, honoring the filter and the checkpoint
     * marker. Each call to the underlying iterator's {@code next()} closes the previous object
     * stream and opens the next one. Closing an object stream records its key as the last
     * marker; closing the iterator commits the marker. The {@code input} argument is not used.
     *
     * @param flowProcess the current flow process, providing configuration and the source-path context
     * @param input       ignored
     * @return an iterator over the tuples of all listed objects
     * @throws IOException declared by the overridden method
     */
    public TupleEntryIterator openForRead(final FlowProcess<? extends Properties> flowProcess, InputStream input) throws IOException {
        final AmazonS3 s3Client = this.getS3Client((Properties)flowProcess.getConfig());
        // Single-element holder so the anonymous class and the lambda below share the current identifier.
        final String[] identifier = new String[1];
        CloseableIterator<InputStream> iterator = new CloseableIterator<InputStream>(){
            S3Iterable iterable;
            Iterator<S3ObjectSummary> iterator;
            InputStream lastInputStream;
            {
                this.iterable = S3Iterable.iterable(s3Client, S3Tap.this.getBucketName(), S3Tap.this.getKey()).withFilter(S3Tap.this.getFilter()).withMarker(S3Tap.this.getMarker());
                this.iterator = this.iterable.iterator();
            }

            public boolean hasNext() {
                return this.iterator.hasNext();
            }

            public InputStream next() {
                // Finish (and checkpoint) the previous object before moving on.
                this.safeClose();
                final S3ObjectSummary objectSummary = this.iterator.next();
                identifier[0] = S3Tap.makeStringIdentifier(objectSummary.getBucketName(), objectSummary.getKey());
                flowProcess.getFlowProcessContext().setSourcePath(identifier[0]);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("s3 retrieving: {}/{}, with size: {}", new Object[]{objectSummary.getBucketName(), objectSummary.getKey(), objectSummary.getSize()});
                }
                this.lastInputStream = new CheckedFilterInputStream((InputStream)s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()).getObjectContent()){

                    @Override
                    public void close() throws IOException {
                        S3Tap.this.setLastMarker(objectSummary.getKey());
                        super.close();
                    }
                };
                return this.lastInputStream;
            }

            /**
             * Closes the current object stream, if any. A failure is logged rather than
             * propagated, and the reference is always cleared so the same stream is never
             * closed (and its marker never recorded) a second time.
             */
            private void safeClose() {
                if (this.lastInputStream == null) {
                    return;
                }
                try {
                    this.lastInputStream.close();
                }
                catch (IOException exception) {
                    LOG.warn("s3 failed closing stream for: " + identifier[0], (Throwable)exception);
                }
                finally {
                    this.lastInputStream = null;
                }
            }

            public void close() {
                this.safeClose();
                S3Tap.this.commitMarker();
            }
        };
        return new TupleEntrySchemeIterator(flowProcess, (Tap)this, this.getScheme(), (CloseableIterator)iterator, () -> identifier[0]);
    }

    /**
     * Opens a collector that buffers written tuples in a {@code FileBackedOutputStream} and
     * uploads the buffered bytes to S3 when the collector is closed.
     * <p>
     * The bucket is created if it does not exist. The target key is this tap's key after
     * {@code resolveKey}. The {@code outputStream} argument is not used.
     * <p>
     * If the thread is interrupted while waiting for the upload, the interrupt status is
     * restored and a warning is logged; the transfer manager is shut down in every case.
     *
     * @param flowProcess  the current flow process, providing configuration
     * @param outputStream ignored
     * @return a collector whose {@code close()} performs the upload
     * @throws IOException declared by the overridden method
     * @throws TapException from {@code close()} when the upload fails
     */
    public TupleEntryCollector openForWrite(FlowProcess<? extends Properties> flowProcess, OutputStream outputStream) throws IOException {
        AmazonS3 s3Client = this.getS3Client((Properties)flowProcess.getConfig());
        if (!s3Client.doesBucketExistV2(this.getBucketName())) {
            s3Client.createBucket(this.getBucketName());
        }
        final String key = this.resolveKey(flowProcess, this.getKey());
        FileBackedOutputStream fileBackedOutputStream = new FileBackedOutputStream(512000, true);
        // DataOutputStream is used for its byte counter, which supplies the Content-Length below.
        final DataOutputStream dataOutputStream = new DataOutputStream((OutputStream)fileBackedOutputStream);
        final ByteSource byteSource = fileBackedOutputStream.asByteSource();
        final TransferManager transferManager = TransferManagerBuilder.standard().withS3Client(s3Client).build();
        final String loggableIdentifier = S3Tap.makeStringIdentifier(this.getBucketName(), key);
        return new TupleEntrySchemeCollector<Properties, OutputStream>(flowProcess, (Tap)this, this.getScheme(), (OutputStream)dataOutputStream, loggableIdentifier){

            public void close() {
                // Flush and close the local buffer first so every byte is available for upload.
                super.close();
                LOG.info("s3 starting async upload: {}", (Object)loggableIdentifier);
                InputStream inputStream = S3Tap.this.openInputStream(byteSource, loggableIdentifier);
                try {
                    ObjectMetadata metadata = new ObjectMetadata();
                    metadata.setHeader("Content-Length", (Object)dataOutputStream.size());
                    Upload upload = S3Tap.this.createUpload(key, transferManager, new PutObjectRequest(S3Tap.this.getBucketName(), key, inputStream, metadata));
                    UploadResult uploadResult = upload.waitForUploadResult();
                    S3Tap.this.handleResult(upload, uploadResult, loggableIdentifier);
                }
                catch (SdkClientException exception) {
                    LOG.error("s3 upload failed on: " + loggableIdentifier, (Throwable)exception);
                    throw new TapException("s3 upload failed on: " + loggableIdentifier, (Throwable)exception);
                }
                catch (InterruptedException exception) {
                    // Restore the interrupt status so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    LOG.warn("s3 upload interrupted on: " + loggableIdentifier, (Throwable)exception);
                }
                finally {
                    transferManager.shutdownNow(false);
                }
            }
        };
    }

    /**
     * Logs the outcome of a finished upload based on its transfer state.
     *
     * @param upload             the upload whose state is inspected
     * @param uploadResult       the result, used for the key in log messages
     * @param loggableIdentifier the identifier used in the exception message
     * @throws TapException if the upload state is {@code Failed}
     */
    protected void handleResult(Upload upload, UploadResult uploadResult, String loggableIdentifier) {
        final Transfer.TransferState finalState = upload.getState();
        if (finalState == Transfer.TransferState.Canceled) {
            LOG.warn("s3 canceled upload: {}, with key: {}", (Object)getIdentifier(), (Object)uploadResult.getKey());
            return;
        }
        if (finalState == Transfer.TransferState.Failed) {
            LOG.error("s3 failed upload: {}, with key: {}", (Object)getIdentifier(), (Object)uploadResult.getKey());
            throw new TapException("s3 upload failed on: " + loggableIdentifier);
        }
        LOG.info("s3 completed upload: {}, with key: {}", (Object)getIdentifier(), (Object)uploadResult.getKey());
    }

    /**
     * Opens a buffered stream over the given byte source.
     *
     * @param byteSource         the source to read
     * @param loggableIdentifier the identifier used in log and exception messages
     * @return the opened stream
     * @throws TapException if the stream cannot be opened
     */
    protected InputStream openInputStream(ByteSource byteSource, String loggableIdentifier) {
        try {
            return byteSource.openBufferedStream();
        }
        catch (IOException openFailure) {
            final String message = "s3 upload failed on: " + loggableIdentifier;
            LOG.error(message, (Throwable)openFailure);
            throw new TapException(message, (Throwable)openFailure);
        }
    }

    /**
     * Starts the upload through the transfer manager with a listener that logs progress events.
     * <p>
     * Each event is logged exactly once: transfer failures at error level, cancellations and
     * part failures at warn level, everything else at debug level (when enabled).
     *
     * @param key             the key, used in log messages only
     * @param transferManager the manager that performs the upload
     * @param request         the put request to upload
     * @return the started upload
     */
    protected Upload createUpload(final String key, TransferManager transferManager, PutObjectRequest request) {
        return transferManager.upload(request, new S3ProgressListener(){

            public void onPersistableTransfer(PersistableTransfer persistableTransfer) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("s3 for: {}, persistable transfer: {}", (Object)key, (Object)persistableTransfer);
                }
            }

            public void progressChanged(ProgressEvent progressEvent) {
                ProgressEventType eventType = progressEvent.getEventType();
                // Mutually exclusive branches so a failed or canceled event is not also logged at debug.
                if (eventType == ProgressEventType.TRANSFER_FAILED_EVENT) {
                    LOG.error("s3 for: {}, event: {}", (Object)key, (Object)progressEvent);
                } else if (eventType == ProgressEventType.TRANSFER_CANCELED_EVENT || eventType == ProgressEventType.TRANSFER_PART_FAILED_EVENT) {
                    LOG.warn("s3 for: {}, event: {}", (Object)key, (Object)progressEvent);
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("s3 for: {}, event: {}", (Object)key, (Object)progressEvent);
                }
            }
        });
    }

    /**
     * Resolves the key to write to: replaces the sequence token with the five-digit,
     * zero-padded value of {@code cascading.local.tap.partition.seq} (default 0) and, when the
     * scheme is a {@code FileFormat}, appends a dot and the scheme's extension.
     *
     * @param flowProcess the flow process the sequence number is read from
     * @param key         the key template
     * @return the resolved key
     */
    protected String resolveKey(FlowProcess<? extends Properties> flowProcess, String key) {
        final int sequence = flowProcess.getIntegerProperty("cascading.local.tap.partition.seq", 0);
        final String resolved = key.replace(SEQUENCE_TOKEN, String.format("%05d", sequence));
        if (!(getScheme() instanceof FileFormat)) {
            return resolved;
        }
        return resolved + "." + ((FileFormat)getScheme()).getExtension();
    }

    /**
     * Checks whether the resource exists: the bucket itself when this tap has no key,
     * otherwise the object at bucket and key.
     *
     * @param conf the configuration used to obtain the client
     * @return {@code true} if the bucket or object exists
     * @throws AmazonS3Exception rethrown after passing through {@code handleException}
     */
    public boolean resourceExists(Properties conf) throws IOException {
        final AmazonS3 client = getS3Client(conf);
        try {
            return getKey() == null
                ? client.doesBucketExistV2(getBucketName())
                : client.doesObjectExist(getBucketName(), getKey());
        }
        catch (AmazonS3Exception failure) {
            throw handleException(client, failure);
        }
    }

    /**
     * Logs a region hint when the exception carries HTTP status 400, then hands the same
     * exception back so the caller can rethrow it.
     *
     * @param s3Client  the client whose region name is logged
     * @param exception the exception to inspect
     * @return the given exception, unchanged
     */
    protected AmazonS3Exception handleException(AmazonS3 s3Client, AmazonS3Exception exception) {
        final boolean badRequest = exception.getStatusCode() == 400;
        if (badRequest) {
            final Object[] logArguments = new Object[]{s3Client.getRegionName(), "cascading.tap.aws.s3.region", exception};
            LOG.error("s3 request failed, try changing the AWS Region from: {}, using property: {}", logArguments);
        }
        return exception;
    }

    /**
     * Returns the object's last-modified time, in milliseconds, as reported by its metadata.
     *
     * @param conf the configuration used to obtain the client
     * @return the last-modified timestamp
     */
    public long getModifiedTime(Properties conf) throws IOException {
        final ObjectMetadata metadata = getObjectMetadata(conf);
        return metadata.getLastModified().getTime();
    }

    /**
     * Reports whether the object's content type equals {@link #MIME_DIRECTORY}, ignoring case.
     *
     * @param flowProcess the flow process whose configuration is used to obtain the client
     * @return {@code true} if the object is marked as a directory
     */
    public boolean isDirectory(FlowProcess<? extends Properties> flowProcess) throws IOException {
        final Properties conf = (Properties)flowProcess.getConfig();
        final String contentType = getObjectMetadata(conf).getContentType();
        return MIME_DIRECTORY.equalsIgnoreCase(contentType);
    }

    /**
     * Reports whether the object's content type equals {@link #MIME_DIRECTORY}, ignoring case.
     * <p>
     * The given configuration is used to obtain the client, so endpoint, region and proxy
     * settings supplied by the caller are honored.
     *
     * @param conf the configuration used to obtain the client
     * @return {@code true} if the object is marked as a directory
     */
    public boolean isDirectory(Properties conf) throws IOException {
        return MIME_DIRECTORY.equalsIgnoreCase(this.getObjectMetadata(conf).getContentType());
    }

    /** Lists child identifiers using the flow process configuration; see {@code getChildIdentifiers(Properties)}. */
    public String[] getChildIdentifiers(FlowProcess<? extends Properties> flowProcess) throws IOException {
        return this.getChildIdentifiers((Properties)flowProcess.getConfig());
    }

    /** Lists child identifiers with depth 1 and not fully qualified. */
    public String[] getChildIdentifiers(Properties conf) throws IOException {
        return this.getChildIdentifiers(conf, 1, false);
    }

    /** Lists child identifiers using the flow process configuration, with the given depth and qualification. */
    public String[] getChildIdentifiers(FlowProcess<? extends Properties> flowProcess, int depth, boolean fullyQualified) throws IOException {
        return this.getChildIdentifiers((Properties)flowProcess.getConfig(), depth, fullyQualified);
    }

    /**
     * Lists the objects under this tap's bucket and key, honoring delimiter, maximum depth,
     * filter and checkpoint marker.
     *
     * @param conf           the configuration used to obtain the client
     * @param depth          the maximum depth handed to the listing
     * @param fullyQualified whether to return {@code s3://} identifiers instead of keys relative to this tap's key
     * @return the child identifiers; empty when the resource does not exist
     */
    public String[] getChildIdentifiers(Properties conf, int depth, boolean fullyQualified) throws IOException {
        if (!resourceExists(conf)) {
            return new String[0];
        }
        final S3Iterable listing = S3Iterable.iterable(getS3Client(conf), getBucketName(), getKey())
            .withDelimiter(getDelimiter())
            .withMaxDepth(depth)
            .withFilter(getFilter())
            .withMarker(getMarker());
        final ArrayList<String> children = new ArrayList<String>();
        // makePath advances the iterator itself, so the loop has no update clause.
        for (Iterator<S3ObjectSummary> summaries = listing.iterator(); summaries.hasNext(); ) {
            children.add(makePath(summaries, fullyQualified));
        }
        return children.toArray(new String[0]);
    }

    /**
     * Consumes the next object summary and turns its key into a child identifier.
     * <p>
     * When not fully qualified, this tap's key is stripped from the front of the child key by
     * length. A tap without a key (a bucket-level tap) has nothing to strip, so the child key
     * is returned whole rather than failing with a {@link NullPointerException}.
     *
     * @param iterator       the listing iterator; advanced by one element
     * @param fullyQualified whether to return an {@code s3://} identifier
     * @return the fully qualified identifier or the key relative to this tap's key
     */
    protected String makePath(Iterator<S3ObjectSummary> iterator, boolean fullyQualified) {
        String childKey = iterator.next().getKey();
        if (fullyQualified) {
            return S3Tap.makeStringIdentifier(this.getBucketName(), childKey);
        }
        String parentKey = this.getKey();
        if (parentKey == null) {
            return childKey;
        }
        return childKey.substring(parentKey.length());
    }

    /**
     * Returns the object size using the flow process configuration.
     *
     * @param flowProcess the flow process whose configuration is used
     * @return the size reported by {@code getSize(Properties)}
     */
    public long getSize(FlowProcess<? extends Properties> flowProcess) throws IOException {
        final Properties conf = (Properties)flowProcess.getConfig();
        return getSize(conf);
    }

    /**
     * Returns the object's instance length from its metadata, or 0 for a directory.
     *
     * @param conf the configuration used to obtain the client
     * @return the size in bytes, or 0 when {@code isDirectory} reports a directory
     */
    public long getSize(Properties conf) throws IOException {
        return isDirectory(conf) ? 0L : getObjectMetadata(conf).getInstanceLength();
    }

    /**
     * Formats an {@code s3://bucket/key} identifier; an empty key prefix yields {@code s3://bucket/}.
     *
     * @param bucketName the bucket
     * @param keyPrefix  the key prefix; may be null or empty
     * @return the identifier string
     */
    protected static String makeStringIdentifier(String bucketName, String keyPrefix) {
        return Util.isEmpty(keyPrefix)
            ? String.format("s3://%s/", bucketName)
            : String.format("s3://%s/%s", bucketName, keyPrefix);
    }

    private class CheckedFilterInputStream
    extends FilterInputStream {
        public CheckedFilterInputStream(InputStream inputStream) {
            super(inputStream);
        }
    }
}

