/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.CompletedFetch;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsAggregator;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link CompletedFetch#fetchRecords(int)}.
 *
 * <p>Each test builds an in-memory {@link FetchResponseData.PartitionData}, wraps it in a
 * {@link CompletedFetch} for the single partition {@link #TP}, and checks how many records
 * come back for a given isolation level, fetch offset, and record limit.
 */
public class CompletedFetchTest {
    private static final String TOPIC_NAME = "test";
    private static final TopicPartition TP = new TopicPartition(TOPIC_NAME, 0);
    private static final long PRODUCER_ID = 1000L;
    private static final short PRODUCER_EPOCH = 0;

    /**
     * Eleven records starting at offset 10 are drained in batches of at most ten:
     * ten records first (beginning at offset 10), then the last one (offset 20), then none.
     */
    @Test
    public void testSimple() {
        long fetchOffset = 5L;
        int startingOffset = 10;
        int numRecords = 11;
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                .setRecords(newRecords(startingOffset, numRecords, fetchOffset));
        CompletedFetch<String, String> completedFetch = newCompletedFetch(fetchOffset, partitionData);

        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
        Assertions.assertEquals(10, records.size());
        ConsumerRecord<String, String> record = records.get(0);
        Assertions.assertEquals(10L, record.offset());

        records = completedFetch.fetchRecords(10);
        Assertions.assertEquals(1, records.size());
        record = records.get(0);
        Assertions.assertEquals(20L, record.offset());

        records = completedFetch.fetchRecords(10);
        Assertions.assertEquals(0, records.size());
    }

    /**
     * With an ABORT marker and a matching aborted-transaction entry, READ_COMMITTED yields no
     * records while READ_UNCOMMITTED yields all of them from the same partition data.
     */
    @Test
    public void testAbortedTransactionRecordsRemoved() {
        int numRecords = 10;
        Records rawRecords = newTransactionalRecords(ControlRecordType.ABORT, numRecords);
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                .setRecords(rawRecords)
                .setAbortedTransactions(newAbortedTransactions());

        CompletedFetch<String, String> completedFetch =
                newCompletedFetch(IsolationLevel.READ_COMMITTED, OffsetResetStrategy.NONE, true, 0L, partitionData);
        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
        Assertions.assertEquals(0, records.size());

        completedFetch =
                newCompletedFetch(IsolationLevel.READ_UNCOMMITTED, OffsetResetStrategy.NONE, true, 0L, partitionData);
        records = completedFetch.fetchRecords(10);
        Assertions.assertEquals(numRecords, records.size());
    }

    /**
     * With a COMMIT marker and no aborted-transaction entries, READ_COMMITTED returns all ten
     * transactional records.
     */
    @Test
    public void testCommittedTransactionRecordsIncluded() {
        int numRecords = 10;
        Records rawRecords = newTransactionalRecords(ControlRecordType.COMMIT, numRecords);
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                .setRecords(rawRecords);
        CompletedFetch<String, String> completedFetch =
                newCompletedFetch(IsolationLevel.READ_COMMITTED, OffsetResetStrategy.NONE, true, 0L, partitionData);

        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
        Assertions.assertEquals(10, records.size());
    }

    /** A negative record limit yields an empty result even though records are available. */
    @Test
    public void testNegativeFetchCount() {
        long fetchOffset = 0L;
        int startingOffset = 0;
        int numRecords = 10;
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                .setRecords(newRecords(startingOffset, numRecords, fetchOffset));
        CompletedFetch<String, String> completedFetch = newCompletedFetch(fetchOffset, partitionData);

        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(-10);
        Assertions.assertEquals(0, records.size());
    }

    /** Partition data on which {@code setRecords} was never called yields an empty result. */
    @Test
    public void testNoRecordsInFetch() {
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                .setPartitionIndex(0)
                .setHighWatermark(10L)
                .setLastStableOffset(20L)
                .setLogStartOffset(0L);
        CompletedFetch<String, String> completedFetch =
                newCompletedFetch(IsolationLevel.READ_UNCOMMITTED, OffsetResetStrategy.NONE, false, 1L, partitionData);

        List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);
        Assertions.assertEquals(0, records.size());
    }

    /**
     * A batch holding one UUID-serialized value followed by a plain "key"/"value" record is read
     * with UUID deserializers: the first fetch must complete without throwing, and the second
     * must raise {@link RecordDeserializationException}.
     */
    @Test
    public void testCorruptedMessage() {
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        // One record whose value is a serialized UUID ...
        builder.append(new SimpleRecord(new UUIDSerializer().serialize(TOPIC_NAME, UUID.randomUUID())));
        // ... followed by one whose key and value are plain strings rather than serialized UUIDs.
        builder.append(0L, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));
        MemoryRecords records = builder.build();

        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                .setPartitionIndex(0)
                .setHighWatermark(10L)
                .setLastStableOffset(20L)
                .setLogStartOffset(0L)
                .setRecords(records);
        CompletedFetch<UUID, UUID> completedFetch = newCompletedFetch(
                new UUIDDeserializer(),
                new UUIDDeserializer(),
                IsolationLevel.READ_COMMITTED,
                OffsetResetStrategy.NONE,
                false,
                0L,
                partitionData);

        // NOTE(review): presumably returns the readable record and defers the failure to the next
        // call -- confirm against CompletedFetch. The test only requires that this call not throw.
        completedFetch.fetchRecords(10);
        Assertions.assertThrows(RecordDeserializationException.class, () -> completedFetch.fetchRecords(10));
    }

    /**
     * Creates a String/String {@link CompletedFetch} with READ_UNCOMMITTED isolation, no offset
     * reset strategy, and CRC checking enabled.
     *
     * @param fetchOffset   the fetch offset handed to the {@link CompletedFetch}
     * @param partitionData the partition data to wrap
     * @return the new completed fetch
     */
    private CompletedFetch<String, String> newCompletedFetch(long fetchOffset, FetchResponseData.PartitionData partitionData) {
        return newCompletedFetch(IsolationLevel.READ_UNCOMMITTED, OffsetResetStrategy.NONE, true, fetchOffset, partitionData);
    }

    /**
     * Creates a {@link CompletedFetch} that deserializes both keys and values as strings.
     *
     * @param isolationLevel      isolation level placed in the fetch configuration
     * @param offsetResetStrategy reset strategy for the backing {@link SubscriptionState}
     * @param checkCrcs           CRC-check flag placed in the fetch configuration
     * @param fetchOffset         the fetch offset handed to the {@link CompletedFetch}
     * @param partitionData       the partition data to wrap
     * @return the new completed fetch
     */
    private CompletedFetch<String, String> newCompletedFetch(IsolationLevel isolationLevel, OffsetResetStrategy offsetResetStrategy, boolean checkCrcs, long fetchOffset, FetchResponseData.PartitionData partitionData) {
        return newCompletedFetch(
                new StringDeserializer(),
                new StringDeserializer(),
                isolationLevel,
                offsetResetStrategy,
                checkCrcs,
                fetchOffset,
                partitionData);
    }

    /**
     * Creates a {@link CompletedFetch} for {@link #TP} with fresh subscription state, metrics,
     * and fetch configuration.
     *
     * @param keyDeserializer     deserializer for record keys
     * @param valueDeserializer   deserializer for record values
     * @param isolationLevel      isolation level placed in the fetch configuration
     * @param offsetResetStrategy reset strategy for the backing {@link SubscriptionState}
     * @param checkCrcs           CRC-check flag placed in the fetch configuration
     * @param fetchOffset         the fetch offset handed to the {@link CompletedFetch}
     * @param partitionData       the partition data to wrap
     * @param <K>                 key type
     * @param <V>                 value type
     * @return the new completed fetch
     */
    private <K, V> CompletedFetch<K, V> newCompletedFetch(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, IsolationLevel isolationLevel, OffsetResetStrategy offsetResetStrategy, boolean checkCrcs, long fetchOffset, FetchResponseData.PartitionData partitionData) {
        LogContext logContext = new LogContext();
        SubscriptionState subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
        FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry();
        FetchMetricsManager metrics = new FetchMetricsManager(new Metrics(), metricsRegistry);
        FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metrics, Collections.singleton(TP));
        // NOTE(review): the numeric settings below look like the consumer's default fetch
        // settings (min bytes, max bytes, max wait, per-partition size, max poll records) --
        // confirm the parameter order against FetchConfig.
        FetchConfig<K, V> fetchConfig = new FetchConfig<>(
                1,
                50 * 1024 * 1024,
                500,
                1024 * 1024,
                500,
                checkCrcs,
                "",
                keyDeserializer,
                valueDeserializer,
                isolationLevel);
        return new CompletedFetch<>(
                logContext,
                subscriptions,
                fetchConfig,
                BufferSupplier.create(),
                TP,
                partitionData,
                metricAggregator,
                fetchOffset,
                ApiKeys.FETCH.latestVersion());
    }

    /**
     * Builds an uncompressed batch of {@code count} records whose keys are "key" and whose
     * values are "value-N", with N counting up from {@code firstMessageId}.
     *
     * @param baseOffset     base offset given to the records builder
     * @param count          number of records to append
     * @param firstMessageId number used in the first record's value
     * @return the built records
     */
    private Records newRecords(long baseOffset, int count, long firstMessageId) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
        for (int i = 0; i < count; ++i) {
            String value = "value-" + (firstMessageId + i);
            builder.append(0L, "key".getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
        }
        return builder.build();
    }

    /**
     * Builds {@code numRecords} transactional records from {@link #PRODUCER_ID} followed by an
     * end-transaction marker of the given type, all written into one buffer.
     *
     * @param controlRecordType type of the end-transaction marker to append
     * @param numRecords        number of data records; also used as the marker's offset
     * @return records readable from the filled buffer
     */
    private Records newTransactionalRecords(ControlRecordType controlRecordType, int numRecords) {
        MockTime time = new MockTime();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // NOTE(review): (byte) 2 is presumably the current record-format magic value and -1 the
        // "no partition leader epoch" sentinel -- confirm against RecordBatch.
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                buffer,
                (byte) 2,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                0L,
                time.milliseconds(),
                PRODUCER_ID,
                PRODUCER_EPOCH,
                0,
                true,
                -1);
        for (int i = 0; i < numRecords; ++i) {
            builder.append(new SimpleRecord(
                    time.milliseconds(),
                    "key".getBytes(StandardCharsets.UTF_8),
                    "value".getBytes(StandardCharsets.UTF_8)));
        }
        // The returned records are read from the shared buffer below, so the build result
        // itself is not needed.
        builder.build();
        writeTransactionMarker(buffer, controlRecordType, numRecords, time);
        buffer.flip();
        return MemoryRecords.readableRecords(buffer);
    }

    /**
     * Writes an end-transaction marker for {@link #PRODUCER_ID} into {@code buffer}.
     *
     * @param buffer            destination buffer
     * @param controlRecordType marker type to write
     * @param offset            offset passed for the marker
     * @param time              clock supplying the marker timestamp
     */
    private void writeTransactionMarker(ByteBuffer buffer, ControlRecordType controlRecordType, int offset, Time time) {
        MemoryRecords.writeEndTransactionalMarker(
                buffer,
                offset,
                time.milliseconds(),
                0,
                PRODUCER_ID,
                PRODUCER_EPOCH,
                new EndTransactionMarker(controlRecordType, 0));
    }

    /**
     * @return a single aborted-transaction entry for {@link #PRODUCER_ID} with first offset 0
     */
    private List<FetchResponseData.AbortedTransaction> newAbortedTransactions() {
        FetchResponseData.AbortedTransaction abortedTransaction = new FetchResponseData.AbortedTransaction();
        abortedTransaction.setFirstOffset(0L);
        abortedTransaction.setProducerId(PRODUCER_ID);
        return Collections.singletonList(abortedTransaction);
    }
}

