/*
 * Decompiled with CFR 0.152.
 */
package org.visallo.core.model;

import org.json.JSONObject;
import org.visallo.core.config.Configuration;
import org.visallo.core.exception.VisalloException;
import org.visallo.core.ingest.WorkerSpout;
import org.visallo.core.ingest.WorkerTuple;
import org.visallo.core.model.workQueue.WorkQueueRepository;
import org.visallo.core.status.StatusServer;
import org.visallo.core.util.VisalloLogger;
import org.visallo.core.util.VisalloLoggerFactory;

/**
 * Base class for workers that repeatedly pull tuples from a {@link WorkerSpout}
 * and hand each one to {@link #process(Object, JSONObject)}.
 *
 * <p>{@link #run()} executes the polling loop on the calling thread until
 * {@link #stop()} is invoked or fetching the next tuple fails. {@link #stop()}
 * may be called from a different thread than the one executing {@link #run()}:
 * the run flag is {@code volatile} and the status server reference is only
 * accessed while holding a private lock.
 */
public abstract class WorkerBase {
    /** Value of the {@code status.enabled} configuration key (defaults to {@code true}). */
    private final boolean statusEnabled;
    private final WorkQueueRepository workQueueRepository;
    /** Loop flag; written by {@link #run()} and {@link #stop()}, possibly from different threads. */
    private volatile boolean shouldRun;
    /** Guards every read and write of {@link #statusServer}. */
    private final Object statusServerLock = new Object();
    /** Status server created by {@link #run()}; {@code null} when none is active. */
    private StatusServer statusServer = null;

    /**
     * @param workQueueRepository repository used to create the spout for this worker's queue
     * @param configuration       configuration from which {@code status.enabled} is read
     */
    protected WorkerBase(WorkQueueRepository workQueueRepository, Configuration configuration) {
        this.workQueueRepository = workQueueRepository;
        this.statusEnabled = configuration.getBoolean("status.enabled", true);
    }

    /**
     * Opens the spout, optionally starts the status server, and processes tuples
     * until {@link #stop()} is called.
     *
     * <p>When the spout has no tuple available the loop sleeps for 100 ms before
     * polling again. A tuple is acknowledged after {@link #process(Object, JSONObject)}
     * returns normally; if processing throws anything, the error is logged and the
     * tuple is reported as failed, and the loop continues.
     *
     * <p>The status server (if one was created) is shut down when this method exits
     * for any reason, so an abnormal exit does not leave it running.
     *
     * @throws VisalloException if the spout fails to deliver the next tuple
     * @throws Exception        if preparing the spout, creating the status server, or
     *                          sleeping between polls fails
     */
    public void run() throws Exception {
        VisalloLogger logger = VisalloLoggerFactory.getLogger(this.getClass());
        logger.debug("begin runner", new Object[0]);
        // NOTE(review): the spout is opened here but never released by this class;
        // confirm whether WorkerSpout offers a close operation that should be called on exit.
        WorkerSpout workerSpout = this.prepareWorkerSpout();
        this.shouldRun = true;
        try {
            if (this.statusEnabled) {
                // Create outside the lock: subclass code may be slow or may itself block.
                StatusServer server = this.createStatusServer();
                synchronized (this.statusServerLock) {
                    this.statusServer = server;
                }
            }
            while (this.shouldRun) {
                WorkerTuple tuple;
                try {
                    tuple = workerSpout.nextTuple();
                }
                catch (Exception ex) {
                    throw new VisalloException("Failed to get next tuple", ex);
                }
                if (tuple == null) {
                    // Nothing queued; back off briefly before polling again.
                    Thread.sleep(100L);
                    continue;
                }
                try {
                    logger.debug("start processing", new Object[0]);
                    long startTime = System.currentTimeMillis();
                    this.process(tuple.getMessageId(), tuple.getJson());
                    long endTime = System.currentTimeMillis();
                    logger.debug("completed processing in (%dms)", endTime - startTime);
                    workerSpout.ack(tuple.getMessageId());
                }
                catch (Throwable ex) {
                    // A single bad tuple must not terminate the worker: log, fail it, keep going.
                    logger.error("Could not process tuple: %s", tuple, ex);
                    workerSpout.fail(tuple.getMessageId());
                }
            }
        }
        finally {
            // Covers both abnormal exits and a stop() that raced with status server creation.
            this.shutdownStatusServer();
        }
    }

    /**
     * Creates the status server for this worker. Called by {@link #run()} only when
     * {@code status.enabled} is true.
     *
     * @return the status server to shut down when the worker stops
     * @throws Exception if the status server cannot be created
     */
    protected abstract StatusServer createStatusServer() throws Exception;

    /**
     * Processes a single tuple taken from the queue.
     *
     * @param var1 the tuple's message id, as returned by {@code WorkerTuple.getMessageId()}
     * @param var2 the tuple's JSON payload, as returned by {@code WorkerTuple.getJson()}
     * @throws Exception if processing fails; {@link #run()} then reports the tuple as failed
     */
    protected abstract void process(Object var1, JSONObject var2) throws Exception;

    /**
     * Requests that the {@link #run()} loop exit and shuts down the status server if
     * one is active. Calling this more than once is harmless.
     */
    public void stop() {
        this.shouldRun = false;
        this.shutdownStatusServer();
    }

    /**
     * Creates and opens the spout for the queue named by {@link #getQueueName()}.
     *
     * @return the opened spout
     */
    protected WorkerSpout prepareWorkerSpout() {
        WorkerSpout spout = this.workQueueRepository.createWorkerSpout(this.getQueueName());
        spout.open();
        return spout;
    }

    /**
     * @return the name of the queue this worker consumes from
     */
    protected abstract String getQueueName();

    /**
     * @return the work queue repository supplied at construction time
     */
    protected WorkQueueRepository getWorkQueueRepository() {
        return this.workQueueRepository;
    }

    /**
     * @return {@code true} between the start of {@link #run()} and a call to {@link #stop()}
     */
    public boolean shouldRun() {
        return this.shouldRun;
    }

    /**
     * Shuts down the active status server, if any, at most once per created server.
     * The reference is detached under the lock so that concurrent callers cannot
     * both shut down the same instance.
     */
    private void shutdownStatusServer() {
        StatusServer server;
        synchronized (this.statusServerLock) {
            server = this.statusServer;
            this.statusServer = null;
        }
        if (server != null) {
            server.shutdown();
        }
    }
}

