/*
 * Decompiled with CFR 0.152.
 */
package com.grinderwolf.swm.internal.lettuce.core;

import com.grinderwolf.swm.internal.lettuce.core.Operators;
import com.grinderwolf.swm.internal.lettuce.core.api.StatefulConnection;
import com.grinderwolf.swm.internal.lettuce.core.api.StatefulRedisConnection;
import com.grinderwolf.swm.internal.lettuce.core.internal.ExceptionFactory;
import com.grinderwolf.swm.internal.lettuce.core.internal.LettuceAssert;
import com.grinderwolf.swm.internal.lettuce.core.output.StreamingOutput;
import com.grinderwolf.swm.internal.lettuce.core.protocol.CommandWrapper;
import com.grinderwolf.swm.internal.lettuce.core.protocol.DemandAware;
import com.grinderwolf.swm.internal.lettuce.core.protocol.RedisCommand;
import io.netty.util.Recycler;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Collection;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.util.context.Context;

class RedisPublisher<K, V, T>
implements Publisher<T> {
    private static final InternalLogger LOG = InternalLoggerFactory.getInstance(RedisPublisher.class);
    private final boolean traceEnabled = LOG.isTraceEnabled();
    private final Supplier<? extends RedisCommand<K, V, T>> commandSupplier;
    private final AtomicReference<RedisCommand<K, V, T>> ref;
    private final StatefulConnection<K, V> connection;
    private final boolean dissolve;
    private final Executor executor;

    public RedisPublisher(RedisCommand<K, V, T> staticCommand, StatefulConnection<K, V> connection, boolean dissolve, Executor publishOn) {
        this(() -> staticCommand, connection, dissolve, publishOn);
    }

    public RedisPublisher(Supplier<RedisCommand<K, V, T>> commandSupplier, StatefulConnection<K, V> connection, boolean dissolve, Executor publishOn) {
        LettuceAssert.notNull(commandSupplier, "CommandSupplier must not be null");
        LettuceAssert.notNull(connection, "StatefulConnection must not be null");
        LettuceAssert.notNull((Object)publishOn, "Executor must not be null");
        this.commandSupplier = commandSupplier;
        this.connection = connection;
        this.dissolve = dissolve;
        this.executor = publishOn;
        this.ref = new AtomicReference<RedisCommand<K, RedisCommand<K, V, T>, T>>(commandSupplier.get());
    }

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        RedisCommand<K, V, T> command;
        if (this.traceEnabled) {
            LOG.trace("subscribe: {}@{}", (Object)subscriber.getClass().getName(), (Object)Objects.hashCode(subscriber));
        }
        if ((command = this.ref.get()) != null) {
            if (!this.ref.compareAndSet(command, null)) {
                command = this.commandSupplier.get();
            }
        } else {
            command = this.commandSupplier.get();
        }
        RedisSubscription<T> redisSubscription = new RedisSubscription<T>(this.connection, command, this.dissolve, this.executor);
        redisSubscription.subscribe(subscriber);
    }

    static class OnComplete
    implements Runnable {
        private static final Recycler<OnComplete> RECYCLER = new Recycler<OnComplete>(){

            @Override
            protected OnComplete newObject(Recycler.Handle<OnComplete> handle) {
                return new OnComplete(handle);
            }
        };
        private final Recycler.Handle<OnComplete> handle;
        private Throwable signal;
        private Subscriber<?> subscriber;

        OnComplete(Recycler.Handle<OnComplete> handle) {
            this.handle = handle;
        }

        static OnComplete newInstance(Throwable signal, Subscriber<?> subscriber) {
            OnComplete entry = RECYCLER.get();
            entry.signal = signal;
            entry.subscriber = subscriber;
            return entry;
        }

        static OnComplete newInstance(Subscriber<?> subscriber) {
            OnComplete entry = RECYCLER.get();
            entry.signal = null;
            entry.subscriber = subscriber;
            return entry;
        }

        @Override
        public void run() {
            try {
                if (this.signal != null) {
                    this.subscriber.onError(this.signal);
                } else {
                    this.subscriber.onComplete();
                }
            }
            finally {
                this.recycle();
            }
        }

        private void recycle() {
            this.signal = null;
            this.subscriber = null;
            this.handle.recycle(this);
        }
    }

    static class OnNext
    implements Runnable {
        private static final Recycler<OnNext> RECYCLER = new Recycler<OnNext>(){

            @Override
            protected OnNext newObject(Recycler.Handle<OnNext> handle) {
                return new OnNext(handle);
            }
        };
        private final Recycler.Handle<OnNext> handle;
        private Object signal;
        private Subscriber<Object> subscriber;

        OnNext(Recycler.Handle<OnNext> handle) {
            this.handle = handle;
        }

        static OnNext newInstance(Object signal, Subscriber<?> subscriber) {
            OnNext entry = RECYCLER.get();
            entry.signal = signal;
            entry.subscriber = subscriber;
            return entry;
        }

        @Override
        public void run() {
            try {
                this.subscriber.onNext(this.signal);
            }
            finally {
                this.recycle();
            }
        }

        private void recycle() {
            this.signal = null;
            this.subscriber = null;
            this.handle.recycle(this);
        }
    }

    static class PublishOnSubscriber<T>
    implements RedisSubscriber<T> {
        private final CoreSubscriber<T> delegate;
        private final Executor executor;

        public PublishOnSubscriber(Subscriber<T> delegate, Executor executor) {
            this.delegate = reactor.core.publisher.Operators.toCoreSubscriber(delegate);
            this.executor = executor;
        }

        @Override
        public Context currentContext() {
            return this.delegate.currentContext();
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.delegate.onSubscribe(s);
        }

        @Override
        public void onNext(T t) {
            this.executor.execute(OnNext.newInstance(t, this.delegate));
        }

        @Override
        public void onError(Throwable t) {
            this.executor.execute(OnComplete.newInstance(t, this.delegate));
        }

        @Override
        public void onComplete() {
            this.executor.execute(OnComplete.newInstance(this.delegate));
        }
    }

    static class ImmediateSubscriber<T>
    implements RedisSubscriber<T> {
        private final CoreSubscriber<T> delegate;

        public ImmediateSubscriber(Subscriber<T> delegate) {
            this.delegate = reactor.core.publisher.Operators.toCoreSubscriber(delegate);
        }

        @Override
        public Context currentContext() {
            return this.delegate.currentContext();
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.delegate.onSubscribe(s);
        }

        @Override
        public void onNext(T t) {
            this.delegate.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            this.delegate.onError(t);
        }

        @Override
        public void onComplete() {
            this.delegate.onComplete();
        }
    }

    static interface RedisSubscriber<T>
    extends CoreSubscriber<T> {
        public static <T> RedisSubscriber<T> create(Subscriber<?> delegate, Executor executor) {
            if (executor == ImmediateEventExecutor.INSTANCE) {
                return new ImmediateSubscriber(delegate);
            }
            return new PublishOnSubscriber(delegate, executor);
        }
    }

    private static class CompositeSubscriber<T>
    extends StreamingOutput.Subscriber<T> {
        private final StreamingOutput.Subscriber<T> first;
        private final StreamingOutput.Subscriber<T> second;

        public CompositeSubscriber(StreamingOutput.Subscriber<T> first, StreamingOutput.Subscriber<T> second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void onNext(T t) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void onNext(Collection<T> outputTarget, T t) {
            this.first.onNext(outputTarget, t);
            this.second.onNext(outputTarget, t);
        }
    }

    static class SubscriptionCommand<K, V, T>
    extends CommandWrapper<K, V, T>
    implements DemandAware.Sink {
        private final boolean dissolve;
        private final RedisSubscription<T> subscription;
        private volatile DemandAware.Source source;

        public SubscriptionCommand(RedisCommand<K, V, T> command, RedisSubscription<T> subscription, boolean dissolve) {
            super(command);
            this.subscription = subscription;
            this.dissolve = dissolve;
        }

        @Override
        public boolean hasDemand() {
            return this.isDone() || this.subscription.state() == State.COMPLETED || this.subscription.data.isEmpty();
        }

        @Override
        protected void doOnComplete() {
            if (this.getOutput() != null) {
                Object result = this.getOutput().get();
                if (this.getOutput().hasError()) {
                    this.onError(ExceptionFactory.createExecutionException(this.getOutput().getError()));
                    return;
                }
                if (!(this.getOutput() instanceof StreamingOutput) && result != null) {
                    if (this.dissolve && result instanceof Collection) {
                        Collection collection = (Collection)result;
                        for (Object t : collection) {
                            if (t == null) continue;
                            this.subscription.onNext(t);
                        }
                    } else {
                        this.subscription.onNext(result);
                    }
                }
            }
            this.subscription.onAllDataRead();
        }

        @Override
        public void setSource(DemandAware.Source source) {
            this.source = source;
        }

        @Override
        public void removeSource() {
            this.source = null;
        }

        @Override
        protected void doOnError(Throwable throwable) {
            this.onError(throwable);
        }

        private void onError(Throwable throwable) {
            this.subscription.onError(throwable);
        }
    }

    static enum State {
        UNSUBSCRIBED{

            @Override
            void subscribe(RedisSubscription<?> subscription, Subscriber<?> subscriber) {
                LettuceAssert.notNull(subscriber, "Subscriber must not be null");
                if (!subscription.changeState(this, NO_DEMAND)) {
                    throw new IllegalStateException(this.toString());
                }
                subscription.subscriber = RedisSubscriber.create(subscriber, ((RedisSubscription)subscription).executor);
                subscriber.onSubscribe(subscription);
            }
        }
        ,
        NO_DEMAND{

            @Override
            void request(RedisSubscription<?> subscription, long n) {
                if (Operators.request(RedisSubscription.DEMAND, subscription, n)) {
                    if (subscription.changeState(this, DEMAND)) {
                        try {
                            subscription.checkCommandDispatch();
                        }
                        catch (Exception ex) {
                            subscription.onError(ex);
                        }
                        subscription.checkOnDataAvailable();
                    }
                    subscription.potentiallyReadMore();
                    subscription.state().onDataAvailable(subscription);
                } else {
                    this.onError(subscription, Exceptions.nullOrNegativeRequestException(n));
                }
            }
        }
        ,
        DEMAND{

            @Override
            void onDataAvailable(RedisSubscription<?> subscription) {
                try {
                    do {
                        if (this.read(subscription)) continue;
                        return;
                    } while (subscription.hasDemand());
                }
                catch (Exception e) {
                    subscription.onError(e);
                }
            }

            @Override
            void request(RedisSubscription<?> subscription, long n) {
                if (Operators.request(RedisSubscription.DEMAND, subscription, n)) {
                    this.onDataAvailable(subscription);
                    subscription.potentiallyReadMore();
                } else {
                    this.onError(subscription, Exceptions.nullOrNegativeRequestException(n));
                }
            }

            private boolean read(RedisSubscription<?> subscription) {
                State state = subscription.state();
                if (state == NO_DEMAND || state == DEMAND) {
                    if (!subscription.changeState(state, READING)) {
                        return false;
                    }
                } else {
                    return false;
                }
                subscription.readAndPublish();
                if (subscription.allDataRead && subscription.data.isEmpty()) {
                    state.onAllDataRead(subscription);
                    return false;
                }
                subscription.afterRead();
                return subscription.allDataRead || !subscription.data.isEmpty();
            }
        }
        ,
        READING{

            @Override
            void request(RedisSubscription<?> subscription, long n) {
                DEMAND.request(subscription, n);
            }
        }
        ,
        COMPLETED{

            @Override
            void request(RedisSubscription<?> subscription, long n) {
            }

            @Override
            void cancel(RedisSubscription<?> subscription) {
            }

            @Override
            void onAllDataRead(RedisSubscription<?> subscription) {
            }

            @Override
            void onError(RedisSubscription<?> subscription, Throwable t) {
            }
        };


        void subscribe(RedisSubscription<?> subscription, Subscriber<?> subscriber) {
            throw new IllegalStateException(this.toString());
        }

        void request(RedisSubscription<?> subscription, long n) {
            throw new IllegalStateException(this.toString());
        }

        void cancel(RedisSubscription<?> subscription) {
            subscription.command.cancel();
            if (subscription.changeState(this, COMPLETED)) {
                this.readData(subscription);
            }
        }

        void readData(RedisSubscription<?> subscription) {
            DemandAware.Source source = ((RedisSubscription)subscription).subscriptionCommand.source;
            if (source != null) {
                source.requestMore();
            }
        }

        void onDataAvailable(RedisSubscription<?> subscription) {
        }

        void onAllDataRead(RedisSubscription<?> subscription) {
            if (subscription.data.isEmpty() && subscription.complete()) {
                this.readData(subscription);
                RedisSubscriber subscriber = subscription.subscriber;
                if (subscriber != null) {
                    subscriber.onComplete();
                }
            }
        }

        void onError(RedisSubscription<?> subscription, Throwable t) {
            State state;
            while ((state = subscription.state()) != COMPLETED && subscription.changeState(state, COMPLETED)) {
                this.readData(subscription);
                RedisSubscriber subscriber = subscription.subscriber;
                if (subscriber == null) continue;
                subscriber.onError(t);
                return;
            }
        }
    }

    private static enum CommandDispatch {
        UNDISPATCHED{

            @Override
            void dispatch(RedisSubscription<?> redisSubscription) {
                if (RedisSubscription.COMMAND_DISPATCH.compareAndSet(redisSubscription, this, DISPATCHED)) {
                    redisSubscription.dispatchCommand();
                }
            }
        }
        ,
        DISPATCHED;


        void dispatch(RedisSubscription<?> redisSubscription) {
        }
    }

    static class RedisSubscription<T>
    extends StreamingOutput.Subscriber<T>
    implements Subscription {
        static final InternalLogger LOG = InternalLoggerFactory.getInstance(RedisPublisher.class);
        static final int ST_PROGRESS = 0;
        static final int ST_COMPLETED = 1;
        static final AtomicLongFieldUpdater<RedisSubscription> DEMAND = AtomicLongFieldUpdater.newUpdater(RedisSubscription.class, "demand");
        static final AtomicReferenceFieldUpdater<RedisSubscription, State> STATE = AtomicReferenceFieldUpdater.newUpdater(RedisSubscription.class, State.class, "state");
        static final AtomicReferenceFieldUpdater<RedisSubscription, CommandDispatch> COMMAND_DISPATCH = AtomicReferenceFieldUpdater.newUpdater(RedisSubscription.class, CommandDispatch.class, "commandDispatch");
        private final SubscriptionCommand<?, ?, T> subscriptionCommand;
        private final boolean traceEnabled = LOG.isTraceEnabled();
        final Queue<T> data = Operators.newQueue();
        final StatefulConnection<?, ?> connection;
        final RedisCommand<?, ?, T> command;
        final boolean dissolve;
        private final Executor executor;
        volatile long demand;
        volatile State state = State.UNSUBSCRIBED;
        volatile CommandDispatch commandDispatch = CommandDispatch.UNDISPATCHED;
        volatile boolean allDataRead = false;
        volatile RedisSubscriber<? super T> subscriber;

        RedisSubscription(StatefulConnection<?, ?> connection, RedisCommand<?, ?, T> command, boolean dissolve, Executor executor) {
            LettuceAssert.notNull(connection, "Connection must not be null");
            LettuceAssert.notNull(command, "RedisCommand must not be null");
            LettuceAssert.notNull((Object)executor, "Executor must not be null");
            this.connection = connection;
            this.command = command;
            this.dissolve = dissolve;
            this.executor = executor;
            if (command.getOutput() instanceof StreamingOutput) {
                StreamingOutput streamingOutput = (StreamingOutput)((Object)command.getOutput());
                if (connection instanceof StatefulRedisConnection && ((StatefulRedisConnection)connection).isMulti()) {
                    streamingOutput.setSubscriber(new CompositeSubscriber(this, streamingOutput.getSubscriber()));
                } else {
                    streamingOutput.setSubscriber(this);
                }
            }
            this.subscriptionCommand = new SubscriptionCommand(command, this, dissolve);
        }

        void subscribe(Subscriber<? super T> subscriber) {
            if (subscriber == null) {
                throw new NullPointerException("Subscriber must not be null");
            }
            State state = this.state();
            if (this.traceEnabled) {
                LOG.trace("{} subscribe: {}@{}", new Object[]{state, subscriber.getClass().getName(), subscriber.hashCode()});
            }
            state.subscribe(this, subscriber);
        }

        @Override
        public final void request(long n) {
            State state = this.state();
            if (this.traceEnabled) {
                LOG.trace("{} request: {}", (Object)state, (Object)n);
            }
            state.request(this, n);
        }

        @Override
        public final void cancel() {
            State state = this.state();
            if (this.traceEnabled) {
                LOG.trace("{} cancel", (Object)state);
            }
            state.cancel(this);
        }

        @Override
        public void onNext(T t) {
            long initial;
            LettuceAssert.notNull(t, "Data must not be null");
            State state = this.state();
            if (state == State.COMPLETED) {
                return;
            }
            if (this.data.isEmpty() && this.state() == State.DEMAND && (initial = this.getDemand()) > 0L) {
                try {
                    DEMAND.decrementAndGet(this);
                    this.subscriber.onNext(t);
                }
                catch (Exception e) {
                    this.onError(e);
                }
                return;
            }
            if (!this.data.offer(t)) {
                RedisSubscriber<? super T> subscriber = this.subscriber;
                Context context = Context.empty();
                if (subscriber instanceof CoreSubscriber) {
                    context = ((CoreSubscriber)subscriber).currentContext();
                }
                Throwable e = Operators.onOperatorError(this, Exceptions.failWithOverflow(), t, context);
                this.onError(e);
                return;
            }
            this.onDataAvailable();
        }

        final void onDataAvailable() {
            State state = this.state();
            if (this.traceEnabled) {
                LOG.trace("{} onDataAvailable()", (Object)state);
            }
            state.onDataAvailable(this);
        }

        final void onAllDataRead() {
            State state = this.state();
            if (this.traceEnabled) {
                LOG.trace("{} onAllDataRead()", (Object)state);
            }
            this.allDataRead = true;
            this.onDataAvailable();
        }

        final void onError(Throwable t) {
            State state = this.state();
            if (LOG.isErrorEnabled()) {
                LOG.trace("{} onError(): {}", new Object[]{state, t.toString(), t});
            }
            state.onError(this, t);
        }

        protected T read() {
            return this.data.poll();
        }

        boolean hasDemand() {
            return this.getDemand() > 0L;
        }

        private long getDemand() {
            return DEMAND.get(this);
        }

        boolean changeState(State oldState, State newState) {
            return STATE.compareAndSet(this, oldState, newState);
        }

        boolean afterRead() {
            return this.changeState(State.READING, this.getDemand() > 0L ? State.DEMAND : State.NO_DEMAND);
        }

        public boolean complete() {
            return this.changeState(State.READING, State.COMPLETED);
        }

        void checkCommandDispatch() {
            COMMAND_DISPATCH.get(this).dispatch(this);
        }

        void dispatchCommand() {
            this.connection.dispatch(this.subscriptionCommand);
        }

        void checkOnDataAvailable() {
            if (this.data.isEmpty()) {
                this.potentiallyReadMore();
            }
            if (!this.data.isEmpty()) {
                this.onDataAvailable();
            }
        }

        void potentiallyReadMore() {
            if (this.getDemand() + 1L > (long)this.data.size()) {
                this.state().readData(this);
            }
        }

        void readAndPublish() {
            while (this.hasDemand()) {
                T data = this.read();
                if (data == null) {
                    return;
                }
                DEMAND.decrementAndGet(this);
                this.subscriber.onNext(data);
            }
        }

        State state() {
            return STATE.get(this);
        }
    }
}

