001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.HashMap; 021import java.util.concurrent.ArrayBlockingQueue; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicInteger; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.util.Threads; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 033import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; 034 035/** 036 * A very simple {@code }RpcScheduler} that serves incoming requests in order. This can be used for 037 * HMaster, where no prioritization is needed. 038 */ 039@InterfaceAudience.Private 040public class FifoRpcScheduler extends RpcScheduler { 041 private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); 042 protected final int handlerCount; 043 protected final int maxQueueLength; 044 protected final AtomicInteger queueSize = new AtomicInteger(0); 045 protected ThreadPoolExecutor executor; 046 047 public FifoRpcScheduler(Configuration conf, int handlerCount) { 048 this.handlerCount = handlerCount; 049 this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 050 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 051 } 052 053 @Override 054 public void init(Context context) { 055 // no-op 056 } 057 058 @Override 059 public void start() { 060 LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}", 061 this.getClass().getSimpleName(), handlerCount, maxQueueLength); 062 this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, 063 new ArrayBlockingQueue<>(maxQueueLength), 064 new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").setDaemon(true) 065 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 066 new ThreadPoolExecutor.CallerRunsPolicy()); 067 } 068 069 @Override 070 public void stop() { 071 if (this.executor != null) { 072 this.executor.shutdown(); 073 } 074 } 075 076 private static class FifoCallRunner implements Runnable { 077 private final CallRunner callRunner; 078 079 FifoCallRunner(CallRunner cr) { 080 this.callRunner = cr; 081 } 082 083 CallRunner getCallRunner() { 084 return callRunner; 085 } 086 087 @Override 088 public void run() { 089 callRunner.run(); 090 } 091 092 } 093 094 @Override 095 public boolean dispatch(final CallRunner task) { 096 return executeRpcCall(executor, queueSize, task); 097 } 098 099 protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize, 100 final CallRunner task) { 101 // Executors provide no offer, so make our own. 102 int queued = queueSize.getAndIncrement(); 103 if (maxQueueLength > 0 && queued >= maxQueueLength) { 104 queueSize.decrementAndGet(); 105 return false; 106 } 107 108 executor.execute(new FifoCallRunner(task) { 109 @Override 110 public void run() { 111 task.setStatus(RpcServer.getStatus()); 112 task.run(); 113 queueSize.decrementAndGet(); 114 } 115 }); 116 117 return true; 118 } 119 120 @Override 121 public int getGeneralQueueLength() { 122 return executor.getQueue().size(); 123 } 124 125 @Override 126 public int getPriorityQueueLength() { 127 return 0; 128 } 129 130 @Override 131 public int getReplicationQueueLength() { 132 return 0; 133 } 134 135 @Override 136 public int getBulkLoadQueueLength() { 137 return 0; 138 } 139 140 @Override 141 public int getActiveRpcHandlerCount() { 142 return executor.getActiveCount(); 143 } 144 145 @Override 146 public int getActiveGeneralRpcHandlerCount() { 147 return getActiveRpcHandlerCount(); 148 } 149 150 @Override 151 public int getActivePriorityRpcHandlerCount() { 152 return 0; 153 } 154 155 @Override 156 public int getActiveReplicationRpcHandlerCount() { 157 return 0; 158 } 159 160 @Override 161 public int getActiveBulkLoadRpcHandlerCount() { 162 return 0; 163 } 164 165 @Override 166 public int getActiveMetaPriorityRpcHandlerCount() { 167 return 0; 168 } 169 170 @Override 171 public long getNumGeneralCallsDropped() { 172 return 0; 173 } 174 175 @Override 176 public long getNumLifoModeSwitches() { 177 return 0; 178 } 179 180 @Override 181 public int getWriteQueueLength() { 182 return 0; 183 } 184 185 @Override 186 public int getReadQueueLength() { 187 return 0; 188 } 189 190 @Override 191 public int getScanQueueLength() { 192 return 0; 193 } 194 195 @Override 196 public int getActiveWriteRpcHandlerCount() { 197 return 0; 198 } 199 200 @Override 201 public int getActiveReadRpcHandlerCount() { 202 return 0; 203 } 204 205 @Override 206 public int getActiveScanRpcHandlerCount() { 207 return 0; 208 } 209 210 @Override 211 public int getMetaPriorityQueueLength() { 212 return 0; 213 } 214 215 @Override 216 public CallQueueInfo getCallQueueInfo() { 217 String queueName = "Fifo Queue"; 218 219 HashMap<String, Long> methodCount = new HashMap<>(); 220 HashMap<String, Long> methodSize = new HashMap<>(); 221 222 CallQueueInfo callQueueInfo = new CallQueueInfo(); 223 callQueueInfo.setCallMethodCount(queueName, methodCount); 224 callQueueInfo.setCallMethodSize(queueName, methodSize); 225 226 updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize); 227 228 return callQueueInfo; 229 } 230 231 protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue, 232 HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) { 233 for (Runnable r : queue) { 234 FifoCallRunner mcr = (FifoCallRunner) r; 235 RpcCall rpcCall = mcr.getCallRunner().getRpcCall(); 236 237 String method = getCallMethod(mcr.getCallRunner()); 238 if (StringUtil.isNullOrEmpty(method)) { 239 method = "Unknown"; 240 } 241 242 long size = rpcCall.getSize(); 243 244 methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L)); 245 methodSize.put(method, size + methodSize.getOrDefault(method, 0L)); 246 } 247 } 248 249 protected String getCallMethod(final CallRunner task) { 250 RpcCall call = task.getRpcCall(); 251 if (call != null && call.getMethod() != null) { 252 return call.getMethod().getName(); 253 } 254 return null; 255 } 256}