package com.aliyun.openservices.paifeaturestore.dao;

import com.aliyun.openservices.paifeaturestore.constants.FSType;
import com.aliyun.openservices.paifeaturestore.datasource.FeatureDBClient;
import com.aliyun.openservices.paifeaturestore.datasource.FeatureDBFactory;
import com.aliyun.openservices.paifeaturestore.datasource.RecordBlock;
import com.aliyun.openservices.paifeaturestore.datasource.UInt8ValueColumn;
import com.aliyun.openservices.paifeaturestore.domain.FeatureResult;
import com.aliyun.openservices.paifeaturestore.domain.FeatureStoreResult;
import com.aliyun.openservices.paifeaturestore.model.FeatureViewSeqConfig;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/aliyun/openservices/paifeaturestore/dao/FeatureViewFeatureDBDao.class */
/**
 * {@link FeatureViewDao} implementation that reads and writes feature rows through a
 * {@link FeatureDBClient}.
 *
 * <p>Reads split the requested keys into batches of {@value #READ_BATCH_SIZE} and fetch the
 * batches concurrently with {@link CompletableFuture#supplyAsync}.
 *
 * <p>Writes are buffered in memory. A background thread started by the constructor hands the
 * buffer to a 4-thread executor roughly every {@value #WRITE_POLL_INTERVAL_MS} ms, or sooner once
 * {@value #WRITE_SIGNAL_THRESHOLD} rows are buffered. The buffer is only touched while holding
 * {@link #lock}.
 */
public class FeatureViewFeatureDBDao implements FeatureViewDao {
    private static final Log log = LogFactory.getLog(FeatureViewFeatureDBDao.class);

    /** Maximum number of keys sent to FeatureDB in a single read request. */
    private static final int READ_BATCH_SIZE = 200;
    /** Buffered row count at which the background writer is woken up early. */
    private static final int WRITE_SIGNAL_THRESHOLD = 20;
    /** How long the background writer waits for a signal before checking the buffer anyway. */
    private static final long WRITE_POLL_INTERVAL_MS = 50L;
    /** How long {@link #writeFlush()} waits for submitted writes before forcing shutdown. */
    private static final long FLUSH_TIMEOUT_SECONDS = 60L;
    /** First two bytes every encoded record must start with ('F', '1'). */
    private static final byte RECORD_HEADER_0 = 'F';
    private static final byte RECORD_HEADER_1 = '1';
    /** Per-field flag byte meaning "this field is null and carries no payload". */
    private static final byte NULL_MARKER = 1;

    private final FeatureDBClient featureDBClient;
    private final String database;
    private final String schema;
    private final String table;
    private final String primaryKeyField;
    public Map<String, FSType> fieldTypeMap;
    private final List<String> fields;
    /** Rows waiting to be written; guarded by {@link #lock}. */
    private final List<Map<String, Object>> writeData = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = this.lock.newCondition();
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    /**
     * Creates the DAO and starts the background write thread.
     *
     * @param daoConfig supplies the FeatureDB client name, database/schema/table, the field list,
     *                  the field type map and the primary key field
     * @throws RuntimeException if no client is registered under {@code daoConfig.featureDBName}
     */
    public FeatureViewFeatureDBDao(DaoConfig daoConfig) {
        this.database = daoConfig.featureDBDatabase;
        this.schema = daoConfig.featureDBSchema;
        this.table = daoConfig.featureDBTable;
        FeatureDBClient client = FeatureDBFactory.get(daoConfig.featureDBName);
        if (null == client) {
            throw new RuntimeException(String.format("featuredbclient:%s not found", daoConfig.featureDBName));
        }
        this.featureDBClient = client;
        this.fieldTypeMap = daoConfig.fieldTypeMap;
        this.primaryKeyField = daoConfig.primaryKeyField;
        this.fields = daoConfig.fields;
        startAsyncWrite();
    }

    /**
     * Fetches the selected features for the given keys.
     *
     * @param strArr  primary key values to look up
     * @param strArr2 names of the fields to include in each returned row
     * @return a result whose rows contain the selected fields plus the primary key field; keys
     *         whose stored value is shorter than two bytes produce no row
     * @throws RuntimeException if any batch request or record decoding fails, or if the calling
     *                          thread is interrupted while waiting
     */
    @Override
    public FeatureResult getFeatures(String[] strArr, String[] strArr2) {
        final Set<String> selectedFields = new HashSet<>(Arrays.asList(strArr2));
        final List<String> keys = Arrays.asList(strArr);

        final List<CompletableFuture<FeatureStoreResult>> pending = new ArrayList<>();
        for (int from = 0; from < keys.size(); from += READ_BATCH_SIZE) {
            final List<String> batch = keys.subList(from, Math.min(from + READ_BATCH_SIZE, keys.size()));
            pending.add(CompletableFuture.supplyAsync(() -> fetchBatch(batch, selectedFields)));
        }

        try {
            // Wait for every batch; a failure in any of them surfaces here.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        final List<Map<String, Object>> rows = new ArrayList<>(strArr.length);
        for (CompletableFuture<FeatureStoreResult> future : pending) {
            // All futures completed normally above, so join() returns immediately.
            FeatureResult partial = future.join();
            if (null != partial.getFeatureData()) {
                rows.addAll(partial.getFeatureData());
            }
        }

        FeatureStoreResult result = new FeatureStoreResult();
        result.setFeatureFields(strArr2);
        result.setFeatureFieldTypeMap(this.fieldTypeMap);
        result.setFeatureDataList(rows);
        return result;
    }

    /**
     * Requests one batch of keys from FeatureDB and decodes the returned records.
     *
     * <p>The i-th returned value is paired with the i-th key of {@code batchKeys}.
     */
    private FeatureStoreResult fetchBatch(List<String> batchKeys, Set<String> selectedFields) {
        final List<Map<String, Object>> rows = new ArrayList<>(batchKeys.size());
        try {
            byte[] payload = this.featureDBClient.requestFeatureDB(batchKeys, this.database, this.schema, this.table);
            RecordBlock block = RecordBlock.getRootAsRecordBlock(ByteBuffer.wrap(payload));
            for (int i = 0; i < block.valuesLength(); i++) {
                UInt8ValueColumn column = new UInt8ValueColumn();
                block.values(column, i);
                if (column.valueLength() < 2) {
                    // Too short to hold the two header bytes: no row for this key.
                    continue;
                }
                ByteBuffer record = column.valueAsByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
                Map<String, Object> row = decodeRecord(record, selectedFields);
                row.put(this.primaryKeyField, batchKeys.get(i));
                rows.add(row);
            }
        } catch (Exception e) {
            log.error(String.format("request featuredb error:%s", e.getMessage()), e);
            throw new RuntimeException(e);
        }
        FeatureStoreResult result = new FeatureStoreResult();
        result.setFeatureDataList(rows);
        return result;
    }

    /**
     * Decodes one record: two header bytes, then for every non-primary-key field (in
     * {@link #fields} order) a one-byte null flag followed, when not null, by the field's value.
     *
     * @throws RuntimeException if the header bytes are not {@code 'F'}, {@code '1'}
     */
    private Map<String, Object> decodeRecord(ByteBuffer record, Set<String> selectedFields) {
        final byte header0 = record.get();
        final byte header1 = record.get();
        if (header0 != RECORD_HEADER_0 || header1 != RECORD_HEADER_1) {
            String message = String.format("invalid proto version, %d, %d", header0, header1);
            log.error(message);
            throw new RuntimeException(message);
        }

        final Map<String, Object> row = new HashMap<>(selectedFields.size() + 1);
        for (String field : this.fields) {
            if (field.equals(this.primaryKeyField)) {
                continue;
            }
            final boolean selected = selectedFields.contains(field);
            if (record.get() == NULL_MARKER) {
                if (selected) {
                    row.put(field, null);
                }
                continue;
            }
            // The value must be consumed even when the field was not requested; otherwise the
            // buffer position is left on this field's bytes and every later field is misread.
            final Object value = readValue(record, field);
            if (selected) {
                row.put(field, value);
            }
        }
        return row;
    }

    /**
     * Reads the value of {@code field} at the buffer's current position and advances past it.
     *
     * <p>Any type other than the fixed-width ones handled explicitly is read as a 4-byte length
     * followed by that many bytes, returned as a string.
     */
    private Object readValue(ByteBuffer record, String field) {
        final FSType type = this.fieldTypeMap.get(field);
        if (type == null) {
            throw new IllegalStateException(String.format("no type configured for field %s", field));
        }
        switch (type) {
            case FS_INT32:
                return record.getInt();
            case FS_INT64:
                return record.getLong();
            case FS_DOUBLE:
                return record.getDouble();
            case FS_BOOLEAN:
                return record.get() != 0;
            case FS_FLOAT:
                return record.getFloat();
            default:
                int length = record.getInt();
                byte[] bytes = new byte[length];
                record.get(bytes, 0, length);
                // NOTE(review): assumes the server encodes strings as UTF-8 - confirm. The
                // charset is explicit so the result no longer depends on the JVM default.
                return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    /**
     * Sequence features are not implemented by this DAO.
     *
     * @return always {@code null}
     */
    @Override
    public FeatureResult getSequenceFeatures(String[] strArr, String str, FeatureViewSeqConfig featureViewSeqConfig) {
        return null;
    }

    /**
     * Appends rows to the write buffer and wakes the background writer once at least
     * {@value #WRITE_SIGNAL_THRESHOLD} rows are buffered. The actual write happens asynchronously.
     *
     * @param list rows to write
     */
    @Override
    public void writeFeatures(List<Map<String, Object>> list) {
        this.lock.lock();
        try {
            this.writeData.addAll(list);
            if (this.writeData.size() >= WRITE_SIGNAL_THRESHOLD) {
                this.condition.signal();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Submits any buffered rows, then shuts the write executor down and waits up to
     * {@value #FLUSH_TIMEOUT_SECONDS} seconds for submitted writes to finish.
     *
     * <p>If the buffer is empty this returns immediately: the executor is left running and
     * batches already handed to it are not awaited. Once the executor has been shut down the
     * background writer stops, so no further buffered rows are written by this instance.
     */
    @Override
    public void writeFlush() {
        this.lock.lock();
        try {
            if (this.writeData.isEmpty()) {
                return;
            }
            try {
                doWriteFeatures();
                this.executor.shutdown();
                try {
                    if (!this.executor.awaitTermination(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                        this.executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    this.executor.shutdownNow();
                    // Keep the interrupt visible to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            } catch (Exception e) {
                log.error(String.format("request featuredb error:%s", e.getMessage()), e);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /** Starts the background thread that periodically drains the write buffer. */
    private void startAsyncWrite() {
        new Thread(this::runWriteLoop, "featuredb-async-writer").start();
    }

    /**
     * Body of the background writer: waits for a signal or the poll interval, then submits the
     * buffer if it is non-empty. Exits when the executor has been shut down or the thread is
     * interrupted.
     */
    private void runWriteLoop() {
        while (true) {
            this.lock.lock();
            try {
                this.condition.await(WRITE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
                if (this.executor.isShutdown()) {
                    // writeFlush() shut the executor down; submitting would only be rejected.
                    return;
                }
                if (!this.writeData.isEmpty()) {
                    doWriteFeatures();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("featuredb async writer interrupted, stopping", e);
                return;
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Hands a snapshot of the buffer to the executor and clears the buffer. Must be called while
     * holding {@link #lock}. The buffer is cleared only after the submit succeeded, so a rejected
     * submit does not drop the rows.
     */
    private void doWriteFeatures() {
        final List<Map<String, Object>> batch = new ArrayList<>(this.writeData);
        this.executor.submit(() -> {
            try {
                this.featureDBClient.writeFeatureDB(batch, this.database, this.schema, this.table);
            } catch (Exception e) {
                log.error(String.format("request featuredb error:%s", e.getMessage()), e);
            }
        });
        this.writeData.clear();
    }
}
