001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.concurrent.ArrayBlockingQueue; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicInteger; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.DaemonThreadFactory; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; 033 034/** 035 * A very simple {@code }RpcScheduler} that serves incoming requests in order. 036 * 037 * This can be used for HMaster, where no prioritization is needed. 038 */ 039@InterfaceAudience.Private 040public class FifoRpcScheduler extends RpcScheduler { 041 private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class); 042 private final int handlerCount; 043 private final int maxQueueLength; 044 private final AtomicInteger queueSize = new AtomicInteger(0); 045 private ThreadPoolExecutor executor; 046 047 public FifoRpcScheduler(Configuration conf, int handlerCount) { 048 this.handlerCount = handlerCount; 049 this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 050 handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); 051 LOG.info("Using " + this.getClass().getSimpleName() + " as user call queue; handlerCount=" + 052 handlerCount + "; maxQueueLength=" + maxQueueLength); 053 } 054 055 @Override 056 public void init(Context context) { 057 // no-op 058 } 059 060 @Override 061 public void start() { 062 this.executor = new ThreadPoolExecutor( 063 handlerCount, 064 handlerCount, 065 60, 066 TimeUnit.SECONDS, 067 new ArrayBlockingQueue<>(maxQueueLength), 068 new DaemonThreadFactory("FifoRpcScheduler.handler"), 069 new ThreadPoolExecutor.CallerRunsPolicy()); 070 } 071 072 @Override 073 public void stop() { 074 this.executor.shutdown(); 075 } 076 077 private static class FifoCallRunner implements Runnable { 078 private final CallRunner callRunner; 079 080 FifoCallRunner(CallRunner cr) { 081 this.callRunner = cr; 082 } 083 084 CallRunner getCallRunner() { 085 return callRunner; 086 } 087 088 @Override 089 public void run() { 090 callRunner.run(); 091 } 092 093 } 094 095 @Override 096 public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { 097 // Executors provide no offer, so make our own. 098 int queued = queueSize.getAndIncrement(); 099 if (maxQueueLength > 0 && queued >= maxQueueLength) { 100 queueSize.decrementAndGet(); 101 return false; 102 } 103 104 executor.execute(new FifoCallRunner(task){ 105 @Override 106 public void run() { 107 task.setStatus(RpcServer.getStatus()); 108 task.run(); 109 queueSize.decrementAndGet(); 110 } 111 }); 112 113 return true; 114 } 115 116 @Override 117 public int getGeneralQueueLength() { 118 return executor.getQueue().size(); 119 } 120 121 @Override 122 public int getPriorityQueueLength() { 123 return 0; 124 } 125 126 @Override 127 public int getReplicationQueueLength() { 128 return 0; 129 } 130 131 @Override 132 public int getActiveRpcHandlerCount() { 133 return executor.getActiveCount(); 134 } 135 136 @Override 137 public long getNumGeneralCallsDropped() { 138 return 0; 139 } 140 141 @Override 142 public long getNumLifoModeSwitches() { 143 return 0; 144 } 145 146 @Override 147 public int getWriteQueueLength() { 148 return 0; 149 } 150 151 @Override 152 public int getReadQueueLength() { 153 return 0; 154 } 155 156 @Override 157 public int getScanQueueLength() { 158 return 0; 159 } 160 161 @Override 162 public int getActiveWriteRpcHandlerCount() { 163 return 0; 164 } 165 166 @Override 167 public int getActiveReadRpcHandlerCount() { 168 return 0; 169 } 170 171 @Override 172 public int getActiveScanRpcHandlerCount() { 173 return 0; 174 } 175 176 @Override 177 public int getMetaPriorityQueueLength() { 178 return 0; 179 } 180 181 @Override 182 public CallQueueInfo getCallQueueInfo() { 183 String queueName = "Fifo Queue"; 184 185 HashMap<String, Long> methodCount = new HashMap<>(); 186 HashMap<String, Long> methodSize = new HashMap<>(); 187 188 CallQueueInfo callQueueInfo = new CallQueueInfo(); 189 callQueueInfo.setCallMethodCount(queueName, methodCount); 190 callQueueInfo.setCallMethodSize(queueName, methodSize); 191 192 193 for (Runnable r:executor.getQueue()) { 194 FifoCallRunner mcr = (FifoCallRunner) r; 195 RpcCall rpcCall = mcr.getCallRunner().getRpcCall(); 196 197 String method; 198 199 if (null==rpcCall.getMethod() || 200 StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) { 201 method = "Unknown"; 202 } 203 204 long size = rpcCall.getSize(); 205 206 methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L)); 207 methodSize.put(method, size + methodSize.getOrDefault(method, 0L)); 208 } 209 210 return callQueueInfo; 211 } 212 213}