001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.lang.reflect.Method; 021import java.util.HashMap; 022import java.util.Map; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029import org.apache.hadoop.hbase.ipc.PriorityFunction; 030import org.apache.hadoop.hbase.ipc.QosPriority; 031import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 032import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 033import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 041 042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 043import org.apache.hbase.thirdparty.com.google.protobuf.Message; 044import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 045import org.apache.hadoop.hbase.security.User; 046 047/** 048 * Reads special method annotations and table names to figure a priority for use by QoS facility in 049 * ipc; e.g: rpcs to hbase:meta get priority. 050 */ 051// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott 052// suggests and just have the client specify a priority. 053 054//The logic for figuring out high priority RPCs is as follows: 055//1. if the method is annotated with a QosPriority of QOS_HIGH, 056// that is honored 057//2. parse out the protobuf message and see if the request is for meta 058// region, and if so, treat it as a high priority RPC 059//Some optimizations for (2) are done here - 060//Clients send the argument classname as part of making the RPC. The server 061//decides whether to deserialize the proto argument message based on the 062//pre-established set of argument classes (knownArgumentClasses below). 063//This prevents the server from having to deserialize all proto argument 064//messages prematurely. 065//All the argument classes declare a 'getRegion' method that returns a 066//RegionSpecifier object. Methods can be invoked on the returned object 067//to figure out whether it is a meta region or not. 068@InterfaceAudience.Private 069public class AnnotationReadingPriorityFunction implements PriorityFunction { 070 private static final Logger LOG = 071 LoggerFactory.getLogger(AnnotationReadingPriorityFunction.class.getName()); 072 073 /** Used to control the scan delay, currently sqrt(numNextCall * weight) */ 074 public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight"; 075 076 protected final Map<String, Integer> annotatedQos; 077 //We need to mock the regionserver instance for some unit tests (set via 078 //setRegionServer method. 079 private RSRpcServices rpcServices; 080 @SuppressWarnings("unchecked") 081 private final Class<? extends Message>[] knownArgumentClasses = new Class[]{ 082 GetRegionInfoRequest.class, 083 GetStoreFileRequest.class, 084 CloseRegionRequest.class, 085 FlushRegionRequest.class, 086 CompactRegionRequest.class, 087 GetRequest.class, 088 MutateRequest.class, 089 ScanRequest.class 090 }; 091 092 // Some caches for helping performance 093 private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>(); 094 private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>(); 095 096 private final float scanVirtualTimeWeight; 097 098 /** 099 * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of 100 * {@code rpcServices#getClass()} 101 * 102 * @param rpcServices 103 * The RPC server implementation 104 */ 105 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { 106 this(rpcServices, rpcServices.getClass()); 107 } 108 109 /** 110 * Constructs the priority function given the RPC server implementation and the annotations on the 111 * methods in the provided {@code clz}. 112 * 113 * @param rpcServices 114 * The RPC server implementation 115 * @param clz 116 * The concrete RPC server implementation's class 117 */ 118 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices, 119 Class<? extends RSRpcServices> clz) { 120 Map<String,Integer> qosMap = new HashMap<>(); 121 for (Method m : clz.getMethods()) { 122 QosPriority p = m.getAnnotation(QosPriority.class); 123 if (p != null) { 124 // Since we protobuf'd, and then subsequently, when we went with pb style, method names 125 // are capitalized. This meant that this brittle compare of method names gotten by 126 // reflection no longer matched the method names coming in over pb. TODO: Get rid of this 127 // check. For now, workaround is to capitalize the names we got from reflection so they 128 // have chance of matching the pb ones. 129 String capitalizedMethodName = capitalize(m.getName()); 130 qosMap.put(capitalizedMethodName, p.priority()); 131 } 132 } 133 this.rpcServices = rpcServices; 134 this.annotatedQos = qosMap; 135 if (methodMap.get("getRegion") == null) { 136 methodMap.put("hasRegion", new HashMap<>()); 137 methodMap.put("getRegion", new HashMap<>()); 138 } 139 for (Class<? extends Message> cls : knownArgumentClasses) { 140 argumentToClassMap.put(cls.getName(), cls); 141 try { 142 methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion")); 143 methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion")); 144 } catch (Exception e) { 145 throw new RuntimeException(e); 146 } 147 } 148 149 Configuration conf = rpcServices.getConfiguration(); 150 scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f); 151 } 152 153 private String capitalize(final String s) { 154 StringBuilder strBuilder = new StringBuilder(s); 155 strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0))); 156 return strBuilder.toString(); 157 } 158 159 /** 160 * Returns a 'priority' based on the request type. 161 * 162 * Currently the returned priority is used for queue selection. 163 * See the SimpleRpcScheduler as example. It maintains a queue per 'priory type' 164 * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests), 165 * NORMAL_QOS (user requests). 166 */ 167 @Override 168 public int getPriority(RequestHeader header, Message param, User user) { 169 int priorityByAnnotation = getAnnotatedPriority(header); 170 171 if (priorityByAnnotation >= 0) { 172 return priorityByAnnotation; 173 } 174 return getBasePriority(header, param); 175 } 176 177 /** 178 * See if the method has an annotation. 179 * @param header 180 * @return Return the priority from the annotation. If there isn't 181 * an annotation, this returns something below zero. 182 */ 183 protected int getAnnotatedPriority(RequestHeader header) { 184 String methodName = header.getMethodName(); 185 Integer priorityByAnnotation = annotatedQos.get(methodName); 186 if (priorityByAnnotation != null) { 187 return priorityByAnnotation; 188 } 189 return -1; 190 } 191 192 /** 193 * Get the priority for a given request from the header and the param 194 * This doesn't consider which user is sending the request at all. 195 * This doesn't consider annotations 196 */ 197 protected int getBasePriority(RequestHeader header, Message param) { 198 if (param == null) { 199 return HConstants.NORMAL_QOS; 200 } 201 202 // Trust the client-set priorities if set 203 if (header.hasPriority()) { 204 return header.getPriority(); 205 } 206 207 String cls = param.getClass().getName(); 208 Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls); 209 RegionSpecifier regionSpecifier = null; 210 //check whether the request has reference to meta region or now. 211 try { 212 // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if 213 // hasRegion returns true. Not all listed methods have region specifier each time. For 214 // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than 215 // send the region over every time. 216 Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass); 217 if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) { 218 Method getRegion = methodMap.get("getRegion").get(rpcArgClass); 219 regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null); 220 Region region = rpcServices.getRegion(regionSpecifier); 221 if (region.getRegionInfo().getTable().isSystemTable()) { 222 if (LOG.isTraceEnabled()) { 223 LOG.trace("High priority because region=" + 224 region.getRegionInfo().getRegionNameAsString()); 225 } 226 return HConstants.SYSTEMTABLE_QOS; 227 } 228 } 229 } catch (Exception ex) { 230 // Not good throwing an exception out of here, a runtime anyways. Let the query go into the 231 // server and have it throw the exception if still an issue. Just mark it normal priority. 232 if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex); 233 return HConstants.NORMAL_QOS; 234 } 235 236 if (param instanceof ScanRequest) { // scanner methods... 237 ScanRequest request = (ScanRequest)param; 238 if (!request.hasScannerId()) { 239 return HConstants.NORMAL_QOS; 240 } 241 RegionScanner scanner = rpcServices.getScanner(request.getScannerId()); 242 if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) { 243 if (LOG.isTraceEnabled()) { 244 // Scanner requests are small in size so TextFormat version should not overwhelm log. 245 LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request)); 246 } 247 return HConstants.SYSTEMTABLE_QOS; 248 } 249 } 250 251 return HConstants.NORMAL_QOS; 252 } 253 254 /** 255 * Based on the request content, returns the deadline of the request. 256 * 257 * @param header 258 * @param param 259 * @return Deadline of this request. 0 now, otherwise msec of 'delay' 260 */ 261 @Override 262 public long getDeadline(RequestHeader header, Message param) { 263 if (param instanceof ScanRequest) { 264 ScanRequest request = (ScanRequest)param; 265 if (!request.hasScannerId()) { 266 return 0; 267 } 268 269 // get the 'virtual time' of the scanner, and applies sqrt() to get a 270 // nice curve for the delay. More a scanner is used the less priority it gets. 271 // The weight is used to have more control on the delay. 272 long vtime = rpcServices.getScannerVirtualTime(request.getScannerId()); 273 return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight)); 274 } 275 return 0; 276 } 277 278 @VisibleForTesting 279 void setRegionServer(final HRegionServer hrs) { 280 this.rpcServices = hrs.getRSRpcServices(); 281 } 282}