package top.javatool.canal.client.spring.boot.autoconfigure;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import top.javatool.canal.client.client.KafkaCanalClient;
import top.javatool.canal.client.factory.MapColumnModelFactory;
import top.javatool.canal.client.handler.EntryHandler;
import top.javatool.canal.client.handler.MessageHandler;
import top.javatool.canal.client.handler.RowDataHandler;
import top.javatool.canal.client.handler.impl.AsyncFlatMessageHandlerImpl;
import top.javatool.canal.client.handler.impl.MapRowDataHandlerImpl;
import top.javatool.canal.client.handler.impl.SyncFlatMessageHandlerImpl;
import top.javatool.canal.client.spring.boot.properties.CanalKafkaProperties;
import top.javatool.canal.client.spring.boot.properties.CanalProperties;

/**
 * Spring configuration that wires a {@link KafkaCanalClient} together with the
 * handlers it needs.
 *
 * <p>The configuration is only applied when an {@link EntryHandler} bean is
 * present and the property named by {@link CanalProperties#CANAL_MODE} has the
 * value {@code kafka}. It binds {@link CanalKafkaProperties} and imports
 * {@link ThreadPoolAutoConfiguration}.
 */
@EnableConfigurationProperties({CanalKafkaProperties.class})
@Configuration
@ConditionalOnBean({EntryHandler.class})
@ConditionalOnProperty(value = CanalProperties.CANAL_MODE, havingValue = "kafka")
@Import({ThreadPoolAutoConfiguration.class})
public class KafkaClientAutoConfiguration {

    /** Bound Kafka settings used to build the client; assigned once by the constructor. */
    private final CanalKafkaProperties canalKafkaProperties;

    /**
     * Creates the configuration.
     *
     * @param canalKafkaProperties the bound Kafka connection settings
     */
    public KafkaClientAutoConfiguration(CanalKafkaProperties canalKafkaProperties) {
        this.canalKafkaProperties = canalKafkaProperties;
    }

    /**
     * Registers the row-data handler, a {@link MapRowDataHandlerImpl} backed by a
     * {@link MapColumnModelFactory}.
     *
     * @return the row-data handler bean
     */
    @Bean
    public RowDataHandler<List<Map<String, String>>> rowDataHandler() {
        return new MapRowDataHandlerImpl(new MapColumnModelFactory());
    }

    /**
     * Registers the asynchronous message handler under the bean name
     * {@code messageHandler} when the property named by
     * {@link CanalProperties#CANAL_ASYNC} is {@code true}.
     *
     * <p>The method name is deliberately distinct from
     * {@link #syncMessageHandler}: Spring treats same-named {@code @Bean} methods
     * as overloads of a single bean definition, so per-method conditions on
     * overloads can suppress each other, and Spring Framework 6 rejects
     * overloaded {@code @Bean} methods by default. The explicit bean name keeps
     * the registered name unchanged.
     *
     * @param rowDataHandler  handler for row data
     * @param list            all registered entry handlers
     * @param executorService executor handed to the asynchronous handler
     * @return the asynchronous message handler
     */
    @ConditionalOnProperty(value = CanalProperties.CANAL_ASYNC, havingValue = "true")
    @Bean(name = "messageHandler")
    public MessageHandler asyncMessageHandler(
            RowDataHandler<List<Map<String, String>>> rowDataHandler,
            List<EntryHandler> list,
            ExecutorService executorService) {
        return new AsyncFlatMessageHandlerImpl(list, rowDataHandler, executorService);
    }

    /**
     * Registers the synchronous message handler under the bean name
     * {@code messageHandler} when the property named by
     * {@link CanalProperties#CANAL_ASYNC} is {@code false}.
     *
     * <p>Uses its own method name for the same reason as
     * {@link #asyncMessageHandler}; the two conditions are mutually exclusive,
     * so at most one {@code messageHandler} bean is registered by this class.
     *
     * @param rowDataHandler handler for row data
     * @param list           all registered entry handlers
     * @return the synchronous message handler
     */
    @ConditionalOnProperty(value = CanalProperties.CANAL_ASYNC, havingValue = "false")
    @Bean(name = "messageHandler")
    public MessageHandler syncMessageHandler(
            RowDataHandler<List<Map<String, String>>> rowDataHandler,
            List<EntryHandler> list) {
        return new SyncFlatMessageHandlerImpl(list, rowDataHandler);
    }

    /**
     * Builds the Kafka client from the bound {@link CanalKafkaProperties}. The
     * container invokes {@code start} after creation and {@code stop} on
     * destruction, as declared on the {@code @Bean} annotation.
     *
     * <p>NOTE(review): the method name (and therefore the default bean name)
     * mentions ZooKeeper although the bean is a Kafka client; it is kept as-is
     * so the registered bean name does not change.
     *
     * @param messageHandler handler that receives the consumed messages
     * @return the configured Kafka client
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public KafkaCanalClient zookeeperClusterCanalClient(MessageHandler messageHandler) {
        return KafkaCanalClient.builder()
                .servers(this.canalKafkaProperties.getServers())
                .groupId(this.canalKafkaProperties.getGroupId())
                .topic(this.canalKafkaProperties.getTopic())
                .messageHandler(messageHandler)
                .batchSize(this.canalKafkaProperties.getBatchSize())
                .filter(this.canalKafkaProperties.getFilter())
                .timeout(this.canalKafkaProperties.getTimeout())
                .unit(this.canalKafkaProperties.getUnit())
                .build();
    }
}
