001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.HashMap;
021import java.util.concurrent.ArrayBlockingQueue;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicInteger;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.util.Threads;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
033import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
034
035/**
036 * A very simple {@code }RpcScheduler} that serves incoming requests in order. This can be used for
037 * HMaster, where no prioritization is needed.
038 */
039@InterfaceAudience.Private
040public class FifoRpcScheduler extends RpcScheduler {
041  private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class);
042  protected final int handlerCount;
043  protected final int maxQueueLength;
044  protected final AtomicInteger queueSize = new AtomicInteger(0);
045  protected ThreadPoolExecutor executor;
046
047  public FifoRpcScheduler(Configuration conf, int handlerCount) {
048    this.handlerCount = handlerCount;
049    this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
050      handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
051  }
052
053  @Override
054  public void init(Context context) {
055    // no-op
056  }
057
058  @Override
059  public void start() {
060    LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}",
061      this.getClass().getSimpleName(), handlerCount, maxQueueLength);
062    this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
063      new ArrayBlockingQueue<>(maxQueueLength),
064      new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").setDaemon(true)
065        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
066      new ThreadPoolExecutor.CallerRunsPolicy());
067  }
068
069  @Override
070  public void stop() {
071    if (this.executor != null) {
072      this.executor.shutdown();
073    }
074  }
075
076  private static class FifoCallRunner implements Runnable {
077    private final CallRunner callRunner;
078
079    FifoCallRunner(CallRunner cr) {
080      this.callRunner = cr;
081    }
082
083    CallRunner getCallRunner() {
084      return callRunner;
085    }
086
087    @Override
088    public void run() {
089      callRunner.run();
090    }
091
092  }
093
094  @Override
095  public boolean dispatch(final CallRunner task) {
096    return executeRpcCall(executor, queueSize, task);
097  }
098
099  protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize,
100    final CallRunner task) {
101    // Executors provide no offer, so make our own.
102    int queued = queueSize.getAndIncrement();
103    if (maxQueueLength > 0 && queued >= maxQueueLength) {
104      queueSize.decrementAndGet();
105      return false;
106    }
107
108    executor.execute(new FifoCallRunner(task) {
109      @Override
110      public void run() {
111        task.setStatus(RpcServer.getStatus());
112        task.run();
113        queueSize.decrementAndGet();
114      }
115    });
116
117    return true;
118  }
119
120  @Override
121  public int getGeneralQueueLength() {
122    return executor.getQueue().size();
123  }
124
125  @Override
126  public int getPriorityQueueLength() {
127    return 0;
128  }
129
130  @Override
131  public int getReplicationQueueLength() {
132    return 0;
133  }
134
135  @Override
136  public int getBulkLoadQueueLength() {
137    return 0;
138  }
139
140  @Override
141  public int getActiveRpcHandlerCount() {
142    return executor.getActiveCount();
143  }
144
145  @Override
146  public int getActiveGeneralRpcHandlerCount() {
147    return getActiveRpcHandlerCount();
148  }
149
150  @Override
151  public int getActivePriorityRpcHandlerCount() {
152    return 0;
153  }
154
155  @Override
156  public int getActiveReplicationRpcHandlerCount() {
157    return 0;
158  }
159
160  @Override
161  public int getActiveBulkLoadRpcHandlerCount() {
162    return 0;
163  }
164
165  @Override
166  public int getActiveMetaPriorityRpcHandlerCount() {
167    return 0;
168  }
169
170  @Override
171  public long getNumGeneralCallsDropped() {
172    return 0;
173  }
174
175  @Override
176  public long getNumLifoModeSwitches() {
177    return 0;
178  }
179
180  @Override
181  public int getWriteQueueLength() {
182    return 0;
183  }
184
185  @Override
186  public int getReadQueueLength() {
187    return 0;
188  }
189
190  @Override
191  public int getScanQueueLength() {
192    return 0;
193  }
194
195  @Override
196  public int getActiveWriteRpcHandlerCount() {
197    return 0;
198  }
199
200  @Override
201  public int getActiveReadRpcHandlerCount() {
202    return 0;
203  }
204
205  @Override
206  public int getActiveScanRpcHandlerCount() {
207    return 0;
208  }
209
210  @Override
211  public int getMetaPriorityQueueLength() {
212    return 0;
213  }
214
215  @Override
216  public CallQueueInfo getCallQueueInfo() {
217    String queueName = "Fifo Queue";
218
219    HashMap<String, Long> methodCount = new HashMap<>();
220    HashMap<String, Long> methodSize = new HashMap<>();
221
222    CallQueueInfo callQueueInfo = new CallQueueInfo();
223    callQueueInfo.setCallMethodCount(queueName, methodCount);
224    callQueueInfo.setCallMethodSize(queueName, methodSize);
225
226    updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);
227
228    return callQueueInfo;
229  }
230
231  protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue,
232    HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) {
233    for (Runnable r : queue) {
234      FifoCallRunner mcr = (FifoCallRunner) r;
235      RpcCall rpcCall = mcr.getCallRunner().getRpcCall();
236
237      String method = getCallMethod(mcr.getCallRunner());
238      if (StringUtil.isNullOrEmpty(method)) {
239        method = "Unknown";
240      }
241
242      long size = rpcCall.getSize();
243
244      methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
245      methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
246    }
247  }
248
249  protected String getCallMethod(final CallRunner task) {
250    RpcCall call = task.getRpcCall();
251    if (call != null && call.getMethod() != null) {
252      return call.getMethod().getName();
253    }
254    return null;
255  }
256}