001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.lang.reflect.Method;
021import java.util.HashMap;
022import java.util.Map;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029import org.apache.hadoop.hbase.ipc.PriorityFunction;
030import org.apache.hadoop.hbase.ipc.QosPriority;
031import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
032import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
041
042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
043import org.apache.hbase.thirdparty.com.google.protobuf.Message;
044import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
045import org.apache.hadoop.hbase.security.User;
046
047/**
048 * Reads special method annotations and table names to figure a priority for use by QoS facility in
049 * ipc; e.g: rpcs to hbase:meta get priority.
050 */
051// TODO: Remove.  This is doing way too much work just to figure a priority.  Do as Elliott
052// suggests and just have the client specify a priority.
053
// The logic for figuring out high priority RPCs is as follows:
// 1. If the method is annotated with a QosPriority of QOS_HIGH,
//    that is honored.
// 2. Parse out the protobuf message and see if the request is for a region
//    of a system table (e.g. hbase:meta), and if so, treat it as a high
//    priority RPC.
// Some optimizations for (2) are done here:
// Clients send the argument classname as part of making the RPC. The server
// decides whether to deserialize the proto argument message based on the
// pre-established set of argument classes (knownArgumentClasses below).
// This prevents the server from having to deserialize all proto argument
// messages prematurely.
// All the argument classes declare a 'getRegion' method that returns a
// RegionSpecifier object. Methods can be invoked on the returned object
// to figure out whether it belongs to a system table or not.
068@InterfaceAudience.Private
069public class AnnotationReadingPriorityFunction implements PriorityFunction {
070  private static final Logger LOG =
071    LoggerFactory.getLogger(AnnotationReadingPriorityFunction.class.getName());
072
073  /** Used to control the scan delay, currently sqrt(numNextCall * weight) */
074  public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
075
076  protected final Map<String, Integer> annotatedQos;
077  //We need to mock the regionserver instance for some unit tests (set via
078  //setRegionServer method.
079  private RSRpcServices rpcServices;
080  @SuppressWarnings("unchecked")
081  private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
082      GetRegionInfoRequest.class,
083      GetStoreFileRequest.class,
084      CloseRegionRequest.class,
085      FlushRegionRequest.class,
086      CompactRegionRequest.class,
087      GetRequest.class,
088      MutateRequest.class,
089      ScanRequest.class
090  };
091
092  // Some caches for helping performance
093  private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>();
094  private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>();
095
096  private final float scanVirtualTimeWeight;
097
098  /**
099   * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
100   * {@code rpcServices#getClass()}
101   *
102   * @param rpcServices
103   *          The RPC server implementation
104   */
105  public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
106    this(rpcServices, rpcServices.getClass());
107  }
108
109  /**
110   * Constructs the priority function given the RPC server implementation and the annotations on the
111   * methods in the provided {@code clz}.
112   *
113   * @param rpcServices
114   *          The RPC server implementation
115   * @param clz
116   *          The concrete RPC server implementation's class
117   */
118  public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
119      Class<? extends RSRpcServices> clz) {
120    Map<String,Integer> qosMap = new HashMap<>();
121    for (Method m : clz.getMethods()) {
122      QosPriority p = m.getAnnotation(QosPriority.class);
123      if (p != null) {
124        // Since we protobuf'd, and then subsequently, when we went with pb style, method names
125        // are capitalized.  This meant that this brittle compare of method names gotten by
126        // reflection no longer matched the method names coming in over pb.  TODO: Get rid of this
127        // check.  For now, workaround is to capitalize the names we got from reflection so they
128        // have chance of matching the pb ones.
129        String capitalizedMethodName = capitalize(m.getName());
130        qosMap.put(capitalizedMethodName, p.priority());
131      }
132    }
133    this.rpcServices = rpcServices;
134    this.annotatedQos = qosMap;
135    if (methodMap.get("getRegion") == null) {
136      methodMap.put("hasRegion", new HashMap<>());
137      methodMap.put("getRegion", new HashMap<>());
138    }
139    for (Class<? extends Message> cls : knownArgumentClasses) {
140      argumentToClassMap.put(cls.getName(), cls);
141      try {
142        methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
143        methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
144      } catch (Exception e) {
145        throw new RuntimeException(e);
146      }
147    }
148
149    Configuration conf = rpcServices.getConfiguration();
150    scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
151  }
152
153  private String capitalize(final String s) {
154    StringBuilder strBuilder = new StringBuilder(s);
155    strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
156    return strBuilder.toString();
157  }
158
159  /**
160   * Returns a 'priority' based on the request type.
161   *
162   * Currently the returned priority is used for queue selection.
163   * See the SimpleRpcScheduler as example. It maintains a queue per 'priory type'
164   * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests),
165   * NORMAL_QOS (user requests).
166   */
167  @Override
168  public int getPriority(RequestHeader header, Message param, User user) {
169    int priorityByAnnotation = getAnnotatedPriority(header);
170
171    if (priorityByAnnotation >= 0) {
172      return priorityByAnnotation;
173    }
174    return getBasePriority(header, param);
175  }
176
177  /**
178   * See if the method has an annotation.
179   * @param header
180   * @return Return the priority from the annotation. If there isn't
181   * an annotation, this returns something below zero.
182   */
183  protected int getAnnotatedPriority(RequestHeader header) {
184    String methodName = header.getMethodName();
185    Integer priorityByAnnotation = annotatedQos.get(methodName);
186    if (priorityByAnnotation != null) {
187      return priorityByAnnotation;
188    }
189    return -1;
190  }
191
192  /**
193   * Get the priority for a given request from the header and the param
194   * This doesn't consider which user is sending the request at all.
195   * This doesn't consider annotations
196   */
197  protected int getBasePriority(RequestHeader header, Message param) {
198    if (param == null) {
199      return HConstants.NORMAL_QOS;
200    }
201
202    // Trust the client-set priorities if set
203    if (header.hasPriority()) {
204      return header.getPriority();
205    }
206
207    String cls = param.getClass().getName();
208    Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
209    RegionSpecifier regionSpecifier = null;
210    //check whether the request has reference to meta region or now.
211    try {
212      // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
213      // hasRegion returns true.  Not all listed methods have region specifier each time.  For
214      // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
215      // send the region over every time.
216      Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
217      if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
218        Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
219        regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
220        Region region = rpcServices.getRegion(regionSpecifier);
221        if (region.getRegionInfo().getTable().isSystemTable()) {
222          if (LOG.isTraceEnabled()) {
223            LOG.trace("High priority because region=" +
224              region.getRegionInfo().getRegionNameAsString());
225          }
226          return HConstants.SYSTEMTABLE_QOS;
227        }
228      }
229    } catch (Exception ex) {
230      // Not good throwing an exception out of here, a runtime anyways.  Let the query go into the
231      // server and have it throw the exception if still an issue.  Just mark it normal priority.
232      if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
233      return HConstants.NORMAL_QOS;
234    }
235
236    if (param instanceof ScanRequest) { // scanner methods...
237      ScanRequest request = (ScanRequest)param;
238      if (!request.hasScannerId()) {
239        return HConstants.NORMAL_QOS;
240      }
241      RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
242      if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) {
243        if (LOG.isTraceEnabled()) {
244          // Scanner requests are small in size so TextFormat version should not overwhelm log.
245          LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
246        }
247        return HConstants.SYSTEMTABLE_QOS;
248      }
249    }
250
251    return HConstants.NORMAL_QOS;
252  }
253
254  /**
255   * Based on the request content, returns the deadline of the request.
256   *
257   * @param header
258   * @param param
259   * @return Deadline of this request. 0 now, otherwise msec of 'delay'
260   */
261  @Override
262  public long getDeadline(RequestHeader header, Message param) {
263    if (param instanceof ScanRequest) {
264      ScanRequest request = (ScanRequest)param;
265      if (!request.hasScannerId()) {
266        return 0;
267      }
268
269      // get the 'virtual time' of the scanner, and applies sqrt() to get a
270      // nice curve for the delay. More a scanner is used the less priority it gets.
271      // The weight is used to have more control on the delay.
272      long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
273      return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
274    }
275    return 0;
276  }
277
278  @VisibleForTesting
279  void setRegionServer(final HRegionServer hrs) {
280    this.rpcServices = hrs.getRSRpcServices();
281  }
282}