/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.plugins.blob;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.jackrabbit.core.data.util.NamedThreadFactory;
import org.apache.jackrabbit.guava.common.cache.Weigher;
import org.apache.jackrabbit.guava.common.util.concurrent.FutureCallback;
import org.apache.jackrabbit.guava.common.util.concurrent.Futures;
import org.apache.jackrabbit.guava.common.util.concurrent.ListenableFuture;
import org.apache.jackrabbit.guava.common.util.concurrent.ListeningExecutorService;
import org.apache.jackrabbit.guava.common.util.concurrent.MoreExecutors;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheStatsMBean;
import org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUpgradeUtils;
import org.apache.jackrabbit.oak.plugins.blob.DataStoreCacheUtils;
import org.apache.jackrabbit.oak.plugins.blob.FileCache;
import org.apache.jackrabbit.oak.plugins.blob.SameThreadExecutorService;
import org.apache.jackrabbit.oak.plugins.blob.StagingCacheStats;
import org.apache.jackrabbit.oak.plugins.blob.StagingUploader;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UploadStagingCache
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCache.class);
    protected static final String UPLOAD_STAGING_DIR = "upload";
    private final Weigher<String, File> memWeigher = new Weigher<String, File>(){

        public int weigh(String key, File value) {
            return StringUtils.estimateMemoryUsage((String)key) + StringUtils.estimateMemoryUsage((String)value.getAbsolutePath()) + 48;
        }
    };
    private long size;
    private AtomicLong currentSize;
    private ListeningExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;
    private ConcurrentMap<String, File> map;
    private ConcurrentMap<String, File> attic;
    private File uploadCacheSpace;
    private StagingUploader uploader;
    private StagingCacheStats cacheStats;
    @Nullable
    private FileCache downloadCache;
    private ScheduledExecutorService statsExecutor;
    private LinkedBlockingQueue<String> retryQueue;

    private UploadStagingCache(File dir, File home, int uploadThreads, long size, StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor, @Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval, int retryInterval) {
        this.currentSize = new AtomicLong();
        this.size = size;
        this.executor = executor;
        if (executor == null) {
            this.executor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(uploadThreads, (ThreadFactory)new NamedThreadFactory("oak-ds-async-upload-thread")));
        }
        this.scheduledExecutor = scheduledExecutor;
        if (scheduledExecutor == null) {
            this.scheduledExecutor = Executors.newScheduledThreadPool(2, (ThreadFactory)new NamedThreadFactory("oak-ds-cache-scheduled-thread"));
        }
        this.map = new ConcurrentHashMap<String, File>();
        this.attic = new ConcurrentHashMap<String, File>();
        this.retryQueue = new LinkedBlockingQueue();
        this.uploadCacheSpace = new File(dir, UPLOAD_STAGING_DIR);
        this.uploader = uploader;
        if (statisticsProvider == null) {
            this.statsExecutor = Executors.newSingleThreadScheduledExecutor();
            statisticsProvider = new DefaultStatisticsProvider(this.statsExecutor);
        }
        this.cacheStats = new StagingCacheStats(this, statisticsProvider, size);
        this.downloadCache = cache;
        this.build(home, dir);
        this.scheduledExecutor.scheduleAtFixedRate(new RemoveJob(), purgeInterval, purgeInterval, TimeUnit.SECONDS);
        this.scheduledExecutor.scheduleAtFixedRate(new RetryJob(), retryInterval, retryInterval, TimeUnit.SECONDS);
    }

    private UploadStagingCache() {
    }

    public static UploadStagingCache build(File dir, File home, int uploadThreads, long size, StagingUploader uploader, @Nullable FileCache cache, StatisticsProvider statisticsProvider, @Nullable ListeningExecutorService executor, @Nullable ScheduledExecutorService scheduledExecutor, int purgeInterval, int retryInterval) {
        if (size > 0L) {
            return new UploadStagingCache(dir, home, uploadThreads, size, uploader, cache, statisticsProvider, executor, scheduledExecutor, purgeInterval, retryInterval);
        }
        return new UploadStagingCache(){

            @Override
            public Optional<CompletableFuture<Integer>> put(String id, File input) {
                return Optional.empty();
            }

            @Override
            protected void invalidate(String key) {
            }

            @Override
            protected Iterator<String> getAllIdentifiers() {
                return Collections.emptyIterator();
            }

            @Override
            @Nullable
            public File getIfPresent(String key) {
                return null;
            }

            @Override
            public DataStoreCacheStatsMBean getStats() {
                return new StagingCacheStats(this, StatisticsProvider.NOOP, 0L);
            }

            @Override
            public void close() {
            }
        };
    }

    private void build(File home, File rootPath) {
        List files;
        LOG.info("Scheduling pending uploads");
        DataStoreCacheUpgradeUtils.movePendingUploadsToStaging(home, rootPath, true);
        try {
            this.uploadCacheSpace.mkdirs();
            try (Stream<Path> stream = Files.find(this.uploadCacheSpace.toPath(), Integer.MAX_VALUE, (path, basicFileAttributes) -> basicFileAttributes.isRegularFile(), new FileVisitOption[0]);){
                files = stream.map(Path::toFile).collect(Collectors.toList());
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int count = 0;
        for (File toBeSyncedFile : files) {
            Optional<CompletableFuture<Integer>> scheduled = this.putOptionalDisregardingSize(toBeSyncedFile.getName(), toBeSyncedFile, true);
            if (scheduled.isPresent()) {
                ++count;
                continue;
            }
            LOG.info("File [{}] not setup for upload", (Object)toBeSyncedFile.getName());
        }
        LOG.info("Scheduled [{}] pending uploads", (Object)count);
    }

    public Optional<CompletableFuture<Integer>> put(String id, File input) {
        return this.putOptionalDisregardingSize(id, input, false);
    }

    private Optional<CompletableFuture<Integer>> putOptionalDisregardingSize(String id, File input, boolean ignoreSize) {
        this.cacheStats.markRequest();
        long length = input.length();
        File uploadFile = DataStoreCacheUtils.getFile(id, this.uploadCacheSpace);
        if ((ignoreSize && this.currentSize.addAndGet(length) >= 0L || this.currentSize.addAndGet(length) <= this.size) && !this.attic.containsKey(id) && this.existsOrNotExistsMoveFile(input, uploadFile, this.currentSize, length) && this.map.putIfAbsent(id, uploadFile) == null) {
            try {
                this.cacheStats.markHit();
                this.cacheStats.incrementCount();
                this.cacheStats.incrementSize(length);
                this.cacheStats.incrementMemSize(this.memWeigher.weigh((Object)id, (Object)uploadFile));
                return Optional.of(this.stage(id, uploadFile));
            }
            catch (Exception e) {
                LOG.info("Error moving file to staging", (Throwable)e);
                this.currentSize.addAndGet(-length);
                this.map.remove(id, uploadFile);
            }
        } else {
            this.currentSize.addAndGet(-length);
            if (this.map.containsKey(id) || this.attic.containsKey(id)) {
                CompletableFuture<Integer> result = CompletableFuture.completedFuture(0);
                return Optional.of(result);
            }
        }
        return Optional.empty();
    }

    private synchronized boolean existsOrNotExistsMoveFile(File source, File destination, AtomicLong currentSize, long length) {
        if (!destination.exists()) {
            try {
                this.uploader.adopt(source, destination);
                LOG.trace("Moved file to staging");
            }
            catch (IOException e) {
                LOG.info("Error moving file to staging", (Throwable)e);
                currentSize.addAndGet(-length);
                return false;
            }
            LOG.trace("File [{}] moved to staging cache [{}]", (Object)source, (Object)destination);
            return true;
        }
        return true;
    }

    private CompletableFuture<Integer> stage(final String id, final File upload) {
        final CompletableFuture<Integer> result = new CompletableFuture<Integer>();
        try {
            ListenableFuture future = this.executor.submit(() -> {
                Integer n;
                block8: {
                    TimerStats.Context uploadContext = this.cacheStats.startUpLoaderTimer();
                    try {
                        this.uploader.write(id, upload);
                        LOG.debug("File added to backend [{}]", (Object)upload);
                        n = 1;
                        if (uploadContext == null) break block8;
                    }
                    catch (Throwable throwable) {
                        try {
                            if (uploadContext != null) {
                                try {
                                    uploadContext.close();
                                }
                                catch (Throwable throwable2) {
                                    throwable.addSuppressed(throwable2);
                                }
                            }
                            throw throwable;
                        }
                        catch (Exception e) {
                            LOG.error("Error adding file to backend", (Throwable)e);
                            throw e;
                        }
                    }
                    uploadContext.close();
                }
                return n;
            });
            Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<Integer>(){

                public void onSuccess(@Nullable Integer r) {
                    LOG.info("Successfully added [{}], [{}]", (Object)id, (Object)upload);
                    try {
                        UploadStagingCache.this.attic.put(id, upload);
                        if (UploadStagingCache.this.downloadCache != null) {
                            Files.setLastModifiedTime(upload.toPath(), FileTime.fromMillis(System.currentTimeMillis()));
                            UploadStagingCache.this.downloadCache.put(id, upload);
                            LOG.debug("[{}] added to cache", (Object)id);
                        }
                        UploadStagingCache.this.map.remove(id);
                    }
                    catch (IOException e) {
                        LOG.warn("Error in cleaning up [{}] from staging", (Object)upload);
                    }
                    result.complete(r);
                }

                public void onFailure(Throwable t) {
                    LOG.error("Error adding [{}] with file [{}] to backend", new Object[]{id, upload, t});
                    result.completeExceptionally(t);
                    UploadStagingCache.this.retryQueue.add(id);
                }
            }, (Executor)((Object)new SameThreadExecutorService()));
            LOG.debug("File [{}] scheduled for upload [{}]", (Object)upload, result);
        }
        catch (Exception e) {
            LOG.error("Error staging file for upload [{}]", (Object)upload, (Object)e);
        }
        return result;
    }

    protected void invalidate(String key) {
        if (!this.attic.containsKey(key) && this.map.containsKey(key)) {
            try {
                LOG.debug("Invalidating [{}]", (Object)key);
                File toBeDeleted = (File)this.map.get(key);
                this.deleteInternal(key, toBeDeleted);
                this.map.remove(key, toBeDeleted);
            }
            catch (IOException e) {
                LOG.warn("Could not delete file from staging", (Throwable)e);
            }
        }
    }

    protected Iterator<String> getAllIdentifiers() {
        return this.map.keySet().iterator();
    }

    private void remove() {
        LOG.info("Starting purge of uploaded files, current size [{}]", (Object)IOUtils.humanReadableByteCount((long)this.currentSize.get()));
        Iterator iterator = this.attic.keySet().iterator();
        int count = 0;
        while (iterator.hasNext()) {
            String key = (String)iterator.next();
            try {
                if (this.map.containsKey(key)) continue;
                LOG.trace("upload map contains id [{}]", (Object)key);
                File toBeDeleted = (File)this.attic.get(key);
                this.deleteInternal(key, toBeDeleted);
                iterator.remove();
                LOG.debug("Cache [{}] file deleted for id [{}]", (Object)toBeDeleted, (Object)key);
                ++count;
            }
            catch (IOException e) {
                LOG.error("Error in removing entry for id [{}]", (Object)key);
            }
        }
        LOG.info("Finished removal of [{}] files, current size [{}]", (Object)count, (Object)IOUtils.humanReadableByteCount((long)this.currentSize.get()));
    }

    private void deleteInternal(String key, File toBeDeleted) throws IOException {
        LOG.debug("Trying to delete file [{}]", (Object)toBeDeleted);
        long length = toBeDeleted.length();
        DataStoreCacheUtils.recursiveDelete(toBeDeleted, this.uploadCacheSpace);
        LOG.debug("deleted file [{}]", (Object)toBeDeleted);
        this.currentSize.addAndGet(-length);
        this.cacheStats.decrementSize(length);
        this.cacheStats.decrementMemSize(this.memWeigher.weigh((Object)key, (Object)toBeDeleted));
        this.cacheStats.decrementCount();
    }

    @Nullable
    public File getIfPresent(String key) {
        this.cacheStats.markLoad();
        if (this.map.containsKey(key)) {
            this.cacheStats.markLoadSuccess();
            return (File)this.map.get(key);
        }
        return null;
    }

    public DataStoreCacheStatsMBean getStats() {
        return this.cacheStats;
    }

    @Override
    public void close() {
        LOG.info("Uploads in progress on close [{}]", (Object)this.map.size());
        LOG.info("Uploads completed but not cleared from cache [{}]", (Object)this.attic.size());
        LOG.info("Staging cache stats on close [{}]", (Object)this.cacheStats.cacheInfoAsString());
        new ExecutorCloser((ExecutorService)this.executor).close();
        new ExecutorCloser((ExecutorService)this.scheduledExecutor).close();
        new ExecutorCloser((ExecutorService)this.statsExecutor).close();
    }

    protected void setDownloadCache(@Nullable FileCache downloadCache) {
        this.downloadCache = downloadCache;
    }

    class RetryJob
    implements Runnable {
        RetryJob() {
        }

        @Override
        public void run() {
            LOG.debug("Retry job started");
            int count = 0;
            ArrayList entries = new ArrayList();
            UploadStagingCache.this.retryQueue.drainTo(entries);
            for (String key : entries) {
                File file = (File)UploadStagingCache.this.map.get(key);
                LOG.info("Retrying upload of id [{}] with file [{}] ", (Object)key, (Object)file);
                UploadStagingCache.this.stage(key, file);
                ++count;
                LOG.info("Scheduled retry for upload of id [{}] with file [{}]", (Object)key, (Object)file);
            }
            LOG.debug("Retry job finished with staging [{}] jobs", (Object)count);
        }
    }

    class RemoveJob
    implements Runnable {
        RemoveJob() {
        }

        @Override
        public void run() {
            UploadStagingCache.this.remove();
        }
    }
}

