001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.concurrent.ArrayBlockingQueue;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.DaemonThreadFactory;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
034
035/**
036 * A very simple {@code }RpcScheduler} that serves incoming requests in order.
037 *
038 * This can be used for HMaster, where no prioritization is needed.
039 */
040@InterfaceAudience.Private
041public class FifoRpcScheduler extends RpcScheduler {
042  private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class);
043  protected final int handlerCount;
044  protected final int maxQueueLength;
045  protected final AtomicInteger queueSize = new AtomicInteger(0);
046  protected ThreadPoolExecutor executor;
047
048  public FifoRpcScheduler(Configuration conf, int handlerCount) {
049    this.handlerCount = handlerCount;
050    this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
051        handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
052  }
053
054  @Override
055  public void init(Context context) {
056    // no-op
057  }
058
059  @Override
060  public void start() {
061    LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}",
062      this.getClass().getSimpleName(), handlerCount, maxQueueLength);
063    this.executor = new ThreadPoolExecutor(
064        handlerCount,
065        handlerCount,
066        60,
067        TimeUnit.SECONDS,
068        new ArrayBlockingQueue<>(maxQueueLength),
069        new DaemonThreadFactory("FifoRpcScheduler.handler"),
070        new ThreadPoolExecutor.CallerRunsPolicy());
071  }
072
073  @Override
074  public void stop() {
075    this.executor.shutdown();
076  }
077
078  private static class FifoCallRunner implements Runnable {
079    private final CallRunner callRunner;
080
081    FifoCallRunner(CallRunner cr) {
082      this.callRunner = cr;
083    }
084
085    CallRunner getCallRunner() {
086      return callRunner;
087    }
088
089    @Override
090    public void run() {
091      callRunner.run();
092    }
093
094  }
095
096  @Override
097  public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
098    return executeRpcCall(executor, queueSize, task);
099  }
100
101  protected boolean executeRpcCall(final ThreadPoolExecutor executor, final AtomicInteger queueSize,
102      final CallRunner task) {
103    // Executors provide no offer, so make our own.
104    int queued = queueSize.getAndIncrement();
105    if (maxQueueLength > 0 && queued >= maxQueueLength) {
106      queueSize.decrementAndGet();
107      return false;
108    }
109
110    executor.execute(new FifoCallRunner(task){
111      @Override
112      public void run() {
113        task.setStatus(RpcServer.getStatus());
114        task.run();
115        queueSize.decrementAndGet();
116      }
117    });
118
119    return true;
120  }
121
122  @Override
123  public int getGeneralQueueLength() {
124    return executor.getQueue().size();
125  }
126
127  @Override
128  public int getPriorityQueueLength() {
129    return 0;
130  }
131
132  @Override
133  public int getReplicationQueueLength() {
134    return 0;
135  }
136
137  @Override
138  public int getActiveRpcHandlerCount() {
139    return executor.getActiveCount();
140  }
141
142  @Override
143  public int getActiveGeneralRpcHandlerCount() {
144    return getActiveRpcHandlerCount();
145  }
146
147  @Override
148  public int getActivePriorityRpcHandlerCount() {
149    return 0;
150  }
151
152  @Override
153  public int getActiveReplicationRpcHandlerCount() {
154    return 0;
155  }
156
157  @Override
158  public int getActiveMetaPriorityRpcHandlerCount() {
159    return 0;
160  }
161
162  @Override
163  public long getNumGeneralCallsDropped() {
164    return 0;
165  }
166
167  @Override
168  public long getNumLifoModeSwitches() {
169    return 0;
170  }
171
172  @Override
173  public int getWriteQueueLength() {
174    return 0;
175  }
176
177  @Override
178  public int getReadQueueLength() {
179    return 0;
180  }
181
182  @Override
183  public int getScanQueueLength() {
184    return 0;
185  }
186
187  @Override
188  public int getActiveWriteRpcHandlerCount() {
189    return 0;
190  }
191
192  @Override
193  public int getActiveReadRpcHandlerCount() {
194    return 0;
195  }
196
197  @Override
198  public int getActiveScanRpcHandlerCount() {
199    return 0;
200  }
201
202  @Override
203  public int getMetaPriorityQueueLength() {
204    return 0;
205  }
206
207  @Override
208  public CallQueueInfo getCallQueueInfo() {
209    String queueName = "Fifo Queue";
210
211    HashMap<String, Long> methodCount = new HashMap<>();
212    HashMap<String, Long> methodSize = new HashMap<>();
213
214    CallQueueInfo callQueueInfo = new CallQueueInfo();
215    callQueueInfo.setCallMethodCount(queueName, methodCount);
216    callQueueInfo.setCallMethodSize(queueName, methodSize);
217
218    updateMethodCountAndSizeByQueue(executor.getQueue(), methodCount, methodSize);
219
220    return callQueueInfo;
221  }
222
223  protected void updateMethodCountAndSizeByQueue(BlockingQueue<Runnable> queue,
224      HashMap<String, Long> methodCount, HashMap<String, Long> methodSize) {
225    for (Runnable r : queue) {
226      FifoCallRunner mcr = (FifoCallRunner) r;
227      RpcCall rpcCall = mcr.getCallRunner().getRpcCall();
228
229      String method = getCallMethod(mcr.getCallRunner());
230      if (StringUtil.isNullOrEmpty(method)) {
231        method = "Unknown";
232      }
233
234      long size = rpcCall.getSize();
235
236      methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
237      methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
238    }
239  }
240
241  protected String getCallMethod(final CallRunner task) {
242    RpcCall call = task.getRpcCall();
243    if (call != null && call.getMethod() != null) {
244      return call.getMethod().getName();
245    }
246    return null;
247  }
248}