001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.Queue;
021import java.util.concurrent.atomic.AtomicInteger;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.Abortable;
024import org.apache.hadoop.hbase.HBaseInterfaceAudience;
025import org.apache.hadoop.hbase.conf.ConfigurationObserver;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hbase.thirdparty.com.google.protobuf.Message;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
040
041/**
042 * RPC Executor that uses different queues for reads and writes. With the options to use different
043 * queues/executors for gets and scans. Each handler has its own queue and there is no stealing.
044 */
045@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
046@InterfaceStability.Evolving
047public class RWQueueRpcExecutor extends RpcExecutor {
048  private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
049
050  public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
051    "hbase.ipc.server.callqueue.read.ratio";
052  public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
053    "hbase.ipc.server.callqueue.scan.ratio";
054
055  private final QueueBalancer writeBalancer;
056  private final QueueBalancer readBalancer;
057  private final QueueBalancer scanBalancer;
058  private final int writeHandlersCount;
059  private final int readHandlersCount;
060  private final int scanHandlersCount;
061  private final int numWriteQueues;
062  private final int numReadQueues;
063  private final int numScanQueues;
064
065  private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
066  private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
067  private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);
068
069  public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
070    final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
071    super(name, handlerCount, maxQueueLength, priority, conf, abortable);
072
073    float callqReadShare = getReadShare(conf);
074    float callqScanShare = getScanShare(conf);
075
076    numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare);
077    writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare));
078
079    int readQueues = calcNumReaders(this.numCallQueues, callqReadShare);
080    int readHandlers = Math.max(readQueues, calcNumReaders(handlerCount, callqReadShare));
081
082    int scanQueues = Math.max(0, (int) Math.floor(readQueues * callqScanShare));
083    int scanHandlers = Math.max(0, (int) Math.floor(readHandlers * callqScanShare));
084
085    if ((readQueues - scanQueues) > 0) {
086      readQueues -= scanQueues;
087      readHandlers -= scanHandlers;
088    } else {
089      scanQueues = 0;
090      scanHandlers = 0;
091    }
092
093    numReadQueues = readQueues;
094    readHandlersCount = readHandlers;
095    numScanQueues = scanQueues;
096    scanHandlersCount = scanHandlers;
097
098    initializeQueues(numWriteQueues);
099    initializeQueues(numReadQueues);
100    initializeQueues(numScanQueues);
101
102    this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues));
103    this.readBalancer =
104      getBalancer(name, conf, queues.subList(numWriteQueues, numWriteQueues + numReadQueues));
105    this.scanBalancer = numScanQueues > 0
106      ? getBalancer(name, conf,
107        queues.subList(numWriteQueues + numReadQueues,
108          numWriteQueues + numReadQueues + numScanQueues))
109      : null;
110
111    LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
112      + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
113      + numScanQueues + " scanHandlers=" + scanHandlersCount);
114  }
115
116  @Override
117  protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
118    // at least 1 read queue and 1 write queue
119    return Math.max(2, (int) Math.round(handlerCount * callQueuesHandlersFactor));
120  }
121
122  @Override
123  protected void startHandlers(final int port) {
124    startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port,
125      activeWriteHandlerCount);
126    startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port,
127      activeReadHandlerCount);
128    if (numScanQueues > 0) {
129      startHandlers(".scan", scanHandlersCount, queues, numWriteQueues + numReadQueues,
130        numScanQueues, port, activeScanHandlerCount);
131    }
132  }
133
134  @Override
135  public boolean dispatch(final CallRunner callTask) {
136    RpcCall call = callTask.getRpcCall();
137    return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
138      shouldDispatchToScanQueue(callTask), callTask);
139  }
140
141  protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
142    final CallRunner callTask) {
143    int queueIndex;
144    if (toWriteQueue) {
145      queueIndex = writeBalancer.getNextQueue(callTask);
146    } else if (toScanQueue) {
147      queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(callTask);
148    } else {
149      queueIndex = numWriteQueues + readBalancer.getNextQueue(callTask);
150    }
151
152    Queue<CallRunner> queue = queues.get(queueIndex);
153    if (queue.size() >= currentQueueLimit) {
154      return false;
155    }
156    return queue.offer(callTask);
157  }
158
159  @Override
160  public int getWriteQueueLength() {
161    int length = 0;
162    for (int i = 0; i < numWriteQueues; i++) {
163      length += queues.get(i).size();
164    }
165    return length;
166  }
167
168  @Override
169  public int getReadQueueLength() {
170    int length = 0;
171    for (int i = numWriteQueues; i < (numWriteQueues + numReadQueues); i++) {
172      length += queues.get(i).size();
173    }
174    return length;
175  }
176
177  @Override
178  public int getScanQueueLength() {
179    int length = 0;
180    for (int i = numWriteQueues + numReadQueues; i
181        < (numWriteQueues + numReadQueues + numScanQueues); i++) {
182      length += queues.get(i).size();
183    }
184    return length;
185  }
186
187  @Override
188  public int getActiveHandlerCount() {
189    return activeWriteHandlerCount.get() + activeReadHandlerCount.get()
190      + activeScanHandlerCount.get();
191  }
192
193  @Override
194  public int getActiveWriteHandlerCount() {
195    return activeWriteHandlerCount.get();
196  }
197
198  @Override
199  public int getActiveReadHandlerCount() {
200    return activeReadHandlerCount.get();
201  }
202
203  @Override
204  public int getActiveScanHandlerCount() {
205    return activeScanHandlerCount.get();
206  }
207
208  protected boolean isWriteRequest(final RequestHeader header, final Message param) {
209    // TODO: Is there a better way to do this?
210    if (param instanceof MultiRequest) {
211      MultiRequest multi = (MultiRequest) param;
212      for (RegionAction regionAction : multi.getRegionActionList()) {
213        for (Action action : regionAction.getActionList()) {
214          if (action.hasMutation()) {
215            return true;
216          }
217        }
218      }
219    }
220    if (param instanceof MutateRequest) {
221      return true;
222    }
223    // Below here are methods for master. It's a pretty brittle version of this.
224    // Not sure that master actually needs a read/write queue since 90% of requests to
225    // master are writing to status or changing the meta table.
226    // All other read requests are admin generated and can be processed whenever.
227    // However changing that would require a pretty drastic change and should be done for
228    // the next major release and not as a fix for HBASE-14239
229    if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) {
230      return true;
231    }
232    if (param instanceof RegionServerStatusProtos.RegionServerStartupRequest) {
233      return true;
234    }
235    if (param instanceof RegionServerStatusProtos.RegionServerReportRequest) {
236      return true;
237    }
238    return false;
239  }
240
241  QueueBalancer getWriteBalancer() {
242    return writeBalancer;
243  }
244
245  QueueBalancer getReadBalancer() {
246    return readBalancer;
247  }
248
249  QueueBalancer getScanBalancer() {
250    return scanBalancer;
251  }
252
253  private boolean isScanRequest(final RequestHeader header, final Message param) {
254    return param instanceof ScanRequest;
255  }
256
257  protected boolean shouldDispatchToScanQueue(final CallRunner task) {
258    RpcCall call = task.getRpcCall();
259    return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam());
260  }
261
262  protected float getReadShare(final Configuration conf) {
263    return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
264  }
265
266  protected float getScanShare(final Configuration conf) {
267    return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
268  }
269
270  /*
271   * Calculate the number of writers based on the "total count" and the read share. You'll get at
272   * least one writer.
273   */
274  private static int calcNumWriters(final int count, final float readShare) {
275    return Math.max(1, count - Math.max(1, (int) Math.round(count * readShare)));
276  }
277
278  /*
279   * Calculate the number of readers based on the "total count" and the read share. You'll get at
280   * least one reader.
281   */
282  private static int calcNumReaders(final int count, final float readShare) {
283    return count - calcNumWriters(count, readShare);
284  }
285
286  @Override
287  public void onConfigurationChange(Configuration conf) {
288    super.onConfigurationChange(conf);
289    propagateBalancerConfigChange(writeBalancer, conf);
290    propagateBalancerConfigChange(readBalancer, conf);
291    propagateBalancerConfigChange(scanBalancer, conf);
292  }
293
294  private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration conf) {
295    if (balancer instanceof ConfigurationObserver) {
296      ((ConfigurationObserver) balancer).onConfigurationChange(conf);
297    }
298  }
299}