/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.common;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Metronome;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseSourceTask
extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);
    private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
    private final ReentrantLock stateLock = new ReentrantLock();
    private volatile ElapsedTimeStrategy restartDelay;
    private volatile Map<String, String> props;
    private ChangeEventSourceCoordinator coordinator;
    private volatile Map<String, ?> lastOffset;
    private Duration retriableRestartWait;

    public final void start(Map<String, String> props) {
        if (this.context == null) {
            throw new ConnectException("Unexpected null context");
        }
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
                LOGGER.info("Connector has already been started");
                return;
            }
            this.props = props;
            Configuration config = Configuration.from(props);
            this.retriableRestartWait = config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS);
            this.restartDelay = null;
            if (!config.validateAndRecord(this.getAllConfigurationFields(), arg_0 -> ((Logger)LOGGER).error(arg_0))) {
                throw new ConnectException("Error configuring an instance of " + ((Object)((Object)this)).getClass().getSimpleName() + "; check the logs for details");
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Starting {} with configuration:", (Object)((Object)((Object)this)).getClass().getSimpleName());
                config.withMaskedPasswords().forEach((propName, propValue) -> LOGGER.info("   {} = {}", propName, propValue));
            }
            this.coordinator = this.start(config);
        }
        finally {
            this.stateLock.unlock();
        }
    }

    protected abstract ChangeEventSourceCoordinator start(Configuration var1);

    public final List<SourceRecord> poll() throws InterruptedException {
        boolean started = this.startIfNeededAndPossible();
        if (!started) {
            Metronome.parker(Duration.of(2L, ChronoUnit.SECONDS), Clock.SYSTEM).pause();
            return Collections.emptyList();
        }
        try {
            return this.doPoll();
        }
        catch (RetriableException e) {
            this.stop(true);
            throw e;
        }
    }

    protected abstract List<SourceRecord> doPoll() throws InterruptedException;

    private boolean startIfNeededAndPossible() {
        this.stateLock.lock();
        try {
            if (this.state.get() == State.RUNNING) {
                boolean bl = true;
                return bl;
            }
            if (this.restartDelay != null && this.restartDelay.hasElapsed()) {
                this.start(this.props);
                boolean bl = true;
                return bl;
            }
            LOGGER.info("Awaiting end of restart backoff period after a retriable error");
            boolean bl = false;
            return bl;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    public final void stop() {
        this.stop(false);
    }

    private void stop(boolean restart) {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.RUNNING, State.STOPPED)) {
                LOGGER.info("Connector has already been stopped");
                return;
            }
            if (restart) {
                LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", (Object)this.retriableRestartWait.getSeconds());
            } else {
                LOGGER.info("Stopping down connector");
            }
            try {
                if (this.coordinator != null) {
                    this.coordinator.stop();
                }
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                LOGGER.error("Interrupted while stopping coordinator", (Throwable)e);
                throw new ConnectException("Interrupted while stopping coordinator, failing the task");
            }
            this.doStop();
            if (restart && this.restartDelay == null) {
                this.restartDelay = ElapsedTimeStrategy.constant(Clock.system(), this.retriableRestartWait.toMillis());
                this.restartDelay.hasElapsed();
            }
        }
        finally {
            this.stateLock.unlock();
        }
    }

    protected abstract void doStop();

    public void commitRecord(SourceRecord record) throws InterruptedException {
        Map currentOffset = record.sourceOffset();
        if (currentOffset != null) {
            this.lastOffset = currentOffset;
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void commit() throws InterruptedException {
        boolean locked = this.stateLock.tryLock();
        if (locked) {
            try {
                if (this.coordinator == null || this.lastOffset == null) return;
                this.coordinator.commitOffset(this.lastOffset);
                return;
            }
            finally {
                this.stateLock.unlock();
            }
        } else {
            LOGGER.warn("Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart");
        }
    }

    protected abstract Iterable<Field> getAllConfigurationFields();

    protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) {
        Map<String, ?> partition = loader.getPartition();
        if (this.lastOffset != null) {
            OffsetContext offsetContext = loader.load(this.lastOffset);
            LOGGER.info("Found previous offset after restart {}", (Object)offsetContext);
            return offsetContext;
        }
        Map previousOffset = (Map)this.context.offsetStorageReader().offsets(Collections.singleton(partition)).get(partition);
        if (previousOffset != null) {
            OffsetContext offsetContext = loader.load(previousOffset);
            LOGGER.info("Found previous offset {}", (Object)offsetContext);
            return offsetContext;
        }
        return null;
    }

    protected static enum State {
        RUNNING,
        STOPPED;

    }
}

