/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.Provider;
import java.security.Security;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.tests.integration.PulsarContainer;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class SimpleProducerConsumerTest
extends TestRetrySupport {
    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
    private PulsarContainer pulsarContainer;
    private URI lookupUrl;
    private PulsarClient pulsarClient;

    /**
     * Starts the Pulsar container, creates the shared client and provisions the tenant and
     * namespace used by the tests in this class.
     *
     * @throws Exception if the container, client or admin operations fail
     */
    @BeforeClass(alwaysRun=true)
    public void setup() throws Exception {
        this.incrementSetupNumber();
        // The encryption tests rely on the BouncyCastle JCE provider being registered.
        Security.addProvider(new BouncyCastleProvider());
        this.pulsarContainer = new PulsarContainer();
        this.pulsarContainer.start();
        final String brokerUrl = this.pulsarContainer.getPlainTextPulsarBrokerUrl();
        this.pulsarClient = PulsarClient.builder().serviceUrl(brokerUrl).build();
        this.lookupUrl = new URI(brokerUrl);
        // try-with-resources guarantees the admin client is closed even when provisioning fails.
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarContainer.getPulsarAdminUrl()).build()) {
            admin.tenants().createTenant("my-property", new TenantInfo(new HashSet<String>(Arrays.asList("appid1", "appid2")), Collections.singleton("standalone")));
            admin.namespaces().createNamespace("my-property/my-ns");
            admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Collections.singleton("standalone"));
        }
    }

    /**
     * Closes the shared client and shuts down the Pulsar container.
     *
     * <p>The container is stopped even when closing the client throws, so a failing client
     * shutdown cannot leave the container running.
     *
     * @throws Exception if closing the client or the container fails
     */
    @AfterClass(alwaysRun=true)
    public void cleanup() throws Exception {
        this.markCurrentSetupNumberCleaned();
        try {
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        }
        finally {
            this.pulsarClient = null;
            if (this.pulsarContainer != null) {
                try {
                    this.pulsarContainer.stop();
                    this.pulsarContainer.close();
                }
                finally {
                    this.pulsarContainer = null;
                }
            }
        }
    }

    /**
     * Builds a new client for the given service URL with the given stats interval.
     *
     * @param url service URL the client connects to
     * @param intervalInSecs stats interval, in seconds
     * @return the newly built client
     * @throws PulsarClientException if the client cannot be built
     */
    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
        final long statsIntervalSeconds = intervalInSecs;
        return PulsarClient.builder()
                .serviceUrl(url)
                .statsInterval(statsIntervalSeconds, TimeUnit.SECONDS)
                .build();
    }

    /**
     * Publishes RSA-encrypted messages from two producers onto one topic and verifies that a
     * consumer configured with a {@link CryptoKeyReader} receives all of them in order, while a
     * consumer on the same topic without a key reader receives nothing.
     *
     * <p>All producers and consumers use the same time-stamped topic, so the key-less consumer
     * actually observes the encrypted traffic it is asserted not to receive.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testRSAEncryption() throws Exception {
        final String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis();
        final int totalMsg = 10;
        final Set<String> messageSet = new HashSet<String>();

        // Reads the key pair from the test resources; fails the test when a key file is missing.
        class EncKeyReader implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return this.load("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return this.load("./src/test/resources/certificate/private-key." + keyName);
            }

            private EncryptionKeyInfo load(String certFilePath) {
                if (!Files.isReadable(Paths.get(certFilePath))) {
                    Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                    return this.keyInfo;
                }
                catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + certFilePath);
                }
                return null;
            }
        }

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
        Consumer<byte[]> normalConsumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name-normal").subscribe();
        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
        Producer<byte[]> producer2 = this.pulsarClient.newProducer().topic(topicName).addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
        try {
            for (int i = 0; i < totalMsg; ++i) {
                producer.send(("my-message-" + i).getBytes());
            }
            for (int i = totalMsg; i < totalMsg * 2; ++i) {
                producer2.send(("my-message-" + i).getBytes());
            }

            // The consumer without a key reader must not be handed any encrypted message.
            Message<byte[]> msg = normalConsumer.receive(500, TimeUnit.MILLISECONDS);
            Assert.assertNull(msg);

            for (int i = 0; i < totalMsg * 2; ++i) {
                msg = consumer.receive(5, TimeUnit.SECONDS);
                // A timed-out receive returns null; fail with a clear message instead of an NPE.
                Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
                String receivedMessage = new String(msg.getData());
                log.debug("Received message: [{}]", receivedMessage);
                String expectedMessage = "my-message-" + i;
                this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            }
            consumer.acknowledgeCumulative(msg);
        }
        finally {
            producer.close();
            producer2.close();
            normalConsumer.close();
            consumer.close();
        }
    }

    /**
     * Asserts that a received message equals the expected one and has not been seen before.
     *
     * @param messagesReceived set of messages seen so far; the received message is added to it
     * @param receivedMessage message that was actually received
     * @param expectedMessage message that was expected at this position
     */
    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, T expectedMessage) {
        final String mismatchText = "Received message " + receivedMessage + " did not match the expected message " + expectedMessage;
        Assert.assertEquals(receivedMessage, expectedMessage, mismatchText);

        final boolean seenForFirstTime = messagesReceived.add(receivedMessage);
        final String duplicateText = "Received duplicate message " + receivedMessage;
        Assert.assertTrue(seenForFirstTime, duplicateText);
    }

    /**
     * Verifies that, on a shared subscription, messages that two consumers cannot decrypt (one
     * with a key reader that returns no keys, one with no key reader at all) are all eventually
     * received by the consumer holding the correct key.
     *
     * <p>Each consumer uses its own client. All clients are closed exactly once through
     * try-with-resources, in reverse creation order.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    public void testRedeliveryOfFailedMessages() throws Exception {
        final String encryptionKeyName = "client-rsa.pem";
        final String encryptionKeyVersion = "1.0";
        final Map<String, String> metadata = new HashMap<String, String>();
        metadata.put("version", encryptionKeyVersion);
        final String topicName = "persistent://my-property/my-ns/myrsa-topic2";
        final String subscriptionName = "my-subscriber-name";

        // Reads the key pair from the test resources and tags it with the version metadata.
        class EncKeyReader implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return this.load("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return this.load("./src/test/resources/certificate/private-key." + keyName);
            }

            private EncryptionKeyInfo load(String certFilePath) {
                if (!Files.isReadable(Paths.get(certFilePath))) {
                    Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                    this.keyInfo.setMetadata(metadata);
                    return this.keyInfo;
                }
                catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + certFilePath);
                }
                return null;
            }
        }

        // A key reader that never yields a key, so decryption cannot succeed.
        class InvalidKeyReader implements CryptoKeyReader {
            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return null;
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return null;
            }
        }

        final String serviceUrl = this.lookupUrl.toString();
        try (PulsarClient producerClient = PulsarClient.builder().serviceUrl(this.pulsarContainer.getPlainTextPulsarBrokerUrl()).build();
             PulsarClient validKeyClient = this.newPulsarClient(serviceUrl, 0);
             PulsarClient invalidKeyClient = this.newPulsarClient(serviceUrl, 0);
             PulsarClient noKeyClient = this.newPulsarClient(serviceUrl, 0)) {
            Producer<byte[]> producer = producerClient.newProducer().topic(topicName).addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4).cryptoKeyReader(new EncKeyReader()).create();
            Consumer<byte[]> consumer1 = validKeyClient.newConsumer().topicsPattern(topicName).subscriptionName(subscriptionName).cryptoKeyReader(new EncKeyReader()).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
            Consumer<byte[]> consumer2 = invalidKeyClient.newConsumer().topicsPattern(topicName).subscriptionName(subscriptionName).cryptoKeyReader(new InvalidKeyReader()).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();
            Consumer<byte[]> consumer3 = noKeyClient.newConsumer().topicsPattern(topicName).subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).ackTimeout(1L, TimeUnit.SECONDS).subscribe();

            final int numberOfMessages = 100;
            final String message = "my-message";
            final Set<String> messages = new HashSet<String>();
            for (int i = 0; i < numberOfMessages; ++i) {
                producer.send((message + i).getBytes());
            }

            // Neither consumer lacking a usable key may receive anything.
            Assert.assertNull(consumer2.receive(3, TimeUnit.SECONDS));
            Assert.assertNull(consumer3.receive(3, TimeUnit.SECONDS));

            // The consumer with the valid key must end up with every message.
            for (int i = 0; i < numberOfMessages; ++i) {
                Message<byte[]> received = consumer1.receive();
                messages.add(new String(received.getData()));
                consumer1.acknowledge(received);
            }

            Assert.assertNull(consumer2.receive(3, TimeUnit.SECONDS));
            Assert.assertNull(consumer3.receive(3, TimeUnit.SECONDS));

            for (int i = 0; i < numberOfMessages; ++i) {
                Assert.assertTrue(messages.contains(message + i));
            }

            consumer1.close();
            consumer2.close();
            consumer3.close();
            producer.close();
        }
    }

    /**
     * Exercises the consumer crypto-failure actions against an encrypted topic.
     *
     * <ol>
     *   <li>Creating a producer with an unreadable key must fail.</li>
     *   <li>A consumer without a key reader (default failure action) receives nothing.</li>
     *   <li>With {@code CONSUME} and no key reader, a message is delivered but its payload does
     *       not match the plain text.</li>
     *   <li>With {@code FAIL} and a key reader, messages are decrypted and arrive in order.</li>
     *   <li>With {@code DISCARD} and no key reader, nothing is delivered.</li>
     * </ol>
     *
     * @throws Exception if any client operation fails unexpectedly
     */
    @Test
    public void testEncryptionFailure() throws Exception {
        // Reads the key pair from the test resources; returns null when the key cannot be read.
        class EncKeyReader implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return this.load("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return this.load("./src/test/resources/certificate/private-key." + keyName);
            }

            private EncryptionKeyInfo load(String certFilePath) {
                if (Files.isReadable(Paths.get(certFilePath))) {
                    try {
                        this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                        return this.keyInfo;
                    }
                    catch (IOException e) {
                        log.error("Failed to read certificate from {}", certFilePath, e);
                    }
                }
                return null;
            }
        }

        // NOTE(review): this namespace (my-property/use/myenc-ns) is not created in setup();
        // confirm the broker accepts it or switch to my-property/my-ns.
        final String topicName = "persistent://my-property/use/myenc-ns/myenc-topic1";
        final String subscriptionName = "my-subscriber-name";
        final int totalMsg = 10;
        final Set<String> messageSet = new HashSet<String>();

        Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();

        // 1. Producer creation must fail when the encryption key cannot be read.
        try {
            this.pulsarClient.newProducer().topic(topicName).addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
            Assert.fail("Producer creation should not suceed if failing to read key");
        }
        catch (Exception ignored) {
            // Expected: the key file does not exist. Assert.fail raises an Error, so it is not caught here.
        }

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        try {
            for (int i = 0; i < totalMsg; ++i) {
                producer.send(("my-message-" + i).getBytes());
            }

            // 2. No key reader and the default failure action: nothing is delivered.
            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNull(msg, "Receive should have failed with no keyreader");
            consumer.close();

            // 3. CONSUME without a key reader: the message is delivered still encrypted.
            consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
            int msgNum = 0;
            try {
                msg = consumer.receive(5, TimeUnit.SECONDS);
                Assert.assertNotNull(msg, "Failed to receive message even after ConsumerCryptoFailureAction.CONSUME is set.");
                String receivedMessage = new String(msg.getData());
                String expectedMessage = "my-message-" + msgNum++;
                Assert.assertNotEquals(receivedMessage, expectedMessage, "Received encrypted message " + receivedMessage + " should not match the expected message " + expectedMessage);
                consumer.acknowledgeCumulative(msg);
            }
            catch (Exception e) {
                log.error("Receive failed with ConsumerCryptoFailureAction.CONSUME", e);
                Assert.fail("Failed to receive message even after ConsumerCryptoFailureAction.CONSUME is set.", e);
            }
            consumer.close();

            // 4. FAIL with a valid key reader: messages are decrypted and arrive in order.
            consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).cryptoFailureAction(ConsumerCryptoFailureAction.FAIL).cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
            // Stops one short of totalMsg so the last message stays unacknowledged for step 5.
            for (int i = msgNum; i < totalMsg - 1; ++i) {
                msg = consumer.receive(5, TimeUnit.SECONDS);
                // A timed-out receive returns null; fail with a clear message instead of an NPE.
                Assert.assertNotNull(msg, "Timed out waiting for message " + i);
                msg.getEncryptionCtx().orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
                String receivedMessage = new String(msg.getData());
                log.debug("Received message: [{}]", receivedMessage);
                String expectedMessage = "my-message-" + i;
                this.testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            }
            consumer.acknowledgeCumulative(msg);
            consumer.close();

            // 5. DISCARD without a key reader: the remaining message is dropped, not delivered.
            consumer = this.pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD).acknowledgmentGroupTime(0L, TimeUnit.SECONDS).subscribe();
            msg = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNull(msg, "Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");
            consumer.close();
        }
        finally {
            producer.close();
        }
    }

    /**
     * Verifies that a consumer without a key reader, configured with the {@code CONSUME}
     * failure action, receives the encrypted payload together with an encryption context from
     * which the test can decrypt the message itself.
     *
     * @throws Exception if any client operation or the manual decryption fails
     */
    @Test
    public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
        final String encryptionKeyName = "client-rsa.pem";
        final String encryptionKeyVersion = "1.0";
        final String topicName = "persistent://my-property/my-ns/myrsa-topic3";
        final Map<String, String> metadata = new HashMap<String, String>();
        metadata.put("version", encryptionKeyVersion);

        // Reads the key pair from the test resources and tags it with the version metadata.
        class EncKeyReader implements CryptoKeyReader {
            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            @Override
            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
                return this.load("./src/test/resources/certificate/public-key." + keyName);
            }

            @Override
            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
                return this.load("./src/test/resources/certificate/private-key." + keyName);
            }

            private EncryptionKeyInfo load(String certFilePath) {
                if (!Files.isReadable(Paths.get(certFilePath))) {
                    Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
                    this.keyInfo.setMetadata(metadata);
                    return this.keyInfo;
                }
                catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + certFilePath);
                }
                return null;
            }
        }

        Producer<byte[]> producer = this.pulsarClient.newProducer().topic(topicName).addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4).cryptoKeyReader(new EncKeyReader()).create();
        Consumer<byte[]> consumer = this.pulsarClient.newConsumer().topicsPattern(topicName).subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME).subscribe();
        try {
            final String message = "my-message";
            producer.send(message.getBytes());

            Message<byte[]> received = consumer.receive(5, TimeUnit.SECONDS);
            // A timed-out receive returns null; fail with a clear message instead of an NPE.
            Assert.assertNotNull(received, "Timed out waiting for the encrypted message");
            TopicMessageImpl<byte[]> msg = (TopicMessageImpl<byte[]>) received;

            String receivedMessage = this.decryptMessage(msg, encryptionKeyName, new EncKeyReader());
            Assert.assertEquals(message, receivedMessage);
        }
        finally {
            consumer.close();
            producer.close();
        }
    }

    /**
     * Manually decrypts and decompresses a message that was delivered still encrypted, using the
     * encryption context attached to the message.
     *
     * <p>Asserts that the context is present, that it carries exactly one key, that a key with
     * the given name exists, and that the key's {@code version} metadata is {@code "1.0"}.
     *
     * @param msg the received message whose payload is still encrypted
     * @param encryptionKeyName name of the key to look up in the encryption context
     * @param reader key reader used to obtain the private key for decryption
     * @return the decrypted, decompressed payload as a string
     * @throws Exception if decompression or batch de-serialization fails
     */
    private String decryptMessage(TopicMessageImpl<byte[]> msg, String encryptionKeyName, CryptoKeyReader reader) throws Exception {
        Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
        Assert.assertTrue(ctx.isPresent());
        EncryptionContext encryptionCtx = ctx.orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));

        Map<String, EncryptionContext.EncryptionKey> keys = encryptionCtx.getKeys();
        Assert.assertEquals(keys.size(), 1);
        EncryptionContext.EncryptionKey encryptionKey = keys.get(encryptionKeyName);
        // Fail explicitly instead of with an NPE when the context holds a differently named key.
        Assert.assertNotNull(encryptionKey, "No encryption key named " + encryptionKeyName + " in the encryption context");
        byte[] dataKey = encryptionKey.getKeyValue();
        Map<String, String> metadata = encryptionKey.getMetadata();
        String version = metadata.get("version");
        Assert.assertEquals(version, "1.0");

        CompressionType compressionType = encryptionCtx.getCompressionType();
        int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
        byte[] encrParam = encryptionCtx.getParam();
        String encAlgo = encryptionCtx.getAlgorithm();
        int batchSize = encryptionCtx.getBatchSize().orElse(0);

        // Rebuild the message metadata the crypto layer needs from the encryption context.
        // Producer name, sequence id and publish time are placeholder values.
        ByteBuf payloadBuf = Unpooled.wrappedBuffer(msg.getData());
        MessageCryptoBc crypto = new MessageCryptoBc("test", false);
        MessageMetadata msgMetadata = new MessageMetadata().setEncryptionParam(encrParam).setProducerName("test").setSequenceId(123L).setPublishTime(12333453454L).setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType)).setUncompressedSize(uncompressedSize);
        if (encAlgo != null) {
            msgMetadata.setEncryptionAlgo(encAlgo);
        }
        msgMetadata.addEncryptionKey().setKey(encryptionKeyName).setValue(dataKey);

        ByteBuf decryptedPayload = crypto.decrypt(() -> msgMetadata, payloadBuf, reader);
        // Guard against a null result so a decryption failure is reported as such.
        Assert.assertNotNull(decryptedPayload, "Failed to decrypt the payload with key " + encryptionKeyName);

        // NOTE(review): decryptedPayload is not released here, as in the original; confirm buffer
        // ownership against the CompressionCodec contract before adding a release.
        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
        ByteBuf uncompressedPayload = codec.decode(decryptedPayload, uncompressedSize);
        if (batchSize > 0) {
            SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
            uncompressedPayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, singleMessageMetadata, 0, batchSize);
        }

        try {
            byte[] data = new byte[uncompressedPayload.readableBytes()];
            uncompressedPayload.readBytes(data);
            return new String(data);
        }
        finally {
            uncompressedPayload.release();
        }
    }
}

