001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; 021import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; 022import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; 023import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertTrue; 028import static org.mockito.ArgumentMatchers.any; 029import static org.mockito.ArgumentMatchers.anyInt; 030import static org.mockito.ArgumentMatchers.anyLong; 031import static org.mockito.ArgumentMatchers.argThat; 032import static org.mockito.Mockito.atLeast; 033import static org.mockito.Mockito.doAnswer; 034import static org.mockito.Mockito.mock; 035import static org.mockito.Mockito.times; 036import static org.mockito.Mockito.verify; 037 038import java.io.IOException; 039import java.util.Arrays; 040import java.util.Optional; 041import java.util.concurrent.CompletableFuture; 042import java.util.concurrent.ExecutorService; 043import java.util.concurrent.Executors; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.atomic.AtomicInteger; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.hbase.Cell; 048import org.apache.hadoop.hbase.CellBuilderFactory; 049import org.apache.hadoop.hbase.CellBuilderType; 050import org.apache.hadoop.hbase.HBaseClassTestRule; 051import org.apache.hadoop.hbase.HBaseConfiguration; 052import org.apache.hadoop.hbase.HRegionLocation; 053import org.apache.hadoop.hbase.ServerName; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.ipc.HBaseRpcController; 056import org.apache.hadoop.hbase.security.UserProvider; 057import org.apache.hadoop.hbase.testclassification.ClientTests; 058import org.apache.hadoop.hbase.testclassification.MediumTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.junit.Before; 061import org.junit.ClassRule; 062import org.junit.Rule; 063import org.junit.Test; 064import org.junit.experimental.categories.Category; 065import org.junit.rules.TestName; 066import org.mockito.ArgumentMatcher; 067import org.mockito.invocation.InvocationOnMock; 068import org.mockito.stubbing.Answer; 069 070import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 071 072import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 086 087/** 088 * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations. 089 */ 090@Category({ ClientTests.class, MediumTests.class }) 091public class TestAsyncTableRpcPriority { 092 093 @ClassRule 094 public static final HBaseClassTestRule CLASS_RULE = 095 HBaseClassTestRule.forClass(TestAsyncTableRpcPriority.class); 096 097 private static Configuration CONF = HBaseConfiguration.create(); 098 099 private ClientService.Interface stub; 100 101 private ExecutorService threadPool; 102 103 private AsyncConnection conn; 104 105 @Rule 106 public TestName name = new TestName(); 107 108 @Before 109 public void setUp() throws IOException { 110 this.threadPool = Executors.newSingleThreadExecutor(); 111 stub = mock(ClientService.Interface.class); 112 113 doAnswer(new Answer<Void>() { 114 115 @Override 116 public Void answer(InvocationOnMock invocation) throws Throwable { 117 ClientProtos.MultiResponse resp = 118 ClientProtos.MultiResponse.newBuilder() 119 .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException( 120 ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())))) 121 .build(); 122 RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2); 123 done.run(resp); 124 return null; 125 } 126 }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any()); 127 doAnswer(new Answer<Void>() { 128 129 @Override 130 public Void answer(InvocationOnMock invocation) throws Throwable { 131 MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation(); 132 MutateResponse resp; 133 switch (req.getMutateType()) { 134 case INCREMENT: 135 ColumnValue value = req.getColumnValue(0); 136 QualifierValue qvalue = value.getQualifierValue(0); 137 Cell cell = 138 CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 139 .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray()) 140 .setQualifier(qvalue.getQualifier().toByteArray()) 141 .setValue(qvalue.getValue().toByteArray()).build(); 142 resp = MutateResponse.newBuilder() 143 .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build(); 144 break; 145 default: 146 resp = MutateResponse.getDefaultInstance(); 147 break; 148 } 149 RpcCallback<MutateResponse> done = invocation.getArgument(2); 150 done.run(resp); 151 return null; 152 } 153 }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any()); 154 doAnswer(new Answer<Void>() { 155 156 @Override 157 public Void answer(InvocationOnMock invocation) throws Throwable { 158 RpcCallback<GetResponse> done = invocation.getArgument(2); 159 done.run(GetResponse.getDefaultInstance()); 160 return null; 161 } 162 }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); 163 conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", 164 UserProvider.instantiate(CONF).getCurrent()) { 165 166 @Override 167 AsyncRegionLocator getLocator() { 168 AsyncRegionLocator locator = mock(AsyncRegionLocator.class); 169 Answer<CompletableFuture<HRegionLocation>> answer = 170 new Answer<CompletableFuture<HRegionLocation>>() { 171 172 @Override 173 public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation) 174 throws Throwable { 175 TableName tableName = invocation.getArgument(0); 176 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 177 ServerName serverName = ServerName.valueOf("rs", 16010, 12345); 178 HRegionLocation loc = new HRegionLocation(info, serverName); 179 return CompletableFuture.completedFuture(loc); 180 } 181 }; 182 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 183 any(RegionLocateType.class), anyLong()); 184 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 185 anyInt(), any(RegionLocateType.class), anyLong()); 186 return locator; 187 } 188 189 @Override 190 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 191 return stub; 192 } 193 }; 194 } 195 196 private HBaseRpcController assertPriority(int priority) { 197 return argThat(new ArgumentMatcher<HBaseRpcController>() { 198 199 @Override 200 public boolean matches(HBaseRpcController controller) { 201 return controller.getPriority() == priority; 202 } 203 }); 204 } 205 206 private ScanRequest assertScannerCloseRequest() { 207 return argThat(new ArgumentMatcher<ScanRequest>() { 208 209 @Override 210 public boolean matches(ScanRequest request) { 211 return request.hasCloseScanner() && request.getCloseScanner(); 212 } 213 }); 214 } 215 216 @Test 217 public void testGet() { 218 conn.getTable(TableName.valueOf(name.getMethodName())) 219 .get(new Get(Bytes.toBytes(0)).setPriority(11)).join(); 220 verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any()); 221 } 222 223 @Test 224 public void testGetNormalTable() { 225 conn.getTable(TableName.valueOf(name.getMethodName())).get(new Get(Bytes.toBytes(0))).join(); 226 verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any()); 227 } 228 229 @Test 230 public void testGetSystemTable() { 231 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) 232 .get(new Get(Bytes.toBytes(0))).join(); 233 verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any()); 234 } 235 236 @Test 237 public void testGetMetaTable() { 238 conn.getTable(TableName.META_TABLE_NAME).get(new Get(Bytes.toBytes(0))).join(); 239 verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any()); 240 } 241 242 @Test 243 public void testPut() { 244 conn 245 .getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0)) 246 .setPriority(12).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) 247 .join(); 248 verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any()); 249 } 250 251 @Test 252 public void testPutNormalTable() { 253 conn.getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0)) 254 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 255 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 256 } 257 258 @Test 259 public void testPutSystemTable() { 260 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) 261 .put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 262 Bytes.toBytes("v"))) 263 .join(); 264 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 265 } 266 267 @Test 268 public void testPutMetaTable() { 269 conn.getTable(TableName.META_TABLE_NAME).put(new Put(Bytes.toBytes(0)) 270 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 271 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 272 } 273 274 @Test 275 public void testDelete() { 276 conn.getTable(TableName.valueOf(name.getMethodName())) 277 .delete(new Delete(Bytes.toBytes(0)).setPriority(13)).join(); 278 verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any()); 279 } 280 281 @Test 282 public void testDeleteNormalTable() { 283 conn.getTable(TableName.valueOf(name.getMethodName())).delete(new Delete(Bytes.toBytes(0))) 284 .join(); 285 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 286 } 287 288 @Test 289 public void testDeleteSystemTable() { 290 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) 291 .delete(new Delete(Bytes.toBytes(0))).join(); 292 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 293 } 294 295 @Test 296 public void testDeleteMetaTable() { 297 conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join(); 298 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 299 } 300 301 @Test 302 public void testAppend() { 303 conn 304 .getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0)) 305 .setPriority(14).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) 306 .join(); 307 verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any()); 308 } 309 310 @Test 311 public void testAppendNormalTable() { 312 conn.getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0)) 313 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 314 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 315 } 316 317 @Test 318 public void testAppendSystemTable() { 319 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) 320 .append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 321 Bytes.toBytes("v"))) 322 .join(); 323 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 324 } 325 326 @Test 327 public void testAppendMetaTable() { 328 conn.getTable(TableName.META_TABLE_NAME).append(new Append(Bytes.toBytes(0)) 329 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); 330 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 331 } 332 333 @Test 334 public void testIncrement() { 335 conn.getTable(TableName.valueOf(name.getMethodName())).increment(new Increment(Bytes.toBytes(0)) 336 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15)).join(); 337 verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any()); 338 } 339 340 @Test 341 public void testIncrementNormalTable() { 342 conn.getTable(TableName.valueOf(name.getMethodName())) 343 .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join(); 344 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 345 } 346 347 @Test 348 public void testIncrementSystemTable() { 349 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) 350 .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join(); 351 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 352 } 353 354 @Test 355 public void testIncrementMetaTable() { 356 conn.getTable(TableName.META_TABLE_NAME) 357 .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join(); 358 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 359 } 360 361 @Test 362 public void testCheckAndPut() { 363 conn.getTable(TableName.valueOf(name.getMethodName())) 364 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 365 .ifNotExists() 366 .thenPut(new Put(Bytes.toBytes(0)) 367 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16)) 368 .join(); 369 verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any()); 370 } 371 372 @Test 373 public void testCheckAndPutNormalTable() { 374 conn.getTable(TableName.valueOf(name.getMethodName())) 375 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 376 .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 377 Bytes.toBytes("cq"), Bytes.toBytes("v"))) 378 .join(); 379 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 380 } 381 382 @Test 383 public void testCheckAndPutSystemTable() { 384 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) 385 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 386 .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 387 Bytes.toBytes("cq"), Bytes.toBytes("v"))) 388 .join(); 389 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 390 } 391 392 @Test 393 public void testCheckAndPutMetaTable() { 394 conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 395 .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0)) 396 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) 397 .join(); 398 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 399 } 400 401 @Test 402 public void testCheckAndDelete() { 403 conn.getTable(TableName.valueOf(name.getMethodName())) 404 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 405 .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join(); 406 verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any()); 407 } 408 409 @Test 410 public void testCheckAndDeleteNormalTable() { 411 conn.getTable(TableName.valueOf(name.getMethodName())) 412 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 413 .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join(); 414 verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); 415 } 416 417 @Test 418 public void testCheckAndDeleteSystemTable() { 419 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) 420 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 421 .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join(); 422 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 423 } 424 425 @Test 426 public void testCheckAndDeleteMetaTable() { 427 conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 428 .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0)) 429 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) 430 .join(); 431 verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); 432 } 433 434 @Test 435 public void testCheckAndMutate() throws IOException { 436 conn.getTable(TableName.valueOf(name.getMethodName())) 437 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 438 .ifEquals(Bytes.toBytes("v")).thenMutate(new RowMutations(Bytes.toBytes(0)) 439 .add((Mutation) new Delete(Bytes.toBytes(0)).setPriority(18))) 440 .join(); 441 verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any()); 442 } 443 444 @Test 445 public void testCheckAndMutateNormalTable() throws IOException { 446 conn.getTable(TableName.valueOf(name.getMethodName())) 447 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 448 .ifEquals(Bytes.toBytes("v")) 449 .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0)))) 450 .join(); 451 verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class), 452 any()); 453 } 454 455 @Test 456 public void testCheckAndMutateSystemTable() throws IOException { 457 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) 458 .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 459 .ifEquals(Bytes.toBytes("v")) 460 .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0)))) 461 .join(); 462 verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), 463 any(ClientProtos.MultiRequest.class), any()); 464 } 465 466 @Test 467 public void testCheckAndMutateMetaTable() throws IOException { 468 conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) 469 .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v")) 470 .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0)))) 471 .join(); 472 verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), 473 any(ClientProtos.MultiRequest.class), any()); 474 } 475 476 private CompletableFuture<Void> mockScanReturnRenewFuture(int scanPriority) { 477 int scannerId = 1; 478 CompletableFuture<Void> future = new CompletableFuture<>(); 479 AtomicInteger scanNextCalled = new AtomicInteger(0); 480 doAnswer(new Answer<Void>() { 481 482 @SuppressWarnings("FutureReturnValueIgnored") 483 @Override 484 public Void answer(InvocationOnMock invocation) throws Throwable { 485 threadPool.submit(() -> { 486 ScanRequest req = invocation.getArgument(1); 487 RpcCallback<ScanResponse> done = invocation.getArgument(2); 488 if (!req.hasScannerId()) { 489 done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800) 490 .setMoreResultsInRegion(true).setMoreResults(true).build()); 491 } else { 492 if (req.hasRenew() && req.getRenew()) { 493 future.complete(null); 494 } 495 496 assertFalse("close scanner should not come in with scan priority " + scanPriority, 497 req.hasCloseScanner() && req.getCloseScanner()); 498 499 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 500 .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) 501 .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) 502 .setValue(Bytes.toBytes("v")).build(); 503 Result result = Result.create(Arrays.asList(cell)); 504 done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800) 505 .setMoreResultsInRegion(true).setMoreResults(true) 506 .addResults(ProtobufUtil.toResult(result)).build()); 507 } 508 }); 509 return null; 510 } 511 }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any()); 512 513 doAnswer(new Answer<Void>() { 514 515 @SuppressWarnings("FutureReturnValueIgnored") 516 @Override 517 public Void answer(InvocationOnMock invocation) throws Throwable { 518 threadPool.submit(() -> { 519 ScanRequest req = invocation.getArgument(1); 520 RpcCallback<ScanResponse> done = invocation.getArgument(2); 521 assertTrue("close request should have scannerId", req.hasScannerId()); 522 assertEquals("close request's scannerId should match", scannerId, req.getScannerId()); 523 assertTrue("close request should have closerScanner set", 524 req.hasCloseScanner() && req.getCloseScanner()); 525 526 done.run(ScanResponse.getDefaultInstance()); 527 }); 528 return null; 529 } 530 }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); 531 return future; 532 } 533 534 @Test 535 public void testScan() throws Exception { 536 CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(19); 537 testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(19)); 538 } 539 540 @Test 541 public void testScanNormalTable() throws Exception { 542 CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(NORMAL_QOS); 543 testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(NORMAL_QOS)); 544 } 545 546 @Test 547 public void testScanSystemTable() throws Exception { 548 CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS); 549 testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()), renewFuture, 550 Optional.empty()); 551 } 552 553 @Test 554 public void testScanMetaTable() throws Exception { 555 CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS); 556 testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty()); 557 } 558 559 private void testForTable(TableName tableName, CompletableFuture<Void> renewFuture, 560 Optional<Integer> priority) throws Exception { 561 Scan scan = new Scan().setCaching(1).setMaxResultSize(1); 562 priority.ifPresent(scan::setPriority); 563 564 try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) { 565 assertNotNull(scanner.next()); 566 // wait for at least one renew to come in before closing 567 renewFuture.join(); 568 } 569 570 // ensures the close thread has time to finish before asserting 571 threadPool.shutdown(); 572 threadPool.awaitTermination(5, TimeUnit.SECONDS); 573 574 // just verify that the calls happened. verification of priority occurred in the mocking 575 // open, next, then one or more lease renewals, then close 576 verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any()); 577 // additionally, explicitly check for a close request 578 verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); 579 } 580 581 @Test 582 public void testBatchNormalTable() { 583 conn.getTable(TableName.valueOf(name.getMethodName())) 584 .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); 585 verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class), 586 any()); 587 } 588 589 @Test 590 public void testBatchSystemTable() { 591 conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) 592 .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); 593 verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), 594 any(ClientProtos.MultiRequest.class), any()); 595 } 596 597 @Test 598 public void testBatchMetaTable() { 599 conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))) 600 .join(); 601 verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), 602 any(ClientProtos.MultiRequest.class), any()); 603 } 604}