001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.lang.reflect.Method; 021import java.util.HashMap; 022import java.util.Map; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.HConstants; 025import org.apache.hadoop.hbase.ipc.AnnotationReadingPriorityFunction; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import org.apache.hbase.thirdparty.com.google.protobuf.Message; 031import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 032 033import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 044 045/** 046 * Priority function specifically for the region server. 047 */ 048@InterfaceAudience.Private 049public class RSAnnotationReadingPriorityFunction 050 extends AnnotationReadingPriorityFunction<RSRpcServices> { 051 052 private static final Logger LOG = 053 LoggerFactory.getLogger(RSAnnotationReadingPriorityFunction.class); 054 055 /** Used to control the scan delay, currently sqrt(numNextCall * weight) */ 056 public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight"; 057 058 // QOS for internal meta read requests 059 public static final int INTERNAL_READ_QOS = 250; 060 061 @SuppressWarnings("unchecked") 062 private final Class<? extends Message>[] knownArgumentClasses = 063 new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class, 064 FlushRegionRequest.class, CompactRegionRequest.class, GetRequest.class, MutateRequest.class, 065 ScanRequest.class }; 066 067 // Some caches for helping performance 068 private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>(); 069 private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>(); 070 071 private final float scanVirtualTimeWeight; 072 073 RSAnnotationReadingPriorityFunction(RSRpcServices rpcServices) { 074 super(rpcServices); 075 if (methodMap.get("getRegion") == null) { 076 methodMap.put("hasRegion", new HashMap<>()); 077 methodMap.put("getRegion", new HashMap<>()); 078 } 079 for (Class<? extends Message> cls : knownArgumentClasses) { 080 argumentToClassMap.put(cls.getName(), cls); 081 try { 082 methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion")); 083 methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion")); 084 } catch (Exception e) { 085 throw new RuntimeException(e); 086 } 087 } 088 089 Configuration conf = rpcServices.getConfiguration(); 090 scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f); 091 } 092 093 @Override 094 protected int normalizePriority(int priority) { 095 return priority; 096 } 097 098 @Override 099 protected int getBasePriority(RequestHeader header, Message param) { 100 // Trust the client-set priorities if set 101 if (header.hasPriority()) { 102 return header.getPriority(); 103 } 104 if (param instanceof BulkLoadHFileRequest) { 105 return HConstants.BULKLOAD_QOS; 106 } 107 108 String cls = param.getClass().getName(); 109 Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls); 110 RegionSpecifier regionSpecifier = null; 111 // check whether the request has reference to meta region or now. 112 try { 113 // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if 114 // hasRegion returns true. Not all listed methods have region specifier each time. For 115 // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than 116 // send the region over every time. 117 Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass); 118 if (hasRegion != null && (Boolean) hasRegion.invoke(param, (Object[]) null)) { 119 Method getRegion = methodMap.get("getRegion").get(rpcArgClass); 120 regionSpecifier = (RegionSpecifier) getRegion.invoke(param, (Object[]) null); 121 Region region = rpcServices.getRegion(regionSpecifier); 122 if (region.getRegionInfo().getTable().isSystemTable()) { 123 if (LOG.isTraceEnabled()) { 124 LOG.trace( 125 "High priority because region=" + region.getRegionInfo().getRegionNameAsString()); 126 } 127 return HConstants.SYSTEMTABLE_QOS; 128 } 129 } 130 } catch (Exception ex) { 131 // Not good throwing an exception out of here, a runtime anyways. Let the query go into the 132 // server and have it throw the exception if still an issue. Just mark it normal priority. 133 if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex); 134 return HConstants.NORMAL_QOS; 135 } 136 137 if (param instanceof ScanRequest) { // scanner methods... 138 ScanRequest request = (ScanRequest) param; 139 if (!request.hasScannerId()) { 140 return HConstants.NORMAL_QOS; 141 } 142 RegionScanner scanner = rpcServices.getScanner(request.getScannerId()); 143 if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) { 144 if (LOG.isTraceEnabled()) { 145 // Scanner requests are small in size so TextFormat version should not overwhelm log. 146 LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request)); 147 } 148 return HConstants.SYSTEMTABLE_QOS; 149 } 150 } 151 152 return HConstants.NORMAL_QOS; 153 } 154 155 /** 156 * Based on the request content, returns the deadline of the request. 157 * @return Deadline of this request. 0 now, otherwise msec of 'delay' 158 */ 159 @Override 160 public long getDeadline(RequestHeader header, Message param) { 161 if (param instanceof ScanRequest) { 162 ScanRequest request = (ScanRequest) param; 163 if (!request.hasScannerId()) { 164 return 0; 165 } 166 167 // get the 'virtual time' of the scanner, and applies sqrt() to get a 168 // nice curve for the delay. More a scanner is used the less priority it gets. 169 // The weight is used to have more control on the delay. 170 long vtime = rpcServices.getScannerVirtualTime(request.getScannerId()); 171 return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight)); 172 } 173 return 0; 174 } 175}