package org.apache.streams.mongo;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.util.JSON;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/mongo/MongoPersistWriter.class */
public class MongoPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable {
    public static final String STREAMS_ID = "MongoPersistWriter";
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriter.class);
    private static final long MAX_WRITE_LATENCY = 1000;
    protected volatile Queue<StreamsDatum> persistQueue;
    private ObjectMapper mapper;
    private volatile AtomicLong lastWrite;
    private ScheduledExecutorService backgroundFlushTask;
    private MongoConfiguration config;
    protected MongoClient client;
    protected DB db;
    protected DBCollection collection;
    protected List<DBObject> insertBatch;
    protected final ReadWriteLock lock;

    public MongoPersistWriter() {
        this((MongoConfiguration) new ComponentConfigurator(MongoConfiguration.class).detectConfiguration());
    }

    public MongoPersistWriter(MongoConfiguration mongoConfiguration) {
        this.mapper = StreamsJacksonMapper.getInstance();
        this.lastWrite = new AtomicLong(System.currentTimeMillis());
        this.backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
        this.insertBatch = new ArrayList();
        this.lock = new ReentrantReadWriteLock();
        this.config = mongoConfiguration;
    }

    public void setPersistQueue(Queue<StreamsDatum> queue) {
        this.persistQueue = queue;
    }

    public Queue<StreamsDatum> getPersistQueue() {
        return this.persistQueue;
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void write(StreamsDatum streamsDatum) {
        DBObject prepareObject = prepareObject(streamsDatum);
        if (prepareObject != null) {
            addToBatch(prepareObject);
            flushIfNecessary();
        }
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
        try {
            LOGGER.debug("Attempting to flush {} items to mongo", Integer.valueOf(this.insertBatch.size()));
            this.lock.writeLock().lock();
            this.collection.insert(this.insertBatch);
            this.lastWrite.set(System.currentTimeMillis());
            this.insertBatch = new ArrayList();
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        this.client.close();
        this.backgroundFlushTask.shutdownNow();
    }

    public void start() {
        connectToMongo();
        this.backgroundFlushTask.scheduleAtFixedRate(this::flushIfNecessary, 0L, 2000L, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        try {
            flush();
        } catch (IOException e) {
            LOGGER.error("Error flushing", e);
        }
        try {
            close();
        } catch (IOException e2) {
            LOGGER.error("Error closing", e2);
        }
        try {
            this.backgroundFlushTask.shutdown();
            if (!this.backgroundFlushTask.awaitTermination(15L, TimeUnit.SECONDS)) {
                this.backgroundFlushTask.shutdownNow();
                if (!this.backgroundFlushTask.awaitTermination(15L, TimeUnit.SECONDS)) {
                    LOGGER.error("Stream did not terminate");
                }
            }
        } catch (InterruptedException e3) {
            this.backgroundFlushTask.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            if (this.persistQueue.peek() != null) {
                try {
                    write(this.persistQueue.remove());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(new Random().nextInt(1));
            } catch (InterruptedException e2) {
                LOGGER.trace("Interrupt", e2);
            }
        }
    }

    public void prepare(Object obj) {
        this.persistQueue = new ConcurrentLinkedQueue();
        start();
    }

    public void cleanUp() {
        stop();
    }

    protected void flushIfNecessary() {
        long currentTimeMillis = System.currentTimeMillis() - this.lastWrite.get();
        if (this.insertBatch.size() > 0) {
            if (this.insertBatch.size() % 100 == 0 || currentTimeMillis > MAX_WRITE_LATENCY) {
                try {
                    flush();
                } catch (IOException e) {
                    LOGGER.error("Error writing to Mongo", e);
                }
            }
        }
    }

    protected void addToBatch(DBObject dBObject) {
        try {
            this.lock.readLock().lock();
            this.insertBatch.add(dBObject);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    protected DBObject prepareObject(StreamsDatum streamsDatum) {
        DBObject dBObject = null;
        if (streamsDatum.getDocument() instanceof String) {
            dBObject = (DBObject) JSON.parse((String) streamsDatum.getDocument());
        } else {
            try {
                dBObject = (DBObject) JSON.parse(this.mapper.valueToTree(streamsDatum.getDocument()).toString());
            } catch (Exception e) {
                LOGGER.error("Unsupported type: " + streamsDatum.getDocument().getClass(), e);
            }
        }
        return dBObject;
    }

    private synchronized void connectToMongo() {
        ServerAddress serverAddress = new ServerAddress(this.config.getHost(), this.config.getPort().intValue());
        if (StringUtils.isNotEmpty(this.config.getUser()) && StringUtils.isNotEmpty(this.config.getPassword())) {
            this.client = new MongoClient(serverAddress, (List) Stream.of(MongoCredential.createCredential(this.config.getUser(), this.config.getDb(), this.config.getPassword().toCharArray())).collect(Collectors.toList()));
        } else {
            this.client = new MongoClient(serverAddress);
        }
        this.db = this.client.getDB(this.config.getDb());
        if (!this.db.collectionExists(this.config.getCollection())) {
            this.db.createCollection(this.config.getCollection(), (DBObject) null);
        }
        this.collection = this.db.getCollection(this.config.getCollection());
    }
}
