package org.springframework.messaging.tcp.reactor;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.ipc.netty.FutureMono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.resources.LoopResources;
import reactor.ipc.netty.resources.PoolResources;
import reactor.ipc.netty.tcp.TcpClient;

/* loaded from: classes3.dex */
public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
    // Buffer size of 16. NOTE(review): ReactorNettyHandler.apply passes the literal 16 to
    // publishOn — presumably this constant inlined by the compiler; confirm.
    private static final int PUBLISH_ON_BUFFER_SIZE = 16;
    // Logger for this class; also read by the nested ReactorNettyHandler.
    private static Log logger = LogFactory.getLog((Class<?>) ReactorNettyTcpClient.class);

    // Created by the Consumer-based constructor and closed in shutdown();
    // null when the client is built from an existing TcpClient.
    @Nullable
    private final ChannelGroup channelGroup;
    // Codec handed to each ReactorNettyTcpConnection and StompMessageDecoder.
    private final ReactorNettyCodec<P> codec;

    // Assigned in lambda$new$1$ReactorNettyTcpClient; disposed in shutdown() when non-null.
    @Nullable
    private LoopResources loopResources;

    // Assigned in lambda$new$1$ReactorNettyTcpClient unless the builder reports the pool
    // as disabled; disposed in shutdown() when non-null.
    @Nullable
    private PoolResources poolResources;
    // Parallel scheduler named "tcp-client-scheduler": runs the afterConnected callback,
    // receives inbound messages via publishOn, times reconnect delays, and is disposed
    // by the stopScheduler() step of shutdown().
    private final Scheduler scheduler;
    // Set to true by shutdown(); both connect(...) variants fail fast once it is set.
    private volatile boolean stopping;
    // The Reactor Netty client used by connect(...) to create new handlers.
    private final TcpClient tcpClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes3.dex */
    /**
     * Bridges a Reactor Netty channel to a {@link TcpConnectionHandler}.
     *
     * <p>For each inbound/outbound pair it wraps the pair in a
     * {@code ReactorNettyTcpConnection}, installs a {@link StompMessageDecoder} on the
     * channel, and forwards decoded messages, failures and completion to the handler.
     */
    public class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
        private final TcpConnectionHandler<P> connectionHandler;

        ReactorNettyHandler(TcpConnectionHandler<P> tcpConnectionHandler) {
            this.connectionHandler = tcpConnectionHandler;
        }

        /**
         * Wires the given channel to the connection handler.
         *
         * @param nettyInbound the inbound side of the channel
         * @param nettyOutbound the outbound side of the channel
         * @return the completion processor that is also handed to the connection object
         */
        @Override // java.util.function.BiFunction
        public Publisher<Void> apply(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
            if (ReactorNettyTcpClient.logger.isDebugEnabled()) {
                ReactorNettyTcpClient.logger.debug("Connected to " + nettyInbound.remoteAddress());
            }

            DirectProcessor<Void> completion = DirectProcessor.create();
            // Raw type kept on purpose: ReactorNettyTcpConnection is declared outside this file.
            final ReactorNettyTcpConnection connection = new ReactorNettyTcpConnection(
                    nettyInbound, nettyOutbound, ReactorNettyTcpClient.this.codec, completion);

            // Notify the handler on the client scheduler rather than on the calling thread.
            ReactorNettyTcpClient.this.scheduler.schedule(
                    () -> lambda$apply$0$ReactorNettyTcpClient$ReactorNettyHandler(connection));

            nettyInbound.context().addHandler(new StompMessageDecoder<>(ReactorNettyTcpClient.this.codec));

            // The decompiled consumers used "TcpConnectionHandler.this", which is not a valid
            // qualified-this (the interface is not an enclosing type); bind to the field instead.
            final TcpConnectionHandler<P> handler = this.connectionHandler;
            nettyInbound.receiveObject()
                    .cast(Message.class)
                    .publishOn(ReactorNettyTcpClient.this.scheduler, PUBLISH_ON_BUFFER_SIZE)
                    .subscribe(handler::handleMessage, handler::handleFailure, handler::afterConnectionClosed);

            return completion;
        }

        // Compiler-generated lambda body, kept under its synthetic name so that any
        // existing reference to it keeps resolving.
        public /* synthetic */ void lambda$apply$0$ReactorNettyTcpClient$ReactorNettyHandler(TcpConnection tcpConnection) {
            this.connectionHandler.afterConnected(tcpConnection);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes3.dex */
    public static class StompMessageDecoder<P> extends ByteToMessageDecoder {
        private final ReactorNettyCodec<P> codec;

        public StompMessageDecoder(ReactorNettyCodec<P> reactorNettyCodec) {
            this.codec = reactorNettyCodec;
        }

        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
            list.addAll(this.codec.decode(byteBuf));
        }
    }

    /**
     * Creates a client for the given host and port by delegating to the
     * options-consumer constructor.
     *
     * @param str the host to connect to
     * @param i the port to connect to
     * @param reactorNettyCodec the codec used to read and write messages
     */
    public ReactorNettyTcpClient(final String str, final int i, ReactorNettyCodec<P> reactorNettyCodec) {
        this((ClientOptions.Builder<?> options) -> options.host(str).port(i), reactorNettyCodec);
    }

    /**
     * Creates a client whose options are configured by the given consumer, followed by
     * this class's own configuration step ({@code lambda$new$1$ReactorNettyTcpClient}).
     * A channel group is created here for this client.
     *
     * @param consumer callback applied to the client options builder; must not be null
     * @param reactorNettyCodec the codec used to read and write messages; must not be null
     */
    public ReactorNettyTcpClient(Consumer<ClientOptions.Builder<?>> consumer, ReactorNettyCodec<P> reactorNettyCodec) {
        this.scheduler = Schedulers.newParallel("tcp-client-scheduler");
        this.stopping = false;

        Assert.notNull(consumer, "Consumer<ClientOptions.Builder<?> is required");
        Assert.notNull(reactorNettyCodec, "ReactorNettyCodec is required");

        this.codec = reactorNettyCodec;
        this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

        // Caller-supplied options run first, then the built-in resource setup.
        Consumer<ClientOptions.Builder<?>> builtInOptions = this::lambda$new$1$ReactorNettyTcpClient;
        this.tcpClient = TcpClient.create(consumer.andThen(builtInOptions));
    }

    /**
     * Creates a client around an already-built {@code TcpClient}. No channel group,
     * loop resources or pool resources are created by this constructor, so
     * {@link #shutdown()} only stops the scheduler for such instances.
     *
     * @param tcpClient the client to use; must not be null
     * @param reactorNettyCodec the codec used to read and write messages; must not be null
     */
    public ReactorNettyTcpClient(TcpClient tcpClient, ReactorNettyCodec<P> reactorNettyCodec) {
        this.scheduler = Schedulers.newParallel("tcp-client-scheduler");
        this.stopping = false;

        Assert.notNull(tcpClient, "TcpClient is required");
        Assert.notNull(reactorNettyCodec, "ReactorNettyCodec is required");

        // Nothing is owned here beyond the scheduler.
        this.channelGroup = null;
        this.loopResources = null;
        this.poolResources = null;

        this.codec = reactorNettyCodec;
        this.tcpClient = tcpClient;
    }

    /**
     * Reports a "Shutting down." failure to the handler and returns a future that
     * fails with the same exception.
     *
     * @param tcpConnectionHandler the handler to notify via {@code afterConnectFailure}
     * @return a future backed by an error Mono carrying the {@link IllegalStateException}
     */
    private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> tcpConnectionHandler) {
        final IllegalStateException shuttingDown = new IllegalStateException("Shutting down.");
        tcpConnectionHandler.afterConnectFailure(shuttingDown);
        Mono<Void> failed = Mono.error(shuttingDown);
        return new MonoToListenableFutureAdapter(failed);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Accumulator for the reconnect attempt counter used with {@code Flux.scan}.
     *
     * <p>The previous body computed {@code num + 1} but discarded it and returned
     * {@code num}, so the attempt number never advanced.
     *
     * @param num the current attempt count
     * @param obj the signal that triggered the next attempt (ignored)
     * @return the next attempt count, {@code num + 1}
     */
    public static /* synthetic */ Integer lambda$null$3(Integer num, Object obj) {
        return num + 1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Terminates the given processor once: with an error when {@code obj} is a
     * {@link Throwable}, otherwise with completion. Does nothing if the processor
     * has already terminated.
     */
    public static /* synthetic */ void lambda$updateConnectMono$2(MonoProcessor monoProcessor, Object obj) {
        if (!monoProcessor.isTerminated()) {
            if (obj instanceof Throwable) {
                monoProcessor.onError((Throwable) obj);
            } else {
                monoProcessor.onComplete();
            }
        }
    }

    /**
     * Builds the companion function passed to {@code retryWhen}/{@code repeatWhen}:
     * it maps the incoming signal flux through
     * {@code lambda$reconnectFunction$6$ReactorNettyTcpClient} using the given strategy.
     *
     * @param reconnectStrategy supplies the delay before each new attempt
     */
    private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(final ReconnectStrategy reconnectStrategy) {
        return signals -> lambda$reconnectFunction$6$ReactorNettyTcpClient(reconnectStrategy, signals);
    }

    /**
     * Returns a Mono that, when subscribed, runs
     * {@code lambda$stopScheduler$10$ReactorNettyTcpClient} to dispose the scheduler.
     */
    private Mono<Void> stopScheduler() {
        return Mono.fromRunnable(this::lambda$stopScheduler$10$ReactorNettyTcpClient);
    }

    /**
     * Returns a consumer that forwards each received value to
     * {@code lambda$updateConnectMono$2}, which terminates {@code monoProcessor}
     * (error for a Throwable, completion otherwise) unless it already terminated.
     */
    private <T> Consumer<T> updateConnectMono(final MonoProcessor<Void> monoProcessor) {
        return signal -> lambda$updateConnectMono$2(monoProcessor, signal);
    }

    /**
     * Opens a single connection without reconnect handling.
     *
     * @param tcpConnectionHandler receives connection lifecycle callbacks; must not be null
     * @return a future adapted from the connect Mono, or a failed future if this
     *         client is already stopping
     */
    @Override // org.springframework.messaging.tcp.TcpOperations
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> tcpConnectionHandler) {
        Assert.notNull(tcpConnectionHandler, "TcpConnectionHandler is required");
        if (!this.stopping) {
            // Raw Mono kept: the error callback below is a synthetic class not declared in this file.
            Mono connectAttempt = this.tcpClient.newHandler(new ReactorNettyHandler(tcpConnectionHandler));
            // NOTE(review): $$Lambda$M4yEaqud0anL2qcx2lNdYLKPV4 presumably forwards the error to
            // tcpConnectionHandler — confirm against the generated class.
            Mono completion = connectAttempt
                    .doOnError(new $$Lambda$M4yEaqud0anL2qcx2lNdYLKPV4(tcpConnectionHandler))
                    .then();
            return new MonoToListenableFutureAdapter(completion);
        }
        return handleShuttingDownConnectFailure(tcpConnectionHandler);
    }

    /**
     * Opens a connection and keeps re-subscribing according to the given strategy.
     *
     * <p>The returned future is backed by a {@code MonoProcessor} that is terminated by
     * the first next-or-error signal of the connect Mono (later signals are ignored by
     * {@code updateConnectMono}). The reconnect pipeline itself is subscribed here and
     * is not otherwise exposed to the caller.
     *
     * @param tcpConnectionHandler receives connection lifecycle callbacks; must not be null
     * @param reconnectStrategy supplies the delay before each new attempt; must not be null
     * @return a future for the first connect outcome, or a failed future if this client
     *         is already stopping
     */
    @Override // org.springframework.messaging.tcp.TcpOperations
    public ListenableFuture<Void> connect(TcpConnectionHandler<P> tcpConnectionHandler, ReconnectStrategy reconnectStrategy) {
        Assert.notNull(tcpConnectionHandler, "TcpConnectionHandler is required");
        Assert.notNull(reconnectStrategy, "ReconnectStrategy is required");
        if (this.stopping) {
            return handleShuttingDownConnectFailure(tcpConnectionHandler);
        }
        // Terminated by the first connect success or failure; backs the returned future.
        MonoProcessor<Void> create = MonoProcessor.create();
        Mono doOnError = this.tcpClient.newHandler(new ReactorNettyHandler(tcpConnectionHandler)).doOnNext(updateConnectMono(create)).doOnError(updateConnectMono(create));
        tcpConnectionHandler.getClass();
        // NOTE(review): $$Lambda$M4yEaqud0anL2qcx2lNdYLKPV4 is a synthetic class not declared in this
        // file; presumably it forwards the error to tcpConnectionHandler — confirm.
        // After a successful connect the chain switches to the context's onClose() signal, so
        // retryWhen (errors) and repeatWhen (completion) both drive re-subscription.
        doOnError.doOnError(new $$Lambda$M4yEaqud0anL2qcx2lNdYLKPV4(tcpConnectionHandler)).flatMap(new Function() { // from class: org.springframework.messaging.tcp.reactor.-$$Lambda$51j1UkgNOHNykQspJvmGPANR4s8
            @Override // java.util.function.Function
            public final Object apply(Object obj) {
                return ((NettyContext) obj).onClose();
            }
        }).retryWhen(reconnectFunction(reconnectStrategy)).repeatWhen(reconnectFunction(reconnectStrategy)).subscribe();
        return new MonoToListenableFutureAdapter(create);
    }

    /**
     * Built-in options step for the options-consumer constructor: rejects builders that
     * already carry loop or pool resources, then installs this client's channel group,
     * a "tcp-client-loop" {@code LoopResources} and, unless pooling is disabled, a
     * "tcp-client-pool" elastic {@code PoolResources}.
     */
    public /* synthetic */ void lambda$new$1$ReactorNettyTcpClient(ClientOptions.Builder builder) {
        boolean noResourcesPreconfigured = !builder.isLoopAvailable() && !builder.isPoolAvailable();
        Assert.isTrue(noResourcesPreconfigured, "The provided ClientOptions.Builder contains LoopResources and/or PoolResources. Please, use the constructor that accepts a TcpClient instance for full control over initialization and lifecycle.");

        builder.channelGroup(this.channelGroup);
        builder.preferNative(false);

        LoopResources loop = LoopResources.create("tcp-client-loop");
        this.loopResources = loop;
        builder.loopResources(loop);

        if (!builder.isPoolDisabled()) {
            PoolResources pool = PoolResources.elastic("tcp-client-pool");
            this.poolResources = pool;
            builder.poolResources(pool);
        }
    }

    /**
     * Returns a Mono that fires after {@code l} milliseconds on this client's scheduler.
     */
    public /* synthetic */ Mono lambda$null$4$ReactorNettyTcpClient(Long l) {
        Duration delay = Duration.ofMillis(l.longValue());
        return Mono.delay(delay, this.scheduler);
    }

    /**
     * Asks the strategy for the delay before attempt {@code num}. A non-null delay
     * yields a delayed Mono (see {@code lambda$null$4$ReactorNettyTcpClient}); a null
     * delay yields an empty Mono.
     */
    public /* synthetic */ Publisher lambda$null$5$ReactorNettyTcpClient(ReconnectStrategy reconnectStrategy, Integer num) {
        Optional<Long> delayMillis = Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(num.intValue()));
        Optional<Mono> delayed = delayMillis.map(this::lambda$null$4$ReactorNettyTcpClient);
        return delayed.orElse(Mono.empty());
    }

    /**
     * Turns the companion signal flux into reconnect triggers: a counter seeded with 1 is
     * folded over the signals via {@code lambda$null$3}, and each counter value is mapped
     * to the strategy-provided delay via {@code lambda$null$5$ReactorNettyTcpClient}.
     */
    public /* synthetic */ Publisher lambda$reconnectFunction$6$ReactorNettyTcpClient(final ReconnectStrategy reconnectStrategy, Flux flux) {
        // Raw Flux: lambda parameters are Objects and need the same casts the compiler emitted.
        Flux attempts = flux.scan(1, (count, signal) -> lambda$null$3((Integer) count, signal));
        return attempts.flatMap(attempt -> lambda$null$5$ReactorNettyTcpClient(reconnectStrategy, (Integer) attempt));
    }

    /**
     * Disposes the scheduler and then polls {@code isDisposed()} up to 20 times at
     * 100 ms intervals, returning as soon as it reports disposed.
     *
     * <p>If the polling thread is interrupted, the interrupt flag is restored and the
     * wait is abandoned. The previous version caught {@code Throwable} and silently
     * dropped the interrupt.
     */
    public /* synthetic */ void lambda$stopScheduler$10$ReactorNettyTcpClient() {
        final int maxPolls = 20;
        final long pollIntervalMillis = 100L;

        this.scheduler.dispose();
        for (int i = 0; i < maxPolls && !this.scheduler.isDisposed(); i++) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException ex) {
                // Preserve the interrupt for whoever owns this thread, then stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Shuts the client down. The first call marks the client as stopping and returns a
     * future for the teardown sequence: close the channel group (if any), dispose loop
     * and pool resources (if any), then stop the scheduler. An error from an earlier
     * step is replaced with an empty Mono so the later steps still run. Subsequent calls
     * return an already-completed future.
     *
     * @return a future tracking the teardown, or a completed future if already stopping
     */
    @Override // org.springframework.messaging.tcp.TcpOperations
    public ListenableFuture<Void> shutdown() {
        if (this.stopping) {
            SettableListenableFuture<Void> alreadyStopping = new SettableListenableFuture<>();
            alreadyStopping.set(null);
            return alreadyStopping;
        }
        this.stopping = true;

        if (this.channelGroup == null) {
            // Built from an external TcpClient: only the scheduler belongs to this instance.
            return new MonoToListenableFutureAdapter(stopScheduler());
        }

        Function<Throwable, Mono<Void>> ignoreError = ex -> Mono.empty();
        Mono<Void> teardown = FutureMono.from(this.channelGroup.close());
        if (this.loopResources != null) {
            teardown = teardown.onErrorResume(ignoreError).then(this.loopResources.disposeLater());
        }
        if (this.poolResources != null) {
            teardown = teardown.onErrorResume(ignoreError).then(this.poolResources.disposeLater());
        }
        teardown = teardown.onErrorResume(ignoreError).then(stopScheduler());
        return new MonoToListenableFutureAdapter(teardown);
    }

    /** Returns {@code "ReactorNettyTcpClient[" + tcpClient + "]"}. */
    @Override
    public String toString() {
        return new StringBuilder("ReactorNettyTcpClient[")
                .append(this.tcpClient)
                .append("]")
                .toString();
    }
}
