001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
086
087/**
088 * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations.
089 */
090@Category({ ClientTests.class, MediumTests.class })
091public class TestAsyncTableRpcPriority {
092
093  @ClassRule
094  public static final HBaseClassTestRule CLASS_RULE =
095    HBaseClassTestRule.forClass(TestAsyncTableRpcPriority.class);
096
097  private static Configuration CONF = HBaseConfiguration.create();
098
099  private ClientService.Interface stub;
100
101  private ExecutorService threadPool;
102
103  private AsyncConnection conn;
104
105  @Rule
106  public TestName name = new TestName();
107
108  @Before
109  public void setUp() throws IOException {
110    this.threadPool = Executors.newSingleThreadExecutor();
111    stub = mock(ClientService.Interface.class);
112
113    doAnswer(new Answer<Void>() {
114
115      @Override
116      public Void answer(InvocationOnMock invocation) throws Throwable {
117        ClientProtos.MultiResponse resp =
118          ClientProtos.MultiResponse.newBuilder()
119            .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
120              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
121            .build();
122        RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
123        done.run(resp);
124        return null;
125      }
126    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
127    doAnswer(new Answer<Void>() {
128
129      @Override
130      public Void answer(InvocationOnMock invocation) throws Throwable {
131        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
132        MutateResponse resp;
133        switch (req.getMutateType()) {
134          case INCREMENT:
135            ColumnValue value = req.getColumnValue(0);
136            QualifierValue qvalue = value.getQualifierValue(0);
137            Cell cell =
138              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
139                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
140                .setQualifier(qvalue.getQualifier().toByteArray())
141                .setValue(qvalue.getValue().toByteArray()).build();
142            resp = MutateResponse.newBuilder()
143              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
144            break;
145          default:
146            resp = MutateResponse.getDefaultInstance();
147            break;
148        }
149        RpcCallback<MutateResponse> done = invocation.getArgument(2);
150        done.run(resp);
151        return null;
152      }
153    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
154    doAnswer(new Answer<Void>() {
155
156      @Override
157      public Void answer(InvocationOnMock invocation) throws Throwable {
158        RpcCallback<GetResponse> done = invocation.getArgument(2);
159        done.run(GetResponse.getDefaultInstance());
160        return null;
161      }
162    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
163    conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test",
164      UserProvider.instantiate(CONF).getCurrent()) {
165
166      @Override
167      AsyncRegionLocator getLocator() {
168        AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
169        Answer<CompletableFuture<HRegionLocation>> answer =
170          new Answer<CompletableFuture<HRegionLocation>>() {
171
172            @Override
173            public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
174              throws Throwable {
175              TableName tableName = invocation.getArgument(0);
176              RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
177              ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
178              HRegionLocation loc = new HRegionLocation(info, serverName);
179              return CompletableFuture.completedFuture(loc);
180            }
181          };
182        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
183          any(RegionLocateType.class), anyLong());
184        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
185          anyInt(), any(RegionLocateType.class), anyLong());
186        return locator;
187      }
188
189      @Override
190      ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
191        return stub;
192      }
193    };
194  }
195
196  private HBaseRpcController assertPriority(int priority) {
197    return argThat(new ArgumentMatcher<HBaseRpcController>() {
198
199      @Override
200      public boolean matches(HBaseRpcController controller) {
201        return controller.getPriority() == priority;
202      }
203    });
204  }
205
206  private ScanRequest assertScannerCloseRequest() {
207    return argThat(new ArgumentMatcher<ScanRequest>() {
208
209      @Override
210      public boolean matches(ScanRequest request) {
211        return request.hasCloseScanner() && request.getCloseScanner();
212      }
213    });
214  }
215
216  @Test
217  public void testGet() {
218    conn.getTable(TableName.valueOf(name.getMethodName()))
219      .get(new Get(Bytes.toBytes(0)).setPriority(11)).join();
220    verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any());
221  }
222
223  @Test
224  public void testGetNormalTable() {
225    conn.getTable(TableName.valueOf(name.getMethodName())).get(new Get(Bytes.toBytes(0))).join();
226    verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any());
227  }
228
229  @Test
230  public void testGetSystemTable() {
231    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
232      .get(new Get(Bytes.toBytes(0))).join();
233    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
234  }
235
236  @Test
237  public void testGetMetaTable() {
238    conn.getTable(TableName.META_TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
239    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
240  }
241
242  @Test
243  public void testPut() {
244    conn
245      .getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0))
246        .setPriority(12).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
247      .join();
248    verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any());
249  }
250
251  @Test
252  public void testPutNormalTable() {
253    conn.getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0))
254      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
255    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
256  }
257
258  @Test
259  public void testPutSystemTable() {
260    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
261      .put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
262        Bytes.toBytes("v")))
263      .join();
264    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
265  }
266
267  @Test
268  public void testPutMetaTable() {
269    conn.getTable(TableName.META_TABLE_NAME).put(new Put(Bytes.toBytes(0))
270      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
271    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
272  }
273
274  @Test
275  public void testDelete() {
276    conn.getTable(TableName.valueOf(name.getMethodName()))
277      .delete(new Delete(Bytes.toBytes(0)).setPriority(13)).join();
278    verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any());
279  }
280
281  @Test
282  public void testDeleteNormalTable() {
283    conn.getTable(TableName.valueOf(name.getMethodName())).delete(new Delete(Bytes.toBytes(0)))
284      .join();
285    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
286  }
287
288  @Test
289  public void testDeleteSystemTable() {
290    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
291      .delete(new Delete(Bytes.toBytes(0))).join();
292    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
293  }
294
295  @Test
296  public void testDeleteMetaTable() {
297    conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join();
298    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
299  }
300
301  @Test
302  public void testAppend() {
303    conn
304      .getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0))
305        .setPriority(14).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
306      .join();
307    verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any());
308  }
309
310  @Test
311  public void testAppendNormalTable() {
312    conn.getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0))
313      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
314    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
315  }
316
317  @Test
318  public void testAppendSystemTable() {
319    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
320      .append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
321        Bytes.toBytes("v")))
322      .join();
323    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
324  }
325
326  @Test
327  public void testAppendMetaTable() {
328    conn.getTable(TableName.META_TABLE_NAME).append(new Append(Bytes.toBytes(0))
329      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
330    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
331  }
332
333  @Test
334  public void testIncrement() {
335    conn.getTable(TableName.valueOf(name.getMethodName())).increment(new Increment(Bytes.toBytes(0))
336      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15)).join();
337    verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any());
338  }
339
340  @Test
341  public void testIncrementNormalTable() {
342    conn.getTable(TableName.valueOf(name.getMethodName()))
343      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
344    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
345  }
346
347  @Test
348  public void testIncrementSystemTable() {
349    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
350      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
351    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
352  }
353
354  @Test
355  public void testIncrementMetaTable() {
356    conn.getTable(TableName.META_TABLE_NAME)
357      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
358    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
359  }
360
361  @Test
362  public void testCheckAndPut() {
363    conn.getTable(TableName.valueOf(name.getMethodName()))
364      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
365      .ifNotExists()
366      .thenPut(new Put(Bytes.toBytes(0))
367        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16))
368      .join();
369    verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any());
370  }
371
372  @Test
373  public void testCheckAndPutNormalTable() {
374    conn.getTable(TableName.valueOf(name.getMethodName()))
375      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
376      .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
377        Bytes.toBytes("cq"), Bytes.toBytes("v")))
378      .join();
379    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
380  }
381
382  @Test
383  public void testCheckAndPutSystemTable() {
384    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
385      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
386      .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
387        Bytes.toBytes("cq"), Bytes.toBytes("v")))
388      .join();
389    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
390  }
391
392  @Test
393  public void testCheckAndPutMetaTable() {
394    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
395      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
396        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
397      .join();
398    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
399  }
400
401  @Test
402  public void testCheckAndDelete() {
403    conn.getTable(TableName.valueOf(name.getMethodName()))
404      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
405      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join();
406    verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any());
407  }
408
409  @Test
410  public void testCheckAndDeleteNormalTable() {
411    conn.getTable(TableName.valueOf(name.getMethodName()))
412      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
413      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
414    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
415  }
416
417  @Test
418  public void testCheckAndDeleteSystemTable() {
419    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
420      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
421      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
422    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
423  }
424
425  @Test
426  public void testCheckAndDeleteMetaTable() {
427    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
428      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
429        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
430      .join();
431    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
432  }
433
434  @Test
435  public void testCheckAndMutate() throws IOException {
436    conn.getTable(TableName.valueOf(name.getMethodName()))
437      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
438      .ifEquals(Bytes.toBytes("v")).thenMutate(new RowMutations(Bytes.toBytes(0))
439        .add((Mutation) new Delete(Bytes.toBytes(0)).setPriority(18)))
440      .join();
441    verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any());
442  }
443
444  @Test
445  public void testCheckAndMutateNormalTable() throws IOException {
446    conn.getTable(TableName.valueOf(name.getMethodName()))
447      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
448      .ifEquals(Bytes.toBytes("v"))
449      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
450      .join();
451    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
452      any());
453  }
454
455  @Test
456  public void testCheckAndMutateSystemTable() throws IOException {
457    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
458      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
459      .ifEquals(Bytes.toBytes("v"))
460      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
461      .join();
462    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
463      any(ClientProtos.MultiRequest.class), any());
464  }
465
466  @Test
467  public void testCheckAndMutateMetaTable() throws IOException {
468    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
469      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
470      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
471      .join();
472    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
473      any(ClientProtos.MultiRequest.class), any());
474  }
475
476  private CompletableFuture<Void> mockScanReturnRenewFuture(int scanPriority) {
477    int scannerId = 1;
478    CompletableFuture<Void> future = new CompletableFuture<>();
479    AtomicInteger scanNextCalled = new AtomicInteger(0);
480    doAnswer(new Answer<Void>() {
481
482      @SuppressWarnings("FutureReturnValueIgnored")
483      @Override
484      public Void answer(InvocationOnMock invocation) throws Throwable {
485        threadPool.submit(() -> {
486          ScanRequest req = invocation.getArgument(1);
487          RpcCallback<ScanResponse> done = invocation.getArgument(2);
488          if (!req.hasScannerId()) {
489            done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
490              .setMoreResultsInRegion(true).setMoreResults(true).build());
491          } else {
492            if (req.hasRenew() && req.getRenew()) {
493              future.complete(null);
494            }
495
496            assertFalse("close scanner should not come in with scan priority " + scanPriority,
497              req.hasCloseScanner() && req.getCloseScanner());
498
499            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
500              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
501              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
502              .setValue(Bytes.toBytes("v")).build();
503            Result result = Result.create(Arrays.asList(cell));
504            done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
505              .setMoreResultsInRegion(true).setMoreResults(true)
506              .addResults(ProtobufUtil.toResult(result)).build());
507          }
508        });
509        return null;
510      }
511    }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any());
512
513    doAnswer(new Answer<Void>() {
514
515      @SuppressWarnings("FutureReturnValueIgnored")
516      @Override
517      public Void answer(InvocationOnMock invocation) throws Throwable {
518        threadPool.submit(() -> {
519          ScanRequest req = invocation.getArgument(1);
520          RpcCallback<ScanResponse> done = invocation.getArgument(2);
521          assertTrue("close request should have scannerId", req.hasScannerId());
522          assertEquals("close request's scannerId should match", scannerId, req.getScannerId());
523          assertTrue("close request should have closerScanner set",
524            req.hasCloseScanner() && req.getCloseScanner());
525
526          done.run(ScanResponse.getDefaultInstance());
527        });
528        return null;
529      }
530    }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
531    return future;
532  }
533
534  @Test
535  public void testScan() throws Exception {
536    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(19);
537    testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(19));
538  }
539
540  @Test
541  public void testScanNormalTable() throws Exception {
542    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(NORMAL_QOS);
543    testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(NORMAL_QOS));
544  }
545
546  @Test
547  public void testScanSystemTable() throws Exception {
548    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
549    testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()), renewFuture,
550      Optional.empty());
551  }
552
553  @Test
554  public void testScanMetaTable() throws Exception {
555    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
556    testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty());
557  }
558
559  private void testForTable(TableName tableName, CompletableFuture<Void> renewFuture,
560    Optional<Integer> priority) throws Exception {
561    Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
562    priority.ifPresent(scan::setPriority);
563
564    try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
565      assertNotNull(scanner.next());
566      // wait for at least one renew to come in before closing
567      renewFuture.join();
568    }
569
570    // ensures the close thread has time to finish before asserting
571    threadPool.shutdown();
572    threadPool.awaitTermination(5, TimeUnit.SECONDS);
573
574    // just verify that the calls happened. verification of priority occurred in the mocking
575    // open, next, then one or more lease renewals, then close
576    verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any());
577    // additionally, explicitly check for a close request
578    verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
579  }
580
581  @Test
582  public void testBatchNormalTable() {
583    conn.getTable(TableName.valueOf(name.getMethodName()))
584      .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
585    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
586      any());
587  }
588
589  @Test
590  public void testBatchSystemTable() {
591    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
592      .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
593    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
594      any(ClientProtos.MultiRequest.class), any());
595  }
596
597  @Test
598  public void testBatchMetaTable() {
599    conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0))))
600      .join();
601    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
602      any(ClientProtos.MultiRequest.class), any());
603  }
604}