001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.concurrent.ArrayBlockingQueue; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicInteger; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.util.Threads; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 034import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; 035 036/** 037 * A very simple {@code }RpcScheduler} that serves incoming requests in order. 038 * 039 * This can be used for HMaster, where no prioritization is needed. 040 */ 041@InterfaceAudience.Private 042public class FifoRpcScheduler extends RpcScheduler { 043 private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); 044 protected final int handlerCount; 045 protected final int maxQueueLength; 046 protected final AtomicInteger queueSize = new AtomicInteger(0); 047 protected ThreadPoolExecutor executor; 048 049 public FifoRpcScheduler(Configuration conf, int handlerCount) { 050 this.handlerCount = handlerCount; 051 this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 052 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 053 } 054 055 @Override 056 public void init(Context context) { 057 // no-op 058 } 059 060 @Override 061 public void start() { 062 LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}", 063 this.getClass().getSimpleName(), handlerCount, maxQueueLength); 064 this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, 065 new ArrayBlockingQueue<>(maxQueueLength), 066 new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").setDaemon(true) 067 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 068 new ThreadPoolExecutor.CallerRunsPolicy()); 069 } 070 071 @Override 072 public void stop() { 073 this.executor.shutdown(); 074 } 075 076 private static class FifoCallRunner implements Runnable { 077 private final CallRunner callRunner; 078 079 FifoCallRunner(CallRunner cr) { 080 this.callRunner = cr; 081 } 082 083 CallRunner getCallRunner() { 084 return callRunner; 085 } 086 087 @Override 088 public void run() { 089 callRunner.run(); 090 } 091 092 } 093 094 @Override 095 public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { 096 return executeRpcCall(executor, queueSize, task); 097 } 098 099 protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize, 100 final CallRunner task) { 101 // Executors provide no offer, so make our own. 102 int queued = queueSize.getAndIncrement(); 103 if (maxQueueLength > 0 && queued >= maxQueueLength) { 104 queueSize.decrementAndGet(); 105 return false; 106 } 107 108 executor.execute(new FifoCallRunner(task){ 109 @Override 110 public void run() { 111 task.setStatus(RpcServer.getStatus()); 112 task.run(); 113 queueSize.decrementAndGet(); 114 } 115 }); 116 117 return true; 118 } 119 120 @Override 121 public int getGeneralQueueLength() { 122 return executor.getQueue().size(); 123 } 124 125 @Override 126 public int getPriorityQueueLength() { 127 return 0; 128 } 129 130 @Override 131 public int getReplicationQueueLength() { 132 return 0; 133 } 134 135 @Override 136 public int getActiveRpcHandlerCount() { 137 return executor.getActiveCount(); 138 } 139 140 @Override 141 public int getActiveGeneralRpcHandlerCount() { 142 return getActiveRpcHandlerCount(); 143 } 144 145 @Override 146 public int getActivePriorityRpcHandlerCount() { 147 return 0; 148 } 149 150 @Override 151 public int getActiveReplicationRpcHandlerCount() { 152 return 0; 153 } 154 155 @Override 156 public int getActiveMetaPriorityRpcHandlerCount() { 157 return 0; 158 } 159 160 @Override 161 public long getNumGeneralCallsDropped() { 162 return 0; 163 } 164 165 @Override 166 public long getNumLifoModeSwitches() { 167 return 0; 168 } 169 170 @Override 171 public int getWriteQueueLength() { 172 return 0; 173 } 174 175 @Override 176 public int getReadQueueLength() { 177 return 0; 178 } 179 180 @Override 181 public int getScanQueueLength() { 182 return 0; 183 } 184 185 @Override 186 public int getActiveWriteRpcHandlerCount() { 187 return 0; 188 } 189 190 @Override 191 public int getActiveReadRpcHandlerCount() { 192 return 0; 193 } 194 195 @Override 196 public int getActiveScanRpcHandlerCount() { 197 return 0; 198 } 199 200 @Override 201 public int getMetaPriorityQueueLength() { 202 return 0; 203 } 204 205 @Override 206 public CallQueueInfo getCallQueueInfo() { 207 String queueName = "Fifo Queue"; 208 209 HashMap<String, Long> methodCount = new HashMap<>(); 210 HashMap<String, Long> methodSize = new HashMap<>(); 211 212 CallQueueInfo callQueueInfo = new CallQueueInfo(); 213 callQueueInfo.setCallMethodCount(queueName, methodCount); 214 callQueueInfo.setCallMethodSize(queueName, methodSize); 215 216 updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize); 217 218 return callQueueInfo; 219 } 220 221 protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue, 222 HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) { 223 for (Runnable r : queue) { 224 FifoCallRunner mcr = (FifoCallRunner) r; 225 RpcCall rpcCall = mcr.getCallRunner().getRpcCall(); 226 227 String method = getCallMethod(mcr.getCallRunner()); 228 if (StringUtil.isNullOrEmpty(method)) { 229 method = "Unknown"; 230 } 231 232 long size = rpcCall.getSize(); 233 234 methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L)); 235 methodSize.put(method, size + methodSize.getOrDefault(method, 0L)); 236 } 237 } 238 239 protected String getCallMethod(final CallRunner task) { 240 RpcCall call = task.getRpcCall(); 241 if (call != null && call.getMethod() != null) { 242 return call.getMethod().getName(); 243 } 244 return null; 245 } 246}