package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.ScriptProperty;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;
import org.reaktivity.reaktor.test.ReaktorRule;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/FetchIT.class */
/**
 * Integration tests for the fetch streams of the kafka nukleus.
 *
 * <p>Each test names the k3po scripts it runs via {@code @Specification}; the {@code ${...}}
 * prefixes resolve against the script roots registered on {@link #k3po}. Tests that only call
 * {@code k3po.finish()} let the scripts run to completion on their own, while tests that call
 * {@code k3po.start()} first coordinate with the scripts through named barriers.
 */
public class FetchIT {
    // Script roots referenced as ${route}, ${routeAnyTopic}, ${server}, ${metadata} and ${client}.
    private final K3poRule k3po = new K3poRule()
            .addScriptRoot("route", "org/reaktivity/specification/nukleus/kafka/control/route.ext")
            .addScriptRoot("routeAnyTopic", "org/reaktivity/specification/nukleus/kafka/control/route")
            .addScriptRoot("server", "org/reaktivity/specification/kafka/fetch.v5")
            .addScriptRoot("metadata", "org/reaktivity/specification/kafka/metadata.v5")
            .addScriptRoot("client", "org/reaktivity/specification/nukleus/kafka/streams/fetch");

    // Fails any test running longer than 10 seconds; the limit is lifted when a debugger is attached.
    private final TestRule timeout = new DisableOnDebug(new Timeout(10, TimeUnit.SECONDS));

    // Reaktor instance restricted to the nukleus whose name equals "kafka".
    private final ReaktorRule reaktor = new ReaktorRule()
            .nukleus("kafka"::equals)
            .directory("target/nukleus-itests")
            .commandBufferCapacity(1024)
            .responseBufferCapacity(1024)
            .counterValuesBufferCapacity(1024)
            .clean();

    // Declared after the rules it wraps: reaktor is outermost, then k3po, then the timeout.
    @Rule
    public final TestRule chain = RuleChain.outerRule(reaktor).around(k3po).around(timeout);

    // ---- Rejection of invalid or unroutable BEGIN requests ----

    @Test
    @Specification({"${route}/client/controller", "${client}/begin.ext.missing/client"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    @Ignore("BEGIN vs RESET read order not yet guaranteed to match write order")
    public void shouldRejectWhenBeginExtMissing() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${routeAnyTopic}/client/controller", "${client}/invalid.topic.name/client", "${metadata}/one.topic.error.invalid.topic/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldRejectWhenTopicNameContainsDisallowedCharacters() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/invalid.fetch.key.and.multiple.offsets/client"})
    public void shouldRejectInvalidBeginExWithFetchKeyAndMultipleOffsets() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/invalid.missing.fetch.key/client"})
    public void shouldRejectInvalidBeginExWithMissingFetchKey() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/invalid.more.than.one.fetch.key.hash/client"})
    public void shouldRejectInvalidBeginExWithMoreThanOneFetchKeyHash() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/topic.name.not.equals.route.ext/client"})
    @ScriptProperty({"routedTopicName \"not_test\""})
    public void shouldRejectTopicNameNotEqualToRoutedTopic() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${routeAnyTopic}/client/controller", "${client}/unknown.topic.name/client", "${metadata}/one.topic.error.unknown.topic/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldRejectWhenTopicIsUnknown() throws Exception {
        k3po.finish();
    }

    // ---- Fanout ----

    @Test
    @Specification({"${route}/client/controller", "${client}/fanout.with.historical.message/client", "${server}/fanout.with.historical.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldFanoutUsingHistoricalConnection() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fanout.with.historical.messages/client", "${server}/fanout.with.historical.messages/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldFanoutDiscardingHistoricalMessageToJoinLiveStream() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fanout.with.slow.consumer/client", "${server}/fanout.with.slow.consumer/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldFanoutWithSlowConsumer() throws Exception {
        k3po.finish();
    }

    // ---- Fetch key filtering ----

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.and.hash.code.picks.partition.zero/client", "${server}/fetch.key.and.hash.code.picks.partition.zero/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageUsingFetchKeyAndExplicitHashCode() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.and.no.key.messages/client", "${server}/fetch.key.and.no.key.multiple.partitions/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageOnSubscribesWithAndWithoutKeyFromMultiplePartitions() throws Exception {
        k3po.finish();
    }

    /**
     * Waits until the scripts signal {@code CLIENT_TWO_HAS_WRITTEN_RESET}, then releases
     * {@code CLIENT_TWO_UNSUBSCRIBED} so the scripts can continue.
     */
    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.and.no.key.unsubscribe/client", "${server}/fetch.key.and.no.key.multiple.partitions.unsubscribe/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldNotGiveAssertionErrorWhenUnsubscribeLeavingConsumerOnAnotherPartition() throws Exception {
        k3po.start();
        k3po.awaitBarrier("CLIENT_TWO_HAS_WRITTEN_RESET");
        k3po.notifyBarrier("CLIENT_TWO_UNSUBSCRIBED");
        k3po.finish();
    }

    // NOTE: "partioner" in the script paths below matches the script names; do not correct it here.
    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.default.partioner.picks.partition.one/client", "${server}/fetch.key.default.partioner.picks.partition.one/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageUsingFetchKeyAndDefaultPartitioner() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.messages/client", "${server}/fetch.key.multiple.matches.flow.controlled/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\"", "applicationConnectWindow 15"})
    public void shouldReceiveMultipleMessagesMatchingFetchKeyFlowControlled() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.message/client", "${server}/fetch.key.multiple.record.batches.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageMatchingFetchKeyWithLastOffsetWithMultipleRecordBatches() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.no.messages/client", "${server}/fetch.key.multiple.record.batches.no.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveNoMessagesMatchingFetchKeyWithMultipleRecordBatches() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.no.messages/client", "${server}/fetch.key.no.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveNoMessagesMatchingFetchKey() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.three.messages/client", "${server}/fetch.key.three.matches.flow.controlled/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\"", "applicationConnectWindow 15"})
    public void shouldReceiveMessagesThreeMatchingFetchKeyFlowControlled() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.nonzero.offset.message/client", "${server}/fetch.key.nonzero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageMatchingFetchKeyFirstNonZeroOffset() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.message/client", "${server}/fetch.key.zero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageMatchingFetchKeyFirst() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.no.offsets.message/client", "${server}/fetch.key.zero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageMatchingFetchKeyFirstWithZeroLengthOffsetsArray() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.message/client", "${server}/fetch.key.zero.offset.last.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageMatchingFetchKeyLast() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.messages/client", "${server}/fetch.key.zero.offset.multiple.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMultipleMessagesMatchingFetchKey() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/fetch.key.zero.offset.messages.historical/client", "${server}/fetch.key.zero.offset.multiple.matches.historical/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMultipleHistoricalMessagesMatchingFetchKey() throws Exception {
        k3po.finish();
    }

    // ---- Header filtering ----

    @Test
    @Specification({"${route}/client/controller", "${client}/header.and.fetch.key.zero.offset.message/client", "${server}/header.and.fetch.key.zero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageMatchingFetchKeyAndHeader() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/header.zero.offset.message/client", "${server}/header.zero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageMatchingHeaderFirst() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/header.zero.offset.message/client", "${server}/header.zero.offset.last.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageMatchingHeaderLast() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/header.zero.offset.messages/client", "${server}/header.zero.offset.multiple.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMultipleMessagesMatchingHeader() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/headers.and.fetch.key.zero.offset.message/client", "${server}/headers.and.fetch.key.zero.offset.first.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageMatchingFetchKeyAndHeaders() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/headers.zero.offset.messages.historical/client", "${server}/headers.zero.offset.multiple.matches.historical/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveHistoricalMessagesMatchingHeaders() throws Exception {
        k3po.finish();
    }

    // ---- KTable scripts ----

    @Test
    @Specification({"${route}/client/controller", "${client}/ktable.message/client", "${server}/ktable.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveKtableMessage() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/ktable.messages/client", "${server}/ktable.messages/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveKtableMessages() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/ktable.messages.header.multiple.matches/client", "${server}/ktable.messages.header.multiple.matches/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveKtableMessagesFilteredByHeader() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/ktable.messages.historical/client", "${server}/ktable.messages.historical/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveKTableHistoricalMessages() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/ktable.messages.multiple.nodes/client", "${server}/ktable.messages.multiple.nodes/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveKTableMessagesFromMultipleNodes() throws Exception {
        k3po.finish();
    }

    // ---- Reconnect ----

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${server}/live.fetch.abort.and.reconnect/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReconnectOnAbortOnLiveFetchConnection() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.message/client", "${server}/live.fetch.reset.reconnect.and.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReconnectOnResetOnLiveConnectionAndReceiveMessage() throws Exception {
        k3po.finish();
    }

    // ---- Zero offset ----

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset/client", "${server}/zero.offset/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldRequestMessagesAtZeroOffset() throws Exception {
        k3po.finish();
    }

    /** Releases {@code WRITE_FETCH_RESPONSE} immediately after the scripts are started. */
    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.message/client", "${server}/zero.offset.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageAtZeroOffset() throws Exception {
        k3po.start();
        k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        k3po.finish();
    }

    /** Releases {@code WRITE_FETCH_RESPONSE} immediately after the scripts are started. */
    @Test
    @Specification({"${route}/client/controller", "${client}/no.offsets.message/client", "${server}/zero.offset.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageAtZeroOffsetWhenEmptyOffsetsArray() throws Exception {
        k3po.start();
        k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        k3po.finish();
    }

    /** Holds back {@code WRITE_FETCH_RESPONSE} until the scripts have signalled {@code SUBSCRIBED}. */
    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.and.reset/client", "${server}/zero.offset.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldHandleFetchResponseAfterUnsubscribe() throws Exception {
        k3po.start();
        k3po.awaitBarrier("SUBSCRIBED");
        k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        k3po.finish();
    }

    /** Holds back {@code WRITE_FETCH_RESPONSE} until the scripts have signalled {@code SUBSCRIBED}. */
    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.and.reset/client", "${server}/zero.offset.messages.multiple.partitions/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldHandleFetchResponseMultiplePartitionsAfterUnsubscribe() throws Exception {
        k3po.start();
        k3po.awaitBarrier("SUBSCRIBED");
        k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.message/client", "${server}/zero.offset.message.single.partition.multiple.nodes/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldNotFetchFromNodeWithNoPartitions() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages/client", "${server}/zero.offset.messages/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessagesAtZeroOffset() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messagesets/client", "${server}/zero.offset.messagesets/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageSetsAtZeroOffset() throws Exception {
        k3po.finish();
    }

    /** Releases {@code SERVER_DELIVER_RESPONSE} only after both client barriers have been reached. */
    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.fanout/client", "${server}/zero.offset.messages.fanout/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldFanoutMessagesAtZeroOffset() throws Exception {
        k3po.start();
        k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        k3po.notifyBarrier("SERVER_DELIVER_RESPONSE");
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.multiple.partitions/client", "${server}/zero.offset.messages.multiple.nodes/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessagesFromMultipleNodes() throws Exception {
        k3po.finish();
    }

    /** Releases {@code WRITE_FETCH_RESPONSE} immediately after the scripts are started. */
    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.multiple.partitions/client", "${server}/zero.offset.messages.multiple.partitions/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessagesFromMultiplePartitions() throws Exception {
        k3po.start();
        k3po.notifyBarrier("WRITE_FETCH_RESPONSE");
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messages.multiple.partitions.partition.1/client", "${server}/zero.offset.messages.multiple.partitions.partition.1/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    @Ignore("order of RESET vs BEGIN/DATA/ABORT/END not guaranteed")
    public void shouldCleanUpStateWhenUnsubscribeAfterReceiveMessageFromSecondPartition() throws Exception {
        k3po.finish();
    }

    /** Releases both server response barriers only after both client barriers have been reached. */
    @Test
    @Specification({"${route}/client/controller", "${client}/zero.offset.messagesets.fanout/client", "${server}/zero.offset.messagesets.fanout/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldFanoutMessageSetsAtZeroOffset() throws Exception {
        k3po.start();
        k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_ONE");
        k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_TWO");
        k3po.finish();
    }

    // ---- Non-zero offset ----

    @Test
    @Specification({"${route}/client/controller", "${client}/nonzero.offset/client", "${server}/nonzero.offset/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldRequestMessagesAtNonZeroOffset() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/nonzero.offset.message/client", "${server}/nonzero.offset.message/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessageAtNonZeroOffset() throws Exception {
        k3po.finish();
    }

    @Test
    @Specification({"${route}/client/controller", "${client}/nonzero.offset.messages/client", "${server}/nonzero.offset.messages/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldReceiveMessagesAtNonZeroOffset() throws Exception {
        k3po.finish();
    }

    // ---- Distinct offsets ----

    /**
     * Runs the distinct-offset fanout scripts, releasing the historical response before
     * client one's third message is awaited and the third live response after it.
     */
    @Test
    @Specification({"${route}/client/controller", "${client}/distinct.offset.messagesets.fanout/client", "${server}/distinct.offset.messagesets.fanout/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldFanoutMessageSetsAtDistinctOffsets() throws Exception {
        k3po.start();
        k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_ONE");
        k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_TWO");
        k3po.notifyBarrier("SERVER_DELIVER_HISTORICAL_RESPONSE");
        k3po.awaitBarrier("CLIENT_ONE_RECEIVED_THIRD_MESSAGE");
        k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_THREE");
        k3po.finish();
    }

    /**
     * Same scripts as {@link #shouldFanoutMessageSetsAtDistinctOffsets()}, but with the order
     * swapped: the third live response is released first and the historical response only
     * after client one has received its third message.
     */
    @Test
    @Specification({"${route}/client/controller", "${client}/distinct.offset.messagesets.fanout/client", "${server}/distinct.offset.messagesets.fanout/server"})
    @ScriptProperty({"networkAccept \"nukleus://target/streams/kafka\""})
    public void shouldFanoutMessageSetsAtDistinctOffsetsWithoutDeliveringLiveMessageOutOfOrder() throws Exception {
        k3po.start();
        k3po.awaitBarrier("CLIENT_ONE_CONNECTED");
        k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_ONE");
        k3po.awaitBarrier("CLIENT_TWO_CONNECTED");
        k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_TWO");
        k3po.notifyBarrier("SERVER_DELIVER_RESPONSE_THREE");
        k3po.awaitBarrier("CLIENT_ONE_RECEIVED_THIRD_MESSAGE");
        k3po.notifyBarrier("SERVER_DELIVER_HISTORICAL_RESPONSE");
        k3po.finish();
    }
}
