/*
 * Decompiled with CFR 0.152.
 */
package me.tfeng.play.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import java.util.concurrent.atomic.AtomicBoolean;
import me.tfeng.play.mongodb.OplogItem;
import me.tfeng.play.mongodb.OplogItemHandler;
import me.tfeng.play.mongodb.RecordConverter;
import me.tfeng.play.spring.Startable;
import org.bson.types.BSONTimestamp;
import play.Logger;
import play.core.enhancers.PropertiesEnhancer;

/**
 * Reads items from the {@code local.oplog.rs} collection on a background thread and passes each
 * one, converted to an {@link OplogItem}, to a configured {@link OplogItemHandler}.
 *
 * <p>Usage: set the MongoDB client and the handler (and optionally a namespace filter and a
 * start timestamp), then call {@link #onStart()}. Call {@link #onStop()} to close the cursor and
 * wait for the background thread to finish.
 *
 * <p>The stop flag is kept per instance so that stopping or starting one listener never affects
 * another listener living in the same JVM.
 */
@PropertiesEnhancer.GeneratedAccessor
@PropertiesEnhancer.RewrittenAccessor
public class OplogListener
implements Startable {
    /** Name of the collection that is read. */
    public static final String COLLECTION_NAME = "oplog.rs";
    /** Name of the database that holds {@link #COLLECTION_NAME}. */
    public static final String DB_NAME = "local";
    private static final Logger.ALogger LOG = Logger.of(OplogListener.class);

    /** Set to {@code true} by {@link #onStop()} to tell this listener's thread to exit. */
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private DBCollection collection;
    private DBCursor cursor;
    private OplogItemHandler handler;
    private MongoClient mongoClient;
    private String namespace;
    private BSONTimestamp startTimestamp;
    private Thread thread;

    /**
     * Opens a cursor on the oplog collection and starts the background thread that reads from it.
     *
     * @throws IllegalStateException if the MongoDB client or the handler has not been set
     * @throws Throwable any failure raised while opening the cursor or starting the thread
     */
    public void onStart() throws Throwable {
        if (this.mongoClient == null || this.handler == null) {
            throw new IllegalStateException("mongoClient and handler must both be provided");
        }
        LOG.info("Connecting to local.oplog.rs in MongoDB");
        this.collection = this.mongoClient.getDB(DB_NAME).getCollection(COLLECTION_NAME);
        this.cursor = this.collection.find(this.getQuery()).sort(this.getSort()).setOptions(this.getOptions());
        // Reset the flag before the thread starts so a listener can be restarted after a stop.
        this.stopping.set(false);
        this.thread = new Thread(new OplogListenerThread());
        this.thread.start();
        LOG.info("Handler thread started");
    }

    /**
     * Signals the background thread to stop, closes the cursor, and waits for the thread to end.
     * Does nothing beyond setting the stop flag if the listener was never started.
     *
     * @throws Throwable any failure raised while closing the cursor, including
     *         {@link InterruptedException} if the wait for the thread is interrupted
     */
    public void onStop() throws Throwable {
        // The flag must be set before the cursor is closed: the reader thread treats an
        // exception from the cursor as a normal exit only when the flag is already set.
        this.stopping.set(true);
        DBCursor openCursor = this.cursor;
        if (openCursor != null) {
            openCursor.close();
        }
        Thread worker = this.thread;
        if (worker != null) {
            LOG.info("Waiting for handler thread to stop");
            worker.join();
        }
        this.cursor = null;
        this.thread = null;
    }

    /**
     * Sets the handler that receives every oplog item read by this listener.
     *
     * @param handler the handler; must be set before {@link #onStart()}
     */
    public void setHandler(OplogItemHandler handler) {
        this.handler = handler;
    }

    /**
     * Sets the MongoDB client used to reach the oplog collection.
     *
     * @param mongoClient the client; must be set before {@link #onStart()}
     */
    public void setMongoClient(MongoClient mongoClient) {
        this.mongoClient = mongoClient;
    }

    /**
     * Restricts the query to oplog items whose {@code ns} field equals the given value.
     *
     * @param namespace the namespace to match, or {@code null} for no namespace filter
     */
    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    /**
     * Restricts the query to oplog items whose {@code ts} field is greater than the given value.
     *
     * @param startTimestamp the exclusive lower bound, or {@code null} for no timestamp filter
     */
    public void setStartTimestamp(BSONTimestamp startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    /**
     * Returns the option bits applied to the cursor.
     *
     * @return the cursor option bits
     */
    protected int getOptions() {
        // NOTE(review): 2 presumably corresponds to the driver's "tailable" query option
        // (Bytes.QUERYOPTION_TAILABLE) - confirm against the driver version in use.
        return 2;
    }

    /**
     * Builds the query from the optional start timestamp and namespace.
     *
     * @return the query; empty when neither filter is set
     */
    protected DBObject getQuery() {
        BasicDBObject query = new BasicDBObject();
        if (this.startTimestamp != null) {
            query.put("ts", new BasicDBObject("$gt", this.startTimestamp));
        }
        if (this.namespace != null) {
            query.put("ns", this.namespace);
        }
        return query;
    }

    /**
     * Returns the sort specification for the cursor: {@code $natural} ascending.
     *
     * @return the sort specification
     */
    protected DBObject getSort() {
        return new BasicDBObject("$natural", 1);
    }

    /**
     * Loop body of the background thread: reads the next item from the cursor, converts it, and
     * hands it to the handler until the enclosing listener is asked to stop.
     */
    @PropertiesEnhancer.GeneratedAccessor
    @PropertiesEnhancer.RewrittenAccessor
    private class OplogListenerThread
    implements Runnable {
        private OplogListenerThread() {
        }

        @Override
        public void run() {
            do {
                DBObject object;
                try {
                    object = OplogListener.this.cursor.next();
                }
                catch (Exception e) {
                    // An exception after a stop request is expected (the cursor was closed
                    // underneath this thread), so it ends the loop quietly.
                    if (OplogListener.this.stopping.get()) break;
                    if (e instanceof RuntimeException) {
                        throw (RuntimeException)e;
                    }
                    throw new RuntimeException("Unexpected exception occurred while trying to read the next oplog item", e);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received oplog item " + object);
                }
                OplogItem oplogItem = RecordConverter.toRecord(OplogItem.class, object);
                OplogListener.this.handler.handle(oplogItem);
            } while (!OplogListener.this.stopping.get());
        }
    }
}

