package com.ververica.cdc.connectors.mongodb.source.reader;

import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mongodb/source/reader/MongoDBRecordEmitter.class */
public final class MongoDBRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBRecordEmitter.class);

    public MongoDBRecordEmitter(DebeziumDeserializationSchema<T> debeziumDeserializationSchema, SourceReaderMetrics sourceReaderMetrics, OffsetFactory offsetFactory) {
        super(debeziumDeserializationSchema, sourceReaderMetrics, false, offsetFactory);
    }

    protected void processElement(SourceRecord sourceRecord, SourceOutput<T> sourceOutput, SourceSplitState sourceSplitState) throws Exception {
        if (WatermarkEvent.isWatermarkEvent(sourceRecord)) {
            Offset offsetPosition = getOffsetPosition(sourceRecord);
            if (WatermarkEvent.isHighWatermarkEvent(sourceRecord) && sourceSplitState.isSnapshotSplitState()) {
                sourceSplitState.asSnapshotSplitState().setHighWatermark(offsetPosition);
                return;
            }
            return;
        }
        if (MongoRecordUtils.isHeartbeatEvent(sourceRecord)) {
            if (sourceSplitState.isStreamSplitState()) {
                updatePositionForStreamSplit(sourceRecord, sourceSplitState);
            }
        } else {
            if (!MongoRecordUtils.isDataChangeRecord(sourceRecord)) {
                LOG.info("Meet unknown element {}, just skip.", sourceRecord);
                return;
            }
            if (sourceSplitState.isStreamSplitState()) {
                updatePositionForStreamSplit(sourceRecord, sourceSplitState);
            }
            reportMetrics(sourceRecord);
            emitElement(sourceRecord, sourceOutput);
        }
    }

    private void updatePositionForStreamSplit(SourceRecord sourceRecord, SourceSplitState sourceSplitState) {
        BsonDocument resumeToken = MongoRecordUtils.getResumeToken(sourceRecord);
        ChangeStreamOffset changeStreamOffset = (ChangeStreamOffset) sourceSplitState.asStreamSplitState().getStartingOffset();
        if (changeStreamOffset != null) {
            changeStreamOffset.updatePosition(resumeToken);
        }
        sourceSplitState.asStreamSplitState().setStartingOffset(changeStreamOffset);
    }

    protected void reportMetrics(SourceRecord sourceRecord) {
        long currentTimeMillis = System.currentTimeMillis();
        this.sourceReaderMetrics.recordProcessTime(currentTimeMillis);
        Long messageTimestamp = MongoRecordUtils.getMessageTimestamp(sourceRecord);
        if (messageTimestamp == null || messageTimestamp.longValue() <= 0) {
            return;
        }
        Long fetchTimestamp = MongoRecordUtils.getFetchTimestamp(sourceRecord);
        if (fetchTimestamp != null && fetchTimestamp.longValue() >= messageTimestamp.longValue()) {
            this.sourceReaderMetrics.recordFetchDelay(fetchTimestamp.longValue() - messageTimestamp.longValue());
        }
        this.sourceReaderMetrics.recordEmitDelay(currentTimeMillis - messageTimestamp.longValue());
    }
}
