package com.alibaba.lindorm.client.core;

import com.alibaba.lindorm.client.AsyncCallback;
import com.alibaba.lindorm.client.FeedStreamService;
import com.alibaba.lindorm.client.LindormClientConfig;
import com.alibaba.lindorm.client.LindormClientConstants;
import com.alibaba.lindorm.client.LindormServiceProvider;
import com.alibaba.lindorm.client.TableService;
import com.alibaba.lindorm.client.core.feedstreamservice.LMessage;
import com.alibaba.lindorm.client.core.feedstreamservice.LMessageScanner;
import com.alibaba.lindorm.client.core.feedstreamservice.StreamScan;
import com.alibaba.lindorm.client.core.ipc.ClientCompletableFuture;
import com.alibaba.lindorm.client.core.meta.TableCategory;
import com.alibaba.lindorm.client.core.meta.TableState;
import com.alibaba.lindorm.client.core.tableservice.LDelete;
import com.alibaba.lindorm.client.core.tableservice.LUpsert;
import com.alibaba.lindorm.client.core.utils.Bytes;
import com.alibaba.lindorm.client.core.utils.CollectionUtils;
import com.alibaba.lindorm.client.core.utils.FeedStreamUtils;
import com.alibaba.lindorm.client.dml.ColumnKey;
import com.alibaba.lindorm.client.dml.ConditionFactory;
import com.alibaba.lindorm.client.dml.ConditionList;
import com.alibaba.lindorm.client.dml.QueryResults;
import com.alibaba.lindorm.client.dml.Row;
import com.alibaba.lindorm.client.dml.Select;
import com.alibaba.lindorm.client.dml.Upsert;
import com.alibaba.lindorm.client.exception.IllegalRequestException;
import com.alibaba.lindorm.client.exception.LindormException;
import com.alibaba.lindorm.client.schema.LindormPipeDescriptor;
import com.alibaba.lindorm.client.schema.LindormTableDescriptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/alibaba/lindorm/client/core/LindormFeedStreamService.class */
public class LindormFeedStreamService extends LindormBasicService implements FeedStreamService {
    public static final Log LOG = LogFactory.getLog(LindormFeedStreamService.class);
    private static AtomicInteger feedStreamServiceCount = new AtomicInteger(0);
    private LindormTableService service;

    public LindormFeedStreamService(LindormClientConfig lindormClientConfig) throws LindormException {
        this(lindormClientConfig, "LindormFeedStreamService" + feedStreamServiceCount.getAndIncrement());
    }

    public LindormFeedStreamService(LindormClientConfig lindormClientConfig, String str) throws LindormException {
        super(lindormClientConfig, str);
        if (lindormClientConfig.getBoolean(LindormClientConstants.PREFETCH_ROUTECACHE, false)) {
        }
        this.service = (LindormTableService) LindormServiceProvider.TableServiceProvider.create(lindormClientConfig, str + "_internalTableService");
    }

    public TableService getInternalTableService() {
        return this.service;
    }

    @Override // com.alibaba.lindorm.client.core.LindormBasicService, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.service.close();
        } catch (Throwable th) {
            LOG.error("Failed closing internal table service, error ignored.", th);
        }
        super.close();
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public String getNamespace() throws LindormException {
        if (this.namespace == null) {
            throw new LindormException("Namespace is not specified!");
        }
        return this.namespace;
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void useNamespace(String str) {
        this.namespace = str;
        this.service.useNamespace(str);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void createPipe(LindormPipeDescriptor lindormPipeDescriptor) throws LindormException {
        createPipe(lindormPipeDescriptor, 4);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void createPipe(LindormPipeDescriptor lindormPipeDescriptor, int i) throws LindormException {
        checkOpen();
        if (lindormPipeDescriptor == null) {
            throw new IllegalRequestException("Must specify a valid LindormPipeDescriptor, but has null.");
        }
        FeedStreamUtils.validatePipeDescriptor(lindormPipeDescriptor);
        LindormTableDescriptor createPipeTableDescriptor = FeedStreamUtils.createPipeTableDescriptor(lindormPipeDescriptor);
        if (i == 1) {
            this.service.createTable(createPipeTableDescriptor);
        } else {
            byte[][] split = Bytes.split(Bytes.EMPTY_BYTE_ARRAY, new byte[]{-1, -1, -1, -1}, true, i - 1);
            this.service.createTable(createPipeTableDescriptor, (byte[][]) Arrays.copyOfRange(split, 1, split.length - 1));
        }
        LOG.info("FeedStreamPipe created, pipe=" + lindormPipeDescriptor.toString() + ", tableDesc=" + createPipeTableDescriptor.toString());
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public LindormPipeDescriptor describePipe(String str) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        LindormTableDescriptor describeTable = this.service.describeTable(str);
        if (!FeedStreamUtils.isFeedStreamTable(describeTable.getTableAttributes())) {
            throw new IllegalRequestException("Must describe a pipe, pipe name not found.");
        }
        LindormPipeDescriptor createPipeDescriptor = FeedStreamUtils.createPipeDescriptor(describeTable);
        LOG.info("Get pipe descriptor " + createPipeDescriptor.toString());
        return createPipeDescriptor;
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void offlinePipe(String str) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        this.service.offlineTable(str);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void onlinePipe(String str) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        this.service.onlineTable(str);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void offlinePipe(String str, int i) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        this.service.offlineTable(str, i);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void onlinePipe(String str, int i) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        this.service.onlineTable(str, i);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void deletePipe(String str) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        this.service.deleteTable(str);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public List<String> listPipes() throws LindormException {
        checkOpen();
        return this.service.listTables(TableCategory.STREAM_TABLES);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public TableState getPipeState(String str) throws LindormException {
        FeedStreamUtils.checkPipeName(str);
        return this.service.getTableState(str);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void truncatePipe(String str) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        this.service.truncateTable(str);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public Long getStreamLatestMessageId(String str, String str2) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        FeedStreamUtils.checkStreamName(str2);
        Select where = this.service.select().from(str).where(ConditionFactory.and(ConditionFactory.compare(FeedStreamUtils.HASH_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, FeedStreamUtils.computeHashByStreamName(str2)), ConditionFactory.compare(FeedStreamUtils.STREAM_NAME_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, str2)));
        FeedStreamUtils.markSelectSequenceFamily(where);
        Row next = where.execute().next();
        if (next == null) {
            return null;
        }
        return next.getColumnValue(FeedStreamUtils.SEQUENCE_FAMILY_NAME_BYTES, FeedStreamUtils.MESSAGE_ID_COLUMN_NAME_BYTES).getLong();
    }

    private Upsert generateUpsert(String str, LMessage lMessage) throws LindormException {
        Upsert into = this.service.upsert().into(str);
        into.add(FeedStreamUtils.messageToRow(lMessage));
        into.setGlitchTimeout(lMessage.getGlitchTimeout());
        into.setOperationTimeout(lMessage.getOperationTimeout());
        return into;
    }

    private Upsert generateUpsert(String str, List<LMessage> list) throws LindormException {
        Upsert into = this.service.upsert().into(str);
        Iterator<LMessage> it = list.iterator();
        while (it.hasNext()) {
            into.add(FeedStreamUtils.messageToRow(it.next()));
        }
        into.setGlitchTimeout(list.get(0).getGlitchTimeout());
        into.setOperationTimeout(list.get(0).getOperationTimeout());
        return into;
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public long append(String str, LMessage lMessage) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        try {
            return ((Long) generateUpsert(str, lMessage).executeWithResults().get(0)).longValue();
        } catch (Throwable th) {
            if (th instanceof LindormException) {
                throw ((LindormException) th);
            }
            throw new LindormException(th);
        }
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public long update(String str, LMessage lMessage, LMessage lMessage2) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        FeedStreamUtils.checkUpdatable(lMessage2, lMessage);
        try {
            Upsert generateUpsert = generateUpsert(str, lMessage2);
            ArrayList arrayList = new ArrayList(1);
            arrayList.add(lMessage);
            FeedStreamUtils.prepareAttrs(arrayList, (LUpsert) generateUpsert);
            return ((Long) generateUpsert.executeWithResults().get(0)).longValue();
        } catch (Throwable th) {
            if (th instanceof LindormException) {
                throw ((LindormException) th);
            }
            throw new LindormException(th);
        }
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public List<Long> update(String str, List<LMessage> list, List<LMessage> list2) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        if (list == null || list2 == null) {
            throw new IllegalRequestException("Previous messages and current messages should not be null");
        }
        if (list.size() != list2.size()) {
            throw new IllegalRequestException("Previous messages and messages should be same size, previous messages " + list.size() + ", messages " + list2.size());
        }
        if (list.isEmpty()) {
            return Collections.emptyList();
        }
        for (int i = 0; i < list.size(); i++) {
            FeedStreamUtils.checkUpdatable(list2.get(i), list.get(i));
        }
        try {
            Upsert generateUpsert = generateUpsert(str, list2);
            FeedStreamUtils.prepareAttrs(list, (LUpsert) generateUpsert);
            List<Object> executeWithResults = generateUpsert.executeWithResults();
            ArrayList arrayList = new ArrayList(executeWithResults.size());
            Iterator<Object> it = executeWithResults.iterator();
            while (it.hasNext()) {
                arrayList.add((Long) it.next());
            }
            return arrayList;
        } catch (Throwable th) {
            if (th instanceof LindormException) {
                throw ((LindormException) th);
            }
            throw new LindormException(th);
        }
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public List<Long> append(String str, List<LMessage> list) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        if (list == null || list.isEmpty()) {
            return Collections.emptyList();
        }
        try {
            List<Object> executeWithResults = generateUpsert(str, list).executeWithResults();
            ArrayList arrayList = new ArrayList(executeWithResults.size());
            Iterator<Object> it = executeWithResults.iterator();
            while (it.hasNext()) {
                arrayList.add((Long) it.next());
            }
            return arrayList;
        } catch (Throwable th) {
            if (th instanceof LindormException) {
                throw ((LindormException) th);
            }
            throw new LindormException(th);
        }
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public Future<Long> appendAsync(String str, LMessage lMessage) throws LindormException {
        final ClientCompletableFuture clientCompletableFuture = new ClientCompletableFuture();
        appendAsync(str, lMessage, new AsyncCallback<Long>() { // from class: com.alibaba.lindorm.client.core.LindormFeedStreamService.1
            @Override // com.alibaba.lindorm.client.AsyncCallback
            public void onComplete(Long l) {
                clientCompletableFuture.complete(l);
            }

            @Override // com.alibaba.lindorm.client.AsyncCallback
            public void onError(Throwable th) {
                clientCompletableFuture.completeExceptionally(th);
            }

            @Override // com.alibaba.lindorm.client.AsyncCallback
            public boolean shouldProcessResultInPool() {
                return false;
            }
        });
        return clientCompletableFuture;
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public Future<List<Long>> appendAsync(String str, List<LMessage> list) throws LindormException {
        final ClientCompletableFuture clientCompletableFuture = new ClientCompletableFuture();
        appendAsync(str, list, new AsyncCallback<List<Long>>() { // from class: com.alibaba.lindorm.client.core.LindormFeedStreamService.2
            @Override // com.alibaba.lindorm.client.AsyncCallback
            public void onComplete(List<Long> list2) {
                clientCompletableFuture.complete(list2);
            }

            @Override // com.alibaba.lindorm.client.AsyncCallback
            public void onError(Throwable th) {
                clientCompletableFuture.completeExceptionally(th);
            }

            @Override // com.alibaba.lindorm.client.AsyncCallback
            public boolean shouldProcessResultInPool() {
                return false;
            }
        });
        return clientCompletableFuture;
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void appendAsync(String str, LMessage lMessage, final AsyncCallback<Long> asyncCallback) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        generateUpsert(str, lMessage).executeWithResultsAsync(new AsyncCallback<List<Object>>() { // from class: com.alibaba.lindorm.client.core.LindormFeedStreamService.3
            @Override // com.alibaba.lindorm.client.AsyncCallback
            public void onComplete(List<Object> list) {
                if (list == null || list.isEmpty()) {
                    asyncCallback.onComplete(null);
                } else {
                    asyncCallback.onComplete((Long) list.get(0));
                }
            }

            @Override // com.alibaba.lindorm.client.AsyncCallback
            public void onError(Throwable th) {
                asyncCallback.onError(th);
            }

            @Override // com.alibaba.lindorm.client.AsyncCallback
            public boolean shouldProcessResultInPool() {
                return asyncCallback.shouldProcessResultInPool();
            }

            @Override // com.alibaba.lindorm.client.AsyncCallback
            public boolean isRetrying() {
                return asyncCallback.isRetrying();
            }
        });
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void appendAsync(String str, List<LMessage> list, final AsyncCallback<List<Long>> asyncCallback) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        if (list == null || list.isEmpty()) {
            asyncCallback.onComplete(Collections.emptyList());
        } else {
            generateUpsert(str, list).executeWithResultsAsync(new AsyncCallback<List<Object>>() { // from class: com.alibaba.lindorm.client.core.LindormFeedStreamService.4
                @Override // com.alibaba.lindorm.client.AsyncCallback
                public void onComplete(List<Object> list2) {
                    if (list2 == null || list2.isEmpty()) {
                        asyncCallback.onComplete(Collections.emptyList());
                        return;
                    }
                    ArrayList arrayList = new ArrayList(list2.size());
                    Iterator<Object> it = list2.iterator();
                    while (it.hasNext()) {
                        arrayList.add((Long) it.next());
                    }
                    asyncCallback.onComplete(arrayList);
                }

                @Override // com.alibaba.lindorm.client.AsyncCallback
                public void onError(Throwable th) {
                    asyncCallback.onError(th);
                }

                @Override // com.alibaba.lindorm.client.AsyncCallback
                public boolean shouldProcessResultInPool() {
                    return asyncCallback.shouldProcessResultInPool();
                }

                @Override // com.alibaba.lindorm.client.AsyncCallback
                public boolean isRetrying() {
                    return asyncCallback.isRetrying();
                }
            });
        }
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public Future<Long> updateAsync(String str, LMessage lMessage, LMessage lMessage2) throws LindormException {
        final ClientCompletableFuture clientCompletableFuture = new ClientCompletableFuture();
        updateAsync(str, lMessage, lMessage2, new AsyncCallback<Long>() { // from class: com.alibaba.lindorm.client.core.LindormFeedStreamService.5
            @Override // com.alibaba.lindorm.client.AsyncCallback
            public void onComplete(Long l) {
                clientCompletableFuture.complete(l);
            }

            @Override // com.alibaba.lindorm.client.AsyncCallback
            public void onError(Throwable th) {
                clientCompletableFuture.completeExceptionally(th);
            }

            @Override // com.alibaba.lindorm.client.AsyncCallback
            public boolean shouldProcessResultInPool() {
                return false;
            }
        });
        return clientCompletableFuture;
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public void updateAsync(String str, LMessage lMessage, LMessage lMessage2, final AsyncCallback<Long> asyncCallback) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        FeedStreamUtils.checkUpdatable(lMessage2, lMessage);
        Upsert generateUpsert = generateUpsert(str, lMessage2);
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(lMessage);
        try {
            FeedStreamUtils.prepareAttrs(arrayList, (LUpsert) generateUpsert);
            generateUpsert.executeWithResultsAsync(new AsyncCallback<List<Object>>() { // from class: com.alibaba.lindorm.client.core.LindormFeedStreamService.6
                @Override // com.alibaba.lindorm.client.AsyncCallback
                public void onComplete(List<Object> list) {
                    if (list == null || list.isEmpty()) {
                        asyncCallback.onComplete(null);
                    } else {
                        asyncCallback.onComplete((Long) list.get(0));
                    }
                }

                @Override // com.alibaba.lindorm.client.AsyncCallback
                public void onError(Throwable th) {
                    asyncCallback.onError(th);
                }

                @Override // com.alibaba.lindorm.client.AsyncCallback
                public boolean shouldProcessResultInPool() {
                    return asyncCallback.shouldProcessResultInPool();
                }

                @Override // com.alibaba.lindorm.client.AsyncCallback
                public boolean isRetrying() {
                    return asyncCallback.isRetrying();
                }
            });
        } catch (IOException e) {
            throw new LindormException(e);
        }
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public LMessage get(String str, String str2, long j) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        FeedStreamUtils.checkStreamName(str2);
        FeedStreamUtils.validateMessageId(j);
        Select where = this.service.select().from(str).where(ConditionFactory.and(ConditionFactory.compare(FeedStreamUtils.HASH_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, FeedStreamUtils.computeHashByStreamName(str2)), ConditionFactory.compare(FeedStreamUtils.STREAM_NAME_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, str2), ConditionFactory.compare(FeedStreamUtils.MESSAGE_ID_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, Long.valueOf(j))));
        FeedStreamUtils.markSelectValueFamily(where);
        Row next = where.execute().next();
        if (next == null) {
            return null;
        }
        return FeedStreamUtils.rowToMessage(str2, next);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public List<LMessage> get(String str, String str2, List<Long> list) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        FeedStreamUtils.checkStreamName(str2);
        if (list == null || list.size() == 0) {
            return new ArrayList();
        }
        byte[] computeHashByStreamName = FeedStreamUtils.computeHashByStreamName(str2);
        ConditionList or = ConditionFactory.or();
        for (Long l : list) {
            if (l == null) {
                throw new LindormException("Message Id must not be null.");
            }
            FeedStreamUtils.validateMessageId(l.longValue());
            or.add(ConditionFactory.compare(FeedStreamUtils.MESSAGE_ID_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, l));
        }
        Select where = this.service.select().from(str).where(ConditionFactory.and(ConditionFactory.compare(FeedStreamUtils.HASH_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, computeHashByStreamName), ConditionFactory.compare(FeedStreamUtils.STREAM_NAME_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, str2), or));
        FeedStreamUtils.markSelectValueFamily(where);
        Map<Long, LMessage> resultsToMessageIdMap = resultsToMessageIdMap(where.execute(), str2, list.size());
        ArrayList newArrayListWithCapacity = CollectionUtils.newArrayListWithCapacity(list.size());
        if (resultsToMessageIdMap.isEmpty()) {
            fillNull(newArrayListWithCapacity, list.size());
        } else {
            Iterator<Long> it = list.iterator();
            while (it.hasNext()) {
                newArrayListWithCapacity.add(resultsToMessageIdMap.get(it.next()));
            }
        }
        return newArrayListWithCapacity;
    }

    private <T> List<T> fillNull(List<T> list, int i) {
        for (int i2 = 0; i2 < i; i2++) {
            list.add(null);
        }
        return list;
    }

    private Map<Long, LMessage> resultsToMessageIdMap(QueryResults queryResults, String str, int i) throws LindormException {
        HashMap newHashMapWithExpectedSize = CollectionUtils.newHashMapWithExpectedSize(i);
        Iterator<Row> it = queryResults.iterator();
        while (it.hasNext()) {
            LMessage rowToMessage = FeedStreamUtils.rowToMessage(str, it.next());
            newHashMapWithExpectedSize.put(Long.valueOf(rowToMessage.getMessageId()), rowToMessage);
        }
        return newHashMapWithExpectedSize;
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public Long getMessageId(String str, String str2, byte[] bArr) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        FeedStreamUtils.checkStreamName(str2);
        FeedStreamUtils.validateIdempotentId(bArr);
        Select where = this.service.select().from(str).where(ConditionFactory.and(ConditionFactory.compare(FeedStreamUtils.HASH_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, FeedStreamUtils.computeHashByStreamName(str2)), ConditionFactory.compare(FeedStreamUtils.STREAM_NAME_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, str2), ConditionFactory.compare(FeedStreamUtils.ZERO_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, Bytes.ZERO_BYTE), ConditionFactory.compare(FeedStreamUtils.IDEMPOTENT_ID_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, bArr)));
        FeedStreamUtils.markSelectIdempotentFamily(where);
        Row next = where.execute().next();
        if (next == null) {
            return null;
        }
        return next.getColumnValue(FeedStreamUtils.IDEMPOTENT_FAMILY_NAME_BYTES, FeedStreamUtils.MESSAGE_ID_COLUMN_NAME_BYTES).getLong();
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public List<Long> getMessageId(String str, String str2, List<byte[]> list) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        FeedStreamUtils.checkStreamName(str2);
        if (list == null || list.size() == 0) {
            return new ArrayList();
        }
        byte[] computeHashByStreamName = FeedStreamUtils.computeHashByStreamName(str2);
        ConditionList or = ConditionFactory.or();
        for (byte[] bArr : list) {
            FeedStreamUtils.validateIdempotentId(bArr);
            or.add(ConditionFactory.compare(FeedStreamUtils.IDEMPOTENT_ID_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, bArr));
        }
        Select where = this.service.select().from(str).where(ConditionFactory.and(ConditionFactory.compare(FeedStreamUtils.HASH_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, computeHashByStreamName), ConditionFactory.compare(FeedStreamUtils.STREAM_NAME_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, str2), ConditionFactory.compare(FeedStreamUtils.ZERO_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, Bytes.ZERO_BYTE), or));
        FeedStreamUtils.markSelectIdempotentFamily(where);
        QueryResults execute = where.execute();
        TreeMap treeMap = new TreeMap(Bytes.BYTES_COMPARATOR);
        for (Row row : execute) {
            treeMap.put(row.getColumnValue(FeedStreamUtils.IDEMPOTENT_ID_COLUMN_NAME).getBinary(), Long.valueOf(row.getColumnValue(FeedStreamUtils.IDEMPOTENT_FAMILY_NAME, FeedStreamUtils.MESSAGE_ID_COLUMN_NAME).getLong().longValue()));
        }
        ArrayList newArrayListWithCapacity = CollectionUtils.newArrayListWithCapacity(list.size());
        if (treeMap.isEmpty()) {
            fillNull(newArrayListWithCapacity, list.size());
        } else {
            Iterator<byte[]> it = list.iterator();
            while (it.hasNext()) {
                newArrayListWithCapacity.add(treeMap.get(it.next()));
            }
        }
        return newArrayListWithCapacity;
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public LMessage get(String str, String str2, byte[] bArr) throws LindormException {
        checkOpen();
        Long messageId = getMessageId(str, str2, bArr);
        if (messageId == null) {
            return null;
        }
        return get(str, str2, messageId.longValue());
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public List<LMessage> batchGet(String str, String str2, List<byte[]> list) throws LindormException {
        checkOpen();
        if (list == null || list.size() == 0) {
            return new ArrayList();
        }
        List<Long> messageId = getMessageId(str, str2, list);
        ArrayList arrayList = new ArrayList(list.size());
        for (Long l : messageId) {
            if (l != null) {
                arrayList.add(l);
            }
        }
        List<LMessage> list2 = get(str, str2, arrayList);
        TreeMap treeMap = new TreeMap(Bytes.BYTES_COMPARATOR);
        for (LMessage lMessage : list2) {
            treeMap.put(lMessage.getIdempotentId(), lMessage);
        }
        ArrayList newArrayListWithCapacity = CollectionUtils.newArrayListWithCapacity(list.size());
        if (treeMap.isEmpty()) {
            fillNull(newArrayListWithCapacity, list.size());
        } else {
            Iterator<byte[]> it = list.iterator();
            while (it.hasNext()) {
                newArrayListWithCapacity.add(treeMap.get(it.next()));
            }
        }
        return newArrayListWithCapacity;
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public LMessageScanner getScanner(String str, String str2, long j, long j2) throws LindormException {
        StreamScan streamScan = new StreamScan();
        streamScan.setStartMessageId(j);
        streamScan.setEndMessageId(j2);
        return getScanner(str, str2, streamScan);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public LMessageScanner getScanner(String str, String str2, long j, long j2, int i) throws LindormException {
        StreamScan streamScan = new StreamScan();
        streamScan.setStartMessageId(j);
        streamScan.setEndMessageId(j2);
        streamScan.setLimit(i);
        return getScanner(str, str2, streamScan);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public LMessageScanner getScanner(String str, String str2, StreamScan streamScan) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        FeedStreamUtils.checkStreamName(str2);
        FeedStreamUtils.validateMessageId(streamScan.getStartMessageId());
        if (streamScan.getStartMessageId() > streamScan.getEndMessageId()) {
            throw new IllegalRequestException("Illegal start messageId " + streamScan.getStartMessageId() + ", end messageId " + streamScan.getEndMessageId());
        }
        Select from = this.service.select().from(str);
        ConditionList and = ConditionFactory.and(ConditionFactory.compare(FeedStreamUtils.HASH_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, FeedStreamUtils.computeHashByStreamName(str2)), ConditionFactory.compare(FeedStreamUtils.STREAM_NAME_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, str2));
        if (streamScan.isGetScan()) {
            and.add(ConditionFactory.compare(FeedStreamUtils.MESSAGE_ID_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, Long.valueOf(streamScan.getStartMessageId())));
        } else {
            and.add(ConditionFactory.compare(FeedStreamUtils.MESSAGE_ID_COLUMN_NAME, ConditionFactory.CompareOp.GREATER_OR_EQUAL, Long.valueOf(streamScan.getStartMessageId())));
            and.add(ConditionFactory.compare(FeedStreamUtils.MESSAGE_ID_COLUMN_NAME, ConditionFactory.CompareOp.LESS, Long.valueOf(streamScan.getEndMessageId())));
        }
        from.where(and);
        if (streamScan.getLimit() > 0) {
            from.limit(streamScan.getLimit());
        }
        List<String> tagsToRead = streamScan.getTagsToRead();
        if (tagsToRead != StreamScan.READ_ALL_TAGS) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(FeedStreamUtils.CKV_STREAM_NAME);
            arrayList.add(FeedStreamUtils.CKV_MESSAGE_ID);
            arrayList.add(FeedStreamUtils.CKV_BODY);
            arrayList.add(FeedStreamUtils.CKV_IDEMPOTENT_ID);
            if (tagsToRead != StreamScan.READ_NO_TAGS) {
                Iterator<String> it = tagsToRead.iterator();
                while (it.hasNext()) {
                    arrayList.add(new ColumnKey(FeedStreamUtils.VALUE_FAMILY_NAME, it.next()));
                }
            }
            from.columns(arrayList);
        }
        return new LMessageScanner(from.execute(), str2);
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public int delete(String str, String str2, long j) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        FeedStreamUtils.checkStreamName(str2);
        FeedStreamUtils.validateMessageId(j);
        LDelete lDelete = (LDelete) this.service.delete().from(str).where(ConditionFactory.and(ConditionFactory.compare(FeedStreamUtils.HASH_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, FeedStreamUtils.computeHashByStreamName(str2)), ConditionFactory.compare(FeedStreamUtils.STREAM_NAME_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, str2), ConditionFactory.compare(FeedStreamUtils.MESSAGE_ID_COLUMN_NAME, ConditionFactory.CompareOp.EQUAL, Long.valueOf(j))));
        lDelete.setAttribute(LDelete.DELETE_STREAM_ATTR, str2);
        return lDelete.execute();
    }

    @Override // com.alibaba.lindorm.client.FeedStreamService
    public LindormPipeDescriptor describePipeFromCache(String str) throws IOException {
        checkOpen();
        FeedStreamUtils.checkPipeName(str);
        LindormTableDescriptor describeTableFromCache = this.service.describeTableFromCache(str);
        if (describeTableFromCache == null) {
            return null;
        }
        if (FeedStreamUtils.isFeedStreamTable(describeTableFromCache.getTableAttributes())) {
            return FeedStreamUtils.createPipeDescriptor(describeTableFromCache);
        }
        throw new IllegalRequestException("Must describe a pipe, pipe name not found.");
    }
}
