package org.spf4j.perf.impl;

import com.google.common.base.Charsets;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spf4j.base.AbstractRunnable;
import org.spf4j.concurrent.DefaultExecutor;
import org.spf4j.perf.CloseableMeasurementRecorder;
import org.spf4j.perf.impl.ms.graphite.GraphiteUdpStore;
import org.spf4j.recyclable.ObjectCreationException;

/* loaded from: input_file:org/spf4j/perf/impl/GraphiteUdpStoreTest.class */
public final class GraphiteUdpStoreTest {
    private static final Logger LOG = LoggerFactory.getLogger(GraphiteUdpStoreTest.class);
    private static final File TSDB_TXT;
    private static volatile boolean terminated;
    private static volatile Future<?> server;
    private static final BlockingQueue<String> QUEUE;

    @Test
    public void testGraphiteUdpStore() throws IOException, ObjectCreationException, InterruptedException {
        GraphiteUdpStore graphiteUdpStore = new GraphiteUdpStore("127.0.0.1", 1976);
        graphiteUdpStore.saveMeasurements(graphiteUdpStore.alocateMeasurements(new MeasurementsInfoImpl("bla", "ms", new String[]{"val1", "val2", "val3"}, new String[]{"ms", "ms", "ms"}), 0), 1L, new long[]{2, 3, 5});
        LOG.debug("measurements sent: {} {} {} {}", new Object[]{1L, 2L, 3L, 5L});
        String poll = QUEUE.poll(5L, TimeUnit.SECONDS);
        LOG.debug("measurements received: {} ", poll);
        Assert.assertEquals("bla/val1 2 1", poll);
        String poll2 = QUEUE.poll(5L, TimeUnit.SECONDS);
        LOG.debug("measurements received: {} ", poll2);
        Assert.assertEquals("bla/val2 3 1", poll2);
        String poll3 = QUEUE.poll(5L, TimeUnit.SECONDS);
        LOG.debug("measurements received: {} ", poll3);
        Assert.assertEquals("bla/val3 5 1", poll3);
    }

    @Before
    public void beforeTest() {
        QUEUE.drainTo(new ArrayList());
    }

    @Test
    @SuppressFBWarnings({"MDM_THREAD_YIELD"})
    public void testStore() throws InterruptedException, IOException {
        CloseableMeasurementRecorder createScalableQuantizedRecorder2 = RecorderFactory.createScalableQuantizedRecorder2("test measurement", "ms", 1000, 10, 0, 6, 10);
        for (int i = 0; i < 100; i++) {
            createScalableQuantizedRecorder2.record(i);
            Thread.sleep(100L);
        }
        createScalableQuantizedRecorder2.close();
        RecorderFactory.MEASUREMENT_STORE.flush();
        List<String> readAllLines = Files.readAllLines(TSDB_TXT.toPath(), StandardCharsets.UTF_8);
        LOG.debug("measurements = {}", readAllLines);
        Assert.assertThat(readAllLines, Matchers.hasItem(Matchers.allOf(Matchers.containsString("Q6_7"), Matchers.containsString("test measurement"))));
        Assert.assertThat(QUEUE.poll(5L, TimeUnit.SECONDS), Matchers.containsString("test-measurement"));
    }

    @BeforeClass
    public static void runUdpServer() {
        server = DefaultExecutor.INSTANCE.submit((Runnable) new AbstractRunnable(true) { // from class: org.spf4j.perf.impl.GraphiteUdpStoreTest.1
            public void doRun() throws IOException, InterruptedException {
                DatagramChannel open = DatagramChannel.open();
                open.socket().bind(new InetSocketAddress("127.0.0.1", 1976));
                ByteBuffer allocate = ByteBuffer.allocate(512);
                while (!GraphiteUdpStoreTest.terminated) {
                    allocate.rewind();
                    try {
                        open.receive(allocate);
                        byte[] bArr = new byte[allocate.position()];
                        allocate.rewind();
                        allocate.get(bArr);
                        String[] split = new String(bArr, Charsets.UTF_8).split("\n");
                        GraphiteUdpStoreTest.LOG.debug("Received = {}", split);
                        for (String str : split) {
                            GraphiteUdpStoreTest.QUEUE.put(str);
                        }
                    } catch (ClosedByInterruptException e) {
                        return;
                    }
                }
            }
        });
    }

    @AfterClass
    public static void stopUdpServer() {
        terminated = true;
        server.cancel(true);
    }

    static {
        try {
            File createTempFile = File.createTempFile("ttt", "tsdb");
            TSDB_TXT = File.createTempFile("ttt", "tsdbtxt");
            System.setProperty("spf4j.perf.ms.config", "TSDB@" + createTempFile.getAbsolutePath() + ",TSDB_TXT@" + TSDB_TXT.getAbsolutePath() + ",GRAPHITE_UDP@127.0.0.1:1976");
            terminated = false;
            QUEUE = new LinkedBlockingQueue();
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
