001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.lang.reflect.Method; 021import java.util.HashMap; 022import java.util.Map; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.HConstants; 025import org.apache.hadoop.hbase.ipc.PriorityFunction; 026import org.apache.hadoop.hbase.ipc.QosPriority; 027import org.apache.hadoop.hbase.security.User; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hbase.thirdparty.com.google.protobuf.Message; 033import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 034 035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 045 046/** 047 * Reads special method annotations and table names to figure a priority for use by QoS facility in 048 * ipc; e.g: rpcs to hbase:meta get priority. 049 */ 050// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott 051// suggests and just have the client specify a priority. 052 053//The logic for figuring out high priority RPCs is as follows: 054//1. if the method is annotated with a QosPriority of QOS_HIGH, 055// that is honored 056//2. parse out the protobuf message and see if the request is for meta 057// region, and if so, treat it as a high priority RPC 058//Some optimizations for (2) are done here - 059//Clients send the argument classname as part of making the RPC. The server 060//decides whether to deserialize the proto argument message based on the 061//pre-established set of argument classes (knownArgumentClasses below). 062//This prevents the server from having to deserialize all proto argument 063//messages prematurely. 064//All the argument classes declare a 'getRegion' method that returns a 065//RegionSpecifier object. Methods can be invoked on the returned object 066//to figure out whether it is a meta region or not. 067@InterfaceAudience.Private 068public class AnnotationReadingPriorityFunction implements PriorityFunction { 069 private static final Logger LOG = 070 LoggerFactory.getLogger(AnnotationReadingPriorityFunction.class.getName()); 071 072 /** Used to control the scan delay, currently sqrt(numNextCall * weight) */ 073 public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight"; 074 075 protected final Map<String, Integer> annotatedQos; 076 //We need to mock the regionserver instance for some unit tests (set via 077 //setRegionServer method. 078 private RSRpcServices rpcServices; 079 @SuppressWarnings("unchecked") 080 private final Class<? extends Message>[] knownArgumentClasses = new Class[]{ 081 GetRegionInfoRequest.class, 082 GetStoreFileRequest.class, 083 CloseRegionRequest.class, 084 FlushRegionRequest.class, 085 CompactRegionRequest.class, 086 GetRequest.class, 087 MutateRequest.class, 088 ScanRequest.class 089 }; 090 091 // Some caches for helping performance 092 private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>(); 093 private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>(); 094 095 private final float scanVirtualTimeWeight; 096 097 /** 098 * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of 099 * {@code rpcServices#getClass()} 100 * 101 * @param rpcServices 102 * The RPC server implementation 103 */ 104 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { 105 this(rpcServices, rpcServices.getClass()); 106 } 107 108 /** 109 * Constructs the priority function given the RPC server implementation and the annotations on the 110 * methods in the provided {@code clz}. 111 * 112 * @param rpcServices 113 * The RPC server implementation 114 * @param clz 115 * The concrete RPC server implementation's class 116 */ 117 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices, 118 Class<? extends RSRpcServices> clz) { 119 Map<String,Integer> qosMap = new HashMap<>(); 120 for (Method m : clz.getMethods()) { 121 QosPriority p = m.getAnnotation(QosPriority.class); 122 if (p != null) { 123 // Since we protobuf'd, and then subsequently, when we went with pb style, method names 124 // are capitalized. This meant that this brittle compare of method names gotten by 125 // reflection no longer matched the method names coming in over pb. TODO: Get rid of this 126 // check. For now, workaround is to capitalize the names we got from reflection so they 127 // have chance of matching the pb ones. 128 String capitalizedMethodName = capitalize(m.getName()); 129 qosMap.put(capitalizedMethodName, p.priority()); 130 } 131 } 132 this.rpcServices = rpcServices; 133 this.annotatedQos = qosMap; 134 if (methodMap.get("getRegion") == null) { 135 methodMap.put("hasRegion", new HashMap<>()); 136 methodMap.put("getRegion", new HashMap<>()); 137 } 138 for (Class<? extends Message> cls : knownArgumentClasses) { 139 argumentToClassMap.put(cls.getName(), cls); 140 try { 141 methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion")); 142 methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion")); 143 } catch (Exception e) { 144 throw new RuntimeException(e); 145 } 146 } 147 148 Configuration conf = rpcServices.getConfiguration(); 149 scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f); 150 } 151 152 private String capitalize(final String s) { 153 StringBuilder strBuilder = new StringBuilder(s); 154 strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0))); 155 return strBuilder.toString(); 156 } 157 158 /** 159 * Returns a 'priority' based on the request type. 160 * 161 * Currently the returned priority is used for queue selection. 162 * See the SimpleRpcScheduler as example. It maintains a queue per 'priory type' 163 * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests), 164 * NORMAL_QOS (user requests). 165 */ 166 @Override 167 public int getPriority(RequestHeader header, Message param, User user) { 168 int priorityByAnnotation = getAnnotatedPriority(header); 169 170 if (priorityByAnnotation >= 0) { 171 return priorityByAnnotation; 172 } 173 return getBasePriority(header, param); 174 } 175 176 /** 177 * See if the method has an annotation. 178 * @param header 179 * @return Return the priority from the annotation. If there isn't 180 * an annotation, this returns something below zero. 181 */ 182 protected int getAnnotatedPriority(RequestHeader header) { 183 String methodName = header.getMethodName(); 184 Integer priorityByAnnotation = annotatedQos.get(methodName); 185 if (priorityByAnnotation != null) { 186 return priorityByAnnotation; 187 } 188 return -1; 189 } 190 191 /** 192 * Get the priority for a given request from the header and the param 193 * This doesn't consider which user is sending the request at all. 194 * This doesn't consider annotations 195 */ 196 protected int getBasePriority(RequestHeader header, Message param) { 197 if (param == null) { 198 return HConstants.NORMAL_QOS; 199 } 200 201 // Trust the client-set priorities if set 202 if (header.hasPriority()) { 203 return header.getPriority(); 204 } 205 206 String cls = param.getClass().getName(); 207 Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls); 208 RegionSpecifier regionSpecifier = null; 209 //check whether the request has reference to meta region or now. 210 try { 211 // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if 212 // hasRegion returns true. Not all listed methods have region specifier each time. For 213 // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than 214 // send the region over every time. 215 Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass); 216 if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) { 217 Method getRegion = methodMap.get("getRegion").get(rpcArgClass); 218 regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null); 219 Region region = rpcServices.getRegion(regionSpecifier); 220 if (region.getRegionInfo().getTable().isSystemTable()) { 221 if (LOG.isTraceEnabled()) { 222 LOG.trace("High priority because region=" + 223 region.getRegionInfo().getRegionNameAsString()); 224 } 225 return HConstants.SYSTEMTABLE_QOS; 226 } 227 } 228 } catch (Exception ex) { 229 // Not good throwing an exception out of here, a runtime anyways. Let the query go into the 230 // server and have it throw the exception if still an issue. Just mark it normal priority. 231 if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex); 232 return HConstants.NORMAL_QOS; 233 } 234 235 if (param instanceof ScanRequest) { // scanner methods... 236 ScanRequest request = (ScanRequest)param; 237 if (!request.hasScannerId()) { 238 return HConstants.NORMAL_QOS; 239 } 240 RegionScanner scanner = rpcServices.getScanner(request.getScannerId()); 241 if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) { 242 if (LOG.isTraceEnabled()) { 243 // Scanner requests are small in size so TextFormat version should not overwhelm log. 244 LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request)); 245 } 246 return HConstants.SYSTEMTABLE_QOS; 247 } 248 } 249 250 return HConstants.NORMAL_QOS; 251 } 252 253 /** 254 * Based on the request content, returns the deadline of the request. 255 * 256 * @param header 257 * @param param 258 * @return Deadline of this request. 0 now, otherwise msec of 'delay' 259 */ 260 @Override 261 public long getDeadline(RequestHeader header, Message param) { 262 if (param instanceof ScanRequest) { 263 ScanRequest request = (ScanRequest)param; 264 if (!request.hasScannerId()) { 265 return 0; 266 } 267 268 // get the 'virtual time' of the scanner, and applies sqrt() to get a 269 // nice curve for the delay. More a scanner is used the less priority it gets. 270 // The weight is used to have more control on the delay. 271 long vtime = rpcServices.getScannerVirtualTime(request.getScannerId()); 272 return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight)); 273 } 274 return 0; 275 } 276 277 void setRegionServer(final HRegionServer hrs) { 278 this.rpcServices = hrs.getRSRpcServices(); 279 } 280}