001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.Queue;
021import java.util.concurrent.atomic.AtomicInteger;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.Abortable;
024import org.apache.hadoop.hbase.HBaseInterfaceAudience;
025import org.apache.hadoop.hbase.conf.ConfigurationObserver;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.yetus.audience.InterfaceStability;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import org.apache.hbase.thirdparty.com.google.protobuf.Message;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
040
041/**
042 * RPC Executor that uses different queues for reads and writes. With the options to use different
043 * queues/executors for gets and scans. Each handler has its own queue and there is no stealing.
044 */
045@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
046@InterfaceStability.Evolving
047public class RWQueueRpcExecutor extends RpcExecutor {
048  private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
049
050  public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
051    "hbase.ipc.server.callqueue.read.ratio";
052  public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
053    "hbase.ipc.server.callqueue.scan.ratio";
054
055  private final QueueBalancer writeBalancer;
056  private final QueueBalancer readBalancer;
057  private final QueueBalancer scanBalancer;
058  private final int writeHandlersCount;
059  private final int readHandlersCount;
060  private final int scanHandlersCount;
061  private final int numWriteQueues;
062  private final int numReadQueues;
063  private final int numScanQueues;
064
065  private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
066  private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
067  private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);
068
069  public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
070    final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
071    super(name, handlerCount, maxQueueLength, priority, conf, abortable);
072
073    float callqReadShare = getReadShare(conf);
074    float callqScanShare = getScanShare(conf);
075
076    numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare);
077    writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare));
078
079    int readQueues = calcNumReaders(this.numCallQueues, callqReadShare);
080    int readHandlers = Math.max(readQueues, calcNumReaders(handlerCount, callqReadShare));
081
082    int scanHandlers = Math.max(0, (int) Math.floor(readHandlers * callqScanShare));
083    int scanQueues =
084      scanHandlers > 0 ? Math.max(1, (int) Math.floor(readQueues * callqScanShare)) : 0;
085
086    if (scanQueues > 0) {
087      // if scanQueues > 0, the handler count of read should > 0, then we make readQueues >= 1
088      readQueues = Math.max(1, readQueues - scanQueues);
089      readHandlers -= scanHandlers;
090    } else {
091      scanQueues = 0;
092      scanHandlers = 0;
093    }
094
095    numReadQueues = readQueues;
096    readHandlersCount = readHandlers;
097    numScanQueues = scanQueues;
098    scanHandlersCount = scanHandlers;
099
100    initializeQueues(numWriteQueues);
101    initializeQueues(numReadQueues);
102    initializeQueues(numScanQueues);
103
104    this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues));
105    this.readBalancer =
106      getBalancer(name, conf, queues.subList(numWriteQueues, numWriteQueues + numReadQueues));
107    this.scanBalancer = numScanQueues > 0
108      ? getBalancer(name, conf,
109        queues.subList(numWriteQueues + numReadQueues,
110          numWriteQueues + numReadQueues + numScanQueues))
111      : null;
112
113    LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
114      + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
115      + numScanQueues + " scanHandlers=" + scanHandlersCount);
116  }
117
118  @Override
119  protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
120    // at least 1 read queue and 1 write queue
121    return Math.max(2, (int) Math.round(handlerCount * callQueuesHandlersFactor));
122  }
123
124  @Override
125  protected void startHandlers(final int port) {
126    startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port,
127      activeWriteHandlerCount);
128    startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port,
129      activeReadHandlerCount);
130    if (numScanQueues > 0) {
131      startHandlers(".scan", scanHandlersCount, queues, numWriteQueues + numReadQueues,
132        numScanQueues, port, activeScanHandlerCount);
133    }
134  }
135
136  @Override
137  public boolean dispatch(final CallRunner callTask) {
138    RpcCall call = callTask.getRpcCall();
139    return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
140      shouldDispatchToScanQueue(callTask), callTask);
141  }
142
143  protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
144    final CallRunner callTask) {
145    int queueIndex;
146    if (toWriteQueue) {
147      queueIndex = writeBalancer.getNextQueue(callTask);
148    } else if (toScanQueue) {
149      queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(callTask);
150    } else {
151      queueIndex = numWriteQueues + readBalancer.getNextQueue(callTask);
152    }
153    Queue<CallRunner> queue = queues.get(queueIndex);
154    if (queue.size() >= currentQueueLimit) {
155      return false;
156    }
157    return queue.offer(callTask);
158  }
159
160  @Override
161  public int getWriteQueueLength() {
162    int length = 0;
163    for (int i = 0; i < numWriteQueues; i++) {
164      length += queues.get(i).size();
165    }
166    return length;
167  }
168
169  @Override
170  public int getReadQueueLength() {
171    int length = 0;
172    for (int i = numWriteQueues; i < (numWriteQueues + numReadQueues); i++) {
173      length += queues.get(i).size();
174    }
175    return length;
176  }
177
178  @Override
179  public int getScanQueueLength() {
180    int length = 0;
181    for (int i = numWriteQueues + numReadQueues; i
182        < (numWriteQueues + numReadQueues + numScanQueues); i++) {
183      length += queues.get(i).size();
184    }
185    return length;
186  }
187
188  @Override
189  public int getActiveHandlerCount() {
190    return activeWriteHandlerCount.get() + activeReadHandlerCount.get()
191      + activeScanHandlerCount.get();
192  }
193
194  @Override
195  public int getActiveWriteHandlerCount() {
196    return activeWriteHandlerCount.get();
197  }
198
199  @Override
200  public int getActiveReadHandlerCount() {
201    return activeReadHandlerCount.get();
202  }
203
204  @Override
205  public int getActiveScanHandlerCount() {
206    return activeScanHandlerCount.get();
207  }
208
209  protected boolean isWriteRequest(final RequestHeader header, final Message param) {
210    // TODO: Is there a better way to do this?
211    if (param instanceof MultiRequest) {
212      MultiRequest multi = (MultiRequest) param;
213      for (RegionAction regionAction : multi.getRegionActionList()) {
214        for (Action action : regionAction.getActionList()) {
215          if (action.hasMutation()) {
216            return true;
217          }
218        }
219      }
220    }
221    if (param instanceof MutateRequest) {
222      return true;
223    }
224    // Below here are methods for master. It's a pretty brittle version of this.
225    // Not sure that master actually needs a read/write queue since 90% of requests to
226    // master are writing to status or changing the meta table.
227    // All other read requests are admin generated and can be processed whenever.
228    // However changing that would require a pretty drastic change and should be done for
229    // the next major release and not as a fix for HBASE-14239
230    if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) {
231      return true;
232    }
233    if (param instanceof RegionServerStatusProtos.RegionServerStartupRequest) {
234      return true;
235    }
236    if (param instanceof RegionServerStatusProtos.RegionServerReportRequest) {
237      return true;
238    }
239    return false;
240  }
241
242  QueueBalancer getWriteBalancer() {
243    return writeBalancer;
244  }
245
246  QueueBalancer getReadBalancer() {
247    return readBalancer;
248  }
249
250  QueueBalancer getScanBalancer() {
251    return scanBalancer;
252  }
253
254  private boolean isScanRequest(final RequestHeader header, final Message param) {
255    return param instanceof ScanRequest;
256  }
257
258  protected boolean shouldDispatchToScanQueue(final CallRunner task) {
259    RpcCall call = task.getRpcCall();
260    return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam());
261  }
262
263  protected float getReadShare(final Configuration conf) {
264    return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
265  }
266
267  protected float getScanShare(final Configuration conf) {
268    return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
269  }
270
271  /*
272   * Calculate the number of writers based on the "total count" and the read share. You'll get at
273   * least one writer.
274   */
275  private static int calcNumWriters(final int count, final float readShare) {
276    return Math.max(1, count - Math.max(1, (int) Math.round(count * readShare)));
277  }
278
279  /*
280   * Calculate the number of readers based on the "total count" and the read share. You'll get at
281   * least one reader.
282   */
283  private static int calcNumReaders(final int count, final float readShare) {
284    return count - calcNumWriters(count, readShare);
285  }
286
287  @Override
288  public void onConfigurationChange(Configuration conf) {
289    super.onConfigurationChange(conf);
290    propagateBalancerConfigChange(writeBalancer, conf);
291    propagateBalancerConfigChange(readBalancer, conf);
292    propagateBalancerConfigChange(scanBalancer, conf);
293  }
294
295  private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration conf) {
296    if (balancer instanceof ConfigurationObserver) {
297      ((ConfigurationObserver) balancer).onConfigurationChange(conf);
298    }
299  }
300}