001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import java.io.IOException;
021import java.lang.reflect.Constructor;
022import java.net.InetAddress;
023import java.security.PrivilegedAction;
024import java.security.PrivilegedExceptionAction;
025import java.security.cert.X509Certificate;
026import java.util.Collections;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.TimeUnit;
032import java.util.stream.Collectors;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.CellScanner;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.ipc.RpcCall;
039import org.apache.hadoop.hbase.ipc.RpcCallback;
040import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
041import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
042import org.apache.hadoop.hbase.security.User;
043import org.apache.hadoop.hbase.testclassification.MasterTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.junit.Assert;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
054import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
055import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
056import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
057import org.apache.hbase.thirdparty.com.google.protobuf.Message;
058
059import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
064
065/**
066 * Tests for Online SlowLog Provider Service
067 */
068@Category({ MasterTests.class, MediumTests.class })
069public class TestNamedQueueRecorder {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
076
077  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
078  private static final List<HBaseProtos.NameBytesPair> REQUEST_HEADERS =
079    ImmutableList.<HBaseProtos.NameBytesPair> builder()
080      .add(HBaseProtos.NameBytesPair.newBuilder().setName("1")
081        .setValue(ByteString.copyFromUtf8("r")).build())
082      .add(HBaseProtos.NameBytesPair.newBuilder().setName("2")
083        .setValue(ByteString.copyFromUtf8("h")).build())
084      .build();
085  private static final List<HBaseProtos.NameBytesPair> CONNECTION_HEADERS =
086    ImmutableList.<HBaseProtos.NameBytesPair> builder()
087      .add(HBaseProtos.NameBytesPair.newBuilder().setName("1")
088        .setValue(ByteString.copyFromUtf8("c")).build())
089      .add(HBaseProtos.NameBytesPair.newBuilder().setName("2")
090        .setValue(ByteString.copyFromUtf8("h")).build())
091      .build();
092
093  private NamedQueueRecorder namedQueueRecorder;
094
095  private static int i = 0;
096
097  private static Configuration applySlowLogRecorderConf(int eventSize) {
098    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
099    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
100    conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
101    return conf;
102  }
103
104  /**
105   * confirm that for a ringbuffer of slow logs, payload on given index of buffer has expected
106   * elements
107   * @param i               index of ringbuffer logs
108   * @param j               data value that was put on index i
109   * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
110   * @return if actual values are as per expectations
111   */
112  private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
113    boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
114    boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
115    boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
116    return isClassExpected && isClientExpected && isUserExpected;
117  }
118
119  @Test
120  public void testOnlieSlowLogConsumption() throws Exception {
121
122    Configuration conf = applySlowLogRecorderConf(8);
123    Constructor<NamedQueueRecorder> constructor =
124      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
125    constructor.setAccessible(true);
126    namedQueueRecorder = constructor.newInstance(conf);
127    AdminProtos.SlowLogResponseRequest request =
128      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
129
130    namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
131    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
132    LOG.debug("Initially ringbuffer of Slow Log records is empty");
133
134    int i = 0;
135
136    // add 5 records initially
137    for (; i < 5; i++) {
138      RpcLogDetails rpcLogDetails =
139        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
140      namedQueueRecorder.addRecord(rpcLogDetails);
141    }
142
143    Assert.assertNotEquals(-1,
144      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
145    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
146    Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
147    Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
148    Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
149    Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads));
150    Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads));
151
152    // add 2 more records
153    for (; i < 7; i++) {
154      RpcLogDetails rpcLogDetails =
155        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
156      namedQueueRecorder.addRecord(rpcLogDetails);
157    }
158
159    Assert.assertNotEquals(-1,
160      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 7));
161
162    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
163      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
164      return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList)
165        && confirmPayloadParams(5, 2, slowLogPayloadsList)
166        && confirmPayloadParams(6, 1, slowLogPayloadsList);
167    }));
168
169    // add 3 more records
170    for (; i < 10; i++) {
171      RpcLogDetails rpcLogDetails =
172        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
173      namedQueueRecorder.addRecord(rpcLogDetails);
174    }
175
176    Assert.assertNotEquals(-1,
177      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8));
178
179    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
180      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
181      // confirm ringbuffer is full
182      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList)
183        && confirmPayloadParams(0, 10, slowLogPayloadsList)
184        && confirmPayloadParams(1, 9, slowLogPayloadsList);
185    }));
186
187    // add 4 more records
188    for (; i < 14; i++) {
189      RpcLogDetails rpcLogDetails =
190        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
191      namedQueueRecorder.addRecord(rpcLogDetails);
192    }
193
194    Assert.assertNotEquals(-1,
195      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8));
196
197    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
198      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
199      // confirm ringbuffer is full
200      // and ordered events
201      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList)
202        && confirmPayloadParams(1, 13, slowLogPayloadsList)
203        && confirmPayloadParams(2, 12, slowLogPayloadsList)
204        && confirmPayloadParams(3, 11, slowLogPayloadsList);
205    }));
206
207    AdminProtos.SlowLogResponseRequest largeLogRequest =
208      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15)
209        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
210    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
211      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
212      // confirm ringbuffer is full
213      // and ordered events
214      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList)
215        && confirmPayloadParams(1, 13, slowLogPayloadsList)
216        && confirmPayloadParams(2, 12, slowLogPayloadsList)
217        && confirmPayloadParams(3, 11, slowLogPayloadsList);
218    }));
219
220    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
221      boolean isRingBufferCleaned =
222        namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
223
224      LOG.debug("cleared the ringbuffer of Online Slow Log records");
225
226      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
227      // confirm ringbuffer is empty
228      return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
229    }));
230
231  }
232
233  private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
234    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
235    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
236    namedQueueGetRequest.setSlowLogResponseRequest(request);
237    NamedQueueGetResponse namedQueueGetResponse =
238      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
239    return namedQueueGetResponse == null
240      ? Collections.emptyList()
241      : namedQueueGetResponse.getSlowLogPayloads();
242  }
243
244  @Test
245  public void testOnlineSlowLogWithHighRecords() throws Exception {
246
247    Configuration conf = applySlowLogRecorderConf(14);
248    Constructor<NamedQueueRecorder> constructor =
249      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
250    constructor.setAccessible(true);
251    namedQueueRecorder = constructor.newInstance(conf);
252    AdminProtos.SlowLogResponseRequest request =
253      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
254
255    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
256    LOG.debug("Initially ringbuffer of Slow Log records is empty");
257
258    for (int i = 0; i < 14 * 11; i++) {
259      RpcLogDetails rpcLogDetails =
260        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
261      namedQueueRecorder.addRecord(rpcLogDetails);
262    }
263    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
264
265    Assert.assertNotEquals(-1,
266      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
267
268    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
269      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
270
271      // confirm strict order of slow log payloads
272      return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 154, slowLogPayloads)
273        && confirmPayloadParams(1, 153, slowLogPayloads)
274        && confirmPayloadParams(2, 152, slowLogPayloads)
275        && confirmPayloadParams(3, 151, slowLogPayloads)
276        && confirmPayloadParams(4, 150, slowLogPayloads)
277        && confirmPayloadParams(5, 149, slowLogPayloads)
278        && confirmPayloadParams(6, 148, slowLogPayloads)
279        && confirmPayloadParams(7, 147, slowLogPayloads)
280        && confirmPayloadParams(8, 146, slowLogPayloads)
281        && confirmPayloadParams(9, 145, slowLogPayloads)
282        && confirmPayloadParams(10, 144, slowLogPayloads)
283        && confirmPayloadParams(11, 143, slowLogPayloads)
284        && confirmPayloadParams(12, 142, slowLogPayloads)
285        && confirmPayloadParams(13, 141, slowLogPayloads);
286    }));
287
288    boolean isRingBufferCleaned =
289      namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
290    Assert.assertTrue(isRingBufferCleaned);
291    LOG.debug("cleared the ringbuffer of Online Slow Log records");
292    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
293
294    // confirm ringbuffer is empty
295    Assert.assertEquals(slowLogPayloads.size(), 0);
296  }
297
298  @Test
299  public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
300    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
301    conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
302
303    Constructor<NamedQueueRecorder> constructor =
304      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
305    constructor.setAccessible(true);
306    namedQueueRecorder = constructor.newInstance(conf);
307    AdminProtos.SlowLogResponseRequest request =
308      AdminProtos.SlowLogResponseRequest.newBuilder().build();
309    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
310    LOG.debug("Initially ringbuffer of Slow Log records is empty");
311    for (int i = 0; i < 300; i++) {
312      RpcLogDetails rpcLogDetails =
313        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
314      namedQueueRecorder.addRecord(rpcLogDetails);
315    }
316    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
317      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
318      return slowLogPayloads.size() == 0;
319    }));
320
321  }
322
323  @Test
324  public void testOnlineSlowLogWithDisableConfig() throws Exception {
325    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
326    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
327    Constructor<NamedQueueRecorder> constructor =
328      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
329    constructor.setAccessible(true);
330    namedQueueRecorder = constructor.newInstance(conf);
331
332    AdminProtos.SlowLogResponseRequest request =
333      AdminProtos.SlowLogResponseRequest.newBuilder().build();
334    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
335    LOG.debug("Initially ringbuffer of Slow Log records is empty");
336    for (int i = 0; i < 300; i++) {
337      RpcLogDetails rpcLogDetails =
338        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
339      namedQueueRecorder.addRecord(rpcLogDetails);
340    }
341    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
342      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
343      return slowLogPayloads.size() == 0;
344    }));
345    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
346  }
347
348  @Test
349  public void testSlowLogFilters() throws Exception {
350
351    Configuration conf = applySlowLogRecorderConf(30);
352    Constructor<NamedQueueRecorder> constructor =
353      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
354    constructor.setAccessible(true);
355    namedQueueRecorder = constructor.newInstance(conf);
356    AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder()
357      .setLimit(15).setUserName("userName_87").build();
358
359    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
360
361    LOG.debug("Initially ringbuffer of Slow Log records is empty");
362
363    for (int i = 0; i < 100; i++) {
364      RpcLogDetails rpcLogDetails =
365        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
366      namedQueueRecorder.addRecord(rpcLogDetails);
367    }
368    LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
369
370    Assert.assertNotEquals(-1,
371      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 1));
372
373    AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest
374      .newBuilder().setLimit(15).setClientAddress("client_85").build();
375    Assert.assertNotEquals(-1,
376      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestClient).size() == 1));
377
378    AdminProtos.SlowLogResponseRequest requestSlowLog =
379      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
380    Assert.assertNotEquals(-1,
381      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
382  }
383
384  @Test
385  public void testConcurrentSlowLogEvents() throws Exception {
386
387    Configuration conf = applySlowLogRecorderConf(50000);
388    Constructor<NamedQueueRecorder> constructor =
389      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
390    constructor.setAccessible(true);
391    namedQueueRecorder = constructor.newInstance(conf);
392    AdminProtos.SlowLogResponseRequest request =
393      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
394    AdminProtos.SlowLogResponseRequest largeLogRequest =
395      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000)
396        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
397    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
398    LOG.debug("Initially ringbuffer of Slow Log records is empty");
399
400    for (int j = 0; j < 1000; j++) {
401
402      CompletableFuture.runAsync(() -> {
403        for (int i = 0; i < 3500; i++) {
404          RpcLogDetails rpcLogDetails =
405            getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
406          namedQueueRecorder.addRecord(rpcLogDetails);
407        }
408      });
409
410    }
411
412    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
413
414    Assert.assertNotEquals(-1,
415      HBASE_TESTING_UTILITY.waitFor(5000, () -> getSlowLogPayloads(request).size() > 10000));
416    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(5000,
417      () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
418  }
419
420  @Test
421  public void testSlowLargeLogEvents() throws Exception {
422    Configuration conf = applySlowLogRecorderConf(28);
423    Constructor<NamedQueueRecorder> constructor =
424      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
425    constructor.setAccessible(true);
426    namedQueueRecorder = constructor.newInstance(conf);
427
428    AdminProtos.SlowLogResponseRequest request =
429      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
430
431    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
432    LOG.debug("Initially ringbuffer of Slow Log records is empty");
433
434    boolean isSlowLog;
435    boolean isLargeLog;
436    for (int i = 0; i < 14 * 11; i++) {
437      if (i % 2 == 0) {
438        isSlowLog = true;
439        isLargeLog = false;
440      } else {
441        isSlowLog = false;
442        isLargeLog = true;
443      }
444      RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1),
445        "class_" + (i + 1), isSlowLog, isLargeLog);
446      namedQueueRecorder.addRecord(rpcLogDetails);
447    }
448    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
449
450    Assert.assertNotEquals(-1,
451      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
452
453    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
454      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
455
456      // confirm strict order of slow log payloads
457      return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 153, slowLogPayloads)
458        && confirmPayloadParams(1, 151, slowLogPayloads)
459        && confirmPayloadParams(2, 149, slowLogPayloads)
460        && confirmPayloadParams(3, 147, slowLogPayloads)
461        && confirmPayloadParams(4, 145, slowLogPayloads)
462        && confirmPayloadParams(5, 143, slowLogPayloads)
463        && confirmPayloadParams(6, 141, slowLogPayloads)
464        && confirmPayloadParams(7, 139, slowLogPayloads)
465        && confirmPayloadParams(8, 137, slowLogPayloads)
466        && confirmPayloadParams(9, 135, slowLogPayloads)
467        && confirmPayloadParams(10, 133, slowLogPayloads)
468        && confirmPayloadParams(11, 131, slowLogPayloads)
469        && confirmPayloadParams(12, 129, slowLogPayloads)
470        && confirmPayloadParams(13, 127, slowLogPayloads);
471    }));
472
473    AdminProtos.SlowLogResponseRequest largeLogRequest =
474      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11)
475        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
476
477    Assert.assertNotEquals(-1,
478      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(largeLogRequest).size() == 14));
479
480    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
481      List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
482
483      // confirm strict order of slow log payloads
484      return largeLogPayloads.size() == 14 && confirmPayloadParams(0, 154, largeLogPayloads)
485        && confirmPayloadParams(1, 152, largeLogPayloads)
486        && confirmPayloadParams(2, 150, largeLogPayloads)
487        && confirmPayloadParams(3, 148, largeLogPayloads)
488        && confirmPayloadParams(4, 146, largeLogPayloads)
489        && confirmPayloadParams(5, 144, largeLogPayloads)
490        && confirmPayloadParams(6, 142, largeLogPayloads)
491        && confirmPayloadParams(7, 140, largeLogPayloads)
492        && confirmPayloadParams(8, 138, largeLogPayloads)
493        && confirmPayloadParams(9, 136, largeLogPayloads)
494        && confirmPayloadParams(10, 134, largeLogPayloads)
495        && confirmPayloadParams(11, 132, largeLogPayloads)
496        && confirmPayloadParams(12, 130, largeLogPayloads)
497        && confirmPayloadParams(13, 128, largeLogPayloads);
498    }));
499  }
500
501  @Test
502  public void testSlowLogMixedFilters() throws Exception {
503
504    Configuration conf = applySlowLogRecorderConf(30);
505    Constructor<NamedQueueRecorder> constructor =
506      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
507    constructor.setAccessible(true);
508    namedQueueRecorder = constructor.newInstance(conf);
509    AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder()
510      .setLimit(15).setUserName("userName_87").setClientAddress("client_88").build();
511
512    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
513
514    for (int i = 0; i < 100; i++) {
515      RpcLogDetails rpcLogDetails =
516        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
517      namedQueueRecorder.addRecord(rpcLogDetails);
518    }
519
520    Assert.assertNotEquals(-1,
521      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 2));
522
523    AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
524      .setLimit(15).setUserName("userName_1").setClientAddress("client_2").build();
525    Assert.assertEquals(0, getSlowLogPayloads(request2).size());
526
527    AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder()
528      .setLimit(15).setUserName("userName_87").setClientAddress("client_88")
529      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build();
530    Assert.assertEquals(0, getSlowLogPayloads(request3).size());
531
532    AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder()
533      .setLimit(15).setUserName("userName_87").setClientAddress("client_87")
534      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build();
535    Assert.assertEquals(1, getSlowLogPayloads(request4).size());
536
537    AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder()
538      .setLimit(15).setUserName("userName_88").setClientAddress("client_89")
539      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR).build();
540    Assert.assertEquals(2, getSlowLogPayloads(request5).size());
541
542    AdminProtos.SlowLogResponseRequest requestSlowLog =
543      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
544    Assert.assertNotEquals(-1,
545      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
546  }
547
548  @Test
549  public void testOnlineSlowLogScanPayloadDefaultDisabled() throws Exception {
550    Configuration conf = applySlowLogRecorderConf(1);
551    conf.unset(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED);
552    Constructor<NamedQueueRecorder> constructor =
553      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
554    constructor.setAccessible(true);
555    namedQueueRecorder = constructor.newInstance(conf);
556    AdminProtos.SlowLogResponseRequest request =
557      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
558
559    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
560    LOG.debug("Initially ringbuffer of Slow Log records is empty");
561    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
562    namedQueueRecorder.addRecord(rpcLogDetails);
563    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
564      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
565      if (slowLogPayload.isPresent()) {
566        return !slowLogPayload.get().hasScan();
567      }
568      return false;
569    }));
570  }
571
572  @Test
573  public void testOnlineSlowLogScanPayloadExplicitlyDisabled() throws Exception {
574    Configuration conf = applySlowLogRecorderConf(1);
575    conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, false);
576    Constructor<NamedQueueRecorder> constructor =
577      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
578    constructor.setAccessible(true);
579    namedQueueRecorder = constructor.newInstance(conf);
580    AdminProtos.SlowLogResponseRequest request =
581      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
582
583    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
584    LOG.debug("Initially ringbuffer of Slow Log records is empty");
585    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
586    namedQueueRecorder.addRecord(rpcLogDetails);
587    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
588      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
589      if (slowLogPayload.isPresent()) {
590        return !slowLogPayload.get().hasScan();
591      }
592      return false;
593    }));
594  }
595
596  @Test
597  public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception {
598    Configuration conf = applySlowLogRecorderConf(1);
599    conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, true);
600    Constructor<NamedQueueRecorder> constructor =
601      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
602    constructor.setAccessible(true);
603    namedQueueRecorder = constructor.newInstance(conf);
604    AdminProtos.SlowLogResponseRequest request =
605      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
606
607    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
608    LOG.debug("Initially ringbuffer of Slow Log records is empty");
609    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
610    namedQueueRecorder.addRecord(rpcLogDetails);
611    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
612      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
613      if (slowLogPayload.isPresent()) {
614        return slowLogPayload.get().hasScan();
615      }
616      return false;
617    }));
618  }
619
620  @Test
621  public void testOnlineSlowLogRequestAttributes() throws Exception {
622    Configuration conf = applySlowLogRecorderConf(1);
623    Constructor<NamedQueueRecorder> constructor =
624      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
625    constructor.setAccessible(true);
626    namedQueueRecorder = constructor.newInstance(conf);
627    AdminProtos.SlowLogResponseRequest request =
628      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
629
630    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
631    LOG.debug("Initially ringbuffer of Slow Log records is empty");
632    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
633    namedQueueRecorder.addRecord(rpcLogDetails);
634    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
635      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
636      if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) {
637        return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS);
638      }
639      return false;
640    }));
641  }
642
643  @Test
644  public void testOnlineSlowLogConnectionAttributes() throws Exception {
645    Configuration conf = applySlowLogRecorderConf(1);
646    Constructor<NamedQueueRecorder> constructor =
647      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
648    constructor.setAccessible(true);
649    namedQueueRecorder = constructor.newInstance(conf);
650    AdminProtos.SlowLogResponseRequest request =
651      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
652
653    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
654    LOG.debug("Initially ringbuffer of Slow Log records is empty");
655    RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
656    namedQueueRecorder.addRecord(rpcLogDetails);
657    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
658      Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
659      if (
660        slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty()
661      ) {
662        return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS);
663      }
664      return false;
665    }));
666  }
667
668  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
669    int forcedParamIndex) {
670    RpcCall rpcCall = getRpcCall(userName, forcedParamIndex);
671    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true,
672      true);
673  }
674
675  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
676    RpcCall rpcCall = getRpcCall(userName);
677    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className, true,
678      true);
679  }
680
681  private static RpcLogDetails getRpcLogDetailsOfScan() {
682    // forcedParamIndex of 0 results in a ScanRequest
683    return getRpcLogDetails("userName_1", "client_1", "class_1", 0);
684  }
685
686  private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
687    boolean isSlowLog, boolean isLargeLog) {
688    RpcCall rpcCall = getRpcCall(userName);
689    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, 0, className,
690      isSlowLog, isLargeLog);
691  }
692
693  private static RpcCall getRpcCall(String userName) {
694    return getRpcCall(userName, Optional.empty());
695  }
696
697  private static RpcCall getRpcCall(String userName, int forcedParamIndex) {
698    return getRpcCall(userName, Optional.of(forcedParamIndex));
699  }
700
701  @SuppressWarnings("checkstyle:methodlength")
702  private static RpcCall getRpcCall(String userName, Optional<Integer> forcedParamIndex) {
703    RpcCall rpcCall = new RpcCall() {
704      @Override
705      public BlockingService getService() {
706        return null;
707      }
708
709      @Override
710      public Descriptors.MethodDescriptor getMethod() {
711        return null;
712      }
713
714      @Override
715      public Message getParam() {
716        return getMessage(forcedParamIndex);
717      }
718
719      @Override
720      public CellScanner getCellScanner() {
721        return null;
722      }
723
724      @Override
725      public long getReceiveTime() {
726        return 0;
727      }
728
729      @Override
730      public long getStartTime() {
731        return 0;
732      }
733
734      @Override
735      public void setStartTime(long startTime) {
736      }
737
738      @Override
739      public int getTimeout() {
740        return 0;
741      }
742
743      @Override
744      public int getPriority() {
745        return 0;
746      }
747
748      @Override
749      public long getDeadline() {
750        return 0;
751      }
752
753      @Override
754      public long getSize() {
755        return 0;
756      }
757
758      @Override
759      public RPCProtos.RequestHeader getHeader() {
760        return null;
761      }
762
763      @Override
764      public Map<String, byte[]> getConnectionAttributes() {
765        return CONNECTION_HEADERS.stream().collect(Collectors
766          .toMap(HBaseProtos.NameBytesPair::getName, pair -> pair.getValue().toByteArray()));
767      }
768
769      @Override
770      public Map<String, byte[]> getRequestAttributes() {
771        return REQUEST_HEADERS.stream().collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName,
772          pair -> pair.getValue().toByteArray()));
773      }
774
775      @Override
776      public byte[] getRequestAttribute(String key) {
777        return null;
778      }
779
780      @Override
781      public int getRemotePort() {
782        return 0;
783      }
784
785      @Override
786      public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
787        String error) {
788      }
789
790      @Override
791      public void sendResponseIfReady() throws IOException {
792      }
793
794      @Override
795      public void cleanup() {
796      }
797
798      @Override
799      public String toShortString() {
800        return null;
801      }
802
803      @Override
804      public long disconnectSince() {
805        return 0;
806      }
807
808      @Override
809      public boolean isClientCellBlockSupported() {
810        return false;
811      }
812
813      @Override
814      public Optional<User> getRequestUser() {
815        return getUser(userName);
816      }
817
818      @Override
819      public Optional<X509Certificate[]> getClientCertificateChain() {
820        return Optional.empty();
821      }
822
823      @Override
824      public InetAddress getRemoteAddress() {
825        return null;
826      }
827
828      @Override
829      public HBaseProtos.VersionInfo getClientVersionInfo() {
830        return null;
831      }
832
833      @Override
834      public void setCallBack(RpcCallback callback) {
835      }
836
837      @Override
838      public boolean isRetryImmediatelySupported() {
839        return false;
840      }
841
842      @Override
843      public long getResponseCellSize() {
844        return 0;
845      }
846
847      @Override
848      public void incrementResponseCellSize(long cellSize) {
849      }
850
851      @Override
852      public long getBlockBytesScanned() {
853        return 0;
854      }
855
856      @Override
857      public void incrementBlockBytesScanned(long blockSize) {
858      }
859
860      @Override
861      public long getResponseExceptionSize() {
862        return 0;
863      }
864
865      @Override
866      public void incrementResponseExceptionSize(long exceptionSize) {
867      }
868
869      @Override
870      public void updateFsReadTime(long latencyMillis) {
871
872      }
873
874      @Override
875      public long getFsReadTime() {
876        return 0;
877      }
878    };
879    return rpcCall;
880  }
881
882  private static Message getMessage(Optional<Integer> forcedParamIndex) {
883
884    i = (i + 1) % 3;
885
886    Message message = null;
887
888    switch (forcedParamIndex.orElse(i)) {
889
890      case 0: {
891        message = ClientProtos.ScanRequest.newBuilder()
892          .setRegion(
893            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region1"))
894              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME).build())
895          .build();
896        break;
897      }
898      case 1: {
899        message = ClientProtos.MutateRequest.newBuilder()
900          .setRegion(
901            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2"))
902              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
903          .setMutation(ClientProtos.MutationProto.newBuilder()
904            .setRow(ByteString.copyFromUtf8("row123")).build())
905          .build();
906        break;
907      }
908      case 2: {
909        message = ClientProtos.GetRequest.newBuilder()
910          .setRegion(
911            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2"))
912              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
913          .setGet(ClientProtos.Get.newBuilder().setRow(ByteString.copyFromUtf8("row123")).build())
914          .build();
915        break;
916      }
917      default:
918        throw new RuntimeException("Not supposed to get here?");
919    }
920
921    return message;
922
923  }
924
925  private static Optional<User> getUser(String userName) {
926
927    return Optional.of(new User() {
928      @Override
929      public String getShortName() {
930        return userName;
931      }
932
933      @Override
934      public <T> T runAs(PrivilegedAction<T> action) {
935        return null;
936      }
937
938      @Override
939      public <T> T runAs(PrivilegedExceptionAction<T> action) {
940        return null;
941      }
942    });
943
944  }
945
946}