001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver.slowlog;
021
022import java.io.IOException;
023import java.net.InetAddress;
024import java.security.PrivilegedAction;
025import java.security.PrivilegedExceptionAction;
026import java.util.List;
027import java.util.Optional;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.TimeUnit;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.CellScanner;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ipc.RpcCall;
037import org.apache.hadoop.hbase.ipc.RpcCallback;
038import org.apache.hadoop.hbase.security.User;
039import org.apache.hadoop.hbase.testclassification.MasterTests;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.junit.Assert;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
050import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
052import org.apache.hbase.thirdparty.com.google.protobuf.Message;
053
054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
059
060/**
061 * Tests for Online SlowLog Provider Service
062 */
063@Category({MasterTests.class, MediumTests.class})
064public class TestSlowLogRecorder {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestSlowLogRecorder.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
071
072  private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
073
074  private SlowLogRecorder slowLogRecorder;
075
076  private static int i = 0;
077
078  private static Configuration applySlowLogRecorderConf(int eventSize) {
079
080    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
081    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
082    conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
083    return conf;
084
085  }
086
087  /**
088   * confirm that for a ringbuffer of slow logs, payload on given index of buffer
089   * has expected elements
090   *
091   * @param i index of ringbuffer logs
092   * @param j data value that was put on index i
093   * @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder}
094   * @return if actual values are as per expectations
095   */
096  private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
097
098    boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
099    boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
100    boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
101    return isClassExpected && isClientExpected && isUserExpected;
102  }
103
104  @Test
105  public void testOnlieSlowLogConsumption() throws Exception {
106
107    Configuration conf = applySlowLogRecorderConf(8);
108    slowLogRecorder = new SlowLogRecorder(conf);
109    AdminProtos.SlowLogResponseRequest request =
110      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
111
112    slowLogRecorder.clearSlowLogPayloads();
113    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
114    LOG.debug("Initially ringbuffer of Slow Log records is empty");
115
116    int i = 0;
117
118    // add 5 records initially
119    for (; i < 5; i++) {
120      RpcLogDetails rpcLogDetails =
121        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
122      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
123    }
124
125    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
126      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 5));
127    List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
128    Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
129    Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
130    Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
131    Assert.assertTrue(confirmPayloadParams(3, 2, slowLogPayloads));
132    Assert.assertTrue(confirmPayloadParams(4, 1, slowLogPayloads));
133
134    // add 2 more records
135    for (; i < 7; i++) {
136      RpcLogDetails rpcLogDetails =
137        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
138      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
139    }
140
141    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
142      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 7));
143
144    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
145      () -> {
146        List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
147        return slowLogPayloadsList.size() == 7
148          && confirmPayloadParams(0, 7, slowLogPayloadsList)
149          && confirmPayloadParams(5, 2, slowLogPayloadsList)
150          && confirmPayloadParams(6, 1, slowLogPayloadsList);
151      })
152    );
153
154    // add 3 more records
155    for (; i < 10; i++) {
156      RpcLogDetails rpcLogDetails =
157        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
158      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
159    }
160
161    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
162      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
163
164    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
165      () -> {
166        List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
167        // confirm ringbuffer is full
168        return slowLogPayloadsList.size() == 8
169          && confirmPayloadParams(7, 3, slowLogPayloadsList)
170          && confirmPayloadParams(0, 10, slowLogPayloadsList)
171          && confirmPayloadParams(1, 9, slowLogPayloadsList);
172      })
173    );
174
175    // add 4 more records
176    for (; i < 14; i++) {
177      RpcLogDetails rpcLogDetails =
178        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
179      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
180    }
181
182    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
183      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
184
185    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
186      () -> {
187        List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
188        // confirm ringbuffer is full
189        // and ordered events
190        return slowLogPayloadsList.size() == 8
191          && confirmPayloadParams(0, 14, slowLogPayloadsList)
192          && confirmPayloadParams(1, 13, slowLogPayloadsList)
193          && confirmPayloadParams(2, 12, slowLogPayloadsList)
194          && confirmPayloadParams(3, 11, slowLogPayloadsList);
195      })
196    );
197
198    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
199      () -> {
200        List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getLargeLogPayloads(request);
201        // confirm ringbuffer is full
202        // and ordered events
203        return slowLogPayloadsList.size() == 8
204          && confirmPayloadParams(0, 14, slowLogPayloadsList)
205          && confirmPayloadParams(1, 13, slowLogPayloadsList)
206          && confirmPayloadParams(2, 12, slowLogPayloadsList)
207          && confirmPayloadParams(3, 11, slowLogPayloadsList);
208      })
209    );
210
211    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
212      () -> {
213        boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
214
215        LOG.debug("cleared the ringbuffer of Online Slow Log records");
216
217        List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
218        // confirm ringbuffer is empty
219        return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
220      })
221    );
222
223  }
224
225  @Test
226  public void testOnlineSlowLogWithHighRecords() throws Exception {
227
228    Configuration conf = applySlowLogRecorderConf(14);
229    slowLogRecorder = new SlowLogRecorder(conf);
230    AdminProtos.SlowLogResponseRequest request =
231      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
232
233    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
234    LOG.debug("Initially ringbuffer of Slow Log records is empty");
235
236    for (int i = 0; i < 14 * 11; i++) {
237      RpcLogDetails rpcLogDetails =
238        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
239      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
240    }
241    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
242
243    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
244      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
245
246    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
247      () -> {
248        List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
249
250        // confirm strict order of slow log payloads
251        return slowLogPayloads.size() == 14
252          && confirmPayloadParams(0, 154, slowLogPayloads)
253          && confirmPayloadParams(1, 153, slowLogPayloads)
254          && confirmPayloadParams(2, 152, slowLogPayloads)
255          && confirmPayloadParams(3, 151, slowLogPayloads)
256          && confirmPayloadParams(4, 150, slowLogPayloads)
257          && confirmPayloadParams(5, 149, slowLogPayloads)
258          && confirmPayloadParams(6, 148, slowLogPayloads)
259          && confirmPayloadParams(7, 147, slowLogPayloads)
260          && confirmPayloadParams(8, 146, slowLogPayloads)
261          && confirmPayloadParams(9, 145, slowLogPayloads)
262          && confirmPayloadParams(10, 144, slowLogPayloads)
263          && confirmPayloadParams(11, 143, slowLogPayloads)
264          && confirmPayloadParams(12, 142, slowLogPayloads)
265          && confirmPayloadParams(13, 141, slowLogPayloads);
266      })
267    );
268
269    boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
270    Assert.assertTrue(isRingBufferCleaned);
271    LOG.debug("cleared the ringbuffer of Online Slow Log records");
272    List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
273
274    // confirm ringbuffer is empty
275    Assert.assertEquals(slowLogPayloads.size(), 0);
276
277  }
278
279  @Test
280  public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
281
282    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
283    conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
284
285    slowLogRecorder = new SlowLogRecorder(conf);
286    AdminProtos.SlowLogResponseRequest request =
287      AdminProtos.SlowLogResponseRequest.newBuilder().build();
288    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
289    LOG.debug("Initially ringbuffer of Slow Log records is empty");
290
291    for (int i = 0; i < 300; i++) {
292      RpcLogDetails rpcLogDetails =
293        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
294      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
295    }
296
297    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
298      () -> {
299        List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
300        return slowLogPayloads.size() == 0;
301      })
302    );
303
304  }
305
306  @Test
307  public void testOnlineSlowLogWithDisableConfig() throws Exception {
308
309    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
310    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
311
312    slowLogRecorder = new SlowLogRecorder(conf);
313    AdminProtos.SlowLogResponseRequest request =
314      AdminProtos.SlowLogResponseRequest.newBuilder().build();
315    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
316    LOG.debug("Initially ringbuffer of Slow Log records is empty");
317
318    for (int i = 0; i < 300; i++) {
319      RpcLogDetails rpcLogDetails =
320        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
321      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
322    }
323
324    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
325      () -> {
326        List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
327        return slowLogPayloads.size() == 0;
328      })
329    );
330    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
331
332  }
333
334  @Test
335  public void testSlowLogFilters() throws Exception {
336
337    Configuration conf = applySlowLogRecorderConf(30);
338    slowLogRecorder = new SlowLogRecorder(conf);
339    AdminProtos.SlowLogResponseRequest request =
340      AdminProtos.SlowLogResponseRequest.newBuilder()
341        .setLimit(15)
342        .setUserName("userName_87")
343        .build();
344
345    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
346
347    LOG.debug("Initially ringbuffer of Slow Log records is empty");
348
349    for (int i = 0; i < 100; i++) {
350      RpcLogDetails rpcLogDetails =
351        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
352      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
353    }
354    LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
355
356    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
357      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 1));
358
359    AdminProtos.SlowLogResponseRequest requestClient =
360      AdminProtos.SlowLogResponseRequest.newBuilder()
361        .setLimit(15)
362        .setClientAddress("client_85")
363        .build();
364    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
365      () -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1));
366
367    AdminProtos.SlowLogResponseRequest requestSlowLog =
368      AdminProtos.SlowLogResponseRequest.newBuilder()
369        .setLimit(15)
370        .build();
371    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
372      () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
373
374  }
375
376  @Test
377  public void testConcurrentSlowLogEvents() throws Exception {
378
379    Configuration conf = applySlowLogRecorderConf(50000);
380    slowLogRecorder = new SlowLogRecorder(conf);
381    AdminProtos.SlowLogResponseRequest request =
382      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
383    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
384    LOG.debug("Initially ringbuffer of Slow Log records is empty");
385
386    for (int j = 0; j < 1000; j++) {
387
388      CompletableFuture.runAsync(() -> {
389        for (int i = 0; i < 3500; i++) {
390          RpcLogDetails rpcLogDetails =
391            getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
392          slowLogRecorder.addSlowLogPayload(rpcLogDetails);
393        }
394      });
395
396    }
397
398    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
399
400    slowLogRecorder.clearSlowLogPayloads();
401
402    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
403      5000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000));
404    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
405      5000, () -> slowLogRecorder.getLargeLogPayloads(request).size() > 10000));
406  }
407
408  @Test
409  public void testSlowLargeLogEvents() throws Exception {
410    Configuration conf = applySlowLogRecorderConf(28);
411    slowLogRecorder = new SlowLogRecorder(conf);
412    AdminProtos.SlowLogResponseRequest request =
413      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
414
415    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
416    LOG.debug("Initially ringbuffer of Slow Log records is empty");
417
418    boolean isSlowLog;
419    boolean isLargeLog;
420    for (int i = 0; i < 14 * 11; i++) {
421      if (i % 2 == 0) {
422        isSlowLog = true;
423        isLargeLog = false;
424      } else {
425        isSlowLog = false;
426        isLargeLog = true;
427      }
428      RpcLogDetails rpcLogDetails =
429        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1),
430          isSlowLog, isLargeLog);
431      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
432    }
433    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
434
435    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
436      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
437
438    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
439      () -> {
440        List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
441
442        // confirm strict order of slow log payloads
443        return slowLogPayloads.size() == 14
444          && confirmPayloadParams(0, 153, slowLogPayloads)
445          && confirmPayloadParams(1, 151, slowLogPayloads)
446          && confirmPayloadParams(2, 149, slowLogPayloads)
447          && confirmPayloadParams(3, 147, slowLogPayloads)
448          && confirmPayloadParams(4, 145, slowLogPayloads)
449          && confirmPayloadParams(5, 143, slowLogPayloads)
450          && confirmPayloadParams(6, 141, slowLogPayloads)
451          && confirmPayloadParams(7, 139, slowLogPayloads)
452          && confirmPayloadParams(8, 137, slowLogPayloads)
453          && confirmPayloadParams(9, 135, slowLogPayloads)
454          && confirmPayloadParams(10, 133, slowLogPayloads)
455          && confirmPayloadParams(11, 131, slowLogPayloads)
456          && confirmPayloadParams(12, 129, slowLogPayloads)
457          && confirmPayloadParams(13, 127, slowLogPayloads);
458      })
459    );
460
461    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
462      () -> slowLogRecorder.getLargeLogPayloads(request).size() == 14));
463
464    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
465      () -> {
466        List<SlowLogPayload> largeLogPayloads = slowLogRecorder.getLargeLogPayloads(request);
467
468        // confirm strict order of slow log payloads
469        return largeLogPayloads.size() == 14
470          && confirmPayloadParams(0, 154, largeLogPayloads)
471          && confirmPayloadParams(1, 152, largeLogPayloads)
472          && confirmPayloadParams(2, 150, largeLogPayloads)
473          && confirmPayloadParams(3, 148, largeLogPayloads)
474          && confirmPayloadParams(4, 146, largeLogPayloads)
475          && confirmPayloadParams(5, 144, largeLogPayloads)
476          && confirmPayloadParams(6, 142, largeLogPayloads)
477          && confirmPayloadParams(7, 140, largeLogPayloads)
478          && confirmPayloadParams(8, 138, largeLogPayloads)
479          && confirmPayloadParams(9, 136, largeLogPayloads)
480          && confirmPayloadParams(10, 134, largeLogPayloads)
481          && confirmPayloadParams(11, 132, largeLogPayloads)
482          && confirmPayloadParams(12, 130, largeLogPayloads)
483          && confirmPayloadParams(13, 128, largeLogPayloads);
484      })
485    );
486
487  }
488
489  @Test
490  public void testSlowLogMixedFilters() throws Exception {
491
492    Configuration conf = applySlowLogRecorderConf(30);
493    slowLogRecorder = new SlowLogRecorder(conf);
494    AdminProtos.SlowLogResponseRequest request =
495      AdminProtos.SlowLogResponseRequest.newBuilder()
496        .setLimit(15)
497        .setUserName("userName_87")
498        .setClientAddress("client_88")
499        .build();
500
501    Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
502
503    for (int i = 0; i < 100; i++) {
504      RpcLogDetails rpcLogDetails =
505        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
506      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
507    }
508
509    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
510      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 2));
511
512    AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
513      .setLimit(15)
514      .setUserName("userName_1")
515      .setClientAddress("client_2")
516      .build();
517    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request2).size());
518
519    AdminProtos.SlowLogResponseRequest request3 =
520      AdminProtos.SlowLogResponseRequest.newBuilder()
521        .setLimit(15)
522        .setUserName("userName_87")
523        .setClientAddress("client_88")
524        .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
525        .build();
526    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request3).size());
527
528    AdminProtos.SlowLogResponseRequest request4 =
529      AdminProtos.SlowLogResponseRequest.newBuilder()
530        .setLimit(15)
531        .setUserName("userName_87")
532        .setClientAddress("client_87")
533        .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
534        .build();
535    Assert.assertEquals(1, slowLogRecorder.getSlowLogPayloads(request4).size());
536
537    AdminProtos.SlowLogResponseRequest request5 =
538      AdminProtos.SlowLogResponseRequest.newBuilder()
539        .setLimit(15)
540        .setUserName("userName_88")
541        .setClientAddress("client_89")
542        .setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR)
543        .build();
544    Assert.assertEquals(2, slowLogRecorder.getSlowLogPayloads(request5).size());
545
546    AdminProtos.SlowLogResponseRequest requestSlowLog =
547      AdminProtos.SlowLogResponseRequest.newBuilder()
548        .setLimit(15)
549        .build();
550    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
551      () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
552  }
553
554  static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
555    RpcCall rpcCall = getRpcCall(userName);
556    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, true, true);
557  }
558
559  private RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
560      String className, boolean isSlowLog, boolean isLargeLog) {
561    RpcCall rpcCall = getRpcCall(userName);
562    return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog,
563      isLargeLog);
564  }
565
566  private static RpcCall getRpcCall(String userName) {
567    RpcCall rpcCall = new RpcCall() {
568      @Override
569      public BlockingService getService() {
570        return null;
571      }
572
573      @Override
574      public Descriptors.MethodDescriptor getMethod() {
575        return null;
576      }
577
578      @Override
579      public Message getParam() {
580        return getMessage();
581      }
582
583      @Override
584      public CellScanner getCellScanner() {
585        return null;
586      }
587
588      @Override
589      public long getReceiveTime() {
590        return 0;
591      }
592
593      @Override
594      public long getStartTime() {
595        return 0;
596      }
597
598      @Override
599      public void setStartTime(long startTime) {
600
601      }
602
603      @Override
604      public int getTimeout() {
605        return 0;
606      }
607
608      @Override
609      public int getPriority() {
610        return 0;
611      }
612
613      @Override
614      public long getDeadline() {
615        return 0;
616      }
617
618      @Override
619      public long getSize() {
620        return 0;
621      }
622
623      @Override
624      public RPCProtos.RequestHeader getHeader() {
625        return null;
626      }
627
628      @Override
629      public int getRemotePort() {
630        return 0;
631      }
632
633      @Override
634      public void setResponse(Message param, CellScanner cells,
635        Throwable errorThrowable, String error) {
636      }
637
638      @Override
639      public void sendResponseIfReady() throws IOException {
640      }
641
642      @Override
643      public void cleanup() {
644      }
645
646      @Override
647      public String toShortString() {
648        return null;
649      }
650
651      @Override
652      public long disconnectSince() {
653        return 0;
654      }
655
656      @Override
657      public boolean isClientCellBlockSupported() {
658        return false;
659      }
660
661      @Override
662      public Optional<User> getRequestUser() {
663        return getUser(userName);
664      }
665
666      @Override
667      public InetAddress getRemoteAddress() {
668        return null;
669      }
670
671      @Override
672      public HBaseProtos.VersionInfo getClientVersionInfo() {
673        return null;
674      }
675
676      @Override
677      public void setCallBack(RpcCallback callback) {
678      }
679
680      @Override
681      public boolean isRetryImmediatelySupported() {
682        return false;
683      }
684
685      @Override
686      public long getResponseCellSize() {
687        return 0;
688      }
689
690      @Override
691      public void incrementResponseCellSize(long cellSize) {
692      }
693
694      @Override
695      public long getResponseBlockSize() {
696        return 0;
697      }
698
699      @Override
700      public void incrementResponseBlockSize(long blockSize) {
701      }
702
703      @Override
704      public long getResponseExceptionSize() {
705        return 0;
706      }
707
708      @Override
709      public void incrementResponseExceptionSize(long exceptionSize) {
710      }
711    };
712    return rpcCall;
713  }
714
715  private static Message getMessage() {
716
717    i = (i + 1) % 3;
718
719    Message message = null;
720
721    switch (i) {
722
723      case 0: {
724        message = ClientProtos.ScanRequest.newBuilder()
725          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
726            .setValue(ByteString.copyFromUtf8("region1"))
727            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
728            .build())
729          .build();
730        break;
731      }
732      case 1: {
733        message = ClientProtos.MutateRequest.newBuilder()
734          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
735            .setValue(ByteString.copyFromUtf8("region2"))
736            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
737          .setMutation(ClientProtos.MutationProto.newBuilder()
738            .setRow(ByteString.copyFromUtf8("row123"))
739            .build())
740          .build();
741        break;
742      }
743      case 2: {
744        message = ClientProtos.GetRequest.newBuilder()
745          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
746            .setValue(ByteString.copyFromUtf8("region2"))
747            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
748          .setGet(ClientProtos.Get.newBuilder()
749            .setRow(ByteString.copyFromUtf8("row123"))
750            .build())
751          .build();
752        break;
753      }
754      default:
755        throw new RuntimeException("Not supposed to get here?");
756    }
757
758    return message;
759
760  }
761
762  private static Optional<User> getUser(String userName) {
763
764    return Optional.of(new User() {
765
766
767      @Override
768      public String getShortName() {
769        return userName;
770      }
771
772
773      @Override
774      public <T> T runAs(PrivilegedAction<T> action) {
775        return null;
776      }
777
778
779      @Override
780      public <T> T runAs(PrivilegedExceptionAction<T> action) throws
781          IOException, InterruptedException {
782        return null;
783      }
784
785    });
786
787  }
788
789}