/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.JsonParseException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.cli.NoSplitter;
import org.apache.pulsar.client.cli.PulsarClientTool;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameters(commandDescription="Produce messages to a specified topic")
public class CmdProduce {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarClientTool.class);
    private static final int MAX_MESSAGES = 1000;
    static final String KEY_VALUE_ENCODING_TYPE_NOT_SET = "";
    private static final String KEY_VALUE_ENCODING_TYPE_SEPARATED = "separated";
    private static final String KEY_VALUE_ENCODING_TYPE_INLINE = "inline";
    @Parameter(description="TopicName", required=true)
    private List<String> mainOptions;
    @Parameter(names={"-m", "--messages"}, description="Messages to send, either -m or -f must be specified. Specify -m for each message.", splitter=NoSplitter.class)
    private List<String> messages = new ArrayList<String>();
    @Parameter(names={"-f", "--files"}, description="Comma separated file paths to send, either -m or -f must be specified.")
    private List<String> messageFileNames = new ArrayList<String>();
    @Parameter(names={"-n", "--num-produce"}, description="Number of times to send message(s), the count of messages/files * num-produce should below than 1000.")
    private int numTimesProduce = 1;
    @Parameter(names={"-r", "--rate"}, description="Rate (in msg/sec) at which to produce, value 0 means to produce messages as fast as possible.")
    private double publishRate = 0.0;
    @Parameter(names={"-db", "--disable-batching"}, description="Disable batch sending of messages")
    private boolean disableBatching = false;
    @Parameter(names={"-c", "--chunking"}, description="Should split the message and publish in chunks if message size is larger than allowed max size")
    private boolean chunkingAllowed = false;
    @Parameter(names={"-s", "--separator"}, description="Character to split messages string on default is comma")
    private String separator = ",";
    @Parameter(names={"-p", "--properties"}, description="Properties to add, Comma separated key=value string, like k1=v1,k2=v2.")
    private List<String> properties = new ArrayList<String>();
    @Parameter(names={"-k", "--key"}, description="message key to add ")
    private String key;
    @Parameter(names={"-vs", "--value-schema"}, description="Schema type (can be bytes,avro,json,string...)")
    private String valueSchema = "bytes";
    @Parameter(names={"-ks", "--key-schema"}, description="Schema type (can be bytes,avro,json,string...)")
    private String keySchema = "string";
    @Parameter(names={"-kvet", "--key-value-encoding-type"}, description="Key Value Encoding Type (it can be separated or inline)")
    private String keyValueEncodingType = null;
    @Parameter(names={"-ekn", "--encryption-key-name"}, description="The public key name to encrypt payload")
    private String encKeyName = null;
    @Parameter(names={"-ekv", "--encryption-key-value"}, description="The URI of public key to encrypt payload, for example file:///path/to/public.key or data:application/x-pem-file;base64,*****")
    private String encKeyValue = null;
    @Parameter(names={"-dr", "--disable-replication"}, description="Disable geo-replication for messages.")
    private boolean disableReplication = false;
    private ClientBuilder clientBuilder;
    private Authentication authentication;
    private String serviceURL;

    public void updateConfig(ClientBuilder newBuilder, Authentication authentication, String serviceURL) {
        this.clientBuilder = newBuilder;
        this.authentication = authentication;
        this.serviceURL = serviceURL;
    }

    private List<byte[]> generateMessageBodies(List<String> stringMessages, List<String> messageFileNames) {
        ArrayList<byte[]> messageBodies = new ArrayList<byte[]>();
        for (String m : stringMessages) {
            messageBodies.add(m.getBytes());
        }
        try {
            for (String filename : messageFileNames) {
                byte[] fileBytes = Files.readAllBytes(Paths.get(filename, new String[0]));
                messageBodies.add(fileBytes);
            }
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), (Throwable)e);
        }
        return messageBodies;
    }

    public int run() throws PulsarClientException {
        if (this.mainOptions.size() != 1) {
            throw new ParameterException("Please provide one and only one topic name.");
        }
        if (this.numTimesProduce <= 0) {
            throw new ParameterException("Number of times need to be positive number.");
        }
        if (this.messages.size() > 0) {
            this.messages = Collections.unmodifiableList(Arrays.asList(this.messages.get(0).split(this.separator)));
        }
        if (this.messages.size() == 0 && this.messageFileNames.size() == 0) {
            throw new ParameterException("Please supply message content with either --messages or --files");
        }
        if (this.keyValueEncodingType == null) {
            this.keyValueEncodingType = KEY_VALUE_ENCODING_TYPE_NOT_SET;
        } else {
            switch (this.keyValueEncodingType) {
                case "separated": 
                case "inline": {
                    break;
                }
                default: {
                    throw new ParameterException("--key-value-encoding-type " + this.keyValueEncodingType + " is not valid, only 'separated' or 'inline'");
                }
            }
        }
        int totalMessages = (this.messages.size() + this.messageFileNames.size()) * this.numTimesProduce;
        if (totalMessages > 1000) {
            String msg = "Attempting to send " + totalMessages + " messages. Please do not send more than 1000 messages";
            throw new ParameterException(msg);
        }
        String topic = this.mainOptions.get(0);
        if (this.serviceURL.startsWith("ws")) {
            return this.publishToWebSocket(topic);
        }
        return this.publish(topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int publish(String topic) {
        int numMessagesSent = 0;
        int returnCode = 0;
        try (PulsarClient client = this.clientBuilder.build();){
            Schema<?> schema = CmdProduce.buildSchema(this.keySchema, this.valueSchema, this.keyValueEncodingType);
            ProducerBuilder producerBuilder = client.newProducer(schema).topic(topic);
            if (this.chunkingAllowed) {
                producerBuilder.enableChunking(true);
                producerBuilder.enableBatching(false);
            } else if (this.disableBatching) {
                producerBuilder.enableBatching(false);
            }
            if (StringUtils.isNotBlank((CharSequence)this.encKeyName) && StringUtils.isNotBlank((CharSequence)this.encKeyValue)) {
                producerBuilder.addEncryptionKey(this.encKeyName);
                producerBuilder.defaultCryptoKeyReader(this.encKeyValue);
            }
            try (Producer producer = producerBuilder.create();){
                List<byte[]> messageBodies = this.generateMessageBodies(this.messages, this.messageFileNames);
                RateLimiter limiter = this.publishRate > 0.0 ? RateLimiter.create((double)this.publishRate) : null;
                HashMap<String, String> kvMap = new HashMap<String, String>();
                for (String property : this.properties) {
                    String[] kv = property.split("=");
                    kvMap.put(kv[0], kv[1]);
                }
                for (int i = 0; i < this.numTimesProduce; ++i) {
                    for (byte[] content : messageBodies) {
                        if (limiter != null) {
                            limiter.acquire();
                        }
                        TypedMessageBuilder message = producer.newMessage();
                        if (!kvMap.isEmpty()) {
                            message.properties(kvMap);
                        }
                        switch (this.keyValueEncodingType) {
                            case "": {
                                if (this.key != null && !this.key.isEmpty()) {
                                    message.key(this.key);
                                }
                                message.value((Object)content);
                                break;
                            }
                            case "separated": 
                            case "inline": {
                                KeyValue kv = new KeyValue(this.key != null ? this.key.getBytes(StandardCharsets.UTF_8) : null, (Object)content);
                                message.value((Object)kv);
                                break;
                            }
                            default: {
                                throw new IllegalStateException();
                            }
                        }
                        if (this.disableReplication) {
                            message.disableReplication();
                        }
                        message.send();
                        ++numMessagesSent;
                    }
                }
            }
        }
        catch (Exception e) {
            LOG.error("Error while producing messages");
            LOG.error(e.getMessage(), (Throwable)e);
            returnCode = -1;
        }
        finally {
            LOG.info("{} messages successfully produced", (Object)numMessagesSent);
        }
        return returnCode;
    }

    static Schema<?> buildSchema(String keySchema, String schema, String keyValueEncodingType) {
        switch (keyValueEncodingType) {
            case "": {
                return CmdProduce.buildComponentSchema(schema);
            }
            case "separated": {
                return Schema.KeyValue(CmdProduce.buildComponentSchema(keySchema), CmdProduce.buildComponentSchema(schema), (KeyValueEncodingType)KeyValueEncodingType.SEPARATED);
            }
            case "inline": {
                return Schema.KeyValue(CmdProduce.buildComponentSchema(keySchema), CmdProduce.buildComponentSchema(schema), (KeyValueEncodingType)KeyValueEncodingType.INLINE);
            }
        }
        throw new IllegalArgumentException("Invalid KeyValueEncodingType " + keyValueEncodingType + ", only: 'none','separated' and 'inline");
    }

    private static Schema<?> buildComponentSchema(String schema) {
        Schema<?> base;
        switch (schema) {
            case "string": {
                base = Schema.STRING;
                break;
            }
            case "bytes": {
                return Schema.BYTES;
            }
            default: {
                if (schema.startsWith("avro:")) {
                    base = CmdProduce.buildGenericSchema(SchemaType.AVRO, schema.substring(5));
                    break;
                }
                if (schema.startsWith("json:")) {
                    base = CmdProduce.buildGenericSchema(SchemaType.JSON, schema.substring(5));
                    break;
                }
                throw new IllegalArgumentException("Invalid schema type: " + schema);
            }
        }
        return Schema.AUTO_PRODUCE_BYTES((Schema)base);
    }

    private static Schema<?> buildGenericSchema(SchemaType type, String definition) {
        return Schema.generic((SchemaInfo)SchemaInfoImpl.builder().schema(definition.getBytes(StandardCharsets.UTF_8)).name("client").properties(new HashMap()).type(type).build());
    }

    @VisibleForTesting
    public String getWebSocketProduceUri(String topic) {
        String serviceURLWithoutTrailingSlash = this.serviceURL.substring(0, this.serviceURL.endsWith("/") ? this.serviceURL.length() - 1 : this.serviceURL.length());
        TopicName topicName = TopicName.get((String)topic);
        String wsTopic = topicName.isV2() ? String.format("%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName()) : String.format("%s/%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), topicName.getCluster(), topicName.getNamespacePortion(), topicName.getLocalName());
        String uriFormat = "%s/ws" + (topicName.isV2() ? "/v2/" : "/") + "producer/%s";
        return String.format(uriFormat, serviceURLWithoutTrailingSlash, wsTopic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int publishToWebSocket(String topic) {
        int numMessagesSent = 0;
        int returnCode = 0;
        URI produceUri = URI.create(this.getWebSocketProduceUri(topic));
        WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
        ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
        try {
            if (this.authentication != null) {
                this.authentication.start();
                AuthenticationDataProvider authData = this.authentication.getAuthData();
                if (authData.hasDataForHttp()) {
                    for (Map.Entry kv : authData.getHttpHeaders()) {
                        produceRequest.setHeader((String)kv.getKey(), (String)kv.getValue());
                    }
                }
            }
        }
        catch (Exception e) {
            LOG.error("Authentication plugin error: " + e.getMessage());
            return -1;
        }
        CompletableFuture<Void> connected = new CompletableFuture<Void>();
        ProducerSocket produceSocket = new ProducerSocket(connected);
        try {
            produceClient.start();
        }
        catch (Exception e) {
            LOG.error("Failed to start websocket-client", (Throwable)e);
            return -1;
        }
        try {
            LOG.info("Trying to create websocket session.. on {},{}", (Object)produceUri, (Object)produceRequest);
            produceClient.connect((Object)produceSocket, produceUri, produceRequest);
            connected.get();
        }
        catch (Exception e) {
            LOG.error("Failed to create web-socket session", (Throwable)e);
            return -1;
        }
        try {
            List<byte[]> messageBodies = this.generateMessageBodies(this.messages, this.messageFileNames);
            RateLimiter limiter = this.publishRate > 0.0 ? RateLimiter.create((double)this.publishRate) : null;
            for (int i = 0; i < this.numTimesProduce; ++i) {
                int index = i * 10;
                for (byte[] content : messageBodies) {
                    if (limiter != null) {
                        limiter.acquire();
                    }
                    produceSocket.send(index++, content).get(30L, TimeUnit.SECONDS);
                    ++numMessagesSent;
                }
            }
            produceSocket.close();
        }
        catch (Exception e) {
            LOG.error("Error while producing messages");
            LOG.error(e.getMessage(), (Throwable)e);
            returnCode = -1;
        }
        finally {
            LOG.info("{} messages successfully produced", (Object)numMessagesSent);
        }
        return returnCode;
    }

    @WebSocket(maxTextMessageSize=65536)
    public static class ProducerSocket {
        private final CountDownLatch closeLatch = new CountDownLatch(1);
        private Session session;
        private CompletableFuture<Void> connected;
        private volatile CompletableFuture<Void> result;

        public ProducerSocket(CompletableFuture<Void> connected) {
            this.connected = connected;
        }

        public CompletableFuture<Void> send(int index, byte[] content) throws Exception {
            this.session.getRemote().sendString(ProducerSocket.getTestJsonPayload(index, content));
            this.result = new CompletableFuture();
            return this.result;
        }

        private static String getTestJsonPayload(int index, byte[] content) throws JsonProcessingException {
            ProducerMessage msg = new ProducerMessage();
            msg.payload = Base64.getEncoder().encodeToString(content);
            msg.key = Integer.toString(index);
            return ObjectMapperFactory.getThreadLocal().writeValueAsString((Object)msg);
        }

        public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
            return this.closeLatch.await(duration, unit);
        }

        @OnWebSocketClose
        public void onClose(int statusCode, String reason) {
            LOG.info("Connection closed: {} - {}", (Object)statusCode, (Object)reason);
            this.session = null;
            this.closeLatch.countDown();
        }

        @OnWebSocketConnect
        public void onConnect(Session session) {
            LOG.info("Got connect: {}", (Object)session);
            this.session = session;
            this.connected.complete(null);
        }

        @OnWebSocketMessage
        public synchronized void onMessage(String msg) throws JsonParseException {
            LOG.info("ack= {}", (Object)msg);
            if (this.result != null) {
                this.result.complete(null);
            }
        }

        public RemoteEndpoint getRemote() {
            return this.session.getRemote();
        }

        public Session getSession() {
            return this.session;
        }

        public void close() {
            this.session.close();
        }
    }
}

