package com.ververica.cdc.connectors.mongodb.source.dialect;

import com.mongodb.client.MongoClient;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.MongoDBChunkSplitter;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBFetchTaskContext;
import com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask;
import com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask;
import com.ververica.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Experimental;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
/* loaded from: input_file:com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.class */
public class MongoDBDialect implements DataSourceDialect<MongoDBSourceConfig> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBDialect.class);
    private final Map<MongoDBSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache = new ConcurrentHashMap();

    public String getName() {
        return "MongoDB";
    }

    public List<TableId> discoverDataCollections(MongoDBSourceConfig mongoDBSourceConfig) {
        return (List) discoverAndCacheDataCollections(mongoDBSourceConfig).getDiscoveredCollections().stream().map(TableId::parse).collect(Collectors.toList());
    }

    public Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(MongoDBSourceConfig mongoDBSourceConfig) {
        List<TableId> discoverDataCollections = discoverDataCollections(mongoDBSourceConfig);
        HashMap hashMap = new HashMap(discoverDataCollections.size());
        for (TableId tableId : discoverDataCollections) {
            hashMap.put(tableId, collectionSchema(tableId));
        }
        return hashMap;
    }

    private CollectionDiscoveryUtils.CollectionDiscoveryInfo discoverAndCacheDataCollections(MongoDBSourceConfig mongoDBSourceConfig) {
        return this.cache.computeIfAbsent(mongoDBSourceConfig, mongoDBSourceConfig2 -> {
            MongoClient clientFor = MongoUtils.clientFor(mongoDBSourceConfig);
            List<String> databaseNames = CollectionDiscoveryUtils.databaseNames(clientFor, CollectionDiscoveryUtils.databaseFilter(mongoDBSourceConfig.getDatabaseList()));
            return new CollectionDiscoveryUtils.CollectionDiscoveryInfo(databaseNames, CollectionDiscoveryUtils.collectionNames(clientFor, databaseNames, CollectionDiscoveryUtils.collectionsFilter(mongoDBSourceConfig.getCollectionList())));
        });
    }

    public static TableChanges.TableChange collectionSchema(TableId tableId) {
        return new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, Table.editor().tableId(tableId).addColumn(Column.editor().name(MongoDBEnvelope.ID_FIELD).optional(false).create()).setPrimaryKeyNames(new String[]{MongoDBEnvelope.ID_FIELD}).create());
    }

    public ChangeStreamOffset displayCurrentOffset(MongoDBSourceConfig mongoDBSourceConfig) {
        MongoClient clientFor = MongoUtils.clientFor(mongoDBSourceConfig);
        BsonDocument latestResumeToken = MongoUtils.getLatestResumeToken(clientFor, ChangeStreamDescriptor.deployment());
        ChangeStreamOffset changeStreamOffset = latestResumeToken != null ? new ChangeStreamOffset(latestResumeToken) : new ChangeStreamOffset(MongoUtils.getCurrentClusterTime(clientFor));
        LOG.info("Current change stream offset : {}", changeStreamOffset);
        return changeStreamOffset;
    }

    public boolean isDataCollectionIdCaseSensitive(MongoDBSourceConfig mongoDBSourceConfig) {
        return true;
    }

    public ChunkSplitter createChunkSplitter(MongoDBSourceConfig mongoDBSourceConfig) {
        return new MongoDBChunkSplitter(mongoDBSourceConfig);
    }

    public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
        return sourceSplitBase.isSnapshotSplit() ? new MongoDBScanFetchTask(sourceSplitBase.asSnapshotSplit()) : new MongoDBStreamFetchTask(sourceSplitBase.asStreamSplit());
    }

    public MongoDBFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase, MongoDBSourceConfig mongoDBSourceConfig) {
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoverAndCacheDataCollections = discoverAndCacheDataCollections(mongoDBSourceConfig);
        return new MongoDBFetchTaskContext(this, mongoDBSourceConfig, MongoUtils.getChangeStreamDescriptor(mongoDBSourceConfig, discoverAndCacheDataCollections.getDiscoveredDatabases(), discoverAndCacheDataCollections.getDiscoveredCollections()));
    }
}
