001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; 022import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; 023import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertFalse; 026import static org.junit.jupiter.api.Assertions.assertNotNull; 027import static org.junit.jupiter.api.Assertions.assertTrue; 028import static org.mockito.ArgumentMatchers.any; 029import static org.mockito.ArgumentMatchers.anyInt; 030import static org.mockito.ArgumentMatchers.anyLong; 031import static org.mockito.ArgumentMatchers.argThat; 032import static org.mockito.Mockito.atLeast; 033import static org.mockito.Mockito.doAnswer; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.times; 036import static org.mockito.Mockito.verify; 037 038import java.io.IOException; 039import java.util.Arrays; 040import java.util.Optional; 041import java.util.concurrent.CompletableFuture; 042import java.util.concurrent.ExecutorService; 043import java.util.concurrent.Executors; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.atomic.AtomicInteger; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.hbase.Cell; 048import org.apache.hadoop.hbase.CellBuilderFactory; 049import org.apache.hadoop.hbase.CellBuilderType; 050import org.apache.hadoop.hbase.HBaseConfiguration; 051import org.apache.hadoop.hbase.HRegionLocation; 052import org.apache.hadoop.hbase.ServerName; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.ipc.HBaseRpcController; 055import org.apache.hadoop.hbase.security.User; 056import org.apache.hadoop.hbase.security.UserProvider; 057import org.apache.hadoop.hbase.testclassification.ClientTests; 058import org.apache.hadoop.hbase.testclassification.MediumTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.junit.jupiter.api.BeforeEach; 061import org.junit.jupiter.api.Tag; 062import org.junit.jupiter.api.Test; 063import org.junit.jupiter.api.TestInfo; 064import org.mockito.ArgumentMatcher; 065import org.mockito.invocation.InvocationOnMock; 066import org.mockito.stubbing.Answer; 067 068import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 069 070import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 084 085/** 086 * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations. 087 */ 088@Tag(ClientTests.TAG) 089@Tag(MediumTests.TAG) 090public class TestAsyncTableRpcPriority { 091 092 private static Configuration CONF = HBaseConfiguration.create(); 093 094 private ClientService.Interface stub; 095 096 private ExecutorService threadPool; 097 098 private AsyncConnection conn; 099 100 public String name; 101 102 @BeforeEach 103 public void setUp(TestInfo testInfo) throws IOException { 104 name = testInfo.getTestMethod().get().getName(); 105 this.threadPool = Executors.newSingleThreadExecutor(); 106 stub = mock(ClientService.Interface.class); 107 108 doAnswer(new Answer<Void>() { 109 110 @Override 111 public Void answer(InvocationOnMock invocation) throws Throwable { 112 ClientProtos.MultiResponse resp = 113 ClientProtos.MultiResponse.newBuilder() 114 .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException( 115 ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())))) 116 .build(); 117 RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2); 118 done.run(resp); 119 return null; 120 } 121 }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any()); 122 doAnswer(new Answer<Void>() { 123 124 @Override 125 public Void answer(InvocationOnMock invocation) throws Throwable { 126 MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation(); 127 MutateResponse resp; 128 switch (req.getMutateType()) { 129 case INCREMENT: 130 ColumnValue value = req.getColumnValue(0); 131 QualifierValue qvalue = value.getQualifierValue(0); 132 Cell cell = 133 CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 134 .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray()) 135 .setQualifier(qvalue.getQualifier().toByteArray()) 136 .setValue(qvalue.getValue().toByteArray()).build(); 137 resp = MutateResponse.newBuilder() 138 .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build(); 139 break; 140 default: 141 resp = MutateResponse.getDefaultInstance(); 142 break; 143 } 144 RpcCallback<MutateResponse> done = invocation.getArgument(2); 145 done.run(resp); 146 return null; 147 } 148 }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any()); 149 doAnswer(new Answer<Void>() { 150 151 @Override 152 public Void answer(InvocationOnMock invocation) throws Throwable { 153 RpcCallback<GetResponse> done = invocation.getArgument(2); 154 done.run(GetResponse.getDefaultInstance()); 155 return null; 156 } 157 }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); 158 User user = UserProvider.instantiate(CONF).getCurrent(); 159 conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", null, 160 user) { 161 162 @Override 163 AsyncRegionLocator getLocator() { 164 AsyncRegionLocator locator = mock(AsyncRegionLocator.class); 165 Answer<CompletableFuture<HRegionLocation>> answer = 166 new Answer<CompletableFuture<HRegionLocation>>() { 167 168 @Override 169 public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation) 170 throws Throwable { 171 TableName tableName = invocation.getArgument(0); 172 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 173 ServerName serverName = ServerName.valueOf("rs", 16010, 12345); 174 HRegionLocation loc = new HRegionLocation(info, serverName); 175 return CompletableFuture.completedFuture(loc); 176 } 177 }; 178 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 179 any(RegionLocateType.class), anyLong()); 180 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 181 anyInt(), any(RegionLocateType.class), anyLong()); 182 return locator; 183 } 184 185 @Override 186 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 187 return stub; 188 } 189 }; 190 } 191 192 private HBaseRpcController assertPriority(int priority) { 193 return argThat(new ArgumentMatcher<HBaseRpcController>() { 194 195 @Override 196 public boolean matches(HBaseRpcController controller) { 197 return controller.getPriority() == priority; 198 } 199 }); 200 } 201 202 private ScanRequest assertScannerCloseRequest() { 203 return argThat(new ArgumentMatcher<ScanRequest>() { 204 205 @Override 206 public boolean matches(ScanRequest request) { 207 return request.hasCloseScanner() && request.getCloseScanner(); 208 } 209 }); 210 } 211 212 @Test 213 public void testGet() { 214 conn.getTable(TableName.valueOf(name)).get(new Get(Bytes.toBytes(0)).setPriority(11)).join(); 215 verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any()); 216 } 217 218 @Test 219 public void testGetNormalTable() { 220 conn.getTable(TableName.valueOf(name)).get(new Get(Bytes.toBytes(0))).join(); 221 verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any()); 222 } 223 224 @Test 225 public void testGetSystemTable() { 226 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)).get(new Get(Bytes.toBytes(0))) 227 .join(); 228 verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any()); 229 } 230 231 @Test 232 public void testGetMetaTable() { 233 conn.getTable(TableName.META_TABLE_NAME).get(new Get(Bytes.toBytes(0))).join(); 234 verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any()); 235 } 236 237 @Test 238 public void testPut() { 239 conn.getTable(TableName.valueOf(name)).put(new Put(Bytes.toBytes(0)).setPriority(12) 240 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 241 verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any()); 242 } 243 244 @Test 245 public void testPutNormalTable() { 246 conn.getTable(TableName.valueOf(name)).put(new Put(Bytes.toBytes(0)) 247 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 248 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 249 } 250 251 @Test 252 public void testPutSystemTable() { 253 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)).put(new Put(Bytes.toBytes(0)) 254 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 255 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 256 } 257 258 @Test 259 public void testPutMetaTable() { 260 conn.getTable(TableName.META_TABLE_NAME).put(new Put(Bytes.toBytes(0)) 261 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 262 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 263 } 264 265 @Test 266 public void testDelete() { 267 conn.getTable(TableName.valueOf(name)).delete(new Delete(Bytes.toBytes(0)).setPriority(13)) 268 .join(); 269 verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any()); 270 } 271 272 @Test 273 public void testDeleteNormalTable() { 274 conn.getTable(TableName.valueOf(name)).delete(new Delete(Bytes.toBytes(0))).join(); 275 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 276 } 277 278 @Test 279 public void testDeleteSystemTable() { 280 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)) 281 .delete(new Delete(Bytes.toBytes(0))).join(); 282 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 283 } 284 285 @Test 286 public void testDeleteMetaTable() { 287 conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join(); 288 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 289 } 290 291 @Test 292 public void testAppend() { 293 conn.getTable(TableName.valueOf(name)).append(new Append(Bytes.toBytes(0)).setPriority(14) 294 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 295 verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any()); 296 } 297 298 @Test 299 public void testAppendNormalTable() { 300 conn.getTable(TableName.valueOf(name)).append(new Append(Bytes.toBytes(0)) 301 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 302 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 303 } 304 305 @Test 306 public void testAppendSystemTable() { 307 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)) 308 .append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 309 Bytes.toBytes("v"))) 310 .join(); 311 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 312 } 313 314 @Test 315 public void testAppendMetaTable() { 316 conn.getTable(TableName.META_TABLE_NAME).append(new Append(Bytes.toBytes(0)) 317 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 318 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 319 } 320 321 @Test 322 public void testIncrement() { 323 conn.getTable(TableName.valueOf(name)).increment(new Increment(Bytes.toBytes(0)) 324 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15)).join(); 325 verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any()); 326 } 327 328 @Test 329 public void testIncrementNormalTable() { 330 conn.getTable(TableName.valueOf(name)) 331 .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join(); 332 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 333 } 334 335 @Test 336 public void testIncrementSystemTable() { 337 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)) 338 .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join(); 339 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 340 } 341 342 @Test 343 public void testIncrementMetaTable() { 344 conn.getTable(TableName.META_TABLE_NAME) 345 .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join(); 346 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 347 } 348 349 @Test 350 public void testCheckAndPut() { 351 conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 352 .qualifier(Bytes.toBytes("cq")).ifNotExists() 353 .thenPut(new Put(Bytes.toBytes(0)) 354 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16)) 355 .join(); 356 verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any()); 357 } 358 359 @Test 360 public void testCheckAndPutNormalTable() { 361 conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 362 .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0)) 363 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) 364 .join(); 365 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 366 } 367 368 @Test 369 public void testCheckAndPutSystemTable() { 370 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)) 371 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 372 .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 373 Bytes.toBytes("cq"), Bytes.toBytes("v"))) 374 .join(); 375 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 376 } 377 378 @Test 379 public void testCheckAndPutMetaTable() { 380 conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 381 .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0)) 382 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) 383 .join(); 384 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 385 } 386 387 @Test 388 public void testCheckAndDelete() { 389 conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 390 .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v")) 391 .thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join(); 392 verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any()); 393 } 394 395 @Test 396 public void testCheckAndDeleteNormalTable() { 397 conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 398 .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v")) 399 .thenDelete(new Delete(Bytes.toBytes(0))).join(); 400 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 401 } 402 403 @Test 404 public void testCheckAndDeleteSystemTable() { 405 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)) 406 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 407 .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join(); 408 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 409 } 410 411 @Test 412 public void testCheckAndDeleteMetaTable() { 413 conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 414 .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0)) 415 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) 416 .join(); 417 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 418 } 419 420 @Test 421 public void testCheckAndMutate() throws IOException { 422 conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 423 .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v")) 424 .thenMutate(new RowMutations(Bytes.toBytes(0)) 425 .add((Mutation) new Delete(Bytes.toBytes(0)).setPriority(18))) 426 .join(); 427 verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any()); 428 } 429 430 @Test 431 public void testCheckAndMutateNormalTable() throws IOException { 432 conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 433 .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v")) 434 .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0)))) 435 .join(); 436 verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class), 437 any()); 438 } 439 440 @Test 441 public void testCheckAndMutateSystemTable() throws IOException { 442 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)) 443 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 444 .ifEquals(Bytes.toBytes("v")) 445 .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0)))) 446 .join(); 447 verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), 448 any(ClientProtos.MultiRequest.class), any()); 449 } 450 451 @Test 452 public void testCheckAndMutateMetaTable() throws IOException { 453 conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 454 .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v")) 455 .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0)))) 456 .join(); 457 verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), 458 any(ClientProtos.MultiRequest.class), any()); 459 } 460 461 private CompletableFuture<Void> mockScanReturnRenewFuture(int scanPriority) { 462 int scannerId = 1; 463 CompletableFuture<Void> future = new CompletableFuture<>(); 464 AtomicInteger scanNextCalled = new AtomicInteger(0); 465 doAnswer(new Answer<Void>() { 466 467 @SuppressWarnings("FutureReturnValueIgnored") 468 @Override 469 public Void answer(InvocationOnMock invocation) throws Throwable { 470 threadPool.submit(() -> { 471 ScanRequest req = invocation.getArgument(1); 472 RpcCallback<ScanResponse> done = invocation.getArgument(2); 473 if (!req.hasScannerId()) { 474 done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800) 475 .setMoreResultsInRegion(true).setMoreResults(true).build()); 476 } else { 477 if (req.hasRenew() && req.getRenew()) { 478 future.complete(null); 479 } 480 481 assertFalse(req.hasCloseScanner() && req.getCloseScanner(), 482 "close scanner should not come in with scan priority " + scanPriority); 483 484 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 485 .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) 486 .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) 487 .setValue(Bytes.toBytes("v")).build(); 488 Result result = Result.create(Arrays.asList(cell)); 489 done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800) 490 .setMoreResultsInRegion(true).setMoreResults(true) 491 .addResults(ProtobufUtil.toResult(result)).build()); 492 } 493 }); 494 return null; 495 } 496 }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any()); 497 498 doAnswer(new Answer<Void>() { 499 500 @SuppressWarnings("FutureReturnValueIgnored") 501 @Override 502 public Void answer(InvocationOnMock invocation) throws Throwable { 503 threadPool.submit(() -> { 504 ScanRequest req = invocation.getArgument(1); 505 RpcCallback<ScanResponse> done = invocation.getArgument(2); 506 assertTrue(req.hasScannerId(), "close request should have scannerId"); 507 assertEquals(scannerId, req.getScannerId(), "close request's scannerId should match"); 508 assertTrue(req.hasCloseScanner() && req.getCloseScanner(), 509 "close request should have closerScanner set"); 510 511 done.run(ScanResponse.getDefaultInstance()); 512 }); 513 return null; 514 } 515 }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); 516 return future; 517 } 518 519 @Test 520 public void testScan() throws Exception { 521 CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(19); 522 testForTable(TableName.valueOf(name), renewFuture, Optional.of(19)); 523 } 524 525 @Test 526 public void testScanNormalTable() throws Exception { 527 CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(NORMAL_QOS); 528 testForTable(TableName.valueOf(name), renewFuture, Optional.of(NORMAL_QOS)); 529 } 530 531 @Test 532 public void testScanSystemTable() throws Exception { 533 CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS); 534 testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name), renewFuture, Optional.empty()); 535 } 536 537 @Test 538 public void testScanMetaTable() throws Exception { 539 CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS); 540 testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty()); 541 } 542 543 private void testForTable(TableName tableName, CompletableFuture<Void> renewFuture, 544 Optional<Integer> priority) throws Exception { 545 Scan scan = new Scan().setCaching(1).setMaxResultSize(1); 546 priority.ifPresent(scan::setPriority); 547 548 try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) { 549 assertNotNull(scanner.next()); 550 // wait for at least one renew to come in before closing 551 renewFuture.join(); 552 } 553 554 // ensures the close thread has time to finish before asserting 555 threadPool.shutdown(); 556 threadPool.awaitTermination(5, TimeUnit.SECONDS); 557 558 // just verify that the calls happened. verification of priority occurred in the mocking 559 // open, next, then one or more lease renewals, then close 560 verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any()); 561 // additionally, explicitly check for a close request 562 verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); 563 } 564 565 @Test 566 public void testBatchNormalTable() { 567 conn.getTable(TableName.valueOf(name)).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))) 568 .join(); 569 verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class), 570 any()); 571 } 572 573 @Test 574 public void testBatchSystemTable() { 575 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)) 576 .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); 577 verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), 578 any(ClientProtos.MultiRequest.class), any()); 579 } 580 581 @Test 582 public void testBatchMetaTable() { 583 conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))) 584 .join(); 585 verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), 586 any(ClientProtos.MultiRequest.class), any()); 587 } 588}