Scalable IO in Java
Doug Lea
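Outline
- Scalable network services
- Event-driven processing
- Reactor pattern
  - Basic version
  - Multithreaded versions
  - Other variants
- Walkthrough of the java.nio nonblocking IO APIs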
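Network Services
- Web services, distributed objects, etc.
- Most have the same basic structure:
  - Read request
  - Decode request
  - Process service
  - Encode reply
  - Send reply
- But the nature and cost of each step differ in practice:
  - XML parsing
  - File transfer
  - Web page generation
  - Computational services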
Classic Service Designs
Each handler may be started in its own thread
Classic ServerSocket Loop

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    class Server implements Runnable {
        public void run() {
            try {
                ServerSocket ss = new ServerSocket(PORT);
                while (!Thread.interrupted())
                    new Thread(new Handler(ss.accept())).start();
                // a single thread or a thread pool could be used here instead
            } catch (IOException ex) { /* ... */ }
        }

        static class Handler implements Runnable {
            final Socket socket;
            Handler(Socket s) { socket = s; }

            public void run() {
                try {
                    byte[] input = new byte[MAX_INPUT];
                    socket.getInputStream().read(input);
                    byte[] output = process(input);
                    socket.getOutputStream().write(output);
                } catch (IOException ex) { /* ... */ }
            }

            private byte[] process(byte[] cmd) { /* ... */ }
        }
    }

Note: most exception handling is omitted from the code examples
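The loop above mentions a thread pool as an alternative to one thread per connection. A minimal sketch of that variant follows, assuming `java.util.concurrent.ExecutorService` (not part of the slides), a pool size of 4, and an upper-casing `process` step; the class name `PooledBlockingServer` and the `request` helper are hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical blocking server: one pooled task per accepted connection.
class PooledBlockingServer implements Runnable {
    static final int MAX_INPUT = 1024;   // assumed request size limit
    final ServerSocket ss;
    final ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size

    PooledBlockingServer() throws IOException {
        // Port 0 asks the OS for any free port; listen on loopback only.
        ss = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }

    int port() { return ss.getLocalPort(); }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                Socket s = ss.accept();          // blocks until a client connects
                pool.execute(() -> handle(s));   // hand the connection to a pooled thread
            }
        } catch (IOException ex) {
            // accept() fails once the server socket is closed; fall through and exit
        } finally {
            pool.shutdown();
        }
    }

    void close() throws IOException { ss.close(); }

    // Read one request, process it, write one response (all blocking calls).
    static void handle(Socket socket) {
        try (Socket s = socket) {
            byte[] input = new byte[MAX_INPUT];
            int n = s.getInputStream().read(input);
            if (n > 0) s.getOutputStream().write(process(input, n));
        } catch (IOException ex) {
            // a real server would log this
        }
    }

    // Placeholder service logic: upper-case the request text.
    static byte[] process(byte[] in, int len) {
        return new String(in, 0, len, StandardCharsets.UTF_8)
                .toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }

    // Blocking client helper for a quick round trip against a running server.
    static String request(int port, String msg) throws IOException {
        try (Socket c = new Socket(InetAddress.getLoopbackAddress(), port)) {
            c.getOutputStream().write(msg.getBytes(StandardCharsets.UTF_8));
            c.shutdownOutput();
            byte[] buf = new byte[MAX_INPUT];
            int n = c.getInputStream().read(buf);
            return new String(buf, 0, Math.max(n, 0), StandardCharsets.UTF_8);
        }
    }
}
```

The pool bounds the number of threads, but each pooled thread still blocks on its client, which is the limitation the event-driven designs later in the deck address.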
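Scalability Goals
- Graceful degradation under increasing load (more clients)
- Continuous improvement with increasing resources (CPU, memory, disk, bandwidth)
- Also meet availability and performance goals
  - Short latencies
  - Meeting peak demand
  - Tunable quality of service
- Divide-and-conquer is the usual approach to achieving any scalability goal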
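Divide and Conquer
- Divide processing into small tasks, each of which performs its action without blocking
- Execute each task only when it is enabled; an IO event usually serves as the trigger
- Basic mechanisms supported in java.nio
  - Non-blocking reads and writes
  - Dispatching tasks associated with sensed IO events
- Endless variation possible
  - A family of event-driven designs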
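To make "non-blocking reads" concrete, here is a small sketch that uses a `java.nio.channels.Pipe` as a stand-in for a socket (an assumption made so the example needs no network). The class name `NonBlockingReadDemo` is hypothetical. A non-blocking read returns immediately with 0 when no data is available instead of parking the thread.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class NonBlockingReadDemo {
    // Returns the byte counts reported by read() before and after data is written.
    static int[] demo() throws IOException {
        Pipe pipe = Pipe.open();
        try {
            pipe.source().configureBlocking(false);
            ByteBuffer buf = ByteBuffer.allocate(16);

            // Nothing has been written yet: the call returns at once instead of blocking.
            int before = pipe.source().read(buf);

            pipe.sink().write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.US_ASCII)));

            // Data is now pending, so the same call transfers it into the buffer.
            int after = pipe.source().read(buf);
            return new int[] { before, after };
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```

A task built on such a read can simply return when it sees 0 and be run again later, once an IO event signals that data has arrived.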
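Event-driven Designs
- Usually more efficient than the alternatives
  - Fewer resources: no need for a thread per client
  - Less overhead: less context switching and often less locking
  - But dispatching can be slower: actions must be bound to events manually
- Usually harder to program
  - Must break processing up into simple non-blocking actions
    - Similar to GUI event-driven actions
    - Cannot eliminate all blocking: GC, page faults, etc.
  - Must keep track of the logical state of the service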
Background: Events in AWT
Event-driven IO uses similar ideas but in different designs
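Reactor Pattern
- The Reactor responds to IO events by dispatching the appropriate handler, similar to the AWT thread
- Handlers perform non-blocking actions, similar to AWT ActionListeners
- Manage by binding handlers to events
- See Schmidt et al., Pattern-Oriented Software Architecture, Volume 2 (POSA2); also Richard Stevens's networking books, Matt Welsh's SEDA framework, etc.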
Basic Reactor Design
Single-threaded version
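java.nio Support
- Channels: connections to files, sockets, etc. that support non-blocking reads
- Buffers: array-like objects that can be directly read or written by Channels
- Selectors: tell which of a set of Channels have IO events
- SelectionKeys: maintain IO event status and bindings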
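The four pieces listed above can be seen working together in a few lines. This is only an illustrative sketch: it uses a `Pipe` in place of a socket, and the class name `NioPiecesDemo` and the attachment string `"pipe"` are made up for the example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class NioPiecesDemo {
    static String demo() throws IOException {
        Pipe pipe = Pipe.open();                 // Channel pair standing in for a connection
        Selector selector = Selector.open();
        try {
            Pipe.SourceChannel source = pipe.source();
            source.configureBlocking(false);     // required before register()
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);
            key.attach("pipe");                  // any object can be bound to a key

            pipe.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)));

            selector.select();                   // blocks until the source is readable
            StringBuilder out = new StringBuilder();
            for (SelectionKey k : selector.selectedKeys()) {
                if (k.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(32);   // Buffer filled by the Channel
                    ((Pipe.SourceChannel) k.channel()).read(buf);
                    buf.flip();                                 // prepare the buffer for reading
                    out.append(k.attachment()).append(':')
                       .append(StandardCharsets.US_ASCII.decode(buf));
                }
            }
            selector.selectedKeys().clear();
            return out.toString();
        } finally {
            selector.close();
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```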
Reactor 1: Setup

    class Reactor implements Runnable {
        final Selector selector;
        final ServerSocketChannel serverSocket;

        Reactor(int port) throws IOException {
            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(port));
            serverSocket.configureBlocking(false);
            SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            sk.attach(new Acceptor());
        }

        /* Alternatively, use the SPI provider explicitly:
           SelectorProvider p = SelectorProvider.provider();
           selector = p.openSelector();
           serverSocket = p.openServerSocketChannel();
        */
Reactor 2: Dispatch Loop

        // class Reactor continued
        public void run() { // normally run in a new thread
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext())
                        dispatch(it.next());
                    selected.clear();
                }
            } catch (IOException ex) { /* ... */ }
        }

        void dispatch(SelectionKey k) {
            Runnable r = (Runnable) k.attachment();
            if (r != null)
                r.run();
        }
Reactor 3: Acceptor

        // class Reactor continued
        class Acceptor implements Runnable { // inner class
            public void run() {
                try {
                    SocketChannel c = serverSocket.accept();
                    if (c != null)
                        new Handler(selector, c);
                } catch (IOException ex) { /* ... */ }
            }
        }
    }
Reactor 4: Handler Setup

    final class Handler implements Runnable {
        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAXIN);
        ByteBuffer output = ByteBuffer.allocate(MAXOUT);
        static final int READING = 0, SENDING = 1;
        int state = READING;

        Handler(Selector sel, SocketChannel c) throws IOException {
            socket = c;
            c.configureBlocking(false);
            // Register with no interest ops first, then ask for read events
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }

        boolean inputIsComplete() { /* ... */ }
        boolean outputIsComplete() { /* ... */ }
        void process() { /* ... */ }
Reactor 5: Request Handling

        // class Handler continued
        public void run() {
            try {
                if (state == READING) read();
                else if (state == SENDING) send();
            } catch (IOException ex) { /* ... */ }
        }

        void read() throws IOException {
            socket.read(input);
            if (inputIsComplete()) {
                process();
                state = SENDING;
                // Once reading is complete, normally switch to waiting for write readiness
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }

        void send() throws IOException {
            socket.write(output);
            if (outputIsComplete())
                sk.cancel();
        }
    }
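The five fragments above leave `inputIsComplete`, `process`, and the buffer sizes open. The following is one possible way to fill them in as a self-contained, runnable echo service, offered as a sketch rather than the slides' own completion. Assumptions: a request is a single newline-terminated line of at most 256 bytes, the reply is the request itself, and the names `EchoReactor`, `shutdown`, and `roundTrip` are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical single-threaded echo reactor; a request is one '\n'-terminated line.
class EchoReactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    EchoReactor() throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        // Port 0 lets the OS pick a free port; bind to loopback only.
        serverSocket.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT, (Runnable) this::accept);
    }

    int port() { return serverSocket.socket().getLocalPort(); }

    public void run() {                       // the dispatch loop
        try {
            while (selector.isOpen()) {
                selector.select();
                for (SelectionKey k : selector.selectedKeys())
                    if (k.isValid()) ((Runnable) k.attachment()).run();
                selector.selectedKeys().clear();
            }
        } catch (IOException | ClosedSelectorException ex) {
            // selector closed or failed: leave the loop
        }
    }

    void accept() {
        try {
            SocketChannel c = serverSocket.accept();
            if (c != null) new Handler(c);    // the handler registers itself
        } catch (IOException ex) { /* dropped in this sketch */ }
    }

    void shutdown() throws IOException {
        serverSocket.close();
        selector.close();                     // also wakes a blocked select()
    }

    final class Handler implements Runnable {
        final SocketChannel socket;
        final SelectionKey sk;
        final ByteBuffer buf = ByteBuffer.allocate(256);   // assumed maximum request size
        boolean sending = false;

        Handler(SocketChannel c) throws IOException {
            socket = c;
            c.configureBlocking(false);
            sk = c.register(selector, SelectionKey.OP_READ, this);
        }

        public void run() {
            try {
                if (sending) send(); else read();
            } catch (IOException ex) { close(); }
        }

        void read() throws IOException {
            int n = socket.read(buf);
            boolean lineDone = buf.position() > 0 && buf.get(buf.position() - 1) == '\n';
            if (lineDone || !buf.hasRemaining()) {
                buf.flip();                   // switch the buffer from filling to draining
                sending = true;
                sk.interestOps(SelectionKey.OP_WRITE);
            } else if (n < 0) {
                close();                      // peer closed before sending a full line
            }
        }

        void send() throws IOException {
            socket.write(buf);
            if (!buf.hasRemaining()) close(); // whole reply written
        }

        void close() {
            sk.cancel();
            try { socket.close(); } catch (IOException ignored) { }
        }
    }

    // Blocking client used only to exercise the reactor.
    static String roundTrip(int port, String line) throws IOException {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port)) {
            s.getOutputStream().write(line.getBytes(StandardCharsets.US_ASCII));
            return new BufferedReader(new InputStreamReader(
                    s.getInputStream(), StandardCharsets.US_ASCII)).readLine();
        }
    }
}
```

Because handlers are created and registered on the reactor thread itself, this sketch does not need the `wakeup()` call that the slide code uses when registration may happen from another thread.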
Per-State Handlers
A simple use of the GoF State-Object pattern: rebind the appropriate handler as the attachment

    class Handler { // ...
        public void run() { // initial state is reader
            socket.read(input);
            if (inputIsComplete()) {
                process();
                sk.attach(new Sender());
                sk.interestOps(SelectionKey.OP_WRITE);
                sk.selector().wakeup();
            }
        }

        class Sender implements Runnable {
            public void run() { // ...
                socket.write(output);
                if (outputIsComplete())
                    sk.cancel();
            }
        }
    }
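The effect of rebinding the attachment can be shown without any real IO. In this sketch the dispatch step is simulated by calling the attached `Runnable` directly twice; the `Pipe` only provides a real `SelectionKey`, and the class name `StateRebindDemo` is hypothetical.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

class StateRebindDemo {
    // Returns the sequence of states that handled two consecutive dispatches.
    static List<String> demo() throws IOException {
        List<String> log = new ArrayList<>();
        Pipe pipe = Pipe.open();
        Selector selector = Selector.open();
        try {
            pipe.sink().configureBlocking(false);
            SelectionKey sk = pipe.sink().register(selector, 0);

            Runnable sender = () -> log.add("send");
            Runnable reader = () -> {
                log.add("read");
                sk.attach(sender);            // next dispatch goes to the sender state
            };
            sk.attach(reader);                // initial state

            ((Runnable) sk.attachment()).run();   // first dispatch: reader
            ((Runnable) sk.attachment()).run();   // second dispatch: sender
            return log;
        } finally {
            selector.close();
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```

Compared with the `state` field and `if`/`else` in the basic handler, the dispatcher stays unchanged here and each state is a separate small object.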
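Multithreaded Designs
- Strategically add threads for scalability, mainly on multiprocessors
- Worker threads
  - Reactors should trigger handlers quickly; heavy handler processing slows down the Reactor
  - Offload computational work to other threads
- Multiple Reactor threads
  - Reactor threads concentrate on IO
  - Distribute load to other reactors, load-balancing to match CPU and IO rates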
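Worker Threads
- Offload only computational tasks, to speed up the Reactor thread; similar to the Proactor designs in POSA2
- Simpler than reworking the processing into event-driven form; workers should still do pure computation
- Overlapping processing with IO is harder; best to read all input into a buffer on the first read
- Use a thread pool so it can be tuned and controlled; normally needs far fewer threads than clients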
Worker Thread Pools
Handler with Thread Pool

    class Handler implements Runnable {
        // uses the thread pool from util.concurrent
        static PooledExecutor pool = new PooledExecutor(...);
        static final int PROCESSING = 3;
        // ...

        synchronized void read() { // ...
            socket.read(input);
            if (inputIsComplete()) {
                state = PROCESSING;
                pool.execute(new Processer());
            }
        }

        synchronized void processAndHandOff() {
            process();
            state = SENDING; // or rebind attachment
            sk.interestOps(SelectionKey.OP_WRITE);
        }

        class Processer implements Runnable {
            public void run() { processAndHandOff(); }
        }
    }
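The hand-off in the handler above can be tried in isolation. The sketch below replaces `PooledExecutor` with `java.util.concurrent.ExecutorService` (an assumption; the slides use the older util.concurrent library), simulates the completed read with a plain method call, and uses string reversal as a stand-in for `process()`. The names `PooledHandlerSketch`, `onInputComplete`, and `handedOff` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PooledHandlerSketch {
    static final int READING = 0, SENDING = 1, PROCESSING = 3;
    static final ExecutorService pool = Executors.newFixedThreadPool(2); // assumed size

    private int state = READING;
    private String input, output;
    final CountDownLatch handedOff = new CountDownLatch(1);

    // Called by the (simulated) reactor thread once a full request has been read.
    synchronized void onInputComplete(String request) {
        input = request;
        state = PROCESSING;
        pool.execute(this::processAndHandOff);   // the reactor thread returns immediately
    }

    // Runs on a worker thread; the lock keeps it from overlapping with the reactor's calls.
    synchronized void processAndHandOff() {
        output = new StringBuilder(input).reverse().toString(); // stand-in for real work
        state = SENDING;   // a real handler would now set OP_WRITE interest on its key
        handedOff.countDown();
    }

    synchronized int state() { return state; }
    synchronized String output() { return output; }
}
```

The worker blocks on the handler's monitor until `onInputComplete` has returned, so the state change to `PROCESSING` is always visible before processing starts.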
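Coordinating Tasks
- Handoffs: each task enables, triggers, or calls the next one; usually fastest but can be brittle
- Callbacks to a per-handler dispatcher: set state, attachment, etc.; a variant of the GoF Mediator pattern
- Queues: for example, passing buffers between stages through a queue
- Futures: when each task produces a result; the caller coordinates through join or wait/notify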
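Two of the coordination styles above, queues and futures, can be combined in a short sketch. It assumes the `java.util.concurrent` classes `BlockingQueue` and `Future` (later additions, not named in the slides); the class name `CoordinationDemo` and the upper-casing step are illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CoordinationDemo {
    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(4);
        try {
            // Stage 1: hand a filled buffer to the next stage through the queue.
            pool.execute(() ->
                queue.add(ByteBuffer.wrap("ping".getBytes(StandardCharsets.US_ASCII))));

            // Stage 2: take the buffer and produce a result; the Future lets the
            // caller wait for that result without knowing which thread computed it.
            Future<String> reply = pool.submit(() -> {
                ByteBuffer b = queue.take();
                return new String(b.array(), StandardCharsets.US_ASCII)
                        .toUpperCase(Locale.ROOT);
            });
            return reply.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }
}
```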
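Using PooledExecutor
- A tunable worker thread pool
- Main method: execute(Runnable r)
- Controls for:
  - The kind of task queue
  - Maximum number of threads
  - Minimum number of threads
  - "Warm" versus on-demand threads
  - Reclaiming idle threads automatically; new ones are created later when needed
  - Saturation policy: block, drop, producer-runs, etc.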
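The same kinds of controls are available on `java.util.concurrent.ThreadPoolExecutor`, which is used here as a stand-in for `PooledExecutor` (an assumption; the constructor arguments below belong to `ThreadPoolExecutor`, not to the util.concurrent class). The sketch forces saturation with a pool of one thread and a queue of one slot, so the "producer-runs" policy (`CallerRunsPolicy`) can be observed. The class name `TunablePoolDemo` is hypothetical.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TunablePoolDemo {
    static List<String> demo() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1,                              // minimum (core) and maximum threads
                30, TimeUnit.SECONDS,              // keep-alive for idle non-core threads
                new ArrayBlockingQueue<>(1),       // kind of task queue: bounded, one slot
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturation: producer runs
        // pool.prestartAllCoreThreads() would give "warm" threads; default is on demand.

        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        String caller = Thread.currentThread().getName();

        // task1 occupies the only worker until released.
        pool.execute(() -> { awaitQuietly(release); ranOn.add("task1:" + where(caller)); });
        // task2 fills the single queue slot.
        pool.execute(() -> ranOn.add("task2:" + where(caller)));
        // task3 finds the pool saturated, so the submitting thread runs it itself.
        pool.execute(() -> ranOn.add("task3:" + where(caller)));

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn;
    }

    static String where(String caller) {
        return Thread.currentThread().getName().equals(caller) ? "caller" : "pool";
    }

    static void awaitQuietly(CountDownLatch latch) {
        try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
```

In a reactor, a producer-runs policy means the reactor thread itself does the processing when workers are saturated, which slows dispatching but applies natural back-pressure.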
Multiple Reactor Threads
- Using Reactor pools
  - Use to match CPU and IO rates
  - Static or dynamic construction; each reactor has its own Selector, thread, and dispatch loop
  - Main acceptor distributes to other reactors

    Selector[] selectors;
    int next = 0;

    class Acceptor { // ...
        public synchronized void run() {
            // ...
            SocketChannel connection = serverSocket.accept();
            if (connection != null)
                new Handler(selectors[next], connection);
            if (++next == selectors.length)
                next = 0;
        }
    }
Using Multiple Reactors
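The round-robin choice of a sub-reactor's Selector can be isolated into a small helper. This is a sketch with a hypothetical class name, `SelectorRoundRobin`; it only covers picking the Selector and leaves out the per-reactor threads and dispatch loops.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class SelectorRoundRobin {
    final Selector[] selectors;
    int next = 0;

    SelectorRoundRobin(int n) throws IOException {
        selectors = new Selector[n];
        for (int i = 0; i < n; i++)
            selectors[i] = Selector.open();   // one Selector per sub-reactor
    }

    // Pick the Selector for the next accepted connection, cycling through the pool.
    synchronized Selector nextSelector() {
        Selector s = selectors[next];
        if (++next == selectors.length)
            next = 0;
        return s;
    }

    void close() throws IOException {
        for (Selector s : selectors)
            s.close();
    }
}
```

With three selectors, successive calls return the first, second, third, and then the first again, so new connections are spread evenly regardless of how busy each reactor is.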
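Using Other java.nio Features
- Multiple Selectors per Reactor: bind different handlers to different IO events; may need careful synchronization to coordinate
- File transfer: automated file-to-net or net-to-file copying
- Memory-mapped files: access files via buffers
- Direct buffers: can sometimes achieve zero-copy transfer, but have setup and finalization overhead; best for applications with long-lived connections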
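Three of these features can be exercised with temporary files. The sketch below copies a file with `FileChannel.transferTo` (file-to-file here, as a stand-in for file-to-net), reads the copy back through a memory mapping, and allocates a direct buffer. It uses the released JDK API, where the mapping mode is `FileChannel.MapMode.READ_ONLY`; the class name `OtherNioFeaturesDemo` and the file contents are illustrative.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class OtherNioFeaturesDemo {
    static String demo() throws IOException {
        Path src = Files.createTempFile("nio-src", ".txt");
        Path dst = Files.createTempFile("nio-dst", ".txt");
        try {
            Files.write(src, "zero-copy?".getBytes(StandardCharsets.US_ASCII));

            // File transfer: let the channel move the bytes, looping in case of partial transfers.
            try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
                 FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
                long pos = 0, size = in.size();
                while (pos < size)
                    pos += in.transferTo(pos, size - pos, out);
            }

            // Memory-mapped file: the file contents appear as a buffer.
            try (FileChannel ch = FileChannel.open(dst, StandardOpenOption.READ)) {
                MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
                return StandardCharsets.US_ASCII.decode(mapped).toString();
            }
        } finally {
            Files.deleteIfExists(src);
            Files.deleteIfExists(dst);
        }
    }

    // Direct buffers live outside the Java heap and are allocated with allocateDirect.
    static boolean directBufferIsDirect() {
        return ByteBuffer.allocateDirect(64).isDirect();
    }
}
```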
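Connection-Based Extensions
- Instead of a single service request:
  - Client connects
  - Client sends a series of messages/requests
  - Client disconnects
- Examples
  - Databases and transaction monitors
  - Multi-participant games, chat rooms, etc.
- Can extend the basic network service patterns above
  - Handle many relatively long-lived clients
  - Track client and session state (including drops)
  - Distribute services across multiple hosts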
API Walkthrough
- Buffer
- ByteBuffer (CharBuffer, LongBuffer, etc.)
- Channel
- SelectableChannel
- SocketChannel
- ServerSocketChannel
- FileChannel
- Selector
- SelectionKey
Buffer
    abstract class Buffer {
        int     capacity();
        int     position();
        Buffer  position(int newPosition);
        int     limit();
        Buffer  limit(int newLimit);
        Buffer  mark();
        Buffer  reset();
        Buffer  clear();
        Buffer  flip();
        Buffer  rewind();
        int     remaining();
        boolean hasRemaining();
        boolean isReadOnly();
    }
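The position, limit, and capacity fields are easiest to follow by watching them change. The sketch below records them after each step of a typical fill, flip, drain, clear cycle; the class name `BufferStateDemo` and the helper `describe` are hypothetical.

```java
import java.nio.ByteBuffer;

class BufferStateDemo {
    static String describe(ByteBuffer b) {
        return "pos=" + b.position() + " lim=" + b.limit() + " cap=" + b.capacity();
    }

    static String[] demo() {
        ByteBuffer b = ByteBuffer.allocate(8);
        String fresh = describe(b);                 // pos=0 lim=8 cap=8

        b.put((byte) 1).put((byte) 2).put((byte) 3);
        String filled = describe(b);                // pos=3 lim=8 cap=8

        b.flip();                                   // limit = old position, position = 0
        String flipped = describe(b);               // pos=0 lim=3 cap=8

        b.get();                                    // relative get advances the position
        String afterGet = describe(b) + " rem=" + b.remaining();  // pos=1 lim=3 cap=8 rem=2

        b.clear();                                  // ready to be filled again; data is not erased
        String cleared = describe(b);               // pos=0 lim=8 cap=8

        return new String[] { fresh, filled, flipped, afterGet, cleared };
    }
}
```

Forgetting `flip()` between writing into a buffer and reading from it is the most common mistake: the read then starts at the write position and sees none of the data.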
ByteBuffer
    abstract class ByteBuffer extends Buffer {
        static ByteBuffer allocateDirect(int capacity);
        static ByteBuffer allocate(int capacity);
        static ByteBuffer wrap(byte[] src, int offset, int len);
        static ByteBuffer wrap(byte[] src);

        boolean    isDirect();
        ByteOrder  order();
        ByteBuffer order(ByteOrder bo);
        ByteBuffer slice();
        ByteBuffer duplicate();
        ByteBuffer compact();
        ByteBuffer asReadOnlyBuffer();

        byte       get();
        byte       get(int index);
        ByteBuffer get(byte[] dst, int offset, int length);
        ByteBuffer get(byte[] dst);
        ByteBuffer put(byte b);
        ByteBuffer put(int index, byte b);
        ByteBuffer put(byte[] src, int offset, int length);
        ByteBuffer put(ByteBuffer src);
        ByteBuffer put(byte[] src);

        char       getChar();
        char       getChar(int index);
        ByteBuffer putChar(char value);
        ByteBuffer putChar(int index, char value);
        CharBuffer asCharBuffer();

        short       getShort();
        short       getShort(int index);
        ByteBuffer  putShort(short value);
        ByteBuffer  putShort(int index, short value);
        ShortBuffer asShortBuffer();

        int        getInt();
        int        getInt(int index);
        ByteBuffer putInt(int value);
        ByteBuffer putInt(int index, int value);
        IntBuffer  asIntBuffer();

        long       getLong();
        long       getLong(int index);
        ByteBuffer putLong(long value);
        ByteBuffer putLong(int index, long value);
        LongBuffer asLongBuffer();

        float       getFloat();
        float       getFloat(int index);
        ByteBuffer  putFloat(float value);
        ByteBuffer  putFloat(int index, float value);
        FloatBuffer asFloatBuffer();

        double       getDouble();
        double       getDouble(int index);
        ByteBuffer   putDouble(double value);
        ByteBuffer   putDouble(int index, double value);
        DoubleBuffer asDoubleBuffer();
    }
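A short sketch of the typed accessors, byte order, and `compact()` from the listing above; the values and the class name `ByteBufferDemo` are arbitrary choices for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class ByteBufferDemo {
    // Returns {firstByteBigEndian, intValue, positionAfterCompact, firstByteLittleEndian}.
    static int[] demo() {
        ByteBuffer b = ByteBuffer.allocate(16);          // big-endian by default
        b.putInt(0x01020304).putShort((short) 7);        // 6 bytes written
        b.flip();

        int firstByte = b.get(0);                        // absolute get, position unchanged
        int value = b.getInt();                          // relative get, position moves to 4

        // compact() moves the 2 unread bytes to the front so more data can be appended.
        b.compact();
        int posAfterCompact = b.position();

        ByteBuffer le = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
        le.putInt(0x01020304);
        int firstByteLittleEndian = le.get(0);           // least significant byte comes first

        return new int[] { firstByte, value, posAfterCompact, firstByteLittleEndian };
    }
}
```

`compact()` is what a handler typically calls after a partial write or a partially decoded request, so that the leftover bytes stay at the start of the buffer for the next round.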
Channel
    interface Channel {
        boolean isOpen();
        void close() throws IOException;
    }

    interface ReadableByteChannel extends Channel {
        int read(ByteBuffer dst) throws IOException;
    }

    interface WritableByteChannel extends Channel {
        int write(ByteBuffer src) throws IOException;
    }

    interface ScatteringByteChannel extends ReadableByteChannel {
        int read(ByteBuffer[] dsts, int offset, int length) throws IOException;
        int read(ByteBuffer[] dsts) throws IOException;
    }

    interface GatheringByteChannel extends WritableByteChannel {
        int write(ByteBuffer[] srcs, int offset, int length) throws IOException;
        int write(ByteBuffer[] srcs) throws IOException;
    }
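Scattering reads and gathering writes are handy for protocols with a fixed header followed by a body. The sketch below uses a `FileChannel` on a temporary file (it implements both interfaces) so that no network is needed; the 4-byte header, the payload, and the class name `ScatterGatherDemo` are illustrative assumptions.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ScatterGatherDemo {
    static String demo() throws IOException {
        Path tmp = Files.createTempFile("scatter-gather", ".bin");
        try (FileChannel ch = FileChannel.open(tmp,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Gathering write: header and body go out in order from separate buffers.
            ByteBuffer header = ByteBuffer.wrap("HDR:".getBytes(StandardCharsets.US_ASCII));
            ByteBuffer body = ByteBuffer.wrap("payload".getBytes(StandardCharsets.US_ASCII));
            ByteBuffer[] srcs = { header, body };
            while (header.hasRemaining() || body.hasRemaining())
                ch.write(srcs);

            // Scattering read: the first 4 bytes land in h, the rest in p.
            ch.position(0);
            ByteBuffer h = ByteBuffer.allocate(4);
            ByteBuffer p = ByteBuffer.allocate(7);
            ByteBuffer[] dsts = { h, p };
            while (p.hasRemaining() && ch.read(dsts) >= 0) { }

            h.flip();
            p.flip();
            return StandardCharsets.US_ASCII.decode(h) + "|" + StandardCharsets.US_ASCII.decode(p);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```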
SelectableChannel
    abstract class SelectableChannel implements Channel {
        int          validOps();
        boolean      isRegistered();
        SelectionKey keyFor(Selector sel);
        SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
        void         configureBlocking(boolean block) throws IOException;
        boolean      isBlocking();
        Object       blockingLock();
    }
SocketChannel
    abstract class SocketChannel implements ByteChannel ... {
        static SocketChannel open() throws IOException;
        Socket  socket();
        int     validOps();
        boolean isConnected();
        boolean isConnectionPending();
        boolean isInputOpen();
        boolean isOutputOpen();
        boolean connect(SocketAddress remote) throws IOException;
        boolean finishConnect() throws IOException;
        void    shutdownInput() throws IOException;
        void    shutdownOutput() throws IOException;
        int     read(ByteBuffer dst) throws IOException;
        int     read(ByteBuffer[] dsts, int offset, int length) throws IOException;
        int     read(ByteBuffer[] dsts) throws IOException;
        int     write(ByteBuffer src) throws IOException;
        int     write(ByteBuffer[] srcs, int offset, int length) throws IOException;
        int     write(ByteBuffer[] srcs) throws IOException;
    }
ServerSocketChannel
    abstract class ServerSocketChannel extends ... {
        static ServerSocketChannel open() throws IOException;
        int           validOps();
        ServerSocket  socket();
        SocketChannel accept() throws IOException;
    }
FileChannel
    abstract class FileChannel implements ... {
        int  read(ByteBuffer dst);
        int  read(ByteBuffer dst, long position);
        int  read(ByteBuffer[] dsts, int offset, int length);
        int  read(ByteBuffer[] dsts);
        int  write(ByteBuffer src);
        int  write(ByteBuffer src, long position);
        int  write(ByteBuffer[] srcs, int offset, int length);
        int  write(ByteBuffer[] srcs);
        long position();
        void position(long newPosition);
        long size();
        void truncate(long size);
        void force(boolean flushMetaDataToo);
        int  transferTo(long position, int count, WritableByteChannel dst);
        int  transferFrom(ReadableByteChannel src, long position, int count);
        FileLock lock(long position, long size, boolean shared);
        FileLock lock();
        FileLock tryLock(long pos, long size, boolean shared);
        FileLock tryLock();
        static final int MAP_RO, MAP_RW, MAP_COW;
        MappedByteBuffer map(int mode, long position, int size);
    }

NOTE: all methods throw IOException
Selector
    abstract class Selector {
        static Selector open() throws IOException;
        Set  keys();
        Set  selectedKeys();
        int  selectNow() throws IOException;
        int  select(long timeout) throws IOException;
        int  select() throws IOException;
        void wakeup();
        void close() throws IOException;
    }
SelectionKey
    abstract class SelectionKey {
        static final int OP_READ, OP_WRITE, OP_CONNECT, OP_ACCEPT;
        SelectableChannel channel();
        Selector selector();
        boolean  isValid();
        void     cancel();
        int      interestOps();
        void     interestOps(int ops);
        int      readyOps();
        boolean  isReadable();
        boolean  isWritable();
        boolean  isConnectable();
        boolean  isAcceptable();
        Object   attach(Object ob);
        Object   attachment();
    }
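The interest set and the ready set of a key are plain bit masks. The sketch below toggles write interest on the sink end of a `Pipe` (used instead of a socket so the example needs no network) and polls with `selectNow()`; the class name `SelectionKeyDemo` is hypothetical, and the example assumes that an empty pipe reports itself as writable.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectionKeyDemo {
    // Returns {idleAtFirst, selectedAfterInterest, keyIsWritable, quietAfterClearing}.
    static boolean[] demo() throws IOException {
        Pipe pipe = Pipe.open();
        Selector selector = Selector.open();
        try {
            Pipe.SinkChannel sink = pipe.sink();
            sink.configureBlocking(false);

            // Registered, but interested in nothing: the selector never reports this key.
            SelectionKey key = sink.register(selector, 0);
            boolean idle = selector.selectNow() == 0;

            // Add write interest with a bitwise OR, keeping any other bits intact.
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            boolean selected = selector.selectNow() == 1;
            boolean writable = key.isWritable()
                    && (key.readyOps() & SelectionKey.OP_WRITE) != 0;
            selector.selectedKeys().clear();

            // Remove write interest again once there is nothing left to send.
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            boolean quiet = selector.selectNow() == 0;

            return new boolean[] { idle, selected, writable, quiet };
        } finally {
            selector.close();
            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```

Leaving `OP_WRITE` interest set while there is nothing to write makes every `select()` return immediately, so handlers normally enable it only while output is pending.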