001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import java.io.IOException;
021import java.lang.reflect.Constructor;
022import java.net.InetAddress;
023import java.security.PrivilegedAction;
024import java.security.PrivilegedExceptionAction;
025import java.util.Collections;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.TimeUnit;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.CellScanner;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ipc.RpcCall;
036import org.apache.hadoop.hbase.ipc.RpcCallback;
037import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
038import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.hadoop.hbase.testclassification.MasterTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.junit.Assert;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
050import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
051import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
052import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
053import org.apache.hbase.thirdparty.com.google.protobuf.Message;
054
055import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
060
061/**
062 * Tests for Online SlowLog Provider Service
063 */
064@Category({ MasterTests.class, MediumTests.class })
065public class TestNamedQueueRecorder {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
072
073  private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
074
075  private NamedQueueRecorder namedQueueRecorder;
076
077  private static int i = 0;
078
079  private static Configuration applySlowLogRecorderConf(int eventSize) {
080    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
081    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
082    conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
083    return conf;
084  }
085
086  /**
087   * confirm that for a ringbuffer of slow logs, payload on given index of buffer has expected
088   * elements
089   * @param i               index of ringbuffer logs
090   * @param j               data value that was put on index i
091   * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
092   * @return if actual values are as per expectations
093   */
094  private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
095    boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
096    boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
097    boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
098    return isClassExpected && isClientExpected && isUserExpected;
099  }
100
101  @Test
102  public void testOnlieSlowLogConsumption() throws Exception {
103
104    Configuration conf = applySlowLogRecorderConf(8);
105    Constructor<NamedQueueRecorder> constructor =
106      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
107    constructor.setAccessible(true);
108    namedQueueRecorder = constructor.newInstance(conf);
109    AdminProtos.SlowLogResponseRequest request =
110      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
111
112    namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
113    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
114    LOG.debug("Initially ringbuffer of Slow Log records is empty");
115
116    int i = 0;
117
118    // add 5 records initially
119    for (; i < 5; i++) {
120      RpcLogDetails rpcLogDetails =
121        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
122      namedQueueRecorder.addRecord(rpcLogDetails);
123    }
124
125    Assert.assertNotEquals(-1,
126      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
127    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
128    Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
129    Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
130    Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
131    Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads));
132    Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads));
133
134    // add 2 more records
135    for (; i < 7; i++) {
136      RpcLogDetails rpcLogDetails =
137        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
138      namedQueueRecorder.addRecord(rpcLogDetails);
139    }
140
141    Assert.assertNotEquals(-1,
142      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 7));
143
144    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
145      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
146      return slowLogPayloadsList.size() == 7 && confirmPayloadParams(0, 7, slowLogPayloadsList)
147        && confirmPayloadParams(5, 2, slowLogPayloadsList)
148        && confirmPayloadParams(6, 1, slowLogPayloadsList);
149    }));
150
151    // add 3 more records
152    for (; i < 10; i++) {
153      RpcLogDetails rpcLogDetails =
154        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
155      namedQueueRecorder.addRecord(rpcLogDetails);
156    }
157
158    Assert.assertNotEquals(-1,
159      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8));
160
161    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
162      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
163      // confirm ringbuffer is full
164      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(7, 3, slowLogPayloadsList)
165        && confirmPayloadParams(0, 10, slowLogPayloadsList)
166        && confirmPayloadParams(1, 9, slowLogPayloadsList);
167    }));
168
169    // add 4 more records
170    for (; i < 14; i++) {
171      RpcLogDetails rpcLogDetails =
172        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
173      namedQueueRecorder.addRecord(rpcLogDetails);
174    }
175
176    Assert.assertNotEquals(-1,
177      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 8));
178
179    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
180      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
181      // confirm ringbuffer is full
182      // and ordered events
183      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList)
184        && confirmPayloadParams(1, 13, slowLogPayloadsList)
185        && confirmPayloadParams(2, 12, slowLogPayloadsList)
186        && confirmPayloadParams(3, 11, slowLogPayloadsList);
187    }));
188
189    AdminProtos.SlowLogResponseRequest largeLogRequest =
190      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15)
191        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
192    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
193      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
194      // confirm ringbuffer is full
195      // and ordered events
196      return slowLogPayloadsList.size() == 8 && confirmPayloadParams(0, 14, slowLogPayloadsList)
197        && confirmPayloadParams(1, 13, slowLogPayloadsList)
198        && confirmPayloadParams(2, 12, slowLogPayloadsList)
199        && confirmPayloadParams(3, 11, slowLogPayloadsList);
200    }));
201
202    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
203      boolean isRingBufferCleaned =
204        namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
205
206      LOG.debug("cleared the ringbuffer of Online Slow Log records");
207
208      List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
209      // confirm ringbuffer is empty
210      return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
211    }));
212
213  }
214
215  private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
216    NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
217    namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
218    namedQueueGetRequest.setSlowLogResponseRequest(request);
219    NamedQueueGetResponse namedQueueGetResponse =
220      namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
221    return namedQueueGetResponse == null
222      ? Collections.emptyList()
223      : namedQueueGetResponse.getSlowLogPayloads();
224  }
225
226  @Test
227  public void testOnlineSlowLogWithHighRecords() throws Exception {
228
229    Configuration conf = applySlowLogRecorderConf(14);
230    Constructor<NamedQueueRecorder> constructor =
231      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
232    constructor.setAccessible(true);
233    namedQueueRecorder = constructor.newInstance(conf);
234    AdminProtos.SlowLogResponseRequest request =
235      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
236
237    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
238    LOG.debug("Initially ringbuffer of Slow Log records is empty");
239
240    for (int i = 0; i < 14 * 11; i++) {
241      RpcLogDetails rpcLogDetails =
242        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
243      namedQueueRecorder.addRecord(rpcLogDetails);
244    }
245    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
246
247    Assert.assertNotEquals(-1,
248      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
249
250    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
251      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
252
253      // confirm strict order of slow log payloads
254      return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 154, slowLogPayloads)
255        && confirmPayloadParams(1, 153, slowLogPayloads)
256        && confirmPayloadParams(2, 152, slowLogPayloads)
257        && confirmPayloadParams(3, 151, slowLogPayloads)
258        && confirmPayloadParams(4, 150, slowLogPayloads)
259        && confirmPayloadParams(5, 149, slowLogPayloads)
260        && confirmPayloadParams(6, 148, slowLogPayloads)
261        && confirmPayloadParams(7, 147, slowLogPayloads)
262        && confirmPayloadParams(8, 146, slowLogPayloads)
263        && confirmPayloadParams(9, 145, slowLogPayloads)
264        && confirmPayloadParams(10, 144, slowLogPayloads)
265        && confirmPayloadParams(11, 143, slowLogPayloads)
266        && confirmPayloadParams(12, 142, slowLogPayloads)
267        && confirmPayloadParams(13, 141, slowLogPayloads);
268    }));
269
270    boolean isRingBufferCleaned =
271      namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
272    Assert.assertTrue(isRingBufferCleaned);
273    LOG.debug("cleared the ringbuffer of Online Slow Log records");
274    List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
275
276    // confirm ringbuffer is empty
277    Assert.assertEquals(slowLogPayloads.size(), 0);
278  }
279
280  @Test
281  public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
282    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
283    conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
284
285    Constructor<NamedQueueRecorder> constructor =
286      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
287    constructor.setAccessible(true);
288    namedQueueRecorder = constructor.newInstance(conf);
289    AdminProtos.SlowLogResponseRequest request =
290      AdminProtos.SlowLogResponseRequest.newBuilder().build();
291    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
292    LOG.debug("Initially ringbuffer of Slow Log records is empty");
293    for (int i = 0; i < 300; i++) {
294      RpcLogDetails rpcLogDetails =
295        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
296      namedQueueRecorder.addRecord(rpcLogDetails);
297    }
298    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
299      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
300      return slowLogPayloads.size() == 0;
301    }));
302
303  }
304
305  @Test
306  public void testOnlineSlowLogWithDisableConfig() throws Exception {
307    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
308    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
309    Constructor<NamedQueueRecorder> constructor =
310      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
311    constructor.setAccessible(true);
312    namedQueueRecorder = constructor.newInstance(conf);
313
314    AdminProtos.SlowLogResponseRequest request =
315      AdminProtos.SlowLogResponseRequest.newBuilder().build();
316    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
317    LOG.debug("Initially ringbuffer of Slow Log records is empty");
318    for (int i = 0; i < 300; i++) {
319      RpcLogDetails rpcLogDetails =
320        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
321      namedQueueRecorder.addRecord(rpcLogDetails);
322    }
323    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
324      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
325      return slowLogPayloads.size() == 0;
326    }));
327    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
328  }
329
330  @Test
331  public void testSlowLogFilters() throws Exception {
332
333    Configuration conf = applySlowLogRecorderConf(30);
334    Constructor<NamedQueueRecorder> constructor =
335      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
336    constructor.setAccessible(true);
337    namedQueueRecorder = constructor.newInstance(conf);
338    AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder()
339      .setLimit(15).setUserName("userName_87").build();
340
341    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
342
343    LOG.debug("Initially ringbuffer of Slow Log records is empty");
344
345    for (int i = 0; i < 100; i++) {
346      RpcLogDetails rpcLogDetails =
347        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
348      namedQueueRecorder.addRecord(rpcLogDetails);
349    }
350    LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
351
352    Assert.assertNotEquals(-1,
353      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 1));
354
355    AdminProtos.SlowLogResponseRequest requestClient = AdminProtos.SlowLogResponseRequest
356      .newBuilder().setLimit(15).setClientAddress("client_85").build();
357    Assert.assertNotEquals(-1,
358      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestClient).size() == 1));
359
360    AdminProtos.SlowLogResponseRequest requestSlowLog =
361      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
362    Assert.assertNotEquals(-1,
363      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
364  }
365
366  @Test
367  public void testConcurrentSlowLogEvents() throws Exception {
368
369    Configuration conf = applySlowLogRecorderConf(50000);
370    Constructor<NamedQueueRecorder> constructor =
371      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
372    constructor.setAccessible(true);
373    namedQueueRecorder = constructor.newInstance(conf);
374    AdminProtos.SlowLogResponseRequest request =
375      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
376    AdminProtos.SlowLogResponseRequest largeLogRequest =
377      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000)
378        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
379    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
380    LOG.debug("Initially ringbuffer of Slow Log records is empty");
381
382    for (int j = 0; j < 1000; j++) {
383
384      CompletableFuture.runAsync(() -> {
385        for (int i = 0; i < 3500; i++) {
386          RpcLogDetails rpcLogDetails =
387            getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
388          namedQueueRecorder.addRecord(rpcLogDetails);
389        }
390      });
391
392    }
393
394    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
395
396    Assert.assertNotEquals(-1,
397      HBASE_TESTING_UTILITY.waitFor(5000, () -> getSlowLogPayloads(request).size() > 10000));
398    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(5000,
399      () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
400  }
401
402  @Test
403  public void testSlowLargeLogEvents() throws Exception {
404    Configuration conf = applySlowLogRecorderConf(28);
405    Constructor<NamedQueueRecorder> constructor =
406      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
407    constructor.setAccessible(true);
408    namedQueueRecorder = constructor.newInstance(conf);
409
410    AdminProtos.SlowLogResponseRequest request =
411      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
412
413    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
414    LOG.debug("Initially ringbuffer of Slow Log records is empty");
415
416    boolean isSlowLog;
417    boolean isLargeLog;
418    for (int i = 0; i < 14 * 11; i++) {
419      if (i % 2 == 0) {
420        isSlowLog = true;
421        isLargeLog = false;
422      } else {
423        isSlowLog = false;
424        isLargeLog = true;
425      }
426      RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1),
427        "class_" + (i + 1), isSlowLog, isLargeLog);
428      namedQueueRecorder.addRecord(rpcLogDetails);
429    }
430    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
431
432    Assert.assertNotEquals(-1,
433      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
434
435    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
436      List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
437
438      // confirm strict order of slow log payloads
439      return slowLogPayloads.size() == 14 && confirmPayloadParams(0, 153, slowLogPayloads)
440        && confirmPayloadParams(1, 151, slowLogPayloads)
441        && confirmPayloadParams(2, 149, slowLogPayloads)
442        && confirmPayloadParams(3, 147, slowLogPayloads)
443        && confirmPayloadParams(4, 145, slowLogPayloads)
444        && confirmPayloadParams(5, 143, slowLogPayloads)
445        && confirmPayloadParams(6, 141, slowLogPayloads)
446        && confirmPayloadParams(7, 139, slowLogPayloads)
447        && confirmPayloadParams(8, 137, slowLogPayloads)
448        && confirmPayloadParams(9, 135, slowLogPayloads)
449        && confirmPayloadParams(10, 133, slowLogPayloads)
450        && confirmPayloadParams(11, 131, slowLogPayloads)
451        && confirmPayloadParams(12, 129, slowLogPayloads)
452        && confirmPayloadParams(13, 127, slowLogPayloads);
453    }));
454
455    AdminProtos.SlowLogResponseRequest largeLogRequest =
456      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11)
457        .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG).build();
458
459    Assert.assertNotEquals(-1,
460      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(largeLogRequest).size() == 14));
461
462    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
463      List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
464
465      // confirm strict order of slow log payloads
466      return largeLogPayloads.size() == 14 && confirmPayloadParams(0, 154, largeLogPayloads)
467        && confirmPayloadParams(1, 152, largeLogPayloads)
468        && confirmPayloadParams(2, 150, largeLogPayloads)
469        && confirmPayloadParams(3, 148, largeLogPayloads)
470        && confirmPayloadParams(4, 146, largeLogPayloads)
471        && confirmPayloadParams(5, 144, largeLogPayloads)
472        && confirmPayloadParams(6, 142, largeLogPayloads)
473        && confirmPayloadParams(7, 140, largeLogPayloads)
474        && confirmPayloadParams(8, 138, largeLogPayloads)
475        && confirmPayloadParams(9, 136, largeLogPayloads)
476        && confirmPayloadParams(10, 134, largeLogPayloads)
477        && confirmPayloadParams(11, 132, largeLogPayloads)
478        && confirmPayloadParams(12, 130, largeLogPayloads)
479        && confirmPayloadParams(13, 128, largeLogPayloads);
480    }));
481  }
482
483  @Test
484  public void testSlowLogMixedFilters() throws Exception {
485
486    Configuration conf = applySlowLogRecorderConf(30);
487    Constructor<NamedQueueRecorder> constructor =
488      NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
489    constructor.setAccessible(true);
490    namedQueueRecorder = constructor.newInstance(conf);
491    AdminProtos.SlowLogResponseRequest request = AdminProtos.SlowLogResponseRequest.newBuilder()
492      .setLimit(15).setUserName("userName_87").setClientAddress("client_88").build();
493
494    Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
495
496    for (int i = 0; i < 100; i++) {
497      RpcLogDetails rpcLogDetails =
498        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
499      namedQueueRecorder.addRecord(rpcLogDetails);
500    }
501
502    Assert.assertNotEquals(-1,
503      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 2));
504
505    AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
506      .setLimit(15).setUserName("userName_1").setClientAddress("client_2").build();
507    Assert.assertEquals(0, getSlowLogPayloads(request2).size());
508
509    AdminProtos.SlowLogResponseRequest request3 = AdminProtos.SlowLogResponseRequest.newBuilder()
510      .setLimit(15).setUserName("userName_87").setClientAddress("client_88")
511      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build();
512    Assert.assertEquals(0, getSlowLogPayloads(request3).size());
513
514    AdminProtos.SlowLogResponseRequest request4 = AdminProtos.SlowLogResponseRequest.newBuilder()
515      .setLimit(15).setUserName("userName_87").setClientAddress("client_87")
516      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND).build();
517    Assert.assertEquals(1, getSlowLogPayloads(request4).size());
518
519    AdminProtos.SlowLogResponseRequest request5 = AdminProtos.SlowLogResponseRequest.newBuilder()
520      .setLimit(15).setUserName("userName_88").setClientAddress("client_89")
521      .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR).build();
522    Assert.assertEquals(2, getSlowLogPayloads(request5).size());
523
524    AdminProtos.SlowLogResponseRequest requestSlowLog =
525      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
526    Assert.assertNotEquals(-1,
527      HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
528  }
529
530  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
531    RpcCall rpcCall = getRpcCall(userName);
532    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, true, true);
533  }
534
535  private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
536    boolean isSlowLog, boolean isLargeLog) {
537    RpcCall rpcCall = getRpcCall(userName);
538    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog,
539      isLargeLog);
540  }
541
542  private static RpcCall getRpcCall(String userName) {
543    RpcCall rpcCall = new RpcCall() {
544      @Override
545      public BlockingService getService() {
546        return null;
547      }
548
549      @Override
550      public Descriptors.MethodDescriptor getMethod() {
551        return null;
552      }
553
554      @Override
555      public Message getParam() {
556        return getMessage();
557      }
558
559      @Override
560      public CellScanner getCellScanner() {
561        return null;
562      }
563
564      @Override
565      public long getReceiveTime() {
566        return 0;
567      }
568
569      @Override
570      public long getStartTime() {
571        return 0;
572      }
573
574      @Override
575      public void setStartTime(long startTime) {
576
577      }
578
579      @Override
580      public int getTimeout() {
581        return 0;
582      }
583
584      @Override
585      public int getPriority() {
586        return 0;
587      }
588
589      @Override
590      public long getDeadline() {
591        return 0;
592      }
593
594      @Override
595      public long getSize() {
596        return 0;
597      }
598
599      @Override
600      public RPCProtos.RequestHeader getHeader() {
601        return null;
602      }
603
604      @Override
605      public int getRemotePort() {
606        return 0;
607      }
608
609      @Override
610      public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
611        String error) {
612      }
613
614      @Override
615      public void sendResponseIfReady() throws IOException {
616      }
617
618      @Override
619      public void cleanup() {
620      }
621
622      @Override
623      public String toShortString() {
624        return null;
625      }
626
627      @Override
628      public long disconnectSince() {
629        return 0;
630      }
631
632      @Override
633      public boolean isClientCellBlockSupported() {
634        return false;
635      }
636
637      @Override
638      public Optional<User> getRequestUser() {
639        return getUser(userName);
640      }
641
642      @Override
643      public InetAddress getRemoteAddress() {
644        return null;
645      }
646
647      @Override
648      public HBaseProtos.VersionInfo getClientVersionInfo() {
649        return null;
650      }
651
652      @Override
653      public void setCallBack(RpcCallback callback) {
654      }
655
656      @Override
657      public boolean isRetryImmediatelySupported() {
658        return false;
659      }
660
661      @Override
662      public long getResponseCellSize() {
663        return 0;
664      }
665
666      @Override
667      public void incrementResponseCellSize(long cellSize) {
668      }
669
670      @Override
671      public long getResponseBlockSize() {
672        return 0;
673      }
674
675      @Override
676      public void incrementResponseBlockSize(long blockSize) {
677      }
678
679      @Override
680      public long getResponseExceptionSize() {
681        return 0;
682      }
683
684      @Override
685      public void incrementResponseExceptionSize(long exceptionSize) {
686      }
687    };
688    return rpcCall;
689  }
690
691  private static Message getMessage() {
692
693    i = (i + 1) % 3;
694
695    Message message = null;
696
697    switch (i) {
698
699      case 0: {
700        message = ClientProtos.ScanRequest.newBuilder()
701          .setRegion(
702            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region1"))
703              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME).build())
704          .build();
705        break;
706      }
707      case 1: {
708        message = ClientProtos.MutateRequest.newBuilder()
709          .setRegion(
710            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2"))
711              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
712          .setMutation(ClientProtos.MutationProto.newBuilder()
713            .setRow(ByteString.copyFromUtf8("row123")).build())
714          .build();
715        break;
716      }
717      case 2: {
718        message = ClientProtos.GetRequest.newBuilder()
719          .setRegion(
720            HBaseProtos.RegionSpecifier.newBuilder().setValue(ByteString.copyFromUtf8("region2"))
721              .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
722          .setGet(ClientProtos.Get.newBuilder().setRow(ByteString.copyFromUtf8("row123")).build())
723          .build();
724        break;
725      }
726      default:
727        throw new RuntimeException("Not supposed to get here?");
728    }
729
730    return message;
731
732  }
733
734  private static Optional<User> getUser(String userName) {
735
736    return Optional.of(new User() {
737      @Override
738      public String getShortName() {
739        return userName;
740      }
741
742      @Override
743      public <T> T runAs(PrivilegedAction<T> action) {
744        return null;
745      }
746
747      @Override
748      public <T> T runAs(PrivilegedExceptionAction<T> action) {
749        return null;
750      }
751    });
752
753  }
754
755}