001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.lang.reflect.Method; 021import java.util.HashMap; 022import java.util.Map; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.HConstants; 025import org.apache.hadoop.hbase.ipc.PriorityFunction; 026import org.apache.hadoop.hbase.ipc.QosPriority; 027import org.apache.hadoop.hbase.security.User; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hbase.thirdparty.com.google.protobuf.Message; 033import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 034 035import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 045 046/** 047 * Reads special method annotations and table names to figure a priority for use by QoS facility in 048 * ipc; e.g: rpcs to hbase:meta get priority. 049 */ 050// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott 051// suggests and just have the client specify a priority. 052 053// The logic for figuring out high priority RPCs is as follows: 054// 1. if the method is annotated with a QosPriority of QOS_HIGH, 055// that is honored 056// 2. parse out the protobuf message and see if the request is for meta 057// region, and if so, treat it as a high priority RPC 058// Some optimizations for (2) are done here - 059// Clients send the argument classname as part of making the RPC. The server 060// decides whether to deserialize the proto argument message based on the 061// pre-established set of argument classes (knownArgumentClasses below). 062// This prevents the server from having to deserialize all proto argument 063// messages prematurely. 064// All the argument classes declare a 'getRegion' method that returns a 065// RegionSpecifier object. Methods can be invoked on the returned object 066// to figure out whether it is a meta region or not. 067@InterfaceAudience.Private 068public class AnnotationReadingPriorityFunction implements PriorityFunction { 069 private static final Logger LOG = 070 LoggerFactory.getLogger(AnnotationReadingPriorityFunction.class.getName()); 071 072 /** Used to control the scan delay, currently sqrt(numNextCall * weight) */ 073 public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight"; 074 075 protected final Map<String, Integer> annotatedQos; 076 // We need to mock the regionserver instance for some unit tests (set via 077 // setRegionServer method. 078 private RSRpcServices rpcServices; 079 @SuppressWarnings("unchecked") 080 private final Class<? extends Message>[] knownArgumentClasses = 081 new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class, 082 FlushRegionRequest.class, CompactRegionRequest.class, GetRequest.class, MutateRequest.class, 083 ScanRequest.class }; 084 085 // Some caches for helping performance 086 private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>(); 087 private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>(); 088 089 private final float scanVirtualTimeWeight; 090 091 /** 092 * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of 093 * {@code rpcServices#getClass()} n * The RPC server implementation 094 */ 095 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { 096 this(rpcServices, rpcServices.getClass()); 097 } 098 099 /** 100 * Constructs the priority function given the RPC server implementation and the annotations on the 101 * methods in the provided {@code clz}. n * The RPC server implementation n * The concrete RPC 102 * server implementation's class 103 */ 104 public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices, 105 Class<? extends RSRpcServices> clz) { 106 Map<String, Integer> qosMap = new HashMap<>(); 107 for (Method m : clz.getMethods()) { 108 QosPriority p = m.getAnnotation(QosPriority.class); 109 if (p != null) { 110 // Since we protobuf'd, and then subsequently, when we went with pb style, method names 111 // are capitalized. This meant that this brittle compare of method names gotten by 112 // reflection no longer matched the method names coming in over pb. TODO: Get rid of this 113 // check. For now, workaround is to capitalize the names we got from reflection so they 114 // have chance of matching the pb ones. 115 String capitalizedMethodName = capitalize(m.getName()); 116 qosMap.put(capitalizedMethodName, p.priority()); 117 } 118 } 119 this.rpcServices = rpcServices; 120 this.annotatedQos = qosMap; 121 if (methodMap.get("getRegion") == null) { 122 methodMap.put("hasRegion", new HashMap<>()); 123 methodMap.put("getRegion", new HashMap<>()); 124 } 125 for (Class<? extends Message> cls : knownArgumentClasses) { 126 argumentToClassMap.put(cls.getName(), cls); 127 try { 128 methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion")); 129 methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion")); 130 } catch (Exception e) { 131 throw new RuntimeException(e); 132 } 133 } 134 135 Configuration conf = rpcServices.getConfiguration(); 136 scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f); 137 } 138 139 private String capitalize(final String s) { 140 StringBuilder strBuilder = new StringBuilder(s); 141 strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0))); 142 return strBuilder.toString(); 143 } 144 145 /** 146 * Returns a 'priority' based on the request type. Currently the returned priority is used for 147 * queue selection. See the SimpleRpcScheduler as example. It maintains a queue per 'priory type' 148 * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests), NORMAL_QOS (user requests). 149 */ 150 @Override 151 public int getPriority(RequestHeader header, Message param, User user) { 152 int priorityByAnnotation = getAnnotatedPriority(header); 153 154 if (priorityByAnnotation >= 0) { 155 return priorityByAnnotation; 156 } 157 return getBasePriority(header, param); 158 } 159 160 /** 161 * See if the method has an annotation. n * @return Return the priority from the annotation. If 162 * there isn't an annotation, this returns something below zero. 163 */ 164 protected int getAnnotatedPriority(RequestHeader header) { 165 String methodName = header.getMethodName(); 166 Integer priorityByAnnotation = annotatedQos.get(methodName); 167 if (priorityByAnnotation != null) { 168 return priorityByAnnotation; 169 } 170 return -1; 171 } 172 173 /** 174 * Get the priority for a given request from the header and the param This doesn't consider which 175 * user is sending the request at all. This doesn't consider annotations 176 */ 177 protected int getBasePriority(RequestHeader header, Message param) { 178 if (param == null) { 179 return HConstants.NORMAL_QOS; 180 } 181 182 // Trust the client-set priorities if set 183 if (header.hasPriority()) { 184 return header.getPriority(); 185 } 186 187 String cls = param.getClass().getName(); 188 Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls); 189 RegionSpecifier regionSpecifier = null; 190 // check whether the request has reference to meta region or now. 191 try { 192 // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if 193 // hasRegion returns true. Not all listed methods have region specifier each time. For 194 // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than 195 // send the region over every time. 196 Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass); 197 if (hasRegion != null && (Boolean) hasRegion.invoke(param, (Object[]) null)) { 198 Method getRegion = methodMap.get("getRegion").get(rpcArgClass); 199 regionSpecifier = (RegionSpecifier) getRegion.invoke(param, (Object[]) null); 200 Region region = rpcServices.getRegion(regionSpecifier); 201 if (region.getRegionInfo().getTable().isSystemTable()) { 202 if (LOG.isTraceEnabled()) { 203 LOG.trace( 204 "High priority because region=" + region.getRegionInfo().getRegionNameAsString()); 205 } 206 return HConstants.SYSTEMTABLE_QOS; 207 } 208 } 209 } catch (Exception ex) { 210 // Not good throwing an exception out of here, a runtime anyways. Let the query go into the 211 // server and have it throw the exception if still an issue. Just mark it normal priority. 212 if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex); 213 return HConstants.NORMAL_QOS; 214 } 215 216 if (param instanceof ScanRequest) { // scanner methods... 217 ScanRequest request = (ScanRequest) param; 218 if (!request.hasScannerId()) { 219 return HConstants.NORMAL_QOS; 220 } 221 RegionScanner scanner = rpcServices.getScanner(request.getScannerId()); 222 if (scanner != null && scanner.getRegionInfo().getTable().isSystemTable()) { 223 if (LOG.isTraceEnabled()) { 224 // Scanner requests are small in size so TextFormat version should not overwhelm log. 225 LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request)); 226 } 227 return HConstants.SYSTEMTABLE_QOS; 228 } 229 } 230 231 return HConstants.NORMAL_QOS; 232 } 233 234 /** 235 * Based on the request content, returns the deadline of the request. nn * @return Deadline of 236 * this request. 0 now, otherwise msec of 'delay' 237 */ 238 @Override 239 public long getDeadline(RequestHeader header, Message param) { 240 if (param instanceof ScanRequest) { 241 ScanRequest request = (ScanRequest) param; 242 if (!request.hasScannerId()) { 243 return 0; 244 } 245 246 // get the 'virtual time' of the scanner, and applies sqrt() to get a 247 // nice curve for the delay. More a scanner is used the less priority it gets. 248 // The weight is used to have more control on the delay. 249 long vtime = rpcServices.getScannerVirtualTime(request.getScannerId()); 250 return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight)); 251 } 252 return 0; 253 } 254 255 void setRegionServer(final HRegionServer hrs) { 256 this.rpcServices = hrs.getRSRpcServices(); 257 } 258}