001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027import static org.mockito.ArgumentMatchers.any;
028import static org.mockito.ArgumentMatchers.anyBoolean;
029import static org.mockito.ArgumentMatchers.anyInt;
030import static org.mockito.Mockito.when;
031
032import java.io.IOException;
033import java.util.Collections;
034import java.util.Iterator;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.Executors;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.CellScanner;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.RegionLocations;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
046import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
047import org.apache.hadoop.hbase.testclassification.SmallTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.After;
050import org.junit.Before;
051import org.junit.ClassRule;
052import org.junit.Rule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.junit.rules.TestName;
056import org.mockito.InOrder;
057import org.mockito.Mockito;
058import org.mockito.invocation.InvocationOnMock;
059import org.mockito.stubbing.Answer;
060
061/**
062 * Test the ClientScanner.
063 */
064@Category(SmallTests.class)
065public class TestClientScanner {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestClientScanner.class);
070
071  Scan scan;
072  ExecutorService pool;
073  Configuration conf;
074  ConnectionConfiguration connectionConfig;
075
076  ClusterConnection clusterConn;
077  RpcRetryingCallerFactory rpcFactory;
078  RpcControllerFactory controllerFactory;
079
080  @Rule
081  public TestName name = new TestName();
082
083  @Before
084  public void setup() throws IOException {
085    clusterConn = Mockito.mock(ClusterConnection.class);
086    rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
087    controllerFactory = Mockito.mock(RpcControllerFactory.class);
088    pool = Executors.newSingleThreadExecutor();
089    scan = new Scan();
090    conf = new Configuration();
091    connectionConfig = new ConnectionConfiguration(conf);
092    Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
093    Mockito.when(clusterConn.getConnectionConfiguration()).thenReturn(connectionConfig);
094  }
095
096  @After
097  public void teardown() {
098    if (null != pool) {
099      pool.shutdownNow();
100    }
101  }
102
103  private static class MockClientScanner extends ClientSimpleScanner {
104
105    private boolean rpcFinished = false;
106    private boolean rpcFinishedFired = false;
107    private boolean initialized = false;
108
109    public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
110      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
111      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
112      ConnectionConfiguration connectionConfig) throws IOException {
113      super(conf, scan, scan, tableName, connection, rpcFactory, controllerFactory, pool,
114        HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
115        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout,
116        connectionConfig, Collections.emptyMap());
117    }
118
119    @Override
120    protected boolean moveToNextRegion() {
121      if (!initialized) {
122        initialized = true;
123        return super.moveToNextRegion();
124      }
125      if (!rpcFinished) {
126        return super.moveToNextRegion();
127      }
128      // Enforce that we don't short-circuit more than once
129      if (rpcFinishedFired) {
130        throw new RuntimeException(
131          "Expected nextScanner to only be called once after " + " short-circuit was triggered.");
132      }
133      rpcFinishedFired = true;
134      return false;
135    }
136
137    public void setRpcFinished(boolean rpcFinished) {
138      this.rpcFinished = rpcFinished;
139    }
140  }
141
142  @Test
143  @SuppressWarnings("unchecked")
144  public void testNoResultsHint() throws IOException {
145    final Result[] results = new Result[1];
146    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
147      KeyValue.Type.Maximum);
148    results[0] = Result.create(new Cell[] { kv1 });
149
150    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
151
152    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
153    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
154      .thenAnswer(new Answer<Result[]>() {
155        private int count = 0;
156
157        @Override
158        public Result[] answer(InvocationOnMock invocation) throws Throwable {
159          ScannerCallableWithReplicas callable = invocation.getArgument(0);
160          switch (count) {
161            case 0: // initialize
162              count++;
163              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.UNKNOWN);
164              return results;
165            case 1: // detect no more results
166            case 2: // close
167              count++;
168              return new Result[0];
169            default:
170              throw new RuntimeException("Expected only 2 invocations");
171          }
172        }
173      });
174
175    // Set a much larger cache and buffer size than we'll provide
176    scan.setCaching(100);
177    scan.setMaxResultSize(1000 * 1000);
178
179    try (MockClientScanner scanner =
180      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
181        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
182
183      scanner.setRpcFinished(true);
184
185      InOrder inOrder = Mockito.inOrder(caller);
186
187      scanner.loadCache();
188
189      // One for fetching the results
190      // One for fetching empty results and quit as we do not have moreResults hint.
191      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
192
193      assertEquals(1, scanner.cache.size());
194      Result r = scanner.cache.poll();
195      assertNotNull(r);
196      CellScanner cs = r.cellScanner();
197      assertTrue(cs.advance());
198      assertEquals(kv1, cs.current());
199      assertFalse(cs.advance());
200    }
201  }
202
203  @Test
204  @SuppressWarnings("unchecked")
205  public void testSizeLimit() throws IOException {
206    final Result[] results = new Result[1];
207    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
208      KeyValue.Type.Maximum);
209    results[0] = Result.create(new Cell[] { kv1 });
210
211    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
212
213    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
214    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
215      .thenAnswer(new Answer<Result[]>() {
216        private int count = 0;
217
218        @Override
219        public Result[] answer(InvocationOnMock invocation) throws Throwable {
220          ScannerCallableWithReplicas callable = invocation.getArgument(0);
221          switch (count) {
222            case 0: // initialize
223              count++;
224              // if we set no here the implementation will trigger a close
225              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
226              return results;
227            case 1: // close
228              count++;
229              return null;
230            default:
231              throw new RuntimeException("Expected only 2 invocations");
232          }
233        }
234      });
235
236    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
237
238    // Set a much larger cache
239    scan.setCaching(100);
240    // The single key-value will exit the loop
241    scan.setMaxResultSize(1);
242
243    try (MockClientScanner scanner =
244      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
245        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
246      InOrder inOrder = Mockito.inOrder(caller);
247
248      scanner.loadCache();
249
250      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
251
252      assertEquals(1, scanner.cache.size());
253      Result r = scanner.cache.poll();
254      assertNotNull(r);
255      CellScanner cs = r.cellScanner();
256      assertTrue(cs.advance());
257      assertEquals(kv1, cs.current());
258      assertFalse(cs.advance());
259    }
260  }
261
262  @Test
263  @SuppressWarnings("unchecked")
264  public void testCacheLimit() throws IOException {
265    KeyValue kv1 = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
266      KeyValue.Type.Maximum);
267    KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
268      KeyValue.Type.Maximum);
269    KeyValue kv3 = new KeyValue(Bytes.toBytes("row3"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
270      KeyValue.Type.Maximum);
271    final Result[] results = new Result[] { Result.create(new Cell[] { kv1 }),
272      Result.create(new Cell[] { kv2 }), Result.create(new Cell[] { kv3 }) };
273
274    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
275
276    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
277    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
278      .thenAnswer(new Answer<Result[]>() {
279        private int count = 0;
280
281        @Override
282        public Result[] answer(InvocationOnMock invocation) throws Throwable {
283          ScannerCallableWithReplicas callable = invocation.getArgument(0);
284          switch (count) {
285            case 0: // initialize
286              count++;
287              // if we set no here the implementation will trigger a close
288              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
289              return results;
290            case 1: // close
291              count++;
292              return null;
293            default:
294              throw new RuntimeException("Expected only 2 invocations");
295          }
296        }
297      });
298
299    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
300
301    // Set a small cache
302    scan.setCaching(1);
303    // Set a very large size
304    scan.setMaxResultSize(1000 * 1000);
305
306    try (MockClientScanner scanner =
307      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
308        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
309      InOrder inOrder = Mockito.inOrder(caller);
310
311      scanner.loadCache();
312
313      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
314
315      assertEquals(3, scanner.cache.size());
316      Result r = scanner.cache.poll();
317      assertNotNull(r);
318      CellScanner cs = r.cellScanner();
319      assertTrue(cs.advance());
320      assertEquals(kv1, cs.current());
321      assertFalse(cs.advance());
322
323      r = scanner.cache.poll();
324      assertNotNull(r);
325      cs = r.cellScanner();
326      assertTrue(cs.advance());
327      assertEquals(kv2, cs.current());
328      assertFalse(cs.advance());
329
330      r = scanner.cache.poll();
331      assertNotNull(r);
332      cs = r.cellScanner();
333      assertTrue(cs.advance());
334      assertEquals(kv3, cs.current());
335      assertFalse(cs.advance());
336    }
337  }
338
339  @Test
340  @SuppressWarnings("unchecked")
341  public void testNoMoreResults() throws IOException {
342    final Result[] results = new Result[1];
343    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
344      KeyValue.Type.Maximum);
345    results[0] = Result.create(new Cell[] { kv1 });
346
347    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
348
349    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
350    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
351      .thenAnswer(new Answer<Result[]>() {
352        private int count = 0;
353
354        @Override
355        public Result[] answer(InvocationOnMock invocation) throws Throwable {
356          ScannerCallableWithReplicas callable = invocation.getArgument(0);
357          switch (count) {
358            case 0: // initialize
359              count++;
360              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO);
361              return results;
362            case 1: // close
363              count++;
364              return null;
365            default:
366              throw new RuntimeException("Expected only 2 invocations");
367          }
368        }
369      });
370
371    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
372
373    // Set a much larger cache and buffer size than we'll provide
374    scan.setCaching(100);
375    scan.setMaxResultSize(1000 * 1000);
376
377    try (MockClientScanner scanner =
378      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
379        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
380      scanner.setRpcFinished(true);
381
382      InOrder inOrder = Mockito.inOrder(caller);
383
384      scanner.loadCache();
385
386      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
387
388      assertEquals(1, scanner.cache.size());
389      Result r = scanner.cache.poll();
390      assertNotNull(r);
391      CellScanner cs = r.cellScanner();
392      assertTrue(cs.advance());
393      assertEquals(kv1, cs.current());
394      assertFalse(cs.advance());
395    }
396  }
397
398  @Test
399  @SuppressWarnings("unchecked")
400  public void testMoreResults() throws IOException {
401    final Result[] results1 = new Result[1];
402    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
403      KeyValue.Type.Maximum);
404    results1[0] = Result.create(new Cell[] { kv1 });
405
406    final Result[] results2 = new Result[1];
407    KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
408      KeyValue.Type.Maximum);
409    results2[0] = Result.create(new Cell[] { kv2 });
410
411    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
412
413    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
414    Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt()))
415      .thenAnswer(new Answer<Result[]>() {
416        private int count = 0;
417
418        @Override
419        public Result[] answer(InvocationOnMock invocation) throws Throwable {
420          ScannerCallableWithReplicas callable = invocation.getArgument(0);
421          switch (count) {
422            case 0: // initialize
423              count++;
424              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
425              return results1;
426            case 1:
427              count++;
428              // The server reports back false WRT more results
429              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO);
430              return results2;
431            case 2: // close
432              count++;
433              return null;
434            default:
435              throw new RuntimeException("Expected only 3 invocations");
436          }
437        }
438      });
439
440    // Set a much larger cache and buffer size than we'll provide
441    scan.setCaching(100);
442    scan.setMaxResultSize(1000 * 1000);
443
444    try (MockClientScanner scanner =
445      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
446        rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
447      InOrder inOrder = Mockito.inOrder(caller);
448      scanner.setRpcFinished(true);
449
450      scanner.loadCache();
451
452      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(Mockito.any(), Mockito.anyInt());
453
454      assertEquals(2, scanner.cache.size());
455      Result r = scanner.cache.poll();
456      assertNotNull(r);
457      CellScanner cs = r.cellScanner();
458      assertTrue(cs.advance());
459      assertEquals(kv1, cs.current());
460      assertFalse(cs.advance());
461
462      r = scanner.cache.poll();
463      assertNotNull(r);
464      cs = r.cellScanner();
465      assertTrue(cs.advance());
466      assertEquals(kv2, cs.current());
467      assertFalse(cs.advance());
468    }
469  }
470
471  /**
472   * Tests the case where all replicas of a region throw an exception. It should not cause a hang
473   * but the exception should propagate to the client
474   */
475  @Test
476  public void testExceptionsFromReplicasArePropagated() throws IOException {
477    scan.setConsistency(Consistency.TIMELINE);
478
479    // Mock a caller which calls the callable for ScannerCallableWithReplicas,
480    // but throws an exception for the actual scanner calls via callWithRetries.
481    rpcFactory = new MockRpcRetryingCallerFactory(conf, connectionConfig);
482    conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
483      MockRpcRetryingCallerFactory.class.getName());
484
485    // mock 3 replica locations
486    when(clusterConn.locateRegion((TableName) any(), (byte[]) any(), anyBoolean(), anyBoolean(),
487      anyInt())).thenReturn(new RegionLocations(null, null, null));
488
489    try (MockClientScanner scanner =
490      new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
491        rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, connectionConfig)) {
492      Iterator<Result> iter = scanner.iterator();
493      while (iter.hasNext()) {
494        iter.next();
495      }
496      fail("Should have failed with RetriesExhaustedException");
497    } catch (RuntimeException expected) {
498      assertThat(expected.getCause(), instanceOf(RetriesExhaustedException.class));
499    }
500  }
501
502  public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
503
504    public MockRpcRetryingCallerFactory(Configuration conf,
505      ConnectionConfiguration connectionConf) {
506      super(conf, connectionConf);
507    }
508
509    @Override
510    public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
511      return new RpcRetryingCaller<T>() {
512        @Override
513        public void cancel() {
514        }
515
516        @Override
517        public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
518          throws IOException, RuntimeException {
519          throw new IOException("Scanner exception");
520        }
521
522        @Override
523        public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
524          throws IOException, RuntimeException {
525          try {
526            return callable.call(callTimeout);
527          } catch (IOException e) {
528            throw e;
529          } catch (Exception e) {
530            throw new RuntimeException(e);
531          }
532        }
533      };
534    }
535  }
536}