package org.opendaylight.controller.cluster.datastore;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator;
import org.opendaylight.controller.cluster.datastore.TransactionProxy;
import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/opendaylight/controller/cluster/datastore/Shard.class */
public class Shard extends RaftActor {
    // Serialized CommitTransactionReply, created once and sent to the requester by finishCommit().
    private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
    // Message that onReceiveCommand() routes to handleTransactionCommitTimeoutCheck().
    private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
    public static final String DEFAULT_NAME = "default";
    // In-memory data store backing this shard; created in the constructor.
    private final InMemoryDOMDataStore store;
    private final ShardIdentifier name;
    // JMX statistics bean updated by the commit/abort/create-transaction paths.
    private final ShardStats shardMBean;
    // Selections of the listener actors registered through doChangeListenerRegistration().
    private final List<ActorSelection> dataChangeListeners;
    // Registrations accepted while this shard was not the leader (see registerChangeListener()).
    private final List<DelayedListenerRegistration> delayedListenerRegistrations;
    private final DatastoreContext datastoreContext;
    private final DataPersistenceProvider dataPersistenceProvider;
    // Not final: replaced when an UpdateSchemaContext message is handled.
    private SchemaContext schemaContext;
    // Cleared by handleReadDataReply(); where it is assigned is not visible in this part of the file.
    private ActorRef createSnapshotTransaction;
    private int createSnapshotTransactionCounter;
    private final ShardCommitCoordinator commitCoordinator;
    // Commit timeout in milliseconds (converted from seconds in the constructor).
    private final long transactionCommitTimeout;
    // Cancelled in postStop() when non-null.
    private Cancellable txCommitTimeoutCheckSchedule;
    private final Optional<ActorRef> roleChangeNotifier;
    // Created lazily by applyRecoverySnapshot() / applyCurrentLogRecoveryBatch().
    private ShardRecoveryCoordinator recoveryCoordinator;
    // Modifications collected by appendRecoveredLogEntry() for the batch started in startLogRecoveryBatch().
    private List<Object> currentLogRecoveryBatch;
    // Open transaction chains keyed by transaction chain id.
    private final Map<String, DOMStoreTransactionChain> transactionChains;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/Shard$DelayedListenerRegistration.class */
    /**
     * Listener registration that can be handed out before the real registration exists.
     *
     * <p>Until {@link #setDelegate} is called there is no underlying registration: the listener
     * instance is reported as {@code null} and {@link #close()} only records the closed flag.
     * Once a delegate is present, both calls are forwarded to it.
     */
    public static class DelayedListenerRegistration implements ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
        private final RegisterChangeListener registerChangeListener;
        private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> delegate;
        private volatile boolean closed;

        DelayedListenerRegistration(RegisterChangeListener registerChangeListener) {
            this.registerChangeListener = registerChangeListener;
        }

        /** Installs the real registration that subsequent calls are forwarded to. */
        void setDelegate(ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> listenerRegistration) {
            delegate = listenerRegistration;
        }

        /** Returns {@code true} once {@link #close()} has been called. */
        boolean isClosed() {
            return closed;
        }

        /** Returns the original registration request this placeholder was created for. */
        RegisterChangeListener getRegisterChangeListener() {
            return registerChangeListener;
        }

        /* renamed from: getInstance, reason: merged with bridge method [inline-methods] */
        /** Returns the delegate's listener instance, or {@code null} while no delegate is set. */
        public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> m10getInstance() {
            if (delegate == null) {
                return null;
            }
            return (AsyncDataChangeListener) delegate.getInstance();
        }

        /** Marks this registration closed and closes the delegate when one is set. */
        public void close() {
            closed = true;
            if (delegate == null) {
                return;
            }
            delegate.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/Shard$ShardCreator.class */
    /**
     * Akka {@link Creator} that instantiates a {@link Shard} from the arguments captured
     * at construction time.
     */
    public static class ShardCreator implements Creator<Shard> {
        private static final long serialVersionUID = 1L;

        final ShardIdentifier name;
        final Map<ShardIdentifier, String> peerAddresses;
        final DatastoreContext datastoreContext;
        final SchemaContext schemaContext;

        ShardCreator(ShardIdentifier shardIdentifier, Map<ShardIdentifier, String> map, DatastoreContext datastoreContext, SchemaContext schemaContext) {
            this.name = shardIdentifier;
            this.peerAddresses = map;
            this.datastoreContext = datastoreContext;
            this.schemaContext = schemaContext;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        /** Builds a new shard from the captured identifier, peers, datastore context and schema. */
        public Shard m11create() throws Exception {
            final Shard shard = new Shard(name, peerAddresses, datastoreContext, schemaContext);
            return shard;
        }
    }

    /**
     * Creates the shard actor state: the in-memory store, the JMX statistics bean, the commit
     * coordinator and the role-change notifier child actor.
     *
     * @param shardIdentifier identifier of this shard; its string form is used as the Raft id
     * @param map peer shard identifiers mapped to their addresses
     * @param datastoreContext configuration source (persistence flag, Raft config, commit settings)
     * @param schemaContext initial schema context; pushed to the store only when non-null
     */
    protected Shard(ShardIdentifier shardIdentifier, Map<ShardIdentifier, String> map, DatastoreContext datastoreContext, SchemaContext schemaContext) {
        super(shardIdentifier.toString(), mapPeerAddresses(map), Optional.of(datastoreContext.getShardRaftConfig()));
        dataChangeListeners = Lists.newArrayList();
        delayedListenerRegistrations = Lists.newArrayList();
        transactionChains = new HashMap<>();
        name = shardIdentifier;
        this.datastoreContext = datastoreContext;
        this.schemaContext = schemaContext;

        // Pick the persistence provider according to the configured persistence flag.
        if (datastoreContext.isPersistent()) {
            dataPersistenceProvider = new AbstractUntypedPersistentActor.PersistentDataProvider(this);
        } else {
            dataPersistenceProvider = new RaftActor.NonPersistentRaftDataProvider(this);
        }
        LOG.info("Shard created : {} persistent : {}", shardIdentifier, Boolean.valueOf(datastoreContext.isPersistent()));

        store = InMemoryDOMDataStoreFactory.create(shardIdentifier.toString(), (SchemaService) null, datastoreContext.getDataStoreProperties());
        if (schemaContext != null) {
            store.onGlobalContextUpdated(schemaContext);
        }

        shardMBean = ShardMBeanFactory.getShardStatsMBean(shardIdentifier.toString(), datastoreContext.getDataStoreMXBeanType());
        shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());

        if (isMetricsCaptureEnabled()) {
            getContext().become(new MeteringBehavior(this));
        }

        commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1L, TimeUnit.MINUTES), datastoreContext.getShardTransactionCommitQueueCapacity());
        // Configured in seconds, compared against System.currentTimeMillis() deltas later on.
        transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
        roleChangeNotifier = createRoleChangeNotifier(shardIdentifier.toString());
    }

    /**
     * Converts a peer map keyed by {@link ShardIdentifier} into one keyed by the identifier's
     * string form.
     *
     * @param map peer identifiers mapped to their addresses; must not be null
     * @return a new mutable map from {@code identifier.toString()} to the same address
     */
    private static Map<String, String> mapPeerAddresses(Map<ShardIdentifier, String> map) {
        final Map<String, String> addressesByPeerId = new HashMap<>();
        for (Map.Entry<ShardIdentifier, String> peer : map.entrySet()) {
            final String peerId = peer.getKey().toString();
            addressesByPeerId.put(peerId, peer.getValue());
        }
        return addressesByPeerId;
    }

    /**
     * Builds the Akka {@link Props} used to create a shard actor.
     *
     * @param shardIdentifier identifier of the shard
     * @param map peer shard identifiers mapped to their addresses
     * @param datastoreContext datastore configuration
     * @param schemaContext initial schema context
     * @return props wrapping a {@link ShardCreator} for the given arguments
     * @throws NullPointerException if any argument is null
     */
    public static Props props(ShardIdentifier shardIdentifier, Map<ShardIdentifier, String> map, DatastoreContext datastoreContext, SchemaContext schemaContext) {
        // Validate in declaration order so the first missing argument is the one reported.
        Preconditions.checkNotNull(shardIdentifier, "name should not be null");
        Preconditions.checkNotNull(map, "peerAddresses should not be null");
        Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");

        final ShardCreator creator = new ShardCreator(shardIdentifier, map, datastoreContext, schemaContext);
        return Props.create(creator);
    }

    /**
     * Starts a {@link RoleChangeNotifier} child actor named {@code <str>-notifier}.
     *
     * @param str shard id passed to the notifier props and used as the actor-name prefix
     * @return the notifier actor wrapped in a present {@link Optional}
     */
    private Optional<ActorRef> createRoleChangeNotifier(String str) {
        final ActorRef notifier = getContext().actorOf(RoleChangeNotifier.getProps(str), str + "-notifier");
        return Optional.of(notifier);
    }

    /**
     * Runs the superclass stop hook and then cancels the commit-timeout check schedule,
     * if one is present.
     */
    @Override
    public void postStop() {
        super.postStop();
        final Cancellable timeoutCheck = txCommitTimeoutCheckSchedule;
        if (timeoutCheck == null) {
            return;
        }
        timeoutCheck.cancel();
    }

    /**
     * Handles a recovery message. A {@link RecoveryFailure} is logged and recovery is treated
     * as complete; every other message is delegated to the superclass.
     *
     * @param obj the recovery message
     * @throws Exception propagated from the superclass handler
     */
    @Override
    public void onReceiveRecover(Object obj) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: onReceiveRecover: Received message {} from {}", new Object[]{persistenceId(), obj.getClass().toString(), getSender()});
        }

        if (obj instanceof RecoveryFailure) {
            final RecoveryFailure failure = (RecoveryFailure) obj;
            LOG.error("{}: Recovery failed because of this cause", persistenceId(), failure.cause());
            // Finish recovery with whatever was recovered before the failure.
            onRecoveryComplete();
            return;
        }

        super.onReceiveRecover(obj);
    }

    /**
     * Dispatches an incoming command to its handler.
     *
     * <p>Serialized messages are recognised by their exact class, in-process messages by
     * {@code instanceof}. Anything not recognised here is passed to the superclass.
     *
     * @param obj the received message
     * @throws Exception propagated from the superclass handler
     */
    @Override
    public void onReceiveCommand(Object obj) throws Exception {
        final Class<?> messageClass = obj.getClass();

        if (messageClass.equals(ReadDataReply.SERIALIZABLE_CLASS)) {
            handleReadDataReply(obj);
        } else if (messageClass.equals(CreateTransaction.SERIALIZABLE_CLASS)) {
            handleCreateTransaction(obj);
        } else if (obj instanceof ForwardedReadyTransaction) {
            handleForwardedReadyTransaction((ForwardedReadyTransaction) obj);
        } else if (messageClass.equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
            handleCanCommitTransaction(CanCommitTransaction.fromSerializable(obj));
        } else if (messageClass.equals(CommitTransaction.SERIALIZABLE_CLASS)) {
            handleCommitTransaction(CommitTransaction.fromSerializable(obj));
        } else if (messageClass.equals(AbortTransaction.SERIALIZABLE_CLASS)) {
            handleAbortTransaction(AbortTransaction.fromSerializable(obj));
        } else if (messageClass.equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
            closeTransactionChain(CloseTransactionChain.fromSerializable(obj));
        } else if (obj instanceof RegisterChangeListener) {
            registerChangeListener((RegisterChangeListener) obj);
        } else if (obj instanceof UpdateSchemaContext) {
            updateSchemaContext((UpdateSchemaContext) obj);
        } else if (obj instanceof PeerAddressResolved) {
            final PeerAddressResolved resolved = (PeerAddressResolved) obj;
            setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress());
        } else if (obj.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
            handleTransactionCommitTimeoutCheck();
        } else {
            super.onReceiveCommand(obj);
        }
    }

    /** Returns the role-change notifier actor created in the constructor. */
    protected Optional<ActorRef> getRoleChangeNotifier() {
        return roleChangeNotifier;
    }

    /**
     * Aborts the current cohort entry when it has not been accessed for longer than the
     * configured commit timeout. Does nothing when there is no current entry.
     */
    private void handleTransactionCommitTimeoutCheck() {
        final ShardCommitCoordinator.CohortEntry current = commitCoordinator.getCurrentCohortEntry();
        if (current == null) {
            return;
        }

        final long idleMillis = System.currentTimeMillis() - current.getLastAccessTime();
        if (idleMillis <= transactionCommitTimeout) {
            return;
        }

        LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting", new Object[]{persistenceId(), current.getTransactionID(), Long.valueOf(transactionCommitTimeout)});
        // No sender to reply to: the abort is initiated by the shard itself.
        doAbortTransaction(current.getTransactionID(), null);
    }

    /**
     * Handles a commit request: runs the cohort's preCommit synchronously and then hands the
     * modification to {@code persistData}.
     *
     * <p>If the transaction is not the coordinator's current one, or preCommit/persist throws,
     * the failed-transaction counter is incremented and a {@link Status.Failure} is sent to
     * the sender.
     *
     * @param commitTransaction the commit request carrying the transaction id
     */
    private void handleCommitTransaction(CommitTransaction commitTransaction) {
        final String txId = commitTransaction.getTransactionID();
        LOG.debug("{}: Committing transaction {}", persistenceId(), txId);

        final ShardCommitCoordinator.CohortEntry entry = commitCoordinator.getCohortEntryIfCurrent(txId);
        if (entry == null) {
            final String reason = String.format("%s: Cannot commit transaction %s - it is not the current transaction", persistenceId(), txId);
            final IllegalStateException failure = new IllegalStateException(reason);
            LOG.error(failure.getMessage());
            shardMBean.incrementFailedTransactionsCount();
            getSender().tell(new Status.Failure(failure), getSelf());
            return;
        }

        try {
            // Block on preCommit so the result is handled while still processing this message.
            entry.getCohort().preCommit().get();
            final CompositeModificationByteStringPayload payload = new CompositeModificationByteStringPayload(entry.getModification().toSerializable());
            persistData(getSender(), txId, payload);
        } catch (Exception e) {
            LOG.error("{}: An exception occurred while preCommitting transaction {}", new Object[]{persistenceId(), entry.getTransactionID(), e});
            shardMBean.incrementFailedTransactionsCount();
            getSender().tell(new Status.Failure(e), getSelf());
        }

        // Runs on both the success and the failure path.
        entry.updateLastAccessTime();
    }

    /**
     * Completes the commit of a transaction and replies to the requester.
     *
     * <p>When the transaction is the coordinator's current entry, its cohort is committed
     * synchronously. When it is not current but still queued, the entry is removed and its
     * modification is applied through a fresh write-only transaction. When no entry exists at
     * all, a {@link Status.Failure} is sent.
     *
     * <p>For the current-entry path, the coordinator is told the transaction is complete and
     * the queue-size statistic is refreshed exactly once, whether the commit succeeded, failed
     * with an exception, or failed with an error.
     *
     * @param actorRef actor that receives the commit reply or failure
     * @param str id of the transaction being committed
     */
    private void finishCommit(@Nonnull ActorRef actorRef, @Nonnull String str) {
        final ShardCommitCoordinator.CohortEntry currentEntry = commitCoordinator.getCohortEntryIfCurrent(str);
        if (currentEntry == null) {
            // Not the in-flight transaction: fall back to whatever is still queued under this id.
            final ShardCommitCoordinator.CohortEntry queuedEntry = commitCoordinator.getAndRemoveCohortEntry(str);
            if (queuedEntry != null) {
                commitWithNewTransaction(queuedEntry.getModification());
                actorRef.tell(COMMIT_TRANSACTION_REPLY, getSelf());
                return;
            }
            final IllegalStateException failure = new IllegalStateException(String.format("%s: Could not finish committing transaction %s - no CohortEntry found", persistenceId(), str));
            LOG.error(failure.getMessage());
            actorRef.tell(new Status.Failure(failure), getSelf());
            return;
        }

        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), currentEntry.getTransactionID());
        try {
            // Block on the commit so the outcome is handled while still processing this message.
            currentEntry.getCohort().commit().get();
            actorRef.tell(COMMIT_TRANSACTION_REPLY, getSelf());
            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
        } catch (Exception e) {
            actorRef.tell(new Status.Failure(e), getSelf());
            LOG.error("{}: An exception occurred while committing transaction {}", new Object[]{persistenceId(), str, e});
            shardMBean.incrementFailedTransactionsCount();
        } finally {
            // Single copy of the completion bookkeeping; a failure here is no longer mistaken
            // for a commit failure and the bookkeeping is never executed twice.
            commitCoordinator.currentTransactionComplete(str, true);
            shardMBean.setTransactionCommitQueueSize(commitCoordinator.getQueueSize());
        }
    }

    /**
     * Logs the canCommit request and delegates it to the commit coordinator, passing the
     * current sender and this actor.
     *
     * @param canCommitTransaction the canCommit request
     */
    private void handleCanCommitTransaction(CanCommitTransaction canCommitTransaction) {
        final String txId = canCommitTransaction.getTransactionID();
        LOG.debug("{}: Can committing transaction {}", persistenceId(), txId);
        commitCoordinator.handleCanCommit(canCommitTransaction, getSender(), self());
    }

    /**
     * Registers a readied transaction with the commit coordinator and replies to the sender
     * with the path of the actor that will act as the commit cohort.
     *
     * <p>The cohort is this shard itself, except for clients reporting a version below 1, for
     * which a {@link BackwardsCompatibleThreePhaseCommitCohort} child actor is created. The
     * reply is sent in serialized form only when the message asks for it.
     *
     * @param forwardedReadyTransaction the ready notification carrying id, cohort and modification
     */
    private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReadyTransaction) {
        LOG.debug("{}: Readying transaction {}, client version {}", new Object[]{persistenceId(), forwardedReadyTransaction.getTransactionID(), Integer.valueOf(forwardedReadyTransaction.getTxnClientVersion())});
        commitCoordinator.transactionReady(forwardedReadyTransaction.getTransactionID(), forwardedReadyTransaction.getCohort(), forwardedReadyTransaction.getModification());

        ActorRef cohortActor = self();
        if (forwardedReadyTransaction.getTxnClientVersion() < 1) {
            LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
            cohortActor = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(forwardedReadyTransaction.getTransactionID()));
        }

        // Declared as the message class that is actually constructed here (not the protobuf
        // type of the same simple name), so toSerializable() and the unserialized reply both resolve.
        final ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(cohortActor));
        getSender().tell(forwardedReadyTransaction.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply, getSelf());
    }

    /**
     * Aborts the transaction named in the request and replies to the current sender.
     *
     * @param abortTransaction the abort request
     */
    private void handleAbortTransaction(AbortTransaction abortTransaction) {
        final String txId = abortTransaction.getTransactionID();
        doAbortTransaction(txId, getSender());
    }

    /**
     * Aborts a transaction's cohort and, when a requester is given, reports the outcome to it.
     *
     * <p>If the transaction is the coordinator's current one it is marked complete; otherwise
     * it is removed from the coordinator's queue. Nothing happens when no entry is found.
     *
     * @param str id of the transaction to abort
     * @param actorRef actor to notify of the result, or {@code null} for no notification
     */
    private void doAbortTransaction(String str, final ActorRef actorRef) {
        ShardCommitCoordinator.CohortEntry entry = commitCoordinator.getCohortEntryIfCurrent(str);
        if (entry == null) {
            entry = commitCoordinator.getAndRemoveCohortEntry(str);
            if (entry == null) {
                return;
            }
        } else {
            commitCoordinator.currentTransactionComplete(str, false);
        }

        entry.setAborted(true);
        LOG.debug("{}: Aborting transaction {}", persistenceId(), str);

        final ListenableFuture<Void> abortFuture = entry.getCohort().abort();
        // Captured up front because the callback replies on behalf of this actor.
        final ActorRef shardRef = getSelf();
        Futures.addCallback(abortFuture, new FutureCallback<Void>() {
            public void onSuccess(Void ignored) {
                shardMBean.incrementAbortTransactionsCount();
                if (actorRef == null) {
                    return;
                }
                actorRef.tell(new AbortTransactionReply().toSerializable(), shardRef);
            }

            public void onFailure(Throwable cause) {
                LOG.error("{}: An exception happened during abort", persistenceId(), cause);
                if (actorRef == null) {
                    return;
                }
                actorRef.tell(new Status.Failure(cause), shardRef);
            }
        });

        shardMBean.setTransactionCommitQueueSize(commitCoordinator.getQueueSize());
    }

    /**
     * Handles a serialized CreateTransaction message: creates the transaction when this shard
     * is the leader, forwards the message to the leader when one is known, and otherwise
     * replies with a {@link NoShardLeaderException} failure.
     *
     * @param obj the serialized CreateTransaction message
     */
    private void handleCreateTransaction(Object obj) {
        if (isLeader()) {
            createTransaction(CreateTransaction.fromSerializable(obj));
            return;
        }

        if (getLeader() == null) {
            final String reason = String.format("Shard %s currently has no leader so a transaction cannot be created. Try again later.", persistenceId());
            getSender().tell(new Status.Failure(new NoShardLeaderException(reason)), getSelf());
            return;
        }

        getLeader().forward(obj, getContext());
    }

    /**
     * Turns a ReadDataReply into a {@link CaptureSnapshotReply} sent to this actor, clears the
     * snapshot transaction reference and stops the replying actor with a {@link PoisonPill}.
     *
     * @param obj the serialized ReadDataReply
     */
    private void handleReadDataReply(Object obj) {
        final CaptureSnapshotReply snapshotReply = new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(obj));
        self().tell(snapshotReply, self());

        createSnapshotTransaction = null;

        getSender().tell(PoisonPill.getInstance(), self());
    }

    /**
     * Removes the transaction chain named in the request and closes it. An unknown chain id
     * is ignored.
     *
     * @param closeTransactionChain the close request carrying the chain id
     */
    private void closeTransactionChain(CloseTransactionChain closeTransactionChain) {
        final DOMStoreTransactionChain chain = transactionChains.remove(closeTransactionChain.getTransactionChainId());
        if (chain == null) {
            return;
        }
        chain.close();
    }

    /**
     * Creates the {@code ShardTransaction} child actor for a new transaction.
     *
     * <p>The transaction is opened on the store itself when {@code str} is empty, otherwise on
     * the transaction chain registered under that id (created and registered on first use).
     *
     * @param i transaction type, compared against {@code TransactionProxy.TransactionType} ordinals
     * @param shardTransactionIdentifier identifier used as the child actor's name
     * @param str transaction chain id; empty for a transaction outside any chain
     * @param i2 client version passed through to the transaction actor
     * @return the newly created transaction actor
     * @throws IllegalStateException if no schema context has been set
     * @throws IllegalArgumentException if {@code i} is not a known transaction type
     */
    private ActorRef createTypedTransactionActor(int i, ShardTransactionIdentifier shardTransactionIdentifier, String str, int i2) {
        // Typed as the common factory interface so it can hold either the store or a chain.
        DOMStoreTransactionFactory factory = store;
        if (!str.isEmpty()) {
            DOMStoreTransactionChain chain = transactionChains.get(str);
            if (chain == null) {
                chain = store.createTransactionChain();
                transactionChains.put(str, chain);
            }
            factory = chain;
        }

        if (schemaContext == null) {
            throw new IllegalStateException("SchemaContext is not set");
        }

        if (i == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
            shardMBean.incrementReadOnlyTransactionCount();
            return getContext().actorOf(ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, shardTransactionIdentifier.getRemoteTransactionId(), i2), shardTransactionIdentifier.toString());
        }
        if (i == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
            shardMBean.incrementReadWriteTransactionCount();
            return getContext().actorOf(ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, shardTransactionIdentifier.getRemoteTransactionId(), i2), shardTransactionIdentifier.toString());
        }
        if (i == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
            shardMBean.incrementWriteOnlyTransactionCount();
            return getContext().actorOf(ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, shardTransactionIdentifier.getRemoteTransactionId(), i2), shardTransactionIdentifier.toString());
        }
        throw new IllegalArgumentException("Shard=" + name + ":CreateTransaction message has unidentified transaction type=" + i);
    }

    /**
     * Creates a transaction actor for the request and replies with its serialized path.
     *
     * <p>While this shard is an isolated leader only read-only transactions are allowed; any
     * other type is answered with a {@link NoShardLeaderException} failure. Any exception
     * raised while creating the transaction is sent back as a {@link Status.Failure}.
     *
     * @param createTransaction the create request
     */
    private void createTransaction(CreateTransaction createTransaction) {
        try {
            final boolean rejected = getRaftState() == RaftState.IsolatedLeader
                    && createTransaction.getTransactionType() != TransactionProxy.TransactionType.READ_ONLY.ordinal();
            if (rejected) {
                final String reason = String.format("Shard %s was the leader but has lost contact with all of its followers. Either all other follower nodes are down or this node is isolated by a network partition.", persistenceId());
                getSender().tell(new Status.Failure(new NoShardLeaderException(reason)), getSelf());
            } else {
                final ActorRef txActor = createTransaction(createTransaction.getTransactionType(), createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), createTransaction.getVersion());
                final String txActorPath = Serialization.serializedActorPath(txActor);
                final CreateTransactionReply reply = new CreateTransactionReply(txActorPath, createTransaction.getTransactionId());
                getSender().tell(reply.toSerializable(), getSelf());
            }
        } catch (Exception e) {
            getSender().tell(new Status.Failure(e), getSelf());
        }
    }

    /**
     * Builds the shard-local transaction identifier and creates the matching transaction actor.
     *
     * @param i transaction type ordinal
     * @param str remote transaction id
     * @param str2 transaction chain id
     * @param i2 client version
     * @return the newly created transaction actor
     */
    private ActorRef createTransaction(int i, String str, String str2, int i2) {
        final ShardTransactionIdentifier txIdentifier = ShardTransactionIdentifier.builder().remoteTransactionId(str).build();

        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Creating transaction : {} ", persistenceId(), txIdentifier);
        }

        return createTypedTransactionActor(i, txIdentifier, str2, i2);
    }

    /**
     * Readies the transaction and runs preCommit followed by commit, blocking on each step.
     *
     * @param dOMStoreWriteTransaction the write transaction to commit
     * @throws ExecutionException if preCommit or commit fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void syncCommitTransaction(DOMStoreWriteTransaction dOMStoreWriteTransaction) throws ExecutionException, InterruptedException {
        final DOMStoreThreePhaseCommitCohort cohort = dOMStoreWriteTransaction.ready();
        // canCommit is not invoked here; only the last two phases are run.
        cohort.preCommit().get();
        cohort.commit().get();
    }

    /**
     * Applies a modification to the store through a fresh write-only transaction and commits
     * it synchronously.
     *
     * <p>Failures are logged and counted rather than propagated. If the commit was interrupted,
     * the thread's interrupt status is restored so that code further up the stack can still
     * observe the interruption.
     *
     * @param modification the modification to apply and commit
     */
    private void commitWithNewTransaction(Modification modification) {
        final DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
        modification.apply(writeTx);
        try {
            syncCommitTransaction(writeTx);
            shardMBean.incrementCommittedTransactionCount();
            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Catching InterruptedException clears the flag; put it back instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            shardMBean.incrementFailedTransactionsCount();
            LOG.error("{}: Failed to commit", persistenceId(), e);
        }
    }

    /**
     * Handles an UpdateSchemaContext message: remembers the new schema context and pushes it
     * to the store.
     *
     * <p>The store is notified once, through {@link #updateSchemaContext(SchemaContext)};
     * the message's context is read a single time so the field and the store receive the
     * same instance.
     *
     * @param updateSchemaContext message carrying the new schema context
     */
    private void updateSchemaContext(UpdateSchemaContext updateSchemaContext) {
        final SchemaContext newSchemaContext = updateSchemaContext.getSchemaContext();
        this.schemaContext = newSchemaContext;
        updateSchemaContext(newSchemaContext);
    }

    /**
     * Pushes the given schema context to the store. Does not change this shard's own
     * {@code schemaContext} field.
     *
     * @param schemaContext the schema context to hand to the store
     */
    @VisibleForTesting
    void updateSchemaContext(SchemaContext schemaContext) {
        store.onGlobalContextUpdated(schemaContext);
    }

    /**
     * Registers a data change listener and replies with the path of the actor that represents
     * the registration.
     *
     * <p>On the leader the listener is registered with the store immediately. Otherwise a
     * {@link DelayedListenerRegistration} is queued and returned in its place.
     *
     * @param registerChangeListener the registration request
     */
    private void registerChangeListener(RegisterChangeListener registerChangeListener) {
        LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());

        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration;
        if (!isLeader()) {
            LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
            final DelayedListenerRegistration delayed = new DelayedListenerRegistration(registerChangeListener);
            delayedListenerRegistrations.add(delayed);
            registration = delayed;
        } else {
            registration = doChangeListenerRegistration(registerChangeListener);
        }

        final ActorRef registrationActor = getContext().actorOf(DataChangeListenerRegistration.props(registration));
        LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", persistenceId(), registrationActor.path());
        getSender().tell(new RegisterChangeListenerReply(registrationActor.path()), getSelf());
    }

    /**
     * Registers the remote listener with the store through a {@code DataChangeListenerProxy}.
     *
     * <p>The listener actor is first sent {@code EnableNotification(true)} and its selection is
     * remembered in {@code dataChangeListeners}.
     *
     * @param registerChangeListener the registration request (listener path, data path, scope)
     * @return the registration returned by the store
     */
    private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> doChangeListenerRegistration(RegisterChangeListener registerChangeListener) {
        final ActorSelection listenerActor = getContext().system().actorSelection(registerChangeListener.getDataChangeListenerPath());
        listenerActor.tell(new EnableNotification(true), getSelf());
        dataChangeListeners.add(listenerActor);

        final DataChangeListenerProxy listenerProxy = new DataChangeListenerProxy(schemaContext, listenerActor);

        LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
        return store.registerChangeListener(registerChangeListener.getPath(), listenerProxy, registerChangeListener.getScope());
    }

    /** Returns whether metric capture is enabled in the actor system's configuration. */
    private boolean isMetricsCaptureEnabled() {
        final CommonConfig commonConfig = new CommonConfig(getContext().system().settings().config());
        return commonConfig.isMetricCaptureEnabled();
    }

    /**
     * Starts a new, empty log recovery batch, replacing any previous one.
     *
     * @param i capacity the batch list is pre-sized with
     */
    protected void startLogRecoveryBatch(int i) {
        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(i);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} : starting log recovery batch with max size {}", persistenceId(), Integer.valueOf(i));
        }
    }

    /**
     * Adds the modification carried by a recovered log payload to the current recovery batch.
     * Payloads of an unrecognised type are logged and skipped.
     *
     * @param payload the recovered log entry payload
     */
    protected void appendRecoveredLogEntry(Payload payload) {
        if (payload instanceof CompositeModificationPayload) {
            final CompositeModificationPayload modificationPayload = (CompositeModificationPayload) payload;
            currentLogRecoveryBatch.add(modificationPayload.getModification());
            return;
        }

        if (payload instanceof CompositeModificationByteStringPayload) {
            final CompositeModificationByteStringPayload byteStringPayload = (CompositeModificationByteStringPayload) payload;
            currentLogRecoveryBatch.add(byteStringPayload.getModification());
            return;
        }

        LOG.error("Unknown state received {} during recovery", payload);
    }

    /**
     * Hands a recovered snapshot to the recovery coordinator together with a new write-only
     * transaction obtained from the store.
     *
     * <p>The coordinator is created on first use, so this method may be the first recovery
     * callback invoked.
     *
     * @param byteString the serialized snapshot to submit
     */
    protected void applyRecoverySnapshot(ByteString byteString) {
        if (this.recoveryCoordinator == null) {
            this.recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), this.schemaContext);
        }
        this.recoveryCoordinator.submit(byteString, this.store.newWriteOnlyTransaction());
        if (this.LOG.isDebugEnabled()) {
            // Message previously read "sbapshot"; corrected so the line can be found when searching logs.
            this.LOG.debug("{} : submitted recovery snapshot", persistenceId());
        }
    }

    /**
     * Submits the current log recovery batch to the recovery coordinator together with a new
     * write-only transaction obtained from the store. The coordinator is created on first use.
     */
    protected void applyCurrentLogRecoveryBatch() {
        if (recoveryCoordinator == null) {
            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext);
        }

        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} : submitted log recovery batch with size {}", persistenceId(),
                    currentLogRecoveryBatch.size());
        }
    }

    /**
     * Finishes recovery.
     *
     * <p>If a recovery coordinator exists, each transaction it returns is committed
     * synchronously. A failed commit is counted on the shard MBean and logged; it does not stop
     * the remaining transactions from being committed. Afterwards the recovery state is
     * dropped, journal statistics are refreshed, the parent actor is sent an
     * {@code ActorInitialized} message, and - unless one is already scheduled - a recurring
     * {@code TX_COMMIT_TIMEOUT_CHECK_MESSAGE} is scheduled to this actor at one third of the
     * transaction commit timeout.
     */
    protected void onRecoveryComplete() {
        if (recoveryCoordinator != null) {
            final Collection<DOMStoreWriteTransaction> recoveredTxs = recoveryCoordinator.getTransactions();

            if (LOG.isDebugEnabled()) {
                LOG.debug("{} : recovery complete - committing {} Tx's", persistenceId(), recoveredTxs.size());
            }

            for (final DOMStoreWriteTransaction recoveredTx : recoveredTxs) {
                try {
                    syncCommitTransaction(recoveredTx);
                    shardMBean.incrementCommittedTransactionCount();
                } catch (InterruptedException | ExecutionException e) {
                    shardMBean.incrementFailedTransactionsCount();
                    LOG.error("{}: Failed to commit", persistenceId(), e);
                }
            }
        }

        // Recovery-only state is no longer needed once the transactions have been committed.
        recoveryCoordinator = null;
        currentLogRecoveryBatch = null;

        updateJournalStats();
        getContext().parent().tell(new ActorInitialized(), getSelf());

        if (txCommitTimeoutCheckSchedule == null) {
            // The same duration serves as both the initial delay and the repeat interval.
            final FiniteDuration checkInterval =
                    Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
                    checkInterval, checkInterval, getSelf(), TX_COMMIT_TIMEOUT_CHECK_MESSAGE,
                    getContext().dispatcher(), ActorRef.noSender());
        }
    }

    /**
     * Applies a replicated state object to this shard and then refreshes the journal statistics.
     *
     * <p>{@code CompositeModificationPayload} and {@code CompositeModificationByteStringPayload}
     * instances have their modification forwarded to {@code applyModificationToState}. Anything
     * else is logged at error level together with the class loaders involved.
     *
     * @param actorRef the client actor passed through to {@code applyModificationToState}
     * @param str the identifier passed through to {@code applyModificationToState}
     * @param obj the state object to apply
     */
    protected void applyState(ActorRef actorRef, String str, Object obj) {
        if (obj instanceof CompositeModificationPayload) {
            final CompositeModificationPayload modificationPayload = (CompositeModificationPayload) obj;
            applyModificationToState(actorRef, str, modificationPayload.getModification());
        } else if (obj instanceof CompositeModificationByteStringPayload) {
            final CompositeModificationByteStringPayload byteStringPayload =
                    (CompositeModificationByteStringPayload) obj;
            applyModificationToState(actorRef, str, byteStringPayload.getModification());
        } else {
            final Object[] logArgs = {
                persistenceId(),
                obj,
                obj.getClass().getClassLoader(),
                CompositeModificationPayload.class.getClassLoader()
            };
            LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", logArgs);
        }

        // Statistics are refreshed on every path, including the unknown-state one.
        updateJournalStats();
    }

    /**
     * Applies a single modification to the shard state.
     *
     * <p>When {@code actorRef} is {@code null} the modification is deserialized and committed
     * through a new transaction; otherwise the commit identified by {@code str} is finished on
     * behalf of that actor. A {@code null} modification is reported at error level and nothing
     * is applied.
     *
     * @param actorRef the client actor, or {@code null} if there is none
     * @param str the identifier passed to {@code finishCommit} and reported in the error log
     * @param obj the serialized modification, expected to be non-null
     */
    private void applyModificationToState(ActorRef actorRef, String str, Object obj) {
        if (obj == null) {
            // Arguments follow the placeholder order of the message: shard id, clientActor, identifier.
            // They were previously passed as (identifier, clientActor), which mislabelled both values.
            final Object[] logArgs = {
                persistenceId(),
                actorRef != null ? actorRef.path().toString() : null,
                str
            };
            this.LOG.error("{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}", logArgs);
            return;
        }

        if (actorRef == null) {
            commitWithNewTransaction(MutableCompositeModification.fromSerializable(obj, this.schemaContext));
        } else {
            finishCommit(actorRef, str);
        }
    }

    /**
     * Copies the current journal and Raft indices onto the shard MBean.
     *
     * <p>The last log index/term are only updated when a last log entry exists, and the
     * replicated-to-all index is only updated when a current behavior is set.
     */
    private void updateJournalStats() {
        final ReplicatedLogEntry lastEntry = getLastLogEntry();
        if (lastEntry != null) {
            shardMBean.setLastLogIndex(lastEntry.getIndex());
            shardMBean.setLastLogTerm(lastEntry.getTerm());
        }

        shardMBean.setCommitIndex(getCommitIndex().longValue());
        shardMBean.setLastApplied(getLastApplied().longValue());

        // In-memory journal figures and snapshot position, all read from the replicated log.
        shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
        shardMBean.setInMemoryJournalLogSize(getRaftActorContext().getReplicatedLog().size());
        shardMBean.setSnapshotIndex(getRaftActorContext().getReplicatedLog().getSnapshotIndex());
        shardMBean.setSnapshotTerm(getRaftActorContext().getReplicatedLog().getSnapshotTerm());

        if (getCurrentBehavior() != null) {
            shardMBean.setReplicatedToAllIndex(getCurrentBehavior().getReplicatedToAllIndex());
        }
    }

    /**
     * Starts a snapshot capture unless one is already in flight.
     *
     * <p>When {@code createSnapshotTransaction} is unset, a read-only transaction actor named
     * {@code "createSnapshot" + counter} is created (the counter is incremented first) and is
     * sent a serialized {@code ReadData} request for the empty (root) instance identifier, with
     * this actor as the sender.
     */
    protected void createSnapshot() {
        if (createSnapshotTransaction != null) {
            // A snapshot transaction already exists; do not start another.
            return;
        }

        final int readOnlyType = TransactionProxy.TransactionType.READ_ONLY.ordinal();
        final String transactionId = "createSnapshot" + (++createSnapshotTransactionCounter);

        createSnapshotTransaction = createTransaction(readOnlyType, transactionId, "", 1);
        createSnapshotTransaction.tell(
                new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
    }

    /**
     * Replaces the store's content with the data decoded from a serialized snapshot.
     *
     * <p>The snapshot is parsed and decoded into a normalized node, then a write-only
     * transaction deletes the root path, writes the decoded node at the root path and is
     * committed synchronously. Protobuf parse failures and commit failures
     * ({@code InterruptedException}, {@code ExecutionException}) are logged and not
     * propagated; any other throwable propagates. "Done applying snapshot" is logged in
     * every case.
     *
     * @param byteString the serialized snapshot
     */
    @VisibleForTesting
    protected void applySnapshot(ByteString byteString) {
        LOG.info("{}: Applying snapshot", persistenceId());
        try {
            final DOMStoreWriteTransaction snapshotTx = store.newWriteOnlyTransaction();
            final NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(schemaContext);
            final NormalizedNode<?, ?> rootNode = codec.decode(NormalizedNodeMessages.Node.parseFrom(byteString));

            // Wipe the existing tree before writing the snapshot's root in the same transaction.
            snapshotTx.delete(YangInstanceIdentifier.builder().build());
            snapshotTx.write(YangInstanceIdentifier.builder().build(), rootNode);

            syncCommitTransaction(snapshotTx);
        } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
            LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
        } finally {
            LOG.info("{}: Done applying snapshot", persistenceId());
        }
    }

    /**
     * Reacts to a change of this shard's Raft state.
     *
     * <p>Every tracked data change listener is sent an {@code EnableNotification} carrying the
     * current leadership flag. When this shard is the leader, each delayed listener registration
     * that has not been closed is completed via {@code doChangeListenerRegistration}, and the
     * delayed list is cleared. The Raft state name and current term are published on the shard
     * MBean. When this shard is not the leader, all transaction chains are closed and removed.
     */
    protected void onStateChanged() {
        final boolean isLeader = isLeader();

        for (final ActorSelection listener : dataChangeListeners) {
            listener.tell(new EnableNotification(isLeader), getSelf());
        }

        if (isLeader) {
            for (final DelayedListenerRegistration delayed : delayedListenerRegistrations) {
                if (delayed.isClosed()) {
                    continue;
                }
                delayed.setDelegate(doChangeListenerRegistration(delayed.getRegisterChangeListener()));
            }
            delayedListenerRegistrations.clear();
        }

        shardMBean.setRaftState(getRaftState().name());
        shardMBean.setCurrentTerm(getCurrentTerm().longValue());

        if (!isLeader) {
            for (final Map.Entry<String, DOMStoreTransactionChain> chainEntry : transactionChains.entrySet()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader", new Object[]{persistenceId(), chainEntry.getKey(), getId()});
                }
                chainEntry.getValue().close();
            }
            transactionChains.clear();
        }
    }

    /**
     * Returns the persistence provider held by this shard.
     *
     * @return the {@code dataPersistenceProvider} field
     */
    protected DataPersistenceProvider persistence() {
        return dataPersistenceProvider;
    }

    /**
     * Publishes a leader change on the shard MBean.
     *
     * @param str not used by this implementation
     * @param str2 the value set as the leader on the shard MBean
     */
    protected void onLeaderChanged(String str, String str2) {
        shardMBean.setLeader(str2);
    }

    /**
     * Returns the persistence id of this shard.
     *
     * @return the string form of the shard's {@code name}
     */
    public String persistenceId() {
        return name.toString();
    }

    /**
     * Exposes the persistence provider held by this shard; annotated as visible for testing.
     *
     * @return the {@code dataPersistenceProvider} field
     */
    @VisibleForTesting
    DataPersistenceProvider getDataPersistenceProvider() {
        return dataPersistenceProvider;
    }

    /**
     * Exposes the in-memory data store backing this shard; annotated as visible for testing.
     *
     * @return the {@code store} field
     */
    @VisibleForTesting
    InMemoryDOMDataStore getDataStore() {
        return store;
    }

    /**
     * Exposes the statistics MBean of this shard; annotated as visible for testing.
     *
     * @return the {@code shardMBean} field
     */
    @VisibleForTesting
    ShardStats getShardMBean() {
        return shardMBean;
    }
}
