package com.googlecode.protobuf.pro.stream;

import com.google.protobuf.Message;
import com.googlecode.protobuf.pro.stream.logging.StreamLogger;
import com.googlecode.protobuf.pro.stream.wire.StreamProtocol;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.Channel;

/* loaded from: input_file:protobuf-streamer-pro-1.2.3.jar:com/googlecode/protobuf/pro/stream/StreamingClient.class */
/**
 * Client-side endpoint that starts pull and push transfers over a single {@link Channel}
 * and dispatches incoming chunks and close notifications to the matching transfer.
 *
 * <p>Every transfer started through {@link #pull(Message)} or {@link #push(Message)} is given
 * a fresh correlation id and is tracked in an internal map until it is closed, either by the
 * peer ({@link #closeNotification}, an END chunk in {@link #pullChunk}) or because the whole
 * client is closed ({@link #handleClosure()}).
 *
 * @param <E> type of the request message sent with a pull request
 * @param <F> type of the request message sent with a push request
 */
public class StreamingClient<E extends Message, F extends Message> {
    private static final Log log = LogFactory.getLog(StreamingClient.class);

    private final Channel channel;
    private final PeerInfo clientInfo;
    private final PeerInfo serverInfo;
    private final int chunkSize;
    private StreamLogger streamLogger;

    /** Transfers that have been started and not yet closed, keyed by correlation id. */
    private final Map<Integer, TransferState<E, F>> pendingTransferMap = new ConcurrentHashMap<>();

    /**
     * Wall-clock time (ms) of the last register/lookup/remove on the pending map.
     * Volatile so that a reader on another thread sees a current, untorn 64-bit value.
     */
    private volatile long lastUsedTS = System.currentTimeMillis();

    /** Source of correlation ids; the first id handed out is 1. */
    private final AtomicInteger correlationId = new AtomicInteger(1);

    /**
     * Creates a client bound to an existing channel.
     *
     * @param channel   channel that all requests are written to
     * @param peerInfo  identity of this (client) side, returned by {@link #getClientInfo()}
     * @param peerInfo2 identity of the server side, returned by {@link #getServerInfo()}
     * @param i         chunk size handed to every {@link TransferOut} created by {@link #push}
     */
    public StreamingClient(Channel channel, PeerInfo peerInfo, PeerInfo peerInfo2, int i) {
        this.channel = channel;
        this.clientInfo = peerInfo;
        this.serverInfo = peerInfo2;
        this.chunkSize = i;
    }

    /**
     * Starts a pull transfer: registers the transfer as pending, writes a PullRequest carrying
     * the serialized request message to the channel and waits (uninterruptibly) for the write
     * future. The outcome of the write is not inspected here.
     *
     * @param e request message sent to the peer
     * @return the inbound transfer handle associated with the new correlation id
     * @throws IllegalArgumentException if the new correlation id is already registered
     */
    public TransferIn pull(E e) {
        TransferIn transferIn = new TransferIn(getNextCorrelationId(), this.channel);
        // Register before writing so that chunks arriving right after the write find the state.
        registerPendingRequest(transferIn.getCorrelationId(), new TransferState<>(System.currentTimeMillis(), e, null, transferIn, null));
        StreamProtocol.PullRequest pullRequest = StreamProtocol.PullRequest.newBuilder()
                .setCorrelationId(transferIn.getCorrelationId())
                .setRequestProto(e.toByteString())
                .build();
        StreamProtocol.WirePayload payload = StreamProtocol.WirePayload.newBuilder().setPull(pullRequest).build();
        if (log.isDebugEnabled()) {
            log.debug("Sending [" + pullRequest.getCorrelationId() + "]PullRequest.");
        }
        this.channel.write(payload).awaitUninterruptibly();
        return transferIn;
    }

    /**
     * Starts a push transfer: registers the transfer as pending, writes a PushRequest carrying
     * the serialized request message to the channel and waits (uninterruptibly) for the write
     * future. The outcome of the write is not inspected here.
     *
     * @param f request message sent to the peer
     * @return the outbound transfer handle associated with the new correlation id
     * @throws IllegalArgumentException if the new correlation id is already registered
     */
    public TransferOut push(F f) {
        TransferOut transferOut = new TransferOut(getNextCorrelationId(), this.chunkSize, this.channel);
        registerPendingRequest(transferOut.getCorrelationId(), new TransferState<>(System.currentTimeMillis(), null, f, null, transferOut));
        StreamProtocol.PushRequest pushRequest = StreamProtocol.PushRequest.newBuilder()
                .setCorrelationId(transferOut.getCorrelationId())
                .setRequestProto(f.toByteString())
                .build();
        StreamProtocol.WirePayload payload = StreamProtocol.WirePayload.newBuilder().setPush(pushRequest).build();
        if (log.isDebugEnabled()) {
            log.debug("Sending [" + pushRequest.getCorrelationId() + "]PushRequest.");
        }
        this.channel.write(payload).awaitUninterruptibly();
        return transferOut;
    }

    /**
     * @return the server's peer info (same value as {@link #getServerInfo()})
     */
    public PeerInfo getPeerInfo() {
        return this.serverInfo;
    }

    @Override
    public String toString() {
        return "StreamClient->" + getPeerInfo();
    }

    /**
     * Handles a CloseNotification from the peer: removes the pending transfer with the
     * notification's correlation id and signals closure to its push stream. An unknown
     * correlation id is only logged at debug level.
     *
     * @param closeNotification notification received from the peer
     * @throws IllegalStateException if the removed transfer state has no push stream; the state
     *         has already been removed from the pending map when this is thrown
     */
    public void closeNotification(StreamProtocol.CloseNotification closeNotification) {
        if (log.isDebugEnabled()) {
            log.debug("Received [" + closeNotification.getCorrelationId() + "]CloseNotification.");
        }
        TransferState<E, F> state = removePendingTransfer(closeNotification.getCorrelationId());
        if (state == null) {
            if (log.isDebugEnabled()) {
                log.debug("No PendingTransferState found for correlationId " + closeNotification.getCorrelationId());
            }
            return;
        }
        TransferOut pushStream = state.getPushStream();
        if (pushStream == null) {
            throw new IllegalStateException("TransferState missing transferOut");
        }
        pushStream.handleClosure();
    }

    /**
     * Handles a chunk received for a pull transfer: forwards the chunk's parameters and payload
     * to the pull stream and, when the chunk type is END, removes the transfer from the pending
     * map and signals closure. An unknown correlation id is only logged at debug level.
     *
     * @param chunk chunk received from the peer
     * @throws IllegalStateException if the pending transfer state has no pull stream
     */
    public void pullChunk(StreamProtocol.Chunk chunk) {
        if (log.isDebugEnabled()) {
            log.debug("Received [" + chunk.getCorrelationId() + ":" + chunk.getSeqNo() + "]PullChunk. " + chunk.getChunkType());
        }
        TransferState<E, F> state = lookupPendingTransfer(chunk.getCorrelationId());
        if (state == null) {
            if (log.isDebugEnabled()) {
                log.debug("No PendingTransferState found for correlationId " + chunk.getCorrelationId());
            }
            return;
        }
        TransferIn pullStream = state.getPullStream();
        if (pullStream == null) {
            throw new IllegalStateException("TransferState missing transferIn");
        }
        // Parameters are delivered before the payload of the same chunk.
        for (StreamProtocol.Parameter parameter : chunk.getParameterList()) {
            pullStream.provideParameter(parameter.getName(), parameter.getValue());
        }
        pullStream.handleData(chunk.getPayload());
        if (StreamProtocol.ChunkTypeCode.END == chunk.getChunkType()) {
            removePendingTransfer(chunk.getCorrelationId());
            pullStream.handleClosure();
        }
    }

    /**
     * Closes every pending transfer: each one is removed from the pending map and closure is
     * signalled to its pull and/or push stream. Repeats until the map is empty.
     *
     * <p>The key snapshot is taken anew on every pass. Otherwise a transfer registered after
     * the first snapshot would never be removed and the loop could not terminate.
     */
    public void handleClosure() {
        while (!this.pendingTransferMap.isEmpty()) {
            for (Integer id : new ArrayList<Integer>(this.pendingTransferMap.keySet())) {
                TransferState<E, F> state = removePendingTransfer(id.intValue());
                if (state == null) {
                    // Removed concurrently by another path; nothing left to close.
                    continue;
                }
                TransferIn pullStream = state.getPullStream();
                if (pullStream != null) {
                    pullStream.handleClosure();
                }
                TransferOut pushStream = state.getPushStream();
                if (pushStream != null) {
                    pushStream.handleClosure();
                }
            }
        }
    }

    /**
     * Forwards a transfer record to the configured {@link StreamLogger}; does nothing when no
     * logger is set. The end timestamp passed on is the current time.
     *
     * @param transferState state supplying the correlation id and start timestamp
     * @param message       message passed through to the logger
     * @param str           string passed through to the logger
     * @param map           parameters passed through to the logger
     */
    protected void doLog(TransferState<E, F> transferState, Message message, String str, Map<String, String> map) {
        // Read the field once so a concurrent setStreamLogger(null) cannot slip in between
        // the null check and the call.
        StreamLogger logger = this.streamLogger;
        if (logger != null) {
            logger.logTransfer(this.clientInfo, this.serverInfo, message, str, transferState.getCorrelationId(), map, transferState.getStartTimestamp(), System.currentTimeMillis());
        }
    }

    /** Returns the next correlation id and advances the counter. */
    private int getNextCorrelationId() {
        return this.correlationId.getAndIncrement();
    }

    /**
     * Records a new pending transfer under the given correlation id.
     *
     * @throws IllegalArgumentException if a transfer is already registered under that id
     */
    private void registerPendingRequest(int i, TransferState<E, F> transferState) {
        updateLastUsed();
        // Single putIfAbsent instead of containsKey + put: on the backing ConcurrentHashMap
        // this is atomic, so two concurrent registrations cannot overwrite each other.
        if (this.pendingTransferMap.putIfAbsent(Integer.valueOf(i), transferState) != null) {
            throw new IllegalArgumentException("State already registered");
        }
    }

    /** Removes and returns the pending transfer for the id, or {@code null} if there is none. */
    private TransferState<E, F> removePendingTransfer(int i) {
        updateLastUsed();
        return this.pendingTransferMap.remove(Integer.valueOf(i));
    }

    /** Returns the pending transfer for the id without removing it, or {@code null}. */
    private TransferState<E, F> lookupPendingTransfer(int i) {
        updateLastUsed();
        return this.pendingTransferMap.get(Integer.valueOf(i));
    }

    /** Stamps the current time as the last-used time. */
    private void updateLastUsed() {
        this.lastUsedTS = System.currentTimeMillis();
    }

    /** @return the client-side peer info given at construction */
    public PeerInfo getClientInfo() {
        return this.clientInfo;
    }

    /** @return the server-side peer info given at construction */
    public PeerInfo getServerInfo() {
        return this.serverInfo;
    }

    /** @return the channel given at construction */
    public Channel getChannel() {
        return this.channel;
    }

    /** @return the configured stream logger, or {@code null} if none has been set */
    public StreamLogger getStreamLogger() {
        return this.streamLogger;
    }

    /** @param streamLogger logger used by {@link #doLog}; {@code null} disables logging */
    public void setStreamLogger(StreamLogger streamLogger) {
        this.streamLogger = streamLogger;
    }

    /**
     * @return time in milliseconds of the most recent register, lookup or remove on the
     *         pending-transfer map (initially the construction time)
     */
    public long getLastUsedTS() {
        return this.lastUsedTS;
    }
}
