001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertThat;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027import static org.mockito.Matchers.any;
028import static org.mockito.Matchers.anyBoolean;
029import static org.mockito.Matchers.anyInt;
030import static org.mockito.Mockito.when;
031
032import java.io.IOException;
033import java.util.Iterator;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellScanner;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.KeyValue.Type;
042import org.apache.hadoop.hbase.RegionLocations;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
045import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.junit.After;
049import org.junit.Before;
050import org.junit.ClassRule;
051import org.junit.Rule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.junit.rules.TestName;
055import org.mockito.InOrder;
056import org.mockito.Mockito;
057import org.mockito.invocation.InvocationOnMock;
058import org.mockito.stubbing.Answer;
059
060/**
061 * Test the ClientScanner.
062 */
063@Category(SmallTests.class)
064public class TestClientScanner {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestClientScanner.class);
069
070  Scan scan;
071  ExecutorService pool;
072  Configuration conf;
073
074  ClusterConnection clusterConn;
075  RpcRetryingCallerFactory rpcFactory;
076  RpcControllerFactory controllerFactory;
077
078  @Rule
079  public TestName name = new TestName();
080
081  @Before
082  public void setup() throws IOException {
083    clusterConn = Mockito.mock(ClusterConnection.class);
084    rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
085    controllerFactory = Mockito.mock(RpcControllerFactory.class);
086    pool = Executors.newSingleThreadExecutor();
087    scan = new Scan();
088    conf = new Configuration();
089    Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
090  }
091
092  @After
093  public void teardown() {
094    if (null != pool) {
095      pool.shutdownNow();
096    }
097  }
098
099  private static class MockClientScanner extends ClientSimpleScanner {
100
101    private boolean rpcFinished = false;
102    private boolean rpcFinishedFired = false;
103    private boolean initialized = false;
104
105    public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
106        ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
107        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
108        throws IOException {
109      super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
110          primaryOperationTimeout);
111    }
112
113    @Override
114    protected boolean moveToNextRegion() {
115      if (!initialized) {
116        initialized = true;
117        return super.moveToNextRegion();
118      }
119      if (!rpcFinished) {
120        return super.moveToNextRegion();
121      }
122      // Enforce that we don't short-circuit more than once
123      if (rpcFinishedFired) {
124        throw new RuntimeException("Expected nextScanner to only be called once after " +
125            " short-circuit was triggered.");
126      }
127      rpcFinishedFired = true;
128      return false;
129    }
130
131    public void setRpcFinished(boolean rpcFinished) {
132      this.rpcFinished = rpcFinished;
133    }
134  }
135
136  @Test
137  @SuppressWarnings("unchecked")
138  public void testNoResultsHint() throws IOException {
139    final Result[] results = new Result[1];
140    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
141        Type.Maximum);
142    results[0] = Result.create(new Cell[] {kv1});
143
144    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
145
146    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
147    Mockito.when(caller.callWithoutRetries(Mockito.any(),
148      Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
149        private int count = 0;
150        @Override
151        public Result[] answer(InvocationOnMock invocation) throws Throwable {
152            ScannerCallableWithReplicas callable = invocation.getArgument(0);
153          switch (count) {
154            case 0: // initialize
155              count++;
156              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.UNKNOWN);
157              return results;
158            case 1: // detect no more results
159            case 2: // close
160              count++;
161              return new Result[0];
162            default:
163              throw new RuntimeException("Expected only 2 invocations");
164          }
165        }
166    });
167
168    // Set a much larger cache and buffer size than we'll provide
169    scan.setCaching(100);
170    scan.setMaxResultSize(1000*1000);
171
172    try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()),
173        clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
174
175      scanner.setRpcFinished(true);
176
177      InOrder inOrder = Mockito.inOrder(caller);
178
179      scanner.loadCache();
180
181      // One for fetching the results
182      // One for fetching empty results and quit as we do not have moreResults hint.
183      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
184          Mockito.any(), Mockito.anyInt());
185
186      assertEquals(1, scanner.cache.size());
187      Result r = scanner.cache.poll();
188      assertNotNull(r);
189      CellScanner cs = r.cellScanner();
190      assertTrue(cs.advance());
191      assertEquals(kv1, cs.current());
192      assertFalse(cs.advance());
193    }
194  }
195
196  @Test
197  @SuppressWarnings("unchecked")
198  public void testSizeLimit() throws IOException {
199    final Result[] results = new Result[1];
200    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
201        Type.Maximum);
202    results[0] = Result.create(new Cell[] {kv1});
203
204    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
205
206    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
207    Mockito.when(caller.callWithoutRetries(Mockito.any(),
208      Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
209        private int count = 0;
210        @Override
211        public Result[] answer(InvocationOnMock invocation) throws Throwable {
212          ScannerCallableWithReplicas callable = invocation.getArgument(0);
213          switch (count) {
214            case 0: // initialize
215              count++;
216              // if we set no here the implementation will trigger a close
217              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
218              return results;
219            case 1: // close
220              count++;
221              return null;
222            default:
223              throw new RuntimeException("Expected only 2 invocations");
224          }
225        }
226    });
227
228    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
229
230    // Set a much larger cache
231    scan.setCaching(100);
232    // The single key-value will exit the loop
233    scan.setMaxResultSize(1);
234
235    try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()),
236        clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
237      InOrder inOrder = Mockito.inOrder(caller);
238
239      scanner.loadCache();
240
241      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(
242          Mockito.any(), Mockito.anyInt());
243
244      assertEquals(1, scanner.cache.size());
245      Result r = scanner.cache.poll();
246      assertNotNull(r);
247      CellScanner cs = r.cellScanner();
248      assertTrue(cs.advance());
249      assertEquals(kv1, cs.current());
250      assertFalse(cs.advance());
251    }
252  }
253
254  @Test
255  @SuppressWarnings("unchecked")
256  public void testCacheLimit() throws IOException {
257    KeyValue kv1 = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
258        Type.Maximum);
259    KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
260        Type.Maximum);
261    KeyValue kv3 = new KeyValue(Bytes.toBytes("row3"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
262        Type.Maximum);
263    final Result[] results = new Result[] {Result.create(new Cell[] {kv1}),
264        Result.create(new Cell[] {kv2}), Result.create(new Cell[] {kv3})};
265
266    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
267
268    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
269    Mockito.when(caller.callWithoutRetries(Mockito.any(),
270      Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
271        private int count = 0;
272        @Override
273        public Result[] answer(InvocationOnMock invocation) throws Throwable {
274          ScannerCallableWithReplicas callable = invocation.getArgument(0);
275          switch (count) {
276            case 0: // initialize
277              count++;
278              // if we set no here the implementation will trigger a close
279              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
280              return results;
281            case 1: // close
282              count++;
283              return null;
284            default:
285              throw new RuntimeException("Expected only 2 invocations");
286          }
287        }
288    });
289
290    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
291
292    // Set a small cache
293    scan.setCaching(1);
294    // Set a very large size
295    scan.setMaxResultSize(1000*1000);
296
297    try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()),
298        clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
299      InOrder inOrder = Mockito.inOrder(caller);
300
301      scanner.loadCache();
302
303      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(
304          Mockito.any(), Mockito.anyInt());
305
306      assertEquals(3, scanner.cache.size());
307      Result r = scanner.cache.poll();
308      assertNotNull(r);
309      CellScanner cs = r.cellScanner();
310      assertTrue(cs.advance());
311      assertEquals(kv1, cs.current());
312      assertFalse(cs.advance());
313
314      r = scanner.cache.poll();
315      assertNotNull(r);
316      cs = r.cellScanner();
317      assertTrue(cs.advance());
318      assertEquals(kv2, cs.current());
319      assertFalse(cs.advance());
320
321      r = scanner.cache.poll();
322      assertNotNull(r);
323      cs = r.cellScanner();
324      assertTrue(cs.advance());
325      assertEquals(kv3, cs.current());
326      assertFalse(cs.advance());
327    }
328  }
329
330  @Test
331  @SuppressWarnings("unchecked")
332  public void testNoMoreResults() throws IOException {
333    final Result[] results = new Result[1];
334    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
335        Type.Maximum);
336    results[0] = Result.create(new Cell[] {kv1});
337
338    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
339
340    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
341    Mockito.when(caller.callWithoutRetries(Mockito.any(),
342      Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
343        private int count = 0;
344        @Override
345        public Result[] answer(InvocationOnMock invocation) throws Throwable {
346          ScannerCallableWithReplicas callable = invocation.getArgument(0);
347          switch (count) {
348            case 0: // initialize
349              count++;
350              callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO);
351              return results;
352            case 1: // close
353              count++;
354              return null;
355            default:
356              throw new RuntimeException("Expected only 2 invocations");
357          }
358        }
359    });
360
361    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
362
363    // Set a much larger cache and buffer size than we'll provide
364    scan.setCaching(100);
365    scan.setMaxResultSize(1000*1000);
366
367    try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()),
368        clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
369      scanner.setRpcFinished(true);
370
371      InOrder inOrder = Mockito.inOrder(caller);
372
373      scanner.loadCache();
374
375      inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(
376          Mockito.any(), Mockito.anyInt());
377
378      assertEquals(1, scanner.cache.size());
379      Result r = scanner.cache.poll();
380      assertNotNull(r);
381      CellScanner cs = r.cellScanner();
382      assertTrue(cs.advance());
383      assertEquals(kv1, cs.current());
384      assertFalse(cs.advance());
385    }
386  }
387
388  @Test
389  @SuppressWarnings("unchecked")
390  public void testMoreResults() throws IOException {
391    final Result[] results1 = new Result[1];
392    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
393        Type.Maximum);
394    results1[0] = Result.create(new Cell[] {kv1});
395
396    final Result[] results2 = new Result[1];
397    KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
398        Type.Maximum);
399    results2[0] = Result.create(new Cell[] {kv2});
400
401
402    RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
403
404    Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
405    Mockito.when(caller.callWithoutRetries(Mockito.any(),
406        Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
407          private int count = 0;
408          @Override
409          public Result[] answer(InvocationOnMock invocation) throws Throwable {
410            ScannerCallableWithReplicas callable = invocation.getArgument(0);
411            switch (count) {
412              case 0: // initialize
413                count++;
414                callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES);
415                return results1;
416              case 1:
417                count++;
418                // The server reports back false WRT more results
419                callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO);
420                return results2;
421              case 2: // close
422                count++;
423                return null;
424              default:
425                throw new RuntimeException("Expected only 3 invocations");
426            }
427          }
428      });
429
430    // Set a much larger cache and buffer size than we'll provide
431    scan.setCaching(100);
432    scan.setMaxResultSize(1000*1000);
433
434    try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()),
435        clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
436      InOrder inOrder = Mockito.inOrder(caller);
437      scanner.setRpcFinished(true);
438
439      scanner.loadCache();
440
441      inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
442          Mockito.any(), Mockito.anyInt());
443
444      assertEquals(2, scanner.cache.size());
445      Result r = scanner.cache.poll();
446      assertNotNull(r);
447      CellScanner cs = r.cellScanner();
448      assertTrue(cs.advance());
449      assertEquals(kv1, cs.current());
450      assertFalse(cs.advance());
451
452      r = scanner.cache.poll();
453      assertNotNull(r);
454      cs = r.cellScanner();
455      assertTrue(cs.advance());
456      assertEquals(kv2, cs.current());
457      assertFalse(cs.advance());
458    }
459  }
460
461  /**
462   * Tests the case where all replicas of a region throw an exception. It should not cause a hang
463   * but the exception should propagate to the client
464   */
465  @Test
466  public void testExceptionsFromReplicasArePropagated() throws IOException {
467    scan.setConsistency(Consistency.TIMELINE);
468
469    // Mock a caller which calls the callable for ScannerCallableWithReplicas,
470    // but throws an exception for the actual scanner calls via callWithRetries.
471    rpcFactory = new MockRpcRetryingCallerFactory(conf);
472    conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
473      MockRpcRetryingCallerFactory.class.getName());
474
475    // mock 3 replica locations
476    when(clusterConn.locateRegion((TableName)any(), (byte[])any(), anyBoolean(),
477      anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null));
478
479    try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()),
480      clusterConn, rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
481      Iterator<Result> iter = scanner.iterator();
482      while (iter.hasNext()) {
483        iter.next();
484      }
485      fail("Should have failed with RetriesExhaustedException");
486    } catch (RuntimeException expected) {
487      assertThat(expected.getCause(), instanceOf(RetriesExhaustedException.class));
488    }
489  }
490
491  public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
492
493    public MockRpcRetryingCallerFactory(Configuration conf) {
494      super(conf);
495    }
496
497    @Override
498    public <T> RpcRetryingCaller<T> newCaller() {
499      return new RpcRetryingCaller<T>() {
500        @Override
501        public void cancel() {
502        }
503        @Override
504        public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
505            throws IOException, RuntimeException {
506          throw new IOException("Scanner exception");
507        }
508
509        @Override
510        public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
511            throws IOException, RuntimeException {
512          try {
513            return callable.call(callTimeout);
514          } catch (IOException e) {
515            throw e;
516          } catch (Exception e) {
517            throw new RuntimeException(e);
518          }
519        }
520      };
521    }
522  }
523}