001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.HashMap;
021import java.util.concurrent.ArrayBlockingQueue;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicInteger;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.util.Threads;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
033import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
034
035/**
036 * A very simple {@code }RpcScheduler} that serves incoming requests in order. This can be used for
037 * HMaster, where no prioritization is needed.
038 */
039@InterfaceAudience.Private
040public class FifoRpcScheduler extends RpcScheduler {
041  private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class);
042  protected final int handlerCount;
043  protected final int maxQueueLength;
044  protected final AtomicInteger queueSize = new AtomicInteger(0);
045  protected ThreadPoolExecutor executor;
046
047  public FifoRpcScheduler(Configuration conf, int handlerCount) {
048    this.handlerCount = handlerCount;
049    this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
050      handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
051  }
052
053  @Override
054  public void init(Context context) {
055    // no-op
056  }
057
058  @Override
059  public void start() {
060    LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}",
061      this.getClass().getSimpleName(), handlerCount, maxQueueLength);
062    this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
063      new ArrayBlockingQueue<>(maxQueueLength),
064      new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").setDaemon(true)
065        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
066      new ThreadPoolExecutor.CallerRunsPolicy());
067  }
068
069  @Override
070  public void stop() {
071    this.executor.shutdown();
072  }
073
074  private static class FifoCallRunner implements Runnable {
075    private final CallRunner callRunner;
076
077    FifoCallRunner(CallRunner cr) {
078      this.callRunner = cr;
079    }
080
081    CallRunner getCallRunner() {
082      return callRunner;
083    }
084
085    @Override
086    public void run() {
087      callRunner.run();
088    }
089
090  }
091
092  @Override
093  public boolean dispatch(final CallRunner task) {
094    return executeRpcCall(executor, queueSize, task);
095  }
096
097  protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize,
098    final CallRunner task) {
099    // Executors provide no offer, so make our own.
100    int queued = queueSize.getAndIncrement();
101    if (maxQueueLength > 0 && queued >= maxQueueLength) {
102      queueSize.decrementAndGet();
103      return false;
104    }
105
106    executor.execute(new FifoCallRunner(task) {
107      @Override
108      public void run() {
109        task.setStatus(RpcServer.getStatus());
110        task.run();
111        queueSize.decrementAndGet();
112      }
113    });
114
115    return true;
116  }
117
118  @Override
119  public int getGeneralQueueLength() {
120    return executor.getQueue().size();
121  }
122
123  @Override
124  public int getPriorityQueueLength() {
125    return 0;
126  }
127
128  @Override
129  public int getReplicationQueueLength() {
130    return 0;
131  }
132
133  @Override
134  public int getActiveRpcHandlerCount() {
135    return executor.getActiveCount();
136  }
137
138  @Override
139  public int getActiveGeneralRpcHandlerCount() {
140    return getActiveRpcHandlerCount();
141  }
142
143  @Override
144  public int getActivePriorityRpcHandlerCount() {
145    return 0;
146  }
147
148  @Override
149  public int getActiveReplicationRpcHandlerCount() {
150    return 0;
151  }
152
153  @Override
154  public int getActiveMetaPriorityRpcHandlerCount() {
155    return 0;
156  }
157
158  @Override
159  public long getNumGeneralCallsDropped() {
160    return 0;
161  }
162
163  @Override
164  public long getNumLifoModeSwitches() {
165    return 0;
166  }
167
168  @Override
169  public int getWriteQueueLength() {
170    return 0;
171  }
172
173  @Override
174  public int getReadQueueLength() {
175    return 0;
176  }
177
178  @Override
179  public int getScanQueueLength() {
180    return 0;
181  }
182
183  @Override
184  public int getActiveWriteRpcHandlerCount() {
185    return 0;
186  }
187
188  @Override
189  public int getActiveReadRpcHandlerCount() {
190    return 0;
191  }
192
193  @Override
194  public int getActiveScanRpcHandlerCount() {
195    return 0;
196  }
197
198  @Override
199  public int getMetaPriorityQueueLength() {
200    return 0;
201  }
202
203  @Override
204  public CallQueueInfo getCallQueueInfo() {
205    String queueName = "Fifo Queue";
206
207    HashMap<String, Long> methodCount = new HashMap<>();
208    HashMap<String, Long> methodSize = new HashMap<>();
209
210    CallQueueInfo callQueueInfo = new CallQueueInfo();
211    callQueueInfo.setCallMethodCount(queueName, methodCount);
212    callQueueInfo.setCallMethodSize(queueName, methodSize);
213
214    updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);
215
216    return callQueueInfo;
217  }
218
219  protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue,
220    HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) {
221    for (Runnable r : queue) {
222      FifoCallRunner mcr = (FifoCallRunner) r;
223      RpcCall rpcCall = mcr.getCallRunner().getRpcCall();
224
225      String method = getCallMethod(mcr.getCallRunner());
226      if (StringUtil.isNullOrEmpty(method)) {
227        method = "Unknown";
228      }
229
230      long size = rpcCall.getSize();
231
232      methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
233      methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
234    }
235  }
236
237  protected String getCallMethod(final CallRunner task) {
238    RpcCall call = task.getRpcCall();
239    if (call != null && call.getMethod() != null) {
240      return call.getMethod().getName();
241    }
242    return null;
243  }
244}