mycat1.6实现了AIO和NIO,本文只讲解NIO的流程。
在reactor模型,acceptor线程负责接受TCP连接请求。acceptor的实现类是io.mycat.net.NIOAcceptor。mycat将socket包装成前端连接FrontendConnection。
reactor线程负责处理IO请求,reactor的实现类是io.mycat.net.NIOReactor。reactor获取SelectionKey所绑定的连接AbstractConnection,调用AbstractConnection中的asynRead()和doNextWriteCheck()方法,来进行读写操作。
下面讲解上图1~3的过程。
public final class NIOAcceptor extends Thread implements SocketAcceptor{
private static final Logger LOGGER = LoggerFactory.getLogger(NIOAcceptor.class);
private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator();
private final int port;
private final Selector selector;
private final ServerSocketChannel serverChannel;
private final FrontendConnectionFactory factory;
private long acceptCount;
private final NIOReactorPool reactorPool;
public NIOAcceptor(String name, String bindIp,int port,
FrontendConnectionFactory factory, NIOReactorPool reactorPool)
throws IOException {
//设置acceptor线程名称
super.setName(name);
//设置端口号
this.port = port;
//给acceptor分配选择器
this.selector = Selector.open();
//打开一个server-socket channel
this.serverChannel = ServerSocketChannel.open();
//server-socket channel设置为非阻塞
this.serverChannel.configureBlocking(false);
/** 设置TCP属性 */
//设置TCP属性,重用端口
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
//设置TCP属性,设置接收缓存
serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2);
// backlog=100 等待连接的最大数量
serverChannel.bind(new InetSocketAddress(bindIp, port), 100);
//注册OP_ACCEPT事件
this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//设置连接工厂类,此处是FrontendConnectionFactory
this.factory = factory;
//设置reactorPool
this.reactorPool = reactorPool;
}
run方法中for无限循环,selector.select(1000L)注册器设置1秒超时返回准备就绪的SelectionKey,如果SelectionKey有效且关注的是OP_ACCEPT事件,那么注册连接,否则取消SelectionKey,最后在finally块中清空所有key
@Override
public void run() {
final Selector tSelector = this.selector;
for (;;) {
++acceptCount;
try {
tSelector.select(1000L);
Set<SelectionKey> keys = tSelector.selectedKeys();
try {
for (SelectionKey key : keys) {
//判断key是否有效,是否是acceptor事件
if (key.isValid() && key.isAcceptable()) {
//注册连接
accept();
} else {
//取消key
key.cancel();
}
}
} finally {
//清空selectedKey
keys.clear();
}
} catch (Exception e) {
LOGGER.warn(getName(), e);
}
}
}
通过accept()获取SocketChannel,设置SocketChannel为非阻塞模式,包装成前段连接FrontendConnection,设置必要参数,获取一个reactor线程,并将前段连接FrontendConnection注册到该reactor线程中进行读写操作。此处调用reactor.postRegister(c)方法只是将连接放入注册队列,后面会详细讲解。
private void accept() {
SocketChannel channel = null;
try {
//ServerSocketChannel设置成非阻塞模式。
//在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。
channel = serverChannel.accept();
//channel设置成非阻塞模式
channel.configureBlocking(false);
//channel包装成前段连接
FrontendConnection c = factory.make(channel);
c.setAccepted(true);
//设置连接ID
c.setId(ID_GENERATOR.getId());
NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
.nextProcessor();
c.setProcessor(processor);
//获取一个reactor
NIOReactor reactor = reactorPool.getNextReactor();
//注册连接
reactor.postRegister(c);
} catch (Exception e) {
LOGGER.warn(getName(), e);
closeChannel(channel);
}
}
先关闭socket,再关闭channel
private static void closeChannel(SocketChannel channel) {
if (channel == null) {
return;
}
Socket socket = channel.socket();
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
LOGGER.error("closeChannelError", e);
}
}
try {
channel.close();
} catch (IOException e) {
LOGGER.error("closeChannelError", e);
}
}
下面讲解上图4~7的过程。
NIOReactor初始化的时候设置一个读写线程RW,所有的IO操作实际上在这个线程中处理的。
public final class NIOReactor {
private static final Logger LOGGER = LoggerFactory.getLogger(NIOReactor.class);
private final String name;
private final RW reactorR;
public NIOReactor(String name) throws IOException {
this.name = name;
this.reactorR = new RW();
}
NIOReactor的启动实际上是启动RW线程。
final void startup() {
new Thread(reactorR, name + "-RW").start();
}
RW的初始化时,会获取Selector实例,并初始化一个连接注册队列。
private final class RW implements Runnable {
private final Selector selector;
private final ConcurrentLinkedQueue<AbstractConnection> registerQueue;
private long reactCount;
private RW() throws IOException {
this.selector = Selector.open();
this.registerQueue = new ConcurrentLinkedQueue<AbstractConnection>();
}
当selector.select(500L)方法选择一组键,其相应的通道已为 I/O 操作准备就绪,那么调用register(selector)方法(上图所示5)注册连接。如果有读写事件就绪,那么通过获取SelectionKey绑定的连接Connection,调用Connection的相应的读写方法。本文介绍首次建立连接,我们重点看register(selector)方法。
@Override
public void run() {
final Selector selector = this.selector;
Set<SelectionKey> keys = null;
for (;;) {
++reactCount;
try {
selector.select(500L);
register(selector);
keys = selector.selectedKeys();
for (SelectionKey key : keys) {
AbstractConnection con = null;
try {
Object att = key.attachment();
if (att != null) {
con = (AbstractConnection) att;
if (key.isValid() && key.isReadable()) {
try {
con.asynRead();
} catch (IOException e) {
con.close("program err:" + e.toString());
continue;
} catch (Exception e) {
LOGGER.warn("caught err:", e);
con.close("program err:" + e.toString());
continue;
}
}
if (key.isValid() && key.isWritable()) {
con.doNextWriteCheck();
}
} else {
key.cancel();
}
} catch (CancelledKeyException e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(con + " socket key canceled");
}
} catch (Exception e) {
LOGGER.warn(con + " " + e);
} catch (final Throwable e){
// Catch exceptions such as OOM and close connection if exists
//so that the reactor can keep running!
// @author Uncle-pan
// @since 2016-03-30
if(con != null){
con.close("Bad: "+e);
}
LOGGER.error("caught err: ", e);
continue;
}
}
} catch (Exception e) {
LOGGER.warn(name, e);
} catch (final Throwable e){
// Catch exceptions such as OOM so that the reactor can keep running!
// @author Uncle-pan
// @since 2016-03-30
LOGGER.error("caught err: ", e);
} finally {
if (keys != null) {
keys.clear();
}
}
}
}
当selector.select()方法获取的就绪事件或者500毫秒超时之后,那么调用register(selector)方法(上图所示5)注册连接。在register方法(上图所示6)中,循环注册队列registerQueue中的连接。
private void register(Selector selector) {
AbstractConnection c = null;
if (registerQueue.isEmpty()) {
return;
}
while ((c = registerQueue.poll()) != null) {
try {
//注册读时间
((NIOSocketWR) c.getSocketWR()).register(selector);
//向客户端发送握手报文
c.register();
} catch (Exception e) {
c.close("register err" + e.toString());
}
}
}
如上图7,NIOSocketWR的register(selector)方法给channel注册读事件
public void register(Selector selector) throws IOException {
try {
processKey = channel.register(selector, SelectionKey.OP_READ, con);
} finally {
if (con.isClosed.get()) {
clearSelectionKey();
}
}
}
为什么不直接在NIOAcceptor的accept()中调用NIOSocketWR的register(selector)注册读事件,而是通过注册队列的方式异步注册,主要的原因是acceptor线程直接channel.register(selector,int)方法和reactor线程的selector.select()方法会发生死锁。所以注册必须由reactor一个线程完成。
如上图7,调用AbstractConnection的register方法发送握手报文。
@Override
public void register() throws IOException {
if (!isClosed.get()) {
// 生成认证数据
byte[] rand1 = RandomUtil.randomBytes(8);
byte[] rand2 = RandomUtil.randomBytes(12);
// 保存认证数据
byte[] seed = new byte[rand1.length + rand2.length];
System.arraycopy(rand1, 0, seed, 0, rand1.length);
System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
this.seed = seed;
// 发送握手数据包
boolean useHandshakeV10 = MycatServer.getInstance().getConfig().getSystem().getUseHandshakeV10() == 1;
if(useHandshakeV10) {
HandshakeV10Packet hs = new HandshakeV10Packet();
hs.packetId = 0;
hs.protocolVersion = Versions.PROTOCOL_VERSION;
hs.serverVersion = Versions.SERVER_VERSION;
hs.threadId = id;
hs.seed = rand1;
hs.serverCapabilities = getServerCapabilities();
hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
hs.serverStatus = 2;
hs.restOfScrambleBuff = rand2;
hs.write(this);
} else {
HandshakePacket hs = new HandshakePacket();
hs.packetId = 0;
hs.protocolVersion = Versions.PROTOCOL_VERSION;
hs.serverVersion = Versions.SERVER_VERSION;
hs.threadId = id;
hs.seed = rand1;
hs.serverCapabilities = getServerCapabilities();
hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
hs.serverStatus = 2;
hs.restOfScrambleBuff = rand2;
hs.write(this);
}
// asynread response
this.asynRead();
}
}
如上图8,调用HandshakePacket的write(FrontendConnection c)方法将拼装好的握手报文写入buffer,并发送到客户端。
public void write(FrontendConnection c) {
ByteBuffer buffer = c.allocate();
BufferUtil.writeUB3(buffer, calcPacketSize());
buffer.put(packetId);
buffer.put(protocolVersion);
BufferUtil.writeWithNull(buffer, serverVersion);
BufferUtil.writeUB4(buffer, threadId);
BufferUtil.writeWithNull(buffer, seed);
BufferUtil.writeUB2(buffer, serverCapabilities);
buffer.put(serverCharsetIndex);
BufferUtil.writeUB2(buffer, serverStatus);
buffer.put(FILLER_13);
// buffer.position(buffer.position() + 13);
BufferUtil.writeWithNull(buffer, restOfScrambleBuff);
c.write(buffer);
}
如上图9,将buffer放入写队列并进行异步写操作。
@Override
public final void write(ByteBuffer buffer) {
if (isSupportCompress()) {
ByteBuffer newBuffer = CompressUtil.compressMysqlPacket(buffer, this, compressUnfinishedDataQueue);
writeQueue.offer(newBuffer);
} else {
writeQueue.offer(buffer);
}
// if ansyn write finishe event got lock before me ,then writing
// flag is set false but not start a write request
// so we check again
try {
this.socketWR.doNextWriteCheck();
} catch (Exception e) {
LOGGER.warn("write err:", e);
this.close("write err:" + e);
}
}
如上图10,调用NIOSocketWR的write0()方法,把写队列中的数据写出去,如果数据没有写完,那么继续关注写事件。
public void doNextWriteCheck() {
if (!writing.compareAndSet(false, true)) {
return;
}
try {
boolean noMoreData = write0();
writing.set(false);
if (noMoreData && con.writeQueue.isEmpty()) {
if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) {
disableWrite();
}
} else {
if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) {
enableWrite(false);
}
}
} catch (IOException e) {
if (AbstractConnection.LOGGER.isDebugEnabled()) {
AbstractConnection.LOGGER.debug("caught err:", e);
}
con.close("err:" + e);
}
}
如果AbstractConnection的writeBuffer有数据可写,先写writeBuffer。然后再写writeQueue写队列中的数据。
private boolean write0() throws IOException {
int written = 0;
ByteBuffer buffer = con.writeBuffer;
if (buffer != null) {
while (buffer.hasRemaining()) {
written = channel.write(buffer);
if (written > 0) {
con.netOutBytes += written;
con.processor.addNetOutBytes(written);
con.lastWriteTime = TimeUtil.currentTimeMillis();
} else {
break;
}
}
if (buffer.hasRemaining()) {
con.writeAttempts++;
return false;
} else {
con.writeBuffer = null;
con.recycle(buffer);
}
}
while ((buffer = con.writeQueue.poll()) != null) {
if (buffer.limit() == 0) {
con.recycle(buffer);
con.close("quit send");
return true;
}
buffer.flip();
try {
while (buffer.hasRemaining()) {
written = channel.write(buffer);// java.io.IOException:
// Connection reset by peer
if (written > 0) {
con.lastWriteTime = TimeUtil.currentTimeMillis();
con.netOutBytes += written;
con.processor.addNetOutBytes(written);
con.lastWriteTime = TimeUtil.currentTimeMillis();
} else {
break;
}
}
} catch (IOException e) {
con.recycle(buffer);
throw e;
}
if (buffer.hasRemaining()) {
con.writeBuffer = buffer;
con.writeAttempts++;
return false;
} else {
con.recycle(buffer);
}
}
return true;
}
数据写完了,取消对写事件的关注。
private void disableWrite() {
try {
SelectionKey key = this.processKey;
key.interestOps(key.interestOps() & OP_NOT_WRITE);
} catch (Exception e) {
AbstractConnection.LOGGER.warn("can't disable write " + e + " con "
+ con);
}
}
有数据需要写,关注写事件。
private void enableWrite(boolean wakeup) {
boolean needWakeup = false;
try {
SelectionKey key = this.processKey;
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
needWakeup = true;
} catch (Exception e) {
AbstractConnection.LOGGER.warn("can't enable write " + e);
}
if (needWakeup && wakeup) {
processKey.selector().wakeup();
}
}
如果经过上面的流程数据还没有写完,那么会关注写事件,在NIOReactor类的RW线程类中在写事件就绪之后,会调用con.doNextWriteCheck()放写数据,实际上写的过程上我们是一样的,就不在重复。也是上图12~16的过程。
NIOReactor类中RW类的run方法,在写事件就绪后,调用con.doNextWriteCheck()方法写数据。
@Override
public void run() {
final Selector selector = this.selector;
Set<SelectionKey> keys = null;
for (;;) {
++reactCount;
try {
selector.select(500L);
register(selector);
keys = selector.selectedKeys();
for (SelectionKey key : keys) {
AbstractConnection con = null;
try {
Object att = key.attachment();
if (att != null) {
con = (AbstractConnection) att;
if (key.isValid() && key.isReadable()) {
try {
con.asynRead();
} catch (IOException e) {
con.close("program err:" + e.toString());
continue;
} catch (Exception e) {
LOGGER.warn("caught err:", e);
con.close("program err:" + e.toString());
continue;
}
}
if (key.isValid() && key.isWritable()) {
con.doNextWriteCheck();
}
} else {
key.cancel();
}
} catch (CancelledKeyException e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(con + " socket key canceled");
}
} catch (Exception e) {
LOGGER.warn(con + " " + e);
} catch (final Throwable e){
// Catch exceptions such as OOM and close connection if exists
//so that the reactor can keep running!
// @author Uncle-pan
// @since 2016-03-30
if(con != null){
con.close("Bad: "+e);
}
LOGGER.error("caught err: ", e);
continue;
}
}
} catch (Exception e) {
LOGGER.warn(name, e);
} catch (final Throwable e){
// Catch exceptions such as OOM so that the reactor can keep running!
// @author Uncle-pan
// @since 2016-03-30
LOGGER.error("caught err: ", e);
} finally {
if (keys != null) {
keys.clear();
}
}
}
}
AbstractConnection的doNextWriteCheck()放中实际上调用的NIOSocketWR类的doNextWriteCheck()方法,过程和上图的10~12的过程。如果经过13~17的流程数据还没有写完,则继续重复13~17的流程直到数据写完为止。
本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。