001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertThat; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import static org.mockito.Matchers.any; 028import static org.mockito.Matchers.anyBoolean; 029import static org.mockito.Matchers.anyInt; 030import static org.mockito.Mockito.when; 031 032import java.io.IOException; 033import java.util.Iterator; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.CellScanner; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.KeyValue; 041import org.apache.hadoop.hbase.KeyValue.Type; 042import org.apache.hadoop.hbase.RegionLocations; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; 045import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 046import org.apache.hadoop.hbase.testclassification.SmallTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.junit.After; 049import org.junit.Before; 050import org.junit.ClassRule; 051import org.junit.Rule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.junit.rules.TestName; 055import org.mockito.InOrder; 056import org.mockito.Mockito; 057import org.mockito.invocation.InvocationOnMock; 058import org.mockito.stubbing.Answer; 059 060/** 061 * Test the ClientScanner. 062 */ 063@Category(SmallTests.class) 064public class TestClientScanner { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestClientScanner.class); 069 070 Scan scan; 071 ExecutorService pool; 072 Configuration conf; 073 074 ClusterConnection clusterConn; 075 RpcRetryingCallerFactory rpcFactory; 076 RpcControllerFactory controllerFactory; 077 078 @Rule 079 public TestName name = new TestName(); 080 081 @Before 082 public void setup() throws IOException { 083 clusterConn = Mockito.mock(ClusterConnection.class); 084 rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); 085 controllerFactory = Mockito.mock(RpcControllerFactory.class); 086 pool = Executors.newSingleThreadExecutor(); 087 scan = new Scan(); 088 conf = new Configuration(); 089 Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); 090 } 091 092 @After 093 public void teardown() { 094 if (null != pool) { 095 pool.shutdownNow(); 096 } 097 } 098 099 private static class MockClientScanner extends ClientSimpleScanner { 100 101 private boolean rpcFinished = false; 102 private boolean rpcFinishedFired = false; 103 private boolean initialized = false; 104 105 public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName, 106 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, 107 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) 108 throws IOException { 109 super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, 110 primaryOperationTimeout); 111 } 112 113 @Override 114 protected boolean moveToNextRegion() { 115 if (!initialized) { 116 initialized = true; 117 return super.moveToNextRegion(); 118 } 119 if (!rpcFinished) { 120 return super.moveToNextRegion(); 121 } 122 // Enforce that we don't short-circuit more than once 123 if (rpcFinishedFired) { 124 throw new RuntimeException("Expected nextScanner to only be called once after " + 125 " short-circuit was triggered."); 126 } 127 rpcFinishedFired = true; 128 return false; 129 } 130 131 public void setRpcFinished(boolean rpcFinished) { 132 this.rpcFinished = rpcFinished; 133 } 134 } 135 136 @Test 137 @SuppressWarnings("unchecked") 138 public void testNoResultsHint() throws IOException { 139 final Result[] results = new Result[1]; 140 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 141 Type.Maximum); 142 results[0] = Result.create(new Cell[] {kv1}); 143 144 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 145 146 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 147 Mockito.when(caller.callWithoutRetries(Mockito.any(), 148 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() { 149 private int count = 0; 150 @Override 151 public Result[] answer(InvocationOnMock invocation) throws Throwable { 152 ScannerCallableWithReplicas callable = invocation.getArgument(0); 153 switch (count) { 154 case 0: // initialize 155 count++; 156 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.UNKNOWN); 157 return results; 158 case 1: // detect no more results 159 case 2: // close 160 count++; 161 return new Result[0]; 162 default: 163 throw new RuntimeException("Expected only 2 invocations"); 164 } 165 } 166 }); 167 168 // Set a much larger cache and buffer size than we'll provide 169 scan.setCaching(100); 170 scan.setMaxResultSize(1000*1000); 171 172 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), 173 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 174 175 scanner.setRpcFinished(true); 176 177 InOrder inOrder = Mockito.inOrder(caller); 178 179 scanner.loadCache(); 180 181 // One for fetching the results 182 // One for fetching empty results and quit as we do not have moreResults hint. 183 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( 184 Mockito.any(), Mockito.anyInt()); 185 186 assertEquals(1, scanner.cache.size()); 187 Result r = scanner.cache.poll(); 188 assertNotNull(r); 189 CellScanner cs = r.cellScanner(); 190 assertTrue(cs.advance()); 191 assertEquals(kv1, cs.current()); 192 assertFalse(cs.advance()); 193 } 194 } 195 196 @Test 197 @SuppressWarnings("unchecked") 198 public void testSizeLimit() throws IOException { 199 final Result[] results = new Result[1]; 200 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 201 Type.Maximum); 202 results[0] = Result.create(new Cell[] {kv1}); 203 204 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 205 206 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 207 Mockito.when(caller.callWithoutRetries(Mockito.any(), 208 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() { 209 private int count = 0; 210 @Override 211 public Result[] answer(InvocationOnMock invocation) throws Throwable { 212 ScannerCallableWithReplicas callable = invocation.getArgument(0); 213 switch (count) { 214 case 0: // initialize 215 count++; 216 // if we set no here the implementation will trigger a close 217 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); 218 return results; 219 case 1: // close 220 count++; 221 return null; 222 default: 223 throw new RuntimeException("Expected only 2 invocations"); 224 } 225 } 226 }); 227 228 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 229 230 // Set a much larger cache 231 scan.setCaching(100); 232 // The single key-value will exit the loop 233 scan.setMaxResultSize(1); 234 235 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), 236 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 237 InOrder inOrder = Mockito.inOrder(caller); 238 239 scanner.loadCache(); 240 241 inOrder.verify(caller, Mockito.times(1)).callWithoutRetries( 242 Mockito.any(), Mockito.anyInt()); 243 244 assertEquals(1, scanner.cache.size()); 245 Result r = scanner.cache.poll(); 246 assertNotNull(r); 247 CellScanner cs = r.cellScanner(); 248 assertTrue(cs.advance()); 249 assertEquals(kv1, cs.current()); 250 assertFalse(cs.advance()); 251 } 252 } 253 254 @Test 255 @SuppressWarnings("unchecked") 256 public void testCacheLimit() throws IOException { 257 KeyValue kv1 = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 258 Type.Maximum); 259 KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 260 Type.Maximum); 261 KeyValue kv3 = new KeyValue(Bytes.toBytes("row3"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 262 Type.Maximum); 263 final Result[] results = new Result[] {Result.create(new Cell[] {kv1}), 264 Result.create(new Cell[] {kv2}), Result.create(new Cell[] {kv3})}; 265 266 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 267 268 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 269 Mockito.when(caller.callWithoutRetries(Mockito.any(), 270 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() { 271 private int count = 0; 272 @Override 273 public Result[] answer(InvocationOnMock invocation) throws Throwable { 274 ScannerCallableWithReplicas callable = invocation.getArgument(0); 275 switch (count) { 276 case 0: // initialize 277 count++; 278 // if we set no here the implementation will trigger a close 279 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); 280 return results; 281 case 1: // close 282 count++; 283 return null; 284 default: 285 throw new RuntimeException("Expected only 2 invocations"); 286 } 287 } 288 }); 289 290 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 291 292 // Set a small cache 293 scan.setCaching(1); 294 // Set a very large size 295 scan.setMaxResultSize(1000*1000); 296 297 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), 298 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 299 InOrder inOrder = Mockito.inOrder(caller); 300 301 scanner.loadCache(); 302 303 inOrder.verify(caller, Mockito.times(1)).callWithoutRetries( 304 Mockito.any(), Mockito.anyInt()); 305 306 assertEquals(3, scanner.cache.size()); 307 Result r = scanner.cache.poll(); 308 assertNotNull(r); 309 CellScanner cs = r.cellScanner(); 310 assertTrue(cs.advance()); 311 assertEquals(kv1, cs.current()); 312 assertFalse(cs.advance()); 313 314 r = scanner.cache.poll(); 315 assertNotNull(r); 316 cs = r.cellScanner(); 317 assertTrue(cs.advance()); 318 assertEquals(kv2, cs.current()); 319 assertFalse(cs.advance()); 320 321 r = scanner.cache.poll(); 322 assertNotNull(r); 323 cs = r.cellScanner(); 324 assertTrue(cs.advance()); 325 assertEquals(kv3, cs.current()); 326 assertFalse(cs.advance()); 327 } 328 } 329 330 @Test 331 @SuppressWarnings("unchecked") 332 public void testNoMoreResults() throws IOException { 333 final Result[] results = new Result[1]; 334 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 335 Type.Maximum); 336 results[0] = Result.create(new Cell[] {kv1}); 337 338 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 339 340 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 341 Mockito.when(caller.callWithoutRetries(Mockito.any(), 342 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() { 343 private int count = 0; 344 @Override 345 public Result[] answer(InvocationOnMock invocation) throws Throwable { 346 ScannerCallableWithReplicas callable = invocation.getArgument(0); 347 switch (count) { 348 case 0: // initialize 349 count++; 350 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO); 351 return results; 352 case 1: // close 353 count++; 354 return null; 355 default: 356 throw new RuntimeException("Expected only 2 invocations"); 357 } 358 } 359 }); 360 361 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 362 363 // Set a much larger cache and buffer size than we'll provide 364 scan.setCaching(100); 365 scan.setMaxResultSize(1000*1000); 366 367 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), 368 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 369 scanner.setRpcFinished(true); 370 371 InOrder inOrder = Mockito.inOrder(caller); 372 373 scanner.loadCache(); 374 375 inOrder.verify(caller, Mockito.times(1)).callWithoutRetries( 376 Mockito.any(), Mockito.anyInt()); 377 378 assertEquals(1, scanner.cache.size()); 379 Result r = scanner.cache.poll(); 380 assertNotNull(r); 381 CellScanner cs = r.cellScanner(); 382 assertTrue(cs.advance()); 383 assertEquals(kv1, cs.current()); 384 assertFalse(cs.advance()); 385 } 386 } 387 388 @Test 389 @SuppressWarnings("unchecked") 390 public void testMoreResults() throws IOException { 391 final Result[] results1 = new Result[1]; 392 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 393 Type.Maximum); 394 results1[0] = Result.create(new Cell[] {kv1}); 395 396 final Result[] results2 = new Result[1]; 397 KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 398 Type.Maximum); 399 results2[0] = Result.create(new Cell[] {kv2}); 400 401 402 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 403 404 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 405 Mockito.when(caller.callWithoutRetries(Mockito.any(), 406 Mockito.anyInt())).thenAnswer(new Answer<Result[]>() { 407 private int count = 0; 408 @Override 409 public Result[] answer(InvocationOnMock invocation) throws Throwable { 410 ScannerCallableWithReplicas callable = invocation.getArgument(0); 411 switch (count) { 412 case 0: // initialize 413 count++; 414 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); 415 return results1; 416 case 1: 417 count++; 418 // The server reports back false WRT more results 419 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO); 420 return results2; 421 case 2: // close 422 count++; 423 return null; 424 default: 425 throw new RuntimeException("Expected only 3 invocations"); 426 } 427 } 428 }); 429 430 // Set a much larger cache and buffer size than we'll provide 431 scan.setCaching(100); 432 scan.setMaxResultSize(1000*1000); 433 434 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), 435 clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 436 InOrder inOrder = Mockito.inOrder(caller); 437 scanner.setRpcFinished(true); 438 439 scanner.loadCache(); 440 441 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries( 442 Mockito.any(), Mockito.anyInt()); 443 444 assertEquals(2, scanner.cache.size()); 445 Result r = scanner.cache.poll(); 446 assertNotNull(r); 447 CellScanner cs = r.cellScanner(); 448 assertTrue(cs.advance()); 449 assertEquals(kv1, cs.current()); 450 assertFalse(cs.advance()); 451 452 r = scanner.cache.poll(); 453 assertNotNull(r); 454 cs = r.cellScanner(); 455 assertTrue(cs.advance()); 456 assertEquals(kv2, cs.current()); 457 assertFalse(cs.advance()); 458 } 459 } 460 461 /** 462 * Tests the case where all replicas of a region throw an exception. It should not cause a hang 463 * but the exception should propagate to the client 464 */ 465 @Test 466 public void testExceptionsFromReplicasArePropagated() throws IOException { 467 scan.setConsistency(Consistency.TIMELINE); 468 469 // Mock a caller which calls the callable for ScannerCallableWithReplicas, 470 // but throws an exception for the actual scanner calls via callWithRetries. 471 rpcFactory = new MockRpcRetryingCallerFactory(conf); 472 conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, 473 MockRpcRetryingCallerFactory.class.getName()); 474 475 // mock 3 replica locations 476 when(clusterConn.locateRegion((TableName)any(), (byte[])any(), anyBoolean(), 477 anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null)); 478 479 try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), 480 clusterConn, rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) { 481 Iterator<Result> iter = scanner.iterator(); 482 while (iter.hasNext()) { 483 iter.next(); 484 } 485 fail("Should have failed with RetriesExhaustedException"); 486 } catch (RuntimeException expected) { 487 assertThat(expected.getCause(), instanceOf(RetriesExhaustedException.class)); 488 } 489 } 490 491 public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory { 492 493 public MockRpcRetryingCallerFactory(Configuration conf) { 494 super(conf); 495 } 496 497 @Override 498 public <T> RpcRetryingCaller<T> newCaller() { 499 return new RpcRetryingCaller<T>() { 500 @Override 501 public void cancel() { 502 } 503 @Override 504 public T callWithRetries(RetryingCallable<T> callable, int callTimeout) 505 throws IOException, RuntimeException { 506 throw new IOException("Scanner exception"); 507 } 508 509 @Override 510 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) 511 throws IOException, RuntimeException { 512 try { 513 return callable.call(callTimeout); 514 } catch (IOException e) { 515 throw e; 516 } catch (Exception e) { 517 throw new RuntimeException(e); 518 } 519 } 520 }; 521 } 522 } 523}