001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.HashMap; 021import java.util.concurrent.ArrayBlockingQueue; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicInteger; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.util.Threads; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 033import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; 034 035/** 036 * A very simple {@code }RpcScheduler} that serves incoming requests in order. This can be used for 037 * HMaster, where no prioritization is needed. 038 */ 039@InterfaceAudience.Private 040public class FifoRpcScheduler extends RpcScheduler { 041 private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); 042 protected final int handlerCount; 043 protected final int maxQueueLength; 044 protected final AtomicInteger queueSize = new AtomicInteger(0); 045 protected ThreadPoolExecutor executor; 046 047 public FifoRpcScheduler(Configuration conf, int handlerCount) { 048 this.handlerCount = handlerCount; 049 this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 050 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 051 } 052 053 @Override 054 public void init(Context context) { 055 // no-op 056 } 057 058 @Override 059 public void start() { 060 LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}", 061 this.getClass().getSimpleName(), handlerCount, maxQueueLength); 062 this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, 063 new ArrayBlockingQueue<>(maxQueueLength), 064 new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").setDaemon(true) 065 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 066 new ThreadPoolExecutor.CallerRunsPolicy()); 067 } 068 069 @Override 070 public void stop() { 071 this.executor.shutdown(); 072 } 073 074 private static class FifoCallRunner implements Runnable { 075 private final CallRunner callRunner; 076 077 FifoCallRunner(CallRunner cr) { 078 this.callRunner = cr; 079 } 080 081 CallRunner getCallRunner() { 082 return callRunner; 083 } 084 085 @Override 086 public void run() { 087 callRunner.run(); 088 } 089 090 } 091 092 @Override 093 public boolean dispatch(final CallRunner task) { 094 return executeRpcCall(executor, queueSize, task); 095 } 096 097 protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize, 098 final CallRunner task) { 099 // Executors provide no offer, so make our own. 100 int queued = queueSize.getAndIncrement(); 101 if (maxQueueLength > 0 && queued >= maxQueueLength) { 102 queueSize.decrementAndGet(); 103 return false; 104 } 105 106 executor.execute(new FifoCallRunner(task) { 107 @Override 108 public void run() { 109 task.setStatus(RpcServer.getStatus()); 110 task.run(); 111 queueSize.decrementAndGet(); 112 } 113 }); 114 115 return true; 116 } 117 118 @Override 119 public int getGeneralQueueLength() { 120 return executor.getQueue().size(); 121 } 122 123 @Override 124 public int getPriorityQueueLength() { 125 return 0; 126 } 127 128 @Override 129 public int getReplicationQueueLength() { 130 return 0; 131 } 132 133 @Override 134 public int getActiveRpcHandlerCount() { 135 return executor.getActiveCount(); 136 } 137 138 @Override 139 public int getActiveGeneralRpcHandlerCount() { 140 return getActiveRpcHandlerCount(); 141 } 142 143 @Override 144 public int getActivePriorityRpcHandlerCount() { 145 return 0; 146 } 147 148 @Override 149 public int getActiveReplicationRpcHandlerCount() { 150 return 0; 151 } 152 153 @Override 154 public int getActiveMetaPriorityRpcHandlerCount() { 155 return 0; 156 } 157 158 @Override 159 public long getNumGeneralCallsDropped() { 160 return 0; 161 } 162 163 @Override 164 public long getNumLifoModeSwitches() { 165 return 0; 166 } 167 168 @Override 169 public int getWriteQueueLength() { 170 return 0; 171 } 172 173 @Override 174 public int getReadQueueLength() { 175 return 0; 176 } 177 178 @Override 179 public int getScanQueueLength() { 180 return 0; 181 } 182 183 @Override 184 public int getActiveWriteRpcHandlerCount() { 185 return 0; 186 } 187 188 @Override 189 public int getActiveReadRpcHandlerCount() { 190 return 0; 191 } 192 193 @Override 194 public int getActiveScanRpcHandlerCount() { 195 return 0; 196 } 197 198 @Override 199 public int getMetaPriorityQueueLength() { 200 return 0; 201 } 202 203 @Override 204 public CallQueueInfo getCallQueueInfo() { 205 String queueName = "Fifo Queue"; 206 207 HashMap<String, Long> methodCount = new HashMap<>(); 208 HashMap<String, Long> methodSize = new HashMap<>(); 209 210 CallQueueInfo callQueueInfo = new CallQueueInfo(); 211 callQueueInfo.setCallMethodCount(queueName, methodCount); 212 callQueueInfo.setCallMethodSize(queueName, methodSize); 213 214 updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize); 215 216 return callQueueInfo; 217 } 218 219 protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue, 220 HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) { 221 for (Runnable r : queue) { 222 FifoCallRunner mcr = (FifoCallRunner) r; 223 RpcCall rpcCall = mcr.getCallRunner().getRpcCall(); 224 225 String method = getCallMethod(mcr.getCallRunner()); 226 if (StringUtil.isNullOrEmpty(method)) { 227 method = "Unknown"; 228 } 229 230 long size = rpcCall.getSize(); 231 232 methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L)); 233 methodSize.put(method, size + methodSize.getOrDefault(method, 0L)); 234 } 235 } 236 237 protected String getCallMethod(final CallRunner task) { 238 RpcCall call = task.getRpcCall(); 239 if (call != null && call.getMethod() != null) { 240 return call.getMethod().getName(); 241 } 242 return null; 243 } 244}