001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.lang.reflect.Method;
021import java.util.HashMap;
022import java.util.Map;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.ipc.AnnotationReadingPriorityFunction;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import org.apache.hbase.thirdparty.com.google.protobuf.Message;
031import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
044
045/**
046 * Priority function specifically for the region server.
047 */
048@InterfaceAudience.Private
049class RSAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction<RSRpcServices> {
050
051  private static final Logger LOG =
052    LoggerFactory.getLogger(RSAnnotationReadingPriorityFunction.class);
053
054  /** Used to control the scan delay, currently sqrt(numNextCall * weight) */
055  public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
056
057  @SuppressWarnings("unchecked")
058  private final Class<? extends Message>[] knownArgumentClasses =
059    new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class,
060      FlushRegionRequest.class, CompactRegionRequest.class, GetRequest.class, MutateRequest.class,
061      ScanRequest.class };
062
063  // Some caches for helping performance
064  private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>();
065  private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>();
066
067  private final float scanVirtualTimeWeight;
068
069  RSAnnotationReadingPriorityFunction(RSRpcServices rpcServices) {
070    super(rpcServices);
071    if (methodMap.get("getRegion") == null) {
072      methodMap.put("hasRegion", new HashMap<>());
073      methodMap.put("getRegion", new HashMap<>());
074    }
075    for (Class<? extends Message> cls : knownArgumentClasses) {
076      argumentToClassMap.put(cls.getName(), cls);
077      try {
078        methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
079        methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
080      } catch (Exception e) {
081        throw new RuntimeException(e);
082      }
083    }
084
085    Configuration conf = rpcServices.getConfiguration();
086    scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
087  }
088
089  @Override
090  protected int normalizePriority(int priority) {
091    return priority;
092  }
093
094  @Override
095  protected int getBasePriority(RequestHeader header, Message param) {
096    // Trust the client-set priorities if set
097    if (header.hasPriority()) {
098      return header.getPriority();
099    }
100    if (param instanceof BulkLoadHFileRequest) {
101      return HConstants.BULKLOAD_QOS;
102    }
103
104    String cls = param.getClass().getName();
105    Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
106    RegionSpecifier regionSpecifier = null;
107    // check whether the request has reference to meta region or now.
108    try {
109      // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
110      // hasRegion returns true. Not all listed methods have region specifier each time. For
111      // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
112      // send the region over every time.
113      Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
114      if (hasRegion != null && (Boolean) hasRegion.invoke(param, (Object[]) null)) {
115        Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
116        regionSpecifier = (RegionSpecifier) getRegion.invoke(param, (Object[]) null);
117        Region region = rpcServices.getRegion(regionSpecifier);
118        if (region.getRegionInfo().getTable().isSystemTable()) {
119          if (LOG.isTraceEnabled()) {
120            LOG.trace(
121              "High priority because region=" + region.getRegionInfo().getRegionNameAsString());
122          }
123          return HConstants.SYSTEMTABLE_QOS;
124        }
125      }
126    } catch (Exception ex) {
127      // Not good throwing an exception out of here, a runtime anyways. Let the query go into the
128      // server and have it throw the exception if still an issue. Just mark it normal priority.
129      if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
130      return HConstants.NORMAL_QOS;
131    }
132
133    if (param instanceof ScanRequest) { // scanner methods...
134      ScanRequest request = (ScanRequest) param;
135      if (!request.hasScannerId()) {
136        return HConstants.NORMAL_QOS;
137      }
138      RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
139      if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) {
140        if (LOG.isTraceEnabled()) {
141          // Scanner requests are small in size so TextFormat version should not overwhelm log.
142          LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
143        }
144        return HConstants.SYSTEMTABLE_QOS;
145      }
146    }
147
148    return HConstants.NORMAL_QOS;
149  }
150
151  /**
152   * Based on the request content, returns the deadline of the request.
153   * @return Deadline of this request. 0 now, otherwise msec of 'delay'
154   */
155  @Override
156  public long getDeadline(RequestHeader header, Message param) {
157    if (param instanceof ScanRequest) {
158      ScanRequest request = (ScanRequest) param;
159      if (!request.hasScannerId()) {
160        return 0;
161      }
162
163      // get the 'virtual time' of the scanner, and applies sqrt() to get a
164      // nice curve for the delay. More a scanner is used the less priority it gets.
165      // The weight is used to have more control on the delay.
166      long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
167      return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
168    }
169    return 0;
170  }
171}