001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import static org.mockito.ArgumentMatchers.any; 028import static org.mockito.ArgumentMatchers.anyBoolean; 029import static org.mockito.ArgumentMatchers.anyInt; 030import static org.mockito.Mockito.when; 031 032import java.io.IOException; 033import java.util.Iterator; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.Executors; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.CellScanner; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.RegionLocations; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; 045import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 046import org.apache.hadoop.hbase.testclassification.SmallTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.junit.After; 049import org.junit.Before; 050import org.junit.ClassRule; 051import org.junit.Rule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.junit.rules.TestName; 055import org.mockito.InOrder; 056import org.mockito.Mockito; 057import org.mockito.invocation.InvocationOnMock; 058import org.mockito.stubbing.Answer; 059 060/** 061 * Test the ClientScanner. 062 */ 063@Category(SmallTests.class) 064public class TestClientScanner { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestClientScanner.class); 069 070 Scan scan; 071 ExecutorService pool; 072 Configuration conf; 073 074 ClusterConnection clusterConn; 075 RpcRetryingCallerFactory rpcFactory; 076 RpcControllerFactory controllerFactory; 077 078 @Rule 079 public TestName name = new TestName(); 080 081 @Before 082 public void setup() throws IOException { 083 clusterConn = Mockito.mock(ClusterConnection.class); 084 rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); 085 controllerFactory = Mockito.mock(RpcControllerFactory.class); 086 pool = Executors.newSingleThreadExecutor(); 087 scan = new Scan(); 088 conf = new Configuration(); 089 Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); 090 } 091 092 @After 093 public void teardown() { 094 if (null != pool) { 095 pool.shutdownNow(); 096 } 097 } 098 099 private static class MockClientScanner extends ClientSimpleScanner { 100 101 private boolean rpcFinished = false; 102 private boolean rpcFinishedFired = false; 103 private boolean initialized = false; 104 105 public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName, 106 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, 107 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) 108 throws IOException { 109 super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, 110 HConstants.DEFAULT_HBASE_RPC_TIMEOUT, 111 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout); 112 } 113 114 @Override 115 protected boolean moveToNextRegion() { 116 if (!initialized) { 117 initialized = true; 118 return super.moveToNextRegion(); 119 } 120 if (!rpcFinished) { 121 return super.moveToNextRegion(); 122 } 123 // Enforce that we don't short-circuit more than once 124 if (rpcFinishedFired) { 125 throw new RuntimeException( 126 "Expected nextScanner to only be called once after " + " short-circuit was triggered."); 127 } 128 rpcFinishedFired = true; 129 return false; 130 } 131 132 public void setRpcFinished(boolean rpcFinished) { 133 this.rpcFinished = rpcFinished; 134 } 135 } 136 137 @Test 138 @SuppressWarnings("unchecked") 139 public void testNoResultsHint() throws IOException { 140 final Result[] results = new Result[1]; 141 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 142 KeyValue.Type.Maximum); 143 results[0] = Result.create(new Cell[] { kv1 }); 144 145 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 146 147 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 148 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 149 .thenAnswer(new Answer<Result[]>() { 150 private int count = 0; 151 152 @Override 153 public Result[] answer(InvocationOnMock invocation) throws Throwable { 154 ScannerCallableWithReplicas callable = invocation.getArgument(0); 155 switch (count) { 156 case 0: // initialize 157 count++; 158 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.UNKNOWN); 159 return results; 160 case 1: // detect no more results 161 case 2: // close 162 count++; 163 return new Result[0]; 164 default: 165 throw new RuntimeException("Expected only 2 invocations"); 166 } 167 } 168 }); 169 170 // Set a much larger cache and buffer size than we'll provide 171 scan.setCaching(100); 172 scan.setMaxResultSize(1000 * 1000); 173 174 try (MockClientScanner scanner = 175 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 176 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 177 178 scanner.setRpcFinished(true); 179 180 InOrder inOrder = Mockito.inOrder(caller); 181 182 scanner.loadCache(); 183 184 // One for fetching the results 185 // One for fetching empty results and quit as we do not have moreResults hint. 186 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 187 188 assertEquals(1, scanner.cache.size()); 189 Result r = scanner.cache.poll(); 190 assertNotNull(r); 191 CellScanner cs = r.cellScanner(); 192 assertTrue(cs.advance()); 193 assertEquals(kv1, cs.current()); 194 assertFalse(cs.advance()); 195 } 196 } 197 198 @Test 199 @SuppressWarnings("unchecked") 200 public void testSizeLimit() throws IOException { 201 final Result[] results = new Result[1]; 202 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 203 KeyValue.Type.Maximum); 204 results[0] = Result.create(new Cell[] { kv1 }); 205 206 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 207 208 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 209 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 210 .thenAnswer(new Answer<Result[]>() { 211 private int count = 0; 212 213 @Override 214 public Result[] answer(InvocationOnMock invocation) throws Throwable { 215 ScannerCallableWithReplicas callable = invocation.getArgument(0); 216 switch (count) { 217 case 0: // initialize 218 count++; 219 // if we set no here the implementation will trigger a close 220 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); 221 return results; 222 case 1: // close 223 count++; 224 return null; 225 default: 226 throw new RuntimeException("Expected only 2 invocations"); 227 } 228 } 229 }); 230 231 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 232 233 // Set a much larger cache 234 scan.setCaching(100); 235 // The single key-value will exit the loop 236 scan.setMaxResultSize(1); 237 238 try (MockClientScanner scanner = 239 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 240 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 241 InOrder inOrder = Mockito.inOrder(caller); 242 243 scanner.loadCache(); 244 245 inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 246 247 assertEquals(1, scanner.cache.size()); 248 Result r = scanner.cache.poll(); 249 assertNotNull(r); 250 CellScanner cs = r.cellScanner(); 251 assertTrue(cs.advance()); 252 assertEquals(kv1, cs.current()); 253 assertFalse(cs.advance()); 254 } 255 } 256 257 @Test 258 @SuppressWarnings("unchecked") 259 public void testCacheLimit() throws IOException { 260 KeyValue kv1 = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 261 KeyValue.Type.Maximum); 262 KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 263 KeyValue.Type.Maximum); 264 KeyValue kv3 = new KeyValue(Bytes.toBytes("row3"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 265 KeyValue.Type.Maximum); 266 final Result[] results = new Result[] { Result.create(new Cell[] { kv1 }), 267 Result.create(new Cell[] { kv2 }), Result.create(new Cell[] { kv3 }) }; 268 269 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 270 271 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 272 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 273 .thenAnswer(new Answer<Result[]>() { 274 private int count = 0; 275 276 @Override 277 public Result[] answer(InvocationOnMock invocation) throws Throwable { 278 ScannerCallableWithReplicas callable = invocation.getArgument(0); 279 switch (count) { 280 case 0: // initialize 281 count++; 282 // if we set no here the implementation will trigger a close 283 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); 284 return results; 285 case 1: // close 286 count++; 287 return null; 288 default: 289 throw new RuntimeException("Expected only 2 invocations"); 290 } 291 } 292 }); 293 294 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 295 296 // Set a small cache 297 scan.setCaching(1); 298 // Set a very large size 299 scan.setMaxResultSize(1000 * 1000); 300 301 try (MockClientScanner scanner = 302 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 303 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 304 InOrder inOrder = Mockito.inOrder(caller); 305 306 scanner.loadCache(); 307 308 inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 309 310 assertEquals(3, scanner.cache.size()); 311 Result r = scanner.cache.poll(); 312 assertNotNull(r); 313 CellScanner cs = r.cellScanner(); 314 assertTrue(cs.advance()); 315 assertEquals(kv1, cs.current()); 316 assertFalse(cs.advance()); 317 318 r = scanner.cache.poll(); 319 assertNotNull(r); 320 cs = r.cellScanner(); 321 assertTrue(cs.advance()); 322 assertEquals(kv2, cs.current()); 323 assertFalse(cs.advance()); 324 325 r = scanner.cache.poll(); 326 assertNotNull(r); 327 cs = r.cellScanner(); 328 assertTrue(cs.advance()); 329 assertEquals(kv3, cs.current()); 330 assertFalse(cs.advance()); 331 } 332 } 333 334 @Test 335 @SuppressWarnings("unchecked") 336 public void testNoMoreResults() throws IOException { 337 final Result[] results = new Result[1]; 338 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 339 KeyValue.Type.Maximum); 340 results[0] = Result.create(new Cell[] { kv1 }); 341 342 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 343 344 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 345 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 346 .thenAnswer(new Answer<Result[]>() { 347 private int count = 0; 348 349 @Override 350 public Result[] answer(InvocationOnMock invocation) throws Throwable { 351 ScannerCallableWithReplicas callable = invocation.getArgument(0); 352 switch (count) { 353 case 0: // initialize 354 count++; 355 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO); 356 return results; 357 case 1: // close 358 count++; 359 return null; 360 default: 361 throw new RuntimeException("Expected only 2 invocations"); 362 } 363 } 364 }); 365 366 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 367 368 // Set a much larger cache and buffer size than we'll provide 369 scan.setCaching(100); 370 scan.setMaxResultSize(1000 * 1000); 371 372 try (MockClientScanner scanner = 373 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 374 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 375 scanner.setRpcFinished(true); 376 377 InOrder inOrder = Mockito.inOrder(caller); 378 379 scanner.loadCache(); 380 381 inOrder.verify(caller, Mockito.times(1)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 382 383 assertEquals(1, scanner.cache.size()); 384 Result r = scanner.cache.poll(); 385 assertNotNull(r); 386 CellScanner cs = r.cellScanner(); 387 assertTrue(cs.advance()); 388 assertEquals(kv1, cs.current()); 389 assertFalse(cs.advance()); 390 } 391 } 392 393 @Test 394 @SuppressWarnings("unchecked") 395 public void testMoreResults() throws IOException { 396 final Result[] results1 = new Result[1]; 397 KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 398 KeyValue.Type.Maximum); 399 results1[0] = Result.create(new Cell[] { kv1 }); 400 401 final Result[] results2 = new Result[1]; 402 KeyValue kv2 = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 403 KeyValue.Type.Maximum); 404 results2[0] = Result.create(new Cell[] { kv2 }); 405 406 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class); 407 408 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller); 409 Mockito.when(caller.callWithoutRetries(Mockito.any(), Mockito.anyInt())) 410 .thenAnswer(new Answer<Result[]>() { 411 private int count = 0; 412 413 @Override 414 public Result[] answer(InvocationOnMock invocation) throws Throwable { 415 ScannerCallableWithReplicas callable = invocation.getArgument(0); 416 switch (count) { 417 case 0: // initialize 418 count++; 419 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.YES); 420 return results1; 421 case 1: 422 count++; 423 // The server reports back false WRT more results 424 callable.currentScannerCallable.setMoreResultsInRegion(MoreResults.NO); 425 return results2; 426 case 2: // close 427 count++; 428 return null; 429 default: 430 throw new RuntimeException("Expected only 3 invocations"); 431 } 432 } 433 }); 434 435 // Set a much larger cache and buffer size than we'll provide 436 scan.setCaching(100); 437 scan.setMaxResultSize(1000 * 1000); 438 439 try (MockClientScanner scanner = 440 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 441 rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { 442 InOrder inOrder = Mockito.inOrder(caller); 443 scanner.setRpcFinished(true); 444 445 scanner.loadCache(); 446 447 inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(Mockito.any(), Mockito.anyInt()); 448 449 assertEquals(2, scanner.cache.size()); 450 Result r = scanner.cache.poll(); 451 assertNotNull(r); 452 CellScanner cs = r.cellScanner(); 453 assertTrue(cs.advance()); 454 assertEquals(kv1, cs.current()); 455 assertFalse(cs.advance()); 456 457 r = scanner.cache.poll(); 458 assertNotNull(r); 459 cs = r.cellScanner(); 460 assertTrue(cs.advance()); 461 assertEquals(kv2, cs.current()); 462 assertFalse(cs.advance()); 463 } 464 } 465 466 /** 467 * Tests the case where all replicas of a region throw an exception. It should not cause a hang 468 * but the exception should propagate to the client 469 */ 470 @Test 471 public void testExceptionsFromReplicasArePropagated() throws IOException { 472 scan.setConsistency(Consistency.TIMELINE); 473 474 // Mock a caller which calls the callable for ScannerCallableWithReplicas, 475 // but throws an exception for the actual scanner calls via callWithRetries. 476 rpcFactory = new MockRpcRetryingCallerFactory(conf); 477 conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, 478 MockRpcRetryingCallerFactory.class.getName()); 479 480 // mock 3 replica locations 481 when(clusterConn.locateRegion((TableName) any(), (byte[]) any(), anyBoolean(), anyBoolean(), 482 anyInt())).thenReturn(new RegionLocations(null, null, null)); 483 484 try (MockClientScanner scanner = 485 new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, 486 rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) { 487 Iterator<Result> iter = scanner.iterator(); 488 while (iter.hasNext()) { 489 iter.next(); 490 } 491 fail("Should have failed with RetriesExhaustedException"); 492 } catch (RuntimeException expected) { 493 assertThat(expected.getCause(), instanceOf(RetriesExhaustedException.class)); 494 } 495 } 496 497 public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory { 498 499 public MockRpcRetryingCallerFactory(Configuration conf) { 500 super(conf); 501 } 502 503 @Override 504 public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) { 505 return new RpcRetryingCaller<T>() { 506 @Override 507 public void cancel() { 508 } 509 510 @Override 511 public T callWithRetries(RetryingCallable<T> callable, int callTimeout) 512 throws IOException, RuntimeException { 513 throw new IOException("Scanner exception"); 514 } 515 516 @Override 517 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) 518 throws IOException, RuntimeException { 519 try { 520 return callable.call(callTimeout); 521 } catch (IOException e) { 522 throw e; 523 } catch (Exception e) { 524 throw new RuntimeException(e); 525 } 526 } 527 }; 528 } 529 } 530}