/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.catchup.storecopy;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.compress.utils.Charsets;
import org.eclipse.collections.api.set.primitive.LongSet;
import org.eclipse.collections.impl.factory.primitive.LongSets;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchupAddressProvider;
import org.neo4j.causalclustering.catchup.CatchupAddressResolutionException;
import org.neo4j.causalclustering.catchup.CatchupClientBuilder;
import org.neo4j.causalclustering.catchup.CatchupServerBuilder;
import org.neo4j.causalclustering.catchup.CatchupServerHandler;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.catchup.storecopy.FakeFile;
import org.neo4j.causalclustering.catchup.storecopy.FileHeader;
import org.neo4j.causalclustering.catchup.storecopy.FileSender;
import org.neo4j.causalclustering.catchup.storecopy.GetIndexFilesRequest;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreFileRequest;
import org.neo4j.causalclustering.catchup.storecopy.InMemoryStoreStreamProvider;
import org.neo4j.causalclustering.catchup.storecopy.PrepareStoreCopyRequest;
import org.neo4j.causalclustering.catchup.storecopy.PrepareStoreCopyResponse;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyClient;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse;
import org.neo4j.causalclustering.catchup.storecopy.StoreFileStreamProvider;
import org.neo4j.causalclustering.catchup.storecopy.StoreFileStreamingProtocol;
import org.neo4j.causalclustering.catchup.storecopy.StoreResource;
import org.neo4j.causalclustering.catchup.storecopy.StreamToDiskProvider;
import org.neo4j.causalclustering.catchup.storecopy.TerminationCondition;
import org.neo4j.causalclustering.catchup.storecopy.TestCatchupServerHandler;
import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.net.Server;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.DuplicatingLogProvider;
import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.logging.Level;
import org.neo4j.logging.LogProvider;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.test.rule.TestDirectory;

/**
 * Integration test that drives a {@link StoreCopyClient} against a catchup {@link Server}
 * bound to a locally allocated port.
 *
 * <p>{@link #setup()} registers three {@link FakeFile}s with a {@link TestCatchupServerHandler},
 * writes their contents into the test directory, and starts both the server and the client.
 * {@link #shutdown()} stops both again.
 */
public class StoreCopyClientIT {
    private final FileSystemAbstraction fsa = new DefaultFileSystemAbstraction();
    private final AssertableLogProvider assertableLogProvider = new AssertableLogProvider(true);
    // Everything logged is both captured for assertions and echoed to stdout at DEBUG level.
    private final LogProvider logProvider = new DuplicatingLogProvider(new LogProvider[]{this.assertableLogProvider, FormattedLogProvider.withDefaultLogLevel(Level.DEBUG).toOutputStream(System.out)});
    private final TerminationCondition defaultTerminationCondition = TerminationCondition.CONTINUE_INDEFINITELY;
    @Rule
    public TestDirectory testDirectory = TestDirectory.testDirectory(this.fsa);
    private StoreCopyClient subject;
    private final FakeFile fileA = new FakeFile("fileA", "This is file a content");
    private final FakeFile fileB = new FakeFile("another-file-b", "Totally different content 123");
    private final FakeFile indexFileA = new FakeFile("lucene", "Lucene 123");
    private Server catchupServer;
    // Kept as a field so that it can be stopped in shutdown(); NOTE(review): relies on CatchUpClient exposing stop().
    private CatchUpClient catchUpClient;
    private TestCatchupServerHandler serverHandler;
    private final File targetLocation = new File("copyTargetLocation");

    /**
     * Writes {@code contents}, encoded as UTF-8, to {@code file} through the given file system.
     *
     * <p>UTF-8 is used explicitly so that the bytes match what {@link #fileContent(File, FileSystemAbstraction)}
     * decodes, independent of the platform default charset.
     *
     * @param fileSystemAbstraction file system used to create the channel
     * @param file                  destination file
     * @param contents              text to write
     * @throws RuntimeException wrapping any {@link IOException} raised while writing
     */
    private static void writeContents(FileSystemAbstraction fileSystemAbstraction, File file, String contents) {
        ByteBuffer buffer = ByteBuffer.wrap(contents.getBytes(Charsets.UTF_8));
        try (StoreChannel storeChannel = fileSystemAbstraction.create(file);){
            // A single channel write is allowed to be partial; keep going until the buffer is drained.
            while (buffer.hasRemaining()) {
                storeChannel.write(buffer);
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Registers the fake files with the server handler, materialises them in the test directory,
     * then starts the catchup server and client and builds the {@link StoreCopyClient} under test.
     */
    @Before
    public void setup() throws Throwable {
        this.serverHandler = new TestCatchupServerHandler(this.logProvider, this.testDirectory, this.fsa);
        this.serverHandler.addFile(this.fileA);
        this.serverHandler.addFile(this.fileB);
        this.serverHandler.addIndexFile(this.indexFileA);
        StoreCopyClientIT.writeContents(this.fsa, this.relative(this.fileA.getFilename()), this.fileA.getContent());
        StoreCopyClientIT.writeContents(this.fsa, this.relative(this.fileB.getFilename()), this.fileB.getContent());
        StoreCopyClientIT.writeContents(this.fsa, this.relative(this.indexFileA.getFilename()), this.indexFileA.getContent());
        ListenSocketAddress listenAddress = new ListenSocketAddress("localhost", PortAuthority.allocatePort());
        this.catchupServer = new CatchupServerBuilder((CatchupServerHandler)this.serverHandler).listenAddress(listenAddress).build();
        this.catchupServer.start();
        this.catchUpClient = new CatchupClientBuilder().build();
        this.catchUpClient.start();
        // 1 ms constant back-off between retries.
        TimeoutStrategy storeCopyBackoffStrategy = new ConstantTimeTimeoutStrategy(1L, TimeUnit.MILLISECONDS);
        Monitors monitors = new Monitors();
        this.subject = new StoreCopyClient(this.catchUpClient, monitors, this.logProvider, storeCopyBackoffStrategy);
    }

    /**
     * Stops the catchup client and server started by {@link #setup()}.
     *
     * <p>Both references are null-checked because {@code setup()} may have failed before assigning them,
     * and the server is stopped in a {@code finally} so a failing client stop cannot leak it.
     */
    @After
    public void shutdown() throws Throwable {
        try {
            if (this.catchUpClient != null) {
                this.catchUpClient.stop();
            }
        }
        finally {
            if (this.catchupServer != null) {
                this.catchupServer.stop();
            }
        }
    }

    /**
     * Copies the store into memory and checks that all three registered file names arrive
     * and that the contents of {@code fileA} and {@code fileB} match what is on disk.
     */
    @Test
    public void canPerformCatchup() throws StoreCopyFailedException, IOException {
        InMemoryStoreStreamProvider storeFileStream = new InMemoryStoreStreamProvider();
        CatchupAddressProvider catchupAddressProvider = CatchupAddressProvider.fromSingleAddress(StoreCopyClientIT.from(this.catchupServer.address().getPort()));
        this.subject.copyStoreFiles(catchupAddressProvider, this.serverHandler.getStoreId(), (StoreFileStreamProvider)storeFileStream, () -> this.defaultTerminationCondition, this.targetLocation);
        HashSet<String> expectedFiles = new HashSet<String>(Arrays.asList(this.fileA.getFilename(), this.fileB.getFilename(), this.indexFileA.getFilename()));
        Assert.assertEquals(expectedFiles, storeFileStream.fileStreams().keySet());
        Assert.assertEquals(this.fileContent(this.relative(this.fileA.getFilename())), this.clientFileContents(storeFileStream, this.fileA.getFilename()));
        Assert.assertEquals(this.fileContent(this.relative(this.fileB.getFilename())), this.clientFileContents(storeFileStream, this.fileB.getFilename()));
    }

    /**
     * Marks {@code fileB} to fail twice and checks that the copy still completes,
     * with three recorded requests for {@code fileB} and one for {@code fileA}.
     */
    @Test
    public void failedFileCopyShouldRetry() throws StoreCopyFailedException, IOException {
        this.fileB.setRemainingFailed(2);
        InMemoryStoreStreamProvider clientStoreFileStream = new InMemoryStoreStreamProvider();
        CatchupAddressProvider catchupAddressProvider = CatchupAddressProvider.fromSingleAddress(StoreCopyClientIT.from(this.catchupServer.address().getPort()));
        this.subject.copyStoreFiles(catchupAddressProvider, this.serverHandler.getStoreId(), (StoreFileStreamProvider)clientStoreFileStream, () -> this.defaultTerminationCondition, this.targetLocation);
        HashSet<String> expectedFiles = new HashSet<String>(Arrays.asList(this.fileA.getFilename(), this.fileB.getFilename(), this.indexFileA.getFilename()));
        Assert.assertEquals(expectedFiles, clientStoreFileStream.fileStreams().keySet());
        Assert.assertEquals(this.fileContent(this.relative(this.fileA.getFilename())), this.clientFileContents(clientStoreFileStream, this.fileA.getFilename()));
        Assert.assertEquals(this.fileContent(this.relative(this.fileB.getFilename())), this.clientFileContents(clientStoreFileStream, this.fileB.getFilename()));
        Assert.assertEquals((long)3L, (long)this.serverHandler.getRequestCount(this.fileB.getFilename()));
        Assert.assertEquals((long)1L, (long)this.serverHandler.getRequestCount(this.fileA.getFilename()));
    }

    /**
     * Runs a dedicated server whose store-file handler answers the first request with the
     * unfinished content and an {@code E_UNKNOWN} status, and the next request with the finished
     * content and {@code SUCCESS}. Each response streams two files ({@code foo} and {@code bar}).
     * After the copy, both files on disk must hold exactly the finished content.
     */
    @Test
    public void shouldNotAppendToFileWhenRetryingWithNewFile() throws Throwable {
        final String fileName = "foo";
        final String copyFileName = "bar";
        String unfinishedContent = "abcd";
        String finishedContent = "abcdefgh";
        // Consumed one element per store-file request by the handler below.
        final Iterator<String> contents = Iterators.iterator(unfinishedContent, finishedContent);
        TestCatchupServerHandler halfWayFailingServerHandler = new TestCatchupServerHandler(this.logProvider, this.testDirectory, this.fsa){

            @Override
            public ChannelHandler getStoreFileRequestHandler(final CatchupServerProtocol catchupServerProtocol) {
                return new SimpleChannelInboundHandler<GetStoreFileRequest>(){

                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, GetStoreFileRequest msg) {
                        File file = new File(fileName);
                        File fileCopy = new File(copyFileName);
                        String thisContent = contents.next();
                        StoreCopyClientIT.writeContents(StoreCopyClientIT.this.fsa, file, thisContent);
                        StoreCopyClientIT.writeContents(StoreCopyClientIT.this.fsa, fileCopy, thisContent);
                        this.sendFile(ctx, file);
                        this.sendFile(ctx, fileCopy);
                        // Report failure for as long as there is more content still to be served.
                        StoreCopyFinishedResponse.Status status = contents.hasNext() ? StoreCopyFinishedResponse.Status.E_UNKNOWN : StoreCopyFinishedResponse.Status.SUCCESS;
                        new StoreFileStreamingProtocol().end(ctx, status);
                        catchupServerProtocol.expect(CatchupServerProtocol.State.MESSAGE_TYPE);
                    }

                    // Streams one file to the client and deletes the server-side file once the write completes.
                    private void sendFile(ChannelHandlerContext ctx, File file) {
                        ctx.write((Object)ResponseMessageType.FILE);
                        ctx.write((Object)new FileHeader(file.getName()));
                        ctx.writeAndFlush((Object)new FileSender(new StoreResource(file, file.getName(), 16, StoreCopyClientIT.this.fsa))).addListener(future -> StoreCopyClientIT.this.fsa.deleteFile(file));
                    }
                };
            }

            @Override
            public ChannelHandler storeListingRequestHandler(final CatchupServerProtocol catchupServerProtocol) {
                return new SimpleChannelInboundHandler<PrepareStoreCopyRequest>(){

                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, PrepareStoreCopyRequest msg) {
                        ctx.write((Object)ResponseMessageType.PREPARE_STORE_COPY_RESPONSE);
                        ctx.writeAndFlush((Object)PrepareStoreCopyResponse.success(new File[]{new File(fileName)}, LongSets.immutable.empty(), 1L));
                        catchupServerProtocol.expect(CatchupServerProtocol.State.MESSAGE_TYPE);
                    }
                };
            }

            @Override
            public ChannelHandler getIndexSnapshotRequestHandler(CatchupServerProtocol catchupServerProtocol) {
                return new SimpleChannelInboundHandler<GetIndexFilesRequest>(){

                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, GetIndexFilesRequest msg) {
                        throw new IllegalStateException("There should not be any index requests");
                    }
                };
            }
        };
        Server halfWayFailingServer = null;
        try {
            ListenSocketAddress listenAddress = new ListenSocketAddress("localhost", PortAuthority.allocatePort());
            halfWayFailingServer = new CatchupServerBuilder((CatchupServerHandler)halfWayFailingServerHandler).listenAddress(listenAddress).build();
            halfWayFailingServer.start();
            CatchupAddressProvider addressProvider = CatchupAddressProvider.fromSingleAddress(new AdvertisedSocketAddress(listenAddress.getHostname(), listenAddress.getPort()));
            StoreId storeId = halfWayFailingServerHandler.getStoreId();
            File storeDir = this.testDirectory.makeGraphDbDir();
            StreamToDiskProvider streamToDiskProvider = new StreamToDiskProvider(storeDir, this.fsa, new Monitors());
            this.subject.copyStoreFiles(addressProvider, storeId, (StoreFileStreamProvider)streamToDiskProvider, () -> this.defaultTerminationCondition, this.targetLocation);
            Assert.assertEquals(finishedContent, this.fileContent(new File(storeDir, fileName)));
            // Read the whole copy through the same helper, so any appended bytes would show up in the comparison.
            Assert.assertEquals(finishedContent, this.fileContent(new File(storeDir, copyFileName)));
        }
        finally {
            // The server is null if building it failed; guard so that the original failure is not masked by an NPE.
            if (halfWayFailingServer != null) {
                try {
                    halfWayFailingServer.stop();
                }
                finally {
                    halfWayFailingServer.shutdown();
                }
            }
        }
    }

    /**
     * Uses an address provider whose secondary address is a freshly allocated port,
     * together with a termination condition that allows a single attempt, and expects
     * the copy to fail with exactly one "Connection refused:" log message naming that port.
     */
    @Test
    public void shouldLogConnetionRefusedMessage() {
        InMemoryStoreStreamProvider clientStoreFileStream = new InMemoryStoreStreamProvider();
        final int port = PortAuthority.allocatePort();
        try {
            this.subject.copyStoreFiles(new CatchupAddressProvider(){

                @Override
                public AdvertisedSocketAddress primary() {
                    return StoreCopyClientIT.from(StoreCopyClientIT.this.catchupServer.address().getPort());
                }

                @Override
                public AdvertisedSocketAddress secondary() {
                    return new AdvertisedSocketAddress("localhost", port);
                }
            }, this.serverHandler.getStoreId(), (StoreFileStreamProvider)clientStoreFileStream, Once::new, this.targetLocation);
            Assert.fail("Expected StoreCopyFailedException because the secondary address refuses connections");
        }
        catch (StoreCopyFailedException expected) {
            this.assertableLogProvider.assertContainsExactlyOneMessageMatching(CoreMatchers.both(CoreMatchers.startsWith("Connection refused:")).and(CoreMatchers.containsString("localhost/127.0.0.1:" + port)));
        }
    }

    /**
     * Uses an address provider whose secondary lookup throws a {@link CatchupAddressResolutionException}
     * and expects the copy to fail, with the resolution failure and the exception message logged.
     */
    @Test
    public void shouldLogUpstreamIssueMessage() {
        InMemoryStoreStreamProvider clientStoreFileStream = new InMemoryStoreStreamProvider();
        final CatchupAddressResolutionException catchupAddressResolutionException = new CatchupAddressResolutionException(new MemberId(UUID.randomUUID()));
        try {
            this.subject.copyStoreFiles(new CatchupAddressProvider(){

                @Override
                public AdvertisedSocketAddress primary() {
                    return StoreCopyClientIT.from(StoreCopyClientIT.this.catchupServer.address().getPort());
                }

                @Override
                public AdvertisedSocketAddress secondary() throws CatchupAddressResolutionException {
                    throw catchupAddressResolutionException;
                }
            }, this.serverHandler.getStoreId(), (StoreFileStreamProvider)clientStoreFileStream, Once::new, this.targetLocation);
            Assert.fail("Expected StoreCopyFailedException because the secondary address cannot be resolved");
        }
        catch (StoreCopyFailedException expected) {
            this.assertableLogProvider.assertContainsExactlyOneMessageMatching(CoreMatchers.startsWith("Unable to resolve address for"));
            this.assertableLogProvider.assertLogStringContains(catchupAddressResolutionException.getMessage());
        }
    }

    /** Builds a {@code localhost} address for the given port. */
    private static AdvertisedSocketAddress from(int port) {
        return new AdvertisedSocketAddress("localhost", port);
    }

    /** Resolves {@code filename} inside the test directory. */
    private File relative(String filename) {
        return this.testDirectory.file(filename);
    }

    /** Reads {@code file} as UTF-8 text through this test's file system. */
    private String fileContent(File file) throws IOException {
        return StoreCopyClientIT.fileContent(file, this.fsa);
    }

    /**
     * Reads the whole of {@code file} as UTF-8 text.
     *
     * @param file file to read
     * @param fsa  file system used to open the reader
     * @return the decoded file contents
     * @throws IOException if opening or reading the file fails
     */
    static String fileContent(File file, FileSystemAbstraction fsa) throws IOException {
        int chunkSize = 128;
        StringBuilder stringBuilder = new StringBuilder();
        try (Reader reader = fsa.openAsReader(file, Charsets.UTF_8);){
            CharBuffer charBuffer = CharBuffer.wrap(new char[chunkSize]);
            while (reader.read(charBuffer) != -1) {
                // Switch the buffer to draining mode, append what was read, then reset it for the next chunk.
                charBuffer.flip();
                stringBuilder.append(charBuffer);
                charBuffer.clear();
            }
        }
        return stringBuilder.toString();
    }

    /** Returns the string form of the in-memory stream that was captured for {@code filename}. */
    private String clientFileContents(InMemoryStoreStreamProvider storeFileStreamsProvider, String filename) {
        return storeFileStreamsProvider.fileStreams().get(filename).toString();
    }

    /** Termination condition that always refuses to continue, so a failed attempt is never retried. */
    private static class Once
    implements TerminationCondition {
        @Override
        public void assertContinue() throws StoreCopyFailedException {
            throw new StoreCopyFailedException("One try only");
        }
    }
}

