package co.cask.tephra.persist;

import co.cask.tephra.metrics.MetricsCollector;
import co.cask.tephra.persist.AbstractTransactionLog;
import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hive.com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/tephra/persist/HDFSTransactionLog.class */
public class HDFSTransactionLog extends AbstractTransactionLog {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HDFSTransactionLog.class);
    private final FileSystem fs;
    private final Configuration hConf;
    private final Path logPath;

    /* loaded from: input_file:co/cask/tephra/persist/HDFSTransactionLog$LogReader.class */
    private static final class LogReader implements TransactionLogReader {
        private boolean closed;
        private SequenceFile.Reader reader;
        private LongWritable key = new LongWritable();

        public LogReader(SequenceFile.Reader reader) {
            this.reader = reader;
        }

        @Override // co.cask.tephra.persist.TransactionLogReader
        public TransactionEdit next() {
            try {
                return next(new TransactionEdit());
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }

        @Override // co.cask.tephra.persist.TransactionLogReader
        public TransactionEdit next(TransactionEdit transactionEdit) throws IOException {
            if (!this.closed && this.reader.next(this.key, transactionEdit)) {
                return transactionEdit;
            }
            return null;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.closed) {
                return;
            }
            this.reader.close();
            this.closed = true;
        }
    }

    /* loaded from: input_file:co/cask/tephra/persist/HDFSTransactionLog$LogWriter.class */
    private static final class LogWriter implements TransactionLogWriter {
        private final SequenceFile.Writer internalWriter;

        public LogWriter(FileSystem fileSystem, Configuration configuration, Path path) throws IOException {
            this.internalWriter = SequenceFile.createWriter(fileSystem, configuration, path, LongWritable.class, TransactionEdit.class);
            HDFSTransactionLog.LOG.info("Created a new TransactionLog writer for " + path);
        }

        @Override // co.cask.tephra.persist.TransactionLogWriter
        public void append(AbstractTransactionLog.Entry entry) throws IOException {
            this.internalWriter.append(entry.getKey(), entry.getEdit());
        }

        @Override // co.cask.tephra.persist.TransactionLogWriter
        public void sync() throws IOException {
            this.internalWriter.syncFs();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.internalWriter.close();
        }
    }

    public HDFSTransactionLog(FileSystem fileSystem, Configuration configuration, Path path, long j, MetricsCollector metricsCollector) {
        super(j, metricsCollector);
        this.fs = fileSystem;
        this.hConf = configuration;
        this.logPath = path;
    }

    @Override // co.cask.tephra.persist.AbstractTransactionLog
    protected TransactionLogWriter createWriter() throws IOException {
        return new LogWriter(this.fs, this.hConf, this.logPath);
    }

    @Override // co.cask.tephra.persist.AbstractTransactionLog, co.cask.tephra.persist.TransactionLog
    public String getName() {
        return this.logPath.getName();
    }

    @Override // co.cask.tephra.persist.AbstractTransactionLog, co.cask.tephra.persist.TransactionLog
    public TransactionLogReader getReader() throws IOException {
        long len = this.fs.getFileStatus(this.logPath).getLen();
        if (len <= 0) {
            LOG.warn("File " + this.logPath + " might be still open, length is 0");
        }
        try {
            new HDFSUtil().recoverFileLease(this.fs, this.logPath, this.hConf);
            try {
                LOG.info("New file size for " + this.logPath + " is " + this.fs.getFileStatus(this.logPath).getLen());
                return new LogReader(new SequenceFile.Reader(this.fs, this.logPath, this.hConf));
            } catch (EOFException e) {
                if (len > 0) {
                    return null;
                }
                LOG.warn("Could not open " + this.logPath + " for reading. File is empty", (Throwable) e);
                return null;
            }
        } catch (IOException e2) {
            throw e2;
        }
    }
}
