001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotEquals; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.lang.reflect.Constructor; 026import java.net.InetAddress; 027import java.security.PrivilegedAction; 028import java.security.PrivilegedExceptionAction; 029import java.security.cert.X509Certificate; 030import java.util.Collections; 031import java.util.List; 032import java.util.Map; 033import java.util.Optional; 034import java.util.concurrent.CompletableFuture; 035import java.util.concurrent.TimeUnit; 036import java.util.stream.Collectors; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.ExtendedCellScanner; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.ipc.RpcCall; 042import org.apache.hadoop.hbase.ipc.RpcCallback; 043import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 044import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 045import org.apache.hadoop.hbase.security.User; 046import org.apache.hadoop.hbase.testclassification.MasterTests; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.junit.jupiter.api.Tag; 049import org.junit.jupiter.api.Test; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 055import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 056import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 057import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 058import org.apache.hbase.thirdparty.com.google.protobuf.Message; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 065 066/** 067 * Tests for Online SlowLog Provider Service 068 */ 069@Tag(MediumTests.TAG) 070@Tag(MasterTests.TAG) 071public class TestNamedQueueRecorder { 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); 074 075 private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil(); 076 private static final List<HBaseProtos.NameBytesPair> REQUEST_HEADERS = 077 ImmutableList.<HBaseProtos.NameBytesPair> builder() 078 .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") 079 .setValue(ByteString.copyFromUtf8("r")).build()) 080 .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") 081 .setValue(ByteString.copyFromUtf8("h")).build()) 082 .build(); 083 private static final List<HBaseProtos.NameBytesPair> CONNECTION_HEADERS = 084 ImmutableList.<HBaseProtos.NameBytesPair> builder() 085 .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") 086 .setValue(ByteString.copyFromUtf8("c")).build()) 087 .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") 088 .setValue(ByteString.copyFromUtf8("h")).build()) 089 .build(); 090 091 private NamedQueueRecorder namedQueueRecorder; 092 093 private static int i = 0; 094 095 private static Configuration applySlowLogRecorderConf(int eventSize) { 096 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 097 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 098 conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize); 099 return conf; 100 } 101 102 /** 103 * confirm that for a ringbuffer of slow logs, payload on given index of buffer has expected 104 * elements 105 * @param i index of ringbuffer logs 106 * @param j data value that was put on index i 107 * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder} 108 * @return if actual values are as per expectations 109 */ 110 private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) { 111 boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j); 112 boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j); 113 boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j); 114 return isClassExpected && isClientExpected && isUserExpected; 115 } 116 117 @Test 118 public void testOnlieSlowLogConsumption() throws Exception { 119 120 Configuration conf = applySlowLogRecorderConf(8); 121 Constructor<NamedQueueRecorder> constructor = 122 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 123 constructor.setAccessible(true); 124 namedQueueRecorder = constructor.newInstance(conf); 125 AdminProtos.SlowLogResponseRequest request = 126 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 127 128 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 129 assertEquals(getSlowLogPayloads(request).size(), 0); 130 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 131 132 int i = 0; 133 134 // add 5 records initially 135 for (; i < 5; i++) { 136 RpcLogDetails rpcLogDetails = 137 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 138 namedQueueRecorder.addRecord(rpcLogDetails); 139 } 140 141 assertNotEquals(-1, 142 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5)); 143 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 144 assertTrue(confirmPayloadParams(0, 5, slowLogPayloads)); 145 assertTrue(confirmPayloadParams(1, 4, slowLogPayloads)); 146 assertTrue(confirmPayloadParams(2, 3, slowLogPayloads)); 147 assertTrue(confirmPayloadParams(3, 2, slowLogPayloads)); 148 assertTrue(confirmPayloadParams(4, 1, slowLogPayloads)); 149 150 // add 2 more records 151 for (; i < 7; i++) { 152 RpcLogDetails rpcLogDetails = 153 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 154 namedQueueRecorder.addRecord(rpcLogDetails); 155 } 156 157 assertNotEquals(-1, 158 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 7)); 159 160 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 161 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 162 return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList) 163 && confirmPayloadParams(5, 2, slowLogPayloadsList) 164 && confirmPayloadParams(6, 1, slowLogPayloadsList); 165 })); 166 167 // add 3 more records 168 for (; i < 10; i++) { 169 RpcLogDetails rpcLogDetails = 170 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 171 namedQueueRecorder.addRecord(rpcLogDetails); 172 } 173 174 assertNotEquals(-1, 175 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8)); 176 177 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 178 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 179 // confirm ringbuffer is full 180 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList) 181 && confirmPayloadParams(0, 10, slowLogPayloadsList) 182 && confirmPayloadParams(1, 9, slowLogPayloadsList); 183 })); 184 185 // add 4 more records 186 for (; i < 14; i++) { 187 RpcLogDetails rpcLogDetails = 188 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 189 namedQueueRecorder.addRecord(rpcLogDetails); 190 } 191 192 assertNotEquals(-1, 193 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8)); 194 195 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 196 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 197 // confirm ringbuffer is full 198 // and ordered events 199 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList) 200 && confirmPayloadParams(1, 13, slowLogPayloadsList) 201 && confirmPayloadParams(2, 12, slowLogPayloadsList) 202 && confirmPayloadParams(3, 11, slowLogPayloadsList); 203 })); 204 205 AdminProtos.SlowLogResponseRequest largeLogRequest = 206 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15) 207 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 208 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 209 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest); 210 // confirm ringbuffer is full 211 // and ordered events 212 return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList) 213 && confirmPayloadParams(1, 13, slowLogPayloadsList) 214 && confirmPayloadParams(2, 12, slowLogPayloadsList) 215 && confirmPayloadParams(3, 11, slowLogPayloadsList); 216 })); 217 218 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 219 boolean isRingBufferCleaned = 220 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 221 222 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 223 224 List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request); 225 // confirm ringbuffer is empty 226 return slowLogPayloadsList.size() == 0 && isRingBufferCleaned; 227 })); 228 229 } 230 231 private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { 232 NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); 233 namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); 234 namedQueueGetRequest.setSlowLogResponseRequest(request); 235 NamedQueueGetResponse namedQueueGetResponse = 236 namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); 237 return namedQueueGetResponse == null 238 ? Collections.emptyList() 239 : namedQueueGetResponse.getSlowLogPayloads(); 240 } 241 242 @Test 243 public void testOnlineSlowLogWithHighRecords() throws Exception { 244 245 Configuration conf = applySlowLogRecorderConf(14); 246 Constructor<NamedQueueRecorder> constructor = 247 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 248 constructor.setAccessible(true); 249 namedQueueRecorder = constructor.newInstance(conf); 250 AdminProtos.SlowLogResponseRequest request = 251 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 252 253 assertEquals(getSlowLogPayloads(request).size(), 0); 254 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 255 256 for (int i = 0; i < 14 * 11; i++) { 257 RpcLogDetails rpcLogDetails = 258 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 259 namedQueueRecorder.addRecord(rpcLogDetails); 260 } 261 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 262 263 assertNotEquals(-1, 264 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); 265 266 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 267 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 268 269 // confirm strict order of slow log payloads 270 return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 154, slowLogPayloads) 271 && confirmPayloadParams(1, 153, slowLogPayloads) 272 && confirmPayloadParams(2, 152, slowLogPayloads) 273 && confirmPayloadParams(3, 151, slowLogPayloads) 274 && confirmPayloadParams(4, 150, slowLogPayloads) 275 && confirmPayloadParams(5, 149, slowLogPayloads) 276 && confirmPayloadParams(6, 148, slowLogPayloads) 277 && confirmPayloadParams(7, 147, slowLogPayloads) 278 && confirmPayloadParams(8, 146, slowLogPayloads) 279 && confirmPayloadParams(9, 145, slowLogPayloads) 280 && confirmPayloadParams(10, 144, slowLogPayloads) 281 && confirmPayloadParams(11, 143, slowLogPayloads) 282 && confirmPayloadParams(12, 142, slowLogPayloads) 283 && confirmPayloadParams(13, 141, slowLogPayloads); 284 })); 285 286 boolean isRingBufferCleaned = 287 namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); 288 assertTrue(isRingBufferCleaned); 289 LOG.debug("cleared the ringbuffer of Online Slow Log records"); 290 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 291 292 // confirm ringbuffer is empty 293 assertEquals(slowLogPayloads.size(), 0); 294 } 295 296 @Test 297 public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception { 298 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 299 conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY); 300 301 Constructor<NamedQueueRecorder> constructor = 302 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 303 constructor.setAccessible(true); 304 namedQueueRecorder = constructor.newInstance(conf); 305 AdminProtos.SlowLogResponseRequest request = 306 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 307 assertEquals(getSlowLogPayloads(request).size(), 0); 308 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 309 for (int i = 0; i < 300; i++) { 310 RpcLogDetails rpcLogDetails = 311 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 312 namedQueueRecorder.addRecord(rpcLogDetails); 313 } 314 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 315 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 316 return slowLogPayloads.size() == 0; 317 })); 318 319 } 320 321 @Test 322 public void testOnlineSlowLogWithDisableConfig() throws Exception { 323 Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); 324 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false); 325 Constructor<NamedQueueRecorder> constructor = 326 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 327 constructor.setAccessible(true); 328 namedQueueRecorder = constructor.newInstance(conf); 329 330 AdminProtos.SlowLogResponseRequest request = 331 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 332 assertEquals(getSlowLogPayloads(request).size(), 0); 333 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 334 for (int i = 0; i < 300; i++) { 335 RpcLogDetails rpcLogDetails = 336 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 337 namedQueueRecorder.addRecord(rpcLogDetails); 338 } 339 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 340 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 341 return slowLogPayloads.size() == 0; 342 })); 343 conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); 344 } 345 346 @Test 347 public void testSlowLogFilters() throws Exception { 348 349 Configuration conf = applySlowLogRecorderConf(30); 350 Constructor<NamedQueueRecorder> constructor = 351 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 352 constructor.setAccessible(true); 353 namedQueueRecorder = constructor.newInstance(conf); 354 AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() 355 .setLimit(15).setUserName("userName_87").build(); 356 357 assertEquals(getSlowLogPayloads(request).size(), 0); 358 359 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 360 361 for (int i = 0; i < 100; i++) { 362 RpcLogDetails rpcLogDetails = 363 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 364 namedQueueRecorder.addRecord(rpcLogDetails); 365 } 366 LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter"); 367 368 assertNotEquals(-1, 369 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 1)); 370 371 AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest 372 .newBuilder().setLimit(15).setClientAddress("client_85").build(); 373 assertNotEquals(-1, 374 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestClient).size() == 1)); 375 376 AdminProtos.SlowLogResponseRequest requestSlowLog = 377 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 378 assertNotEquals(-1, 379 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15)); 380 } 381 382 @Test 383 public void testSlowLogFilterWithClientAddress() throws Exception { 384 Configuration conf = applySlowLogRecorderConf(10); 385 Constructor<NamedQueueRecorder> constructor = 386 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 387 constructor.setAccessible(true); 388 namedQueueRecorder = constructor.newInstance(conf); 389 AdminProtos.SlowLogResponseRequest request = 390 AdminProtos.SlowLogResponseRequest.newBuilder().build(); 391 assertEquals(getSlowLogPayloads(request).size(), 0); 392 393 String[] clientAddressArray = new String[] { "[127:1:1:1:1:1:1:1]:1", "[127:1:1:1:1:1:1:1]:2", 394 "[127:1:1:1:1:1:1:1]:3", "127.0.0.1:1", "127.0.0.1:2" }; 395 boolean isSlowLog; 396 boolean isLargeLog; 397 for (int i = 0; i < 10; i++) { 398 if (i % 2 == 0) { 399 isSlowLog = true; 400 isLargeLog = false; 401 } else { 402 isSlowLog = false; 403 isLargeLog = true; 404 } 405 RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), 406 clientAddressArray[i % 5], "class_" + (i + 1), isSlowLog, isLargeLog); 407 namedQueueRecorder.addRecord(rpcLogDetails); 408 } 409 410 AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithPort = 411 AdminProtos.SlowLogResponseRequest.newBuilder() 412 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) 413 .setClientAddress("[127:1:1:1:1:1:1:1]:2").build(); 414 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 415 () -> getSlowLogPayloads(largeLogRequestIPv6WithPort).size() == 1)); 416 AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithoutPort = 417 AdminProtos.SlowLogResponseRequest.newBuilder() 418 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) 419 .setClientAddress("[127:1:1:1:1:1:1:1]").build(); 420 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 421 () -> getSlowLogPayloads(largeLogRequestIPv6WithoutPort).size() == 3)); 422 AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithPort = 423 AdminProtos.SlowLogResponseRequest.newBuilder() 424 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) 425 .setClientAddress("127.0.0.1:1").build(); 426 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 427 () -> getSlowLogPayloads(largeLogRequestIPv4WithPort).size() == 1)); 428 AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithoutPort = 429 AdminProtos.SlowLogResponseRequest.newBuilder() 430 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG) 431 .setClientAddress("127.0.0.1").build(); 432 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, 433 () -> getSlowLogPayloads(largeLogRequestIPv4WithoutPort).size() == 2)); 434 } 435 436 @Test 437 public void testConcurrentSlowLogEvents() throws Exception { 438 439 Configuration conf = applySlowLogRecorderConf(50000); 440 Constructor<NamedQueueRecorder> constructor = 441 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 442 constructor.setAccessible(true); 443 namedQueueRecorder = constructor.newInstance(conf); 444 AdminProtos.SlowLogResponseRequest request = 445 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); 446 AdminProtos.SlowLogResponseRequest largeLogRequest = 447 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000) 448 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 449 assertEquals(getSlowLogPayloads(request).size(), 0); 450 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 451 452 for (int j = 0; j < 1000; j++) { 453 454 CompletableFuture.runAsync(() -> { 455 for (int i = 0; i < 3500; i++) { 456 RpcLogDetails rpcLogDetails = 457 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 458 namedQueueRecorder.addRecord(rpcLogDetails); 459 } 460 }); 461 462 } 463 464 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); 465 466 assertNotEquals(-1, 467 HBASE_TESTING_UTILITY.waitFor(5000, () -> getSlowLogPayloads(request).size() > 10000)); 468 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(5000, 469 () -> getSlowLogPayloads(largeLogRequest).size() > 10000)); 470 } 471 472 @Test 473 public void testSlowLargeLogEvents() throws Exception { 474 Configuration conf = applySlowLogRecorderConf(28); 475 Constructor<NamedQueueRecorder> constructor = 476 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 477 constructor.setAccessible(true); 478 namedQueueRecorder = constructor.newInstance(conf); 479 480 AdminProtos.SlowLogResponseRequest request = 481 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); 482 483 assertEquals(getSlowLogPayloads(request).size(), 0); 484 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 485 486 boolean isSlowLog; 487 boolean isLargeLog; 488 for (int i = 0; i < 14 * 11; i++) { 489 if (i % 2 == 0) { 490 isSlowLog = true; 491 isLargeLog = false; 492 } else { 493 isSlowLog = false; 494 isLargeLog = true; 495 } 496 RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), 497 "class_" + (i + 1), isSlowLog, isLargeLog); 498 namedQueueRecorder.addRecord(rpcLogDetails); 499 } 500 LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); 501 502 assertNotEquals(-1, 503 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14)); 504 505 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 506 List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request); 507 508 // confirm strict order of slow log payloads 509 return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 153, slowLogPayloads) 510 && confirmPayloadParams(1, 151, slowLogPayloads) 511 && confirmPayloadParams(2, 149, slowLogPayloads) 512 && confirmPayloadParams(3, 147, slowLogPayloads) 513 && confirmPayloadParams(4, 145, slowLogPayloads) 514 && confirmPayloadParams(5, 143, slowLogPayloads) 515 && confirmPayloadParams(6, 141, slowLogPayloads) 516 && confirmPayloadParams(7, 139, slowLogPayloads) 517 && confirmPayloadParams(8, 137, slowLogPayloads) 518 && confirmPayloadParams(9, 135, slowLogPayloads) 519 && confirmPayloadParams(10, 133, slowLogPayloads) 520 && confirmPayloadParams(11, 131, slowLogPayloads) 521 && confirmPayloadParams(12, 129, slowLogPayloads) 522 && confirmPayloadParams(13, 127, slowLogPayloads); 523 })); 524 525 AdminProtos.SlowLogResponseRequest largeLogRequest = 526 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11) 527 .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build(); 528 529 assertNotEquals(-1, 530 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(largeLogRequest).size() == 14)); 531 532 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 533 List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest); 534 535 // confirm strict order of slow log payloads 536 return largeLogPayloads.size() == 14 && confirmPayloadParams(0, 154, largeLogPayloads) 537 && confirmPayloadParams(1, 152, largeLogPayloads) 538 && confirmPayloadParams(2, 150, largeLogPayloads) 539 && confirmPayloadParams(3, 148, largeLogPayloads) 540 && confirmPayloadParams(4, 146, largeLogPayloads) 541 && confirmPayloadParams(5, 144, largeLogPayloads) 542 && confirmPayloadParams(6, 142, largeLogPayloads) 543 && confirmPayloadParams(7, 140, largeLogPayloads) 544 && confirmPayloadParams(8, 138, largeLogPayloads) 545 && confirmPayloadParams(9, 136, largeLogPayloads) 546 && confirmPayloadParams(10, 134, largeLogPayloads) 547 && confirmPayloadParams(11, 132, largeLogPayloads) 548 && confirmPayloadParams(12, 130, largeLogPayloads) 549 && confirmPayloadParams(13, 128, largeLogPayloads); 550 })); 551 } 552 553 @Test 554 public void testSlowLogMixedFilters() throws Exception { 555 556 Configuration conf = applySlowLogRecorderConf(30); 557 Constructor<NamedQueueRecorder> constructor = 558 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 559 constructor.setAccessible(true); 560 namedQueueRecorder = constructor.newInstance(conf); 561 AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder() 562 .setLimit(15).setUserName("userName_87").setClientAddress("client_88").build(); 563 564 assertEquals(getSlowLogPayloads(request).size(), 0); 565 566 for (int i = 0; i < 100; i++) { 567 RpcLogDetails rpcLogDetails = 568 getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); 569 namedQueueRecorder.addRecord(rpcLogDetails); 570 } 571 572 assertNotEquals(-1, 573 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 2)); 574 575 AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder() 576 .setLimit(15).setUserName("userName_1").setClientAddress("client_2").build(); 577 assertEquals(0, getSlowLogPayloads(request2).size()); 578 579 AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder() 580 .setLimit(15).setUserName("userName_87").setClientAddress("client_88") 581 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build(); 582 assertEquals(0, getSlowLogPayloads(request3).size()); 583 584 AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder() 585 .setLimit(15).setUserName("userName_87").setClientAddress("client_87") 586 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build(); 587 assertEquals(1, getSlowLogPayloads(request4).size()); 588 589 AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder() 590 .setLimit(15).setUserName("userName_88").setClientAddress("client_89") 591 .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR).build(); 592 assertEquals(2, getSlowLogPayloads(request5).size()); 593 594 AdminProtos.SlowLogResponseRequest requestSlowLog = 595 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); 596 assertNotEquals(-1, 597 HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15)); 598 } 599 600 @Test 601 public void testOnlineSlowLogScanPayloadDefaultDisabled() throws Exception { 602 Configuration conf = applySlowLogRecorderConf(1); 603 conf.unset(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED); 604 Constructor<NamedQueueRecorder> constructor = 605 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 606 constructor.setAccessible(true); 607 namedQueueRecorder = constructor.newInstance(conf); 608 AdminProtos.SlowLogResponseRequest request = 609 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 610 611 assertEquals(0, getSlowLogPayloads(request).size()); 612 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 613 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 614 namedQueueRecorder.addRecord(rpcLogDetails); 615 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 616 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 617 if (slowLogPayload.isPresent()) { 618 return !slowLogPayload.get().hasScan(); 619 } 620 return false; 621 })); 622 } 623 624 @Test 625 public void testOnlineSlowLogScanPayloadExplicitlyDisabled() throws Exception { 626 Configuration conf = applySlowLogRecorderConf(1); 627 conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, false); 628 Constructor<NamedQueueRecorder> constructor = 629 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 630 constructor.setAccessible(true); 631 namedQueueRecorder = constructor.newInstance(conf); 632 AdminProtos.SlowLogResponseRequest request = 633 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 634 635 assertEquals(0, getSlowLogPayloads(request).size()); 636 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 637 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 638 namedQueueRecorder.addRecord(rpcLogDetails); 639 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 640 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 641 if (slowLogPayload.isPresent()) { 642 return !slowLogPayload.get().hasScan(); 643 } 644 return false; 645 })); 646 } 647 648 @Test 649 public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception { 650 Configuration conf = applySlowLogRecorderConf(1); 651 conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, true); 652 Constructor<NamedQueueRecorder> constructor = 653 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 654 constructor.setAccessible(true); 655 namedQueueRecorder = constructor.newInstance(conf); 656 AdminProtos.SlowLogResponseRequest request = 657 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 658 659 assertEquals(0, getSlowLogPayloads(request).size()); 660 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 661 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 662 namedQueueRecorder.addRecord(rpcLogDetails); 663 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 664 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 665 if (slowLogPayload.isPresent()) { 666 return slowLogPayload.get().hasScan(); 667 } 668 return false; 669 })); 670 } 671 672 @Test 673 public void testOnlineSlowLogRequestAttributes() throws Exception { 674 Configuration conf = applySlowLogRecorderConf(1); 675 Constructor<NamedQueueRecorder> constructor = 676 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 677 constructor.setAccessible(true); 678 namedQueueRecorder = constructor.newInstance(conf); 679 AdminProtos.SlowLogResponseRequest request = 680 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 681 682 assertEquals(0, getSlowLogPayloads(request).size()); 683 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 684 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 685 namedQueueRecorder.addRecord(rpcLogDetails); 686 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 687 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 688 if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) { 689 return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS); 690 } 691 return false; 692 })); 693 } 694 695 @Test 696 public void testOnlineSlowLogConnectionAttributes() throws Exception { 697 Configuration conf = applySlowLogRecorderConf(1); 698 Constructor<NamedQueueRecorder> constructor = 699 NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); 700 constructor.setAccessible(true); 701 namedQueueRecorder = constructor.newInstance(conf); 702 AdminProtos.SlowLogResponseRequest request = 703 AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); 704 705 assertEquals(0, getSlowLogPayloads(request).size()); 706 LOG.debug("Initially ringbuffer of Slow Log records is empty"); 707 RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); 708 namedQueueRecorder.addRecord(rpcLogDetails); 709 assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { 710 Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); 711 if ( 712 slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty() 713 ) { 714 return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS); 715 } 716 return false; 717 })); 718 } 719 720 static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, 721 int forcedParamIndex) { 722 RpcCall rpcCall = getRpcCall(userName, forcedParamIndex); 723 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true, 724 true); 725 } 726 727 static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { 728 RpcCall rpcCall = getRpcCall(userName); 729 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true, 730 true); 731 } 732 733 private static RpcLogDetails getRpcLogDetailsOfScan() { 734 // forcedParamIndex of 0 results in a ScanRequest 735 return getRpcLogDetails("userName_1", "client_1", "class_1", 0); 736 } 737 738 private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, 739 boolean isSlowLog, boolean isLargeLog) { 740 RpcCall rpcCall = getRpcCall(userName); 741 return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, 742 isSlowLog, isLargeLog); 743 } 744 745 private static RpcCall getRpcCall(String userName) { 746 return getRpcCall(userName, Optional.empty()); 747 } 748 749 private static RpcCall getRpcCall(String userName, int forcedParamIndex) { 750 return getRpcCall(userName, Optional.of(forcedParamIndex)); 751 } 752 753 @SuppressWarnings("checkstyle:methodlength") 754 private static RpcCall getRpcCall(String userName, Optional<Integer> forcedParamIndex) { 755 RpcCall rpcCall = new RpcCall() { 756 @Override 757 public BlockingService getService() { 758 return null; 759 } 760 761 @Override 762 public Descriptors.MethodDescriptor getMethod() { 763 return null; 764 } 765 766 @Override 767 public Message getParam() { 768 return getMessage(forcedParamIndex); 769 } 770 771 @Override 772 public ExtendedCellScanner getCellScanner() { 773 return null; 774 } 775 776 @Override 777 public long getReceiveTime() { 778 return 0; 779 } 780 781 @Override 782 public long getStartTime() { 783 return 0; 784 } 785 786 @Override 787 public void setStartTime(long startTime) { 788 } 789 790 @Override 791 public int getTimeout() { 792 return 0; 793 } 794 795 @Override 796 public int getPriority() { 797 return 0; 798 } 799 800 @Override 801 public long getDeadline() { 802 return 0; 803 } 804 805 @Override 806 public long getSize() { 807 return 0; 808 } 809 810 @Override 811 public RPCProtos.RequestHeader getHeader() { 812 return null; 813 } 814 815 @Override 816 public Map<String, byte[]> getConnectionAttributes() { 817 return CONNECTION_HEADERS.stream().collect(Collectors 818 .toMap(HBaseProtos.NameBytesPair::getName, pair -> pair.getValue().toByteArray())); 819 } 820 821 @Override 822 public Map<String, byte[]> getRequestAttributes() { 823 return REQUEST_HEADERS.stream().collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName, 824 pair -> pair.getValue().toByteArray())); 825 } 826 827 @Override 828 public byte[] getRequestAttribute(String key) { 829 return null; 830 } 831 832 @Override 833 public int getRemotePort() { 834 return 0; 835 } 836 837 @Override 838 public void setResponse(Message param, ExtendedCellScanner cells, Throwable errorThrowable, 839 String error) { 840 } 841 842 @Override 843 public void sendResponseIfReady() throws IOException { 844 } 845 846 @Override 847 public void cleanup() { 848 } 849 850 @Override 851 public String toShortString() { 852 return null; 853 } 854 855 @Override 856 public long disconnectSince() { 857 return 0; 858 } 859 860 @Override 861 public boolean isClientCellBlockSupported() { 862 return false; 863 } 864 865 @Override 866 public Optional<User> getRequestUser() { 867 return getUser(userName); 868 } 869 870 @Override 871 public Optional<X509Certificate[]> getClientCertificateChain() { 872 return Optional.empty(); 873 } 874 875 @Override 876 public InetAddress getRemoteAddress() { 877 return null; 878 } 879 880 @Override 881 public HBaseProtos.VersionInfo getClientVersionInfo() { 882 return null; 883 } 884 885 @Override 886 public void setCallBack(RpcCallback callback) { 887 } 888 889 @Override 890 public boolean isRetryImmediatelySupported() { 891 return false; 892 } 893 894 @Override 895 public long getResponseCellSize() { 896 return 0; 897 } 898 899 @Override 900 public void incrementResponseCellSize(long cellSize) { 901 } 902 903 @Override 904 public long getBlockBytesScanned() { 905 return 0; 906 } 907 908 @Override 909 public void incrementBlockBytesScanned(long blockSize) { 910 } 911 912 @Override 913 public long getResponseExceptionSize() { 914 return 0; 915 } 916 917 @Override 918 public void incrementResponseExceptionSize(long exceptionSize) { 919 } 920 }; 921 return rpcCall; 922 } 923 924 private static Message getMessage(Optional<Integer> forcedParamIndex) { 925 926 i = (i + 1) % 3; 927 928 Message message = null; 929 930 switch (forcedParamIndex.orElse(i)) { 931 932 case 0: { 933 message = ClientProtos.ScanRequest.newBuilder() 934 .setRegion( 935 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region1")) 936 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME).build()) 937 .build(); 938 break; 939 } 940 case 1: { 941 message = ClientProtos.MutateRequest.newBuilder() 942 .setRegion( 943 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2")) 944 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 945 .setMutation(ClientProtos.MutationProto.newBuilder() 946 .setRow(ByteString.copyFromUtf8("row123")).build()) 947 .build(); 948 break; 949 } 950 case 2: { 951 message = ClientProtos.GetRequest.newBuilder() 952 .setRegion( 953 HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2")) 954 .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) 955 .setGet(ClientProtos.Get.newBuilder().setRow(ByteString.copyFromUtf8("row123")).build()) 956 .build(); 957 break; 958 } 959 default: 960 throw new RuntimeException("Not supposed to get here?"); 961 } 962 963 return message; 964 965 } 966 967 private static Optional<User> getUser(String userName) { 968 969 return Optional.of(new User() { 970 @Override 971 public String getShortName() { 972 return userName; 973 } 974 975 @Override 976 public <T> T runAs(PrivilegedAction<T> action) { 977 return null; 978 } 979 980 @Override 981 public <T> T runAs(PrivilegedExceptionAction<T> action) { 982 return null; 983 } 984 }); 985 986 } 987 988}