/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.plugins.index.lucene.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
import org.apache.lucene.index.IndexableField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pool of worker threads that applies index write operations (update, delete, close) to
 * {@link LuceneIndexWriter} instances asynchronously.
 *
 * <p>Operations are accumulated into a batch of at most {@code maxBatchSize} entries. Full batches
 * are handed to the workers through a bounded queue of {@code queueSize} batches; when the queue is
 * full the caller blocks until a worker frees a slot. Different batches may be processed
 * concurrently by different workers, so {@link #closeWriter(LuceneIndexWriter, long)} first waits
 * for all previously enqueued batches to complete before enqueuing the close operation.
 *
 * <p>The producer-side state (the current batch and the counters) is not synchronized.
 * NOTE(review): this presumably means the public methods are meant to be called from a single
 * thread - confirm against the callers.
 *
 * <p>If a worker terminates abnormally, the first failure is recorded. It is reported to the
 * caller as an {@link IOException} by {@link #closeWriter(LuceneIndexWriter, long)}, or as an
 * {@link IllegalStateException} when an enqueue cannot make progress, instead of blocking forever.
 */
public class IndexWriterPool {
    private static final Logger LOG = LoggerFactory.getLogger(IndexWriterPool.class);

    /** System property: maximum number of operations per batch. */
    public static final String OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = "oak.indexer.parallelWriter.maxBatchSize";
    public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = 256;
    /** System property: capacity (in batches) of the queue between the producer and the workers. */
    public static final String OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE = "oak.indexer.parallelWriter.queueSize";
    public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE = 64;
    /** System property: number of worker threads. */
    public static final String OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS = "oak.indexer.parallelWriter.numberThreads";
    public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS = 4;

    /** Sentinel batch; a worker that takes it puts it back for its siblings and terminates. */
    static final OperationBatch SHUTDOWN = new OperationBatch(-1L, new Operation[0]);

    /** How long a blocked enqueue waits before re-checking whether a worker has failed. */
    private static final long ENQUEUE_POLL_SECONDS = 1L;

    // Configuration is read once per instance; these fields must be initialized before
    // 'batch' and 'queue', which are sized from them.
    private final int maxBatchSize = ConfigHelper.getSystemPropertyAsInt(
            OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE);
    private final int queueSize = ConfigHelper.getSystemPropertyAsInt(
            OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE);
    private final int numberOfThreads = ConfigHelper.getSystemPropertyAsInt(
            OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS);

    /** Operations accumulated by the producer and not yet handed to the workers. */
    private final List<Operation> batch = new ArrayList<>(this.maxBatchSize);
    private final BlockingQueue<OperationBatch> queue = new ArrayBlockingQueue<>(this.queueSize);
    private final List<Worker> workers;
    private final List<Future<?>> workerFutures;
    private final ExecutorService writersPool;
    private final ScheduledExecutorService monitorTaskExecutor;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Guards {@link #pendingBatches} and {@link #batchSequenceNumber}; also used for wait/notify. */
    private final Object pendingBatchesLock = new Object();
    /** Sequence numbers of batches that were created but not yet fully processed by a worker. */
    private final HashSet<Long> pendingBatches = new HashSet<>();
    private long batchSequenceNumber = 0L;

    /** First failure that terminated a worker, or {@code null} if none did. */
    private volatile Throwable workerFailure;

    // Statistics. Written by the producer thread and read by the monitor thread without
    // synchronization, so the periodically logged values may be slightly stale.
    private final long startTimeNanos = System.nanoTime();
    private long updateCount = 0L;
    private long deleteCount = 0L;
    private long totalEnqueueTimeNanos = 0L;
    private long enqueuingDelayMessageLastLoggedMillis = 0L;

    /**
     * Creates the pool, starts {@code numberOfThreads} workers and schedules the statistics to be
     * logged every 60 seconds.
     */
    public IndexWriterPool() {
        this.writersPool = Executors.newFixedThreadPool(
                this.numberOfThreads,
                BasicThreadFactory.builder().daemon().build());
        this.monitorTaskExecutor = Executors.newSingleThreadScheduledExecutor(
                BasicThreadFactory.builder().daemon().namingPattern("index-writer-monitor").build());
        this.workers = IntStream.range(0, this.numberOfThreads)
                .mapToObj(Worker::new)
                .collect(Collectors.toList());
        this.workerFutures = this.workers.stream()
                .map(this.writersPool::submit)
                .collect(Collectors.toList());
        this.monitorTaskExecutor.scheduleAtFixedRate(this::printStatistics, 60L, 60L, TimeUnit.SECONDS);
        LOG.info("Writing thread started");
    }

    /**
     * Enqueues an update of the document at {@code path} on the given writer. The update is applied
     * asynchronously by a worker.
     *
     * @throws IllegalStateException if the pool is closed, or if a worker has failed and the queue
     *                               cannot accept more batches
     * @throws RuntimeException      if the thread is interrupted while waiting for queue space
     */
    public void updateDocument(LuceneIndexWriter writer, String path, Iterable<? extends IndexableField> doc) throws IOException {
        checkOpen();
        this.updateCount++;
        enqueueOperation(new UpdateOperation(writer, path, doc));
    }

    /**
     * Enqueues a delete of the documents at {@code path} on the given writer. The delete is applied
     * asynchronously by a worker.
     *
     * @throws IllegalStateException if the pool is closed, or if a worker has failed and the queue
     *                               cannot accept more batches
     * @throws RuntimeException      if the thread is interrupted while waiting for queue space
     */
    public void deleteDocuments(LuceneIndexWriter writer, String path) throws IOException {
        checkOpen();
        this.deleteCount++;
        enqueueOperation(new DeleteOperation(writer, path));
    }

    /**
     * Closes the given writer after every operation enqueued so far has been applied.
     *
     * <p>The current batch is flushed, the caller waits until all batches up to that one have been
     * processed, and only then is the close operation enqueued (in a batch of its own). The call
     * blocks until a worker has executed the close.
     *
     * @return the value returned by {@code writer.close(timestamp)}
     * @throws IOException           if closing the writer failed, or if a worker terminated
     *                               abnormally so that pending operations may have been lost
     * @throws IllegalStateException if the pool is closed
     * @throws RuntimeException      if the thread is interrupted while waiting
     */
    public boolean closeWriter(LuceneIndexWriter writer, long timestamp) throws IOException {
        checkOpen();
        try {
            LOG.debug("Closing writer: {}", writer);
            long seqNumber = flushBatch();
            LOG.debug("All pending operations enqueued. Waiting until all batches up to {} are processed", seqNumber);
            synchronized (this.pendingBatchesLock) {
                while (true) {
                    // A batch whose worker died is never removed from pendingBatches; without this
                    // check the loop would wait forever.
                    Throwable failure = this.workerFailure;
                    if (failure != null) {
                        throw new IOException("Index writer worker failed, cannot close writer " + writer, failure);
                    }
                    Long earliestPending = this.pendingBatches.stream().min(Long::compareTo).orElse(null);
                    LOG.debug("Earliest pending batch: {}. Waiting until all batches up to {} are processed", earliestPending, seqNumber);
                    if (earliestPending == null || earliestPending > seqNumber) {
                        break;
                    }
                    this.pendingBatchesLock.wait();
                }
            }
            LOG.debug("All batches up to {} processed. Enqueuing close operation for writer {}", seqNumber, writer);
            SynchronousQueue<CloseResult> closeOpSync = new SynchronousQueue<>();
            this.batch.add(new CloseWriterOperation(writer, timestamp, closeOpSync));
            flushBatch();
            CloseResult res = closeOpSync.take();
            LOG.debug("Writer {} closed. Result: {}", writer, res);
            if (res.error == null) {
                return res.result;
            }
            throw new IOException("Error while closing writer", res.error);
        } catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for the worker to finish", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Flushes the current batch, asks the workers to shut down, waits for them to finish and
     * releases both executors. Calling this method more than once only logs a warning.
     *
     * @throws RuntimeException      if the thread is interrupted while enqueuing the last batch or
     *                               the shutdown marker
     * @throws IllegalStateException if a worker has failed and the queue cannot accept more batches
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            LOG.warn("PipelinedIndexWriter already closed");
            return;
        }
        try {
            flushBatch();
            enqueueShutdownMarker();
            LOG.info("Shutting down PipelinedIndexWriter. Total enqueue time: {} ms", this.totalEnqueueTimeNanos / 1000000L);
            awaitWorkers();
            printStatistics();
        } finally {
            // Always release the threads, even if flushing or waiting failed.
            new ExecutorCloser(this.writersPool, 1, TimeUnit.SECONDS).close();
            new ExecutorCloser(this.monitorTaskExecutor, 1, TimeUnit.SECONDS).close();
        }
    }

    /** Puts the shutdown sentinel on the queue, blocking while the queue is full. */
    private void enqueueShutdownMarker() {
        try {
            // The queue is bounded and may be full at this point, so a non-blocking add() could
            // throw "Queue full"; wait for a free slot instead.
            enqueue(SHUTDOWN);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting to put the shutdown marker in the queue", e);
            throw new RuntimeException(e);
        }
    }

    /** Waits for every worker task to terminate, logging (not propagating) their failures. */
    private void awaitWorkers() {
        for (Future<?> f : this.workerFutures) {
            LOG.info("Waiting for future: {}", f);
            try {
                f.get();
            } catch (ExecutionException e) {
                LOG.info("Error while waiting for future", e);
            } catch (InterruptedException e) {
                LOG.info("Error while waiting for future", e);
                // Restore the flag and stop waiting: every further get() would fail immediately.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Adds the operation to the current batch and flushes the batch once it is full. */
    private void enqueueOperation(Operation op) {
        this.batch.add(op);
        // '>=' rather than '==' so that a non-positive maxBatchSize still flushes.
        if (this.batch.size() >= this.maxBatchSize) {
            flushBatch();
        }
    }

    private void checkOpen() {
        if (this.closed.get()) {
            throw new IllegalStateException("PipelinedIndexWriter is closed");
        }
    }

    /**
     * Hands the current batch (possibly empty) to the workers and clears it.
     *
     * @return the sequence number assigned to the enqueued batch
     * @throws RuntimeException      if the thread is interrupted while waiting for queue space
     * @throws IllegalStateException if a worker has failed and the queue cannot accept the batch
     */
    private long flushBatch() {
        try {
            long seqNumber;
            synchronized (this.pendingBatchesLock) {
                seqNumber = this.batchSequenceNumber++;
                // Registered before enqueuing so that a fast worker cannot complete the batch
                // before it is recorded as pending.
                this.pendingBatches.add(seqNumber);
            }
            if (seqNumber % 1000L == 0L) {
                LOG.info("Enqueuing batch {}, size: {}", seqNumber, this.batch.size());
            }
            long enqueuingStartNanos = System.nanoTime();
            enqueue(new OperationBatch(seqNumber, this.batch.toArray(new Operation[0])));
            long enqueuingEndNanos = System.nanoTime();
            long durationNanos = enqueuingEndNanos - enqueuingStartNanos;
            this.totalEnqueueTimeNanos += durationNanos;
            long durationMillis = durationNanos / 1000000L;
            if (durationMillis > 10L) {
                long currentTimeMillis = enqueuingEndNanos / 1000000L;
                if (currentTimeMillis - this.enqueuingDelayMessageLastLoggedMillis > TimeUnit.SECONDS.toMillis(10L)) {
                    LOG.info("Enqueuing batch delayed. Seq number: {}, size: {}. Delay: {} ms (These messages are logged every 10 seconds)",
                            seqNumber, this.batch.size(), durationMillis);
                    this.enqueuingDelayMessageLastLoggedMillis = currentTimeMillis;
                }
            }
            this.batch.clear();
            return seqNumber;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting to put an update operation in the queue", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Blocks until the batch has been accepted by the queue. While the queue stays full, the
     * recorded worker failure is checked periodically so that the caller does not block forever
     * when no worker is left to drain the queue.
     *
     * @throws InterruptedException  if interrupted while waiting
     * @throws IllegalStateException if the queue is full and a worker has failed
     */
    private void enqueue(OperationBatch operationBatch) throws InterruptedException {
        while (!this.queue.offer(operationBatch, ENQUEUE_POLL_SECONDS, TimeUnit.SECONDS)) {
            Throwable failure = this.workerFailure;
            if (failure != null) {
                throw new IllegalStateException("Index writer worker failed, cannot enqueue more operations", failure);
            }
        }
    }

    /** Records the first worker failure and wakes up any thread waiting on pending batches. */
    private void reportWorkerFailure(Throwable failure) {
        synchronized (this.pendingBatchesLock) {
            if (this.workerFailure == null) {
                this.workerFailure = failure;
            }
            this.pendingBatchesLock.notifyAll();
        }
    }

    /** Logs the pool-level counters followed by the statistics of each worker. */
    private void printStatistics() {
        double percentageEnqueueTime = FormattingUtils.safeComputePercentage(
                this.totalEnqueueTimeNanos, System.nanoTime() - this.startTimeNanos);
        String percentageEnqueueTimeStr = String.format("%.2f", percentageEnqueueTime);
        long batchesEnqueued;
        int pendingCount;
        // The set is mutated by the producer and the workers under this lock; read it the same way.
        synchronized (this.pendingBatchesLock) {
            batchesEnqueued = this.batchSequenceNumber;
            pendingCount = this.pendingBatches.size();
        }
        LOG.info("updateCount: {}, deleteCount: {}, batchesEnqueuedCount: {}, pendingBatchesCount: {},  enqueueTime: {} ms ({}%)",
                this.updateCount, this.deleteCount, batchesEnqueued, pendingCount,
                this.totalEnqueueTimeNanos / 1000000L, percentageEnqueueTimeStr);
        this.workers.forEach(Worker::printStatistics);
    }

    /**
     * Task that takes batches from the queue and executes their operations in order until it
     * receives the {@link #SHUTDOWN} sentinel.
     */
    private class Worker implements Runnable {
        private final long startTime = System.nanoTime();
        private final int id;
        // Statistics; read by the monitor thread without synchronization.
        private long opCount = 0L;
        private long batchesProcessed = 0L;
        private long totalBusyTime = 0L;

        public Worker(int id) {
            this.id = id;
        }

        /**
         * Processes batches until shutdown. Any failure terminates the worker: it is logged,
         * recorded on the pool and rethrown wrapped in a {@link RuntimeException}, so that it also
         * surfaces through the worker's {@link Future}.
         */
        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            String oldName = currentThread.getName();
            currentThread.setName("index-writer-" + this.id);
            try {
                LOG.info("[{}] Worker started", this.id);
                while (true) {
                    OperationBatch op = IndexWriterPool.this.queue.take();
                    if (op == SHUTDOWN) {
                        // Put the sentinel back so the remaining workers see it as well.
                        IndexWriterPool.this.queue.add(SHUTDOWN);
                        LOG.info("[{}] Shutting down worker", this.id);
                        return;
                    }
                    long start = System.nanoTime();
                    for (Operation operation : op.operations) {
                        operation.execute();
                        this.opCount++;
                    }
                    this.batchesProcessed++;
                    long durationNanos = System.nanoTime() - start;
                    this.totalBusyTime += durationNanos;
                    long durationMillis = durationNanos / 1000000L;
                    if (durationMillis > 1000L) {
                        LOG.info("[{}] Processing batch {} of size {} took {} millis.",
                                this.id, op.sequenceNumber, op.operations.length, durationMillis);
                    }
                    synchronized (IndexWriterPool.this.pendingBatchesLock) {
                        IndexWriterPool.this.pendingBatches.remove(op.sequenceNumber);
                        IndexWriterPool.this.pendingBatchesLock.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("[{}] Interrupted while waiting for an index write operation", this.id, e);
                reportWorkerFailure(e);
                throw new RuntimeException(e);
            } catch (Throwable t) {
                LOG.error("[{}] Error while processing an index write operation", this.id, t);
                // The failed batch stays in pendingBatches; recording the failure lets waiters
                // detect that it will never complete.
                reportWorkerFailure(t);
                throw new RuntimeException(t);
            } finally {
                currentThread.setName(oldName);
            }
        }

        /** Logs the number of operations and batches processed and the share of time spent busy. */
        void printStatistics() {
            double busyTimePercentage = FormattingUtils.safeComputePercentage(
                    this.totalBusyTime, System.nanoTime() - this.startTime);
            String busyTimePercentageStr = String.format("%.2f", busyTimePercentage);
            LOG.info("[{}] operationsProcessed: {}, batchesProcessed: {}, busyTime: {} ms ({}%)",
                    this.id, this.opCount, this.batchesProcessed, this.totalBusyTime / 1000000L, busyTimePercentageStr);
        }
    }

    /** Closes the delegate writer and hands the outcome to the thread waiting on {@code sync}. */
    private static class CloseWriterOperation extends Operation {
        private final long timestamp;
        private final SynchronousQueue<CloseResult> sync;

        CloseWriterOperation(LuceneIndexWriter delegate, long timestamp, SynchronousQueue<CloseResult> sync) {
            super(delegate);
            this.timestamp = timestamp;
            this.sync = sync;
        }

        @Override
        public void execute() {
            CloseResult outcome;
            try {
                outcome = new CloseResult(this.delegate.close(this.timestamp));
            } catch (IOException | RuntimeException e) {
                // Unchecked failures are reported as well; otherwise the caller blocked on
                // sync.take() would never be released.
                outcome = new CloseResult(e);
            }
            try {
                this.sync.put(outcome);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /** Outcome of a close operation: exactly one of {@code result} and {@code error} is non-null. */
    private static class CloseResult {
        final Boolean result;
        final Throwable error;

        CloseResult(boolean result) {
            this.result = result;
            this.error = null;
        }

        CloseResult(Throwable error) {
            this.result = null;
            this.error = error;
        }

        @Override
        public String toString() {
            return "CloseResult{result=" + this.result + ", error=" + String.valueOf(this.error) + "}";
        }
    }

    /** Deletes the documents at {@code path} through the delegate writer. */
    private static class DeleteOperation extends Operation {
        private final String path;

        DeleteOperation(LuceneIndexWriter delegate, String path) {
            super(delegate);
            this.path = path;
        }

        @Override
        public void execute() throws IOException {
            this.delegate.deleteDocuments(this.path);
        }
    }

    /** Updates the document at {@code path} through the delegate writer. */
    private static class UpdateOperation extends Operation {
        private final String path;
        private final Iterable<? extends IndexableField> doc;

        UpdateOperation(LuceneIndexWriter delegate, String path, Iterable<? extends IndexableField> doc) {
            super(delegate);
            this.path = path;
            this.doc = doc;
        }

        @Override
        public void execute() throws IOException {
            this.delegate.updateDocument(this.path, this.doc);
        }
    }

    /** A unit of work bound to the writer it must be applied to. */
    private abstract static class Operation {
        final LuceneIndexWriter delegate;

        public Operation(LuceneIndexWriter delegate) {
            this.delegate = delegate;
        }

        abstract void execute() throws IOException;
    }

    /** An immutable group of operations identified by a sequence number. */
    private static class OperationBatch {
        final long sequenceNumber;
        final Operation[] operations;

        public OperationBatch(long sequenceNumber, Operation[] operations) {
            Objects.requireNonNull(operations);
            this.sequenceNumber = sequenceNumber;
            this.operations = operations;
        }
    }
}

