TcpClientBuilder.java

package space.sunqian.fs.net.tcp;

import space.sunqian.annotation.Nonnull;
import space.sunqian.annotation.Nullable;
import space.sunqian.fs.Fs;
import space.sunqian.fs.base.Checker;
import space.sunqian.fs.io.IOKit;
import space.sunqian.fs.io.IOOperator;
import space.sunqian.fs.io.communicate.AbstractChannelContext;
import space.sunqian.fs.net.NetException;
import space.sunqian.fs.net.NetSelector;

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Builder for building new instances of {@link TcpClient} by {@link SocketChannel}.
 *
 * @author sunqian
 */
public class TcpClientBuilder {

    private @Nullable InetSocketAddress localAddress;
    private int bufSize = IOKit.bufferSize();
    private final @Nonnull Map<SocketOption<?>, Object> socketOptions = new LinkedHashMap<>();

    /**
     * Sets the I/O buffer size for advanced IO operations, typically used for read/write operations on
     * {@link IOOperator}. The default size is {@link IOKit#bufferSize()}.
     *
     * @param bufSize the I/O buffer size for advanced IO operations
     * @return this builder
     * @throws IllegalArgumentException if the I/O buffer size is negative or {@code 0}
     */
    public @Nonnull TcpClientBuilder ioBufferSize(int bufSize) throws IllegalArgumentException {
        Checker.checkArgument(bufSize > 0, "ioBufferSize must be positive");
        this.bufSize = bufSize;
        return this;
    }

    /**
     * Sets a socket option. This method can be invoked multiple times to set different socket options.
     *
     * @param <T>   the type of the socket option value
     * @param name  the socket option
     * @param value the value of the socket option, a value of {@code null} may be a valid value for some socket
     *              options.
     * @return this builder
     * @throws NetException If an error occurs
     * @see StandardSocketOptions
     */
    public <T> @Nonnull TcpClientBuilder socketOption(@Nonnull SocketOption<T> name, T value) throws NetException {
        socketOptions.put(name, value);
        return this;
    }

    /**
     * Binds the client's socket to the specified local address.
     *
     * @param localAddress the local address the client is bound to, may be {@code null} to bind to the automatically
     *                     assigned address
     * @return this builder
     * @throws NetException If an error occurs
     */
    public @Nonnull TcpClientBuilder bind(@Nullable InetSocketAddress localAddress) {
        this.localAddress = localAddress;
        return this;
    }

    /**
     * Connects this client's socket to the specified remote address and configures the socket to listen for
     * connections. And a new {@link TcpClient} instance is returned.
     *
     * @param remoteAddress the remote address the client connects to
     * @return a new {@link TcpClient} with the configurations
     * @throws NetException If an error occurs
     */
    public @Nonnull TcpClient connect(@Nonnull InetSocketAddress remoteAddress) throws NetException {
        return Fs.uncheck(() -> new TcpClientImpl(
                localAddress,
                remoteAddress,
                socketOptions,
                bufSize
            ),
            NetException::new
        );
    }

    private static final class TcpClientImpl extends AbstractChannelContext<SocketChannel> implements TcpClient {

        // private final @Nonnull SocketChannel client;
        private final @Nonnull InetSocketAddress localAddress;
        private final @Nonnull InetSocketAddress remoteAddress;
        private final @Nonnull NetSelector selector;
        private final @Nonnull IOOperator ioOperator;

        // private volatile boolean closed = false;

        @SuppressWarnings("resource")
        private TcpClientImpl(
            @Nullable InetSocketAddress localAddress,
            @Nonnull InetSocketAddress remoteAddress,
            @Nonnull Map<SocketOption<?>, Object> socketOptions,
            int bufSize
        ) throws Exception {
            super(SocketChannel.open());
            SocketChannel client = channel();
            this.remoteAddress = remoteAddress;
            socketOptions.forEach((name, value) ->
                Fs.uncheck(() -> client.setOption(Fs.as(name), value), NetException::new));
            this.selector = NetSelector.open();
            client.bind(localAddress);
            this.localAddress = (InetSocketAddress) client.getLocalAddress();
            client.configureBlocking(true);
            client.connect(remoteAddress);
            client.configureBlocking(false);
            client.register(selector.selector(), SelectionKey.OP_READ);
            this.ioOperator = IOOperator.get(bufSize);
        }

        @Override
        public synchronized void close() throws NetException {
            Fs.uncheck(() -> {
                SocketChannel client = channel();
                client.close();
                selector.cancel(client);
                selector.close();
            }, NetException::new);
        }

        @Override
        public @Nonnull InetSocketAddress remoteAddress() {
            return remoteAddress;
        }

        @Override
        public @Nonnull InetSocketAddress localAddress() {
            return localAddress;
        }

        @Override
        public boolean isConnected() {
            @SuppressWarnings("resource")
            SocketChannel client = channel();
            return client.isConnected();
        }

        @Override
        public boolean isClosed() {
            @SuppressWarnings("resource")
            ByteChannel channel = channel();
            return !channel.isOpen();
        }

        @Override
        public void readWait() {
            selector.select(0);
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keys = selectedKeys.iterator();
            while (keys.hasNext()) {
                // SelectionKey key = keys.next();
                keys.next();
                // ignored
                keys.remove();
            }
        }

        @Override
        public void readWakeUp() {
            selector.wakeup();
        }

        @Override
        protected @Nonnull IOOperator ioOperator() {
            return ioOperator;
        }
    }
}