001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.concurrent.ArrayBlockingQueue;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicInteger;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.DaemonThreadFactory;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
033
034/**
035 * A very simple {@code }RpcScheduler} that serves incoming requests in order.
036 *
037 * This can be used for HMaster, where no prioritization is needed.
038 */
039@InterfaceAudience.Private
040public class FifoRpcScheduler extends RpcScheduler {
041  private static final Logger LOG = LoggerFactory.getLogger(FifoRpcScheduler.class);
042  private final int handlerCount;
043  private final int maxQueueLength;
044  private final AtomicInteger queueSize = new AtomicInteger(0);
045  private ThreadPoolExecutor executor;
046
047  public FifoRpcScheduler(Configuration conf, int handlerCount) {
048    this.handlerCount = handlerCount;
049    this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
050        handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
051    LOG.info("Using " + this.getClass().getSimpleName() + " as user call queue; handlerCount=" +
052        handlerCount + "; maxQueueLength=" + maxQueueLength);
053  }
054
055  @Override
056  public void init(Context context) {
057    // no-op
058  }
059
060  @Override
061  public void start() {
062    this.executor = new ThreadPoolExecutor(
063        handlerCount,
064        handlerCount,
065        60,
066        TimeUnit.SECONDS,
067        new ArrayBlockingQueue<>(maxQueueLength),
068        new DaemonThreadFactory("FifoRpcScheduler.handler"),
069        new ThreadPoolExecutor.CallerRunsPolicy());
070  }
071
072  @Override
073  public void stop() {
074    this.executor.shutdown();
075  }
076
077  private static class FifoCallRunner implements Runnable {
078    private final CallRunner callRunner;
079
080    FifoCallRunner(CallRunner cr) {
081      this.callRunner = cr;
082    }
083
084    CallRunner getCallRunner() {
085      return callRunner;
086    }
087
088    @Override
089    public void run() {
090      callRunner.run();
091    }
092
093  }
094
095  @Override
096  public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
097    // Executors provide no offer, so make our own.
098    int queued = queueSize.getAndIncrement();
099    if (maxQueueLength > 0 && queued >= maxQueueLength) {
100      queueSize.decrementAndGet();
101      return false;
102    }
103
104    executor.execute(new FifoCallRunner(task){
105      @Override
106      public void run() {
107        task.setStatus(RpcServer.getStatus());
108        task.run();
109        queueSize.decrementAndGet();
110      }
111    });
112
113    return true;
114  }
115
116  @Override
117  public int getGeneralQueueLength() {
118    return executor.getQueue().size();
119  }
120
121  @Override
122  public int getPriorityQueueLength() {
123    return 0;
124  }
125
126  @Override
127  public int getReplicationQueueLength() {
128    return 0;
129  }
130
131  @Override
132  public int getActiveRpcHandlerCount() {
133    return executor.getActiveCount();
134  }
135
136  @Override
137  public long getNumGeneralCallsDropped() {
138    return 0;
139  }
140
141  @Override
142  public long getNumLifoModeSwitches() {
143    return 0;
144  }
145
146  @Override
147  public int getWriteQueueLength() {
148    return 0;
149  }
150
151  @Override
152  public int getReadQueueLength() {
153    return 0;
154  }
155
156  @Override
157  public int getScanQueueLength() {
158    return 0;
159  }
160
161  @Override
162  public int getActiveWriteRpcHandlerCount() {
163    return 0;
164  }
165
166  @Override
167  public int getActiveReadRpcHandlerCount() {
168    return 0;
169  }
170
171  @Override
172  public int getActiveScanRpcHandlerCount() {
173    return 0;
174  }
175
176  @Override
177  public int getMetaPriorityQueueLength() {
178    return 0;
179  }
180
181  @Override
182  public CallQueueInfo getCallQueueInfo() {
183    String queueName = "Fifo Queue";
184
185    HashMap<String, Long> methodCount = new HashMap<>();
186    HashMap<String, Long> methodSize = new HashMap<>();
187
188    CallQueueInfo callQueueInfo = new CallQueueInfo();
189    callQueueInfo.setCallMethodCount(queueName, methodCount);
190    callQueueInfo.setCallMethodSize(queueName, methodSize);
191
192
193    for (Runnable r:executor.getQueue()) {
194      FifoCallRunner mcr = (FifoCallRunner) r;
195      RpcCall rpcCall = mcr.getCallRunner().getRpcCall();
196
197      String method;
198
199      if (null==rpcCall.getMethod() ||
200            StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
201        method = "Unknown";
202      }
203
204      long size = rpcCall.getSize();
205
206      methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
207      methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
208    }
209
210    return callQueueInfo;
211  }
212
213}