package akka.persistence.jdbc.journal.dao;

import akka.persistence.jdbc.config.BaseDaoConfig;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy$;
import akka.stream.QueueOfferResult;
import akka.stream.QueueOfferResult$Dropped$;
import akka.stream.QueueOfferResult$Enqueued$;
import akka.stream.QueueOfferResult$QueueClosed$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueueWithComplete;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.SeqOps;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Vector$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: BaseDao.scala */
@ScalaSignature(bytes = "\u0006\u0005!4Q!\u0003\u0006\u0002\u0002UAQ!\b\u0001\u0005\u0002yAq\u0001\f\u0001C\u0002\u001b\rQ\u0006C\u00045\u0001\t\u0007i1A\u001b\t\u000bq\u0002a\u0011A\u001f\t\u000f\u0011\u0003!\u0019!C\u0001\u000b\"1Q\f\u0001Q\u0001\n\u0019CQA\u0018\u0001\u0007\u0002}CQ!\u001a\u0001\u0005\u0002\u0019\u0014qAQ1tK\u0012\u000bwN\u0003\u0002\f\u0019\u0005\u0019A-Y8\u000b\u00055q\u0011a\u00026pkJt\u0017\r\u001c\u0006\u0003\u001fA\tAA\u001b3cG*\u0011\u0011CE\u0001\fa\u0016\u00148/[:uK:\u001cWMC\u0001\u0014\u0003\u0011\t7n[1\u0004\u0001U\u0011acI\n\u0003\u0001]\u0001\"\u0001G\u000e\u000e\u0003eQ\u0011AG\u0001\u0006g\u000e\fG.Y\u0005\u00039e\u0011a!\u00118z%\u00164\u0017A\u0002\u001fj]&$h\bF\u0001 !\r\u0001\u0003!I\u0007\u0002\u0015A\u0011!e\t\u0007\u0001\t\u0015!\u0003A1\u0001&\u0005\u0005!\u0016C\u0001\u0014*!\tAr%\u0003\u0002)3\t9aj\u001c;iS:<\u0007C\u0001\r+\u0013\tY\u0013DA\u0002B]f\f1!\\1u+\u0005q\u0003CA\u00183\u001b\u0005\u0001$BA\u0019\u0013\u0003\u0019\u0019HO]3b[&\u00111\u0007\r\u0002\r\u001b\u0006$XM]5bY&TXM]\u0001\u0003K\u000e,\u0012A\u000e\t\u0003oij\u0011\u0001\u000f\u0006\u0003se\t!bY8oGV\u0014(/\u001a8u\u0013\tY\u0004H\u0001\tFq\u0016\u001cW\u000f^5p]\u000e{g\u000e^3yi\u0006i!-Y:f\t\u0006|7i\u001c8gS\u001e,\u0012A\u0010\t\u0003\u007f\tk\u0011\u0001\u0011\u0006\u0003\u0003:\taaY8oM&<\u0017BA\"A\u00055\u0011\u0015m]3EC>\u001cuN\u001c4jO\u0006QqO]5uKF+X-^3\u0016\u0003\u0019\u00032a\u0012&M\u001b\u0005A%BA%1\u0003!\u00198-\u00197bINd\u0017BA&I\u0005]\u0019v.\u001e:dKF+X-^3XSRD7i\\7qY\u0016$X\r\u0005\u0003\u0019\u001b>+\u0016B\u0001(\u001a\u0005\u0019!V\u000f\u001d7feA\u0019q\u0007\u0015*\n\u0005EC$a\u0002)s_6L7/\u001a\t\u00031MK!\u0001V\r\u0003\tUs\u0017\u000e\u001e\t\u0004-n\u000bS\"A,\u000b\u0005aK\u0016!C5n[V$\u0018M\u00197f\u0015\tQ\u0016$\u0001\u0006d_2dWm\u0019;j_:L!\u0001X,\u0003\u0007M+\u0017/A\u0006xe&$X-U;fk\u0016\u0004\u0013\u0001E<sSR,'j\\;s]\u0006d'k\\<t)\t\u00017\rE\u00028CJK!A\u0019\u001d\u0003\r\u0019+H/\u001e:f\u0011\u0015!w\u00011\u0001V\u0003\tA8/A\u000brk\u0016,Xm\u0016:ji\u0016Tu.\u001e:oC2\u0014vn^:\u0015\u0005\u0001<\u0007\"\u00023\t\u0001\u0004)\u0006")
/* loaded from: input_file:akka/persistence/jdbc/journal/dao/BaseDao.class */
public abstract class BaseDao<T> {
    private final SourceQueueWithComplete<Tuple2<Promise<BoxedUnit>, Seq<T>>> writeQueue = (SourceQueueWithComplete) Source$.MODULE$.queue(baseDaoConfig().bufferSize(), OverflowStrategy$.MODULE$.dropNew()).batchWeighted(baseDaoConfig().batchSize(), tuple2 -> {
        return BoxesRunTime.boxToLong($anonfun$writeQueue$1(tuple2));
    }, tuple22 -> {
        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Vector$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Promise[]{(Promise) tuple22._1()}))), tuple22._2());
    }, (tuple23, tuple24) -> {
        Tuple2 tuple23 = new Tuple2(tuple23, tuple24);
        if (tuple23 != null) {
            Tuple2 tuple24 = (Tuple2) tuple23._1();
            Tuple2 tuple25 = (Tuple2) tuple23._2();
            if (tuple24 != null) {
                Seq seq = (Seq) tuple24._1();
                Seq seq2 = (Seq) tuple24._2();
                if (tuple25 != null) {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(seq.$colon$plus((Promise) tuple25._1())), seq2.$plus$plus((Seq) tuple25._2()));
                }
            }
        }
        throw new MatchError(tuple23);
    }).mapAsync(baseDaoConfig().parallelism(), tuple25 -> {
        if (tuple25 == null) {
            throw new MatchError(tuple25);
        }
        Seq seq = (Seq) tuple25._1();
        return this.writeJournalRows((Seq) tuple25._2()).map(boxedUnit -> {
            $anonfun$writeQueue$5(seq, boxedUnit);
            return BoxedUnit.UNIT;
        }, this.ec()).recover(new BaseDao$$anonfun$$nestedInanonfun$writeQueue$4$1(null, seq), this.ec());
    }).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.left()).run(mat());

    public abstract Materializer mat();

    public abstract ExecutionContext ec();

    public abstract BaseDaoConfig baseDaoConfig();

    public SourceQueueWithComplete<Tuple2<Promise<BoxedUnit>, Seq<T>>> writeQueue() {
        return this.writeQueue;
    }

    public abstract Future<BoxedUnit> writeJournalRows(Seq<T> seq);

    public Future<BoxedUnit> queueWriteJournalRows(Seq<T> seq) {
        Promise apply = Promise$.MODULE$.apply();
        return writeQueue().offer(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(apply), seq)).flatMap(queueOfferResult -> {
            Future failed;
            if (QueueOfferResult$Enqueued$.MODULE$.equals(queueOfferResult)) {
                failed = apply.future();
            } else if (queueOfferResult instanceof QueueOfferResult.Failure) {
                failed = Future$.MODULE$.failed(new Exception("Failed to write journal row batch", ((QueueOfferResult.Failure) queueOfferResult).cause()));
            } else if (QueueOfferResult$Dropped$.MODULE$.equals(queueOfferResult)) {
                failed = Future$.MODULE$.failed(new Exception(new StringBuilder(129).append("Failed to enqueue journal row batch write, the queue buffer was full (").append(this.baseDaoConfig().bufferSize()).append(" elements) please check the jdbc-journal.bufferSize setting").toString()));
            } else {
                if (!QueueOfferResult$QueueClosed$.MODULE$.equals(queueOfferResult)) {
                    throw new MatchError(queueOfferResult);
                }
                failed = Future$.MODULE$.failed(new Exception("Failed to enqueue journal row batch write, the queue was closed"));
            }
            return failed;
        }, ec());
    }

    public static final /* synthetic */ long $anonfun$writeQueue$1(Tuple2 tuple2) {
        return ((SeqOps) tuple2._2()).size();
    }

    public static final /* synthetic */ void $anonfun$writeQueue$5(Seq seq, BoxedUnit boxedUnit) {
        seq.foreach(promise -> {
            return promise.success(boxedUnit);
        });
    }
}
