/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.namequeues;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;

/**
 * Tests for Online SlowLog Provider Service.
 * <p>
 * Each test builds a fresh {@link NamedQueueRecorder} through its non-public
 * {@code Configuration} constructor, feeds it synthetic {@link RpcLogDetails} records and then
 * polls the recorder until the expected payloads become visible.
 */
@Category({MasterTests.class, MediumTests.class})
public class TestNamedQueueRecorder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);

  private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();

  /**
   * Rotates through the three request kinds produced by {@link #getMessage()}. Atomic because
   * {@link #testConcurrentSlowLogEvents()} builds messages from several threads at once.
   */
  private static final AtomicInteger MESSAGE_SELECTOR = new AtomicInteger();

  private NamedQueueRecorder namedQueueRecorder;

  /**
   * Enables the slow log buffer on the shared test configuration and sets its ring buffer size.
   *
   * @param eventSize value stored under {@code hbase.regionserver.slowlog.ringbuffer.size}
   * @return the (shared, mutated) configuration of the testing utility
   */
  private static Configuration applySlowLogRecorderConf(int eventSize) {
    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
    conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
    return conf;
  }

  /**
   * Instantiates a {@link NamedQueueRecorder} via reflection, because the constructor taking a
   * {@link Configuration} has to be made accessible before it can be invoked from here.
   *
   * @param conf configuration handed to the recorder
   * @return a new recorder instance
   * @throws Exception if the constructor cannot be found or invoked
   */
  private static NamedQueueRecorder newRecorder(Configuration conf) throws Exception {
    Constructor<NamedQueueRecorder> constructor =
      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
    constructor.setAccessible(true);
    return constructor.newInstance(conf);
  }

  /**
   * Adds one record per value in {@code [fromInclusive, toExclusive)} to the current recorder.
   * The record for loop value {@code v} carries the suffix {@code v + 1} in its user name,
   * client address and class name, and is flagged as both slow and large.
   *
   * @param fromInclusive first loop value
   * @param toExclusive loop bound (not included)
   */
  private void addRecords(int fromInclusive, int toExclusive) {
    for (int value = fromInclusive; value < toExclusive; value++) {
      int suffix = value + 1;
      namedQueueRecorder.addRecord(
        getRpcLogDetails("userName_" + suffix, "client_" + suffix, "class_" + suffix));
    }
  }

  /**
   * confirm that for a ringbuffer of slow logs, payload on given index of buffer
   * has expected elements
   *
   * @param i index of ringbuffer logs
   * @param j data value that was put on index i
   * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
   * @return if actual values are as per expectations
   */
  private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
    SlowLogPayload payload = slowLogPayloads.get(i);
    boolean isClientExpected = payload.getClientAddress().equals("client_" + j);
    boolean isUserExpected = payload.getUserName().equals("userName_" + j);
    boolean isClassExpected = payload.getServerClass().equals("class_" + j);
    return isClassExpected && isClientExpected && isUserExpected;
  }

  /**
   * Confirms that the first {@code count} payloads carry the values
   * {@code firstValue, firstValue - step, firstValue - 2 * step, ...} in that order.
   *
   * @param payloads payloads retrieved from the recorder
   * @param count number of leading payloads to check
   * @param firstValue value expected at index 0
   * @param step amount by which the expected value decreases per index
   * @return {@code true} if at least {@code count} payloads exist and all of them match
   */
  private boolean confirmDescendingPayloads(List<SlowLogPayload> payloads, int count,
    int firstValue, int step) {
    if (payloads.size() < count) {
      return false;
    }
    for (int index = 0; index < count; index++) {
      if (!confirmPayloadParams(index, firstValue - index * step, payloads)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Fills a ring buffer of size 8 in several batches and checks, after each batch, the number of
   * retrievable payloads and that the most recently added records come first. Finally clears the
   * queue. (The method name keeps its historical spelling so existing test selectors still work.)
   */
  @Test
  public void testOnlieSlowLogConsumption() throws Exception {

    Configuration conf = applySlowLogRecorderConf(8);
    namedQueueRecorder = newRecorder(conf);
    AdminProtos.SlowLogResponseRequest request =
      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();

    namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
    Assert.assertEquals(0, getSlowLogPayloads(request).size());
    LOG.debug("Initially ringbuffer of Slow Log records is empty");

    // add 5 records initially
    addRecords(0, 5);

    Assert.assertNotEquals(-1,
      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
    Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
    Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
    Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
    Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads));
    Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads));

    // add 2 more records
    addRecords(5, 7);

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(request).size() == 7));

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> {
        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
        return slowLogPayloadsList.size() == 7
          && confirmPayloadParams(0, 7, slowLogPayloadsList)
          && confirmPayloadParams(5, 2, slowLogPayloadsList)
          && confirmPayloadParams(6, 1, slowLogPayloadsList);
      })
    );

    // add 3 more records
    addRecords(7, 10);

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(request).size() == 8));

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> {
        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
        // confirm ringbuffer is full
        return slowLogPayloadsList.size() == 8
          && confirmPayloadParams(7, 3, slowLogPayloadsList)
          && confirmPayloadParams(0, 10, slowLogPayloadsList)
          && confirmPayloadParams(1, 9, slowLogPayloadsList);
      })
    );

    // add 4 more records
    addRecords(10, 14);

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(request).size() == 8));

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> {
        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
        // confirm ringbuffer is full
        // and ordered events
        return slowLogPayloadsList.size() == 8
          && confirmDescendingPayloads(slowLogPayloadsList, 4, 14, 1);
      })
    );

    AdminProtos.SlowLogResponseRequest largeLogRequest =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(15)
        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
        .build();
    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> {
        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
        // confirm ringbuffer is full
        // and ordered events
        return slowLogPayloadsList.size() == 8
          && confirmDescendingPayloads(slowLogPayloadsList, 4, 14, 1);
      })
    );

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> {
        boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
          NamedQueuePayload.NamedQueueEvent.SLOW_LOG);

        LOG.debug("cleared the ringbuffer of Online Slow Log records");

        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
        // confirm ringbuffer is empty
        return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
      })
    );

  }

  /**
   * Queries the current recorder for slow/large log payloads matching {@code request}.
   *
   * @param request filter and limit to apply
   * @return the payloads of the response, or an empty list when the recorder returns no response
   */
  private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
    namedQueueGetRequest.setSlowLogResponseRequest(request);
    NamedQueueGetResponse namedQueueGetResponse =
      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
    return namedQueueGetResponse == null ?
      Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads();
  }

  /**
   * Adds 14 * 11 records to a ring buffer of size 14 and expects only the latest 14 to be
   * retrievable, newest first, before clearing the queue.
   */
  @Test
  public void testOnlineSlowLogWithHighRecords() throws Exception {

    Configuration conf = applySlowLogRecorderConf(14);
    namedQueueRecorder = newRecorder(conf);
    AdminProtos.SlowLogResponseRequest request =
      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();

    Assert.assertEquals(0, getSlowLogPayloads(request).size());
    LOG.debug("Initially ringbuffer of Slow Log records is empty");

    addRecords(0, 14 * 11);
    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(request).size() == 14));

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> {
        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);

        // confirm strict order of slow log payloads: 154, 153, ..., 141
        return slowLogPayloads.size() == 14
          && confirmDescendingPayloads(slowLogPayloads, 14, 154, 1);
      })
    );

    boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
      NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
    Assert.assertTrue(isRingBufferCleaned);
    LOG.debug("cleared the ringbuffer of Online Slow Log records");
    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);

    // confirm ringbuffer is empty
    Assert.assertEquals(0, slowLogPayloads.size());
  }

  /**
   * Adds 300 records to the current recorder and expects none of them to become retrievable.
   * Shared by the tests that run with the slow log buffer unset or disabled.
   */
  private void assertNoRecordsRetained() throws Exception {
    AdminProtos.SlowLogResponseRequest request =
      AdminProtos.SlowLogResponseRequest.newBuilder().build();
    Assert.assertEquals(0, getSlowLogPayloads(request).size());
    LOG.debug("Initially ringbuffer of Slow Log records is empty");
    addRecords(0, 300);
    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> {
        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
        return slowLogPayloads.size() == 0;
      })
    );
  }

  /**
   * With the enable key removed from the configuration, no records are expected to be retained.
   */
  @Test
  public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
    conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);

    namedQueueRecorder = newRecorder(conf);
    assertNoRecordsRetained();
  }

  /**
   * With the enable key explicitly set to {@code false}, no records are expected to be retained.
   * The key is set back to {@code true} afterwards, even if the test fails, because the
   * configuration object is shared by all tests of this class.
   */
  @Test
  public void testOnlineSlowLogWithDisableConfig() throws Exception {
    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
    try {
      namedQueueRecorder = newRecorder(conf);
      assertNoRecordsRetained();
    } finally {
      conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
    }
  }

  /**
   * Adds 100 records to a ring buffer of size 30 and checks filtering by user name, by client
   * address, and the effect of the limit on an unfiltered request.
   */
  @Test
  public void testSlowLogFilters() throws Exception {

    Configuration conf = applySlowLogRecorderConf(30);
    namedQueueRecorder = newRecorder(conf);
    AdminProtos.SlowLogResponseRequest request =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(15)
        .setUserName("userName_87")
        .build();

    Assert.assertEquals(0, getSlowLogPayloads(request).size());

    LOG.debug("Initially ringbuffer of Slow Log records is empty");

    addRecords(0, 100);
    LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(request).size() == 1));

    AdminProtos.SlowLogResponseRequest requestClient =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(15)
        .setClientAddress("client_85")
        .build();
    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(requestClient).size() == 1));

    AdminProtos.SlowLogResponseRequest requestSlowLog =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(15)
        .build();
    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(requestSlowLog).size() == 15));
  }

  /**
   * Submits 1000 asynchronous producers, each adding 3500 records, and expects more than 10000
   * slow and large log payloads to become retrievable. The producers are not joined; the test
   * only waits until enough records are visible.
   */
  @Test
  public void testConcurrentSlowLogEvents() throws Exception {

    Configuration conf = applySlowLogRecorderConf(50000);
    namedQueueRecorder = newRecorder(conf);
    AdminProtos.SlowLogResponseRequest request =
      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
    AdminProtos.SlowLogResponseRequest largeLogRequest =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(500000)
        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
        .build();
    Assert.assertEquals(0, getSlowLogPayloads(request).size());
    LOG.debug("Initially ringbuffer of Slow Log records is empty");

    for (int j = 0; j < 1000; j++) {
      CompletableFuture.runAsync(() -> addRecords(0, 3500));
    }

    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
      5000, () -> getSlowLogPayloads(request).size() > 10000));
    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
      5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
  }

  /**
   * Adds 14 * 11 records that alternate between slow-only and large-only, then checks that the
   * slow log request returns the latest 14 odd-suffixed records and the large log request the
   * latest 14 even-suffixed records, newest first.
   */
  @Test
  public void testSlowLargeLogEvents() throws Exception {
    Configuration conf = applySlowLogRecorderConf(28);
    namedQueueRecorder = newRecorder(conf);

    AdminProtos.SlowLogResponseRequest request =
      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();

    Assert.assertEquals(0, getSlowLogPayloads(request).size());
    LOG.debug("Initially ringbuffer of Slow Log records is empty");

    for (int i = 0; i < 14 * 11; i++) {
      // even loop values are slow-only, odd loop values are large-only
      boolean isSlowLog = i % 2 == 0;
      boolean isLargeLog = !isSlowLog;
      RpcLogDetails rpcLogDetails =
        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1),
          isSlowLog, isLargeLog);
      namedQueueRecorder.addRecord(rpcLogDetails);
    }
    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(request).size() == 14));

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> {
        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);

        // confirm strict order of slow log payloads: 153, 151, ..., 127
        return slowLogPayloads.size() == 14
          && confirmDescendingPayloads(slowLogPayloads, 14, 153, 2);
      })
    );

    AdminProtos.SlowLogResponseRequest largeLogRequest =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(14 * 11)
        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
        .build();

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(largeLogRequest).size() == 14));

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> {
        List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);

        // confirm strict order of large log payloads: 154, 152, ..., 128
        return largeLogPayloads.size() == 14
          && confirmDescendingPayloads(largeLogPayloads, 14, 154, 2);
      })
    );
  }

  /**
   * Adds 100 records to a ring buffer of size 30 and checks requests that combine a user name
   * filter with a client address filter, with the default operator and with explicit AND / OR.
   */
  @Test
  public void testSlowLogMixedFilters() throws Exception {

    Configuration conf = applySlowLogRecorderConf(30);
    namedQueueRecorder = newRecorder(conf);
    AdminProtos.SlowLogResponseRequest request =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(15)
        .setUserName("userName_87")
        .setClientAddress("client_88")
        .build();

    Assert.assertEquals(0, getSlowLogPayloads(request).size());

    addRecords(0, 100);

    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(request).size() == 2));

    AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
      .setLimit(15)
      .setUserName("userName_1")
      .setClientAddress("client_2")
      .build();
    Assert.assertEquals(0, getSlowLogPayloads(request2).size());

    AdminProtos.SlowLogResponseRequest request3 =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(15)
        .setUserName("userName_87")
        .setClientAddress("client_88")
        .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
        .build();
    Assert.assertEquals(0, getSlowLogPayloads(request3).size());

    AdminProtos.SlowLogResponseRequest request4 =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(15)
        .setUserName("userName_87")
        .setClientAddress("client_87")
        .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
        .build();
    Assert.assertEquals(1, getSlowLogPayloads(request4).size());

    AdminProtos.SlowLogResponseRequest request5 =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(15)
        .setUserName("userName_88")
        .setClientAddress("client_89")
        .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR)
        .build();
    Assert.assertEquals(2, getSlowLogPayloads(request5).size());

    AdminProtos.SlowLogResponseRequest requestSlowLog =
      AdminProtos.SlowLogResponseRequest.newBuilder()
        .setLimit(15)
        .build();
    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
      () -> getSlowLogPayloads(requestSlowLog).size() == 15));
  }

  /**
   * Builds a record that is flagged as both a slow log and a large log.
   *
   * @param userName short name reported by the stub call's request user
   * @param clientAddress client address stored in the record
   * @param className class name stored in the record
   * @return the record, backed by a stub {@link RpcCall}
   */
  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
    return getRpcLogDetails(userName, clientAddress, className, true, true);
  }

  /**
   * Builds a record with explicit slow/large flags.
   *
   * @param userName short name reported by the stub call's request user
   * @param clientAddress client address stored in the record
   * @param className class name stored in the record
   * @param isSlowLog whether the record is flagged as a slow log
   * @param isLargeLog whether the record is flagged as a large log
   * @return the record, backed by a stub {@link RpcCall}
   */
  private static RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
    String className, boolean isSlowLog, boolean isLargeLog) {
    RpcCall rpcCall = getRpcCall(userName);
    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog,
      isLargeLog);
  }

  /**
   * Creates a stub {@link RpcCall}. Only {@code getParam()} (a fresh message from
   * {@link #getMessage()} on every call) and {@code getRequestUser()} return meaningful values;
   * every other method returns {@code null}, {@code 0} or {@code false}, or does nothing.
   *
   * @param userName short name of the user returned by {@code getRequestUser()}
   * @return the stub call
   */
  private static RpcCall getRpcCall(String userName) {
    return new RpcCall() {
      @Override
      public BlockingService getService() {
        return null;
      }

      @Override
      public Descriptors.MethodDescriptor getMethod() {
        return null;
      }

      @Override
      public Message getParam() {
        return getMessage();
      }

      @Override
      public CellScanner getCellScanner() {
        return null;
      }

      @Override
      public long getReceiveTime() {
        return 0;
      }

      @Override
      public long getStartTime() {
        return 0;
      }

      @Override
      public void setStartTime(long startTime) {
      }

      @Override
      public int getTimeout() {
        return 0;
      }

      @Override
      public int getPriority() {
        return 0;
      }

      @Override
      public long getDeadline() {
        return 0;
      }

      @Override
      public long getSize() {
        return 0;
      }

      @Override
      public RPCProtos.RequestHeader getHeader() {
        return null;
      }

      @Override
      public int getRemotePort() {
        return 0;
      }

      @Override
      public void setResponse(Message param, CellScanner cells,
        Throwable errorThrowable, String error) {
      }

      @Override
      public void sendResponseIfReady() throws IOException {
      }

      @Override
      public void cleanup() {
      }

      @Override
      public String toShortString() {
        return null;
      }

      @Override
      public long disconnectSince() {
        return 0;
      }

      @Override
      public boolean isClientCellBlockSupported() {
        return false;
      }

      @Override
      public Optional<User> getRequestUser() {
        return getUser(userName);
      }

      @Override
      public InetAddress getRemoteAddress() {
        return null;
      }

      @Override
      public HBaseProtos.VersionInfo getClientVersionInfo() {
        return null;
      }

      @Override
      public void setCallBack(RpcCallback callback) {
      }

      @Override
      public boolean isRetryImmediatelySupported() {
        return false;
      }

      @Override
      public long getResponseCellSize() {
        return 0;
      }

      @Override
      public void incrementResponseCellSize(long cellSize) {
      }

      @Override
      public long getResponseBlockSize() {
        return 0;
      }

      @Override
      public void incrementResponseBlockSize(long blockSize) {
      }

      @Override
      public long getResponseExceptionSize() {
        return 0;
      }

      @Override
      public void incrementResponseExceptionSize(long exceptionSize) {
      }
    };
  }

  /**
   * Returns the next request message in a fixed rotation: a Mutate request, then a Get request,
   * then a Scan request, and so on. The rotation index is advanced atomically so that concurrent
   * callers never lose an update.
   *
   * @return a newly built Scan, Mutate or Get request
   */
  private static Message getMessage() {
    // Same sequence as "i = (i + 1) % 3" on a counter starting at 0: 1, 2, 0, 1, ...
    int selector = MESSAGE_SELECTOR.updateAndGet(current -> (current + 1) % 3);

    switch (selector) {
      case 0:
        return ClientProtos.ScanRequest.newBuilder()
          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
            .setValue(ByteString.copyFromUtf8("region1"))
            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
            .build())
          .build();
      case 1:
        return ClientProtos.MutateRequest.newBuilder()
          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
            .setValue(ByteString.copyFromUtf8("region2"))
            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
          .setMutation(ClientProtos.MutationProto.newBuilder()
            .setRow(ByteString.copyFromUtf8("row123"))
            .build())
          .build();
      case 2:
        return ClientProtos.GetRequest.newBuilder()
          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
            .setValue(ByteString.copyFromUtf8("region2"))
            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
          .setGet(ClientProtos.Get.newBuilder()
            .setRow(ByteString.copyFromUtf8("row123"))
            .build())
          .build();
      default:
        throw new RuntimeException("Not supposed to get here?");
    }
  }

  /**
   * Creates a stub {@link User} whose short name is {@code userName}; both {@code runAs}
   * variants return {@code null} without running the action.
   *
   * @param userName short name to report
   * @return the stub user wrapped in an {@link Optional}
   */
  private static Optional<User> getUser(String userName) {
    return Optional.of(new User() {
      @Override
      public String getShortName() {
        return userName;
      }

      @Override
      public <T> T runAs(PrivilegedAction<T> action) {
        return null;
      }

      @Override
      public <T> T runAs(PrivilegedExceptionAction<T> action) {
        return null;
      }
    });
  }

}