/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.BufferManager;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class RecoveredInputChannel
extends InputChannel
implements ChannelStateHolder {
    private static final Logger LOG = LoggerFactory.getLogger(RecoveredInputChannel.class);
    private final ArrayDeque<Buffer> receivedBuffers = new ArrayDeque();
    private final CompletableFuture<?> stateConsumedFuture = new CompletableFuture();
    protected final BufferManager bufferManager;
    @GuardedBy(value="receivedBuffers")
    private boolean isReleased;
    protected ChannelStateWriter channelStateWriter;
    private int sequenceNumber = Integer.MIN_VALUE;
    protected final int networkBuffersPerChannel;
    private boolean exclusiveBuffersAssigned;
    private long lastStoppedCheckpointId = -1L;

    RecoveredInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, int initialBackoff, int maxBackoff, Counter numBytesIn, Counter numBuffersIn, int networkBuffersPerChannel) {
        super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, numBytesIn, numBuffersIn);
        this.bufferManager = new BufferManager(inputGate.getMemorySegmentProvider(), this, 0);
        this.networkBuffersPerChannel = networkBuffersPerChannel;
    }

    @Override
    public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
        Preconditions.checkState((this.channelStateWriter == null ? 1 : 0) != 0, (Object)"Already initialized");
        this.channelStateWriter = (ChannelStateWriter)Preconditions.checkNotNull((Object)channelStateWriter);
    }

    public final InputChannel toInputChannel() throws IOException {
        Preconditions.checkState((boolean)this.stateConsumedFuture.isDone(), (Object)"recovered state is not fully consumed");
        InputChannel inputChannel = this.toInputChannelInternal();
        inputChannel.checkpointStopped(this.lastStoppedCheckpointId);
        return inputChannel;
    }

    @Override
    public void checkpointStopped(long checkpointId) {
        this.lastStoppedCheckpointId = checkpointId;
    }

    protected abstract InputChannel toInputChannelInternal() throws IOException;

    CompletableFuture<?> getStateConsumedFuture() {
        return this.stateConsumedFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onRecoveredStateBuffer(Buffer buffer) {
        boolean recycleBuffer = true;
        NetworkActionsLogger.traceRecover("InputChannelRecoveredStateHandler#recover", buffer, this.inputGate.getOwningTaskName(), this.channelInfo);
        try {
            boolean wasEmpty;
            ArrayDeque<Buffer> arrayDeque = this.receivedBuffers;
            synchronized (arrayDeque) {
                if (this.isReleased) {
                    wasEmpty = false;
                } else {
                    wasEmpty = this.receivedBuffers.isEmpty();
                    this.receivedBuffers.add(buffer);
                    recycleBuffer = false;
                }
            }
            if (wasEmpty) {
                this.notifyChannelNonEmpty();
            }
        }
        finally {
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }

    public void finishReadRecoveredState() throws IOException {
        this.onRecoveredStateBuffer(EventSerializer.toBuffer(EndOfChannelStateEvent.INSTANCE, false));
        this.bufferManager.releaseFloatingBuffers();
        LOG.debug("{}/{} finished recovering input.", (Object)this.inputGate.getOwningTaskName(), (Object)this.channelInfo);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private InputChannel.BufferAndAvailability getNextRecoveredStateBuffer() throws IOException {
        Buffer.DataType nextDataType;
        Buffer next;
        ArrayDeque<Buffer> arrayDeque = this.receivedBuffers;
        synchronized (arrayDeque) {
            Preconditions.checkState((!this.isReleased ? 1 : 0) != 0, (Object)"Trying to read from released RecoveredInputChannel");
            next = this.receivedBuffers.poll();
            nextDataType = this.peekDataTypeUnsafe();
        }
        if (next == null) {
            return null;
        }
        if (this.isEndOfChannelStateEvent(next)) {
            this.stateConsumedFuture.complete(null);
            return null;
        }
        return new InputChannel.BufferAndAvailability(next, nextDataType, 0, this.sequenceNumber++);
    }

    private boolean isEndOfChannelStateEvent(Buffer buffer) throws IOException {
        if (buffer.isBuffer()) {
            return false;
        }
        AbstractEvent event = EventSerializer.fromBuffer(buffer, this.getClass().getClassLoader());
        buffer.setReaderIndex(0);
        return event.getClass() == EndOfChannelStateEvent.class;
    }

    @Override
    Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        this.checkError();
        return Optional.ofNullable(this.getNextRecoveredStateBuffer());
    }

    private Buffer.DataType peekDataTypeUnsafe() {
        assert (Thread.holdsLock(this.receivedBuffers));
        Buffer first = this.receivedBuffers.peek();
        return first != null ? first.getDataType() : Buffer.DataType.NONE;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    int getBuffersInUseCount() {
        ArrayDeque<Buffer> arrayDeque = this.receivedBuffers;
        synchronized (arrayDeque) {
            return this.receivedBuffers.size();
        }
    }

    @Override
    public void resumeConsumption() {
        throw new UnsupportedOperationException("RecoveredInputChannel should never be blocked.");
    }

    @Override
    public void acknowledgeAllRecordsProcessed() throws IOException {
        throw new UnsupportedOperationException("RecoveredInputChannel should not need acknowledge all records processed.");
    }

    @Override
    final void requestSubpartition(int subpartitionIndex) {
        throw new UnsupportedOperationException("RecoveredInputChannel should never request partition.");
    }

    @Override
    void sendTaskEvent(TaskEvent event) {
        throw new UnsupportedOperationException("RecoveredInputChannel should never send any task events.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    boolean isReleased() {
        ArrayDeque<Buffer> arrayDeque = this.receivedBuffers;
        synchronized (arrayDeque) {
            return this.isReleased;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    void releaseAllResources() throws IOException {
        ArrayDeque<Buffer> releasedBuffers = new ArrayDeque<Buffer>();
        boolean shouldRelease = false;
        ArrayDeque<Buffer> arrayDeque = this.receivedBuffers;
        synchronized (arrayDeque) {
            if (!this.isReleased) {
                this.isReleased = true;
                shouldRelease = true;
                releasedBuffers.addAll(this.receivedBuffers);
                this.receivedBuffers.clear();
            }
        }
        if (shouldRelease) {
            this.bufferManager.releaseAllBuffers(releasedBuffers);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    protected int getNumberOfQueuedBuffers() {
        ArrayDeque<Buffer> arrayDeque = this.receivedBuffers;
        synchronized (arrayDeque) {
            return this.receivedBuffers.size();
        }
    }

    public Buffer requestBufferBlocking() throws InterruptedException, IOException {
        if (!this.exclusiveBuffersAssigned) {
            this.bufferManager.requestExclusiveBuffers(this.networkBuffersPerChannel);
            this.exclusiveBuffersAssigned = true;
        }
        return this.bufferManager.requestBufferBlocking();
    }

    @Override
    public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
        throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
    }

    @Override
    void announceBufferSize(int newBufferSize) {
    }
}

