/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;

/**
 * A very simple {@code RpcScheduler} that serves incoming requests in order.
 *
 * This can be used for HMaster, where no prioritization is needed.
 */
@InterfaceAudience.Private
public class FifoRpcScheduler extends RpcScheduler {
  private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class);

  /** Number of handler threads; used as both core and maximum pool size in {@link #start()}. */
  protected final int handlerCount;

  /** Upper bound on accepted-but-unfinished calls; also the capacity of the executor's queue. */
  protected final int maxQueueLength;

  /**
   * Number of calls that have been accepted by {@link #executeRpcCall} and have not finished
   * running yet. Incremented on dispatch, decremented when the call completes.
   */
  protected final AtomicInteger queueSize = new AtomicInteger(0);

  /** Handler pool; created by {@link #start()}, so it is {@code null} before that. */
  protected ThreadPoolExecutor executor;

  /**
   * Creates a scheduler; no threads are started until {@link #start()} is called.
   *
   * @param conf configuration read for {@code RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH};
   *          when unset the limit defaults to
   *          {@code handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER}
   * @param handlerCount number of handler threads
   */
  public FifoRpcScheduler(Configuration conf, int handlerCount) {
    this.handlerCount = handlerCount;
    this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
      handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
  }

  @Override
  public void init(Context context) {
    // no-op
  }

  /**
   * Creates the fixed-size handler pool backed by a bounded FIFO queue of
   * {@link #maxQueueLength} entries. Tasks rejected by the pool are handed to
   * {@link ThreadPoolExecutor.CallerRunsPolicy}.
   */
  @Override
  public void start() {
    LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}",
      this.getClass().getSimpleName(), handlerCount, maxQueueLength);
    this.executor = new ThreadPoolExecutor(
      handlerCount,
      handlerCount,
      60,
      TimeUnit.SECONDS,
      new ArrayBlockingQueue<>(maxQueueLength),
      new DaemonThreadFactory("FifoRpcScheduler.handler"),
      new ThreadPoolExecutor.CallerRunsPolicy());
  }

  /**
   * Initiates an orderly shutdown of the handler pool. Does nothing if {@link #start()} was
   * never called.
   */
  @Override
  public void stop() {
    // The executor only exists after start(); tolerate stop() on a never-started scheduler.
    if (this.executor != null) {
      this.executor.shutdown();
    }
  }

  /**
   * Runnable wrapper that keeps a handle on the wrapped {@link CallRunner} so that queued
   * entries can be inspected by {@link #updateMethodCountAndSizeByQueue}.
   */
  private static class FifoCallRunner implements Runnable {
    private final CallRunner callRunner;

    FifoCallRunner(CallRunner cr) {
      this.callRunner = cr;
    }

    CallRunner getCallRunner() {
      return callRunner;
    }

    @Override
    public void run() {
      callRunner.run();
    }

  }

  /**
   * Submits the call to the handler pool.
   *
   * @param task the call to run
   * @return {@code true} if the call was accepted, {@code false} if the queue limit was reached
   */
  @Override
  public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
    return executeRpcCall(executor, queueSize, task);
  }

  /**
   * Hands {@code task} to {@code executor} unless {@code queueSize} has already reached
   * {@link #maxQueueLength}.
   *
   * @param executor pool that runs the call
   * @param queueSize counter of accepted-but-unfinished calls for that pool
   * @param task the call to run
   * @return {@code true} if the call was submitted, {@code false} if it was refused because the
   *         limit was reached
   */
  protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize,
      final CallRunner task) {
    // Executors provide no offer, so make our own.
    int queued = queueSize.getAndIncrement();
    if (maxQueueLength > 0 && queued >= maxQueueLength) {
      // Undo the optimistic increment; the call is refused.
      queueSize.decrementAndGet();
      return false;
    }

    executor.execute(new FifoCallRunner(task) {
      @Override
      public void run() {
        try {
          task.setStatus(RpcServer.getStatus());
          task.run();
        } finally {
          // Always release the slot, otherwise a throwing call would shrink capacity for good.
          queueSize.decrementAndGet();
        }
      }
    });

    return true;
  }

  /** @return number of calls currently waiting in the executor's queue */
  @Override
  public int getGeneralQueueLength() {
    return executor.getQueue().size();
  }

  @Override
  public int getPriorityQueueLength() {
    return 0;
  }

  @Override
  public int getReplicationQueueLength() {
    return 0;
  }

  /** @return the executor's approximate count of threads actively running calls */
  @Override
  public int getActiveRpcHandlerCount() {
    return executor.getActiveCount();
  }

  @Override
  public int getActiveGeneralRpcHandlerCount() {
    return getActiveRpcHandlerCount();
  }

  @Override
  public int getActivePriorityRpcHandlerCount() {
    return 0;
  }

  @Override
  public int getActiveReplicationRpcHandlerCount() {
    return 0;
  }

  @Override
  public int getActiveMetaPriorityRpcHandlerCount() {
    return 0;
  }

  @Override
  public long getNumGeneralCallsDropped() {
    return 0;
  }

  @Override
  public long getNumLifoModeSwitches() {
    return 0;
  }

  @Override
  public int getWriteQueueLength() {
    return 0;
  }

  @Override
  public int getReadQueueLength() {
    return 0;
  }

  @Override
  public int getScanQueueLength() {
    return 0;
  }

  @Override
  public int getActiveWriteRpcHandlerCount() {
    return 0;
  }

  @Override
  public int getActiveReadRpcHandlerCount() {
    return 0;
  }

  @Override
  public int getActiveScanRpcHandlerCount() {
    return 0;
  }

  @Override
  public int getMetaPriorityQueueLength() {
    return 0;
  }

  /**
   * Builds per-method call counts and sizes for the calls currently waiting in the executor's
   * queue, reported under the single queue name {@code "Fifo Queue"}.
   *
   * @return the populated {@link CallQueueInfo}
   */
  @Override
  public CallQueueInfo getCallQueueInfo() {
    String queueName = "Fifo Queue";

    HashMap<String, Long> methodCount = new HashMap<>();
    HashMap<String, Long> methodSize = new HashMap<>();

    CallQueueInfo callQueueInfo = new CallQueueInfo();
    callQueueInfo.setCallMethodCount(queueName, methodCount);
    callQueueInfo.setCallMethodSize(queueName, methodSize);

    updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);

    return callQueueInfo;
  }

  /**
   * Adds, for every entry of {@code queue}, one to the count and the call's size to the size
   * total of the entry's method. Entries whose method name is null or empty are accounted under
   * {@code "Unknown"}; entries without an {@link RpcCall} contribute a size of 0.
   *
   * @param queue queue whose entries are {@code FifoCallRunner}s created by this scheduler
   * @param methodCount accumulator of calls per method name, updated in place
   * @param methodSize accumulator of total call size per method name, updated in place
   */
  protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue,
      HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) {
    for (Runnable r : queue) {
      CallRunner callRunner = ((FifoCallRunner) r).getCallRunner();
      RpcCall rpcCall = callRunner.getRpcCall();

      String method = getCallMethod(callRunner);
      if (StringUtil.isNullOrEmpty(method)) {
        method = "Unknown";
      }

      // getCallMethod() tolerates a missing RpcCall, so the size lookup must as well.
      long size = rpcCall != null ? rpcCall.getSize() : 0L;

      methodCount.merge(method, 1L, Long::sum);
      methodSize.merge(method, size, Long::sum);
    }
  }

  /**
   * @param task the call to inspect
   * @return the name of the method the call targets, or {@code null} if the call or its method
   *         is not available
   */
  protected String getCallMethod(final CallRunner task) {
    RpcCall call = task.getRpcCall();
    if (call != null && call.getMethod() != null) {
      return call.getMethod().getName();
    }
    return null;
  }
}