001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.lang.reflect.Method;
021import java.util.HashMap;
022import java.util.Map;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.ipc.PriorityFunction;
026import org.apache.hadoop.hbase.ipc.QosPriority;
027import org.apache.hadoop.hbase.security.User;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hbase.thirdparty.com.google.protobuf.Message;
033import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
045
046/**
047 * Reads special method annotations and table names to figure a priority for use by QoS facility in
048 * ipc; e.g: rpcs to hbase:meta get priority.
049 */
050// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott
051// suggests and just have the client specify a priority.
052
053// The logic for figuring out high priority RPCs is as follows:
054// 1. if the method is annotated with a QosPriority of QOS_HIGH,
055// that is honored
056// 2. parse out the protobuf message and see if the request is for meta
057// region, and if so, treat it as a high priority RPC
058// Some optimizations for (2) are done here -
059// Clients send the argument classname as part of making the RPC. The server
060// decides whether to deserialize the proto argument message based on the
061// pre-established set of argument classes (knownArgumentClasses below).
062// This prevents the server from having to deserialize all proto argument
063// messages prematurely.
064// All the argument classes declare a 'getRegion' method that returns a
065// RegionSpecifier object. Methods can be invoked on the returned object
066// to figure out whether it is a meta region or not.
067@InterfaceAudience.Private
068public class AnnotationReadingPriorityFunction implements PriorityFunction {
069  private static final Logger LOG =
070    LoggerFactory.getLogger(AnnotationReadingPriorityFunction.class.getName());
071
072  /** Used to control the scan delay, currently sqrt(numNextCall * weight) */
073  public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
074
075  protected final Map<String, Integer> annotatedQos;
076  // We need to mock the regionserver instance for some unit tests (set via
077  // setRegionServer method.
078  private RSRpcServices rpcServices;
079  @SuppressWarnings("unchecked")
080  private final Class<? extends Message>[] knownArgumentClasses =
081    new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class,
082      FlushRegionRequest.class, CompactRegionRequest.class, GetRequest.class, MutateRequest.class,
083      ScanRequest.class };
084
085  // Some caches for helping performance
086  private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>();
087  private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>();
088
089  private final float scanVirtualTimeWeight;
090
091  /**
092   * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
093   * {@code rpcServices#getClass()} n * The RPC server implementation
094   */
095  public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
096    this(rpcServices, rpcServices.getClass());
097  }
098
099  /**
100   * Constructs the priority function given the RPC server implementation and the annotations on the
101   * methods in the provided {@code clz}. n * The RPC server implementation n * The concrete RPC
102   * server implementation's class
103   */
104  public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
105    Class<? extends RSRpcServices> clz) {
106    Map<String, Integer> qosMap = new HashMap<>();
107    for (Method m : clz.getMethods()) {
108      QosPriority p = m.getAnnotation(QosPriority.class);
109      if (p != null) {
110        // Since we protobuf'd, and then subsequently, when we went with pb style, method names
111        // are capitalized. This meant that this brittle compare of method names gotten by
112        // reflection no longer matched the method names coming in over pb. TODO: Get rid of this
113        // check. For now, workaround is to capitalize the names we got from reflection so they
114        // have chance of matching the pb ones.
115        String capitalizedMethodName = capitalize(m.getName());
116        qosMap.put(capitalizedMethodName, p.priority());
117      }
118    }
119    this.rpcServices = rpcServices;
120    this.annotatedQos = qosMap;
121    if (methodMap.get("getRegion") == null) {
122      methodMap.put("hasRegion", new HashMap<>());
123      methodMap.put("getRegion", new HashMap<>());
124    }
125    for (Class<? extends Message> cls : knownArgumentClasses) {
126      argumentToClassMap.put(cls.getName(), cls);
127      try {
128        methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
129        methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
130      } catch (Exception e) {
131        throw new RuntimeException(e);
132      }
133    }
134
135    Configuration conf = rpcServices.getConfiguration();
136    scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
137  }
138
139  private String capitalize(final String s) {
140    StringBuilder strBuilder = new StringBuilder(s);
141    strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
142    return strBuilder.toString();
143  }
144
145  /**
146   * Returns a 'priority' based on the request type. Currently the returned priority is used for
147   * queue selection. See the SimpleRpcScheduler as example. It maintains a queue per 'priory type'
148   * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests), NORMAL_QOS (user requests).
149   */
150  @Override
151  public int getPriority(RequestHeader header, Message param, User user) {
152    int priorityByAnnotation = getAnnotatedPriority(header);
153
154    if (priorityByAnnotation >= 0) {
155      return priorityByAnnotation;
156    }
157    return getBasePriority(header, param);
158  }
159
160  /**
161   * See if the method has an annotation. n * @return Return the priority from the annotation. If
162   * there isn't an annotation, this returns something below zero.
163   */
164  protected int getAnnotatedPriority(RequestHeader header) {
165    String methodName = header.getMethodName();
166    Integer priorityByAnnotation = annotatedQos.get(methodName);
167    if (priorityByAnnotation != null) {
168      return priorityByAnnotation;
169    }
170    return -1;
171  }
172
173  /**
174   * Get the priority for a given request from the header and the param This doesn't consider which
175   * user is sending the request at all. This doesn't consider annotations
176   */
177  protected int getBasePriority(RequestHeader header, Message param) {
178    if (param == null) {
179      return HConstants.NORMAL_QOS;
180    }
181
182    // Trust the client-set priorities if set
183    if (header.hasPriority()) {
184      return header.getPriority();
185    }
186
187    String cls = param.getClass().getName();
188    Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
189    RegionSpecifier regionSpecifier = null;
190    // check whether the request has reference to meta region or now.
191    try {
192      // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
193      // hasRegion returns true. Not all listed methods have region specifier each time. For
194      // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
195      // send the region over every time.
196      Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
197      if (hasRegion != null && (Boolean) hasRegion.invoke(param, (Object[]) null)) {
198        Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
199        regionSpecifier = (RegionSpecifier) getRegion.invoke(param, (Object[]) null);
200        Region region = rpcServices.getRegion(regionSpecifier);
201        if (region.getRegionInfo().getTable().isSystemTable()) {
202          if (LOG.isTraceEnabled()) {
203            LOG.trace(
204              "High priority because region=" + region.getRegionInfo().getRegionNameAsString());
205          }
206          return HConstants.SYSTEMTABLE_QOS;
207        }
208      }
209    } catch (Exception ex) {
210      // Not good throwing an exception out of here, a runtime anyways. Let the query go into the
211      // server and have it throw the exception if still an issue. Just mark it normal priority.
212      if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
213      return HConstants.NORMAL_QOS;
214    }
215
216    if (param instanceof ScanRequest) { // scanner methods...
217      ScanRequest request = (ScanRequest) param;
218      if (!request.hasScannerId()) {
219        return HConstants.NORMAL_QOS;
220      }
221      RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
222      if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) {
223        if (LOG.isTraceEnabled()) {
224          // Scanner requests are small in size so TextFormat version should not overwhelm log.
225          LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
226        }
227        return HConstants.SYSTEMTABLE_QOS;
228      }
229    }
230
231    return HConstants.NORMAL_QOS;
232  }
233
234  /**
235   * Based on the request content, returns the deadline of the request. nn * @return Deadline of
236   * this request. 0 now, otherwise msec of 'delay'
237   */
238  @Override
239  public long getDeadline(RequestHeader header, Message param) {
240    if (param instanceof ScanRequest) {
241      ScanRequest request = (ScanRequest) param;
242      if (!request.hasScannerId()) {
243        return 0;
244      }
245
246      // get the 'virtual time' of the scanner, and applies sqrt() to get a
247      // nice curve for the delay. More a scanner is used the less priority it gets.
248      // The weight is used to have more control on the delay.
249      long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
250      return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
251    }
252    return 0;
253  }
254
255  void setRegionServer(final HRegionServer hrs) {
256    this.rpcServices = hrs.getRSRpcServices();
257  }
258}