/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateUpdater;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskAndAction;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.slf4j.Logger;

/**
 * {@link StateUpdater} implementation that hands tasks to a single background
 * {@code StateUpdaterThread} through an input queue and publishes the outcome
 * (restored active tasks, removed tasks, failed tasks) through output queues
 * that callers drain.
 */
public class DefaultStateUpdater
implements StateUpdater {
    // Suffix appended to IllegalStateException messages raised on internal invariant violations.
    private static final String BUG_ERROR_MESSAGE = "This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
    // Clock used for drain timeouts and for commit-interval bookkeeping.
    private final Time time;
    // Name passed to the state updater thread.
    private final String name;
    // Changelog reader passed to the state updater thread.
    private final ChangelogReader changelogReader;
    // Consulted to find out whether the topology of a task is paused.
    private final TopologyMetadata topologyMetadata;
    // Input queue of pending ADD/REMOVE requests; accessed while holding tasksAndActionsLock.
    private final Queue<TaskAndAction> tasksAndActions = new LinkedList<TaskAndAction>();
    private final Lock tasksAndActionsLock = new ReentrantLock();
    // Signalled whenever a request is appended to tasksAndActions.
    private final Condition tasksAndActionsCondition = this.tasksAndActionsLock.newCondition();
    // Output queue of restored active tasks; accessed while holding restoredActiveTasksLock.
    private final Queue<StreamTask> restoredActiveTasks = new LinkedList<StreamTask>();
    private final Lock restoredActiveTasksLock = new ReentrantLock();
    // Signalled whenever a task is appended to restoredActiveTasks.
    private final Condition restoredActiveTasksCondition = this.restoredActiveTasksLock.newCondition();
    // Output queue of exceptions together with the tasks they affected.
    private final BlockingQueue<StateUpdater.ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<StateUpdater.ExceptionAndTasks>();
    // Output queue of tasks that were removed from the state updater.
    private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<Task>();
    // Value of "commit.interval.ms"; minimum time between two task-state checks of the thread.
    private final long commitIntervalMs;
    // Time of the last task-state check; written by start() and by the state updater thread.
    private long lastCommitMs;
    // The running state updater thread, or null when none has been started or it was shut down.
    private StateUpdaterThread stateUpdaterThread = null;
    // Counted down by the state updater thread when it terminates; awaited by shutdown().
    private CountDownLatch shutdownGate;

    public DefaultStateUpdater(String name, StreamsConfig config, ChangelogReader changelogReader, TopologyMetadata topologyMetadata, Time time) {
        this.time = time;
        this.name = name;
        this.changelogReader = changelogReader;
        this.topologyMetadata = topologyMetadata;
        this.commitIntervalMs = config.getLong("commit.interval.ms");
    }

    /**
     * Starts the state updater thread if none is currently running; otherwise does nothing.
     */
    @Override
    public void start() {
        if (stateUpdaterThread == null) {
            // Everything the thread reads must be initialized BEFORE the thread is started:
            // the thread counts down shutdownGate when it terminates and compares the clock
            // against lastCommitMs, so assigning them after Thread.start() would be a race
            // (a thread that fails fast could hit a null latch).
            shutdownGate = new CountDownLatch(1);
            // Initialize the last commit to now so the first state check is not immediate.
            lastCommitMs = time.milliseconds();
            final StateUpdaterThread thread = new StateUpdaterThread(name, changelogReader);
            stateUpdaterThread = thread;
            thread.start();
        }
    }

    /**
     * Stops the state updater thread and waits for it to terminate.
     *
     * <p>If no thread is running, pending ADD requests in the input queue are moved to the
     * removed tasks instead.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the
     * reference to the state updater thread is kept, and the caller's interrupt status is
     * restored so that the interruption is not lost.
     *
     * @param timeout maximum time to wait for the thread to terminate
     * @throws StreamsException if the thread did not terminate within {@code timeout}
     */
    @Override
    public void shutdown(Duration timeout) {
        if (stateUpdaterThread == null) {
            removeAddedTasksFromInputQueue();
            return;
        }
        stateUpdaterThread.isRunning.set(false);
        stateUpdaterThread.interrupt();
        try {
            if (!shutdownGate.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                throw new StreamsException("State updater thread did not shutdown within the timeout");
            }
            stateUpdaterThread = null;
        } catch (InterruptedException interrupted) {
            // Do not swallow the interruption: hand it back to the caller via the flag.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Empties the input queue while holding its lock. Tasks of pending ADD requests are
     * moved to the removed tasks; all other requests are discarded.
     */
    private void removeAddedTasksFromInputQueue() {
        tasksAndActionsLock.lock();
        try {
            for (TaskAndAction pending = tasksAndActions.poll(); pending != null; pending = tasksAndActions.poll()) {
                if (pending.getAction() == TaskAndAction.Action.ADD) {
                    removedTasks.add(pending.getTask());
                }
            }
        } finally {
            tasksAndActionsLock.unlock();
        }
    }

    /**
     * Enqueues a request to add the given task and wakes up threads waiting on the input queue.
     *
     * @param task task to add; active tasks must be RESTORING, standby tasks must be RUNNING
     * @throws IllegalStateException if the task is not in the required state
     */
    @Override
    public void add(Task task) {
        verifyStateFor(task);
        final TaskAndAction addRequest = TaskAndAction.createAddTask(task);
        tasksAndActionsLock.lock();
        try {
            tasksAndActions.add(addRequest);
            tasksAndActionsCondition.signalAll();
        } finally {
            tasksAndActionsLock.unlock();
        }
    }

    /**
     * Checks that a task is in the state required for being added: RESTORING for active
     * tasks and RUNNING for standby tasks.
     *
     * @throws IllegalStateException if the task is in any other state
     */
    private void verifyStateFor(Task task) {
        if (task.isActive()) {
            if (task.state() != Task.State.RESTORING) {
                throw new IllegalStateException("Active task " + task.id() + " is not in state RESTORING. " + BUG_ERROR_MESSAGE);
            }
        } else if (task.state() != Task.State.RUNNING) {
            throw new IllegalStateException("Standby task " + task.id() + " is not in state RUNNING. " + BUG_ERROR_MESSAGE);
        }
    }

    /**
     * Enqueues a request to remove the task with the given ID and wakes up threads waiting
     * on the input queue.
     *
     * @param taskId ID of the task to remove
     */
    @Override
    public void remove(TaskId taskId) {
        final TaskAndAction removeRequest = TaskAndAction.createRemoveTask(taskId);
        tasksAndActionsLock.lock();
        try {
            tasksAndActions.add(removeRequest);
            tasksAndActionsCondition.signalAll();
        } finally {
            tasksAndActionsLock.unlock();
        }
    }

    /**
     * Removes and returns all restored active tasks, waiting up to {@code timeout} for at
     * least one task to become available.
     *
     * <p>If the calling thread is interrupted while waiting, the tasks collected so far are
     * returned and the caller's interrupt status is restored.
     *
     * @param timeout maximum time to wait for a restored active task
     * @return the drained tasks; empty if none became available in time
     */
    @Override
    public Set<StreamTask> drainRestoredActiveTasks(Duration timeout) {
        final long timeoutMs = timeout.toMillis();
        final long startTime = time.milliseconds();
        // Saturate instead of overflowing: a very large timeout must mean "wait (almost)
        // forever", not a deadline in the past.
        final long deadline = timeoutMs > 0 && startTime > Long.MAX_VALUE - timeoutMs
            ? Long.MAX_VALUE
            : startTime + timeoutMs;
        long now = startTime;
        final Set<StreamTask> result = new HashSet<>();
        try {
            while (now <= deadline && result.isEmpty()) {
                restoredActiveTasksLock.lock();
                try {
                    // Loop around await() to tolerate spurious wake-ups.
                    while (restoredActiveTasks.isEmpty() && now <= deadline) {
                        restoredActiveTasksCondition.await(deadline - now, TimeUnit.MILLISECONDS);
                        now = time.milliseconds();
                    }
                    result.addAll(restoredActiveTasks);
                    restoredActiveTasks.clear();
                } finally {
                    restoredActiveTasksLock.unlock();
                }
                now = time.milliseconds();
            }
        } catch (InterruptedException interrupted) {
            // Do not swallow the interruption: hand it back to the caller via the flag.
            Thread.currentThread().interrupt();
        }
        return result;
    }

    /**
     * Removes and returns all tasks currently in the removed-tasks queue.
     *
     * @return the drained tasks; empty if there are none
     */
    @Override
    public Set<Task> drainRemovedTasks() {
        // Typed list instead of a raw ArrayList to keep the drain type-safe.
        final List<Task> drained = new ArrayList<>();
        removedTasks.drainTo(drained);
        return new HashSet<>(drained);
    }

    /**
     * @return {@code true} if the removed-tasks queue holds at least one task
     */
    @Override
    public boolean hasRemovedTasks() {
        final boolean nothingRemoved = removedTasks.isEmpty();
        return !nothingRemoved;
    }

    /**
     * Removes and returns all exceptions with their failed tasks, in queue order.
     *
     * @return the drained entries; empty if there are none
     */
    @Override
    public List<StateUpdater.ExceptionAndTasks> drainExceptionsAndFailedTasks() {
        final List<StateUpdater.ExceptionAndTasks> drained = new ArrayList<>();
        exceptionsAndFailedTasks.drainTo(drained);
        return drained;
    }

    /**
     * @return {@code true} if at least one exception with failed tasks is waiting to be drained
     */
    @Override
    public boolean hasExceptionsAndFailedTasks() {
        final boolean nothingFailed = exceptionsAndFailedTasks.isEmpty();
        return !nothingFailed;
    }

    /**
     * @return an unmodifiable snapshot of the standby tasks currently updating; empty if no
     *         state updater thread exists
     */
    public Set<StandbyTask> getUpdatingStandbyTasks() {
        if (stateUpdaterThread == null) {
            return Collections.emptySet();
        }
        final Set<StandbyTask> snapshot = new HashSet<>(stateUpdaterThread.getUpdatingStandbyTasks());
        return Collections.unmodifiableSet(snapshot);
    }

    /**
     * @return an unmodifiable snapshot of the tasks currently updating; empty if no state
     *         updater thread exists
     */
    @Override
    public Set<Task> getUpdatingTasks() {
        if (stateUpdaterThread == null) {
            return Collections.emptySet();
        }
        final Set<Task> snapshot = new HashSet<>(stateUpdaterThread.getUpdatingTasks());
        return Collections.unmodifiableSet(snapshot);
    }

    /**
     * @return an unmodifiable snapshot of the restored active tasks, taken while holding the
     *         restored-tasks lock; the queue itself is left untouched
     */
    public Set<StreamTask> getRestoredActiveTasks() {
        restoredActiveTasksLock.lock();
        try {
            return Collections.unmodifiableSet(new HashSet<>(restoredActiveTasks));
        } finally {
            restoredActiveTasksLock.unlock();
        }
    }

    /**
     * @return an unmodifiable snapshot of the queued exceptions with their failed tasks; the
     *         queue itself is left untouched
     */
    public List<StateUpdater.ExceptionAndTasks> getExceptionsAndFailedTasks() {
        final List<StateUpdater.ExceptionAndTasks> snapshot = new ArrayList<>(exceptionsAndFailedTasks);
        return Collections.unmodifiableList(snapshot);
    }

    /**
     * @return an unmodifiable snapshot of the removed tasks; the queue itself is left untouched
     */
    public Set<Task> getRemovedTasks() {
        final Set<Task> snapshot = new HashSet<>(removedTasks);
        return Collections.unmodifiableSet(snapshot);
    }

    /**
     * @return an unmodifiable snapshot of the paused tasks; empty if no state updater thread
     *         exists
     */
    public Set<Task> getPausedTasks() {
        if (stateUpdaterThread == null) {
            return Collections.emptySet();
        }
        final Set<Task> snapshot = new HashSet<>(stateUpdaterThread.getPausedTasks());
        return Collections.unmodifiableSet(snapshot);
    }

    /**
     * @return every task known to the state updater (paused or not), each wrapped in a
     *         {@code ReadOnlyTask}; collected while both queue locks are held
     */
    @Override
    public Set<Task> getTasks() {
        final Supplier<Set<Task>> collectReadOnlyViews =
            () -> getStreamOfTasks().map(ReadOnlyTask::new).collect(Collectors.toSet());
        return executeWithQueuesLocked(collectReadOnlyViews);
    }

    /**
     * @return {@code true} if at least one non-paused task known to the state updater is an
     *         active task; evaluated while both queue locks are held
     */
    @Override
    public boolean restoresActiveTasks() {
        final Set<Task> nonPausedActiveTasks = executeWithQueuesLocked(
            () -> getStreamOfNonPausedTasks().filter(Task::isActive).collect(Collectors.toSet()));
        return !nonPausedActiveTasks.isEmpty();
    }

    /**
     * @return all active tasks known to the state updater (paused or not), collected while
     *         both queue locks are held
     */
    public Set<StreamTask> getActiveTasks() {
        final Supplier<Set<StreamTask>> collectActive = () -> getStreamOfTasks()
            .filter(Task::isActive)
            .map(task -> (StreamTask) task)
            .collect(Collectors.toSet());
        return executeWithQueuesLocked(collectActive);
    }

    /**
     * @return all standby tasks known to the state updater (paused or not), collected while
     *         both queue locks are held
     */
    @Override
    public Set<StandbyTask> getStandbyTasks() {
        final Supplier<Set<StandbyTask>> collectStandbys = () -> getStreamOfTasks()
            .filter(task -> !task.isActive())
            .map(task -> (StandbyTask) task)
            .collect(Collectors.toSet());
        return executeWithQueuesLocked(collectStandbys);
    }

    /**
     * Runs {@code action} while holding the input-queue lock and the restored-tasks lock.
     * The locks are acquired in that order and released in the reverse order.
     *
     * @param action computation to run under both locks
     * @return whatever {@code action} returns
     */
    private <T> Set<T> executeWithQueuesLocked(Supplier<Set<T>> action) {
        tasksAndActionsLock.lock();
        restoredActiveTasksLock.lock();
        try {
            return action.get();
        } finally {
            restoredActiveTasksLock.unlock();
            tasksAndActionsLock.unlock();
        }
    }

    /**
     * @return a stream over all non-paused tasks followed by a snapshot of the paused tasks
     */
    private Stream<Task> getStreamOfTasks() {
        final Stream<Task> nonPaused = getStreamOfNonPausedTasks();
        final Stream<Task> paused = getPausedTasks().stream();
        return Stream.concat(nonPaused, paused);
    }

    /**
     * @return a stream that concatenates, in this order: tasks of pending ADD requests,
     *         updating tasks, restored active tasks, tasks attached to queued exceptions,
     *         and removed tasks
     */
    private Stream<Task> getStreamOfNonPausedTasks() {
        final Stream<Task> pendingAdds = tasksAndActions.stream()
            .filter(request -> request.getAction() == TaskAndAction.Action.ADD)
            .map(TaskAndAction::getTask);
        final Stream<Task> updating = getUpdatingTasks().stream();
        final Stream<StreamTask> restored = restoredActiveTasks.stream();
        final Stream<Task> failed = exceptionsAndFailedTasks.stream()
            .flatMap(failure -> failure.getTasks().stream());
        final Stream<Task> removed = removedTasks.stream();
        return Stream.concat(
            pendingAdds,
            Stream.concat(
                updating,
                Stream.concat(
                    restored,
                    Stream.concat(failed, removed))));
    }

    /**
     * Background thread that repeatedly applies queued ADD/REMOVE requests, restores the
     * updating tasks through the changelog reader, and periodically checks task states,
     * until it is interrupted or {@code isRunning} is set to {@code false}.
     */
    private class StateUpdaterThread
    extends Thread {
        // Changelog reader used to register, restore and unregister task changelogs.
        private final ChangelogReader changelogReader;
        // Loop condition of run(); set to false by shutdown() and after an unexpected runtime exception.
        private final AtomicBoolean isRunning;
        // Tasks currently being restored/updated, keyed by task ID.
        private final Map<TaskId, Task> updatingTasks;
        // Tasks moved out of updatingTasks because their topology is paused, keyed by task ID.
        private final Map<TaskId, Task> pausedTasks;
        // Logger whose messages are prefixed with the thread name.
        private final Logger log;

        /**
         * Creates the thread in the running state with no updating and no paused tasks.
         *
         * @param name            thread name, also used in the log prefix
         * @param changelogReader changelog reader used to restore the tasks
         */
        public StateUpdaterThread(String name, ChangelogReader changelogReader) {
            super(name);
            this.changelogReader = changelogReader;
            this.isRunning = new AtomicBoolean(true);
            this.updatingTasks = new ConcurrentHashMap<>();
            this.pausedTasks = new ConcurrentHashMap<>();
            this.log = new LogContext(String.format("state-updater [%s] ", name)).logger(DefaultStateUpdater.class);
        }

        /**
         * @return the live values view of the updating-tasks map (not a copy)
         */
        public Collection<Task> getUpdatingTasks() {
            final Collection<Task> liveView = updatingTasks.values();
            return liveView;
        }

        /**
         * @return a new list holding the updating tasks that are not active, cast to
         *         {@code StandbyTask}
         */
        public Collection<StandbyTask> getUpdatingStandbyTasks() {
            final List<StandbyTask> standbys = new ArrayList<>();
            for (final Task candidate : updatingTasks.values()) {
                if (!candidate.isActive()) {
                    standbys.add((StandbyTask) candidate);
                }
            }
            return standbys;
        }

        /**
         * @return {@code true} if there is at least one updating task and none of the updating
         *         tasks is active
         */
        private boolean onlyStandbyTasksUpdating() {
            if (updatingTasks.isEmpty()) {
                return false;
            }
            return updatingTasks.values().stream().noneMatch(Task::isActive);
        }

        /**
         * @return the live values view of the paused-tasks map (not a copy)
         */
        public Collection<Task> getPausedTasks() {
            final Collection<Task> liveView = pausedTasks.values();
            return liveView;
        }

        /**
         * Main loop of the state updater thread: calls {@code runOnce()} until the thread is
         * interrupted, {@code isRunning} becomes {@code false}, or an unexpected runtime
         * exception occurs.
         *
         * <p>On every exit path the cleanup runs exactly once: pending ADD requests and all
         * updating and paused tasks are moved to the removed tasks, and the shutdown gate is
         * released.
         */
        @Override
        public void run() {
            log.info("State updater thread started");
            try {
                while (isRunning.get()) {
                    try {
                        runOnce();
                    } catch (InterruptedException | InterruptException interrupted) {
                        // Interruption is the shutdown signal. The cleanup lives in the
                        // finally block only, so it must not be repeated here.
                        return;
                    }
                }
            } catch (RuntimeException anyOtherException) {
                handleRuntimeException(anyOtherException);
            } finally {
                try {
                    removeAddedTasksFromInputQueue();
                    removeUpdatingAndPausedTasks();
                } finally {
                    // Always release the gate, even if the cleanup above throws; otherwise
                    // shutdown() could only return by timing out.
                    shutdownGate.countDown();
                    log.info("State updater thread shutdown");
                }
            }
        }

        /**
         * One iteration of the thread loop: apply queued requests, restore the updating tasks,
         * check the task states against the current time, then wait for new requests if all
         * changelogs are completely read.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private void runOnce() throws InterruptedException {
            performActionsOnTasks();
            restoreTasks();
            final long now = time.milliseconds();
            checkAllUpdatingTaskStates(now);
            waitIfAllChangelogsCompletelyRead();
        }

        /**
         * Takes all pending requests out of the input queue and applies them in order, all
         * while holding the input-queue lock.
         */
        private void performActionsOnTasks() {
            tasksAndActionsLock.lock();
            try {
                final List<TaskAndAction> pendingRequests = getTasksAndActions();
                for (final TaskAndAction request : pendingRequests) {
                    switch (request.getAction()) {
                        case ADD:
                            addTask(request.getTask());
                            break;
                        case REMOVE:
                            removeTask(request.getTaskId());
                            break;
                        default:
                            break;
                    }
                }
            } finally {
                tasksAndActionsLock.unlock();
            }
        }

        /**
         * Lets the changelog reader restore the updating tasks, routes restoration failures to
         * the matching handler, and then completes every active task whose changelogs are all
         * reported as completed.
         */
        private void restoreTasks() {
            try {
                changelogReader.restore(updatingTasks);
            } catch (TaskCorruptedException taskCorruptedException) {
                handleTaskCorruptedException(taskCorruptedException);
            } catch (StreamsException streamsException) {
                handleStreamsException(streamsException);
            }
            final Set<TopicPartition> completedChangelogs = changelogReader.completedChangelogs();
            // Typed snapshot (a raw List cannot be iterated as Task). The snapshot is taken
            // because completing a restoration removes the task from updatingTasks.
            final List<Task> activeTasks = updatingTasks.values().stream()
                .filter(Task::isActive)
                .collect(Collectors.toList());
            for (final Task task : activeTasks) {
                maybeCompleteRestoration((StreamTask) task, completedChangelogs);
            }
        }

        /**
         * Handles an unexpected runtime exception: logs it, reports it together with all
         * updating tasks as failed, clears the updating tasks and stops the thread loop.
         *
         * @param runtimeException the unexpected exception
         */
        private void handleRuntimeException(RuntimeException runtimeException) {
            // Pass the exception as the throwable argument so the stack trace is logged too,
            // not just its toString().
            log.error("An unexpected error occurred within the state updater thread: " + runtimeException, runtimeException);
            final Set<Task> failedTasks = new HashSet<>(updatingTasks.values());
            addToExceptionsAndFailedTasksThenClearUpdatingTasks(new StateUpdater.ExceptionAndTasks(failedTasks, runtimeException));
            isRunning.set(false);
        }

        /**
         * Handles corrupted tasks: for each corrupted task the changelogs are marked as
         * corrupted and a checkpoint is enforced; afterwards the tasks are reported as failed
         * together with the exception and removed from the updating tasks.
         *
         * @throws IllegalStateException if a corrupted task is not among the updating tasks
         */
        private void handleTaskCorruptedException(TaskCorruptedException taskCorruptedException) {
            log.info("Encountered task corrupted exception: ", taskCorruptedException);
            final Set<Task> corruptedTasks = new HashSet<>();
            for (final TaskId corruptedId : taskCorruptedException.corruptedTasks()) {
                final Task corruptedTask = updatingTasks.get(corruptedId);
                if (corruptedTask == null) {
                    throw new IllegalStateException("Task " + corruptedId + " is corrupted but is not updating. " + BUG_ERROR_MESSAGE);
                }
                corruptedTasks.add(corruptedTask);
                removeCheckpointForCorruptedTask(corruptedTask);
            }
            addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new StateUpdater.ExceptionAndTasks(corruptedTasks, taskCorruptedException));
        }

        /**
         * Marks all changelog partitions of the task as corrupted and then enforces a checkpoint.
         */
        private void removeCheckpointForCorruptedTask(Task task) {
            final Set<TopicPartition> corruptedChangelogs = task.changelogPartitions();
            task.markChangelogAsCorrupted(corruptedChangelogs);
            task.maybeCheckpoint(true);
        }

        /**
         * Logs the exception and dispatches on whether it names the task that failed.
         */
        private void handleStreamsException(StreamsException streamsException) {
            log.info("Encountered streams exception: ", streamsException);
            final boolean namesFailedTask = streamsException.taskId().isPresent();
            if (!namesFailedTask) {
                handleStreamsExceptionWithoutTask(streamsException);
                return;
            }
            handleStreamsExceptionWithTask(streamsException);
        }

        /**
         * Reports the task named by the exception as failed and removes it from the updating
         * tasks.
         *
         * @throws IllegalStateException if the named task is not among the updating tasks
         */
        private void handleStreamsExceptionWithTask(StreamsException streamsException) {
            final TaskId failedTaskId = streamsException.taskId().get();
            final Task failedTask = updatingTasks.get(failedTaskId);
            if (failedTask == null) {
                throw new IllegalStateException("Task " + failedTaskId + " failed but is not updating. " + BUG_ERROR_MESSAGE);
            }
            final Set<Task> failedTasks = new HashSet<>();
            failedTasks.add(failedTask);
            addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new StateUpdater.ExceptionAndTasks(failedTasks, streamsException));
        }

        /**
         * Reports all updating tasks as failed with the given exception and clears the
         * updating tasks.
         */
        private void handleStreamsExceptionWithoutTask(StreamsException streamsException) {
            final Set<Task> allUpdatingTasks = new HashSet<>(updatingTasks.values());
            final StateUpdater.ExceptionAndTasks failure = new StateUpdater.ExceptionAndTasks(allUpdatingTasks, streamsException);
            addToExceptionsAndFailedTasksThenClearUpdatingTasks(failure);
        }

        /**
         * Publishes the failure, removes its tasks from the updating tasks, and switches the
         * changelog reader to updating standbys if an active task failed and only standby
         * tasks remain.
         */
        private void addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(StateUpdater.ExceptionAndTasks exceptionAndTasks) {
            exceptionsAndFailedTasks.add(exceptionAndTasks);
            for (final Task failedTask : exceptionAndTasks.getTasks()) {
                updatingTasks.remove(failedTask.id());
            }
            final boolean anyActiveTaskFailed = exceptionAndTasks.getTasks().stream().anyMatch(Task::isActive);
            if (anyActiveTaskFailed) {
                transitToUpdateStandbysIfOnlyStandbysLeft();
            }
        }

        /**
         * Publishes the failure and then drops every updating task.
         */
        private void addToExceptionsAndFailedTasksThenClearUpdatingTasks(StateUpdater.ExceptionAndTasks exceptionAndTasks) {
            final StateUpdater.ExceptionAndTasks failure = exceptionAndTasks;
            exceptionsAndFailedTasks.add(failure);
            updatingTasks.clear();
        }

        /**
         * If the thread is still running and the changelog reader reports all changelogs as
         * completed, blocks until the input queue is non-empty.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private void waitIfAllChangelogsCompletelyRead() throws InterruptedException {
            if (!isRunning.get() || !changelogReader.allChangelogsCompleted()) {
                return;
            }
            tasksAndActionsLock.lock();
            try {
                // Loop around await() to tolerate spurious wake-ups.
                while (tasksAndActions.isEmpty()) {
                    tasksAndActionsCondition.await();
                }
            } finally {
                tasksAndActionsLock.unlock();
            }
        }

        /**
         * Clears the changelog reader and moves every updating task (after an enforced
         * checkpoint) and every paused task to the removed tasks.
         */
        private void removeUpdatingAndPausedTasks() {
            changelogReader.clear();
            for (final Task updating : updatingTasks.values()) {
                updating.maybeCheckpoint(true);
                removedTasks.add(updating);
            }
            updatingTasks.clear();
            for (final Task paused : pausedTasks.values()) {
                removedTasks.add(paused);
            }
            pausedTasks.clear();
        }

        /**
         * Copies the pending requests into a new list and empties the input queue.
         *
         * @return the requests that were pending, in queue order
         */
        private List<TaskAndAction> getTasksAndActions() {
            final List<TaskAndAction> pendingRequests = new ArrayList<>(tasksAndActions);
            tasksAndActions.clear();
            return pendingRequests;
        }

        /**
         * Adds a task to the thread. Stateless active tasks go straight to the restored tasks;
         * all other tasks are put into the updating tasks and registered with the changelog
         * reader.
         *
         * @throws IllegalStateException if a task with the same ID is already updating
         */
        private void addTask(Task task) {
            if (isStateless(task)) {
                addToRestoredTasks((StreamTask) task);
                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
                return;
            }

            final Task existingTask = updatingTasks.putIfAbsent(task.id(), task);
            if (existingTask != null) {
                final String existingType = existingTask.isActive() ? "Active" : "Standby";
                final String addedType = task.isActive() ? "active" : "standby";
                throw new IllegalStateException(existingType + " task " + task.id() + " already exist, should not try to add another " + addedType + " task with the same id. " + BUG_ERROR_MESSAGE);
            }

            changelogReader.register(task.changelogPartitions(), task.stateManager());
            if (!task.isActive()) {
                log.info("Standby task " + task.id() + " was added to the state updater");
                // The standby task just added is the only updating task.
                if (updatingTasks.size() == 1) {
                    changelogReader.transitToUpdateStandby();
                }
                return;
            }
            log.info("Stateful active task " + task.id() + " was added to the state updater");
            changelogReader.enforceRestoreActive();
        }

        /**
         * Removes the task with the given ID from the updating or the paused tasks, unregisters
         * its changelogs and moves it to the removed tasks. Updating tasks are checkpointed
         * first. Only logs if the task is neither updating nor paused.
         */
        private void removeTask(TaskId taskId) {
            final Task updatingTask = updatingTasks.get(taskId);
            if (updatingTask != null) {
                updatingTask.maybeCheckpoint(true);
                changelogReader.unregister(updatingTask.changelogPartitions());
                removedTasks.add(updatingTask);
                updatingTasks.remove(taskId);
                if (updatingTask.isActive()) {
                    transitToUpdateStandbysIfOnlyStandbysLeft();
                }
                log.info((updatingTask.isActive() ? "Active" : "Standby") + " task " + updatingTask.id() + " was removed from the updating tasks and added to the removed tasks.");
                return;
            }

            final Task pausedTask = pausedTasks.get(taskId);
            if (pausedTask != null) {
                changelogReader.unregister(pausedTask.changelogPartitions());
                removedTasks.add(pausedTask);
                pausedTasks.remove(taskId);
                log.info((pausedTask.isActive() ? "Active" : "Standby") + " task " + pausedTask.id() + " was removed from the paused tasks and added to the removed tasks.");
                return;
            }

            log.info("Task " + taskId + " was not removed since it is not updating or paused.");
        }

        /**
         * Checkpoints the task and moves it from the updating to the paused tasks. If it was an
         * active task, the changelog reader is switched to updating standbys when only standby
         * tasks remain.
         */
        private void pauseTask(Task task) {
            final TaskId pausedId = task.id();
            task.maybeCheckpoint(true);
            pausedTasks.put(pausedId, task);
            updatingTasks.remove(pausedId);
            final boolean active = task.isActive();
            if (active) {
                transitToUpdateStandbysIfOnlyStandbysLeft();
            }
            final String taskType = task.isActive() ? "Active" : "Standby";
            log.debug(taskType + " task " + task.id() + " was paused from the updating tasks and added to the paused tasks.");
        }

        /**
         * Moves the task from the paused back to the updating tasks and puts the changelog
         * reader into the matching mode.
         */
        private void resumeTask(Task task) {
            final TaskId resumedId = task.id();
            updatingTasks.put(resumedId, task);
            pausedTasks.remove(resumedId);
            if (!task.isActive()) {
                log.debug("Standby task " + task.id() + " was resumed to the updating tasks of the state updater");
                // The standby task just resumed is the only updating task.
                if (updatingTasks.size() == 1) {
                    changelogReader.transitToUpdateStandby();
                }
                return;
            }
            log.debug("Stateful active task " + task.id() + " was resumed to the updating tasks of the state updater");
            changelogReader.enforceRestoreActive();
        }

        /**
         * @return {@code true} if the task has no changelog partitions and is active
         */
        private boolean isStateless(Task task) {
            final boolean hasNoChangelogs = task.changelogPartitions().isEmpty();
            return hasNoChangelogs && task.isActive();
        }

        /**
         * Completes the restoration of the task if all of its changelog partitions are among
         * the restored ones: checkpoints it, unregisters its changelogs and moves it from the
         * updating to the restored tasks.
         *
         * @param task               active task to check
         * @param restoredChangelogs changelog partitions that are completely restored
         */
        private void maybeCompleteRestoration(StreamTask task, Set<TopicPartition> restoredChangelogs) {
            final Set<TopicPartition> taskChangelogs = task.changelogPartitions();
            if (!restoredChangelogs.containsAll(taskChangelogs)) {
                return;
            }
            task.maybeCheckpoint(true);
            changelogReader.unregister(taskChangelogs);
            addToRestoredTasks(task);
            updatingTasks.remove(task.id());
            log.info("Stateful active task " + task.id() + " completed restoration");
            transitToUpdateStandbysIfOnlyStandbysLeft();
        }

        /**
         * Switches the changelog reader to updating standbys when the updating tasks are
         * non-empty and contain no active task.
         */
        private void transitToUpdateStandbysIfOnlyStandbysLeft() {
            final boolean onlyStandbysLeft = onlyStandbyTasksUpdating();
            if (!onlyStandbysLeft) {
                return;
            }
            changelogReader.transitToUpdateStandby();
        }

        /**
         * Appends the task to the restored active tasks and wakes up all threads waiting for
         * restored tasks, while holding the restored-tasks lock.
         */
        private void addToRestoredTasks(StreamTask task) {
            final Lock lock = restoredActiveTasksLock;
            lock.lock();
            try {
                restoredActiveTasks.add(task);
                log.debug("Active task " + task.id() + " was added to the restored tasks");
                restoredActiveTasksCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Once more than the commit interval has elapsed since the last check: pauses updating
         * tasks whose topology is paused, tries to checkpoint the remaining updating tasks,
         * resumes paused tasks whose topology is no longer paused, and records {@code now} as
         * the time of the last check.
         *
         * @param now current time in milliseconds
         */
        private void checkAllUpdatingTaskStates(long now) {
            final long elapsedMsSinceLastCommit = now - lastCommitMs;
            if (elapsedMsSinceLastCommit <= commitIntervalMs) {
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Checking all restoring task states since {}ms has elapsed (commit interval is {}ms)", elapsedMsSinceLastCommit, commitIntervalMs);
            }

            for (final Task updating : updatingTasks.values()) {
                if (topologyMetadata.isPaused(updating.id().topologyName())) {
                    pauseTask(updating);
                } else {
                    log.debug("Try to checkpoint current restoring progress for task {}", updating.id());
                    updating.maybeCheckpoint(false);
                }
            }

            for (final Task paused : pausedTasks.values()) {
                if (!topologyMetadata.isPaused(paused.id().topologyName())) {
                    resumeTask(paused);
                }
            }

            lastCommitMs = now;
        }
    }
}

