001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import java.io.IOException; 021import java.lang.reflect.Constructor; 022import java.net.InetAddress; 023import java.security.PrivilegedAction; 024import java.security.PrivilegedExceptionAction; 025import java.util.Collections; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.CellScanner; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.ipc.RpcCall; 036import org.apache.hadoop.hbase.ipc.RpcCallback; 037import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 038import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 039import org.apache.hadoop.hbase.security.User; 040import org.apache.hadoop.hbase.testclassification.MasterTests; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.junit.Assert; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 050import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 051import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 052import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 053import org.apache.hbase.thirdparty.com.google.protobuf.Message; 054 055import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 060 061/** 062 * Tests for Online SlowLog Provider Service 063 */ 064@Category({ MasterTests.class, MediumTests.class }) 065public class TestNamedQueueRecorder { 066 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestNamedQueueRecorder.class); 070 071 private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); 072 073 private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); 074 075 private NamedQueueRecorder namedQueueRecorder; 076 077 private static int i = 0; 078 079 private static Configuration applySlowLogRecorderConf(int eventSize) { 080 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 081 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 082 conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize); 083 return conf; 084 } 085 086 /** 087 * confirm that for a ringbuffer of slow logs, payload on given index of buffer has expected 088 * elements 089 * @param i index of ringbuffer logs 090 * @param j data value that was put on index i 091 * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder} 092 * @return if actual values are as per expectations 093 */ 094 private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) { 095 boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j); 096 boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j); 097 boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j); 098 return isClassExpected && isClientExpected && isUserExpected; 099 } 100 101 @Test 102 public void testOnlieSlowLogConsumption() throws Exception { 103 104 Configuration conf = applySlowLogRecorderConf(8); 105 Constructor<NamedQueueRecorder> constructor = 106 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 107 constructor.setAccessible(true); 108 namedQueueRecorder = constructor.newInstance(conf); 109 AdminProtos.SlowLogResponseRequest request = 110 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 111 112 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 113 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 114 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 115 116 int i = 0; 117 118 // add 5 records initially 119 for (; i < 5; i++) { 120 RpcLogDetails rpcLogDetails = 121 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 122 namedQueueRecorder.addRecord(rpcLogDetails); 123 } 124 125 Assert.assertNotEquals(-1, 126 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5)); 127 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 128 Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads)); 129 Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads)); 130 Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads)); 131 Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads)); 132 Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads)); 133 134 // add 2 more records 135 for (; i < 7; i++) { 136 RpcLogDetails rpcLogDetails = 137 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 138 namedQueueRecorder.addRecord(rpcLogDetails); 139 } 140 141 Assert.assertNotEquals(-1, 142 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 7)); 143 144 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 145 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 146 return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList) 147 && confirmPayloadParams(5, 2, slowLogPayloadsList) 148 && confirmPayloadParams(6, 1, slowLogPayloadsList); 149 })); 150 151 // add 3 more records 152 for (; i < 10; i++) { 153 RpcLogDetails rpcLogDetails = 154 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 155 namedQueueRecorder.addRecord(rpcLogDetails); 156 } 157 158 Assert.assertNotEquals(-1, 159 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8)); 160 161 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 162 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 163 // confirm ringbuffer is full 164 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList) 165 && confirmPayloadParams(0, 10, slowLogPayloadsList) 166 && confirmPayloadParams(1, 9, slowLogPayloadsList); 167 })); 168 169 // add 4 more records 170 for (; i < 14; i++) { 171 RpcLogDetails rpcLogDetails = 172 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 173 namedQueueRecorder.addRecord(rpcLogDetails); 174 } 175 176 Assert.assertNotEquals(-1, 177 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8)); 178 179 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 180 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 181 // confirm ringbuffer is full 182 // and ordered events 183 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList) 184 && confirmPayloadParams(1, 13, slowLogPayloadsList) 185 && confirmPayloadParams(2, 12, slowLogPayloadsList) 186 && confirmPayloadParams(3, 11, slowLogPayloadsList); 187 })); 188 189 AdminProtos.SlowLogResponseRequest largeLogRequest = 190 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15) 191 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 192 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 193 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest); 194 // confirm ringbuffer is full 195 // and ordered events 196 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList) 197 && confirmPayloadParams(1, 13, slowLogPayloadsList) 198 && confirmPayloadParams(2, 12, slowLogPayloadsList) 199 && confirmPayloadParams(3, 11, slowLogPayloadsList); 200 })); 201 202 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 203 boolean isRingBufferCleaned = 204 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 205 206 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 207 208 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 209 // confirm ringbuffer is empty 210 return slowLogPayloadsList.size() == 0 && isRingBufferCleaned; 211 })); 212 213 } 214 215 private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { 216 NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); 217 namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); 218 namedQueueGetRequest.setSlowLogResponseRequest(request); 219 NamedQueueGetResponse namedQueueGetResponse = 220 namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); 221 return namedQueueGetResponse == null 222 ? Collections.emptyList() 223 : namedQueueGetResponse.getSlowLogPayloads(); 224 } 225 226 @Test 227 public void testOnlineSlowLogWithHighRecords() throws Exception { 228 229 Configuration conf = applySlowLogRecorderConf(14); 230 Constructor<NamedQueueRecorder> constructor = 231 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 232 constructor.setAccessible(true); 233 namedQueueRecorder = constructor.newInstance(conf); 234 AdminProtos.SlowLogResponseRequest request = 235 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 236 237 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 238 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 239 240 for (int i = 0; i < 14 * 11; i++) { 241 RpcLogDetails rpcLogDetails = 242 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 243 namedQueueRecorder.addRecord(rpcLogDetails); 244 } 245 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 246 247 Assert.assertNotEquals(-1, 248 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); 249 250 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 251 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 252 253 // confirm strict order of slow log payloads 254 return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 154, slowLogPayloads) 255 && confirmPayloadParams(1, 153, slowLogPayloads) 256 && confirmPayloadParams(2, 152, slowLogPayloads) 257 && confirmPayloadParams(3, 151, slowLogPayloads) 258 && confirmPayloadParams(4, 150, slowLogPayloads) 259 && confirmPayloadParams(5, 149, slowLogPayloads) 260 && confirmPayloadParams(6, 148, slowLogPayloads) 261 && confirmPayloadParams(7, 147, slowLogPayloads) 262 && confirmPayloadParams(8, 146, slowLogPayloads) 263 && confirmPayloadParams(9, 145, slowLogPayloads) 264 && confirmPayloadParams(10, 144, slowLogPayloads) 265 && confirmPayloadParams(11, 143, slowLogPayloads) 266 && confirmPayloadParams(12, 142, slowLogPayloads) 267 && confirmPayloadParams(13, 141, slowLogPayloads); 268 })); 269 270 boolean isRingBufferCleaned = 271 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 272 Assert.assertTrue(isRingBufferCleaned); 273 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 274 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 275 276 // confirm ringbuffer is empty 277 Assert.assertEquals(slowLogPayloads.size(), 0); 278 } 279 280 @Test 281 public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception { 282 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 283 conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY); 284 285 Constructor<NamedQueueRecorder> constructor = 286 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 287 constructor.setAccessible(true); 288 namedQueueRecorder = constructor.newInstance(conf); 289 AdminProtos.SlowLogResponseRequest request = 290 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 291 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 292 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 293 for (int i = 0; i < 300; i++) { 294 RpcLogDetails rpcLogDetails = 295 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 296 namedQueueRecorder.addRecord(rpcLogDetails); 297 } 298 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 299 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 300 return slowLogPayloads.size() == 0; 301 })); 302 303 } 304 305 @Test 306 public void testOnlineSlowLogWithDisableConfig() throws Exception { 307 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 308 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false); 309 Constructor<NamedQueueRecorder> constructor = 310 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 311 constructor.setAccessible(true); 312 namedQueueRecorder = constructor.newInstance(conf); 313 314 AdminProtos.SlowLogResponseRequest request = 315 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 316 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 317 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 318 for (int i = 0; i < 300; i++) { 319 RpcLogDetails rpcLogDetails = 320 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 321 namedQueueRecorder.addRecord(rpcLogDetails); 322 } 323 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 324 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 325 return slowLogPayloads.size() == 0; 326 })); 327 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 328 } 329 330 @Test 331 public void testSlowLogFilters() throws Exception { 332 333 Configuration conf = applySlowLogRecorderConf(30); 334 Constructor<NamedQueueRecorder> constructor = 335 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 336 constructor.setAccessible(true); 337 namedQueueRecorder = constructor.newInstance(conf); 338 AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() 339 .setLimit(15).setUserName("userName_87").build(); 340 341 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 342 343 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 344 345 for (int i = 0; i < 100; i++) { 346 RpcLogDetails rpcLogDetails = 347 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 348 namedQueueRecorder.addRecord(rpcLogDetails); 349 } 350 LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter"); 351 352 Assert.assertNotEquals(-1, 353 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 1)); 354 355 AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest 356 .newBuilder().setLimit(15).setClientAddress("client_85").build(); 357 Assert.assertNotEquals(-1, 358 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestClient).size() == 1)); 359 360 AdminProtos.SlowLogResponseRequest requestSlowLog = 361 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 362 Assert.assertNotEquals(-1, 363 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15)); 364 } 365 366 @Test 367 public void testConcurrentSlowLogEvents() throws Exception { 368 369 Configuration conf = applySlowLogRecorderConf(50000); 370 Constructor<NamedQueueRecorder> constructor = 371 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 372 constructor.setAccessible(true); 373 namedQueueRecorder = constructor.newInstance(conf); 374 AdminProtos.SlowLogResponseRequest request = 375 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); 376 AdminProtos.SlowLogResponseRequest largeLogRequest = 377 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000) 378 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 379 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 380 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 381 382 for (int j = 0; j < 1000; j++) { 383 384 CompletableFuture.runAsync(() -> { 385 for (int i = 0; i < 3500; i++) { 386 RpcLogDetails rpcLogDetails = 387 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 388 namedQueueRecorder.addRecord(rpcLogDetails); 389 } 390 }); 391 392 } 393 394 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); 395 396 Assert.assertNotEquals(-1, 397 HBASE_TESTING_UTILITY.waitFor(5000, () -> getSlowLogPayloads(request).size() > 10000)); 398 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(5000, 399 () -> getSlowLogPayloads(largeLogRequest).size() > 10000)); 400 } 401 402 @Test 403 public void testSlowLargeLogEvents() throws Exception { 404 Configuration conf = applySlowLogRecorderConf(28); 405 Constructor<NamedQueueRecorder> constructor = 406 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 407 constructor.setAccessible(true); 408 namedQueueRecorder = constructor.newInstance(conf); 409 410 AdminProtos.SlowLogResponseRequest request = 411 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 412 413 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 414 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 415 416 boolean isSlowLog; 417 boolean isLargeLog; 418 for (int i = 0; i < 14 * 11; i++) { 419 if (i % 2 == 0) { 420 isSlowLog = true; 421 isLargeLog = false; 422 } else { 423 isSlowLog = false; 424 isLargeLog = true; 425 } 426 RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), 427 "class_" + (i + 1), isSlowLog, isLargeLog); 428 namedQueueRecorder.addRecord(rpcLogDetails); 429 } 430 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 431 432 Assert.assertNotEquals(-1, 433 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); 434 435 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 436 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 437 438 // confirm strict order of slow log payloads 439 return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 153, slowLogPayloads) 440 && confirmPayloadParams(1, 151, slowLogPayloads) 441 && confirmPayloadParams(2, 149, slowLogPayloads) 442 && confirmPayloadParams(3, 147, slowLogPayloads) 443 && confirmPayloadParams(4, 145, slowLogPayloads) 444 && confirmPayloadParams(5, 143, slowLogPayloads) 445 && confirmPayloadParams(6, 141, slowLogPayloads) 446 && confirmPayloadParams(7, 139, slowLogPayloads) 447 && confirmPayloadParams(8, 137, slowLogPayloads) 448 && confirmPayloadParams(9, 135, slowLogPayloads) 449 && confirmPayloadParams(10, 133, slowLogPayloads) 450 && confirmPayloadParams(11, 131, slowLogPayloads) 451 && confirmPayloadParams(12, 129, slowLogPayloads) 452 && confirmPayloadParams(13, 127, slowLogPayloads); 453 })); 454 455 AdminProtos.SlowLogResponseRequest largeLogRequest = 456 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11) 457 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 458 459 Assert.assertNotEquals(-1, 460 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(largeLogRequest).size() == 14)); 461 462 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 463 List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest); 464 465 // confirm strict order of slow log payloads 466 return largeLogPayloads.size() == 14 && confirmPayloadParams(0, 154, largeLogPayloads) 467 && confirmPayloadParams(1, 152, largeLogPayloads) 468 && confirmPayloadParams(2, 150, largeLogPayloads) 469 && confirmPayloadParams(3, 148, largeLogPayloads) 470 && confirmPayloadParams(4, 146, largeLogPayloads) 471 && confirmPayloadParams(5, 144, largeLogPayloads) 472 && confirmPayloadParams(6, 142, largeLogPayloads) 473 && confirmPayloadParams(7, 140, largeLogPayloads) 474 && confirmPayloadParams(8, 138, largeLogPayloads) 475 && confirmPayloadParams(9, 136, largeLogPayloads) 476 && confirmPayloadParams(10, 134, largeLogPayloads) 477 && confirmPayloadParams(11, 132, largeLogPayloads) 478 && confirmPayloadParams(12, 130, largeLogPayloads) 479 && confirmPayloadParams(13, 128, largeLogPayloads); 480 })); 481 } 482 483 @Test 484 public void testSlowLogMixedFilters() throws Exception { 485 486 Configuration conf = applySlowLogRecorderConf(30); 487 Constructor<NamedQueueRecorder> constructor = 488 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 489 constructor.setAccessible(true); 490 namedQueueRecorder = constructor.newInstance(conf); 491 AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() 492 .setLimit(15).setUserName("userName_87").setClientAddress("client_88").build(); 493 494 Assert.assertEquals(getSlowLogPayloads(request).size(), 0); 495 496 for (int i = 0; i < 100; i++) { 497 RpcLogDetails rpcLogDetails = 498 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 499 namedQueueRecorder.addRecord(rpcLogDetails); 500 } 501 502 Assert.assertNotEquals(-1, 503 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 2)); 504 505 AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder() 506 .setLimit(15).setUserName("userName_1").setClientAddress("client_2").build(); 507 Assert.assertEquals(0, getSlowLogPayloads(request2).size()); 508 509 AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder() 510 .setLimit(15).setUserName("userName_87").setClientAddress("client_88") 511 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build(); 512 Assert.assertEquals(0, getSlowLogPayloads(request3).size()); 513 514 AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder() 515 .setLimit(15).setUserName("userName_87").setClientAddress("client_87") 516 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build(); 517 Assert.assertEquals(1, getSlowLogPayloads(request4).size()); 518 519 AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder() 520 .setLimit(15).setUserName("userName_88").setClientAddress("client_89") 521 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR).build(); 522 Assert.assertEquals(2, getSlowLogPayloads(request5).size()); 523 524 AdminProtos.SlowLogResponseRequest requestSlowLog = 525 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 526 Assert.assertNotEquals(-1, 527 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15)); 528 } 529 530 static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { 531 RpcCall rpcCall = getRpcCall(userName); 532 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, true, true); 533 } 534 535 private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, 536 boolean isSlowLog, boolean isLargeLog) { 537 RpcCall rpcCall = getRpcCall(userName); 538 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog, 539 isLargeLog); 540 } 541 542 private static RpcCall getRpcCall(String userName) { 543 RpcCall rpcCall = new RpcCall() { 544 @Override 545 public BlockingService getService() { 546 return null; 547 } 548 549 @Override 550 public Descriptors.MethodDescriptor getMethod() { 551 return null; 552 } 553 554 @Override 555 public Message getParam() { 556 return getMessage(); 557 } 558 559 @Override 560 public CellScanner getCellScanner() { 561 return null; 562 } 563 564 @Override 565 public long getReceiveTime() { 566 return 0; 567 } 568 569 @Override 570 public long getStartTime() { 571 return 0; 572 } 573 574 @Override 575 public void setStartTime(long startTime) { 576 577 } 578 579 @Override 580 public int getTimeout() { 581 return 0; 582 } 583 584 @Override 585 public int getPriority() { 586 return 0; 587 } 588 589 @Override 590 public long getDeadline() { 591 return 0; 592 } 593 594 @Override 595 public long getSize() { 596 return 0; 597 } 598 599 @Override 600 public RPCProtos.RequestHeader getHeader() { 601 return null; 602 } 603 604 @Override 605 public int getRemotePort() { 606 return 0; 607 } 608 609 @Override 610 public void setResponse(Message param, CellScanner cells, Throwable errorThrowable, 611 String error) { 612 } 613 614 @Override 615 public void sendResponseIfReady() throws IOException { 616 } 617 618 @Override 619 public void cleanup() { 620 } 621 622 @Override 623 public String toShortString() { 624 return null; 625 } 626 627 @Override 628 public long disconnectSince() { 629 return 0; 630 } 631 632 @Override 633 public boolean isClientCellBlockSupported() { 634 return false; 635 } 636 637 @Override 638 public Optional<User> getRequestUser() { 639 return getUser(userName); 640 } 641 642 @Override 643 public InetAddress getRemoteAddress() { 644 return null; 645 } 646 647 @Override 648 public HBaseProtos.VersionInfo getClientVersionInfo() { 649 return null; 650 } 651 652 @Override 653 public void setCallBack(RpcCallback callback) { 654 } 655 656 @Override 657 public boolean isRetryImmediatelySupported() { 658 return false; 659 } 660 661 @Override 662 public long getResponseCellSize() { 663 return 0; 664 } 665 666 @Override 667 public void incrementResponseCellSize(long cellSize) { 668 } 669 670 @Override 671 public long getResponseBlockSize() { 672 return 0; 673 } 674 675 @Override 676 public void incrementResponseBlockSize(long blockSize) { 677 } 678 679 @Override 680 public long getResponseExceptionSize() { 681 return 0; 682 } 683 684 @Override 685 public void incrementResponseExceptionSize(long exceptionSize) { 686 } 687 }; 688 return rpcCall; 689 } 690 691 private static Message getMessage() { 692 693 i = (i + 1) % 3; 694 695 Message message = null; 696 697 switch (i) { 698 699 case 0: { 700 message = ClientProtos.ScanRequest.newBuilder() 701 .setRegion( 702 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region1")) 703 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME).build()) 704 .build(); 705 break; 706 } 707 case 1: { 708 message = ClientProtos.MutateRequest.newBuilder() 709 .setRegion( 710 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2")) 711 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 712 .setMutation(ClientProtos.MutationProto.newBuilder() 713 .setRow(ByteString.copyFromUtf8("row123")).build()) 714 .build(); 715 break; 716 } 717 case 2: { 718 message = ClientProtos.GetRequest.newBuilder() 719 .setRegion( 720 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2")) 721 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 722 .setGet(ClientProtos.Get.newBuilder().setRow(ByteString.copyFromUtf8("row123")).build()) 723 .build(); 724 break; 725 } 726 default: 727 throw new RuntimeException("Not supposed to get here?"); 728 } 729 730 return message; 731 732 } 733 734 private static Optional<User> getUser(String userName) { 735 736 return Optional.of(new User() { 737 @Override 738 public String getShortName() { 739 return userName; 740 } 741 742 @Override 743 public <T> T runAs(PrivilegedAction<T> action) { 744 return null; 745 } 746 747 @Override 748 public <T> T runAs(PrivilegedExceptionAction<T> action) { 749 return null; 750 } 751 }); 752 753 } 754 755}