001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027import static org.mockito.ArgumentMatchers.any;
028import static org.mockito.ArgumentMatchers.anyBoolean;
029import static org.mockito.ArgumentMatchers.anyInt;
030import static org.mockito.Mockito.when;
031
032import java.io.IOException;
033import java.util.Iterator;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellScanner;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.RegionLocations;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
045import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.junit.After;
049import org.junit.Before;
050import org.junit.ClassRule;
051import org.junit.Rule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.junit.rules.TestName;
055import org.mockito.InOrder;
056import org.mockito.Mockito;
057import org.mockito.invocation.InvocationOnMock;
058import org.mockito.stubbing.Answer;
059
060/**
061 * Test the ClientScanner.
062 */
063@Category(SmallTests.class)
064public class TestClientScanner {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestClientScanner.class);
069
070  Scan scan;
071  ExecutorService pool;
072  Configuration conf;
073
074  ClusterConnection clusterConn;
075  RpcRetryingCallerFactory rpcFactory;
076  RpcControllerFactory controllerFactory;
077
078  @Rule
079  public TestName name = new TestName();
080
081  @Before
082  public void setup() throws IOException {
083    clusterConn = Mockito.mock(ClusterConnection.class);
084    rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
085    controllerFactory = Mockito.mock(RpcControllerFactory.class);
086    pool = Executors.newSingleThreadExecutor();
087    scan = new Scan();
088    conf = new Configuration();
089    Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
090  }
091
092  @After
093  public void teardown() {
094    if (null != pool) {
095      pool.shutdownNow();
096    }
097  }
098
099  private static class MockClientScanner extends ClientSimpleScanner {
100
101    private boolean rpcFinished = false;
102    private boolean rpcFinishedFired = false;
103    private boolean initialized = false;
104
105    public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
106      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
107      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
108      throws IOException {
109      super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
110        HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
111        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout);
112    }
113
114    @Override
115    protected boolean moveToNextRegion() {
116      if (!initialized) {
117        initialized = true;
118        return super.moveToNextRegion();
119      }
120      if (!rpcFinished) {
121        return super.moveToNextRegion();
122      }
123      // Enforce that we don't short-circuit more than once
124      if (rpcFinishedFired) {
125        throw new RuntimeException(
126          "Expected nextScanner to only be called once after " + " short-circuit was triggered.");
127      }
128      rpcFinishedFired = true;
129      return false;
130    }
131
132    public void setRpcFinished(boolean rpcFinished) {
133      this.rpcFinished = rpcFinished;
134    }
135  }
136
137  @Test
138  @SuppressWarnings("unchecked")
139  public void testNoResultsHint() throws IOException {
140    final Result[] results = new Result[1];
141    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
142      KeyValue.Type.Maximum);
143    results[0] = Result.create(new Cell[] { kv1 });
144
145    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
146
147    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
148    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
149      .thenAnswer(new Answer<Result[]>() {
150        private int count = 0;
151
152        @Override
153        public Result[] answer(InvocationOnMock invocation) throws Throwable {
154          ScannerCallableWithReplicas callable = invocation.getArgument(0);
155          switch (count) {
156            case 0: // initialize
157              count++;
158              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.UNKNOWN);
159              return results;
160            case 1: // detect no more results
161            case 2: // close
162              count++;
163              return new Result[0];
164            default:
165              throw new RuntimeException("Expected only 2 invocations");
166          }
167        }
168      });
169
170    // Set a much larger cache and buffer size than we'll provide
171    scan.setCaching(100);
172    scan.setMaxResultSize(1000 * 1000);
173
174    try (MockClientScanner scanner =
175      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
176        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
177
178      scanner.setRpcFinished(true);
179
180      InOrder inOrder = Mockito.inOrder(caller);
181
182      scanner.loadCache();
183
184      // One for fetching the results
185      // One for fetching empty results and quit as we do not have moreResults hint.
186      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
187
188      assertEquals(1, scanner.cache.size());
189      Result r = scanner.cache.poll();
190      assertNotNull(r);
191      CellScanner cs = r.cellScanner();
192      assertTrue(cs.advance());
193      assertEquals(kv1, cs.current());
194      assertFalse(cs.advance());
195    }
196  }
197
198  @Test
199  @SuppressWarnings("unchecked")
200  public void testSizeLimit() throws IOException {
201    final Result[] results = new Result[1];
202    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
203      KeyValue.Type.Maximum);
204    results[0] = Result.create(new Cell[] { kv1 });
205
206    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
207
208    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
209    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
210      .thenAnswer(new Answer<Result[]>() {
211        private int count = 0;
212
213        @Override
214        public Result[] answer(InvocationOnMock invocation) throws Throwable {
215          ScannerCallableWithReplicas callable = invocation.getArgument(0);
216          switch (count) {
217            case 0: // initialize
218              count++;
219              // if we set no here the implementation will trigger a close
220              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
221              return results;
222            case 1: // close
223              count++;
224              return null;
225            default:
226              throw new RuntimeException("Expected only 2 invocations");
227          }
228        }
229      });
230
231    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
232
233    // Set a much larger cache
234    scan.setCaching(100);
235    // The single key-value will exit the loop
236    scan.setMaxResultSize(1);
237
238    try (MockClientScanner scanner =
239      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
240        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
241      InOrder inOrder = Mockito.inOrder(caller);
242
243      scanner.loadCache();
244
245      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
246
247      assertEquals(1, scanner.cache.size());
248      Result r = scanner.cache.poll();
249      assertNotNull(r);
250      CellScanner cs = r.cellScanner();
251      assertTrue(cs.advance());
252      assertEquals(kv1, cs.current());
253      assertFalse(cs.advance());
254    }
255  }
256
257  @Test
258  @SuppressWarnings("unchecked")
259  public void testCacheLimit() throws IOException {
260    KeyValue kv1 = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
261      KeyValue.Type.Maximum);
262    KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
263      KeyValue.Type.Maximum);
264    KeyValue kv3 = new KeyValue(Bytes.toBytes("row3"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
265      KeyValue.Type.Maximum);
266    final Result[] results = new Result[] { Result.create(new Cell[] { kv1 }),
267      Result.create(new Cell[] { kv2 }), Result.create(new Cell[] { kv3 }) };
268
269    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
270
271    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
272    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
273      .thenAnswer(new Answer<Result[]>() {
274        private int count = 0;
275
276        @Override
277        public Result[] answer(InvocationOnMock invocation) throws Throwable {
278          ScannerCallableWithReplicas callable = invocation.getArgument(0);
279          switch (count) {
280            case 0: // initialize
281              count++;
282              // if we set no here the implementation will trigger a close
283              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
284              return results;
285            case 1: // close
286              count++;
287              return null;
288            default:
289              throw new RuntimeException("Expected only 2 invocations");
290          }
291        }
292      });
293
294    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
295
296    // Set a small cache
297    scan.setCaching(1);
298    // Set a very large size
299    scan.setMaxResultSize(1000 * 1000);
300
301    try (MockClientScanner scanner =
302      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
303        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
304      InOrder inOrder = Mockito.inOrder(caller);
305
306      scanner.loadCache();
307
308      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
309
310      assertEquals(3, scanner.cache.size());
311      Result r = scanner.cache.poll();
312      assertNotNull(r);
313      CellScanner cs = r.cellScanner();
314      assertTrue(cs.advance());
315      assertEquals(kv1, cs.current());
316      assertFalse(cs.advance());
317
318      r = scanner.cache.poll();
319      assertNotNull(r);
320      cs = r.cellScanner();
321      assertTrue(cs.advance());
322      assertEquals(kv2, cs.current());
323      assertFalse(cs.advance());
324
325      r = scanner.cache.poll();
326      assertNotNull(r);
327      cs = r.cellScanner();
328      assertTrue(cs.advance());
329      assertEquals(kv3, cs.current());
330      assertFalse(cs.advance());
331    }
332  }
333
334  @Test
335  @SuppressWarnings("unchecked")
336  public void testNoMoreResults() throws IOException {
337    final Result[] results = new Result[1];
338    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
339      KeyValue.Type.Maximum);
340    results[0] = Result.create(new Cell[] { kv1 });
341
342    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
343
344    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
345    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
346      .thenAnswer(new Answer<Result[]>() {
347        private int count = 0;
348
349        @Override
350        public Result[] answer(InvocationOnMock invocation) throws Throwable {
351          ScannerCallableWithReplicas callable = invocation.getArgument(0);
352          switch (count) {
353            case 0: // initialize
354              count++;
355              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO);
356              return results;
357            case 1: // close
358              count++;
359              return null;
360            default:
361              throw new RuntimeException("Expected only 2 invocations");
362          }
363        }
364      });
365
366    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
367
368    // Set a much larger cache and buffer size than we'll provide
369    scan.setCaching(100);
370    scan.setMaxResultSize(1000 * 1000);
371
372    try (MockClientScanner scanner =
373      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
374        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
375      scanner.setRpcFinished(true);
376
377      InOrder inOrder = Mockito.inOrder(caller);
378
379      scanner.loadCache();
380
381      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
382
383      assertEquals(1, scanner.cache.size());
384      Result r = scanner.cache.poll();
385      assertNotNull(r);
386      CellScanner cs = r.cellScanner();
387      assertTrue(cs.advance());
388      assertEquals(kv1, cs.current());
389      assertFalse(cs.advance());
390    }
391  }
392
393  @Test
394  @SuppressWarnings("unchecked")
395  public void testMoreResults() throws IOException {
396    final Result[] results1 = new Result[1];
397    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
398      KeyValue.Type.Maximum);
399    results1[0] = Result.create(new Cell[] { kv1 });
400
401    final Result[] results2 = new Result[1];
402    KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
403      KeyValue.Type.Maximum);
404    results2[0] = Result.create(new Cell[] { kv2 });
405
406    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
407
408    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
409    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
410      .thenAnswer(new Answer<Result[]>() {
411        private int count = 0;
412
413        @Override
414        public Result[] answer(InvocationOnMock invocation) throws Throwable {
415          ScannerCallableWithReplicas callable = invocation.getArgument(0);
416          switch (count) {
417            case 0: // initialize
418              count++;
419              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
420              return results1;
421            case 1:
422              count++;
423              // The server reports back false WRT more results
424              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO);
425              return results2;
426            case 2: // close
427              count++;
428              return null;
429            default:
430              throw new RuntimeException("Expected only 3 invocations");
431          }
432        }
433      });
434
435    // Set a much larger cache and buffer size than we'll provide
436    scan.setCaching(100);
437    scan.setMaxResultSize(1000 * 1000);
438
439    try (MockClientScanner scanner =
440      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
441        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
442      InOrder inOrder = Mockito.inOrder(caller);
443      scanner.setRpcFinished(true);
444
445      scanner.loadCache();
446
447      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
448
449      assertEquals(2, scanner.cache.size());
450      Result r = scanner.cache.poll();
451      assertNotNull(r);
452      CellScanner cs = r.cellScanner();
453      assertTrue(cs.advance());
454      assertEquals(kv1, cs.current());
455      assertFalse(cs.advance());
456
457      r = scanner.cache.poll();
458      assertNotNull(r);
459      cs = r.cellScanner();
460      assertTrue(cs.advance());
461      assertEquals(kv2, cs.current());
462      assertFalse(cs.advance());
463    }
464  }
465
466  /**
467   * Tests the case where all replicas of a region throw an exception. It should not cause a hang
468   * but the exception should propagate to the client
469   */
470  @Test
471  public void testExceptionsFromReplicasArePropagated() throws IOException {
472    scan.setConsistency(Consistency.TIMELINE);
473
474    // Mock a caller which calls the callable for ScannerCallableWithReplicas,
475    // but throws an exception for the actual scanner calls via callWithRetries.
476    rpcFactory = new MockRpcRetryingCallerFactory(conf);
477    conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
478      MockRpcRetryingCallerFactory.class.getName());
479
480    // mock 3 replica locations
481    when(clusterConn.locateRegion((TableName) any(), (byte[]) any(), anyBoolean(), anyBoolean(),
482      anyInt())).thenReturn(new RegionLocations(null, null, null));
483
484    try (MockClientScanner scanner =
485      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
486        rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
487      Iterator<Result> iter = scanner.iterator();
488      while (iter.hasNext()) {
489        iter.next();
490      }
491      fail("Should have failed with RetriesExhaustedException");
492    } catch (RuntimeException expected) {
493      assertThat(expected.getCause(), instanceOf(RetriesExhaustedException.class));
494    }
495  }
496
497  public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
498
499    public MockRpcRetryingCallerFactory(Configuration conf) {
500      super(conf);
501    }
502
503    @Override
504    public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
505      return new RpcRetryingCaller<T>() {
506        @Override
507        public void cancel() {
508        }
509
510        @Override
511        public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
512          throws IOException, RuntimeException {
513          throw new IOException("Scanner exception");
514        }
515
516        @Override
517        public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
518          throws IOException, RuntimeException {
519          try {
520            return callable.call(callTimeout);
521          } catch (IOException e) {
522            throw e;
523          } catch (Exception e) {
524            throw new RuntimeException(e);
525          }
526        }
527      };
528    }
529  }
530}