package org.zalando.fahrschein.example;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.fahrschein.EventProcessingException;
import org.zalando.fahrschein.ExponentialBackoffStrategy;
import org.zalando.fahrschein.IORunnable;
import org.zalando.fahrschein.Listener;
import org.zalando.fahrschein.NakadiClient;
import org.zalando.fahrschein.NoBackoffStrategy;
import org.zalando.fahrschein.StreamParameters;
import org.zalando.fahrschein.ZignAccessTokenProvider;
import org.zalando.fahrschein.domain.Lock;
import org.zalando.fahrschein.example.domain.SalesOrder;
import org.zalando.fahrschein.example.domain.SalesOrderPlaced;
import org.zalando.fahrschein.inmemory.InMemoryCursorManager;
import org.zalando.fahrschein.jdbc.JdbcCursorManager;
import org.zalando.fahrschein.jdbc.JdbcPartitionManager;
import org.zalando.jackson.datatype.money.MoneyModule;

/* loaded from: input_file:org/zalando/fahrschein/example/Main.class */
/**
 * Example entry point that listens for {@code sales-order-service.order-placed} events on the
 * Nakadi instance at {@link #NAKADI_URI} and logs every sales order it receives.
 *
 * <p>{@link #main(String[])} uses the subscription-based variant. The remaining private
 * {@code *Listen} methods are alternative setups (in-memory cursors, JDBC-backed cursors,
 * several lock-coordinated consumers); none of them is called from within this class.
 */
public class Main {
    private static final String SALES_ORDER_SERVICE_ORDER_PLACED = "sales-order-service.order-placed";
    private static final String JDBC_URL = "jdbc:postgresql://localhost:5432/local_nakadi_cursor_db";
    private static final String JDBC_USERNAME = "postgres";
    private static final String JDBC_PASSWORD = "postgres";
    private static final Logger LOG = LoggerFactory.getLogger(Main.class);
    private static final URI NAKADI_URI = URI.create("https://nakadi-staging.aruha-test.zalan.do");

    /**
     * Configures the JSON mapper and starts listening through {@link #subscriptionListen}.
     *
     * <p>The listener throws an {@link EventProcessingException} on roughly one in ten thousand
     * invocations ({@code Math.random() < 0.0001}); otherwise it logs the order number and
     * creation time of each received sales order.
     *
     * @param args command line arguments; not used
     * @throws IOException if listening fails with an I/O error
     * @throws InterruptedException declared for callers; not thrown by the code in this method
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        ObjectMapper objectMapper = createObjectMapper();

        subscriptionListen(objectMapper, events -> {
            if (Math.random() < 0.0001) {
                throw new EventProcessingException("Random failure");
            }
            for (SalesOrderPlaced event : events) {
                SalesOrder salesOrder = event.getSalesOrder();
                LOG.info("Received sales order [{}] created at [{}]", salesOrder.getOrderNumber(), salesOrder.getCreatedAt());
            }
        });
    }

    /**
     * Builds the mapper used for event deserialization: snake_case property names, nulls omitted
     * on write, dates not written as timestamps, unknown properties tolerated, default view
     * inclusion off, plus the java.time, JDK 8, money and parameter-names modules.
     */
    private static ObjectMapper createObjectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION);
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.registerModule(new Jdk8Module());
        objectMapper.registerModule(new MoneyModule());
        objectMapper.registerModule(new ParameterNamesModule());
        return objectMapper;
    }

    /** Builds the connection pool settings for the local cursor database. */
    private static HikariConfig newHikariConfig() {
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(JDBC_URL);
        hikariConfig.setUsername(JDBC_USERNAME);
        hikariConfig.setPassword(JDBC_PASSWORD);
        return hikariConfig;
    }

    /**
     * Listens via the result of {@code NakadiClient.subscribe("fahrschein-demo", eventName,
     * "fahrschein-demo")}, so no cursor manager is configured on the client here.
     *
     * @param objectMapper mapper used to deserialize events
     * @param listener receives the deserialized events
     * @throws IOException if subscribing or listening fails with an I/O error
     */
    private static void subscriptionListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
                .withAccessTokenProvider(new ZignAccessTokenProvider())
                .build();

        nakadiClient.stream(nakadiClient.subscribe("fahrschein-demo", SALES_ORDER_SERVICE_ORDER_PLACED, "fahrschein-demo"))
                .withObjectMapper(objectMapper)
                .listen(SalesOrderPlaced.class, listener);
    }

    /**
     * Listens by event name with cursors held in an {@link InMemoryCursorManager}, starting from
     * the oldest available offset and using an exponential backoff limited to 10 retries.
     *
     * @param objectMapper mapper used to deserialize events
     * @param listener receives the deserialized events
     * @throws IOException if fetching partitions or listening fails with an I/O error
     */
    private static void simpleListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        InMemoryCursorManager cursorManager = new InMemoryCursorManager();
        NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
                .withAccessTokenProvider(new ZignAccessTokenProvider())
                .withCursorManager(cursorManager)
                .build();

        cursorManager.fromOldestAvailableOffset(SALES_ORDER_SERVICE_ORDER_PLACED, nakadiClient.getPartitions(SALES_ORDER_SERVICE_ORDER_PLACED));

        nakadiClient.stream(SALES_ORDER_SERVICE_ORDER_PLACED)
                .withObjectMapper(objectMapper)
                .withBackoffStrategy(new ExponentialBackoffStrategy().withMaxRetries(10))
                .listen(SalesOrderPlaced.class, listener);
    }

    /**
     * Listens by event name with cursors managed by a {@link JdbcCursorManager} on the local
     * PostgreSQL database, starting from the oldest available offset.
     *
     * @param objectMapper mapper used to deserialize events
     * @param listener receives the deserialized events
     * @throws IOException if fetching partitions or listening fails with an I/O error
     */
    private static void persistentListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        // The pool is only needed while this method is listening; release its connections on exit.
        try (HikariDataSource dataSource = new HikariDataSource(newHikariConfig())) {
            JdbcCursorManager cursorManager = new JdbcCursorManager(dataSource, "fahrschein-demo");
            NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
                    .withAccessTokenProvider(new ZignAccessTokenProvider())
                    .withCursorManager(cursorManager)
                    .build();

            cursorManager.fromOldestAvailableOffset(SALES_ORDER_SERVICE_ORDER_PLACED, nakadiClient.getPartitions(SALES_ORDER_SERVICE_ORDER_PLACED));

            nakadiClient.stream(SALES_ORDER_SERVICE_ORDER_PLACED)
                    .withObjectMapper(objectMapper)
                    .listen(SalesOrderPlaced.class, listener);
        }
    }

    /**
     * Starts 12 consumers named {@code consumer-0} to {@code consumer-11} on a 16-thread scheduler.
     * Each consumer repeatedly (1 second after the previous attempt finished) tries to lock
     * partitions through a {@link JdbcPartitionManager}; when it obtains a lock it listens with a
     * stream limit of 10 and no backoff, then releases the lock.
     *
     * <p>The scheduler is never shut down in this method, so after the initial sleep the
     * {@code awaitTermination} call only returns when the calling thread is interrupted.
     *
     * @param objectMapper mapper used to deserialize events
     * @param listener receives the deserialized events; shared by all consumers
     * @throws IOException if fetching partitions or initialising cursors fails with an I/O error
     */
    private static void multiInstanceListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
        // Not closed here: the scheduled consumers keep using the pool and are never stopped by this method.
        HikariDataSource dataSource = new HikariDataSource(newHikariConfig());
        ZignAccessTokenProvider accessTokenProvider = new ZignAccessTokenProvider();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(16);

        for (int i = 0; i < 12; i++) {
            String consumerName = "consumer-" + i;
            JdbcPartitionManager partitionManager = new JdbcPartitionManager(dataSource, "fahrschein-demo");
            JdbcCursorManager cursorManager = new JdbcCursorManager(dataSource, "fahrschein-demo");
            NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
                    .withAccessTokenProvider(accessTokenProvider)
                    .withCursorManager(cursorManager)
                    .build();

            // Raw List kept deliberately: the partition element type is not among this file's imports.
            List partitions = nakadiClient.getPartitions(SALES_ORDER_SERVICE_ORDER_PLACED);
            cursorManager.fromOldestAvailableOffset(SALES_ORDER_SERVICE_ORDER_PLACED, partitions);

            IORunnable startConsumer = () -> {
                IORunnable consumeWhileLocked = () -> {
                    Optional<Lock> optionalLock = partitionManager.lockPartitions(SALES_ORDER_SERVICE_ORDER_PLACED, partitions, consumerName);
                    if (!optionalLock.isPresent()) {
                        // No lock obtained this round; the scheduler retries after the fixed delay.
                        return;
                    }
                    Lock lock = optionalLock.get();
                    try {
                        nakadiClient.stream(SALES_ORDER_SERVICE_ORDER_PLACED)
                                .withLock(lock)
                                .withObjectMapper(objectMapper)
                                .withStreamParameters(new StreamParameters().withStreamLimit(10))
                                .withBackoffStrategy(new NoBackoffStrategy())
                                .listen(SalesOrderPlaced.class, listener);
                    } finally {
                        // Release exactly once, whether listening returned normally or threw.
                        partitionManager.unlockPartitions(lock);
                    }
                };
                scheduler.scheduleWithFixedDelay(consumeWhileLocked.unchecked(), 0L, 1L, TimeUnit.SECONDS);
            };
            scheduler.submit(startConsumer.unchecked());
        }

        try {
            Thread.sleep(60_000L);
            scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
    }
}
