/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ChannelWithBlockCount;
import org.apache.flink.runtime.operators.sort.CircularElement;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.LargeRecordHandler;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.SpillChannelManager;
import org.apache.flink.runtime.operators.sort.StageRunner;
import org.apache.flink.runtime.operators.sort.ThreadBase;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class SpillingThread<E>
extends ThreadBase<E> {
    private static final Logger LOG = LoggerFactory.getLogger(SpillingThread.class);
    private final MemoryManager memManager;
    private final IOManager ioManager;
    private final TypeSerializer<E> serializer;
    private final TypeComparator<E> comparator;
    private final List<MemorySegment> writeMemory;
    private final List<MemorySegment> mergeReadMemory;
    private final int maxFanIn;
    private final SpillChannelManager spillChannelManager;
    private final LargeRecordHandler<E> largeRecordHandler;
    private final SpillingBehaviour<E> spillingBehaviour;
    private volatile boolean spillingBehaviourOpened = false;
    private final int minNumWriteBuffers;
    private final int maxNumWriteBuffers;

    SpillingThread(@Nullable ExceptionHandler<IOException> exceptionHandler, StageRunner.StageMessageDispatcher<E> dispatcher, MemoryManager memManager, IOManager ioManager, TypeSerializer<E> serializer, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles, SpillChannelManager spillingChannelManager, @Nullable LargeRecordHandler<E> largeRecordHandler, SpillingBehaviour<E> spillingBehaviour, int minNumWriteBuffers, int maxNumWriteBuffers) {
        super(exceptionHandler, "SortMerger spilling thread", dispatcher);
        this.memManager = (MemoryManager)Preconditions.checkNotNull((Object)memManager);
        this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
        this.serializer = (TypeSerializer)Preconditions.checkNotNull(serializer);
        this.comparator = (TypeComparator)Preconditions.checkNotNull(comparator);
        this.mergeReadMemory = (List)Preconditions.checkNotNull(sortReadMemory);
        this.writeMemory = (List)Preconditions.checkNotNull(writeMemory);
        this.maxFanIn = maxNumFileHandles;
        this.spillChannelManager = (SpillChannelManager)Preconditions.checkNotNull((Object)spillingChannelManager);
        this.largeRecordHandler = largeRecordHandler;
        this.spillingBehaviour = (SpillingBehaviour)Preconditions.checkNotNull(spillingBehaviour);
        this.minNumWriteBuffers = minNumWriteBuffers;
        this.maxNumWriteBuffers = maxNumWriteBuffers;
    }

    @Override
    public void go() throws IOException, InterruptedException {
        ArrayDeque<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
        boolean cacheOnly = this.readCache(cache);
        if (!this.isRunning()) {
            return;
        }
        MutableObjectIterator<E> largeRecords = null;
        if (cacheOnly && this.largeRecordHandler != null && this.largeRecordHandler.hasData()) {
            CircularElement circElement;
            ArrayList<MemorySegment> memoryForLargeRecordSorting = new ArrayList<MemorySegment>();
            while ((circElement = this.dispatcher.poll(StageRunner.SortStage.READ)) != null) {
                circElement.getBuffer().dispose();
                memoryForLargeRecordSorting.addAll(circElement.getMemory());
            }
            if (memoryForLargeRecordSorting.isEmpty()) {
                cacheOnly = false;
                LOG.debug("Going to disk-based merge because of large records.");
            } else {
                LOG.debug("Sorting large records, to add them to in-memory merge.");
                largeRecords = this.largeRecordHandler.finishWriteAndSortKeys(memoryForLargeRecordSorting);
            }
        }
        if (cacheOnly) {
            this.mergeInMemory(cache, largeRecords);
            return;
        }
        List<ChannelWithBlockCount> channelIDs = this.startSpilling(cache);
        this.mergeOnDisk(channelIDs);
    }

    @Override
    public void close() throws InterruptedException {
        super.close();
        if (this.spillingBehaviourOpened) {
            this.spillingBehaviour.close();
            this.spillingBehaviourOpened = false;
        }
    }

    private boolean readCache(Queue<CircularElement<E>> cache) throws InterruptedException {
        while (this.isRunning()) {
            CircularElement element = this.dispatcher.take(StageRunner.SortStage.SPILL);
            if (element == CircularElement.SPILLING_MARKER) {
                return false;
            }
            if (element == CircularElement.EOF_MARKER) {
                return true;
            }
            cache.add(element);
        }
        return false;
    }

    private void mergeOnDisk(List<ChannelWithBlockCount> channelIDs) throws IOException {
        List<Object> mergeReadMemory;
        MutableObjectIterator<E> largeRecords = null;
        if (this.largeRecordHandler != null && this.largeRecordHandler.hasData()) {
            List<MemorySegment> longRecMem;
            if (channelIDs.isEmpty()) {
                longRecMem = this.mergeReadMemory;
                mergeReadMemory = Collections.emptyList();
            } else {
                int i;
                int maxMergedStreams = Math.min(this.maxFanIn, channelIDs.size());
                int pagesPerStream = Math.max(this.minNumWriteBuffers, Math.min(this.maxNumWriteBuffers, this.mergeReadMemory.size() / 2 / maxMergedStreams));
                int totalMergeReadMemory = maxMergedStreams * pagesPerStream;
                mergeReadMemory = new ArrayList<MemorySegment>(totalMergeReadMemory);
                for (i = 0; i < totalMergeReadMemory; ++i) {
                    mergeReadMemory.add(this.mergeReadMemory.get(i));
                }
                longRecMem = new ArrayList<MemorySegment>();
                for (i = totalMergeReadMemory; i < this.mergeReadMemory.size(); ++i) {
                    longRecMem.add(this.mergeReadMemory.get(i));
                }
            }
            LOG.debug("Sorting keys for large records.");
            largeRecords = this.largeRecordHandler.finishWriteAndSortKeys(longRecMem);
        } else {
            mergeReadMemory = this.mergeReadMemory;
        }
        while (this.isRunning() && channelIDs.size() > this.maxFanIn) {
            channelIDs = this.mergeChannelList(channelIDs, mergeReadMemory, this.writeMemory);
        }
        this.memManager.release(this.writeMemory);
        this.writeMemory.clear();
        if (channelIDs.isEmpty()) {
            if (largeRecords == null) {
                this.dispatcher.sendResult(EmptyMutableObjectIterator.get());
            } else {
                this.dispatcher.sendResult(largeRecords);
            }
        } else {
            LOG.debug("Beginning final merge.");
            ArrayList<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelIDs.size());
            this.getSegmentsForReaders(readBuffers, mergeReadMemory, channelIDs.size());
            this.dispatcher.sendResult(this.getMergingIterator(channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()), largeRecords));
        }
        LOG.debug("Spilling and merging thread done.");
    }

    private void mergeInMemory(Queue<CircularElement<E>> cache, MutableObjectIterator<E> largeRecords) throws IOException {
        LOG.debug("Initiating in memory merge.");
        ArrayList<MutableObjectIterator<MutableObjectIterator<E>>> iterators = new ArrayList<MutableObjectIterator<MutableObjectIterator<E>>>(cache.size() + 1);
        for (CircularElement circularElement : cache) {
            iterators.add(circularElement.getBuffer().getIterator());
        }
        if (largeRecords != null) {
            iterators.add(largeRecords);
        }
        LOG.debug("Releasing unused sort-buffer memory.");
        this.disposeSortBuffers(true);
        if (iterators.isEmpty()) {
            this.dispatcher.sendResult(EmptyMutableObjectIterator.get());
        } else if (iterators.size() == 1) {
            this.dispatcher.sendResult((MutableObjectIterator)iterators.get(0));
        } else {
            this.dispatcher.sendResult(new MergeIterator(iterators, this.comparator));
        }
    }

    private List<ChannelWithBlockCount> startSpilling(Queue<CircularElement<E>> cache) throws IOException, InterruptedException {
        FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
        ArrayList<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
        this.openSpillingBehaviour();
        while (this.isRunning()) {
            CircularElement<E> element;
            CircularElement<E> circularElement = element = cache.isEmpty() ? this.dispatcher.take(StageRunner.SortStage.SPILL) : cache.poll();
            if (!this.isRunning()) {
                return Collections.emptyList();
            }
            if (element == CircularElement.EOF_MARKER) break;
            FileIOChannel.ID channel = enumerator.next();
            this.spillChannelManager.registerChannelToBeRemovedAtShutdown(channel);
            BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
            this.spillChannelManager.registerOpenChannelToBeRemovedAtShutdown(writer);
            ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory, this.memManager.getPageSize());
            LOG.debug("Spilling buffer " + element.getId() + ".");
            this.spillingBehaviour.spillBuffer(element, output, this.largeRecordHandler);
            LOG.debug("Spilled buffer " + element.getId() + ".");
            output.close();
            this.spillChannelManager.unregisterOpenChannelToBeRemovedAtShutdown(writer);
            if (output.getBytesWritten() > 0L) {
                channelIDs.add(new ChannelWithBlockCount(channel, output.getBlockCount()));
            }
            element.getBuffer().reset();
            this.dispatcher.send(StageRunner.SortStage.READ, element);
        }
        LOG.debug("Spilling done.");
        LOG.debug("Releasing sort-buffer memory.");
        this.disposeSortBuffers(false);
        return channelIDs;
    }

    private void openSpillingBehaviour() {
        if (!this.spillingBehaviourOpened) {
            this.spillingBehaviour.open();
            this.spillingBehaviourOpened = true;
        }
    }

    private void disposeSortBuffers(boolean releaseMemory) {
        CircularElement element;
        while ((element = this.dispatcher.poll(StageRunner.SortStage.READ)) != null) {
            element.getBuffer().dispose();
            if (!releaseMemory) continue;
            this.memManager.release(element.getMemory());
        }
    }

    private MergeIterator<E> getMergingIterator(List<ChannelWithBlockCount> channelIDs, List<List<MemorySegment>> inputSegments, List<FileIOChannel> readerList, MutableObjectIterator<E> largeRecords) throws IOException {
        LOG.debug("Performing merge of {} sorted streams.", (Object)channelIDs.size());
        ArrayList<MutableObjectIterator<MutableObjectIterator<E>>> iterators = new ArrayList<MutableObjectIterator<MutableObjectIterator<E>>>(channelIDs.size() + 1);
        for (int i = 0; i < channelIDs.size(); ++i) {
            ChannelWithBlockCount channel = channelIDs.get(i);
            List<MemorySegment> segsForChannel = inputSegments.get(i);
            BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel.getChannel());
            readerList.add(reader);
            this.spillChannelManager.registerOpenChannelToBeRemovedAtShutdown(reader);
            this.spillChannelManager.unregisterChannelToBeRemovedAtShutdown(channel.getChannel());
            ChannelReaderInputView inView = new ChannelReaderInputView(reader, segsForChannel, channel.getBlockCount(), false);
            iterators.add(new ChannelReaderInputViewIterator<E>(inView, null, this.serializer));
        }
        if (largeRecords != null) {
            iterators.add(largeRecords);
        }
        return new MergeIterator(iterators, this.comparator);
    }

    private List<ChannelWithBlockCount> mergeChannelList(List<ChannelWithBlockCount> channelIDs, List<MemorySegment> allReadBuffers, List<MemorySegment> writeBuffers) throws IOException {
        double scale = Math.ceil(Math.log(channelIDs.size()) / Math.log(this.maxFanIn)) - 1.0;
        int numStart = channelIDs.size();
        int numEnd = (int)Math.pow(this.maxFanIn, scale);
        int numMerges = (int)Math.ceil((double)(numStart - numEnd) / (double)(this.maxFanIn - 1));
        int numNotMerged = numEnd - numMerges;
        int numToMerge = numStart - numNotMerged;
        ArrayList<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>(numEnd);
        mergedChannelIDs.addAll(channelIDs.subList(0, numNotMerged));
        int channelsToMergePerStep = (int)Math.ceil((double)numToMerge / (double)numMerges);
        ArrayList<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelsToMergePerStep);
        this.getSegmentsForReaders(readBuffers, allReadBuffers, channelsToMergePerStep);
        ArrayList<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
        int channelNum = numNotMerged;
        while (this.isRunning() && channelNum < channelIDs.size()) {
            channelsToMergeThisStep.clear();
            for (int i = 0; i < channelsToMergePerStep && channelNum < channelIDs.size(); ++i, ++channelNum) {
                channelsToMergeThisStep.add(channelIDs.get(channelNum));
            }
            mergedChannelIDs.add(this.mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
        }
        return mergedChannelIDs;
    }

    private ChannelWithBlockCount mergeChannels(List<ChannelWithBlockCount> channelIDs, List<List<MemorySegment>> readBuffers, List<MemorySegment> writeBuffers) throws IOException {
        ArrayList<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size());
        MergeIterator<E> mergeIterator = this.getMergingIterator(channelIDs, readBuffers, channelAccesses, null);
        FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
        this.spillChannelManager.registerChannelToBeRemovedAtShutdown(mergedChannelID);
        BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
        this.spillChannelManager.registerOpenChannelToBeRemovedAtShutdown(writer);
        ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, this.memManager.getPageSize());
        this.openSpillingBehaviour();
        this.spillingBehaviour.mergeRecords(mergeIterator, output);
        output.close();
        int numBlocksWritten = output.getBlockCount();
        this.spillChannelManager.unregisterOpenChannelToBeRemovedAtShutdown(writer);
        for (FileIOChannel access : channelAccesses) {
            access.closeAndDelete();
            this.spillChannelManager.unregisterOpenChannelToBeRemovedAtShutdown(access);
        }
        return new ChannelWithBlockCount(mergedChannelID, numBlocksWritten);
    }

    private void getSegmentsForReaders(List<List<MemorySegment>> target, List<MemorySegment> memory, int numChannels) {
        int k;
        ArrayList<MemorySegment> segs;
        int i;
        int numBuffers = memory.size();
        int buffersPerChannelLowerBound = numBuffers / numChannels;
        int numChannelsWithOneMore = numBuffers % numChannels;
        Iterator<MemorySegment> segments = memory.iterator();
        for (i = 0; i < numChannelsWithOneMore; ++i) {
            segs = new ArrayList<MemorySegment>(buffersPerChannelLowerBound + 1);
            target.add(segs);
            for (k = buffersPerChannelLowerBound; k >= 0; --k) {
                segs.add(segments.next());
            }
        }
        for (i = numChannelsWithOneMore; i < numChannels; ++i) {
            segs = new ArrayList(buffersPerChannelLowerBound);
            target.add(segs);
            for (k = buffersPerChannelLowerBound; k > 0; --k) {
                segs.add(segments.next());
            }
        }
    }

    static interface SpillingBehaviour<E> {
        default public void open() {
        }

        default public void close() {
        }

        public void spillBuffer(CircularElement<E> var1, ChannelWriterOutputView var2, LargeRecordHandler<E> var3) throws IOException;

        public void mergeRecords(MergeIterator<E> var1, ChannelWriterOutputView var2) throws IOException;
    }
}

