001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.regionserver.slowlog; 021 022import java.io.IOException; 023import java.net.InetAddress; 024import java.security.PrivilegedAction; 025import java.security.PrivilegedExceptionAction; 026import java.util.List; 027import java.util.Optional; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.TimeUnit; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.CellScanner; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ipc.RpcCall; 037import org.apache.hadoop.hbase.ipc.RpcCallback; 038import org.apache.hadoop.hbase.security.User; 039import org.apache.hadoop.hbase.testclassification.MasterTests; 040import org.apache.hadoop.hbase.testclassification.MediumTests; 041import org.junit.Assert; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 050import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 052import org.apache.hbase.thirdparty.com.google.protobuf.Message; 053 054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 059 060/** 061 * Tests for Online SlowLog Provider Service 062 */ 063@Category({MasterTests.class, MediumTests.class}) 064public class TestSlowLogRecorder { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestSlowLogRecorder.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class); 071 072 private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); 073 074 private SlowLogRecorder slowLogRecorder; 075 076 private static int i = 0; 077 078 private static Configuration applySlowLogRecorderConf(int eventSize) { 079 080 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 081 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 082 conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize); 083 return conf; 084 085 } 086 087 /** 088 * confirm that for a ringbuffer of slow logs, payload on given index of buffer 089 * has expected elements 090 * 091 * @param i index of ringbuffer logs 092 * @param j data value that was put on index i 093 * @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder} 094 * @return if actual values are as per expectations 095 */ 096 private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) { 097 098 boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j); 099 boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j); 100 boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j); 101 return isClassExpected && isClientExpected && isUserExpected; 102 } 103 104 @Test 105 public void testOnlieSlowLogConsumption() throws Exception { 106 107 Configuration conf = applySlowLogRecorderConf(8); 108 slowLogRecorder = new SlowLogRecorder(conf); 109 AdminProtos.SlowLogResponseRequest request = 110 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 111 112 slowLogRecorder.clearSlowLogPayloads(); 113 Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); 114 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 115 116 int i = 0; 117 118 // add 5 records initially 119 for (; i < 5; i++) { 120 RpcLogDetails rpcLogDetails = 121 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 122 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 123 } 124 125 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 126 () -> slowLogRecorder.getSlowLogPayloads(request).size() == 5)); 127 List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); 128 Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads)); 129 Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads)); 130 Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads)); 131 Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads)); 132 Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads)); 133 134 // add 2 more records 135 for (; i < 7; i++) { 136 RpcLogDetails rpcLogDetails = 137 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 138 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 139 } 140 141 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 142 () -> slowLogRecorder.getSlowLogPayloads(request).size() == 7)); 143 144 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 145 () -> { 146 List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); 147 return slowLogPayloadsList.size() == 7 148 && confirmPayloadParams(0, 7, slowLogPayloadsList) 149 && confirmPayloadParams(5, 2, slowLogPayloadsList) 150 && confirmPayloadParams(6, 1, slowLogPayloadsList); 151 }) 152 ); 153 154 // add 3 more records 155 for (; i < 10; i++) { 156 RpcLogDetails rpcLogDetails = 157 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 158 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 159 } 160 161 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 162 () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8)); 163 164 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 165 () -> { 166 List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); 167 // confirm ringbuffer is full 168 return slowLogPayloadsList.size() == 8 169 && confirmPayloadParams(7, 3, slowLogPayloadsList) 170 && confirmPayloadParams(0, 10, slowLogPayloadsList) 171 && confirmPayloadParams(1, 9, slowLogPayloadsList); 172 }) 173 ); 174 175 // add 4 more records 176 for (; i < 14; i++) { 177 RpcLogDetails rpcLogDetails = 178 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 179 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 180 } 181 182 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 183 () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8)); 184 185 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 186 () -> { 187 List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); 188 // confirm ringbuffer is full 189 // and ordered events 190 return slowLogPayloadsList.size() == 8 191 && confirmPayloadParams(0, 14, slowLogPayloadsList) 192 && confirmPayloadParams(1, 13, slowLogPayloadsList) 193 && confirmPayloadParams(2, 12, slowLogPayloadsList) 194 && confirmPayloadParams(3, 11, slowLogPayloadsList); 195 }) 196 ); 197 198 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 199 () -> { 200 List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getLargeLogPayloads(request); 201 // confirm ringbuffer is full 202 // and ordered events 203 return slowLogPayloadsList.size() == 8 204 && confirmPayloadParams(0, 14, slowLogPayloadsList) 205 && confirmPayloadParams(1, 13, slowLogPayloadsList) 206 && confirmPayloadParams(2, 12, slowLogPayloadsList) 207 && confirmPayloadParams(3, 11, slowLogPayloadsList); 208 }) 209 ); 210 211 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 212 () -> { 213 boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads(); 214 215 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 216 217 List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request); 218 // confirm ringbuffer is empty 219 return slowLogPayloadsList.size() == 0 && isRingBufferCleaned; 220 }) 221 ); 222 223 } 224 225 @Test 226 public void testOnlineSlowLogWithHighRecords() throws Exception { 227 228 Configuration conf = applySlowLogRecorderConf(14); 229 slowLogRecorder = new SlowLogRecorder(conf); 230 AdminProtos.SlowLogResponseRequest request = 231 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 232 233 Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); 234 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 235 236 for (int i = 0; i < 14 * 11; i++) { 237 RpcLogDetails rpcLogDetails = 238 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 239 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 240 } 241 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 242 243 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 244 () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); 245 246 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 247 () -> { 248 List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); 249 250 // confirm strict order of slow log payloads 251 return slowLogPayloads.size() == 14 252 && confirmPayloadParams(0, 154, slowLogPayloads) 253 && confirmPayloadParams(1, 153, slowLogPayloads) 254 && confirmPayloadParams(2, 152, slowLogPayloads) 255 && confirmPayloadParams(3, 151, slowLogPayloads) 256 && confirmPayloadParams(4, 150, slowLogPayloads) 257 && confirmPayloadParams(5, 149, slowLogPayloads) 258 && confirmPayloadParams(6, 148, slowLogPayloads) 259 && confirmPayloadParams(7, 147, slowLogPayloads) 260 && confirmPayloadParams(8, 146, slowLogPayloads) 261 && confirmPayloadParams(9, 145, slowLogPayloads) 262 && confirmPayloadParams(10, 144, slowLogPayloads) 263 && confirmPayloadParams(11, 143, slowLogPayloads) 264 && confirmPayloadParams(12, 142, slowLogPayloads) 265 && confirmPayloadParams(13, 141, slowLogPayloads); 266 }) 267 ); 268 269 boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads(); 270 Assert.assertTrue(isRingBufferCleaned); 271 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 272 List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); 273 274 // confirm ringbuffer is empty 275 Assert.assertEquals(slowLogPayloads.size(), 0); 276 277 } 278 279 @Test 280 public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception { 281 282 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 283 conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY); 284 285 slowLogRecorder = new SlowLogRecorder(conf); 286 AdminProtos.SlowLogResponseRequest request = 287 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 288 Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); 289 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 290 291 for (int i = 0; i < 300; i++) { 292 RpcLogDetails rpcLogDetails = 293 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 294 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 295 } 296 297 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 298 () -> { 299 List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); 300 return slowLogPayloads.size() == 0; 301 }) 302 ); 303 304 } 305 306 @Test 307 public void testOnlineSlowLogWithDisableConfig() throws Exception { 308 309 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 310 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false); 311 312 slowLogRecorder = new SlowLogRecorder(conf); 313 AdminProtos.SlowLogResponseRequest request = 314 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 315 Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); 316 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 317 318 for (int i = 0; i < 300; i++) { 319 RpcLogDetails rpcLogDetails = 320 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 321 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 322 } 323 324 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 325 () -> { 326 List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); 327 return slowLogPayloads.size() == 0; 328 }) 329 ); 330 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 331 332 } 333 334 @Test 335 public void testSlowLogFilters() throws Exception { 336 337 Configuration conf = applySlowLogRecorderConf(30); 338 slowLogRecorder = new SlowLogRecorder(conf); 339 AdminProtos.SlowLogResponseRequest request = 340 AdminProtos.SlowLogResponseRequest.newBuilder() 341 .setLimit(15) 342 .setUserName("userName_87") 343 .build(); 344 345 Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); 346 347 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 348 349 for (int i = 0; i < 100; i++) { 350 RpcLogDetails rpcLogDetails = 351 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 352 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 353 } 354 LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter"); 355 356 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 357 () -> slowLogRecorder.getSlowLogPayloads(request).size() == 1)); 358 359 AdminProtos.SlowLogResponseRequest requestClient = 360 AdminProtos.SlowLogResponseRequest.newBuilder() 361 .setLimit(15) 362 .setClientAddress("client_85") 363 .build(); 364 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 365 () -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1)); 366 367 AdminProtos.SlowLogResponseRequest requestSlowLog = 368 AdminProtos.SlowLogResponseRequest.newBuilder() 369 .setLimit(15) 370 .build(); 371 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 372 () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15)); 373 374 } 375 376 @Test 377 public void testConcurrentSlowLogEvents() throws Exception { 378 379 Configuration conf = applySlowLogRecorderConf(50000); 380 slowLogRecorder = new SlowLogRecorder(conf); 381 AdminProtos.SlowLogResponseRequest request = 382 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); 383 Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); 384 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 385 386 for (int j = 0; j < 1000; j++) { 387 388 CompletableFuture.runAsync(() -> { 389 for (int i = 0; i < 3500; i++) { 390 RpcLogDetails rpcLogDetails = 391 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 392 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 393 } 394 }); 395 396 } 397 398 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); 399 400 slowLogRecorder.clearSlowLogPayloads(); 401 402 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor( 403 5000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000)); 404 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor( 405 5000, () -> slowLogRecorder.getLargeLogPayloads(request).size() > 10000)); 406 } 407 408 @Test 409 public void testSlowLargeLogEvents() throws Exception { 410 Configuration conf = applySlowLogRecorderConf(28); 411 slowLogRecorder = new SlowLogRecorder(conf); 412 AdminProtos.SlowLogResponseRequest request = 413 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 414 415 Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); 416 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 417 418 boolean isSlowLog; 419 boolean isLargeLog; 420 for (int i = 0; i < 14 * 11; i++) { 421 if (i % 2 == 0) { 422 isSlowLog = true; 423 isLargeLog = false; 424 } else { 425 isSlowLog = false; 426 isLargeLog = true; 427 } 428 RpcLogDetails rpcLogDetails = 429 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1), 430 isSlowLog, isLargeLog); 431 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 432 } 433 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 434 435 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 436 () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); 437 438 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 439 () -> { 440 List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); 441 442 // confirm strict order of slow log payloads 443 return slowLogPayloads.size() == 14 444 && confirmPayloadParams(0, 153, slowLogPayloads) 445 && confirmPayloadParams(1, 151, slowLogPayloads) 446 && confirmPayloadParams(2, 149, slowLogPayloads) 447 && confirmPayloadParams(3, 147, slowLogPayloads) 448 && confirmPayloadParams(4, 145, slowLogPayloads) 449 && confirmPayloadParams(5, 143, slowLogPayloads) 450 && confirmPayloadParams(6, 141, slowLogPayloads) 451 && confirmPayloadParams(7, 139, slowLogPayloads) 452 && confirmPayloadParams(8, 137, slowLogPayloads) 453 && confirmPayloadParams(9, 135, slowLogPayloads) 454 && confirmPayloadParams(10, 133, slowLogPayloads) 455 && confirmPayloadParams(11, 131, slowLogPayloads) 456 && confirmPayloadParams(12, 129, slowLogPayloads) 457 && confirmPayloadParams(13, 127, slowLogPayloads); 458 }) 459 ); 460 461 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 462 () -> slowLogRecorder.getLargeLogPayloads(request).size() == 14)); 463 464 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 465 () -> { 466 List<SlowLogPayload> largeLogPayloads = slowLogRecorder.getLargeLogPayloads(request); 467 468 // confirm strict order of slow log payloads 469 return largeLogPayloads.size() == 14 470 && confirmPayloadParams(0, 154, largeLogPayloads) 471 && confirmPayloadParams(1, 152, largeLogPayloads) 472 && confirmPayloadParams(2, 150, largeLogPayloads) 473 && confirmPayloadParams(3, 148, largeLogPayloads) 474 && confirmPayloadParams(4, 146, largeLogPayloads) 475 && confirmPayloadParams(5, 144, largeLogPayloads) 476 && confirmPayloadParams(6, 142, largeLogPayloads) 477 && confirmPayloadParams(7, 140, largeLogPayloads) 478 && confirmPayloadParams(8, 138, largeLogPayloads) 479 && confirmPayloadParams(9, 136, largeLogPayloads) 480 && confirmPayloadParams(10, 134, largeLogPayloads) 481 && confirmPayloadParams(11, 132, largeLogPayloads) 482 && confirmPayloadParams(12, 130, largeLogPayloads) 483 && confirmPayloadParams(13, 128, largeLogPayloads); 484 }) 485 ); 486 487 } 488 489 @Test 490 public void testSlowLogMixedFilters() throws Exception { 491 492 Configuration conf = applySlowLogRecorderConf(30); 493 slowLogRecorder = new SlowLogRecorder(conf); 494 AdminProtos.SlowLogResponseRequest request = 495 AdminProtos.SlowLogResponseRequest.newBuilder() 496 .setLimit(15) 497 .setUserName("userName_87") 498 .setClientAddress("client_88") 499 .build(); 500 501 Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); 502 503 for (int i = 0; i < 100; i++) { 504 RpcLogDetails rpcLogDetails = 505 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 506 slowLogRecorder.addSlowLogPayload(rpcLogDetails); 507 } 508 509 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 510 () -> slowLogRecorder.getSlowLogPayloads(request).size() == 2)); 511 512 AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder() 513 .setLimit(15) 514 .setUserName("userName_1") 515 .setClientAddress("client_2") 516 .build(); 517 Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request2).size()); 518 519 AdminProtos.SlowLogResponseRequest request3 = 520 AdminProtos.SlowLogResponseRequest.newBuilder() 521 .setLimit(15) 522 .setUserName("userName_87") 523 .setClientAddress("client_88") 524 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND) 525 .build(); 526 Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request3).size()); 527 528 AdminProtos.SlowLogResponseRequest request4 = 529 AdminProtos.SlowLogResponseRequest.newBuilder() 530 .setLimit(15) 531 .setUserName("userName_87") 532 .setClientAddress("client_87") 533 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND) 534 .build(); 535 Assert.assertEquals(1, slowLogRecorder.getSlowLogPayloads(request4).size()); 536 537 AdminProtos.SlowLogResponseRequest request5 = 538 AdminProtos.SlowLogResponseRequest.newBuilder() 539 .setLimit(15) 540 .setUserName("userName_88") 541 .setClientAddress("client_89") 542 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR) 543 .build(); 544 Assert.assertEquals(2, slowLogRecorder.getSlowLogPayloads(request5).size()); 545 546 AdminProtos.SlowLogResponseRequest requestSlowLog = 547 AdminProtos.SlowLogResponseRequest.newBuilder() 548 .setLimit(15) 549 .build(); 550 Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 551 () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15)); 552 } 553 554 static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { 555 RpcCall rpcCall = getRpcCall(userName); 556 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, true, true); 557 } 558 559 private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, 560 String className, boolean isSlowLog, boolean isLargeLog) { 561 RpcCall rpcCall = getRpcCall(userName); 562 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog, 563 isLargeLog); 564 } 565 566 private static RpcCall getRpcCall(String userName) { 567 RpcCall rpcCall = new RpcCall() { 568 @Override 569 public BlockingService getService() { 570 return null; 571 } 572 573 @Override 574 public Descriptors.MethodDescriptor getMethod() { 575 return null; 576 } 577 578 @Override 579 public Message getParam() { 580 return getMessage(); 581 } 582 583 @Override 584 public CellScanner getCellScanner() { 585 return null; 586 } 587 588 @Override 589 public long getReceiveTime() { 590 return 0; 591 } 592 593 @Override 594 public long getStartTime() { 595 return 0; 596 } 597 598 @Override 599 public void setStartTime(long startTime) { 600 601 } 602 603 @Override 604 public int getTimeout() { 605 return 0; 606 } 607 608 @Override 609 public int getPriority() { 610 return 0; 611 } 612 613 @Override 614 public long getDeadline() { 615 return 0; 616 } 617 618 @Override 619 public long getSize() { 620 return 0; 621 } 622 623 @Override 624 public RPCProtos.RequestHeader getHeader() { 625 return null; 626 } 627 628 @Override 629 public int getRemotePort() { 630 return 0; 631 } 632 633 @Override 634 public void setResponse(Message param, CellScanner cells, 635 Throwable errorThrowable, String error) { 636 } 637 638 @Override 639 public void sendResponseIfReady() throws IOException { 640 } 641 642 @Override 643 public void cleanup() { 644 } 645 646 @Override 647 public String toShortString() { 648 return null; 649 } 650 651 @Override 652 public long disconnectSince() { 653 return 0; 654 } 655 656 @Override 657 public boolean isClientCellBlockSupported() { 658 return false; 659 } 660 661 @Override 662 public Optional<User> getRequestUser() { 663 return getUser(userName); 664 } 665 666 @Override 667 public InetAddress getRemoteAddress() { 668 return null; 669 } 670 671 @Override 672 public HBaseProtos.VersionInfo getClientVersionInfo() { 673 return null; 674 } 675 676 @Override 677 public void setCallBack(RpcCallback callback) { 678 } 679 680 @Override 681 public boolean isRetryImmediatelySupported() { 682 return false; 683 } 684 685 @Override 686 public long getResponseCellSize() { 687 return 0; 688 } 689 690 @Override 691 public void incrementResponseCellSize(long cellSize) { 692 } 693 694 @Override 695 public long getResponseBlockSize() { 696 return 0; 697 } 698 699 @Override 700 public void incrementResponseBlockSize(long blockSize) { 701 } 702 703 @Override 704 public long getResponseExceptionSize() { 705 return 0; 706 } 707 708 @Override 709 public void incrementResponseExceptionSize(long exceptionSize) { 710 } 711 }; 712 return rpcCall; 713 } 714 715 private static Message getMessage() { 716 717 i = (i + 1) % 3; 718 719 Message message = null; 720 721 switch (i) { 722 723 case 0: { 724 message = ClientProtos.ScanRequest.newBuilder() 725 .setRegion(HBaseProtos.RegionSpecifier.newBuilder() 726 .setValue(ByteString.copyFromUtf8("region1")) 727 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME) 728 .build()) 729 .build(); 730 break; 731 } 732 case 1: { 733 message = ClientProtos.MutateRequest.newBuilder() 734 .setRegion(HBaseProtos.RegionSpecifier.newBuilder() 735 .setValue(ByteString.copyFromUtf8("region2")) 736 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 737 .setMutation(ClientProtos.MutationProto.newBuilder() 738 .setRow(ByteString.copyFromUtf8("row123")) 739 .build()) 740 .build(); 741 break; 742 } 743 case 2: { 744 message = ClientProtos.GetRequest.newBuilder() 745 .setRegion(HBaseProtos.RegionSpecifier.newBuilder() 746 .setValue(ByteString.copyFromUtf8("region2")) 747 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 748 .setGet(ClientProtos.Get.newBuilder() 749 .setRow(ByteString.copyFromUtf8("row123")) 750 .build()) 751 .build(); 752 break; 753 } 754 default: 755 throw new RuntimeException("Not supposed to get here?"); 756 } 757 758 return message; 759 760 } 761 762 private static Optional<User> getUser(String userName) { 763 764 return Optional.of(new User() { 765 766 767 @Override 768 public String getShortName() { 769 return userName; 770 } 771 772 773 @Override 774 public <T> T runAs(PrivilegedAction<T> action) { 775 return null; 776 } 777 778 779 @Override 780 public <T> T runAs(PrivilegedExceptionAction<T> action) throws 781 IOException, InterruptedException { 782 return null; 783 } 784 785 }); 786 787 } 788 789}