001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.namequeues;
021
022import java.io.IOException;
023import java.lang.reflect.Constructor;
024import java.net.InetAddress;
025import java.security.PrivilegedAction;
026import java.security.PrivilegedExceptionAction;
027import java.util.Collections;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.CellScanner;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.ipc.RpcCall;
039import org.apache.hadoop.hbase.ipc.RpcCallback;
040import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
041import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
042import org.apache.hadoop.hbase.security.User;
043import org.apache.hadoop.hbase.testclassification.MasterTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.junit.Assert;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
053import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
054import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
055import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
056import org.apache.hbase.thirdparty.com.google.protobuf.Message;
057
058import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
063
064/**
065 * Tests for Online SlowLog Provider Service
066 */
067@Category({MasterTests.class, MediumTests.class})
068public class TestNamedQueueRecorder {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
075
076  private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
077
078  private NamedQueueRecorder namedQueueRecorder;
079
080  private static int i = 0;
081
082  private static Configuration applySlowLogRecorderConf(int eventSize) {
083    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
084    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
085    conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
086    return conf;
087  }
088
089  /**
090   * confirm that for a ringbuffer of slow logs, payload on given index of buffer
091   * has expected elements
092   *
093   * @param i index of ringbuffer logs
094   * @param j data value that was put on index i
095   * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
096   * @return if actual values are as per expectations
097   */
098  private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
099    boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
100    boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
101    boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
102    return isClassExpected && isClientExpected && isUserExpected;
103  }
104
105  @Test
106  public void testOnlieSlowLogConsumption() throws Exception{
107
108    Configuration conf = applySlowLogRecorderConf(8);
109    Constructor<NamedQueueRecorder> constructor =
110      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
111    constructor.setAccessible(true);
112    namedQueueRecorder = constructor.newInstance(conf);
113    AdminProtos.SlowLogResponseRequest request =
114      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
115
116    namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
117    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
118    LOG.debug("Initially ringbuffer of Slow Log records is empty");
119
120    int i = 0;
121
122    // add 5 records initially
123    for (; i < 5; i++) {
124      RpcLogDetails rpcLogDetails =
125        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
126      namedQueueRecorder.addRecord(rpcLogDetails);
127    }
128
129    Assert.assertNotEquals(-1,
130      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
131    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
132    Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
133    Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
134    Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
135    Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads));
136    Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads));
137
138    // add 2 more records
139    for (; i < 7; i++) {
140      RpcLogDetails rpcLogDetails =
141        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
142      namedQueueRecorder.addRecord(rpcLogDetails);
143    }
144
145    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
146      () -> getSlowLogPayloads(request).size() == 7));
147
148    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
149      () -> {
150        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
151        return slowLogPayloadsList.size() == 7
152          && confirmPayloadParams(0, 7, slowLogPayloadsList)
153          && confirmPayloadParams(5, 2, slowLogPayloadsList)
154          && confirmPayloadParams(6, 1, slowLogPayloadsList);
155      })
156    );
157
158    // add 3 more records
159    for (; i < 10; i++) {
160      RpcLogDetails rpcLogDetails =
161        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
162      namedQueueRecorder.addRecord(rpcLogDetails);
163    }
164
165    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
166      () -> getSlowLogPayloads(request).size() == 8));
167
168    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
169      () -> {
170        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
171        // confirm ringbuffer is full
172        return slowLogPayloadsList.size() == 8
173          && confirmPayloadParams(7, 3, slowLogPayloadsList)
174          && confirmPayloadParams(0, 10, slowLogPayloadsList)
175          && confirmPayloadParams(1, 9, slowLogPayloadsList);
176      })
177    );
178
179    // add 4 more records
180    for (; i < 14; i++) {
181      RpcLogDetails rpcLogDetails =
182        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
183      namedQueueRecorder.addRecord(rpcLogDetails);
184    }
185
186    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
187      () -> getSlowLogPayloads(request).size() == 8));
188
189    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
190      () -> {
191        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
192        // confirm ringbuffer is full
193        // and ordered events
194        return slowLogPayloadsList.size() == 8
195          && confirmPayloadParams(0, 14, slowLogPayloadsList)
196          && confirmPayloadParams(1, 13, slowLogPayloadsList)
197          && confirmPayloadParams(2, 12, slowLogPayloadsList)
198          && confirmPayloadParams(3, 11, slowLogPayloadsList);
199      })
200    );
201
202    AdminProtos.SlowLogResponseRequest largeLogRequest =
203      AdminProtos.SlowLogResponseRequest.newBuilder()
204        .setLimit(15)
205        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
206        .build();
207    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
208      () -> {
209        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
210        // confirm ringbuffer is full
211        // and ordered events
212        return slowLogPayloadsList.size() == 8
213          && confirmPayloadParams(0, 14, slowLogPayloadsList)
214          && confirmPayloadParams(1, 13, slowLogPayloadsList)
215          && confirmPayloadParams(2, 12, slowLogPayloadsList)
216          && confirmPayloadParams(3, 11, slowLogPayloadsList);
217      })
218    );
219
220    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
221      () -> {
222        boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
223          NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
224
225        LOG.debug("cleared the ringbuffer of Online Slow Log records");
226
227        List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
228        // confirm ringbuffer is empty
229        return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
230      })
231    );
232
233  }
234
235  private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
236    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
237    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
238    namedQueueGetRequest.setSlowLogResponseRequest(request);
239    NamedQueueGetResponse namedQueueGetResponse =
240      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
241    return namedQueueGetResponse == null ?
242      Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads();
243  }
244
245  @Test
246  public void testOnlineSlowLogWithHighRecords() throws Exception {
247
248    Configuration conf = applySlowLogRecorderConf(14);
249    Constructor<NamedQueueRecorder> constructor =
250      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
251    constructor.setAccessible(true);
252    namedQueueRecorder = constructor.newInstance(conf);
253    AdminProtos.SlowLogResponseRequest request =
254      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
255
256    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
257    LOG.debug("Initially ringbuffer of Slow Log records is empty");
258
259    for (int i = 0; i < 14 * 11; i++) {
260      RpcLogDetails rpcLogDetails =
261        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
262      namedQueueRecorder.addRecord(rpcLogDetails);
263    }
264    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
265
266    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
267      () -> getSlowLogPayloads(request).size() == 14));
268
269    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
270      () -> {
271        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
272
273        // confirm strict order of slow log payloads
274        return slowLogPayloads.size() == 14
275          && confirmPayloadParams(0, 154, slowLogPayloads)
276          && confirmPayloadParams(1, 153, slowLogPayloads)
277          && confirmPayloadParams(2, 152, slowLogPayloads)
278          && confirmPayloadParams(3, 151, slowLogPayloads)
279          && confirmPayloadParams(4, 150, slowLogPayloads)
280          && confirmPayloadParams(5, 149, slowLogPayloads)
281          && confirmPayloadParams(6, 148, slowLogPayloads)
282          && confirmPayloadParams(7, 147, slowLogPayloads)
283          && confirmPayloadParams(8, 146, slowLogPayloads)
284          && confirmPayloadParams(9, 145, slowLogPayloads)
285          && confirmPayloadParams(10, 144, slowLogPayloads)
286          && confirmPayloadParams(11, 143, slowLogPayloads)
287          && confirmPayloadParams(12, 142, slowLogPayloads)
288          && confirmPayloadParams(13, 141, slowLogPayloads);
289      })
290    );
291
292    boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
293      NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
294    Assert.assertTrue(isRingBufferCleaned);
295    LOG.debug("cleared the ringbuffer of Online Slow Log records");
296    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
297
298    // confirm ringbuffer is empty
299    Assert.assertEquals(slowLogPayloads.size(), 0);
300  }
301
302  @Test
303  public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
304    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
305    conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
306
307    Constructor<NamedQueueRecorder> constructor =
308      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
309    constructor.setAccessible(true);
310    namedQueueRecorder = constructor.newInstance(conf);
311    AdminProtos.SlowLogResponseRequest request =
312      AdminProtos.SlowLogResponseRequest.newBuilder().build();
313    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
314    LOG.debug("Initially ringbuffer of Slow Log records is empty");
315    for (int i = 0; i < 300; i++) {
316      RpcLogDetails rpcLogDetails =
317        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
318      namedQueueRecorder.addRecord(rpcLogDetails);
319    }
320    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
321      () -> {
322        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
323        return slowLogPayloads.size() == 0;
324      })
325    );
326
327  }
328
329  @Test
330  public void testOnlineSlowLogWithDisableConfig() throws Exception {
331    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
332    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
333    Constructor<NamedQueueRecorder> constructor =
334      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
335    constructor.setAccessible(true);
336    namedQueueRecorder = constructor.newInstance(conf);
337
338    AdminProtos.SlowLogResponseRequest request =
339      AdminProtos.SlowLogResponseRequest.newBuilder().build();
340    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
341    LOG.debug("Initially ringbuffer of Slow Log records is empty");
342    for (int i = 0; i < 300; i++) {
343      RpcLogDetails rpcLogDetails =
344        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
345      namedQueueRecorder.addRecord(rpcLogDetails);
346    }
347    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
348      () -> {
349        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
350        return slowLogPayloads.size() == 0;
351      })
352    );
353    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
354  }
355
356  @Test
357  public void testSlowLogFilters() throws Exception {
358
359    Configuration conf = applySlowLogRecorderConf(30);
360    Constructor<NamedQueueRecorder> constructor =
361      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
362    constructor.setAccessible(true);
363    namedQueueRecorder = constructor.newInstance(conf);
364    AdminProtos.SlowLogResponseRequest request =
365      AdminProtos.SlowLogResponseRequest.newBuilder()
366        .setLimit(15)
367        .setUserName("userName_87")
368        .build();
369
370    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
371
372    LOG.debug("Initially ringbuffer of Slow Log records is empty");
373
374    for (int i = 0; i < 100; i++) {
375      RpcLogDetails rpcLogDetails =
376        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
377      namedQueueRecorder.addRecord(rpcLogDetails);
378    }
379    LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
380
381    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
382      () -> getSlowLogPayloads(request).size() == 1));
383
384    AdminProtos.SlowLogResponseRequest requestClient =
385      AdminProtos.SlowLogResponseRequest.newBuilder()
386        .setLimit(15)
387        .setClientAddress("client_85")
388        .build();
389    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
390      () -> getSlowLogPayloads(requestClient).size() == 1));
391
392    AdminProtos.SlowLogResponseRequest requestSlowLog =
393      AdminProtos.SlowLogResponseRequest.newBuilder()
394        .setLimit(15)
395        .build();
396    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
397      () -> getSlowLogPayloads(requestSlowLog).size() == 15));
398  }
399
400  @Test
401  public void testConcurrentSlowLogEvents() throws Exception {
402
403    Configuration conf = applySlowLogRecorderConf(50000);
404    Constructor<NamedQueueRecorder> constructor =
405      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
406    constructor.setAccessible(true);
407    namedQueueRecorder = constructor.newInstance(conf);
408    AdminProtos.SlowLogResponseRequest request =
409      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
410    AdminProtos.SlowLogResponseRequest largeLogRequest =
411      AdminProtos.SlowLogResponseRequest.newBuilder()
412        .setLimit(500000)
413        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
414        .build();
415    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
416    LOG.debug("Initially ringbuffer of Slow Log records is empty");
417
418    for (int j = 0; j < 1000; j++) {
419
420      CompletableFuture.runAsync(() -> {
421        for (int i = 0; i < 3500; i++) {
422          RpcLogDetails rpcLogDetails =
423            getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
424          namedQueueRecorder.addRecord(rpcLogDetails);
425        }
426      });
427
428    }
429
430    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
431
432    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
433      5000, () -> getSlowLogPayloads(request).size() > 10000));
434    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
435      5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
436  }
437
438  @Test
439  public void testSlowLargeLogEvents() throws Exception {
440    Configuration conf = applySlowLogRecorderConf(28);
441    Constructor<NamedQueueRecorder> constructor =
442      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
443    constructor.setAccessible(true);
444    namedQueueRecorder = constructor.newInstance(conf);
445
446    AdminProtos.SlowLogResponseRequest request =
447      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
448
449    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
450    LOG.debug("Initially ringbuffer of Slow Log records is empty");
451
452    boolean isSlowLog;
453    boolean isLargeLog;
454    for (int i = 0; i < 14 * 11; i++) {
455      if (i % 2 == 0) {
456        isSlowLog = true;
457        isLargeLog = false;
458      } else {
459        isSlowLog = false;
460        isLargeLog = true;
461      }
462      RpcLogDetails rpcLogDetails =
463        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1),
464          isSlowLog, isLargeLog);
465      namedQueueRecorder.addRecord(rpcLogDetails);
466    }
467    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
468
469    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
470      () -> getSlowLogPayloads(request).size() == 14));
471
472    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
473      () -> {
474        List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
475
476        // confirm strict order of slow log payloads
477        return slowLogPayloads.size() == 14
478          && confirmPayloadParams(0, 153, slowLogPayloads)
479          && confirmPayloadParams(1, 151, slowLogPayloads)
480          && confirmPayloadParams(2, 149, slowLogPayloads)
481          && confirmPayloadParams(3, 147, slowLogPayloads)
482          && confirmPayloadParams(4, 145, slowLogPayloads)
483          && confirmPayloadParams(5, 143, slowLogPayloads)
484          && confirmPayloadParams(6, 141, slowLogPayloads)
485          && confirmPayloadParams(7, 139, slowLogPayloads)
486          && confirmPayloadParams(8, 137, slowLogPayloads)
487          && confirmPayloadParams(9, 135, slowLogPayloads)
488          && confirmPayloadParams(10, 133, slowLogPayloads)
489          && confirmPayloadParams(11, 131, slowLogPayloads)
490          && confirmPayloadParams(12, 129, slowLogPayloads)
491          && confirmPayloadParams(13, 127, slowLogPayloads);
492      })
493    );
494
495    AdminProtos.SlowLogResponseRequest largeLogRequest =
496      AdminProtos.SlowLogResponseRequest.newBuilder()
497        .setLimit(14 * 11)
498        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
499        .build();
500
501    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
502      () -> getSlowLogPayloads(largeLogRequest).size() == 14));
503
504    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
505      () -> {
506        List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
507
508        // confirm strict order of slow log payloads
509        return largeLogPayloads.size() == 14
510          && confirmPayloadParams(0, 154, largeLogPayloads)
511          && confirmPayloadParams(1, 152, largeLogPayloads)
512          && confirmPayloadParams(2, 150, largeLogPayloads)
513          && confirmPayloadParams(3, 148, largeLogPayloads)
514          && confirmPayloadParams(4, 146, largeLogPayloads)
515          && confirmPayloadParams(5, 144, largeLogPayloads)
516          && confirmPayloadParams(6, 142, largeLogPayloads)
517          && confirmPayloadParams(7, 140, largeLogPayloads)
518          && confirmPayloadParams(8, 138, largeLogPayloads)
519          && confirmPayloadParams(9, 136, largeLogPayloads)
520          && confirmPayloadParams(10, 134, largeLogPayloads)
521          && confirmPayloadParams(11, 132, largeLogPayloads)
522          && confirmPayloadParams(12, 130, largeLogPayloads)
523          && confirmPayloadParams(13, 128, largeLogPayloads);
524      })
525    );
526  }
527
528  @Test
529  public void testSlowLogMixedFilters() throws Exception {
530
531    Configuration conf = applySlowLogRecorderConf(30);
532    Constructor<NamedQueueRecorder> constructor =
533      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
534    constructor.setAccessible(true);
535    namedQueueRecorder = constructor.newInstance(conf);
536    AdminProtos.SlowLogResponseRequest request =
537      AdminProtos.SlowLogResponseRequest.newBuilder()
538        .setLimit(15)
539        .setUserName("userName_87")
540        .setClientAddress("client_88")
541        .build();
542
543    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
544
545    for (int i = 0; i < 100; i++) {
546      RpcLogDetails rpcLogDetails =
547        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
548      namedQueueRecorder.addRecord(rpcLogDetails);
549    }
550
551    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
552      () -> getSlowLogPayloads(request).size() == 2));
553
554    AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
555      .setLimit(15)
556      .setUserName("userName_1")
557      .setClientAddress("client_2")
558      .build();
559    Assert.assertEquals(0, getSlowLogPayloads(request2).size());
560
561    AdminProtos.SlowLogResponseRequest request3 =
562      AdminProtos.SlowLogResponseRequest.newBuilder()
563        .setLimit(15)
564        .setUserName("userName_87")
565        .setClientAddress("client_88")
566        .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
567        .build();
568    Assert.assertEquals(0, getSlowLogPayloads(request3).size());
569
570    AdminProtos.SlowLogResponseRequest request4 =
571      AdminProtos.SlowLogResponseRequest.newBuilder()
572        .setLimit(15)
573        .setUserName("userName_87")
574        .setClientAddress("client_87")
575        .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
576        .build();
577    Assert.assertEquals(1, getSlowLogPayloads(request4).size());
578
579    AdminProtos.SlowLogResponseRequest request5 =
580      AdminProtos.SlowLogResponseRequest.newBuilder()
581        .setLimit(15)
582        .setUserName("userName_88")
583        .setClientAddress("client_89")
584        .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR)
585        .build();
586    Assert.assertEquals(2, getSlowLogPayloads(request5).size());
587
588    AdminProtos.SlowLogResponseRequest requestSlowLog =
589      AdminProtos.SlowLogResponseRequest.newBuilder()
590        .setLimit(15)
591        .build();
592    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
593      () -> getSlowLogPayloads(requestSlowLog).size() == 15));
594  }
595
596  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
597    RpcCall rpcCall = getRpcCall(userName);
598    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, true, true);
599  }
600
601  private RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
602      String className, boolean isSlowLog, boolean isLargeLog) {
603    RpcCall rpcCall = getRpcCall(userName);
604    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog,
605      isLargeLog);
606  }
607
608  private static RpcCall getRpcCall(String userName) {
609    RpcCall rpcCall = new RpcCall() {
610      @Override
611      public BlockingService getService() {
612        return null;
613      }
614
615      @Override
616      public Descriptors.MethodDescriptor getMethod() {
617        return null;
618      }
619
620      @Override
621      public Message getParam() {
622        return getMessage();
623      }
624
625      @Override
626      public CellScanner getCellScanner() {
627        return null;
628      }
629
630      @Override
631      public long getReceiveTime() {
632        return 0;
633      }
634
635      @Override
636      public long getStartTime() {
637        return 0;
638      }
639
640      @Override
641      public void setStartTime(long startTime) {
642
643      }
644
645      @Override
646      public int getTimeout() {
647        return 0;
648      }
649
650      @Override
651      public int getPriority() {
652        return 0;
653      }
654
655      @Override
656      public long getDeadline() {
657        return 0;
658      }
659
660      @Override
661      public long getSize() {
662        return 0;
663      }
664
665      @Override
666      public RPCProtos.RequestHeader getHeader() {
667        return null;
668      }
669
670      @Override
671      public int getRemotePort() {
672        return 0;
673      }
674
675      @Override
676      public void setResponse(Message param, CellScanner cells,
677        Throwable errorThrowable, String error) {
678      }
679
680      @Override
681      public void sendResponseIfReady() throws IOException {
682      }
683
684      @Override
685      public void cleanup() {
686      }
687
688      @Override
689      public String toShortString() {
690        return null;
691      }
692
693      @Override
694      public long disconnectSince() {
695        return 0;
696      }
697
698      @Override
699      public boolean isClientCellBlockSupported() {
700        return false;
701      }
702
703      @Override
704      public Optional<User> getRequestUser() {
705        return getUser(userName);
706      }
707
708      @Override
709      public InetAddress getRemoteAddress() {
710        return null;
711      }
712
713      @Override
714      public HBaseProtos.VersionInfo getClientVersionInfo() {
715        return null;
716      }
717
718      @Override
719      public void setCallBack(RpcCallback callback) {
720      }
721
722      @Override
723      public boolean isRetryImmediatelySupported() {
724        return false;
725      }
726
727      @Override
728      public long getResponseCellSize() {
729        return 0;
730      }
731
732      @Override
733      public void incrementResponseCellSize(long cellSize) {
734      }
735
736      @Override
737      public long getResponseBlockSize() {
738        return 0;
739      }
740
741      @Override
742      public void incrementResponseBlockSize(long blockSize) {
743      }
744
745      @Override
746      public long getResponseExceptionSize() {
747        return 0;
748      }
749
750      @Override
751      public void incrementResponseExceptionSize(long exceptionSize) {
752      }
753    };
754    return rpcCall;
755  }
756
757  private static Message getMessage() {
758
759    i = (i + 1) % 3;
760
761    Message message = null;
762
763    switch (i) {
764
765      case 0: {
766        message = ClientProtos.ScanRequest.newBuilder()
767          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
768            .setValue(ByteString.copyFromUtf8("region1"))
769            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
770            .build())
771          .build();
772        break;
773      }
774      case 1: {
775        message = ClientProtos.MutateRequest.newBuilder()
776          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
777            .setValue(ByteString.copyFromUtf8("region2"))
778            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
779          .setMutation(ClientProtos.MutationProto.newBuilder()
780            .setRow(ByteString.copyFromUtf8("row123"))
781            .build())
782          .build();
783        break;
784      }
785      case 2: {
786        message = ClientProtos.GetRequest.newBuilder()
787          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
788            .setValue(ByteString.copyFromUtf8("region2"))
789            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
790          .setGet(ClientProtos.Get.newBuilder()
791            .setRow(ByteString.copyFromUtf8("row123"))
792            .build())
793          .build();
794        break;
795      }
796      default:
797        throw new RuntimeException("Not supposed to get here?");
798    }
799
800    return message;
801
802  }
803
804  private static Optional<User> getUser(String userName) {
805
806    return Optional.of(new User() {
807      @Override
808      public String getShortName() {
809        return userName;
810      }
811
812      @Override
813      public <T> T runAs(PrivilegedAction<T> action) {
814        return null;
815      }
816
817      @Override
818      public <T> T runAs(PrivilegedExceptionAction<T> action) {
819        return null;
820      }
821    });
822
823  }
824
825}