博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java高并发异步Socket编程
阅读量:6997 次
发布时间:2019-06-27

本文共 12262 字,大约阅读时间需要 40 分钟。

  hot3.png

Java可扩展IO

Doug Lee

大纲

  • 可扩展的网络服务
  • 事件驱动
  • Reactor 模式
    • 基础版
    • 多线程版
    • 其他变体
  • java.io包中分阻塞IO API一览

网络应用服务器

  • Web服务器,分布式对象系统等等
  • 它们的共同特点
    • Read请求
    • 解码请求报文
    • 业务处理
    • 编码响应报文
    • 发送响应
  • 实际应用中每一个步骤的不一样
    • XML解析
    • 文件传输
    • 动态生成网页
    • 计算型服务

经典的服务设计

输入图片说明

每个线程运行一个handler

经典的ServerSocket监听循环

class Server implements Runnable {  public void run() {	try {      ServerSocket ss = new ServerSocket(PORT);      while (!Thread.interrupted())        new Thread(new Handler(ss.accept())).start();        // 这里使用单线程或者线程池    } catch (IOException ex) { /* ... */ }  }    static class Handler implements Runnable {    final Socket socket;    Handler(Socket s) { socket = s; }    public void run() {		try {	        byte[] input = new byte[MAX_INPUT];	        socket.getInputStream().read(input);	        byte[] output = process(input);	        socket.getOutputStream().write(output);	      } catch (IOException ex) { /* ... */ }    }    private byte[] process(byte[] cmd) { /* ... */ }  }}

Note: 异常处理省略

高可扩展性目标

  • 压力持续增大时服务优雅的降级(客户端增多)
  • 性能随着资源(CPU,内存,磁盘,带宽)的提升持续增加
  • 高可用和性能的目标
    • 低延迟
    • 应对请求尖峰
    • 服务质量可控
  • 分而治之是解决扩展性问题的常用方法

分而治之

  • 把处理流程切分成更小的task,每个task都是非阻塞的
  • 只有当任务准备好才去执行,IO事件模型通常是触发器机制
  • java.nio包包含基本的非阻塞机制
    • 非阻塞读和写
    • 分发任务 把task关联到IO事件模型
  • 更多想法
    • 事件驱动设计

事件驱动设计

  • 通常更高效
    • 更少的资源 不需要为每一个client启动一个线程
    • 减少开销 减少上下文切换以及更少的锁竞争
    • 事件分发较慢 必须把action和时间绑定在一起
  • 编程难度较高
    • 必须把流程分割为一系列非阻塞的任务单元
      • 与GUI的事件模型类似
      • 不能消除所有的阻塞,如:GC、内存页错误等等
    • 必须注意服务的状态的变化

Java AWT图形编程的事件驱动

输入图片说明

事件驱动的IO模型与此类似,但设计上有所不同

Reactor模型

  • Reacttor负责分发事件到对应的handler,类似AWT线程
  • Handlers是非阻塞的任务,类似AWT中的ActionListeners
  • Manage负责把handler绑定到事件上
  • 参见施密特等人所著的《Pattern-Oriented Software Architecture》卷2 以及Richard Stevens关于网络的数以及MattWelsh的SEDA框架等等

Reactor基础版

输入图片说明

单线程版

java.nio提供的支持

  • Channels

    通道是文件、socket之间的连接, 支持非阻塞读取

  • Buffers

    数组对象,可以被Channel直接读写

  • Selectors

    负责筛选那些Channel有IO事件

  • SelectionKeys

    保存IO事件状态和绑定对象

Reactor 1:创建

class Reactor implements Runnable {	final Selector selector;	final ServerSocketChannel serverSocket;	Reactor(int port) throws IOException {		selector = Selector.open();		serverSocket = ServerSocketChannel.open(); 		serverSocket.socket().bind( new InetSocketAddress(port) ); 		serverSocket.configureBlocking(false);    	SelectionKey sk = serverSocket.register( selector, SelectionKey.OP_ACCEPT );    	sk.attach(new Acceptor());	}	/*		也可以用SPI provider,更明确一些: 		SelectorProvider p = SelectorProvider.provider(); 		selector = p.openSelector();		serverSocket = p.openServerSocketChannel(); 	*/

Reactor 2:分发循环

public void run() {  // 通常启动一个新线程    	try {    		while (!Thread.interrupted()) {        		selector.select();				Set selected = selector.selectedKeys(); 				Iterator it = selected.iterator(); 				while (it.hasNext())					dispatch((SelectionKey)(it.next()); 				selected.clear();			}		} catch (IOException ex) { /* ... */ }	}	void dispatch(SelectionKey k) {		Runnable r = (Runnable)(k.attachment()); 		if (r != null) 			r.run(); 	}

Reactor 3:Acceptor

class Acceptor implements Runnable { // 内置类    	public void run() {    		try {        		SocketChannel c = serverSocket.accept();        		if (c != null)          			new Handler(selector, c);      		} catch (IOException ex) { /* ... */ }		} 	}}

输入图片说明

Reactor 4:建立Handler

final class Handler implements Runnable {	final SocketChannel socket;	final SelectionKey sk;	ByteBuffer input = ByteBuffer.allocate(MAXIN); 	ByteBuffer output = ByteBuffer.allocate(MAXOUT); 	static final int READING = 0, SENDING = 1;	int state = READING;	Handler(Selector sel, SocketChannel c) throws IOException {		socket = c; 		c.configureBlocking(false);		// 尝试监听读事件		sk = socket.register(sel, 0); 		sk.attach(this); 		sk.interestOps(SelectionKey.OP_READ); 		sel.wakeup();	}	boolean inputIsComplete() { /* ... */ } 	boolean outputIsComplete() { /* ... */ } 	void process() { /* ... */ }

Reactor 5:处理请求

public void run() {	    try {	    	if (state == READING) read();	    	else if (state == SENDING) send();	    } catch (IOException ex) { /* ... */ }	}		void read() throws IOException {	    socket.read(input);	    if (inputIsComplete()) {			process();			state = SENDING;			// 读完之后,通常监听写事件			sk.interestOps(SelectionKey.OP_WRITE);		} 	}		void send() throws IOException { 		socket.write(output);		if (outputIsComplete()) 			sk.cancel();	} }

Handlers状态流转

GoF的状态模式的简单应用

class Handler { // ...	public void run() { // 初始状态是read		socket.read(input);		if (inputIsComplete()) {			process();			sk.attach(new Sender()); 			sk.interest(SelectionKey.OP_WRITE); 			sk.selector().wakeup();		}	}		class Sender implements Runnable { 		public void run(){ // ...        	socket.write(output);			if (outputIsComplete()) 				sk.cancel(); 		}	}}

多线程版

  • 启动适当的线程数,尤其是多核情况下
  • 工作线程
    • Reactors必须快速触发handlers

      Handler任务重,会拖慢Reactor

    • 把计算型的工作交给其他的线程

  • 多个Readctor线程
    • Reactor线程专注于IO操作

    • 把压力分摊到其他的reactor

      负载均衡要考虑适配CPU和IO速率

Worker线程

  • 只处理计算型任务,加速Reactor线程

    类似于POSA2书上的Proactor模式

  • 比改成事件驱动模式简单

    只处理计算型任务

  • 重复IO比较难处理

    最好在第一次读的时候就全部读出来到缓存中

  • 使用线程池,方便控制

    通常只需少量的线程,比客户端数量少的多

Worker线程池

输入图片说明

带线程池的Handler

class Handler implements Runnable {	// 使用 util.concurrent中的线程池	static PooledExecutor pool = new PooledExecutor(...);	static final int PROCESSING = 3;	// ...	synchronized void read() { // ...		socket.read(input);		if (inputIsComplete()) {  			state = PROCESSING;  			pool.execute(new Processer());		}	}	synchronized void processAndHandOff() {		process();		state = SENDING; // or rebind attachment		sk.interest(SelectionKey.OP_WRITE);	}		class Processer implements Runnable {		public void run() { processAndHandOff(); }	}}

调用任务的方式

  • 链式传递

    每个任务负责调用下一个任务,通常是效率最高的,但容易出问题

  • 由到每个handler的分发器回调

    通过设置状态,绑定对象等方式实现,GoF Mediator模式的变体

  • 队列

    就像上面的例子,通过队列传递buffer中的数据

  • Future

    调用方通过join或者wait/notify方法获取每个task的执行结果

使用PooledExecutor

  • 调度worker线程
  • 主要方法:execute(Runnable r)
  • 有如下控制参数
    • 队列类型

    • 最大线程数

    • 最小线程数

    • "Warm" versus on-demand threads

    • 自动回收空闲线程

      需要的时候再创建新的线程

    • 多种策略应对任务饱和

      阻塞,丢弃,producer-runs,等等

多个Reactor线程

  • 使用Reactor池
    • 适配CPU和IO速率

    • 静态创建或动态创建

      每一个reactor都有自己的Selector,线程,分发循环

    • 主acceptor分发给其他的reactors

Selector[] selectors;int next = 0;class Acceptor { // ...	public synchronized void run() { ...		Socket connection = serverSocket.accept(); 		if (connection != null)			new Handler(selectors[next], connection); 		if (++next == selectors.length) 			next = 0;	} }

多Reactor示例

输入图片说明

其它的java.nio特性

  • 每个Reactor包含多个Selector

    把不同的handler绑定到不同的IO事件时需要特别小心同步问题

  • 文件传输

    自动文件传输:file-to-net或者net-to-file复制

  • 内存映射文件

    通过buffers访问文件

  • 直接访问buffer

    有时可以达到零拷贝的目的

    But have setup and finalization overhead

    非常适合长连接应用

扩展网络连接

  • 同时收到多个请求

    客户端建立连接

    客户端发送一连串的消息/请求

    客户端断开连接

  • 举个例子

    数据库事务监视器

    多人在线游戏、聊天室等等

  • 扩展上文的基础网络模型

    保持许多相对长时间的存活的客户端

    跟踪客户端,保持会话状态(包括丢弃)

    分布式服务,横跨多个主机

API一览

  • Buffer

  • ByteBuffer

    (CharBuffer、LongBuffer等等)

  • Channel

  • SelectableChannel

  • SocketChannel

  • ServerSocketChannel

  • FileChannel

  • Selector

  • SelectionKey

Buffer

abstract class Buffer {  int     capacity();  int     position();  Buffer  position(int newPosition);  int     limit();  Buffer  limit(int newLimit);  Buffer  mark();  Buffer  reset();  Buffer  clear();  Buffer  flip();  Buffer  rewind();  int     remaining();  boolean hasRemaining();  boolean isReadOnly();}

输入图片说明

ByteBuffer

abstract class ByteBuffer extends Buffer {	static ByteBuffer allocateDirect(int capacity);	static ByteBuffer allocate(int capacity);	static ByteBuffer wrap(byte[] src, int offset, int len);	static ByteBuffer wrap(byte[] src);	boolean      isDirect();	ByteOrder    order();	ByteBuffer   order(ByteOrder bo);	ByteBuffer   slice();	ByteBuffer   duplicate();	ByteBuffer   compact();	ByteBuffer   asReadOnlyBuffer();	byte 		get();	byte 		get(int index);	ByteBuffer 	get(byte[] dst, int offset, int length);	ByteBuffer 	get(byte[] dst);	ByteBuffer 	put(byte b);	ByteBuffer 	put(int index, byte b);	ByteBuffer 	put(byte[] src, int offset, int length);	ByteBuffer 	put(ByteBuffer src);	ByteBuffer 	put(byte[] src);	char 		getChar();	char 		getChar(int index);	ByteBuffer 	putChar(char value);	ByteBuffer 	putChar(int index, char value);		CharBuffer  asCharBuffer();	short 			getShort();	short 			getShort(int index);	ByteBuffer 		putShort(short value);	ByteBuffer		putShort(int index, short value);	ShortBuffer  asShortBuffer();	int          getInt();	int          getInt(int index);	ByteBuffer   putInt(int value);	ByteBuffer   putInt(int index, int value);	IntBuffer    asIntBuffer();	long         getLong();	long         getLong(int index);	ByteBuffer   putLong(long value);	ByteBuffer   putLong(int index, long value);	LongBuffer   asLongBuffer();	float 		getFloat();	float 		getFloat(int index);	ByteBuffer 	putFloat(float value);	ByteBuffer 	putFloat(int index, float value);	FloatBuffer  asFloatBuffer();	double 		getDouble();	double 		getDouble(int index);	ByteBuffer 	putDouble(double value);	ByteBuffer 	putDouble(int index, double value);	DoubleBuffer asDoubleBuffer();}

Channel

interface Channel {  boolean isOpen();  void    close() throws IOException;}interface ReadableByteChannel extends Channel {  int     read(ByteBuffer dst) throws IOException;}interface WritableByteChannel  extends Channel {  int     write(ByteBuffer src) throws IOException;}interface ScatteringByteChannel extends ReadableByteChannel {  int     read(ByteBuffer[] dsts, int offset, int length) throws IOException;  int     read(ByteBuffer[] dsts) throws IOException;}interface GatheringByteChannel extends WritableByteChannel {  int     write(ByteBuffer[] srcs, int offset, int length) throws IOException;  int     write(ByteBuffer[] srcs) throws IOException;}

SelectableChannel

abstract class SelectableChannel implements Channel {	int          validOps();	boolean      isRegistered();	SelectionKey keyFor(Selector sel);	SelectionKey register(Selector sel, int ops) throws ClosedChannelException;	void 		configureBlocking(boolean block) throws IOException;	boolean 	isBlocking();	Object 		blockingLock();}

SocketChannel

abstract class SocketChannel implements ByteChannel ... {  static SocketChannel open() throws IOException;  Socket  socket();  int     validOps();  boolean isConnected();  boolean isConnectionPending();  boolean isInputOpen();  boolean isOutputOpen();  boolean connect(SocketAddress remote) throws IOException;  boolean finishConnect() throws IOException;  void    shutdownInput() throws IOException;  void    shutdownOutput() throws IOException;  int     read(ByteBuffer dst) throws IOException;  int     read(ByteBuffer[] dsts, int offset, int length)              throws IOException;  int     read(ByteBuffer[] dsts) throws IOException;  int     write(ByteBuffer src) throws IOException;  int     write(ByteBuffer[] srcs, int offset, int length)              throws IOException;  int     write(ByteBuffer[] srcs) throws IOException;}

ServerSocketChannel

abstract class ServerSocketChannel extends ... {  static ServerSocketChannel open() throws IOException;  int           validOps();  ServerSocket  socket();  SocketChannel accept() throws IOException;}

FileChannel

abstract class FileChannel implements ... {  int  read(ByteBuffer dst);  int  read(ByteBuffer dst, long position);  int  read(ByteBuffer[] dsts, int offset, int length);  int  read(ByteBuffer[] dsts);  int  write(ByteBuffer src);  int  write(ByteBuffer src, long position);  int  write(ByteBuffer[] srcs, int offset, int length);  int  write(ByteBuffer[] srcs);  long position();  void position(long newPosition);  long size();  void truncate(long size);  void force(boolean flushMetaDataToo);  int  transferTo(long position, int count,                  WritableByteChannel dst);  int  transferFrom(ReadableByteChannel src,                    long position, int count);  FileLock lock(long position, long size, boolean shared);  FileLock lock();  FileLock tryLock(long pos, long size, boolean shared);  FileLock tryLock();  static final int MAP_RO, MAP_RW,  MAP_COW;  MappedByteBuffer map(int mode, long position, int size);}NOTE: 所有的方法都抛IOException

Selector

abstract class Selector {  static Selector open() throws IOException;  Set  keys();  Set  selectedKeys();  int  selectNow() throws IOException;  int  select(long timeout) throws IOException;  int  select() throws IOException;  void wakeup();  void close() throws IOException;}

SelectionKey

abstract class SelectionKey {	static final int  OP_READ, OP_WRITE, OP_CONNECT, OP_ACCEPT;	SelectableChannel channel();	Selector 	selector();	boolean 	isValid();	void 	cancel();	int 	interestOps();	void 	interestOps(int ops);	int 	readyOps();	boolean isReadable();	boolean isWritable();	boolean isConnectable();	boolean isAcceptable();	Object 	attach(Object ob);	Object 	attachment();}

转载于:https://my.oschina.net/enyo/blog/910100

你可能感兴趣的文章