/**

 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
019
020package org.apache.hadoop.hbase.ipc;
021
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.Abortable;
027import org.apache.hadoop.hbase.HBaseInterfaceAudience;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.yetus.audience.InterfaceStability;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
039import org.apache.hbase.thirdparty.com.google.protobuf.Message;
040
041/**
042 * RPC Executor that uses different queues for reads and writes.
043 * With the options to use different queues/executors for gets and scans.
044 * Each handler has its own queue and there is no stealing.
045 */
046@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
047@InterfaceStability.Evolving
048public class RWQueueRpcExecutor extends RpcExecutor {
049  private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
050
051  public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
052      "hbase.ipc.server.callqueue.read.ratio";
053  public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY =
054      "hbase.ipc.server.callqueue.scan.ratio";
055
056  private final QueueBalancer writeBalancer;
057  private final QueueBalancer readBalancer;
058  private final QueueBalancer scanBalancer;
059  private final int writeHandlersCount;
060  private final int readHandlersCount;
061  private final int scanHandlersCount;
062  private final int numWriteQueues;
063  private final int numReadQueues;
064  private final int numScanQueues;
065
066  private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
067  private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
068  private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);
069
070  public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
071      final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
072    super(name, handlerCount, maxQueueLength, priority, conf, abortable);
073
074    float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
075    float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
076
077    numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare);
078    writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare));
079
080    int readQueues = calcNumReaders(this.numCallQueues, callqReadShare);
081    int readHandlers = Math.max(readQueues, calcNumReaders(handlerCount, callqReadShare));
082
083    int scanQueues = Math.max(0, (int)Math.floor(readQueues * callqScanShare));
084    int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * callqScanShare));
085
086    if ((readQueues - scanQueues) > 0) {
087      readQueues -= scanQueues;
088      readHandlers -= scanHandlers;
089    } else {
090      scanQueues = 0;
091      scanHandlers = 0;
092    }
093
094    numReadQueues = readQueues;
095    readHandlersCount = readHandlers;
096    numScanQueues = scanQueues;
097    scanHandlersCount = scanHandlers;
098
099    this.writeBalancer = getBalancer(numWriteQueues);
100    this.readBalancer = getBalancer(numReadQueues);
101    this.scanBalancer = numScanQueues > 0 ? getBalancer(numScanQueues) : null;
102
103    initializeQueues(numWriteQueues);
104    initializeQueues(numReadQueues);
105    initializeQueues(numScanQueues);
106
107    LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
108      + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
109      + numScanQueues + " scanHandlers=" + scanHandlersCount);
110  }
111
112  @Override
113  protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
114    // at least 1 read queue and 1 write queue
115    return Math.max(2, (int) Math.round(handlerCount * callQueuesHandlersFactor));
116  }
117
118  @Override
119  protected void startHandlers(final int port) {
120    startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port,
121      activeWriteHandlerCount);
122    startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port,
123      activeReadHandlerCount);
124    if (numScanQueues > 0) {
125      startHandlers(".scan", scanHandlersCount, queues, numWriteQueues + numReadQueues,
126        numScanQueues, port, activeScanHandlerCount);
127    }
128  }
129
130  @Override
131  public boolean dispatch(final CallRunner callTask) throws InterruptedException {
132    RpcCall call = callTask.getRpcCall();
133    int queueIndex;
134    if (isWriteRequest(call.getHeader(), call.getParam())) {
135      queueIndex = writeBalancer.getNextQueue();
136    } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
137      queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
138    } else {
139      queueIndex = numWriteQueues + readBalancer.getNextQueue();
140    }
141
142    BlockingQueue<CallRunner> queue = queues.get(queueIndex);
143    if (queue.size() >= currentQueueLimit) {
144      return false;
145    }
146    return queue.offer(callTask);
147  }
148
149  @Override
150  public int getWriteQueueLength() {
151    int length = 0;
152    for (int i = 0; i < numWriteQueues; i++) {
153      length += queues.get(i).size();
154    }
155    return length;
156  }
157
158  @Override
159  public int getReadQueueLength() {
160    int length = 0;
161    for (int i = numWriteQueues; i < (numWriteQueues + numReadQueues); i++) {
162      length += queues.get(i).size();
163    }
164    return length;
165  }
166
167  @Override
168  public int getScanQueueLength() {
169    int length = 0;
170    for (int i = numWriteQueues + numReadQueues;
171        i < (numWriteQueues + numReadQueues + numScanQueues); i++) {
172      length += queues.get(i).size();
173    }
174    return length;
175  }
176
177  @Override
178  public int getActiveHandlerCount() {
179    return activeWriteHandlerCount.get() + activeReadHandlerCount.get()
180        + activeScanHandlerCount.get();
181  }
182
183  @Override
184  public int getActiveWriteHandlerCount() {
185    return activeWriteHandlerCount.get();
186  }
187
188  @Override
189  public int getActiveReadHandlerCount() {
190    return activeReadHandlerCount.get();
191  }
192
193  @Override
194  public int getActiveScanHandlerCount() {
195    return activeScanHandlerCount.get();
196  }
197
198  private boolean isWriteRequest(final RequestHeader header, final Message param) {
199    // TODO: Is there a better way to do this?
200    if (param instanceof MultiRequest) {
201      MultiRequest multi = (MultiRequest)param;
202      for (RegionAction regionAction : multi.getRegionActionList()) {
203        for (Action action: regionAction.getActionList()) {
204          if (action.hasMutation()) {
205            return true;
206          }
207        }
208      }
209    }
210    if (param instanceof MutateRequest) {
211      return true;
212    }
213    // Below here are methods for master. It's a pretty brittle version of this.
214    // Not sure that master actually needs a read/write queue since 90% of requests to
215    // master are writing to status or changing the meta table.
216    // All other read requests are admin generated and can be processed whenever.
217    // However changing that would require a pretty drastic change and should be done for
218    // the next major release and not as a fix for HBASE-14239
219    if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) {
220      return true;
221    }
222    if (param instanceof RegionServerStatusProtos.RegionServerStartupRequest) {
223      return true;
224    }
225    if (param instanceof RegionServerStatusProtos.RegionServerReportRequest) {
226      return true;
227    }
228    return false;
229  }
230
231  private boolean isScanRequest(final RequestHeader header, final Message param) {
232    if (param instanceof ScanRequest) {
233      // The first scan request will be executed as a "short read"
234      ScanRequest request = (ScanRequest)param;
235      return request.hasScannerId();
236    }
237    return false;
238  }
239
240  /*
241   * Calculate the number of writers based on the "total count" and the read share.
242   * You'll get at least one writer.
243   */
244  private static int calcNumWriters(final int count, final float readShare) {
245    return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare)));
246  }
247
248  /*
249   * Calculate the number of readers based on the "total count" and the read share.
250   * You'll get at least one reader.
251   */
252  private static int calcNumReaders(final int count, final float readShare) {
253    return count - calcNumWriters(count, readShare);
254  }
255}