Based on version: CDH 5.4.2
This version is fairly old, but it is what we currently run in production, so it is used as the example here.
Overview:
A request sent by the client API is picked up by the RpcServer's Listener thread.
The Listener thread assigns a Reader to the new channel to handle its subsequent requests.
The Reader thread wraps each request into a CallRunner instance, which the RpcScheduler then dispatches to a particular Executor based on the request's attributes.
The Executor places the CallRunner instance into its queue.
Each Executor queue is bound to a fixed number of Handler threads that consume it; consumption is simple: take a CallRunner instance off the queue and invoke its run() method.
The run() method assembles the response and hands it to the Responder thread.
The Responder thread continuously writes the results for the different channels back to their clients.
Overall, the server-side RPC handling is a producer-consumer model.
The RpcServer is initialized when the master or regionserver starts. The key code is as follows:
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  checkCodecs(this.conf);
  ......
  rpcServices.start();
  ......
}
/** Starts the service. Must be called before any calls will be handled. */
@Override
public synchronized void start() {
  if (started) return;
  ......
  responder.start();
  listener.start();
  scheduler.start();
  started = true;
}
The Listener listens on the server's port via NIO; client API connections to that port are accepted here.
How the Listener accepts a connection:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
  Connection c;
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    try {
      ......
      // When an API connection comes in, a channel is opened and the Listener
      // assigns a Reader to register it.
      // The number of Reader instances is limited; they are handed out in
      // round-robin fashion and reused, so one Reader may serve many channels.
      Reader reader = getReader();
      try {
        reader.startAdd();
        SelectionKey readKey = reader.registerChannel(channel);
        // The Connection is also kept so that the result can later be
        // written back on this channel.
        c = getConnection(channel, System.currentTimeMillis());
        readKey.attach(c);
        synchronized (connectionList) {
          connectionList.add(numConnections, c);
          numConnections++;
          ......
        }
      }
As noted above, the number of Readers is limited and they are reused in round-robin order. The count is set with the following parameter and defaults to 10:
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
If the Readers cannot keep up with the incoming requests, consider increasing this value.
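To make the round-robin reuse concrete, the following is a minimal sketch of a fixed pool of Readers handed out to newly accepted channels; the class and method bodies are illustrative, not the actual RpcServer.Listener internals:

// A fixed pool of Readers assigned round-robin to new channels, so one
// Reader ends up serving many channels.
class ReaderPool {
  private final Reader[] readers;
  private int currentReader = 0;

  ReaderPool(int readThreads) {
    readers = new Reader[readThreads];
    for (int i = 0; i < readThreads; i++) {
      readers[i] = new Reader();
    }
  }

  // Each accepted channel gets the next Reader in line.
  synchronized Reader getReader() {
    Reader r = readers[currentReader];
    currentReader = (currentReader + 1) % readers.length;
    return r;
  }

  static class Reader { /* reads requests from its assigned channels */ }
}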
Reader: reading and wrapping requests
Once a Reader has been assigned a channel, it reads the requests arriving on that channel and wraps them into CallRunner instances for scheduling.
void doRead(SelectionKey key) throws InterruptedException {
  ......
  try {
    // Delegate to the connection's read-and-process method.
    count = c.readAndProcess();
    ......
  }
}
public int readAndProcess() throws IOException, InterruptedException {
  ......
  // The connectionPreambleRead flag tells us whether this is a new connection.
  // For a new connection the preamble must be read first to determine the
  // connection's properties, e.g. which security mode is in use.
  if (!connectionPreambleRead) {
    count = readPreamble();
    if (!connectionPreambleRead) {
      return count;
    }
  }
  ......
  count = channelRead(channel, data);
  if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
    // Actually process the request; inside, the two modes identified from the
    // preamble are handled differently.
    process();
  }
  return count;
}
private void process() throws IOException, InterruptedException {
  ......
  if (useSasl) {
    // Kerberos (SASL) security mode
    saslReadAndProcess(data.array());
  } else {
    // AuthMethod.SIMPLE mode
    processOneRpc(data.array());
  }
  ......
}
The analysis below uses AuthMethod.SIMPLE mode as the example:
private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
  if (connectionHeaderRead) {
    // Process the actual request.
    processRequest(buf);
  } else {
    // The connection header has not been read yet; read it to determine the
    // requested service, method, and so on.
    processConnectionHeader(buf);
    this.connectionHeaderRead = true;
    if (!authorizeConnection()) {
      throw new AccessDeniedException("Connection from " + this + " for service " +
          connectionHeader.getServiceName() + " is unauthorized for user: " + user);
    }
  }
}
protected void processRequest(byte[] buf) throws IOException, InterruptedException {
  long totalRequestSize = buf.length;
  ......
  // Check whether the requests currently held by the RpcServer, plus this one,
  // exceed maxQueueSize. Note that callQueueSize is an RpcServer-level variable.
  if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
    final Call callTooBig =
        new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null);
    ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
    setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
        "Call queue is full on " + getListenerAddress() +
        ", is hbase.ipc.server.max.callqueue.size too small?");
    responder.doRespond(callTooBig);
    return;
  }
  ......
  Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
      totalRequestSize, traceInfo);
  // The receiving side is done at this point: wrap the request into a CallRunner
  // and dispatch it to the queue of one of the Executors.
  scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
}
Note that maxQueueSize is an RpcServer-level variable with a default of 1 GB; when this threshold is exceeded, clients see the "Call queue is full" error.
callQueueSize is increased when a request is accepted and decreased by the same amount when processing completes (after the CallRunner's run() method returns).
this.maxQueueSize = this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
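As a minimal sketch of this accounting (the names and field types are simplified, not the exact RpcServer internals), the admission check on receipt and the decrement after run() could look like this:

import java.util.concurrent.atomic.AtomicLong;

// callQueueSize grows when a request is admitted and shrinks once its
// CallRunner has finished; admission is refused beyond maxQueueSize.
public class CallQueueAccounting {
  static final long MAX_QUEUE_SIZE = 1024L * 1024 * 1024; // default 1 GB

  private final AtomicLong callQueueSize = new AtomicLong();

  boolean admit(long totalRequestSize) {
    if (totalRequestSize + callQueueSize.get() > MAX_QUEUE_SIZE) {
      return false; // caller answers with CallQueueTooBigException
    }
    callQueueSize.addAndGet(totalRequestSize);
    return true;
  }

  void finished(long totalRequestSize) {
    callQueueSize.addAndGet(-totalRequestSize);
  }
}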
After a client request has been received and wrapped into a CallRunner, it is dispatched by the concrete Scheduler. The master and the regionserver use different schedulers; the discussion here uses the regionserver's scheduler, SimpleRpcScheduler.
public RSRpcServices(HRegionServer rs) throws IOException {
  ......
  RpcSchedulerFactory rpcSchedulerFactory;
  try {
    Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
        REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
        SimpleRpcSchedulerFactory.class);
    rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
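Because the factory class is looked up from configuration (REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS), a different RpcScheduler can be plugged in without modifying the regionserver. A minimal sketch, assuming the constant is public as in the Apache HBase 1.0 sources; in practice you would point it at your own RpcSchedulerFactory implementation rather than the default shown here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;

public class SchedulerFactoryConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // SimpleRpcSchedulerFactory is the default; substitute your own factory class here.
    conf.setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
        SimpleRpcSchedulerFactory.class, RpcSchedulerFactory.class);
  }
}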
Request dispatching
As mentioned earlier, once a request has been wrapped into a CallRunner it is forwarded by the dispatch method of the concrete RpcScheduler implementation.
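The method itself is not quoted in full here; the following is a simplified sketch reconstructed from the Apache HBase 1.0 sources (exact field names and constants may differ slightly in CDH 5.4.2). The call's priority decides which of the three executors, introduced in the next section, receives the CallRunner:

@Override
public void dispatch(CallRunner callTask) throws InterruptedException {
  RpcServer.Call call = callTask.getCall();
  int level = priority.getPriority(call.getHeader(), call.param);
  if (priorityExecutor != null && level > highPriorityLevel) {
    priorityExecutor.dispatch(callTask);      // high-priority requests
  } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
    replicationExecutor.dispatch(callTask);   // replication traffic
  } else {
    callExecutor.dispatch(callTask);          // ordinary user requests
  }
}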
Executors: queue initialization
This scheduler uses three levels of executors:
a high-priority request executor,
a general request executor,
and a replication request executor.
private final RpcExecutor callExecutor;
private final RpcExecutor priorityExecutor;
private final RpcExecutor replicationExecutor;
Among these, callExecutor is the main executor for ordinary requests. In this version it can be initialized with separate read and write queues in configurable proportions, and the handlers are split in matching proportions and bound to those queues, i.e. a queue is consumed only by the handlers bound to it. In the default configuration, with no read/write separation, there is a single queue: all requests go into it and all handlers consume it.
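For reference, the split described above is driven by configuration. The sketch below shows the relevant keys as they appear in the HBase 1.0 line (they back the CALL_QUEUE_* constants used in the code that follows); the values are assumed examples, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CallQueueSplitConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Fraction of the handler count used as the number of call queues;
    // a value > 0 yields more than one queue.
    conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 0.1f);
    // Share of queues/handlers dedicated to reads; > 0 enables the RW split.
    conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.5f);
    // Share of the read queues/handlers further dedicated to scans.
    conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.1f);
  }
}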
The code analysis below uses the read/write-separated queue configuration as the example:
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numCallQueues = Math.max(1, (int) Math.round(handlerCount * callQueuesHandlersFactor));
LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
if (numCallQueues > 1 && callqReadShare > 0) {
  // multiple read/write queues
  if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
    CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
    // Instantiate the read/write executor. The constructor arguments carry the
    // read and write shares, where the read share is further divided between
    // normal reads and scans. Overloaded constructors are then called, which
    // finally compute the number of queues and the handler share of each group.
    callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
        callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
        BoundedPriorityBlockingQueue.class, callPriority);
  } else {
    ......
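Before walking through that constructor, a small worked example may help. The input numbers below are hypothetical; the snippet simply repeats the arithmetic that the constructor shown next performs to carve scan queues and handlers out of the read share:

public class RWSplitExample {
  public static void main(String[] args) {
    // Hypothetical inputs: 10 write handlers, 20 read handlers,
    // 2 write queues, 4 read queues, scanShare = 0.25.
    int writeHandlers = 10, readHandlers = 20;
    int numWriteQueues = 2, numReadQueues = 4;
    float scanShare = 0.25f;

    // Same arithmetic as the constructor below: carve the scan queues and
    // scan handlers out of the read share.
    int numScanQueues = Math.max(0, (int) Math.floor(numReadQueues * scanShare)); // 1
    int scanHandlers  = Math.max(0, (int) Math.floor(readHandlers * scanShare));  // 5
    if ((numReadQueues - numScanQueues) > 0) {
      numReadQueues -= numScanQueues;  // 3 plain read queues remain
      readHandlers  -= scanHandlers;   // 15 plain read handlers remain
    } else {
      numScanQueues = 0;
      scanHandlers = 0;
    }
    // Prints: write:2q/10h, read:3q/15h, scan:1q/5h
    System.out.println("write:" + numWriteQueues + "q/" + writeHandlers + "h, "
        + "read:" + numReadQueues + "q/" + readHandlers + "h, "
        + "scan:" + numScanQueues + "q/" + scanHandlers + "h");
  }
}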
The overloaded constructor that is finally invoked is:
public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
    int numWriteQueues, int numReadQueues, float scanShare,
    final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
    final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
  super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
  int numScanQueues = Math.max(0, (int) Math.floor(numReadQueues * scanShare));
  int scanHandlers = Math.max(0, (int) Math.floor(readHandlers * scanShare));
  if ((numReadQueues - numScanQueues) > 0) {
    numReadQueues -= numScanQueues;
    readHandlers -= scanHandlers;
  } else {
    numScanQueues = 0;
    scanHandlers = 0;
  }
  // Fix the main queue and handler counts.
  this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
  this.readHandlersCount = Math.max(readHandlers, numReadQueues);
  this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
  this.numWriteQueues = numWriteQueues;
  this.numReadQueues = numReadQueues;
  this.numScanQueues = numScanQueues;
  this.writeBalancer = getBalancer(numWriteQueues);
  this.readBalancer = getBalancer(numReadQueues);
  this.scanBalancer = getBalancer(numScanQueues);
  queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
  LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
      " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
      ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
      " scanHandlers=" + scanHandlersCount));
  // Initialize the list of queues. Note that queues is an ordered list: once the
  // queue positions are set up here they never change, and later requests are
  // routed to a queue by the corresponding getBalancer() balancer.
  for (int i = 0; i < numWriteQueues; ++i) {
    queues.add((BlockingQueue<CallRunner>)
        ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
  }
for (int i = 0; i