/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.shaded.org.apache.ignite.internal.util.subscription;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.shaded.it.unimi.dsi.fastutil.IndirectPriorityQueue;
import org.apache.ignite.shaded.it.unimi.dsi.fastutil.objects.ObjectArrays;
import org.apache.ignite.shaded.it.unimi.dsi.fastutil.objects.ObjectHeapIndirectPriorityQueue;
import org.apache.ignite.shaded.org.jetbrains.annotations.Nullable;

public class OrderedMergePublisher<T>
implements Flow.Publisher<T> {
    private final Comparator<? super T> comp;
    private final Flow.Publisher<? extends T>[] sources;
    private final int prefetch;

    public OrderedMergePublisher(Comparator<? super T> comp, int prefetch, Flow.Publisher<? extends T> ... sources) {
        this.sources = sources;
        this.prefetch = prefetch;
        this.comp = comp;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> downstream) {
        OrderedMergeSubscription<? extends T> subscription = new OrderedMergeSubscription<T>(downstream, this.comp, this.prefetch, this.sources.length);
        subscription.subscribe(this.sources);
        downstream.onSubscribe(subscription);
        subscription.drain();
    }

    static final class OrderedMergeSubscription<T>
    implements Flow.Subscription {
        private static final Object DONE = new Object();
        final Flow.Subscriber<? super T> downstream;
        private final AtomicInteger guardCntr = new AtomicInteger(1);
        private final OrderedMergeSubscriber<T>[] subscribers;
        private final Object[] values;
        private final IndirectPriorityQueue<Object> valuesQueue;
        private State state = State.INITIAL;
        private ErrorChain errorChain;
        private boolean cancelled;
        private long requested;
        private int waiting;
        private long emitted;
        static final VarHandle ERROR_CHAIN;
        static final VarHandle CANCELLED;
        static final VarHandle STATE;
        static final VarHandle REQUESTED;

        OrderedMergeSubscription(Flow.Subscriber<? super T> downstream, Comparator<? super T> comp, int prefetch, int cnt) {
            this.downstream = downstream;
            this.subscribers = new OrderedMergeSubscriber[cnt];
            for (int i = 0; i < cnt; ++i) {
                this.subscribers[i] = new OrderedMergeSubscriber(this, prefetch);
            }
            this.values = new Object[cnt];
            this.valuesQueue = new ObjectHeapIndirectPriorityQueue<T>(this.values, comp);
            this.waiting = cnt;
        }

        void subscribe(Flow.Publisher<? extends T>[] sources) {
            for (int i = 0; i < sources.length; ++i) {
                sources[i].subscribe(this.subscribers[i]);
            }
            this.guardCntr.set(0);
        }

        @Override
        public void request(long n) {
            long next;
            long current;
            do {
                if ((next = (current = REQUESTED.getAcquire(this)) + n) >= 0L) continue;
                next = Long.MAX_VALUE;
            } while (!REQUESTED.compareAndSet(this, current, next));
            this.drain();
        }

        @Override
        public void cancel() {
            if (CANCELLED.compareAndSet(this, false, true)) {
                STATE.setRelease(this, State.STOP);
                for (OrderedMergeSubscriber<T> inner : this.subscribers) {
                    inner.cancel();
                }
                if (this.guardCntr.get() == 0) {
                    Arrays.fill(this.values, null);
                    for (OrderedMergeSubscriber<T> inner : this.subscribers) {
                        inner.queue.clear();
                    }
                }
            }
        }

        private void onInnerError(OrderedMergeSubscriber<T> sender, Throwable ex) {
            this.updateError(ex);
            sender.done = true;
            this.drain();
        }

        private void updateError(Throwable throwable) {
            ErrorChain next;
            ErrorChain current;
            while (!ERROR_CHAIN.compareAndSet(this, current = ERROR_CHAIN.getAcquire(this), next = new ErrorChain(throwable, current))) {
            }
        }

        private void drain() {
            if (this.guardCntr.getAndIncrement() != 0) {
                return;
            }
            Flow.Subscriber<Object> downstream = this.downstream;
            OrderedMergeSubscriber<T>[] subscribers = this.subscribers;
            Object[] values = this.values;
            long emitted = this.emitted;
            block6: while (true) {
                switch (STATE.getAcquire(this).ordinal()) {
                    case 0: {
                        int waiting = this.waiting;
                        int i = 0;
                        while (i < waiting) {
                            boolean innerDone = subscribers[0].done;
                            Object obj = subscribers[0].queue.poll();
                            int done = obj == null && innerDone ? 1 : 0;
                            int initialized = obj != null || innerDone ? 1 : 0;
                            values[0] = done > 0 ? DONE : obj;
                            int move = initialized * (waiting -= initialized);
                            ObjectArrays.swap(values, 0, move);
                            ObjectArrays.swap(subscribers, 0, move);
                            i = initialized == 0 ? waiting : i;
                        }
                        this.waiting = waiting;
                        if (waiting != 0) break;
                        for (i = 0; i < values.length; ++i) {
                            if (values[i] == DONE) continue;
                            this.valuesQueue.enqueue(i);
                        }
                        State state = this.valuesQueue.isEmpty() ? State.COMPLETING : State.RUNNING;
                        STATE.compareAndSet(this, State.INITIAL, state);
                        continue block6;
                    }
                    case 1: {
                        long requested = REQUESTED.getAcquire(this);
                        while (!this.valuesQueue.isEmpty()) {
                            int minIndex = this.valuesQueue.first();
                            if (values[minIndex] == null) {
                                boolean done = subscribers[minIndex].done;
                                Object val = subscribers[minIndex].queue.poll();
                                if (val != null) {
                                    values[minIndex] = val;
                                    this.valuesQueue.changed();
                                    minIndex = this.valuesQueue.first();
                                } else {
                                    if (!done) break;
                                    this.valuesQueue.dequeue();
                                    continue;
                                }
                            }
                            if (emitted == requested) break;
                            downstream.onNext(values[minIndex]);
                            ++emitted;
                            values[minIndex] = null;
                            subscribers[minIndex].request(1L);
                        }
                        if (!this.valuesQueue.isEmpty()) break;
                        STATE.compareAndSet(this, State.RUNNING, State.COMPLETING);
                        continue block6;
                    }
                    case 2: {
                        STATE.set(this, State.STOP);
                        if (!CANCELLED.getAcquire(this)) {
                            assert (this.valuesQueue.isEmpty());
                            this.finish(downstream);
                        }
                        Arrays.fill(values, null);
                        for (OrderedMergeSubscriber<T> inner : subscribers) {
                            inner.queue.clear();
                        }
                        return;
                    }
                    case 3: {
                        return;
                    }
                    default: {
                        throw new IllegalStateException("Should never get here.");
                    }
                }
                this.emitted = emitted;
                if (this.guardCntr.decrementAndGet() == 0) break;
                this.guardCntr.set(1);
            }
        }

        private void finish(Flow.Subscriber<? super T> downstream) {
            ErrorChain chain = ERROR_CHAIN.getAcquire(this);
            if (chain == null) {
                downstream.onComplete();
            } else {
                downstream.onError(chain.buildThrowable());
            }
        }

        static {
            MethodHandles.Lookup lk = MethodHandles.lookup();
            try {
                ERROR_CHAIN = lk.findVarHandle(OrderedMergeSubscription.class, "errorChain", ErrorChain.class);
                CANCELLED = lk.findVarHandle(OrderedMergeSubscription.class, "cancelled", Boolean.TYPE);
                REQUESTED = lk.findVarHandle(OrderedMergeSubscription.class, "requested", Long.TYPE);
                STATE = lk.findVarHandle(OrderedMergeSubscription.class, "state", State.class);
            }
            catch (Throwable ex) {
                throw new InternalError(ex);
            }
        }

        static final class OrderedMergeSubscriber<T>
        extends AtomicReference<Flow.Subscription>
        implements Flow.Subscriber<T>,
        Flow.Subscription {
            private final OrderedMergeSubscription<T> parent;
            private final int prefetch;
            private final int limit;
            private final Queue<T> queue;
            private int consumed;
            private volatile boolean done;

            OrderedMergeSubscriber(OrderedMergeSubscription<T> parent, int prefetch) {
                assert (prefetch > 0);
                this.parent = parent;
                this.prefetch = prefetch;
                this.limit = prefetch - (prefetch >> 2);
                this.queue = new ConcurrentLinkedQueue<T>();
            }

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                if (this.compareAndSet(null, subscription)) {
                    subscription.request(this.prefetch);
                } else {
                    subscription.cancel();
                }
            }

            @Override
            public void onNext(T item) {
                this.queue.offer(item);
                this.parent.drain();
            }

            @Override
            public void onError(Throwable throwable) {
                this.parent.onInnerError(this, throwable);
            }

            @Override
            public void onComplete() {
                this.done = true;
                this.parent.drain();
            }

            @Override
            public synchronized void request(long n) {
                int c = this.consumed + 1;
                if (c == this.limit) {
                    this.consumed = 0;
                    Flow.Subscription subscription = (Flow.Subscription)this.get();
                    if (subscription != this) {
                        subscription.request(c);
                    }
                } else {
                    this.consumed = c;
                }
            }

            @Override
            public void cancel() {
                Flow.Subscription subscription = this.getAndSet(this);
                if (subscription != null && subscription != this) {
                    subscription.cancel();
                }
            }
        }
    }

    private static enum State {
        INITIAL,
        RUNNING,
        COMPLETING,
        STOP;

    }

    private static class ErrorChain {
        private final Throwable error;
        @Nullable
        private final ErrorChain next;
        private boolean built = false;

        private ErrorChain(Throwable error, @Nullable ErrorChain next) {
            this.error = error;
            this.next = next;
        }

        synchronized Throwable buildThrowable() {
            if (this.built) {
                return this.error;
            }
            ErrorChain chain = this.next;
            while (chain != null) {
                this.error.addSuppressed(chain.error);
                chain = chain.next;
            }
            this.built = true;
            return this.error;
        }
    }
}

