package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.rpc.DataLossException;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SequentialExecutorService;
import com.google.common.truth.Truth;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/cloud/pubsub/v1/PublisherImplTest.class */
public class PublisherImplTest {
    /** Topic name handed to {@code Publisher.newBuilder} by the tests in this class. */
    private static final ProjectTopicName TEST_TOPIC = ProjectTopicName.of("test-project", "test-topic");
    /** Executor provider configured with a single executor thread; used by tests that call {@code setExecutorProvider}. */
    private static final ExecutorProvider SINGLE_THREAD_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
    /** Local channel provider for the address "test-server", the same name {@link #setUp()} gives the in-process server. */
    private static final TransportChannelProvider TEST_CHANNEL_PROVIDER = LocalChannelProvider.create("test-server");
    /** Fake scheduled executor, recreated in {@link #setUp()}; tests move its time forward with {@code advanceTime}. */
    private FakeScheduledExecutorService fakeExecutor;
    /** Fake publisher service registered on the in-process server; tests queue responses/errors on it and inspect captured requests. */
    private FakePublisherServiceImpl testPublisherServiceImpl;
    /** In-process channel to "test-server"; built in {@link #setUp()} and shut down in {@link #tearDown()}. */
    private ManagedChannel testChannel;
    /** In-process gRPC server hosting the fake service; started in {@link #setUp()} and stopped in {@link #tearDown()}. */
    private Server testServer;

    /**
     * Creates a fresh fake publisher service, hosts it on an in-process gRPC server named
     * "test-server", opens a channel to that server, and creates a new fake executor.
     */
    @Before
    public void setUp() throws Exception {
        FakePublisherServiceImpl fakeService = new FakePublisherServiceImpl();
        this.testPublisherServiceImpl = fakeService;

        InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server");
        serverBuilder.addService(fakeService);
        this.testServer = serverBuilder.build();

        // The channel is built before the server is started, as in the original sequence.
        this.testChannel = InProcessChannelBuilder.forName("test-server").build();
        this.testServer.start();

        this.fakeExecutor = new FakeScheduledExecutorService();
    }

    /**
     * Stops the in-process server and shuts down the test channel.
     *
     * <p>The channel shutdown sits in a {@code finally} block so the channel is still released
     * when waiting for server termination throws (for example on interruption).
     *
     * @throws Exception if shutting down or awaiting termination of the server fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            this.testServer.shutdownNow().awaitTermination();
        } finally {
            this.testChannel.shutdown();
        }
    }

    /**
     * Publishes two messages with an element-count threshold of ten and checks that they only
     * complete, in a single request, after the fake executor's time is advanced.
     */
    @Test
    public void testPublishByDuration() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setDelayThreshold(Duration.ofSeconds(5L))
                .setElementCountThreshold(10L)
                .build();
        Publisher publisher = getTestPublisherBuilder().setBatchingSettings(batching).build();
        this.testPublisherServiceImpl.addPublishResponse(
                PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2"));

        ApiFuture<String> futureA = sendTestMessage(publisher, "A");
        ApiFuture<String> futureB = sendTestMessage(publisher, "B");

        // Neither message may complete before time moves forward.
        Assert.assertFalse(futureA.isDone());
        Assert.assertFalse(futureB.isDone());

        this.fakeExecutor.advanceTime(Duration.ofSeconds(10L));

        Assert.assertEquals("1", futureA.get());
        Assert.assertEquals("2", futureB.get());
        Assert.assertEquals(
                2L, this.testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
        shutdownTestPublisher(publisher);
    }

    /**
     * With an element-count threshold of two, checks that every second message triggers a
     * publish and that each captured request carries exactly two messages.
     */
    @Test
    public void testPublishByNumBatchedMessages() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(2L)
                .setDelayThreshold(Duration.ofSeconds(100L))
                .build();
        Publisher publisher = getTestPublisherBuilder().setBatchingSettings(batching).build();
        this.testPublisherServiceImpl
                .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2"))
                .addPublishResponse(PublishResponse.newBuilder().addMessageIds("3").addMessageIds("4"));

        ApiFuture<String> futureA = sendTestMessage(publisher, "A");
        ApiFuture<String> futureB = sendTestMessage(publisher, "B");
        ApiFuture<String> futureC = sendTestMessage(publisher, "C");

        // A and B complete together; C is still pending at this point.
        Assert.assertEquals("1", futureA.get());
        Assert.assertEquals("2", futureB.get());
        Assert.assertFalse(futureC.isDone());

        // D completes the second pair.
        ApiFuture<String> futureD = sendTestMessage(publisher, "D");
        Assert.assertEquals("3", futureC.get());
        Assert.assertEquals("4", futureD.get());

        Assert.assertEquals(
                2L, this.testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
        Assert.assertEquals(
                2L, this.testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());

        this.fakeExecutor.advanceTime(Duration.ofSeconds(100L));
        shutdownTestPublisher(publisher);
    }

    /**
     * Publishes four messages with an element-count threshold of two and checks that the fake
     * service captured exactly two requests.
     */
    @Test
    public void testSinglePublishByNumBytes() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(2L)
                .setDelayThreshold(Duration.ofSeconds(100L))
                .build();
        Publisher publisher = getTestPublisherBuilder().setBatchingSettings(batching).build();
        this.testPublisherServiceImpl
                .addPublishResponse(PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2"))
                .addPublishResponse(PublishResponse.newBuilder().addMessageIds("3").addMessageIds("4"));

        ApiFuture<String> futureA = sendTestMessage(publisher, "A");
        ApiFuture<String> futureB = sendTestMessage(publisher, "B");
        ApiFuture<String> futureC = sendTestMessage(publisher, "C");

        Assert.assertEquals("1", futureA.get());
        Assert.assertEquals("2", futureB.get());
        // The third message is still pending at this point.
        Assert.assertFalse(futureC.isDone());

        ApiFuture<String> futureD = sendTestMessage(publisher, "D");
        Assert.assertEquals("3", futureC.get());
        Assert.assertEquals("4", futureD.get());

        Assert.assertEquals(2L, this.testPublisherServiceImpl.getCapturedRequests().size());

        this.fakeExecutor.advanceTime(Duration.ofSeconds(100L));
        shutdownTestPublisher(publisher);
    }

    /**
     * Checks that calling {@code shutdown()} completes two messages that had reached neither the
     * element-count nor the delay threshold.
     */
    @Test
    public void testPublishByShutdown() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setDelayThreshold(Duration.ofSeconds(100L))
                .setElementCountThreshold(10L)
                .build();
        Publisher publisher = getTestPublisherBuilder().setBatchingSettings(batching).build();
        this.testPublisherServiceImpl.addPublishResponse(
                PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2"));

        ApiFuture<String> futureA = sendTestMessage(publisher, "A");
        ApiFuture<String> futureB = sendTestMessage(publisher, "B");

        publisher.shutdown();

        // Both futures are expected to be complete as soon as shutdown() returns.
        Assert.assertTrue(futureA.isDone());
        Assert.assertTrue(futureB.isDone());
        Assert.assertEquals("1", futureA.get());
        Assert.assertEquals("2", futureB.get());

        this.fakeExecutor.advanceTime(Duration.ofSeconds(100L));
        publisher.awaitTermination(1L, TimeUnit.MINUTES);
    }

    /**
     * Exercises both triggers in one run: the first two messages complete once the second one is
     * published (element-count threshold of two), the third only after fake time is advanced.
     */
    @Test
    public void testPublishMixedSizeAndDuration() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(2L)
                .setDelayThreshold(Duration.ofSeconds(5L))
                .build();
        Publisher publisher = getTestPublisherBuilder().setBatchingSettings(batching).build();
        this.testPublisherServiceImpl.addPublishResponse(
                PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2"));
        this.testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3"));

        ApiFuture<String> futureA = sendTestMessage(publisher, "A");
        // Two seconds is below the five-second delay threshold.
        this.fakeExecutor.advanceTime(Duration.ofSeconds(2L));
        Assert.assertFalse(futureA.isDone());

        ApiFuture<String> futureB = sendTestMessage(publisher, "B");
        Assert.assertEquals("1", futureA.get());
        Assert.assertEquals("2", futureB.get());

        ApiFuture<String> futureC = sendTestMessage(publisher, "C");
        Assert.assertFalse(futureC.isDone());
        this.fakeExecutor.advanceTime(Duration.ofSeconds(5L));
        Assert.assertEquals("3", futureC.get());

        Assert.assertEquals(
                2L, this.testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount());
        Assert.assertEquals(
                1L, this.testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount());
        shutdownTestPublisher(publisher);
    }

    /**
     * Publishes a message whose data is the UTF-8 encoding of {@code str}.
     *
     * @param publisher publisher to send the message through
     * @param str text used as the message payload
     * @return the future returned by {@code publisher.publish}
     */
    public ApiFuture<String> sendTestMessage(Publisher publisher, String str) {
        ByteString payload = ByteString.copyFromUtf8(str);
        PubsubMessage message = PubsubMessage.newBuilder().setData(payload).build();
        return publisher.publish(message);
    }

    /**
     * With message ordering enabled and an element-count threshold of three, checks that the
     * third message of an ordering key completes that key's messages, that message IDs increase
     * in publish order within a key, and that any multi-message request holds a single key.
     */
    @Test
    public void testBatchedMessagesWithOrderingKeyByNum() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(3L)
                .setDelayThreshold(Duration.ofSeconds(100L))
                .build();
        Publisher publisher = getTestPublisherBuilder()
                .setBatchingSettings(batching)
                .setEnableMessageOrdering(true)
                .build();
        this.testPublisherServiceImpl.setAutoPublishResponse(true);

        ApiFuture<String> orderA1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA");
        ApiFuture<String> orderB1 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB");
        ApiFuture<String> orderA2 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA");
        ApiFuture<String> orderB2 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB");

        // Two messages per key so far, below the threshold of three.
        Assert.assertFalse(orderA1.isDone());
        Assert.assertFalse(orderB1.isDone());
        Assert.assertFalse(orderA2.isDone());
        Assert.assertFalse(orderB2.isDone());

        // Third message for "OrderA".
        ApiFuture<String> orderA3 = sendTestMessageWithOrderingKey(publisher, "m5", "OrderA");
        Assert.assertTrue(Integer.parseInt(orderA1.get()) < Integer.parseInt(orderA2.get()));
        Assert.assertTrue(Integer.parseInt(orderA2.get()) < Integer.parseInt(orderA3.get()));

        // Third message for "OrderB".
        ApiFuture<String> orderB3 = sendTestMessageWithOrderingKey(publisher, "m6", "OrderB");
        Assert.assertTrue(Integer.parseInt(orderB1.get()) < Integer.parseInt(orderB2.get()));
        Assert.assertTrue(Integer.parseInt(orderB2.get()) < Integer.parseInt(orderB3.get()));

        // Every multi-message request must contain messages of one ordering key only.
        for (PublishRequest request : this.testPublisherServiceImpl.getCapturedRequests()) {
            if (request.getMessagesCount() > 1) {
                String expectedKey = request.getMessages(0).getOrderingKey();
                for (PubsubMessage message : request.getMessagesList()) {
                    Assert.assertEquals(expectedKey, message.getOrderingKey());
                }
            }
        }

        this.fakeExecutor.advanceTime(Duration.ofSeconds(100L));
        shutdownTestPublisher(publisher);
    }

    /**
     * With message ordering enabled and an element-count threshold of ten, checks that batches
     * sent after the fake executor's time is advanced keep per-key message ID order, even while
     * the fake service delays its responses, and that any multi-message request holds a single
     * ordering key.
     */
    @Test
    public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(10L)
                .setDelayThreshold(Duration.ofSeconds(100L))
                .build();
        Publisher publisher = getTestPublisherBuilder()
                .setBatchingSettings(batching)
                .setEnableMessageOrdering(true)
                .build();
        this.testPublisherServiceImpl.setAutoPublishResponse(true);
        // Responses are scheduled on the fake executor with a 300s delay.
        this.testPublisherServiceImpl.setExecutor(this.fakeExecutor);
        this.testPublisherServiceImpl.setPublishResponseDelay(Duration.ofSeconds(300L));

        ApiFuture<String> orderA1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA");
        ApiFuture<String> orderB1 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB");
        ApiFuture<String> orderA2 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA");
        ApiFuture<String> orderB2 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB");

        Assert.assertFalse(orderA1.isDone());
        Assert.assertFalse(orderB1.isDone());
        Assert.assertFalse(orderA2.isDone());
        Assert.assertFalse(orderB2.isDone());

        this.fakeExecutor.advanceTime(Duration.ofSeconds(100L));

        // From here on the fake service responds without delay.
        this.testPublisherServiceImpl.setPublishResponseDelay(Duration.ZERO);
        ApiFuture<String> orderA3 = sendTestMessageWithOrderingKey(publisher, "m5", "OrderA");
        this.fakeExecutor.advanceTime(Duration.ofSeconds(100L));
        this.fakeExecutor.advanceTime(Duration.ofSeconds(200L));

        Assert.assertTrue(Integer.parseInt(orderA1.get()) < Integer.parseInt(orderA2.get()));
        Assert.assertTrue(Integer.parseInt(orderB1.get()) < Integer.parseInt(orderB2.get()));
        Assert.assertTrue(Integer.parseInt(orderA2.get()) < Integer.parseInt(orderA3.get()));

        // Every multi-message request must contain messages of one ordering key only.
        for (PublishRequest request : this.testPublisherServiceImpl.getCapturedRequests()) {
            if (request.getMessagesCount() > 1) {
                String expectedKey = request.getMessages(0).getOrderingKey();
                for (PubsubMessage message : request.getMessagesList()) {
                    Assert.assertEquals(expectedKey, message.getOrderingKey());
                }
            }
        }
        shutdownTestPublisher(publisher);
    }

    /**
     * With message ordering enabled and a request-byte threshold of 20, checks that a large
     * message published after a small one on the same ordering key receives a higher message ID
     * than the small one.
     */
    @Test
    public void testLargeMessagesDoNotReorderBatches() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(10L)
                .setRequestByteThreshold(20L)
                .setDelayThreshold(Duration.ofSeconds(100L))
                .build();
        Publisher publisher = getTestPublisherBuilder()
                .setBatchingSettings(batching)
                .setEnableMessageOrdering(true)
                .build();
        this.testPublisherServiceImpl.setAutoPublishResponse(true);

        ApiFuture<String> smallOrderA = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA");
        ApiFuture<String> smallOrderB = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB");
        Assert.assertFalse(smallOrderA.isDone());
        Assert.assertFalse(smallOrderB.isDone());

        // Publish the large message BEFORE blocking on smallOrderB. Java evaluates operands left
        // to right, so inlining this call as the right-hand operand of the comparison below
        // would run smallOrderB.get() first, while that message is still pending (asserted
        // above) and nothing has been done yet to get it sent.
        ApiFuture<String> largeOrderB =
                sendTestMessageWithOrderingKey(publisher, "VeryLargeMessage", "OrderB");

        Assert.assertTrue(Integer.parseInt(smallOrderB.get()) < Integer.parseInt(largeOrderB.get()));

        this.fakeExecutor.advanceTime(Duration.ofSeconds(100L));
        shutdownTestPublisher(publisher);
    }

    /**
     * Checks that publishing a message with an ordering key through a publisher built without
     * {@code setEnableMessageOrdering(true)} throws {@link IllegalStateException}.
     */
    @Test
    public void testOrderingKeyWhenDisabled_throwsException() throws Exception {
        Publisher publisher = getTestPublisherBuilder().build();
        try {
            sendTestMessageWithOrderingKey(publisher, "m1", "orderA");
            Assert.fail("Should have thrown an IllegalStateException");
        } catch (IllegalStateException expected) {
            // Expected: this is the outcome under test.
        }
        shutdownTestPublisher(publisher);
    }

    /**
     * Builds an ordering-enabled publisher whose retry settings request a single attempt, queues
     * three errors followed by a success, and checks the message still succeeds after four
     * captured requests.
     */
    @Test
    public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception {
        Publisher publisher = getTestPublisherBuilder()
                .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                .setRetrySettings(Publisher.Builder.DEFAULT_RETRY_SETTINGS.toBuilder()
                        .setTotalTimeout(Duration.ofSeconds(10L))
                        .setMaxAttempts(1)
                        .build())
                .setEnableMessageOrdering(true)
                .build();

        // Three failures, then one success.
        this.testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
        this.testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
        this.testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
        this.testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));

        ApiFuture<String> published = sendTestMessageWithOrderingKey(publisher, "m1", "orderA");
        Assert.assertEquals("1", published.get());
        Assert.assertEquals(4L, this.testPublisherServiceImpl.getCapturedRequests().size());

        publisher.shutdown();
        Assert.assertTrue(publisher.awaitTermination(1L, TimeUnit.MINUTES));
    }

    /**
     * Walks one ordering key through failure and recovery: a failed publish on "orderA" makes
     * later "orderA" messages fail with the cancellation exception, "orderB" keeps working, and
     * after {@code resumePublish("orderA")} messages on "orderA" succeed again.
     */
    @Test
    public void testResumePublish() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(2L)
                .build();
        Publisher publisher = getTestPublisherBuilder()
                .setBatchingSettings(batching)
                .setEnableMessageOrdering(true)
                .build();

        ApiFuture<String> first = sendTestMessageWithOrderingKey(publisher, "m1", "orderA");
        ApiFuture<String> second = sendTestMessageWithOrderingKey(publisher, "m2", "orderA");
        this.fakeExecutor.advanceTime(Duration.ZERO);
        Assert.assertFalse(first.isDone());
        Assert.assertFalse(second.isDone());

        // Queue a non-retryable error, then let the fake executor run.
        this.testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));
        this.fakeExecutor.advanceTime(Duration.ZERO);
        try {
            first.get();
            Assert.fail("This should fail.");
        } catch (ExecutionException expected) {
            // Expected: the queued error fails this message.
        }
        try {
            second.get();
            Assert.fail("This should fail.");
        } catch (ExecutionException expected) {
            // Expected: the queued error fails this message.
        }

        // Further messages on the failed key are rejected with the cancellation exception.
        ApiFuture<String> third = sendTestMessageWithOrderingKey(publisher, "m3", "orderA");
        ApiFuture<String> fourth = sendTestMessageWithOrderingKey(publisher, "m4", "orderA");
        try {
            third.get();
            Assert.fail("This should fail.");
        } catch (ExecutionException cancelled) {
            Assert.assertEquals(
                    SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, cancelled.getCause());
        }
        try {
            fourth.get();
            Assert.fail("This should fail.");
        } catch (ExecutionException cancelled) {
            Assert.assertEquals(
                    SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, cancelled.getCause());
        }

        // A different ordering key still publishes successfully.
        ApiFuture<String> fifth = sendTestMessageWithOrderingKey(publisher, "m5", "orderB");
        ApiFuture<String> sixth = sendTestMessageWithOrderingKey(publisher, "m6", "orderB");
        this.testPublisherServiceImpl.addPublishResponse(
                PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6"));
        Assert.assertEquals("5", fifth.get());
        Assert.assertEquals("6", sixth.get());

        // After resuming, "orderA" publishes successfully again.
        publisher.resumePublish("orderA");
        ApiFuture<String> seventh = sendTestMessageWithOrderingKey(publisher, "m7", "orderA");
        ApiFuture<String> eighth = sendTestMessageWithOrderingKey(publisher, "m8", "orderA");
        this.testPublisherServiceImpl.addPublishResponse(
                PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8"));
        Assert.assertEquals("7", seventh.get());
        Assert.assertEquals("8", eighth.get());

        shutdownTestPublisher(publisher);
    }

    /**
     * After a publish failure on ordering key "a", checks that the two messages of the failed
     * batch fail, and that a third message and a later fourth message on the same key fail with
     * the cancellation exception.
     */
    @Test
    public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(2L)
                .setDelayThreshold(Duration.ofSeconds(500L))
                .build();
        Publisher publisher = getTestPublisherBuilder()
                .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                .setBatchingSettings(batching)
                .setEnableMessageOrdering(true)
                .build();
        this.testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));

        ApiFuture<String> futureA = sendTestMessageWithOrderingKey(publisher, "A", "a");
        ApiFuture<String> futureB = sendTestMessageWithOrderingKey(publisher, "B", "a");
        ApiFuture<String> futureC = sendTestMessageWithOrderingKey(publisher, "C", "a");

        try {
            futureA.get();
            Assert.fail("Should have failed.");
        } catch (ExecutionException expected) {
            // Expected: the queued error fails this message.
        }
        try {
            futureB.get();
            Assert.fail("Should have failed.");
        } catch (ExecutionException expected) {
            // Expected: the queued error fails this message.
        }
        try {
            futureC.get();
            Assert.fail("Should have failed.");
        } catch (ExecutionException cancelled) {
            Assert.assertEquals(
                    SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, cancelled.getCause());
        }

        // A message published after the failure is rejected the same way.
        try {
            ApiFuture<String> futureD = sendTestMessageWithOrderingKey(publisher, "D", "a");
            futureD.get();
            Assert.fail("Should have failed.");
        } catch (ExecutionException cancelled) {
            Assert.assertEquals(
                    SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, cancelled.getCause());
        }
    }

    /**
     * Publishes a message with ordering key {@code str2} whose data is the UTF-8 encoding of
     * {@code str}.
     *
     * @param publisher publisher to send the message through
     * @param str text used as the message payload
     * @param str2 ordering key set on the message
     * @return the future returned by {@code publisher.publish}
     */
    private ApiFuture<String> sendTestMessageWithOrderingKey(Publisher publisher, String str, String str2) {
        PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder();
        messageBuilder.setOrderingKey(str2);
        messageBuilder.setData(ByteString.copyFromUtf8(str));
        return publisher.publish(messageBuilder.build());
    }

    /**
     * Checks that a {@code DATA_LOSS} status returned by the fake service reaches the caller as
     * an {@link ExecutionException} whose cause is a {@link DataLossException}.
     *
     * <p>The publisher is shut down in a {@code finally} block so it is not left running after
     * the test, whether the assertions pass or fail.
     */
    @Test
    public void testErrorPropagation() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(1L)
                .setDelayThreshold(Duration.ofSeconds(5L))
                .build();
        Publisher publisher = getTestPublisherBuilder()
                .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                .setBatchingSettings(batching)
                .build();
        try {
            this.testPublisherServiceImpl.addPublishError(Status.DATA_LOSS.asException());
            try {
                sendTestMessage(publisher, "A").get();
                Assert.fail("should throw exception");
            } catch (ExecutionException e) {
                Truth.assertThat(e.getCause()).isInstanceOf(DataLossException.class);
            }
        } finally {
            publisher.shutdown();
            Assert.assertTrue(publisher.awaitTermination(1L, TimeUnit.MINUTES));
        }
    }

    /**
     * Queues one error followed by a success and checks that the message succeeds after two
     * captured requests.
     */
    @Test
    public void testPublishFailureRetries() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(1L)
                .setDelayThreshold(Duration.ofSeconds(5L))
                .build();
        Publisher publisher = getTestPublisherBuilder()
                .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                .setBatchingSettings(batching)
                .build();
        this.testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
        this.testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));

        ApiFuture<String> published = sendTestMessage(publisher, "A");
        Assert.assertEquals("1", published.get());
        Assert.assertEquals(2L, this.testPublisherServiceImpl.getCapturedRequests().size());
        shutdownTestPublisher(publisher);
    }

    /**
     * With retry settings limited to one attempt, checks that a single queued error fails the
     * publish with an {@link ExecutionException} and that exactly one request was captured.
     */
    @Test(expected = ExecutionException.class)
    public void testPublishFailureRetries_retriesDisabled() throws Exception {
        Publisher publisher = getTestPublisherBuilder()
                .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                .setRetrySettings(Publisher.Builder.DEFAULT_RETRY_SETTINGS.toBuilder()
                        .setTotalTimeout(Duration.ofSeconds(10L))
                        .setMaxAttempts(1)
                        .build())
                .build();
        this.testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
        try {
            sendTestMessage(publisher, "A").get();
        } finally {
            // Compare by value: assertSame on boxed Integers tests object identity, which only
            // holds for small values because of the Integer cache.
            Assert.assertEquals(1L, this.testPublisherServiceImpl.getCapturedRequests().size());
            shutdownTestPublisher(publisher);
        }
    }

    /**
     * With retry settings allowing three attempts, queues two errors followed by a success and
     * checks that the message succeeds after three captured requests.
     */
    @Test
    public void testPublishFailureRetries_maxRetriesSetup() throws Exception {
        Publisher publisher = getTestPublisherBuilder()
                .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                .setRetrySettings(Publisher.Builder.DEFAULT_RETRY_SETTINGS.toBuilder()
                        .setTotalTimeout(Duration.ofSeconds(10L))
                        .setMaxAttempts(3)
                        .build())
                .build();
        this.testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
        this.testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
        this.testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));

        ApiFuture<String> published = sendTestMessage(publisher, "A");
        Assert.assertEquals("1", published.get());
        Assert.assertEquals(3L, this.testPublisherServiceImpl.getCapturedRequests().size());
        shutdownTestPublisher(publisher);
    }

    /**
     * With max attempts set to zero, queues two errors followed by a success and checks that the
     * message succeeds after three captured requests.
     */
    @Test
    public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception {
        Publisher publisher = getTestPublisherBuilder()
                .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                .setRetrySettings(Publisher.Builder.DEFAULT_RETRY_SETTINGS.toBuilder()
                        .setTotalTimeout(Duration.ofSeconds(10L))
                        .setMaxAttempts(0)
                        .build())
                .build();
        this.testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
        this.testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
        this.testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));

        ApiFuture<String> published = sendTestMessage(publisher, "A");
        Assert.assertEquals("1", published.get());
        Assert.assertEquals(3L, this.testPublisherServiceImpl.getCapturedRequests().size());

        publisher.shutdown();
        Assert.assertTrue(publisher.awaitTermination(1L, TimeUnit.MINUTES));
    }

    /**
     * Queues an {@code INVALID_ARGUMENT} error and checks that the publish fails with an
     * {@link ExecutionException}, that at least one request was captured, and that the publisher
     * then terminates.
     */
    @Test(expected = ExecutionException.class)
    public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exception {
        BatchingSettings batching = Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                .setElementCountThreshold(1L)
                .setDelayThreshold(Duration.ofSeconds(5L))
                .build();
        Publisher publisher = getTestPublisherBuilder()
                .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                .setRetrySettings(Publisher.Builder.DEFAULT_RETRY_SETTINGS.toBuilder()
                        .setTotalTimeout(Duration.ofSeconds(10L))
                        .build())
                .setBatchingSettings(batching)
                .build();
        this.testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));
        try {
            ApiFuture<String> published = sendTestMessage(publisher, "A");
            published.get();
        } finally {
            Assert.assertTrue(this.testPublisherServiceImpl.getCapturedRequests().size() >= 1);
            publisher.shutdown();
            Assert.assertTrue(publisher.awaitTermination(1L, TimeUnit.MINUTES));
        }
    }

    /**
     * Builds a publisher with explicit batching settings and checks that the topic name and the
     * three batching thresholds are returned unchanged by its getters.
     */
    @Test
    public void testPublisherGetters() throws Exception {
        TransportChannelProvider channelProvider =
                FixedTransportChannelProvider.create(GrpcTransportChannel.create(this.testChannel));
        BatchingSettings batching = BatchingSettings.newBuilder()
                .setRequestByteThreshold(10L)
                .setDelayThreshold(Duration.ofMillis(11L))
                .setElementCountThreshold(12L)
                .build();

        Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
        builder.setChannelProvider(channelProvider);
        builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
        builder.setBatchingSettings(batching);
        builder.setCredentialsProvider(NoCredentialsProvider.create());
        Publisher publisher = builder.build();

        Assert.assertEquals(TEST_TOPIC, publisher.getTopicName());
        Assert.assertEquals(10L, publisher.getBatchingSettings().getRequestByteThreshold().longValue());
        Assert.assertEquals(Duration.ofMillis(11L), publisher.getBatchingSettings().getDelayThreshold());
        Assert.assertEquals(12L, publisher.getBatchingSettings().getElementCountThreshold().longValue());

        publisher.shutdown();
        Assert.assertTrue(publisher.awaitTermination(1L, TimeUnit.MINUTES));
    }

    /**
     * Checks the initial field values of a builder created by {@code Publisher.newBuilder}:
     * topic name, executor provider, batching thresholds and retry settings.
     */
    @Test
    public void testBuilderParametersAndDefaults() {
        Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);

        String expectedTopic = TEST_TOPIC.toString();
        Assert.assertEquals(expectedTopic, builder.topicName);
        Assert.assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);

        // Batching defaults: 1000 request bytes, the default delay, 100 elements.
        Assert.assertEquals(1000L, builder.batchingSettings.getRequestByteThreshold().longValue());
        Assert.assertEquals(
                Publisher.Builder.DEFAULT_DELAY_THRESHOLD, builder.batchingSettings.getDelayThreshold());
        Assert.assertEquals(100L, builder.batchingSettings.getElementCountThreshold().longValue());

        Assert.assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings);
    }

    /**
     * Checks that the publisher builder rejects invalid arguments: {@code null} providers and
     * {@code null} batching values raise {@link NullPointerException}, while zero or negative
     * thresholds and RPC/total timeouts below the builder minimums raise
     * {@link IllegalArgumentException}. Boundary values that must be accepted are set outside
     * the try blocks.
     */
    @Test
    public void testBuilderInvalidArguments() {
        Publisher.Builder newBuilder = Publisher.newBuilder(TEST_TOPIC);

        // Null providers. The failure messages name the exception that is actually caught.
        try {
            newBuilder.setChannelProvider((TransportChannelProvider) null);
            Assert.fail("Should have thrown a NullPointerException");
        } catch (NullPointerException expected) {
            // Expected.
        }
        try {
            newBuilder.setExecutorProvider((ExecutorProvider) null);
            Assert.fail("Should have thrown a NullPointerException");
        } catch (NullPointerException expected) {
            // Expected.
        }

        // Request byte threshold: null, zero and negative are rejected.
        try {
            newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setRequestByteThreshold((Long) null).build());
            Assert.fail("Should have thrown an NullPointerException");
        } catch (NullPointerException expected) {
            // Expected.
        }
        try {
            newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setRequestByteThreshold(0L).build());
            Assert.fail("Should have thrown an IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }
        try {
            newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setRequestByteThreshold(-1L).build());
            Assert.fail("Should have thrown an IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }

        // Delay threshold: 1 ms is accepted; null and negative are rejected.
        newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setDelayThreshold(Duration.ofMillis(1L)).build());
        try {
            newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setDelayThreshold((Duration) null).build());
            Assert.fail("Should have thrown an NullPointerException");
        } catch (NullPointerException expected) {
            // Expected.
        }
        try {
            newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setDelayThreshold(Duration.ofMillis(-1L)).build());
            Assert.fail("Should have thrown an IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }

        // Element count threshold: 1 is accepted; null, zero and negative are rejected.
        newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setElementCountThreshold(1L).build());
        try {
            newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setElementCountThreshold((Long) null).build());
            Assert.fail("Should have thrown an NullPointerException");
        } catch (NullPointerException expected) {
            // Expected.
        }
        try {
            newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setElementCountThreshold(0L).build());
            Assert.fail("Should have thrown an IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }
        try {
            newBuilder.setBatchingSettings(Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder().setElementCountThreshold(-1L).build());
            Assert.fail("Should have thrown an IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }

        // Initial RPC timeout: the minimum is accepted; one millisecond less is rejected.
        newBuilder.setRetrySettings(Publisher.Builder.DEFAULT_RETRY_SETTINGS.toBuilder().setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT).build());
        try {
            newBuilder.setRetrySettings(Publisher.Builder.DEFAULT_RETRY_SETTINGS.toBuilder().setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT.minusMillis(1L)).build());
            Assert.fail("Should have thrown an IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }

        // Total timeout: the minimum is accepted; one millisecond less is rejected.
        newBuilder.setRetrySettings(Publisher.Builder.DEFAULT_RETRY_SETTINGS.toBuilder().setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT).build());
        try {
            newBuilder.setRetrySettings(Publisher.Builder.DEFAULT_RETRY_SETTINGS.toBuilder().setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT.minusMillis(1L)).build());
            Assert.fail("Should have thrown an IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            // Expected.
        }
    }

    /**
     * Publishes a single message through a publisher configured with a ten-second total retry
     * timeout and zero max attempts, shuts the publisher down, and asserts that
     * {@code awaitTermination} reports termination within one minute.
     */
    @Test
    public void testAwaitTermination() throws Exception {
        Publisher publisher =
                getTestPublisherBuilder()
                        .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                        .setRetrySettings(
                                Publisher.Builder.DEFAULT_RETRY_SETTINGS
                                        .toBuilder()
                                        .setTotalTimeout(Duration.ofSeconds(10L))
                                        .setMaxAttempts(0)
                                        .build())
                        .build();

        // This test queues no publish response itself; the returned future is deliberately unused.
        sendTestMessage(publisher, "A");
        publisher.shutdown();

        boolean terminated = publisher.awaitTermination(1L, TimeUnit.MINUTES);
        Assert.assertTrue(terminated);
    }

    /**
     * Drives a mocked {@link Publisher} through publish, shutdown and awaitTermination.
     *
     * <p>The expectations are recorded first, then replayed, and finally verified so the test
     * fails if any of the recorded calls was not made.
     */
    @Test
    public void testShutDown() throws Exception {
        ApiFuture apiFuture = EasyMock.mock(ApiFuture.class);
        Publisher publisher = EasyMock.mock(Publisher.class);
        PubsubMessage expectedMessage =
                PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("A")).build();

        // Record phase: one publish of the expected message, one awaitTermination, one shutdown.
        EasyMock.expect(publisher.publish(expectedMessage)).andReturn(apiFuture);
        EasyMock.expect(publisher.awaitTermination(1L, TimeUnit.MINUTES)).andReturn(true);
        publisher.shutdown();
        EasyMock.expectLastCall().once();
        EasyMock.replay(publisher);

        // Replay phase: exercise the mock exactly as recorded above.
        sendTestMessage(publisher, "A");
        publisher.shutdown();
        Assert.assertTrue(publisher.awaitTermination(1L, TimeUnit.MINUTES));

        // Without verification a missing call would go unnoticed.
        EasyMock.verify(publisher);
    }

    /**
     * Checks publishing under {@code LimitExceededBehavior.ThrowException} with limits of one
     * outstanding element and ten outstanding request bytes.
     *
     * <p>The test expects a byte-limit failure for the first (largest) message, an element-count
     * failure for a message sent while another is still outstanding, and successful publishes
     * once responses have been queued on the fake service.
     */
    @Test
    public void testPublishFlowControl_throwException() throws Exception {
        FlowControlSettings flowControl =
                FlowControlSettings.newBuilder()
                        .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
                        .setMaxOutstandingElementCount(1L)
                        .setMaxOutstandingRequestBytes(10L)
                        .build();
        BatchingSettings batching =
                Publisher.Builder.DEFAULT_BATCHING_SETTINGS
                        .toBuilder()
                        .setElementCountThreshold(1L)
                        .setDelayThreshold(Duration.ofSeconds(5L))
                        .setFlowControlSettings(flowControl)
                        .build();
        Publisher publisher =
                getTestPublisherBuilder()
                        .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                        .setBatchingSettings(batching)
                        .build();

        // First message: expected to be rejected for exceeding the request-byte limit.
        ApiFuture<String> oversizedFuture = sendTestMessage(publisher, "AAAAAAAAAAA");
        try {
            oversizedFuture.get();
            Assert.fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException");
        } catch (ExecutionException expected) {
            Truth.assertThat(expected.getCause())
                    .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class);
        }

        // Second message: left outstanding because no response has been queued yet.
        ApiFuture<String> firstAcceptedFuture = sendTestMessage(publisher, "AAAA");

        // Third message: expected to be rejected for exceeding the element-count limit.
        ApiFuture<String> overCountFuture = sendTestMessage(publisher, "AA");
        try {
            overCountFuture.get();
            Assert.fail("Should have thrown an FlowController.MaxOutstandingElementCountReachedException");
        } catch (ExecutionException expected) {
            Truth.assertThat(expected.getCause())
                    .isInstanceOf(FlowController.MaxOutstandingElementCountReachedException.class);
        }

        // Queue the response for the outstanding message and check its id.
        this.testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));
        Assert.assertEquals("1", firstAcceptedFuture.get());

        // A further message is expected to go through now.
        ApiFuture<String> secondAcceptedFuture = sendTestMessage(publisher, "AAAA");
        this.testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("2"));
        Assert.assertEquals("2", secondAcceptedFuture.get());
    }

    /**
     * Checks {@code LimitExceededBehavior.ThrowException} with message ordering enabled, using
     * limits of one outstanding element and ten outstanding request bytes.
     *
     * <p>The first message on ordering key {@code "a"} is expected to fail with a byte-limit
     * exception; the next message on the same key is then expected to fail with
     * {@code SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION} as its cause.
     */
    @Test
    public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Exception {
        FlowControlSettings flowControl =
                FlowControlSettings.newBuilder()
                        .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
                        .setMaxOutstandingElementCount(1L)
                        .setMaxOutstandingRequestBytes(10L)
                        .build();
        BatchingSettings batching =
                Publisher.Builder.DEFAULT_BATCHING_SETTINGS
                        .toBuilder()
                        .setElementCountThreshold(1L)
                        .setDelayThreshold(Duration.ofSeconds(5L))
                        .setFlowControlSettings(flowControl)
                        .build();
        Publisher publisher =
                getTestPublisherBuilder()
                        .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                        .setBatchingSettings(batching)
                        .setEnableMessageOrdering(true)
                        .build();

        // First message on key "a": expected to be rejected for exceeding the request-byte limit.
        try {
            sendTestMessageWithOrderingKey(publisher, "AAAAAAAAAAA", "a").get();
            Assert.fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException");
        } catch (ExecutionException expected) {
            Truth.assertThat(expected.getCause())
                    .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class);
        }

        // Second message on the same key: expected to fail with the cancellation exception.
        try {
            sendTestMessageWithOrderingKey(publisher, "AAAA", "a").get();
            Assert.fail("This should fail.");
        } catch (ExecutionException expected) {
            Assert.assertEquals(
                    SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION,
                    expected.getCause());
        }
    }

    /**
     * Checks publishing under {@code LimitExceededBehavior.Block} with limits of two outstanding
     * elements and ten outstanding request bytes.
     *
     * <p>Two messages are published from the test thread; a third and a fourth are published from
     * helper threads. A helper task queues publish responses "1" and "2" on the fake service,
     * each gated by a latch, and the test thread queues response "3" once the third publish call
     * has returned. The test finishes when the fourth publish call has returned.
     *
     * <p>The helper thread pool is always shut down when the test ends, so its threads do not
     * outlive the test.
     */
    @Test
    public void testPublishFlowControl_block() throws Exception {
        FlowControlSettings flowControl =
                FlowControlSettings.newBuilder()
                        .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
                        .setMaxOutstandingElementCount(2L)
                        .setMaxOutstandingRequestBytes(10L)
                        .build();
        BatchingSettings batching =
                Publisher.Builder.DEFAULT_BATCHING_SETTINGS
                        .toBuilder()
                        .setElementCountThreshold(1L)
                        .setDelayThreshold(Duration.ofSeconds(5L))
                        .setFlowControlSettings(flowControl)
                        .build();
        final Publisher publisher =
                getTestPublisherBuilder()
                        .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
                        .setBatchingSettings(batching)
                        .build();

        final ScheduledExecutorService helperPool = Executors.newScheduledThreadPool(10);
        try {
            final CountDownLatch sendResponse1 = new CountDownLatch(1);
            final CountDownLatch response1Sent = new CountDownLatch(1);
            final CountDownLatch sendResponse2 = new CountDownLatch(1);

            // Responder: queues responses "1" and "2" in order, each only when signalled.
            helperPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendResponse1.await();
                        testPublisherServiceImpl.addPublishResponse(
                                PublishResponse.newBuilder().addMessageIds("1"));
                        response1Sent.countDown();
                        sendResponse2.await();
                        testPublisherServiceImpl.addPublishResponse(
                                PublishResponse.newBuilder().addMessageIds("2"));
                    } catch (InterruptedException e) {
                        // Raised when the pool is shut down; keep the interrupt status visible.
                        Thread.currentThread().interrupt();
                    } catch (Exception ignored) {
                        // Best effort, as before: a failing helper must not crash its worker thread.
                    }
                }
            });

            // The first two messages are published directly from the test thread.
            sendTestMessage(publisher, "AA");
            sendTestMessage(publisher, "AA");

            // The third message is published off-thread; the latch records when publish returns.
            // NOTE(review): presumably this call blocks until a response releases capacity - confirm.
            final CountDownLatch publish3Completed = new CountDownLatch(1);
            helperPool.execute(new Runnable() {
                @Override
                public void run() {
                    sendTestMessage(publisher, "AAAAAA");
                    publish3Completed.countDown();
                }
            });

            // Coordinator: releases response "1", waits until it was queued, then releases "2".
            helperPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendResponse1.countDown();
                        response1Sent.await();
                        sendResponse2.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception ignored) {
                        // Best effort, as before.
                    }
                }
            });

            // The fourth message is published only after the third publish call has returned.
            final CountDownLatch publish4Completed = new CountDownLatch(1);
            helperPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        publish3Completed.await();
                        sendTestMessage(publisher, "A");
                        publish4Completed.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception ignored) {
                        // Best effort, as before.
                    }
                }
            });

            publish3Completed.await();
            this.testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3"));
            publish4Completed.await();
        } finally {
            // Interrupt any helper still parked on a latch and let the pool's threads exit.
            helperPool.shutdownNow();
        }
    }

    /**
     * Creates a {@link Publisher.Builder} for {@code TEST_TOPIC} that uses this test's
     * {@code fakeExecutor} and {@code testChannel} fields and a {@link NoCredentialsProvider}.
     *
     * @return the configured builder; callers may override settings before calling {@code build()}
     */
    private Publisher.Builder getTestPublisherBuilder() {
        ExecutorProvider executorProvider = FixedExecutorProvider.create(this.fakeExecutor);
        TransportChannelProvider channelProvider =
                FixedTransportChannelProvider.create(GrpcTransportChannel.create(this.testChannel));
        return Publisher.newBuilder(TEST_TOPIC)
                .setExecutorProvider(executorProvider)
                .setChannelProvider(channelProvider)
                .setCredentialsProvider(NoCredentialsProvider.create());
    }

    /**
     * Shuts the given publisher down, advances the fake executor's clock by ten seconds, and
     * asserts that the publisher terminates within one minute.
     *
     * @param publisher the publisher to shut down
     * @throws InterruptedException if interrupted while waiting for termination
     */
    private void shutdownTestPublisher(Publisher publisher) throws InterruptedException {
        publisher.shutdown();
        Duration tenSeconds = Duration.ofSeconds(10L);
        this.fakeExecutor.advanceTime(tenSeconds);
        boolean terminated = publisher.awaitTermination(1L, TimeUnit.MINUTES);
        Assert.assertTrue(terminated);
    }
}
