package org.apache.pulsar.io.alluxio.sink;

import alluxio.AlluxioURI;
import alluxio.Constants;
import alluxio.client.WriteType;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.WritePType;
import alluxio.util.FileSystemOptions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name = Constants.SCHEME, type = IOType.SINK, help = "The sink connector is used for moving records from Pulsar to Alluxio.", configClass = AlluxioSinkConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/alluxio/sink/AlluxioSink.class */
public class AlluxioSink implements Sink<GenericObject> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AlluxioSink.class);
    private FileSystem fileSystem;
    private FileOutStream fileOutStream;
    private long recordsNum;
    private String tmpFilePath;
    private String fileDirPath;
    private String tmpFileDirPath;
    private long lastRotationTime;
    private long rotationRecordsNum;
    private long rotationInterval;
    private AlluxioSinkConfig alluxioSinkConfig;
    private AlluxioState alluxioState;
    private InstancedConfiguration configuration = InstancedConfiguration.defaults();
    private ObjectMapper objectMapper = new ObjectMapper();
    private List<Record<GenericObject>> recordsToAck;

    /* loaded from: input_file:org/apache/pulsar/io/alluxio/sink/AlluxioSink$AlluxioState.class */
    private enum AlluxioState {
        WRITE_STARTED,
        FILE_ROTATED,
        FILE_COMMITTED
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.alluxioSinkConfig = AlluxioSinkConfig.load(map);
        this.alluxioSinkConfig.validate();
        String alluxioMasterHost = this.alluxioSinkConfig.getAlluxioMasterHost();
        int alluxioMasterPort = this.alluxioSinkConfig.getAlluxioMasterPort();
        this.configuration.set(PropertyKey.MASTER_HOSTNAME, alluxioMasterHost);
        this.configuration.set(PropertyKey.MASTER_RPC_PORT, Integer.valueOf(alluxioMasterPort));
        if (this.alluxioSinkConfig.getSecurityLoginUser() != null) {
            this.configuration.set(PropertyKey.SECURITY_LOGIN_USERNAME, this.alluxioSinkConfig.getSecurityLoginUser());
        }
        this.fileSystem = FileSystem.Factory.create(this.configuration);
        String alluxioDir = this.alluxioSinkConfig.getAlluxioDir();
        this.fileDirPath = alluxioDir.startsWith("/") ? alluxioDir : "/" + alluxioDir;
        this.tmpFileDirPath = this.fileDirPath + "/tmp";
        AlluxioURI alluxioURI = new AlluxioURI(this.fileDirPath);
        if (!this.fileSystem.exists(alluxioURI)) {
            this.fileSystem.createDirectory(alluxioURI);
        }
        AlluxioURI alluxioURI2 = new AlluxioURI(this.tmpFileDirPath);
        if (!this.fileSystem.exists(alluxioURI2)) {
            this.fileSystem.createDirectory(alluxioURI2);
        }
        this.recordsNum = 0L;
        this.recordsToAck = Lists.newArrayList();
        this.tmpFilePath = "";
        this.alluxioState = AlluxioState.WRITE_STARTED;
        this.lastRotationTime = System.currentTimeMillis();
        this.rotationRecordsNum = this.alluxioSinkConfig.getRotationRecords();
        this.rotationInterval = this.alluxioSinkConfig.getRotationInterval();
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:2:0x000f. Please report as an issue. */
    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<GenericObject> record) {
        long currentTimeMillis = System.currentTimeMillis();
        switch (this.alluxioState) {
            case WRITE_STARTED:
                try {
                    writeToAlluxio(record);
                    if (!shouldRotate(currentTimeMillis)) {
                        return;
                    } else {
                        this.alluxioState = AlluxioState.FILE_ROTATED;
                    }
                } catch (AlluxioException | IOException e) {
                    log.error("Unable to write record to alluxio.", e);
                    record.fail();
                    return;
                }
            case FILE_ROTATED:
                try {
                    closeAndCommitTmpFile();
                    this.alluxioState = AlluxioState.FILE_COMMITTED;
                    ackRecords();
                } catch (AlluxioException | IOException e2) {
                    log.error("Unable to flush records to alluxio.", e2);
                    failRecords();
                    try {
                        deleteTmpFile();
                        return;
                    } catch (AlluxioException | IOException e3) {
                        log.error("Failed to delete tmp cache file.", e2);
                        return;
                    }
                }
            case FILE_COMMITTED:
                this.alluxioState = AlluxioState.WRITE_STARTED;
                return;
            default:
                log.error("{} is not a valid state when writing record to alluxio temp dir {}.", this.alluxioState, this.tmpFileDirPath);
                return;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            closeAndCommitTmpFile();
            ackRecords();
        } catch (AlluxioException | IOException e) {
            log.error("Unable to flush records to alluxio.", e);
            failRecords();
        }
        deleteTmpFile();
    }

    private void ackRecords() {
        this.recordsToAck.forEach((v0) -> {
            v0.ack();
        });
        this.recordsToAck.clear();
    }

    private void failRecords() {
        this.recordsToAck.forEach((v0) -> {
            v0.fail();
        });
        this.recordsToAck.clear();
    }

    private void writeToAlluxio(Record<GenericObject> record) throws AlluxioException, IOException {
        KeyValue<String, String> extractKeyValue = extractKeyValue(record);
        if (this.fileOutStream == null) {
            createTmpFile();
        }
        this.fileOutStream.write(toBytes(extractKeyValue.getValue()));
        if (this.alluxioSinkConfig.getLineSeparator() != 0) {
            this.fileOutStream.write(this.alluxioSinkConfig.getLineSeparator());
        }
        this.recordsNum++;
        this.recordsToAck.add(record);
    }

    private void createTmpFile() throws AlluxioException, IOException {
        CreateFilePOptions.Builder builder = FileSystemOptions.createFileDefaults(this.configuration).toBuilder();
        UUID randomUUID = UUID.randomUUID();
        this.tmpFilePath = this.tmpFileDirPath + "/" + randomUUID.toString() + "_tmp" + this.alluxioSinkConfig.getFileExtension();
        if (this.alluxioSinkConfig.getWriteType() != null) {
            try {
                builder.setWriteType(WritePType.valueOf(this.alluxioSinkConfig.getWriteType().toUpperCase()));
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Illegal write type when creating Alluxio files, valid values are: " + Arrays.asList(WriteType.values()));
            }
        }
        this.fileOutStream = this.fileSystem.createFile(new AlluxioURI(this.tmpFilePath), builder.build());
    }

    private void closeAndCommitTmpFile() throws AlluxioException, IOException {
        if (this.fileOutStream != null) {
            this.fileOutStream.close();
        }
        String filePrefix = this.alluxioSinkConfig.getFilePrefix();
        this.alluxioSinkConfig.getFileExtension();
        this.fileSystem.rename(new AlluxioURI(this.tmpFilePath), new AlluxioURI(this.fileDirPath + "/" + (filePrefix + "-" + System.currentTimeMillis() + filePrefix)));
        this.fileOutStream = null;
        this.tmpFilePath = "";
        this.recordsNum = 0L;
        this.lastRotationTime = System.currentTimeMillis();
    }

    private void deleteTmpFile() throws AlluxioException, IOException {
        if (this.tmpFilePath.equals("")) {
            return;
        }
        this.fileSystem.delete(new AlluxioURI(this.tmpFilePath));
    }

    private boolean shouldRotate(long j) {
        boolean z = false;
        if (this.recordsNum >= this.rotationRecordsNum) {
            z = true;
        } else if (this.rotationInterval != -1 && j - this.lastRotationTime >= this.rotationInterval) {
            z = true;
        }
        return z;
    }

    private static byte[] toByteArray(Object obj) throws IOException {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
                try {
                    objectOutputStream.writeObject(obj);
                    objectOutputStream.flush();
                    byte[] byteArray = byteArrayOutputStream.toByteArray();
                    objectOutputStream.close();
                    byteArrayOutputStream.close();
                    return byteArray;
                } catch (Throwable th) {
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (IOException e) {
            log.error("Failed to serialize the object.", (Throwable) e);
            throw e;
        }
    }

    private static byte[] toBytes(Object obj) throws IOException {
        return obj instanceof String ? ((String) obj).getBytes(StandardCharsets.UTF_8) : obj instanceof byte[] ? (byte[]) obj : toByteArray(obj);
    }

    public KeyValue<String, String> extractKeyValue(Record<GenericObject> record) throws JsonProcessingException {
        Schema<GenericObject> schema;
        GenericObject value;
        if (!this.alluxioSinkConfig.isSchemaEnable()) {
            return new KeyValue<>(null, new String(record.getMessage().orElseThrow(() -> {
                return new IllegalArgumentException("Record does not carry message information");
            }).getData(), StandardCharsets.UTF_8));
        }
        if (record.getSchema() == null || !(record.getSchema() instanceof KeyValueSchema)) {
            schema = record.getSchema();
            value = record.getValue();
        } else {
            schema = ((KeyValueSchema) record.getSchema()).getValueSchema();
            value = (GenericObject) ((org.apache.pulsar.common.schema.KeyValue) record.getValue().getNativeObject()).getValue();
        }
        String str = null;
        if (value != null) {
            str = schema != null ? stringifyValue(schema, value) : value.getNativeObject() instanceof byte[] ? new String((byte[]) value.getNativeObject(), StandardCharsets.UTF_8) : value.getNativeObject().toString();
        }
        return new KeyValue<>(null, str);
    }

    public String stringifyValue(Schema<?> schema, Object obj) throws JsonProcessingException {
        if (schema.getSchemaInfo().getType() != SchemaType.JSON) {
            throw new UnsupportedOperationException("Unsupported value schemaType=" + schema.getSchemaInfo().getType());
        }
        return this.objectMapper.writeValueAsString((JsonNode) ((GenericRecord) obj).getNativeObject());
    }
}
