// TcpClientBuilder.java
package space.sunqian.common.net.tcp;
import space.sunqian.annotations.Nonnull;
import space.sunqian.annotations.Nullable;
import space.sunqian.common.Check;
import space.sunqian.common.Fs;
import space.sunqian.common.io.IOKit;
import space.sunqian.common.io.IOOperator;
import space.sunqian.common.io.communicate.AbstractChannelContext;
import space.sunqian.common.net.NetException;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
/**
* Builder for building new instances of {@link TcpClient} by {@link SocketChannel}.
*
* @author sunqian
*/
public class TcpClientBuilder {

    // Local address to bind to at connect time; null lets the system pick one.
    private @Nullable InetSocketAddress localAddress;
    // Buffer size handed to IOOperator.get(int) when the client is created.
    private int bufSize = IOKit.bufferSize();
    // Socket options in insertion order; applied to the channel before it is bound and connected.
    private final @Nonnull Map<SocketOption<?>, Object> socketOptions = new LinkedHashMap<>();

    /**
     * Sets the buffer size for advanced IO operations. Note this buffer size is not the kernel network buffer size, it
     * is an I/O advanced operations buffer size.
     *
     * @param bufSize the buffer size for advanced IO operations
     * @return this builder
     * @throws IllegalArgumentException if the buffer size is negative or {@code 0}
     */
    public @Nonnull TcpClientBuilder bufferSize(int bufSize) throws IllegalArgumentException {
        Check.checkArgument(bufSize > 0, "bufSize must be positive");
        this.bufSize = bufSize;
        return this;
    }

    /**
     * Sets a socket option. This method can be invoked multiple times to set different socket options. The option is
     * only recorded by this method; it is applied to the socket when {@link #connect(InetSocketAddress)} is invoked,
     * so an unsupported option or an illegal value is reported by {@code connect}.
     *
     * @param <T>   the type of the socket option value
     * @param name  the socket option
     * @param value the value of the socket option, a value of {@code null} may be a valid value for some socket
     *              options.
     * @return this builder
     * @throws NetException If an error occurs
     * @see StandardSocketOptions
     */
    public <T> @Nonnull TcpClientBuilder socketOption(@Nonnull SocketOption<T> name, T value) throws NetException {
        socketOptions.put(name, value);
        return this;
    }

    /**
     * Specifies the local address the client's socket will be bound to. The address is only recorded by this method;
     * the socket is actually bound when {@link #connect(InetSocketAddress)} is invoked, so binding failures are
     * reported by {@code connect}.
     *
     * @param localAddress the local address the client is bound to, may be {@code null} to bind to the automatically
     *                     assigned address
     * @return this builder
     */
    public @Nonnull TcpClientBuilder bind(@Nullable InetSocketAddress localAddress) {
        this.localAddress = localAddress;
        return this;
    }

    /**
     * Opens a new socket, applies the configured socket options, binds it to the configured local address and
     * connects it to the specified remote address. A new {@link TcpClient} instance for the connection is returned.
     * <p>
     * If any step fails, the resources opened so far are closed before the error is reported.
     *
     * @param remoteAddress the remote address the client connects to
     * @return a new {@link TcpClient} with the configurations
     * @throws NetException If an error occurs
     */
    public @Nonnull TcpClient connect(@Nonnull InetSocketAddress remoteAddress) throws NetException {
        return Fs.uncheck(() -> new TcpClientImpl(
                localAddress,
                remoteAddress,
                socketOptions,
                bufSize
            ),
            NetException::new
        );
    }

    private static final class TcpClientImpl extends AbstractChannelContext<SocketChannel> implements TcpClient {

        private final @Nonnull InetSocketAddress localAddress;
        private final @Nonnull InetSocketAddress remoteAddress;
        // Selector the channel is registered with for OP_READ; used by readWait()/readWakeUp().
        private final @Nonnull Selector selector;
        private final @Nonnull IOOperator ioOperator;
        private volatile boolean closed = false;

        /**
         * Opens the channel, applies the options, binds, connects (in blocking mode), then switches the channel to
         * non-blocking mode and registers it with a fresh selector for read events.
         *
         * @param localAddress  the local address to bind to, {@code null} for an automatically assigned address
         * @param remoteAddress the remote address to connect to
         * @param socketOptions the socket options applied before binding
         * @param bufSize       the buffer size passed to {@link IOOperator#get(int)}
         * @throws Exception if any step fails; the channel and the selector are closed before it is rethrown
         */
        @SuppressWarnings("resource")
        private TcpClientImpl(
            @Nullable InetSocketAddress localAddress,
            @Nonnull InetSocketAddress remoteAddress,
            @Nonnull Map<SocketOption<?>, Object> socketOptions,
            int bufSize
        ) throws Exception {
            super(SocketChannel.open());
            SocketChannel client = channel();
            Selector openedSelector = null;
            InetSocketAddress boundAddress;
            IOOperator operator;
            try {
                socketOptions.forEach((name, value) ->
                    Fs.uncheck(() -> client.setOption(Fs.as(name), value), NetException::new));
                openedSelector = Selector.open();
                client.bind(localAddress);
                boundAddress = (InetSocketAddress) client.getLocalAddress();
                // Connect synchronously, then go non-blocking so the channel can be registered with the selector.
                client.configureBlocking(true);
                client.connect(remoteAddress);
                client.configureBlocking(false);
                client.register(openedSelector, SelectionKey.OP_READ);
                operator = IOOperator.get(bufSize);
            } catch (Exception e) {
                // Do not leak the already opened selector/channel when setup fails half-way.
                closeQuietly(openedSelector, e);
                closeQuietly(client, e);
                throw e;
            }
            this.remoteAddress = remoteAddress;
            this.localAddress = boundAddress;
            this.selector = openedSelector;
            this.ioOperator = operator;
        }

        /**
         * Closes the given resource if it is not {@code null}; a failure while closing is attached to
         * {@code primary} as a suppressed exception instead of replacing it.
         */
        private static void closeQuietly(@Nullable AutoCloseable resource, @Nonnull Throwable primary) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (Exception closeError) {
                primary.addSuppressed(closeError);
            }
        }

        /**
         * Closes the channel and the selector. The selector is closed even if closing the channel fails, and the
         * client is marked as closed once a close attempt has been made. Subsequent invocations have no effect.
         *
         * @throws NetException if closing the channel or the selector fails
         */
        @Override
        public synchronized void close() throws NetException {
            if (closed) {
                return;
            }
            try {
                Fs.uncheck(() -> {
                    try {
                        channel().close();
                    } finally {
                        // Closing the selector deregisters its keys, so no explicit key lookup/cancel is needed
                        // (keyFor may return null here if a concurrent select already flushed the cancelled key).
                        selector.close();
                    }
                }, NetException::new);
            } finally {
                closed = true;
            }
        }

        @Override
        public @Nonnull InetSocketAddress remoteAddress() {
            return remoteAddress;
        }

        @Override
        public @Nonnull InetSocketAddress localAddress() {
            return localAddress;
        }

        @SuppressWarnings("resource")
        @Override
        public boolean isConnected() {
            SocketChannel client = channel();
            return client.isConnected();
        }

        @Override
        public boolean isClosed() {
            return closed;
        }

        /**
         * Blocks in {@link Selector#select()} until the channel is selected for reading or the selector is woken up
         * (see {@link #readWakeUp()}), then clears the selected-key set so the next invocation blocks again.
         */
        @Override
        public void readWait() {
            Fs.uncheck(() -> {
                selector.select();
                // The keys themselves are not inspected; they only need to be removed from the selected set.
                selector.selectedKeys().clear();
            }, NetException::new);
        }

        /**
         * Wakes up a thread blocked in {@link #readWait()}.
         */
        @Override
        public void readWakeUp() {
            selector.wakeup();
        }

        @Override
        protected @Nonnull IOOperator ioOperator() {
            return ioOperator;
        }
    }
}