001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.lang.reflect.Method;
021import java.util.HashMap;
022import java.util.Map;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.ipc.AnnotationReadingPriorityFunction;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import org.apache.hbase.thirdparty.com.google.protobuf.Message;
031import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
043
044/**
045 * Priority function specifically for the region server.
046 */
047@InterfaceAudience.Private
048class RSAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction<RSRpcServices> {
049
050  private static final Logger LOG =
051    LoggerFactory.getLogger(RSAnnotationReadingPriorityFunction.class);
052
053  /** Used to control the scan delay, currently sqrt(numNextCall * weight) */
054  public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
055
056  @SuppressWarnings("unchecked")
057  private final Class<? extends Message>[] knownArgumentClasses =
058    new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class,
059      FlushRegionRequest.class, CompactRegionRequest.class, GetRequest.class, MutateRequest.class,
060      ScanRequest.class };
061
062  // Some caches for helping performance
063  private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>();
064  private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>();
065
066  private final float scanVirtualTimeWeight;
067
068  RSAnnotationReadingPriorityFunction(RSRpcServices rpcServices) {
069    super(rpcServices);
070    if (methodMap.get("getRegion") == null) {
071      methodMap.put("hasRegion", new HashMap<>());
072      methodMap.put("getRegion", new HashMap<>());
073    }
074    for (Class<? extends Message> cls : knownArgumentClasses) {
075      argumentToClassMap.put(cls.getName(), cls);
076      try {
077        methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
078        methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
079      } catch (Exception e) {
080        throw new RuntimeException(e);
081      }
082    }
083
084    Configuration conf = rpcServices.getConfiguration();
085    scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
086  }
087
088  @Override
089  protected int normalizePriority(int priority) {
090    return priority;
091  }
092
093  @Override
094  protected int getBasePriority(RequestHeader header, Message param) {
095    // Trust the client-set priorities if set
096    if (header.hasPriority()) {
097      return header.getPriority();
098    }
099    String cls = param.getClass().getName();
100    Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
101    RegionSpecifier regionSpecifier = null;
102    // check whether the request has reference to meta region or now.
103    try {
104      // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
105      // hasRegion returns true. Not all listed methods have region specifier each time. For
106      // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
107      // send the region over every time.
108      Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
109      if (hasRegion != null && (Boolean) hasRegion.invoke(param, (Object[]) null)) {
110        Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
111        regionSpecifier = (RegionSpecifier) getRegion.invoke(param, (Object[]) null);
112        Region region = rpcServices.getRegion(regionSpecifier);
113        if (region.getRegionInfo().getTable().isSystemTable()) {
114          if (LOG.isTraceEnabled()) {
115            LOG.trace(
116              "High priority because region=" + region.getRegionInfo().getRegionNameAsString());
117          }
118          return HConstants.SYSTEMTABLE_QOS;
119        }
120      }
121    } catch (Exception ex) {
122      // Not good throwing an exception out of here, a runtime anyways. Let the query go into the
123      // server and have it throw the exception if still an issue. Just mark it normal priority.
124      if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
125      return HConstants.NORMAL_QOS;
126    }
127
128    if (param instanceof ScanRequest) { // scanner methods...
129      ScanRequest request = (ScanRequest) param;
130      if (!request.hasScannerId()) {
131        return HConstants.NORMAL_QOS;
132      }
133      RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
134      if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) {
135        if (LOG.isTraceEnabled()) {
136          // Scanner requests are small in size so TextFormat version should not overwhelm log.
137          LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
138        }
139        return HConstants.SYSTEMTABLE_QOS;
140      }
141    }
142
143    return HConstants.NORMAL_QOS;
144  }
145
146  /**
147   * Based on the request content, returns the deadline of the request.
148   * @return Deadline of this request. 0 now, otherwise msec of 'delay'
149   */
150  @Override
151  public long getDeadline(RequestHeader header, Message param) {
152    if (param instanceof ScanRequest) {
153      ScanRequest request = (ScanRequest) param;
154      if (!request.hasScannerId()) {
155        return 0;
156      }
157
158      // get the 'virtual time' of the scanner, and applies sqrt() to get a
159      // nice curve for the delay. More a scanner is used the less priority it gets.
160      // The weight is used to have more control on the delay.
161      long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
162      return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
163    }
164    return 0;
165  }
166}