001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.lang.reflect.Method;
021import java.util.HashMap;
022import java.util.Map;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.ipc.PriorityFunction;
026import org.apache.hadoop.hbase.ipc.QosPriority;
027import org.apache.hadoop.hbase.security.User;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.com.google.protobuf.Message;
033import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
045
046/**
047 * Reads special method annotations and table names to figure a priority for use by QoS facility in
048 * ipc; e.g: rpcs to hbase:meta get priority.
049 */
050// TODO: Remove.  This is doing way too much work just to figure a priority.  Do as Elliott
051// suggests and just have the client specify a priority.
052
053//The logic for figuring out high priority RPCs is as follows:
054//1. if the method is annotated with a QosPriority of QOS_HIGH,
055//   that is honored
056//2. parse out the protobuf message and see if the request is for meta
057//   region, and if so, treat it as a high priority RPC
058//Some optimizations for (2) are done here -
059//Clients send the argument classname as part of making the RPC. The server
060//decides whether to deserialize the proto argument message based on the
061//pre-established set of argument classes (knownArgumentClasses below).
062//This prevents the server from having to deserialize all proto argument
063//messages prematurely.
064//All the argument classes declare a 'getRegion' method that returns a
065//RegionSpecifier object. Methods can be invoked on the returned object
066//to figure out whether it is a meta region or not.
067@InterfaceAudience.Private
068public class AnnotationReadingPriorityFunction implements PriorityFunction {
069  private static final Logger LOG =
070    LoggerFactory.getLogger(AnnotationReadingPriorityFunction.class.getName());
071
072  /** Used to control the scan delay, currently sqrt(numNextCall * weight) */
073  public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
074
075  protected final Map<String, Integer> annotatedQos;
076  //We need to mock the regionserver instance for some unit tests (set via
077  //setRegionServer method.
078  private RSRpcServices rpcServices;
079  @SuppressWarnings("unchecked")
080  private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
081      GetRegionInfoRequest.class,
082      GetStoreFileRequest.class,
083      CloseRegionRequest.class,
084      FlushRegionRequest.class,
085      CompactRegionRequest.class,
086      GetRequest.class,
087      MutateRequest.class,
088      ScanRequest.class
089  };
090
091  // Some caches for helping performance
092  private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>();
093  private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>();
094
095  private final float scanVirtualTimeWeight;
096
097  /**
098   * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
099   * {@code rpcServices#getClass()}
100   *
101   * @param rpcServices
102   *          The RPC server implementation
103   */
104  public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
105    this(rpcServices, rpcServices.getClass());
106  }
107
108  /**
109   * Constructs the priority function given the RPC server implementation and the annotations on the
110   * methods in the provided {@code clz}.
111   *
112   * @param rpcServices
113   *          The RPC server implementation
114   * @param clz
115   *          The concrete RPC server implementation's class
116   */
117  public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
118      Class<? extends RSRpcServices> clz) {
119    Map<String,Integer> qosMap = new HashMap<>();
120    for (Method m : clz.getMethods()) {
121      QosPriority p = m.getAnnotation(QosPriority.class);
122      if (p != null) {
123        // Since we protobuf'd, and then subsequently, when we went with pb style, method names
124        // are capitalized.  This meant that this brittle compare of method names gotten by
125        // reflection no longer matched the method names coming in over pb.  TODO: Get rid of this
126        // check.  For now, workaround is to capitalize the names we got from reflection so they
127        // have chance of matching the pb ones.
128        String capitalizedMethodName = capitalize(m.getName());
129        qosMap.put(capitalizedMethodName, p.priority());
130      }
131    }
132    this.rpcServices = rpcServices;
133    this.annotatedQos = qosMap;
134    if (methodMap.get("getRegion") == null) {
135      methodMap.put("hasRegion", new HashMap<>());
136      methodMap.put("getRegion", new HashMap<>());
137    }
138    for (Class<? extends Message> cls : knownArgumentClasses) {
139      argumentToClassMap.put(cls.getName(), cls);
140      try {
141        methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
142        methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
143      } catch (Exception e) {
144        throw new RuntimeException(e);
145      }
146    }
147
148    Configuration conf = rpcServices.getConfiguration();
149    scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
150  }
151
152  private String capitalize(final String s) {
153    StringBuilder strBuilder = new StringBuilder(s);
154    strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
155    return strBuilder.toString();
156  }
157
158  /**
159   * Returns a 'priority' based on the request type.
160   *
161   * Currently the returned priority is used for queue selection.
162   * See the SimpleRpcScheduler as example. It maintains a queue per 'priory type'
163   * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests),
164   * NORMAL_QOS (user requests).
165   */
166  @Override
167  public int getPriority(RequestHeader header, Message param, User user) {
168    int priorityByAnnotation = getAnnotatedPriority(header);
169
170    if (priorityByAnnotation >= 0) {
171      return priorityByAnnotation;
172    }
173    return getBasePriority(header, param);
174  }
175
176  /**
177   * See if the method has an annotation.
178   * @param header
179   * @return Return the priority from the annotation. If there isn't
180   * an annotation, this returns something below zero.
181   */
182  protected int getAnnotatedPriority(RequestHeader header) {
183    String methodName = header.getMethodName();
184    Integer priorityByAnnotation = annotatedQos.get(methodName);
185    if (priorityByAnnotation != null) {
186      return priorityByAnnotation;
187    }
188    return -1;
189  }
190
191  /**
192   * Get the priority for a given request from the header and the param
193   * This doesn't consider which user is sending the request at all.
194   * This doesn't consider annotations
195   */
196  protected int getBasePriority(RequestHeader header, Message param) {
197    if (param == null) {
198      return HConstants.NORMAL_QOS;
199    }
200
201    // Trust the client-set priorities if set
202    if (header.hasPriority()) {
203      return header.getPriority();
204    }
205
206    String cls = param.getClass().getName();
207    Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
208    RegionSpecifier regionSpecifier = null;
209    //check whether the request has reference to meta region or now.
210    try {
211      // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
212      // hasRegion returns true.  Not all listed methods have region specifier each time.  For
213      // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
214      // send the region over every time.
215      Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
216      if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
217        Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
218        regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
219        Region region = rpcServices.getRegion(regionSpecifier);
220        if (region.getRegionInfo().getTable().isSystemTable()) {
221          if (LOG.isTraceEnabled()) {
222            LOG.trace("High priority because region=" +
223              region.getRegionInfo().getRegionNameAsString());
224          }
225          return HConstants.SYSTEMTABLE_QOS;
226        }
227      }
228    } catch (Exception ex) {
229      // Not good throwing an exception out of here, a runtime anyways.  Let the query go into the
230      // server and have it throw the exception if still an issue.  Just mark it normal priority.
231      if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
232      return HConstants.NORMAL_QOS;
233    }
234
235    if (param instanceof ScanRequest) { // scanner methods...
236      ScanRequest request = (ScanRequest)param;
237      if (!request.hasScannerId()) {
238        return HConstants.NORMAL_QOS;
239      }
240      RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
241      if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) {
242        if (LOG.isTraceEnabled()) {
243          // Scanner requests are small in size so TextFormat version should not overwhelm log.
244          LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
245        }
246        return HConstants.SYSTEMTABLE_QOS;
247      }
248    }
249
250    return HConstants.NORMAL_QOS;
251  }
252
253  /**
254   * Based on the request content, returns the deadline of the request.
255   *
256   * @param header
257   * @param param
258   * @return Deadline of this request. 0 now, otherwise msec of 'delay'
259   */
260  @Override
261  public long getDeadline(RequestHeader header, Message param) {
262    if (param instanceof ScanRequest) {
263      ScanRequest request = (ScanRequest)param;
264      if (!request.hasScannerId()) {
265        return 0;
266      }
267
268      // get the 'virtual time' of the scanner, and applies sqrt() to get a
269      // nice curve for the delay. More a scanner is used the less priority it gets.
270      // The weight is used to have more control on the delay.
271      long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
272      return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
273    }
274    return 0;
275  }
276
277  void setRegionServer(final HRegionServer hrs) {
278    this.rpcServices = hrs.getRSRpcServices();
279  }
280}