001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.lang.reflect.Constructor;
026import java.net.InetAddress;
027import java.security.PrivilegedAction;
028import java.security.PrivilegedExceptionAction;
029import java.security.cert.X509Certificate;
030import java.util.Collections;
031import java.util.List;
032import java.util.Map;
033import java.util.Optional;
034import java.util.concurrent.CompletableFuture;
035import java.util.concurrent.TimeUnit;
036import java.util.stream.Collectors;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.ExtendedCellScanner;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.ipc.RpcCall;
042import org.apache.hadoop.hbase.ipc.RpcCallback;
043import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
044import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
045import org.apache.hadoop.hbase.security.User;
046import org.apache.hadoop.hbase.testclassification.MasterTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.junit.jupiter.api.Tag;
049import org.junit.jupiter.api.Test;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
055import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
056import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
057import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
058import org.apache.hbase.thirdparty.com.google.protobuf.Message;
059
060import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
065
066/**
067 * Tests for Online SlowLog Provider Service
068 */
069@Tag(MediumTests.TAG)
070@Tag(MasterTests.TAG)
071public class TestNamedQueueRecorder {
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
074
075  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
076  private static final List<HBaseProtos.NameBytesPair> REQUEST_HEADERS =
077    ImmutableList.<HBaseProtos.NameBytesPair> builder()
078      .add(HBaseProtos.NameBytesPair.newBuilder().setName("1")
079        .setValue(ByteString.copyFromUtf8("r")).build())
080      .add(HBaseProtos.NameBytesPair.newBuilder().setName("2")
081        .setValue(ByteString.copyFromUtf8("h")).build())
082      .build();
083  private static final List<HBaseProtos.NameBytesPair> CONNECTION_HEADERS =
084    ImmutableList.<HBaseProtos.NameBytesPair> builder()
085      .add(HBaseProtos.NameBytesPair.newBuilder().setName("1")
086        .setValue(ByteString.copyFromUtf8("c")).build())
087      .add(HBaseProtos.NameBytesPair.newBuilder().setName("2")
088        .setValue(ByteString.copyFromUtf8("h")).build())
089      .build();
090
091  private NamedQueueRecorder namedQueueRecorder;
092
093  private static int i = 0;
094
095  private static Configuration applySlowLogRecorderConf(int eventSize) {
096    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
097    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
098    conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
099    return conf;
100  }
101
102  /**
103   * confirm that for a ringbuffer of slow logs, payload on given index of buffer has expected
104   * elements
105   * @param i               index of ringbuffer logs
106   * @param j               data value that was put on index i
107   * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
108   * @return if actual values are as per expectations
109   */
110  private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
111    boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
112    boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
113    boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
114    return isClassExpected && isClientExpected && isUserExpected;
115  }
116
117  @Test
118  public void testOnlieSlowLogConsumption() throws Exception {
119
120    Configuration conf = applySlowLogRecorderConf(8);
121    Constructor<NamedQueueRecorder> constructor =
122      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
123    constructor.setAccessible(true);
124    namedQueueRecorder = constructor.newInstance(conf);
125    AdminProtos.SlowLogResponseRequest request =
126      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
127
128    namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
129    assertEquals(getSlowLogPayloads(request).size(), 0);
130    LOG.debug("Initially ringbuffer of Slow Log records is empty");
131
132    int i = 0;
133
134    // add 5 records initially
135    for (; i < 5; i++) {
136      RpcLogDetails rpcLogDetails =
137        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
138      namedQueueRecorder.addRecord(rpcLogDetails);
139    }
140
141    assertNotEquals(-1,
142      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
143    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
144    assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
145    assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
146    assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
147    assertTrue(confirmPayloadParams(3, 2, slowLogPayloads));
148    assertTrue(confirmPayloadParams(4, 1, slowLogPayloads));
149
150    // add 2 more records
151    for (; i < 7; i++) {
152      RpcLogDetails rpcLogDetails =
153        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
154      namedQueueRecorder.addRecord(rpcLogDetails);
155    }
156
157    assertNotEquals(-1,
158      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 7));
159
160    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
161      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
162      return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList)
163        && confirmPayloadParams(5, 2, slowLogPayloadsList)
164        && confirmPayloadParams(6, 1, slowLogPayloadsList);
165    }));
166
167    // add 3 more records
168    for (; i < 10; i++) {
169      RpcLogDetails rpcLogDetails =
170        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
171      namedQueueRecorder.addRecord(rpcLogDetails);
172    }
173
174    assertNotEquals(-1,
175      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8));
176
177    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
178      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
179      // confirm ringbuffer is full
180      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList)
181        && confirmPayloadParams(0, 10, slowLogPayloadsList)
182        && confirmPayloadParams(1, 9, slowLogPayloadsList);
183    }));
184
185    // add 4 more records
186    for (; i < 14; i++) {
187      RpcLogDetails rpcLogDetails =
188        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
189      namedQueueRecorder.addRecord(rpcLogDetails);
190    }
191
192    assertNotEquals(-1,
193      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8));
194
195    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
196      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
197      // confirm ringbuffer is full
198      // and ordered events
199      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList)
200        && confirmPayloadParams(1, 13, slowLogPayloadsList)
201        && confirmPayloadParams(2, 12, slowLogPayloadsList)
202        && confirmPayloadParams(3, 11, slowLogPayloadsList);
203    }));
204
205    AdminProtos.SlowLogResponseRequest largeLogRequest =
206      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15)
207        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
208    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
209      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
210      // confirm ringbuffer is full
211      // and ordered events
212      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList)
213        && confirmPayloadParams(1, 13, slowLogPayloadsList)
214        && confirmPayloadParams(2, 12, slowLogPayloadsList)
215        && confirmPayloadParams(3, 11, slowLogPayloadsList);
216    }));
217
218    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
219      boolean isRingBufferCleaned =
220        namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
221
222      LOG.debug("cleared the ringbuffer of Online Slow Log records");
223
224      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
225      // confirm ringbuffer is empty
226      return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
227    }));
228
229  }
230
231  private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
232    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
233    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
234    namedQueueGetRequest.setSlowLogResponseRequest(request);
235    NamedQueueGetResponse namedQueueGetResponse =
236      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
237    return namedQueueGetResponse == null
238      ? Collections.emptyList()
239      : namedQueueGetResponse.getSlowLogPayloads();
240  }
241
242  @Test
243  public void testOnlineSlowLogWithHighRecords() throws Exception {
244
245    Configuration conf = applySlowLogRecorderConf(14);
246    Constructor<NamedQueueRecorder> constructor =
247      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
248    constructor.setAccessible(true);
249    namedQueueRecorder = constructor.newInstance(conf);
250    AdminProtos.SlowLogResponseRequest request =
251      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
252
253    assertEquals(getSlowLogPayloads(request).size(), 0);
254    LOG.debug("Initially ringbuffer of Slow Log records is empty");
255
256    for (int i = 0; i < 14 * 11; i++) {
257      RpcLogDetails rpcLogDetails =
258        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
259      namedQueueRecorder.addRecord(rpcLogDetails);
260    }
261    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
262
263    assertNotEquals(-1,
264      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
265
266    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
267      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
268
269      // confirm strict order of slow log payloads
270      return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 154, slowLogPayloads)
271        && confirmPayloadParams(1, 153, slowLogPayloads)
272        && confirmPayloadParams(2, 152, slowLogPayloads)
273        && confirmPayloadParams(3, 151, slowLogPayloads)
274        && confirmPayloadParams(4, 150, slowLogPayloads)
275        && confirmPayloadParams(5, 149, slowLogPayloads)
276        && confirmPayloadParams(6, 148, slowLogPayloads)
277        && confirmPayloadParams(7, 147, slowLogPayloads)
278        && confirmPayloadParams(8, 146, slowLogPayloads)
279        && confirmPayloadParams(9, 145, slowLogPayloads)
280        && confirmPayloadParams(10, 144, slowLogPayloads)
281        && confirmPayloadParams(11, 143, slowLogPayloads)
282        && confirmPayloadParams(12, 142, slowLogPayloads)
283        && confirmPayloadParams(13, 141, slowLogPayloads);
284    }));
285
286    boolean isRingBufferCleaned =
287      namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
288    assertTrue(isRingBufferCleaned);
289    LOG.debug("cleared the ringbuffer of Online Slow Log records");
290    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
291
292    // confirm ringbuffer is empty
293    assertEquals(slowLogPayloads.size(), 0);
294  }
295
296  @Test
297  public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
298    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
299    conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
300
301    Constructor<NamedQueueRecorder> constructor =
302      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
303    constructor.setAccessible(true);
304    namedQueueRecorder = constructor.newInstance(conf);
305    AdminProtos.SlowLogResponseRequest request =
306      AdminProtos.SlowLogResponseRequest.newBuilder().build();
307    assertEquals(getSlowLogPayloads(request).size(), 0);
308    LOG.debug("Initially ringbuffer of Slow Log records is empty");
309    for (int i = 0; i < 300; i++) {
310      RpcLogDetails rpcLogDetails =
311        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
312      namedQueueRecorder.addRecord(rpcLogDetails);
313    }
314    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
315      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
316      return slowLogPayloads.size() == 0;
317    }));
318
319  }
320
321  @Test
322  public void testOnlineSlowLogWithDisableConfig() throws Exception {
323    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
324    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
325    Constructor<NamedQueueRecorder> constructor =
326      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
327    constructor.setAccessible(true);
328    namedQueueRecorder = constructor.newInstance(conf);
329
330    AdminProtos.SlowLogResponseRequest request =
331      AdminProtos.SlowLogResponseRequest.newBuilder().build();
332    assertEquals(getSlowLogPayloads(request).size(), 0);
333    LOG.debug("Initially ringbuffer of Slow Log records is empty");
334    for (int i = 0; i < 300; i++) {
335      RpcLogDetails rpcLogDetails =
336        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
337      namedQueueRecorder.addRecord(rpcLogDetails);
338    }
339    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
340      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
341      return slowLogPayloads.size() == 0;
342    }));
343    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
344  }
345
346  @Test
347  public void testSlowLogFilters() throws Exception {
348
349    Configuration conf = applySlowLogRecorderConf(30);
350    Constructor<NamedQueueRecorder> constructor =
351      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
352    constructor.setAccessible(true);
353    namedQueueRecorder = constructor.newInstance(conf);
354    AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder()
355      .setLimit(15).setUserName("userName_87").build();
356
357    assertEquals(getSlowLogPayloads(request).size(), 0);
358
359    LOG.debug("Initially ringbuffer of Slow Log records is empty");
360
361    for (int i = 0; i < 100; i++) {
362      RpcLogDetails rpcLogDetails =
363        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
364      namedQueueRecorder.addRecord(rpcLogDetails);
365    }
366    LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
367
368    assertNotEquals(-1,
369      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 1));
370
371    AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest
372      .newBuilder().setLimit(15).setClientAddress("client_85").build();
373    assertNotEquals(-1,
374      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestClient).size() == 1));
375
376    AdminProtos.SlowLogResponseRequest requestSlowLog =
377      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
378    assertNotEquals(-1,
379      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
380  }
381
382  @Test
383  public void testSlowLogFilterWithClientAddress() throws Exception {
384    Configuration conf = applySlowLogRecorderConf(10);
385    Constructor<NamedQueueRecorder> constructor =
386      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
387    constructor.setAccessible(true);
388    namedQueueRecorder = constructor.newInstance(conf);
389    AdminProtos.SlowLogResponseRequest request =
390      AdminProtos.SlowLogResponseRequest.newBuilder().build();
391    assertEquals(getSlowLogPayloads(request).size(), 0);
392
393    String[] clientAddressArray = new String[] { "[127:1:1:1:1:1:1:1]:1", "[127:1:1:1:1:1:1:1]:2",
394      "[127:1:1:1:1:1:1:1]:3", "127.0.0.1:1", "127.0.0.1:2" };
395    boolean isSlowLog;
396    boolean isLargeLog;
397    for (int i = 0; i < 10; i++) {
398      if (i % 2 == 0) {
399        isSlowLog = true;
400        isLargeLog = false;
401      } else {
402        isSlowLog = false;
403        isLargeLog = true;
404      }
405      RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1),
406        clientAddressArray[i % 5], "class_" + (i + 1), isSlowLog, isLargeLog);
407      namedQueueRecorder.addRecord(rpcLogDetails);
408    }
409
410    AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithPort =
411      AdminProtos.SlowLogResponseRequest.newBuilder()
412        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
413        .setClientAddress("[127:1:1:1:1:1:1:1]:2").build();
414    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
415      () -> getSlowLogPayloads(largeLogRequestIPv6WithPort).size() == 1));
416    AdminProtos.SlowLogResponseRequest largeLogRequestIPv6WithoutPort =
417      AdminProtos.SlowLogResponseRequest.newBuilder()
418        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
419        .setClientAddress("[127:1:1:1:1:1:1:1]").build();
420    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
421      () -> getSlowLogPayloads(largeLogRequestIPv6WithoutPort).size() == 3));
422    AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithPort =
423      AdminProtos.SlowLogResponseRequest.newBuilder()
424        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
425        .setClientAddress("127.0.0.1:1").build();
426    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
427      () -> getSlowLogPayloads(largeLogRequestIPv4WithPort).size() == 1));
428    AdminProtos.SlowLogResponseRequest largeLogRequestIPv4WithoutPort =
429      AdminProtos.SlowLogResponseRequest.newBuilder()
430        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
431        .setClientAddress("127.0.0.1").build();
432    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
433      () -> getSlowLogPayloads(largeLogRequestIPv4WithoutPort).size() == 2));
434  }
435
436  @Test
437  public void testConcurrentSlowLogEvents() throws Exception {
438
439    Configuration conf = applySlowLogRecorderConf(50000);
440    Constructor<NamedQueueRecorder> constructor =
441      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
442    constructor.setAccessible(true);
443    namedQueueRecorder = constructor.newInstance(conf);
444    AdminProtos.SlowLogResponseRequest request =
445      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
446    AdminProtos.SlowLogResponseRequest largeLogRequest =
447      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000)
448        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
449    assertEquals(getSlowLogPayloads(request).size(), 0);
450    LOG.debug("Initially ringbuffer of Slow Log records is empty");
451
452    for (int j = 0; j < 1000; j++) {
453
454      CompletableFuture.runAsync(() -> {
455        for (int i = 0; i < 3500; i++) {
456          RpcLogDetails rpcLogDetails =
457            getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
458          namedQueueRecorder.addRecord(rpcLogDetails);
459        }
460      });
461
462    }
463
464    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
465
466    assertNotEquals(-1,
467      HBASE_TESTING_UTILITY.waitFor(5000, () -> getSlowLogPayloads(request).size() > 10000));
468    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(5000,
469      () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
470  }
471
472  @Test
473  public void testSlowLargeLogEvents() throws Exception {
474    Configuration conf = applySlowLogRecorderConf(28);
475    Constructor<NamedQueueRecorder> constructor =
476      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
477    constructor.setAccessible(true);
478    namedQueueRecorder = constructor.newInstance(conf);
479
480    AdminProtos.SlowLogResponseRequest request =
481      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
482
483    assertEquals(getSlowLogPayloads(request).size(), 0);
484    LOG.debug("Initially ringbuffer of Slow Log records is empty");
485
486    boolean isSlowLog;
487    boolean isLargeLog;
488    for (int i = 0; i < 14 * 11; i++) {
489      if (i % 2 == 0) {
490        isSlowLog = true;
491        isLargeLog = false;
492      } else {
493        isSlowLog = false;
494        isLargeLog = true;
495      }
496      RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1),
497        "class_" + (i + 1), isSlowLog, isLargeLog);
498      namedQueueRecorder.addRecord(rpcLogDetails);
499    }
500    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
501
502    assertNotEquals(-1,
503      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
504
505    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
506      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
507
508      // confirm strict order of slow log payloads
509      return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 153, slowLogPayloads)
510        && confirmPayloadParams(1, 151, slowLogPayloads)
511        && confirmPayloadParams(2, 149, slowLogPayloads)
512        && confirmPayloadParams(3, 147, slowLogPayloads)
513        && confirmPayloadParams(4, 145, slowLogPayloads)
514        && confirmPayloadParams(5, 143, slowLogPayloads)
515        && confirmPayloadParams(6, 141, slowLogPayloads)
516        && confirmPayloadParams(7, 139, slowLogPayloads)
517        && confirmPayloadParams(8, 137, slowLogPayloads)
518        && confirmPayloadParams(9, 135, slowLogPayloads)
519        && confirmPayloadParams(10, 133, slowLogPayloads)
520        && confirmPayloadParams(11, 131, slowLogPayloads)
521        && confirmPayloadParams(12, 129, slowLogPayloads)
522        && confirmPayloadParams(13, 127, slowLogPayloads);
523    }));
524
525    AdminProtos.SlowLogResponseRequest largeLogRequest =
526      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11)
527        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
528
529    assertNotEquals(-1,
530      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(largeLogRequest).size() == 14));
531
532    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
533      List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
534
535      // confirm strict order of slow log payloads
536      return largeLogPayloads.size() == 14 && confirmPayloadParams(0, 154, largeLogPayloads)
537        && confirmPayloadParams(1, 152, largeLogPayloads)
538        && confirmPayloadParams(2, 150, largeLogPayloads)
539        && confirmPayloadParams(3, 148, largeLogPayloads)
540        && confirmPayloadParams(4, 146, largeLogPayloads)
541        && confirmPayloadParams(5, 144, largeLogPayloads)
542        && confirmPayloadParams(6, 142, largeLogPayloads)
543        && confirmPayloadParams(7, 140, largeLogPayloads)
544        && confirmPayloadParams(8, 138, largeLogPayloads)
545        && confirmPayloadParams(9, 136, largeLogPayloads)
546        && confirmPayloadParams(10, 134, largeLogPayloads)
547        && confirmPayloadParams(11, 132, largeLogPayloads)
548        && confirmPayloadParams(12, 130, largeLogPayloads)
549        && confirmPayloadParams(13, 128, largeLogPayloads);
550    }));
551  }
552
553  @Test
554  public void testSlowLogMixedFilters() throws Exception {
555
556    Configuration conf = applySlowLogRecorderConf(30);
557    Constructor<NamedQueueRecorder> constructor =
558      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
559    constructor.setAccessible(true);
560    namedQueueRecorder = constructor.newInstance(conf);
561    AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder()
562      .setLimit(15).setUserName("userName_87").setClientAddress("client_88").build();
563
564    assertEquals(getSlowLogPayloads(request).size(), 0);
565
566    for (int i = 0; i < 100; i++) {
567      RpcLogDetails rpcLogDetails =
568        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
569      namedQueueRecorder.addRecord(rpcLogDetails);
570    }
571
572    assertNotEquals(-1,
573      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 2));
574
575    AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
576      .setLimit(15).setUserName("userName_1").setClientAddress("client_2").build();
577    assertEquals(0, getSlowLogPayloads(request2).size());
578
579    AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder()
580      .setLimit(15).setUserName("userName_87").setClientAddress("client_88")
581      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build();
582    assertEquals(0, getSlowLogPayloads(request3).size());
583
584    AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder()
585      .setLimit(15).setUserName("userName_87").setClientAddress("client_87")
586      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build();
587    assertEquals(1, getSlowLogPayloads(request4).size());
588
589    AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder()
590      .setLimit(15).setUserName("userName_88").setClientAddress("client_89")
591      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR).build();
592    assertEquals(2, getSlowLogPayloads(request5).size());
593
594    AdminProtos.SlowLogResponseRequest requestSlowLog =
595      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
596    assertNotEquals(-1,
597      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
598  }
599
600  @Test
601  public void testOnlineSlowLogScanPayloadDefaultDisabled() throws Exception {
602    Configuration conf = applySlowLogRecorderConf(1);
603    conf.unset(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED);
604    Constructor<NamedQueueRecorder> constructor =
605      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
606    constructor.setAccessible(true);
607    namedQueueRecorder = constructor.newInstance(conf);
608    AdminProtos.SlowLogResponseRequest request =
609      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
610
611    assertEquals(0, getSlowLogPayloads(request).size());
612    LOG.debug("Initially ringbuffer of Slow Log records is empty");
613    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
614    namedQueueRecorder.addRecord(rpcLogDetails);
615    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
616      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
617      if (slowLogPayload.isPresent()) {
618        return !slowLogPayload.get().hasScan();
619      }
620      return false;
621    }));
622  }
623
624  @Test
625  public void testOnlineSlowLogScanPayloadExplicitlyDisabled() throws Exception {
626    Configuration conf = applySlowLogRecorderConf(1);
627    conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, false);
628    Constructor<NamedQueueRecorder> constructor =
629      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
630    constructor.setAccessible(true);
631    namedQueueRecorder = constructor.newInstance(conf);
632    AdminProtos.SlowLogResponseRequest request =
633      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
634
635    assertEquals(0, getSlowLogPayloads(request).size());
636    LOG.debug("Initially ringbuffer of Slow Log records is empty");
637    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
638    namedQueueRecorder.addRecord(rpcLogDetails);
639    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
640      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
641      if (slowLogPayload.isPresent()) {
642        return !slowLogPayload.get().hasScan();
643      }
644      return false;
645    }));
646  }
647
648  @Test
649  public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception {
650    Configuration conf = applySlowLogRecorderConf(1);
651    conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, true);
652    Constructor<NamedQueueRecorder> constructor =
653      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
654    constructor.setAccessible(true);
655    namedQueueRecorder = constructor.newInstance(conf);
656    AdminProtos.SlowLogResponseRequest request =
657      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
658
659    assertEquals(0, getSlowLogPayloads(request).size());
660    LOG.debug("Initially ringbuffer of Slow Log records is empty");
661    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
662    namedQueueRecorder.addRecord(rpcLogDetails);
663    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
664      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
665      if (slowLogPayload.isPresent()) {
666        return slowLogPayload.get().hasScan();
667      }
668      return false;
669    }));
670  }
671
672  @Test
673  public void testOnlineSlowLogRequestAttributes() throws Exception {
674    Configuration conf = applySlowLogRecorderConf(1);
675    Constructor<NamedQueueRecorder> constructor =
676      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
677    constructor.setAccessible(true);
678    namedQueueRecorder = constructor.newInstance(conf);
679    AdminProtos.SlowLogResponseRequest request =
680      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
681
682    assertEquals(0, getSlowLogPayloads(request).size());
683    LOG.debug("Initially ringbuffer of Slow Log records is empty");
684    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
685    namedQueueRecorder.addRecord(rpcLogDetails);
686    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
687      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
688      if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) {
689        return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS);
690      }
691      return false;
692    }));
693  }
694
695  @Test
696  public void testOnlineSlowLogConnectionAttributes() throws Exception {
697    Configuration conf = applySlowLogRecorderConf(1);
698    Constructor<NamedQueueRecorder> constructor =
699      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
700    constructor.setAccessible(true);
701    namedQueueRecorder = constructor.newInstance(conf);
702    AdminProtos.SlowLogResponseRequest request =
703      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
704
705    assertEquals(0, getSlowLogPayloads(request).size());
706    LOG.debug("Initially ringbuffer of Slow Log records is empty");
707    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
708    namedQueueRecorder.addRecord(rpcLogDetails);
709    assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
710      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
711      if (
712        slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty()
713      ) {
714        return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS);
715      }
716      return false;
717    }));
718  }
719
720  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
721    int forcedParamIndex) {
722    RpcCall rpcCall = getRpcCall(userName, forcedParamIndex);
723    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true,
724      true);
725  }
726
727  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
728    RpcCall rpcCall = getRpcCall(userName);
729    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true,
730      true);
731  }
732
733  private static RpcLogDetails getRpcLogDetailsOfScan() {
734    // forcedParamIndex of 0 results in a ScanRequest
735    return getRpcLogDetails("userName_1", "client_1", "class_1", 0);
736  }
737
738  private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
739    boolean isSlowLog, boolean isLargeLog) {
740    RpcCall rpcCall = getRpcCall(userName);
741    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className,
742      isSlowLog, isLargeLog);
743  }
744
745  private static RpcCall getRpcCall(String userName) {
746    return getRpcCall(userName, Optional.empty());
747  }
748
749  private static RpcCall getRpcCall(String userName, int forcedParamIndex) {
750    return getRpcCall(userName, Optional.of(forcedParamIndex));
751  }
752
753  @SuppressWarnings("checkstyle:methodlength")
754  private static RpcCall getRpcCall(String userName, Optional<Integer> forcedParamIndex) {
755    RpcCall rpcCall = new RpcCall() {
756      @Override
757      public BlockingService getService() {
758        return null;
759      }
760
761      @Override
762      public Descriptors.MethodDescriptor getMethod() {
763        return null;
764      }
765
766      @Override
767      public Message getParam() {
768        return getMessage(forcedParamIndex);
769      }
770
771      @Override
772      public ExtendedCellScanner getCellScanner() {
773        return null;
774      }
775
776      @Override
777      public long getReceiveTime() {
778        return 0;
779      }
780
781      @Override
782      public long getStartTime() {
783        return 0;
784      }
785
786      @Override
787      public void setStartTime(long startTime) {
788      }
789
790      @Override
791      public int getTimeout() {
792        return 0;
793      }
794
795      @Override
796      public int getPriority() {
797        return 0;
798      }
799
800      @Override
801      public long getDeadline() {
802        return 0;
803      }
804
805      @Override
806      public long getSize() {
807        return 0;
808      }
809
810      @Override
811      public RPCProtos.RequestHeader getHeader() {
812        return null;
813      }
814
815      @Override
816      public Map<String, byte[]> getConnectionAttributes() {
817        return CONNECTION_HEADERS.stream().collect(Collectors
818          .toMap(HBaseProtos.NameBytesPair::getName, pair -> pair.getValue().toByteArray()));
819      }
820
821      @Override
822      public Map<String, byte[]> getRequestAttributes() {
823        return REQUEST_HEADERS.stream().collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName,
824          pair -> pair.getValue().toByteArray()));
825      }
826
827      @Override
828      public byte[] getRequestAttribute(String key) {
829        return null;
830      }
831
832      @Override
833      public int getRemotePort() {
834        return 0;
835      }
836
837      @Override
838      public void setResponse(Message param, ExtendedCellScanner cells, Throwable errorThrowable,
839        String error) {
840      }
841
842      @Override
843      public void sendResponseIfReady() throws IOException {
844      }
845
846      @Override
847      public void cleanup() {
848      }
849
850      @Override
851      public String toShortString() {
852        return null;
853      }
854
855      @Override
856      public long disconnectSince() {
857        return 0;
858      }
859
860      @Override
861      public boolean isClientCellBlockSupported() {
862        return false;
863      }
864
865      @Override
866      public Optional<User> getRequestUser() {
867        return getUser(userName);
868      }
869
870      @Override
871      public Optional<X509Certificate[]> getClientCertificateChain() {
872        return Optional.empty();
873      }
874
875      @Override
876      public InetAddress getRemoteAddress() {
877        return null;
878      }
879
880      @Override
881      public HBaseProtos.VersionInfo getClientVersionInfo() {
882        return null;
883      }
884
885      @Override
886      public void setCallBack(RpcCallback callback) {
887      }
888
889      @Override
890      public boolean isRetryImmediatelySupported() {
891        return false;
892      }
893
894      @Override
895      public long getResponseCellSize() {
896        return 0;
897      }
898
899      @Override
900      public void incrementResponseCellSize(long cellSize) {
901      }
902
903      @Override
904      public long getBlockBytesScanned() {
905        return 0;
906      }
907
908      @Override
909      public void incrementBlockBytesScanned(long blockSize) {
910      }
911
912      @Override
913      public long getResponseExceptionSize() {
914        return 0;
915      }
916
917      @Override
918      public void incrementResponseExceptionSize(long exceptionSize) {
919      }
920    };
921    return rpcCall;
922  }
923
924  private static Message getMessage(Optional<Integer> forcedParamIndex) {
925
926    i = (i + 1) % 3;
927
928    Message message = null;
929
930    switch (forcedParamIndex.orElse(i)) {
931
932      case 0: {
933        message = ClientProtos.ScanRequest.newBuilder()
934          .setRegion(
935            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region1"))
936              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME).build())
937          .build();
938        break;
939      }
940      case 1: {
941        message = ClientProtos.MutateRequest.newBuilder()
942          .setRegion(
943            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2"))
944              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
945          .setMutation(ClientProtos.MutationProto.newBuilder()
946            .setRow(ByteString.copyFromUtf8("row123")).build())
947          .build();
948        break;
949      }
950      case 2: {
951        message = ClientProtos.GetRequest.newBuilder()
952          .setRegion(
953            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2"))
954              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
955          .setGet(ClientProtos.Get.newBuilder().setRow(ByteString.copyFromUtf8("row123")).build())
956          .build();
957        break;
958      }
959      default:
960        throw new RuntimeException("Not supposed to get here?");
961    }
962
963    return message;
964
965  }
966
967  private static Optional<User> getUser(String userName) {
968
969    return Optional.of(new User() {
970      @Override
971      public String getShortName() {
972        return userName;
973      }
974
975      @Override
976      public <T> T runAs(PrivilegedAction<T> action) {
977        return null;
978      }
979
980      @Override
981      public <T> T runAs(PrivilegedExceptionAction<T> action) {
982        return null;
983      }
984    });
985
986  }
987
988}