Scalable IO in Java (Translation)
Contents
- Scalable network services
- Event-driven processing
- Reactor pattern: basic version, multithreaded versions, other variants
- Walkthrough of java.nio nonblocking IO APIs
Network Services
- Web services, distributed objects, etc.
- Most have the same basic structure:
- Read request
- Decode request
- Process service
- Encode reply
- Send reply
- But they differ in the nature and cost of each step. In practice each step has very different performance characteristics, for example:
- XML parsing, file transfer, web page generation, computational services, …
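The five steps can be sketched as a chain of plain methods. This is a minimal illustration rather than a real protocol. The `RequestPipeline` class, the UTF-8 text request and the upper-casing "service" are all hypothetical. The socket read and send are replaced by in-memory byte arrays.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical example: read -> decode -> process -> encode -> send,
// with the network IO replaced by in-memory buffers.
final class RequestPipeline {
    // Decode: bytes -> request object (here, a trimmed UTF-8 string)
    static String decode(ByteBuffer in) {
        return StandardCharsets.UTF_8.decode(in).toString().trim();
    }

    // Process: the service logic itself (here, upper-casing the request)
    static String process(String request) {
        return request.toUpperCase(Locale.ROOT);
    }

    // Encode: reply object -> bytes, terminated by a newline
    static ByteBuffer encode(String reply) {
        return StandardCharsets.UTF_8.encode(reply + "\n");
    }

    // Runs all five steps for one request
    static byte[] handle(byte[] rawRequest) {
        ByteBuffer in = ByteBuffer.wrap(rawRequest);        // "read request"
        ByteBuffer out = encode(process(decode(in)));       // decode, process, encode
        byte[] reply = new byte[out.remaining()];
        out.get(reply);                                     // "send reply"
        return reply;
    }
}
```

In a real server only the two ends touch the network. The cost of the middle three steps depends entirely on the service.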
Classic Service Designs
- Each handler may be started in its own thread: a typical network service starts a new thread for every connection. The classic ServerSocket loop looks like this:
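import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Test {
    private static final int PORT = 8080;
    private static final int MAX_INPUT = 1024;

    public static void main(String[] args) {
        new Server().run();
    }

    // Classic ServerSocket loop: one new thread per connection
    public static class Server implements Runnable {
        @Override
        public void run() {
            try (ServerSocket ss = new ServerSocket(PORT)) {
                while (!Thread.interrupted()) {
                    // accept() blocks until a client connects
                    new Thread(new Handler(ss.accept())).start();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    // Handles one request/response with blocking IO on its own thread
    public static class Handler implements Runnable {
        final Socket socket;

        Handler(Socket s) {
            socket = s;
        }

        @Override
        public void run() {
            try (Socket s = socket) {
                InputStream in = s.getInputStream();
                OutputStream out = s.getOutputStream();
                byte[] input = new byte[MAX_INPUT];
                int n = in.read(input); // blocks; -1 means the client closed the connection
                if (n > 0) {
                    out.write(process(input, n));
                    out.flush();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        // Placeholder business logic: log the request and echo it back
        private byte[] process(byte[] cmd, int len) {
            System.out.println(new String(cmd, 0, len, StandardCharsets.UTF_8));
            return Arrays.copyOf(cmd, len);
        }
    }
}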
Building Scalable IO Services
- Scalability goals: when building high-performance, scalable IO services, we want:
- Graceful degradation under increasing load (more clients)
- Continuous improvement with increasing resources (CPU, memory, disk, bandwidth)
- Also meet availability and performance goals
- Short latencies
- Meeting peak demand
- Tunable quality of service
Divide and Conquer
- Divide-and-conquer is usually the best approach for achieving any scalability goal
- Divide processing into small tasks
- Each task performs an action without blocking
- Execute each task when it is enabled
- Here, an IO event usually serves as the trigger
- Basic mechanisms supported in java.nio:
- Non-blocking reads and writes
- Dispatch of tasks associated with sensed IO events
- Endless variation possible
- A family of event-driven designs, which gives the architecture of high-performance IO services plenty of room to scale
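The first java.nio mechanism, non-blocking reads, can be observed on an in-process `java.nio.channels.Pipe` standing in for a socket. In non-blocking mode, `read()` returns 0 at once when no bytes are available instead of parking the thread. This is a minimal sketch: `NonBlockingReadDemo` is a hypothetical name, and the short polling loop exists only for the demo. A reactor would wait on a Selector instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

// A Pipe's source channel in non-blocking mode: read() never waits.
final class NonBlockingReadDemo {
    // Returns {bytes read before anything was written, bytes read after a write}
    static int[] run() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);           // reads now return immediately
                ByteBuffer buf = ByteBuffer.allocate(16);

                int before = source.read(buf);             // no data yet: returns 0 without blocking

                sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                int after = 0;
                // Demo-only polling: each read() returns at once whether or not data has arrived
                for (int i = 0; i < 200 && after == 0; i++) {
                    after = source.read(buf);
                    if (after == 0) sleepQuietly(10);
                }
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```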
Event-driven Designs
- Usually more efficient than alternatives
- Fewer resources
- Don't usually need a thread per client
- Less overhead
- Less context switching, often less locking
- But dispatching can be slower
- Must manually bind actions to events
- Usually harder to program
- Must break up into simple non-blocking actions
- Similar to GUI event-driven actions
- Cannot eliminate all blocking: GC, page faults, etc.
- Must keep track of the logical state of the service: because processing is driven by events, a handler has to know which stage it is in when the next event arrives
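Tracking logical state is concrete even at the byte level. A non-blocking read can deliver half a request, so the handler must remember the unfinished part until the next read event. This is a minimal sketch that assumes a hypothetical newline-delimited ASCII protocol. `LineDecoder` is an illustrative name, not a JDK class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for a newline-delimited ASCII protocol. The unfinished
// tail of earlier reads is kept as state, so a request split across several
// non-blocking reads is reassembled before it is processed.
final class LineDecoder {
    private final StringBuilder partial = new StringBuilder(); // state carried between read events

    // Feed the bytes produced by one read(); returns the lines they complete
    List<String> feed(ByteBuffer chunk) {
        // ASCII keeps the sketch simple: a multi-byte UTF-8 character split across
        // two reads would need a CharsetDecoder that keeps leftover bytes
        partial.append(StandardCharsets.US_ASCII.decode(chunk));
        List<String> complete = new ArrayList<>();
        int nl;
        while ((nl = partial.indexOf("\n")) >= 0) {
            complete.add(partial.substring(0, nl));
            partial.delete(0, nl + 1);
        }
        return complete;
    }

    // Bytes received so far that do not yet form a complete line
    String pending() {
        return partial.toString();
    }
}
```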
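Background: Events in AWT
- Event-driven IO uses similar ideas but in different designs. In AWT, events are placed on an event queue, and a single AWT event thread dispatches each one to the listeners registered for it. Event-driven IO architectures rest on the same basic idea.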
Reactor Pattern
- Reactor responds to IO events by dispatching the appropriate handler
- Similar to the AWT event thread
- Handlers perform non-blocking actions
- Similar to AWT ActionListeners
- Manage by binding handlers to events
- Similar to AWT addActionListener
- See Schmidt et al., Pattern-Oriented Software Architecture, Volume 2 (POSA2)
- Also Richard Stevens's networking books, Matt Welsh's SEDA framework, etc.
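Stripped of IO, the pattern comes down to two parts: a table that maps each event type to a handler, and one loop that dispatches each event to the handler bound to it, much as AWT calls registered ActionListeners. The sketch below is hypothetical throughout. `EventReactor`, its string event types and the in-memory queue that stands in for a Selector are for illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, IO-free model of the Reactor pattern
final class EventReactor {
    static final class Event {
        final String type;
        final String payload;

        Event(String type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    private final Map<String, Consumer<Event>> handlers = new HashMap<>();
    private final Queue<Event> pending = new ArrayDeque<>(); // stands in for the Selector

    // "Manage by binding handlers to events", like addActionListener
    void bind(String type, Consumer<Event> handler) {
        handlers.put(type, handler);
    }

    void post(Event e) {
        pending.add(e);
    }

    // One pass of the dispatch loop; each handler must return quickly
    void dispatchAll() {
        Event e;
        while ((e = pending.poll()) != null) {
            Consumer<Event> h = handlers.get(e.type);
            if (h != null) h.accept(e); // events with no bound handler are ignored
        }
    }

    static List<String> demo() {
        EventReactor reactor = new EventReactor();
        List<String> log = new ArrayList<>();
        reactor.bind("accept", e -> log.add("accepted " + e.payload));
        reactor.bind("read", e -> log.add("read " + e.payload));
        reactor.post(new Event("accept", "client-1"));
        reactor.post(new Event("read", "hello"));
        reactor.post(new Event("write", "no handler"));
        reactor.dispatchAll();
        return log;
    }
}
```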
Basic Reactor Design (single-threaded)
- java.nio support: the key concepts are
- Channels: connections to files, sockets, etc. that support non-blocking reads
- Buffers: array-like objects that can be directly read or written by Channels
- Selectors: tell which of a set of Channels have IO events
- SelectionKeys: maintain IO event status and bindings
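The four concepts fit together as follows. This sketch uses an in-process `Pipe` as a stand-in for a socket. It registers a non-blocking channel with a selector, attaches a buffer to the resulting key, and reads once the selector reports the channel ready. `NioConceptsDemo` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Channels, Buffers, Selectors and SelectionKeys working together
final class NioConceptsDemo {
    static String run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);                          // Channel: non-blocking to register
                ByteBuffer buffer = ByteBuffer.allocate(64);              // Buffer: what the channel reads into
                source.register(selector, SelectionKey.OP_READ, buffer);  // SelectionKey: interest + attachment

                sink.write(StandardCharsets.UTF_8.encode("ping"));        // makes the source readable

                selector.select(2000);                                    // Selector: wait up to 2 s
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();                                          // selected keys are not removed automatically
                    if (key.isReadable()) {
                        ByteBuffer buf = (ByteBuffer) key.attachment();
                        ((Pipe.SourceChannel) key.channel()).read(buf);
                        buf.flip();
                        return StandardCharsets.UTF_8.decode(buf).toString();
                    }
                }
                return "";
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```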
Next, we walk step by step through server-side code based on the Reactor pattern:
- Reactor 1: Setup (initializing the Reactor thread)
- Reactor 2: Dispatch loop
- Reactor 3: Acceptor
- Reactor 4: Handler setup
- Reactor 5: Request handling
- Per-state handlers: an optimized Handler based on the GoF State-Object pattern, which no longer needs to check the state explicitly
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {
    public static void main(String[] args) {
        try {
            new Reactor(8080).run();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    final Selector selector;
    final ServerSocketChannel serverSocket;

    // Reactor 1: setup
    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        // Register interest in accept events
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        // Attach the Acceptor as the callback for this key
        sk.attach(new Acceptor());
    }

    // Reactor 2: dispatch loop
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    dispatch(it.next()); // dispatch the event
                }
                // The selector never clears this set itself
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        // Run whatever callback object is attached to the key
        Runnable r = (Runnable) k.attachment();
        if (r != null)
            r.run();
    }

    // Reactor 3: Acceptor, handles new connections
    class Acceptor implements Runnable { // inner class
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }
}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(1024);
    ByteBuffer output = ByteBuffer.allocate(1024);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    // Reactor 4: Handler setup
    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this); // bind this Handler to the SelectionKey
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    // Placeholder protocol (assumption): a request is complete as soon as any bytes arrive
    boolean inputIsComplete() {
        return input.position() > 0;
    }

    boolean outputIsComplete() {
        return !output.hasRemaining();
    }

    // Placeholder business logic (assumption): echo the request back
    void process() {
        input.flip();
        output.clear();
        output.put(input);
        output.flip();
        input.clear();
    }

    // Reactor 5: request handling
    @Override
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) {
            close();
        }
    }

    void read() throws IOException {
        if (socket.read(input) < 0) { // -1: the client closed the connection
            close();
            return;
        }
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) close();
    }

    // Cancel the key and release the connection
    void close() {
        sk.cancel();
        try {
            socket.close();
        } catch (IOException ignored) {
        }
    }

    /*
     * Per-state handlers: an alternative based on the GoF State-Object pattern.
     * Instead of checking `state`, the reader swaps the key's attachment to a
     * Sender once the request is complete. To use it, replace run() above with:
     *
    @Override
    public void run() { // initial state is reader
        try {
            socket.read(input);
        } catch (IOException e) {
            close();
            return;
        }
        if (inputIsComplete()) {
            process();
            sk.attach(new Sender());
            sk.interestOps(SelectionKey.OP_WRITE);
            sk.selector().wakeup();
        }
    }

    class Sender implements Runnable {
        public void run() { // ...
            try {
                socket.write(output);
            } catch (IOException e) {
                close();
                return;
            }
            if (outputIsComplete()) close();
        }
    }
    */
}
Multithreaded Designs
- Strategically add threads for scalability
- Mainly applicable to multiprocessors
- Worker threads: add dedicated threads for non-IO work
- Reactors should quickly trigger handlers
- Handler processing slows down the Reactor: if processing (the process() method) blocks, it stalls the reactor thread
- Offload non-IO processing to other (worker) threads
- Multiple Reactor threads: split the Reactor into several threads
- Reactor threads can saturate doing IO
- Distribute load to other reactors (multiple Reactor threads also act as load balancing)
- Load-balance to match CPU and IO rates: choose the number of threads according to whether the workload is CPU-bound or IO-bound
Worker Threads
- Offload non-IO processing to speed up the Reactor thread
- Similar to POSA2 Proactor designs
- Simpler than reworking compute-bound processing into event-driven form
- Should still be pure nonblocking computation
- Enough processing to outweigh overhead
- But harder to overlap processing with IO
- Best when you can first read all input into a buffer (translator's note: that is, read the whole request into the buffer at once, so the asynchronous non-IO step works on complete data)
- Use a thread pool so you can tune and control it
- Normally need many fewer threads than clients
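The worker-thread division of labor can be sketched without sockets. The reactor side buffers bytes until a request is complete. It then hands a private copy of the request to a pool, so the worker never touches the channel or the reactor's buffer. `WorkerHandoff`, the newline framing and the pool size are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: IO and buffering stay on the reactor thread, pure
// computation runs on a worker pool. Requests end with '\n' (assumption).
final class WorkerHandoff {
    private final ExecutorService workers;
    private final ByteBuffer input = ByteBuffer.allocate(1024); // owned by the reactor thread

    WorkerHandoff(ExecutorService workers) {
        this.workers = workers;
    }

    // Called on the reactor thread with the bytes from one read().
    // Returns null until a complete request has been buffered.
    Future<String> onRead(byte[] chunk) {
        input.put(chunk);
        if (input.position() == 0 || input.get(input.position() - 1) != '\n') {
            return null;                        // incomplete: wait for the next read event
        }
        input.flip();
        byte[] request = new byte[input.remaining()];
        input.get(request);                     // private copy for the worker
        input.clear();                          // the reactor can reuse its buffer immediately
        return workers.submit(() -> compute(request));
    }

    // Pure, non-blocking computation: no channel access here
    private static String compute(byte[] request) {
        return new String(request, StandardCharsets.UTF_8).trim().toUpperCase(Locale.ROOT);
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            WorkerHandoff h = new WorkerHandoff(pool);
            if (h.onRead("hel".getBytes(StandardCharsets.UTF_8)) != null) {
                throw new IllegalStateException("request should not be complete yet");
            }
            Future<String> result = h.onRead("lo\n".getBytes(StandardCharsets.UTF_8));
            return result.get();                // demo only; a reactor would not block here
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real handler, the result would be delivered back to the reactor (for example by changing the key's interest set) rather than collected with the blocking `Future.get()`.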
Handler with Thread Pool
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Handler implements Runnable {
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final long KEEP_ALIVE_TIME = 1L;

    // Shared java.util.concurrent worker pool. With CallerRunsPolicy, when all
    // threads are busy and the queue is full, the reactor thread runs the task itself.
    static final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.CallerRunsPolicy());

    static final int READING = 0, SENDING = 1, PROCESSING = 2;

    final SocketChannel socket;
    final SelectionKey sk;
    final ByteBuffer input = ByteBuffer.allocate(1024);
    final ByteBuffer output = ByteBuffer.allocate(1024);
    // volatile: written by worker threads, read by the reactor thread
    volatile int state = READING;

    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this); // bind this Handler to the SelectionKey
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    // Placeholder protocol and logic (assumption): echo whatever was read
    boolean inputIsComplete() {
        return input.position() > 0;
    }

    boolean outputIsComplete() {
        return !output.hasRemaining();
    }

    void process() {
        input.flip();
        output.clear();
        output.put(input);
        output.flip();
        input.clear();
    }

    @Override
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
            // PROCESSING: a worker owns the buffers, so there is nothing to do
        } catch (IOException ex) {
            close();
        }
    }

    synchronized void read() throws IOException {
        if (socket.read(input) < 0) { // -1: the client closed the connection
            close();
            return;
        }
        if (inputIsComplete()) {
            state = PROCESSING;
            sk.interestOps(0); // no read events while a worker processes the request
            pool.execute(new Processor());
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) close();
    }

    void close() {
        sk.cancel();
        try {
            socket.close();
        } catch (IOException ignored) {
        }
    }

    // Runs on a worker thread (or on the reactor thread under CallerRunsPolicy)
    synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        sk.interestOps(SelectionKey.OP_WRITE);
        // The reactor may be blocked in select(); wake it so the new interest set is used
        sk.selector().wakeup();
    }

    class Processor implements Runnable {
        public void run() {
            processAndHandOff();
        }
    }
}
Coordinating Tasks
When non-IO processing runs in a thread pool, the tasks have to be coordinated:
- Handoffs
- Each task enables, triggers, or calls the next one. Usually fastest, but can be brittle (hard to coordinate and control)
- Callbacks to a per-handler dispatcher
- Sets state, attachment, etc. A variant of the GoF Mediator pattern
- Queues
- For example, passing buffers across stages; buffers handed between threads must be passed safely
- Futures
- When each task produces a result. Coordination is layered on top of join or wait/notify
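Current Java code often combines the Futures and Queues styles. The worker's result arrives as a `CompletableFuture` (a JDK 8+ class, not part of the original design). Its completion callback does not touch the channel. It enqueues a task, and the reactor thread runs that task on its next loop iteration. This is a sketch under those assumptions: `ReactorTaskQueue` and its methods are hypothetical, and a real reactor would call `selector.wakeup()` after enqueueing.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical coordination sketch: worker results come back to the reactor
// thread through a thread-safe task queue instead of being applied by workers.
final class ReactorTaskQueue {
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();

    // Safe to call from any thread; a real reactor would then call selector.wakeup()
    void execute(Runnable task) {
        tasks.add(task);
    }

    // Called by the reactor thread once per dispatch-loop iteration
    int runPendingTasks() {
        int n = 0;
        Runnable t;
        while ((t = tasks.poll()) != null) {
            t.run();
            n++;
        }
        return n;
    }

    static String demo() {
        ReactorTaskQueue reactor = new ReactorTaskQueue();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        StringBuilder reply = new StringBuilder();            // touched only by the "reactor" thread
        try {
            CompletableFuture<Void> done = CompletableFuture
                    .supplyAsync(() -> "result:" + (6 * 7), workers)          // Future: a worker computes
                    .thenAccept(r -> reactor.execute(() -> reply.append(r))); // callback hands the result off
            done.join();                                       // demo only: wait until the handoff happened
            reactor.runPendingTasks();                         // the reactor thread applies the result
            return reply.toString();
        } finally {
            workers.shutdown();
        }
    }
}
```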
Using PooledExecutor: the thread pool from Doug Lea's util.concurrent library (the predecessor of java.util.concurrent) helps with the problems above
- A tunable worker thread pool
- Main method: execute(Runnable r)
- Controls for:
- The kind of task queue (any util.concurrent Channel, i.e. queue)
- Maximum number of threads
- Minimum number of threads
- "Warm" versus on-demand threads
- Keep-alive interval until idle threads die
- to be later replaced by new ones if necessary
- Saturation policy: what happens when tasks arrive faster than the pool can accept them
- block, drop, producer-runs, etc.
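PooledExecutor's controls map onto `java.util.concurrent.ThreadPoolExecutor`, its successor in the JDK. The sketch below sets each one. The sizes are arbitrary assumptions, and `CallerRunsPolicy` is the JDK's producer-runs policy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Maps each PooledExecutor control onto ThreadPoolExecutor (values are illustrative)
final class PoolConfig {
    static ThreadPoolExecutor newWorkerPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                          // "minimum" threads: the core pool size
                8,                                          // maximum number of threads
                30, TimeUnit.SECONDS,                       // keep-alive before idle extra threads die
                new ArrayBlockingQueue<>(64),               // the kind of task queue (bounded here)
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturation: the producer runs the task
        // Other built-in saturation policies: AbortPolicy (the default, throws),
        // DiscardPolicy (drop), DiscardOldestPolicy (drop the oldest queued task)
        pool.prestartAllCoreThreads();                      // "warm" threads instead of on-demand creation
        pool.allowCoreThreadTimeOut(false);                 // core threads stay alive when idle (the default)
        return pool;
    }
}
```

There are two differences from the list above. First, `ThreadPoolExecutor` creates threads beyond the core size only when the queue is full. Second, it has no built-in "block the producer" policy, so `CallerRunsPolicy` is the usual way to push back on the submitting thread.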
Multiple Reactor Threads
- Using Reactor pools
- Use to match CPU and IO rates
- Static or dynamic construction
- Each with own Selector, Thread, dispatch loop
- Main acceptor distributes to other reactors. Netty is designed around this model: one mainReactor thread accepts connections, and several subReactor threads handle their IO events. A simple code example follows:
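Selector[] selectors; // one Selector per subReactor thread
int next = 0;         // index of the subReactor that receives the next connection

// Runs on the mainReactor thread
class Acceptor implements Runnable { // ...
    public synchronized void run() {
        try {
            SocketChannel connection = serverSocket.accept();
            if (connection != null) {
                // On older JDKs (before 11), register() inside Handler blocks while the
                // target subReactor is in select(); queueing the channel for that
                // subReactor's own thread avoids this
                new Handler(selectors[next], connection);
                if (++next == selectors.length)
                    next = 0;
            }
        } catch (IOException ex) { /* ... */ }
    }
}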
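This design needs care at one point: handing a connection from the acceptor thread to another reactor's Selector. One common approach is to put the channel on a per-subReactor queue and wake that subReactor, so that registration always happens on the thread that owns the Selector. The sketch below follows that approach under stated assumptions. `SubReactor` is a hypothetical class, and Pipes stand in for accepted sockets so the demo needs no network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical subReactor: one Selector, one thread, and a queue of channels to register
final class SubReactor implements Runnable {
    private final Selector selector;
    private final Queue<SelectableChannel> pending = new ConcurrentLinkedQueue<>();
    final CountDownLatch received = new CountDownLatch(1); // demo only: released on first data

    SubReactor() throws IOException {
        selector = Selector.open();
    }

    // Called on the mainReactor thread: queue the channel, then wake the subReactor
    void assign(SelectableChannel ch) {
        pending.add(ch);
        selector.wakeup(); // a wakeup before select() makes the next select() return at once
    }

    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(256);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select(); // interrupt() also makes this return
                SelectableChannel ch;
                while ((ch = pending.poll()) != null) { // register on the owning thread
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    buf.clear();
                    int n = ((ReadableByteChannel) key.channel()).read(buf);
                    if (n < 0) key.cancel();               // peer closed
                    else if (n > 0) received.countDown();
                }
            }
        } catch (ClosedByInterruptException e) {
            // interrupted during IO: shut down
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try { selector.close(); } catch (IOException ignored) { }
        }
    }

    // Demo: hand two "connections" round-robin to two subReactors; returns how many saw data
    static int demo() {
        try {
            SubReactor[] subs = {new SubReactor(), new SubReactor()};
            Thread[] threads = new Thread[subs.length];
            for (int i = 0; i < subs.length; i++) {
                threads[i] = new Thread(subs[i], "subReactor-" + i);
                threads[i].start();
            }
            Pipe[] conns = {Pipe.open(), Pipe.open()};
            int next = 0;
            for (Pipe p : conns) {
                subs[next].assign(p.source());
                if (++next == subs.length) next = 0;  // round-robin, as in the Acceptor above
            }
            for (Pipe p : conns) p.sink().write(ByteBuffer.wrap(new byte[] {42}));
            int served = 0;
            for (SubReactor s : subs) if (s.received.await(5, TimeUnit.SECONDS)) served++;
            for (Thread t : threads) { t.interrupt(); t.join(); }
            for (Pipe p : conns) { p.sink().close(); p.source().close(); }
            return served;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```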
Using other java.nio features: when designing the service, also consider these java.nio features
- Multiple Selectors per Reactor
- To bind different handlers to different IO events; may need careful synchronization to coordinate
- File transfer
- Automated file-to-net or net-to-file copying
- Memory-mapped files
- Access files via buffers
- Direct buffers
- Can sometimes achieve zero-copy transfer, but have setup and finalization overhead (so they are usually pooled and released explicitly). Best for applications with long-lived connections
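File-to-net copying is exposed as `FileChannel.transferTo`. This is a minimal sketch. It assumes a temporary file as the source and a `ByteArrayOutputStream` wrapped by `Channels.newChannel` as a stand-in for the socket. With a real SocketChannel as the target, the operating system may be able to move the bytes straight from the filesystem cache. `FileTransferDemo` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// File-to-"net" copy with FileChannel.transferTo
final class FileTransferDemo {
    static String run() {
        try {
            Path file = Files.createTempFile("transfer", ".txt");
            Files.write(file, "hello, transferTo".getBytes(StandardCharsets.UTF_8));
            ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stand-in for a socket
            try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ);
                 WritableByteChannel out = Channels.newChannel(sink)) {
                long pos = 0, size = in.size();
                while (pos < size) {               // transferTo may move fewer bytes than requested
                    pos += in.transferTo(pos, size - pos, out);
                }
            } finally {
                Files.delete(file);
            }
            return sink.toString(StandardCharsets.UTF_8.name());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```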
Connection-Based Extensions
- Instead of a single service request:
- Client connects
- Client sends a series of messages/requests
- Client disconnects
- Examples
- Databases and transaction monitors
- Multi-participant games, chat, etc.
- Can extend basic network service patterns
- Handle many relatively long-lived clients
- Track client and session state (including drops)
- Distribute services across multiple hosts
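Per-connection session state usually lives in an object attached to the connection's SelectionKey. That object is updated on every event, including the end-of-stream that signals a drop. This is a minimal sketch: `SessionDemo`, its `Session` fields and counting one read as one message are assumptions, and a Pipe stands in for the client connection.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class SessionDemo {
    // Hypothetical per-connection session state, kept as the key's attachment
    static final class Session {
        int messages;
        boolean dropped;
    }

    // Runs selection rounds until the "client" disconnects; returns its session
    static Session run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ, new Session());

            pipe.sink().write(ByteBuffer.wrap(new byte[] {1})); // client sends one message
            pipe.sink().close();                                 // then disconnects

            ByteBuffer buf = ByteBuffer.allocate(64);
            Session s = (Session) key.attachment();
            while (!s.dropped && selector.select(2000) > 0) {
                selector.selectedKeys().clear();
                buf.clear();
                int n = pipe.source().read(buf);
                if (n > 0) s.messages++;                         // one read counted as one message (assumption)
                if (n < 0) {                                     // end-of-stream: the client dropped
                    s.dropped = true;
                    key.cancel();
                    pipe.source().close();
                }
            }
            return s;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```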
API Walkthrough
- Buffer
- ByteBuffer
- (CharBuffer, LongBuffer, etc not shown.)
- Channel
- SelectableChannel
- SocketChannel
- ServerSocketChannel
- FileChannel
- Selector
- SelectionKey
Buffer
abstract class Buffer {
    int capacity();
    int position();
    Buffer position(int newPosition);
    int limit();
    Buffer limit(int newLimit);
    Buffer mark();
    Buffer reset();
    Buffer clear();
    Buffer flip();
    Buffer rewind();
    int remaining();
    boolean hasRemaining();
    boolean isReadOnly();
}
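The position/limit bookkeeping is easiest to see by tracing one buffer through the usual fill, flip, drain and compact cycle. `BufferStateDemo` is a hypothetical helper. Each recorded pair is `[position,limit]`.

```java
import java.nio.ByteBuffer;

// Traces position and limit through fill -> flip -> drain -> compact -> clear
final class BufferStateDemo {
    static String run() {
        StringBuilder log = new StringBuilder();
        ByteBuffer buf = ByteBuffer.allocate(8);           // position=0, limit=capacity=8
        buf.put((byte) 1).put((byte) 2).put((byte) 3);     // like a channel read: position=3
        log.append(state(buf));
        buf.flip();                                        // limit=3, position=0: ready to drain
        log.append(state(buf));
        buf.get();                                         // like a partial channel write: 1 byte consumed
        log.append(state(buf));
        buf.compact();                                     // 2 unread bytes moved to the front, position=2
        log.append(state(buf));
        buf.clear();                                       // position=0, limit=8 (contents not erased)
        log.append(state(buf));
        return log.toString();
    }

    private static String state(ByteBuffer b) {
        return "[" + b.position() + "," + b.limit() + "]";
    }
}
```

`BufferStateDemo.run()` returns `[3,8][0,3][1,3][2,8][0,8]`.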
ByteBuffer
abstract class ByteBuffer extends Buffer {
    static ByteBuffer allocateDirect(int capacity);
    static ByteBuffer allocate(int capacity);
    static ByteBuffer wrap(byte[] src, int offset, int len);
    static ByteBuffer wrap(byte[] src);
    boolean isDirect();
    ByteOrder order();
    ByteBuffer order(ByteOrder bo);
    ByteBuffer slice();
    ByteBuffer duplicate();
    ByteBuffer compact();
    ByteBuffer asReadOnlyBuffer();
    byte get();
    byte get(int index);
    ByteBuffer get(byte[] dst, int offset, int length);
    ByteBuffer get(byte[] dst);
    ByteBuffer put(byte b);
    ByteBuffer put(int index, byte b);
    ByteBuffer put(byte[] src, int offset, int length);
    ByteBuffer put(ByteBuffer src);
    ByteBuffer put(byte[] src);
    char getChar();
    char getChar(int index);
    ByteBuffer putChar(char value);
    ByteBuffer putChar(int index, char value);
    CharBuffer asCharBuffer();
    short getShort();
    short getShort(int index);
    ByteBuffer putShort(short value);
    ByteBuffer putShort(int index, short value);
    ShortBuffer asShortBuffer();
    int getInt();
    int getInt(int index);
    ByteBuffer putInt(int value);
    ByteBuffer putInt(int index, int value);
    IntBuffer asIntBuffer();
    long getLong();
    long getLong(int index);
    ByteBuffer putLong(long value);
    ByteBuffer putLong(int index, long value);
    LongBuffer asLongBuffer();
    float getFloat();
    float getFloat(int index);
    ByteBuffer putFloat(float value);
    ByteBuffer putFloat(int index, float value);
    FloatBuffer asFloatBuffer();
    double getDouble();
    double getDouble(int index);
    ByteBuffer putDouble(double value);
    ByteBuffer putDouble(int index, double value);
    DoubleBuffer asDoubleBuffer();
}
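Three ByteBuffer properties regularly surprise people. New buffers are big-endian. `order()` changes how the same bytes are interpreted. `slice()` shares content with its parent. The small sketch below shows all three; `ByteBufferDemo` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class ByteBufferDemo {
    // Writes an int with the default byte order and returns the raw bytes
    static byte[] bigEndianBytes(int value) {
        ByteBuffer buf = ByteBuffer.allocate(4);   // new buffers are BIG_ENDIAN
        buf.putInt(value);
        return buf.array();
    }

    // Interprets the same bytes with the opposite byte order
    static int readLittleEndian(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    // slice() shares content with the original but has its own position/limit
    static byte sliceSharesContent() {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {10, 20, 30, 40});
        buf.position(2);
        ByteBuffer tail = buf.slice();             // covers the bytes 30, 40
        tail.put(0, (byte) 99);                    // writes through to the shared array
        return buf.get(2);
    }
}
```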
Channel
interface Channel {
    boolean isOpen();
    void close() throws IOException;
}
interface ReadableByteChannel extends Channel {
    int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel {
    int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel {
    long read(ByteBuffer[] dsts, int offset, int length)
        throws IOException;
    long read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel {
    long write(ByteBuffer[] srcs, int offset, int length)
        throws IOException;
    long write(ByteBuffer[] srcs) throws IOException;
}
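Scattering and gathering are handy for header-plus-body messages. The sketch below assumes a hypothetical framing: a 4-byte length header followed by the body. It uses a Pipe because `Pipe.SinkChannel` implements GatheringByteChannel and `Pipe.SourceChannel` implements ScatteringByteChannel. `ScatterGatherDemo` is an illustrative name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

// Gathering write of header + body, then a scattering read into two buffers
final class ScatterGatherDemo {
    static String run() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                byte[] bodyBytes = "payload".getBytes(StandardCharsets.UTF_8);
                ByteBuffer header = ByteBuffer.allocate(4).putInt(bodyBytes.length);
                header.flip();
                ByteBuffer body = ByteBuffer.wrap(bodyBytes);
                ByteBuffer[] out = {header, body};
                while (body.hasRemaining()) sink.write(out);   // buffers are drained in array order

                ByteBuffer inHeader = ByteBuffer.allocate(4);
                ByteBuffer inBody = ByteBuffer.allocate(bodyBytes.length);
                ByteBuffer[] in = {inHeader, inBody};
                while (inBody.hasRemaining()) source.read(in); // fills inHeader first, then inBody

                inHeader.flip();
                inBody.flip();
                return inHeader.getInt() + ":" + StandardCharsets.UTF_8.decode(inBody);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```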
SelectableChannel
abstract class SelectableChannel implements Channel {
    int validOps();
    boolean isRegistered();
    SelectionKey keyFor(Selector sel);
    SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException;
    SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException;
    SelectableChannel configureBlocking(boolean block)
        throws IOException;
    boolean isBlocking();
    Object blockingLock();
}
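The registration rules in this class can be checked directly. A channel must be in non-blocking mode before `register`. `configureBlocking` returns the channel, so calls chain. `keyFor` returns the single key for a channel/selector pair. The sketch below runs on a Pipe source; `RegistrationRules` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// SelectableChannel registration rules, checked on a Pipe's source channel
final class RegistrationRules {
    static boolean[] run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
                boolean rejectedWhileBlocking = false;
                try {
                    source.register(selector, SelectionKey.OP_READ); // channels start in blocking mode
                } catch (IllegalBlockingModeException e) {
                    rejectedWhileBlocking = true;
                }
                // configureBlocking returns the channel, so the calls chain
                SelectionKey key = source.configureBlocking(false)
                        .register(selector, SelectionKey.OP_READ, "attachment");
                boolean sameKey = source.keyFor(selector) == key;       // one key per channel/selector pair
                boolean readOnly = source.validOps() == SelectionKey.OP_READ;
                return new boolean[] {rejectedWhileBlocking, sameKey, readOnly, source.isRegistered()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```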
SocketChannel
abstract class SocketChannel extends AbstractSelectableChannel
        implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel {
    static SocketChannel open() throws IOException;
    static SocketChannel open(SocketAddress remote) throws IOException;
    Socket socket();
    int validOps();
    boolean isConnected();
    boolean isConnectionPending();
    boolean connect(SocketAddress remote) throws IOException;
    boolean finishConnect() throws IOException;
    SocketChannel shutdownInput() throws IOException;   // since Java 7
    SocketChannel shutdownOutput() throws IOException;  // since Java 7
    int read(ByteBuffer dst) throws IOException;
    long read(ByteBuffer[] dsts, int offset, int length)
        throws IOException;
    long read(ByteBuffer[] dsts) throws IOException;
    int write(ByteBuffer src) throws IOException;
    long write(ByteBuffer[] srcs, int offset, int length)
        throws IOException;
    long write(ByteBuffer[] srcs) throws IOException;
}
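A non-blocking `connect` may return false. In that case the connection is completed later with `finishConnect()`, after the selector reports OP_CONNECT. The sketch below connects to a loopback server on an ephemeral port. `NonBlockingConnectDemo` is hypothetical, and the demo assumes loopback networking is available.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Non-blocking client connect completed via OP_CONNECT + finishConnect()
final class NonBlockingConnectDemo {
    static boolean run() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // any free port
            client.configureBlocking(false);
            boolean connected = client.connect(server.getLocalAddress()); // may be false: pending
            if (!connected) {
                client.register(selector, SelectionKey.OP_CONNECT);
                while (!connected) {
                    if (selector.select(2000) == 0) return false;         // give up after a timeout
                    selector.selectedKeys().clear();
                    connected = client.finishConnect();                   // true once the handshake is done
                }
            }
            try (SocketChannel accepted = server.accept()) {              // blocking accept on the server side
                return client.isConnected() && accepted != null;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```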
ServerSocketChannel
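abstract class ServerSocketChannel extends AbstractSelectableChannel
        implements NetworkChannel {
    static ServerSocketChannel open() throws IOException;
    int validOps();
    ServerSocketChannel bind(SocketAddress local) throws IOException; // since Java 7
    ServerSocket socket();
    SocketChannel accept() throws IOException;
}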
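In non-blocking mode, `accept()` returns null when no connection is pending, which is why the Acceptor in the Reactor code checks for null. The sketch below runs on the loopback interface (assumed to be available); `NonBlockingAcceptDemo` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingAcceptDemo {
    // Returns {accept() gave null with nothing pending, a pending client was then accepted}
    static boolean[] run() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // any free port
            server.configureBlocking(false);
            boolean nullWhenIdle = server.accept() == null;  // nobody connecting: returns null at once

            server.register(selector, SelectionKey.OP_ACCEPT);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) { // blocking connect
                selector.select(2000);                       // wait until the connection is pending
                try (SocketChannel accepted = server.accept()) {
                    return new boolean[] {nullWhenIdle, accepted != null && client.isConnected()};
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```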
FileChannel
abstract class FileChannel extends AbstractInterruptibleChannel
        implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel {
    // All methods below throw IOException
    int read(ByteBuffer dst);
    int read(ByteBuffer dst, long position);
    long read(ByteBuffer[] dsts, int offset, int length);
    long read(ByteBuffer[] dsts);
    int write(ByteBuffer src);
    int write(ByteBuffer src, long position);
    long write(ByteBuffer[] srcs, int offset, int length);
    long write(ByteBuffer[] srcs);
    long position();
    FileChannel position(long newPosition);
    long size();
    FileChannel truncate(long size);
    void force(boolean metaData);
    long transferTo(long position, long count,
                    WritableByteChannel target);
    long transferFrom(ReadableByteChannel src,
                      long position, long count);
    FileLock lock(long position, long size, boolean shared);
    FileLock lock();
    FileLock tryLock(long position, long size, boolean shared);
    FileLock tryLock();
    // Map modes: FileChannel.MapMode.READ_ONLY, READ_WRITE, PRIVATE (copy-on-write)
    MappedByteBuffer map(FileChannel.MapMode mode, long position, long size);
}
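This sketch shows positional reads and writes and memory mapping on a temporary file. Positional calls leave the channel's own position alone, and `map` gives a buffer view of the file. `FileChannelDemo` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Positional write/read and a read-only mapping on a temporary file
final class FileChannelDemo {
    static String run() {
        try {
            Path file = Files.createTempFile("fc-demo", ".bin");
            file.toFile().deleteOnExit();      // a mapped file cannot always be deleted right away
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                ch.write(StandardCharsets.US_ASCII.encode("hello world"), 0); // positional write
                long position = ch.position();                              // unchanged: still 0
                ByteBuffer word = ByteBuffer.allocate(5);
                ch.read(word, 6);                                           // positional read of "world"
                word.flip();
                MappedByteBuffer map = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
                char first = (char) map.get(0);                             // 'h', read through the mapping
                return position + ":" + StandardCharsets.US_ASCII.decode(word) + ":" + first;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```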
Selector
abstract class Selector implements Closeable {
    static Selector open() throws IOException;
    Set<SelectionKey> keys();
    Set<SelectionKey> selectedKeys();
    int selectNow() throws IOException;
    int select(long timeout) throws IOException;
    int select() throws IOException;
    Selector wakeup();
    void close() throws IOException;
}
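Two Selector behaviors matter for the reactor loop. First, `selectNow()` never blocks. Second, a `wakeup()` issued when no selection is in progress makes the next `select()` return immediately, so the wakeup calls in the handlers are not lost. The sketch below uses a selector with nothing registered; `SelectorWakeupDemo` is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

final class SelectorWakeupDemo {
    // Returns {selectNow result, select() after wakeup, select(50) result, 1 if select() returned promptly}
    static int[] run() {
        try (Selector selector = Selector.open()) {
            int now = selector.selectNow();          // never blocks; nothing registered, so 0

            selector.wakeup();                       // no selection in progress yet...
            long start = System.nanoTime();
            int woken = selector.select();           // ...so this returns immediately with 0
            long waitedMs = (System.nanoTime() - start) / 1_000_000;

            int timedOut = selector.select(50);      // blocks for at most about 50 ms, then returns 0
            return new int[] {now, woken, timedOut, waitedMs < 1000 ? 1 : 0};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```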
SelectionKey
abstract class SelectionKey {
    static final int OP_READ, OP_WRITE,
                     OP_CONNECT, OP_ACCEPT;
    SelectableChannel channel();
    Selector selector();
    boolean isValid();
    void cancel();
    int interestOps();
    SelectionKey interestOps(int ops);
    int readyOps();
    boolean isReadable();
    boolean isWritable();
    boolean isConnectable();
    boolean isAcceptable();
    Object attach(Object ob);
    Object attachment();
}
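Interest sets are bit masks, so adding or removing an operation is ordinary bitwise arithmetic on `interestOps()`. The sketch below uses an unconnected non-blocking SocketChannel, which can be registered without any network traffic. `InterestOpsDemo` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Interest-set manipulation and key cancellation
final class InterestOpsDemo {
    static boolean run() {
        try (Selector selector = Selector.open(); SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, SelectionKey.OP_READ);

            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);  // add OP_WRITE
            boolean both = key.interestOps() == (SelectionKey.OP_READ | SelectionKey.OP_WRITE);

            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);  // remove OP_READ
            boolean writeOnly = key.interestOps() == SelectionKey.OP_WRITE;

            boolean validOps = ch.validOps()
                    == (SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);

            key.cancel();
            boolean cancelled = !key.isValid();
            try {
                key.interestOps();                                       // using a cancelled key fails
                cancelled = false;
            } catch (CancelledKeyException expected) {
                // expected: the key is no longer valid
            }
            return both && writeOnly && validOps && cancelled;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```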