/*
 * Decompiled with CFR 0.152.
 */
package com.grinderwolf.swm.internal.lettuce.core.cluster;

import com.grinderwolf.swm.internal.lettuce.core.GeoArgs;
import com.grinderwolf.swm.internal.lettuce.core.GeoWithin;
import com.grinderwolf.swm.internal.lettuce.core.RedisURI;
import com.grinderwolf.swm.internal.lettuce.core.cluster.AbstractNodeSelection;
import com.grinderwolf.swm.internal.lettuce.core.cluster.AsyncClusterConnectionProvider;
import com.grinderwolf.swm.internal.lettuce.core.cluster.ClusterDistributionChannelWriter;
import com.grinderwolf.swm.internal.lettuce.core.cluster.NodeSelectionInvocationHandler;
import com.grinderwolf.swm.internal.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl;
import com.grinderwolf.swm.internal.lettuce.core.cluster.api.NodeSelectionSupport;
import com.grinderwolf.swm.internal.lettuce.core.cluster.models.partitions.RedisClusterNode;
import com.grinderwolf.swm.internal.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import com.grinderwolf.swm.internal.lettuce.core.cluster.pubsub.api.reactive.NodeSelectionPubSubReactiveCommands;
import com.grinderwolf.swm.internal.lettuce.core.cluster.pubsub.api.reactive.PubSubReactiveNodeSelection;
import com.grinderwolf.swm.internal.lettuce.core.cluster.pubsub.api.reactive.RedisClusterPubSubReactiveCommands;
import com.grinderwolf.swm.internal.lettuce.core.codec.RedisCodec;
import com.grinderwolf.swm.internal.lettuce.core.protocol.ConnectionIntent;
import com.grinderwolf.swm.internal.lettuce.core.pubsub.RedisPubSubReactiveCommandsImpl;
import com.grinderwolf.swm.internal.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import com.grinderwolf.swm.internal.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;

/**
 * Reactive Pub/Sub command facade for a cluster Pub/Sub connection.
 *
 * <p>Extends {@link RedisPubSubReactiveCommandsImpl} and adds two things on top of it:
 * <ul>
 *   <li>the {@code georadius}/{@code georadiusbymember} overloads are redirected to the
 *       superclass {@code georadius_ro}/{@code georadiusbymember_ro} methods;</li>
 *   <li>{@link #nodes(Predicate)} exposes a proxy that runs Pub/Sub commands against a
 *       selected subset of cluster nodes.</li>
 * </ul>
 *
 * @param <K> key type
 * @param <V> value type
 */
public class RedisClusterPubSubReactiveCommandsImpl<K, V>
        extends RedisPubSubReactiveCommandsImpl<K, V>
        implements RedisClusterPubSubReactiveCommands<K, V> {

    /**
     * Creates the command facade; both arguments are passed straight to the superclass.
     *
     * @param connection the Pub/Sub connection commands are issued on
     * @param codec      the codec used for keys and values
     */
    public RedisClusterPubSubReactiveCommandsImpl(StatefulRedisPubSubConnection<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
    }

    /**
     * Delegates to the superclass {@code georadius_ro} method with the same arguments
     * (presumably the read-only flavour of the command - confirm against the superclass).
     */
    @Override
    public Flux<V> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
        return super.georadius_ro(key, longitude, latitude, distance, unit);
    }

    /**
     * Delegates to the superclass {@code georadius_ro} method with the same arguments,
     * including {@code geoArgs}.
     */
    @Override
    public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
        return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
    }

    /**
     * Delegates to the superclass {@code georadiusbymember_ro} method with the same arguments.
     */
    @Override
    public Flux<V> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
        return super.georadiusbymember_ro(key, member, distance, unit);
    }

    /**
     * Delegates to the superclass {@code georadiusbymember_ro} method with the same arguments,
     * including {@code geoArgs}.
     */
    @Override
    public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
        return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
    }

    /**
     * Returns the underlying connection narrowed to the cluster Pub/Sub implementation type.
     *
     * @return the connection held by the superclass, cast to
     *         {@link StatefulRedisClusterPubSubConnectionImpl}
     * @throws ClassCastException if the superclass connection is not a
     *                            {@link StatefulRedisClusterPubSubConnectionImpl}
     */
    @Override
    public StatefulRedisClusterPubSubConnectionImpl<K, V> getStatefulConnection() {
        return (StatefulRedisClusterPubSubConnectionImpl<K, V>) super.getStatefulConnection();
    }

    /**
     * Builds a node selection over the cluster nodes accepted by {@code predicate}.
     *
     * <p>The result is a JDK dynamic proxy implementing both
     * {@link NodeSelectionPubSubReactiveCommands} and {@link PubSubReactiveNodeSelection};
     * calls on it are handled by a {@link NodeSelectionInvocationHandler} configured with the
     * {@code REACTIVE} execution model and the {@link RedisPubSubReactiveCommands} interface.
     *
     * @param predicate filter applied to the partitions of the current connection
     * @return proxy-backed selection of the matching nodes
     */
    @Override
    public PubSubReactiveNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate) {
        StaticPubSubReactiveNodeSelection<K, V> selection =
                new StaticPubSubReactiveNodeSelection<>(this.getStatefulConnection(), predicate);
        InvocationHandler handler = new NodeSelectionInvocationHandler(
                selection, RedisPubSubReactiveCommands.class, NodeSelectionInvocationHandler.ExecutionModel.REACTIVE);

        // The proxy is created with PubSubReactiveNodeSelection in its interface list, so the
        // cast itself cannot fail; only the <K, V> type arguments are unverifiable at runtime.
        @SuppressWarnings("unchecked")
        PubSubReactiveNodeSelection<K, V> proxy = (PubSubReactiveNodeSelection<K, V>) Proxy.newProxyInstance(
                NodeSelectionSupport.class.getClassLoader(),
                new Class<?>[]{NodeSelectionPubSubReactiveCommands.class, PubSubReactiveNodeSelection.class},
                handler);
        return proxy;
    }

    /**
     * Node selection whose member list is computed once, in the constructor, and never
     * re-evaluated afterwards.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static class StaticPubSubReactiveNodeSelection<K, V>
            extends AbstractNodeSelection<RedisPubSubReactiveCommands<K, V>, NodeSelectionPubSubReactiveCommands<K, V>, K, V>
            implements PubSubReactiveNodeSelection<K, V> {
        /** Nodes that matched the selector at construction time. */
        private final List<RedisClusterNode> redisClusterNodes;
        /** Channel writer of the originating connection; source of the connection provider. */
        private final ClusterDistributionChannelWriter writer;

        /**
         * Filters the partitions of {@code globalConnection} with {@code selector} and keeps
         * the connection's cluster distribution channel writer for later connection lookups.
         *
         * @param globalConnection cluster Pub/Sub connection; must be a
         *                         {@link StatefulRedisClusterPubSubConnectionImpl}, otherwise a
         *                         {@link ClassCastException} is thrown
         * @param selector         predicate deciding which nodes belong to the selection
         */
        public StaticPubSubReactiveNodeSelection(StatefulRedisClusterPubSubConnection<K, V> globalConnection, Predicate<RedisClusterNode> selector) {
            this.redisClusterNodes = globalConnection.getPartitions().stream().filter(selector).collect(Collectors.toList());
            this.writer = ((StatefulRedisClusterPubSubConnectionImpl<K, V>) globalConnection).getClusterDistributionChannelWriter();
        }

        /**
         * Resolves the reactive Pub/Sub API of the given node by obtaining its connection and
         * mapping it through {@link StatefulRedisPubSubConnection#reactive()}.
         */
        @Override
        protected CompletableFuture<RedisPubSubReactiveCommands<K, V>> getApi(RedisClusterNode redisClusterNode) {
            return this.getConnection(redisClusterNode).thenApply(StatefulRedisPubSubConnection::reactive);
        }

        /** Returns the node list captured in the constructor. */
        @Override
        protected List<RedisClusterNode> nodes() {
            return this.redisClusterNodes;
        }

        /**
         * Asynchronously obtains a connection to the given node, addressed by the host and
         * port of the node's URI and requested with {@link ConnectionIntent#WRITE}.
         *
         * @throws ClassCastException if the writer's connection provider is not an
         *                            {@link AsyncClusterConnectionProvider}
         */
        @Override
        @SuppressWarnings("unchecked") // narrowing of the provider's connection to the Pub/Sub type with <K, V>
        protected CompletableFuture<StatefulRedisPubSubConnection<K, V>> getConnection(RedisClusterNode redisClusterNode) {
            RedisURI uri = redisClusterNode.getUri();
            AsyncClusterConnectionProvider async = (AsyncClusterConnectionProvider) this.writer.getClusterConnectionProvider();
            return async.getConnectionAsync(ConnectionIntent.WRITE, uri.getHost(), uri.getPort())
                    .thenApply(it -> (StatefulRedisPubSubConnection<K, V>) it);
        }
    }
}

