1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.lang.reflect.Method;
21 import java.util.HashMap;
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.ipc.PriorityFunction;
31 import org.apache.hadoop.hbase.ipc.QosPriority;
32 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
33 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
34 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
35 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
36 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
37 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
38 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
39 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
40 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
41 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
42 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
43 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
44 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
45 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
46 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
47 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
48
49 import com.google.common.annotations.VisibleForTesting;
50 import com.google.protobuf.Message;
51 import com.google.protobuf.TextFormat;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 @InterfaceAudience.Private
76 class AnnotationReadingPriorityFunction implements PriorityFunction {
77 public static final Log LOG =
78 LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
79
80
81 public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
82
83 private final Map<String, Integer> annotatedQos;
84
85
86 private RSRpcServices rpcServices;
87 @SuppressWarnings("unchecked")
88 private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
89 GetRegionInfoRequest.class,
90 GetStoreFileRequest.class,
91 CloseRegionRequest.class,
92 FlushRegionRequest.class,
93 SplitRegionRequest.class,
94 CompactRegionRequest.class,
95 GetRequest.class,
96 MutateRequest.class,
97 ScanRequest.class
98 };
99
100
101 private final Map<String, Class<? extends Message>> argumentToClassMap =
102 new HashMap<String, Class<? extends Message>>();
103 private final Map<String, Map<Class<? extends Message>, Method>> methodMap =
104 new HashMap<String, Map<Class<? extends Message>, Method>>();
105
106 private final float scanVirtualTimeWeight;
107
108
109
110
111
112
113
114
115 AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
116 this(rpcServices, rpcServices.getClass());
117 }
118
119
120
121
122
123
124
125
126
127
128 AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
129 Class<? extends RSRpcServices> clz) {
130 Map<String,Integer> qosMap = new HashMap<String,Integer>();
131 for (Method m : clz.getMethods()) {
132 QosPriority p = m.getAnnotation(QosPriority.class);
133 if (p != null) {
134
135
136
137
138
139 String capitalizedMethodName = capitalize(m.getName());
140 qosMap.put(capitalizedMethodName, p.priority());
141 }
142 }
143 this.rpcServices = rpcServices;
144 this.annotatedQos = qosMap;
145 if (methodMap.get("getRegion") == null) {
146 methodMap.put("hasRegion", new HashMap<Class<? extends Message>, Method>());
147 methodMap.put("getRegion", new HashMap<Class<? extends Message>, Method>());
148 }
149 for (Class<? extends Message> cls : knownArgumentClasses) {
150 argumentToClassMap.put(cls.getName(), cls);
151 try {
152 methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
153 methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
154 } catch (Exception e) {
155 throw new RuntimeException(e);
156 }
157 }
158
159 Configuration conf = rpcServices.getConfiguration();
160 scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
161 }
162
163 private String capitalize(final String s) {
164 StringBuilder strBuilder = new StringBuilder(s);
165 strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
166 return strBuilder.toString();
167 }
168
169
170
171
172
173
174
175
176
177 @Override
178 public int getPriority(RequestHeader header, Message param) {
179 String methodName = header.getMethodName();
180 Integer priorityByAnnotation = annotatedQos.get(methodName);
181 if (priorityByAnnotation != null) {
182 return priorityByAnnotation;
183 }
184 if (param == null) {
185 return HConstants.NORMAL_QOS;
186 }
187 if (param instanceof MultiRequest) {
188
189
190 return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
191 }
192 String cls = param.getClass().getName();
193 Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
194 RegionSpecifier regionSpecifier = null;
195
196 try {
197
198
199
200
201 Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
202 if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
203 Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
204 regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
205 Region region = rpcServices.getRegion(regionSpecifier);
206 if (region.getRegionInfo().isSystemTable()) {
207 if (LOG.isTraceEnabled()) {
208 LOG.trace("High priority because region=" +
209 region.getRegionInfo().getRegionNameAsString());
210 }
211 return HConstants.SYSTEMTABLE_QOS;
212 }
213 }
214 } catch (Exception ex) {
215
216
217 if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
218 return HConstants.NORMAL_QOS;
219 }
220
221 if (param instanceof ScanRequest) {
222 ScanRequest request = (ScanRequest)param;
223 if (!request.hasScannerId()) {
224 return HConstants.NORMAL_QOS;
225 }
226 RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
227 if (scanner != null && scanner.getRegionInfo().isSystemTable()) {
228 if (LOG.isTraceEnabled()) {
229
230 LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
231 }
232 return HConstants.SYSTEMTABLE_QOS;
233 }
234 }
235
236
237
238 if (param instanceof ReportRegionStateTransitionRequest) {
239 ReportRegionStateTransitionRequest tRequest = (ReportRegionStateTransitionRequest) param;
240 for (RegionStateTransition transition : tRequest.getTransitionList()) {
241 if (transition.getRegionInfoList() != null) {
242 for (HBaseProtos.RegionInfo info : transition.getRegionInfoList()) {
243 TableName tn = ProtobufUtil.toTableName(info.getTableName());
244 if (tn.isSystemTable()) {
245 return HConstants.SYSTEMTABLE_QOS;
246 }
247 }
248 }
249 }
250 }
251 return HConstants.NORMAL_QOS;
252 }
253
254
255
256
257
258
259
260
261 @Override
262 public long getDeadline(RequestHeader header, Message param) {
263 if (param instanceof ScanRequest) {
264 ScanRequest request = (ScanRequest)param;
265 if (!request.hasScannerId()) {
266 return 0;
267 }
268
269
270
271
272 long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
273 return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
274 }
275 return 0;
276 }
277
278 @VisibleForTesting
279 void setRegionServer(final HRegionServer hrs) {
280 this.rpcServices = hrs.getRSRpcServices();
281 }
282 }