001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
022import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
023import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertFalse;
026import static org.junit.jupiter.api.Assertions.assertNotNull;
027import static org.junit.jupiter.api.Assertions.assertTrue;
028import static org.mockito.ArgumentMatchers.any;
029import static org.mockito.ArgumentMatchers.anyInt;
030import static org.mockito.ArgumentMatchers.anyLong;
031import static org.mockito.ArgumentMatchers.argThat;
032import static org.mockito.Mockito.atLeast;
033import static org.mockito.Mockito.doAnswer;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.times;
036import static org.mockito.Mockito.verify;
037
038import java.io.IOException;
039import java.util.Arrays;
040import java.util.Optional;
041import java.util.concurrent.CompletableFuture;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.Executors;
044import java.util.concurrent.TimeUnit;
045import java.util.concurrent.atomic.AtomicInteger;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.hbase.Cell;
048import org.apache.hadoop.hbase.CellBuilderFactory;
049import org.apache.hadoop.hbase.CellBuilderType;
050import org.apache.hadoop.hbase.HBaseConfiguration;
051import org.apache.hadoop.hbase.HRegionLocation;
052import org.apache.hadoop.hbase.ServerName;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.ipc.HBaseRpcController;
055import org.apache.hadoop.hbase.security.User;
056import org.apache.hadoop.hbase.security.UserProvider;
057import org.apache.hadoop.hbase.testclassification.ClientTests;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.junit.jupiter.api.BeforeEach;
061import org.junit.jupiter.api.Tag;
062import org.junit.jupiter.api.Test;
063import org.junit.jupiter.api.TestInfo;
064import org.mockito.ArgumentMatcher;
065import org.mockito.invocation.InvocationOnMock;
066import org.mockito.stubbing.Answer;
067
068import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
069
070import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
084
085/**
086 * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations.
087 */
088@Tag(ClientTests.TAG)
089@Tag(MediumTests.TAG)
090public class TestAsyncTableRpcPriority {
091
092  private static Configuration CONF = HBaseConfiguration.create();
093
094  private ClientService.Interface stub;
095
096  private ExecutorService threadPool;
097
098  private AsyncConnection conn;
099
100  public String name;
101
102  @BeforeEach
103  public void setUp(TestInfo testInfo) throws IOException {
104    name = testInfo.getTestMethod().get().getName();
105    this.threadPool = Executors.newSingleThreadExecutor();
106    stub = mock(ClientService.Interface.class);
107
108    doAnswer(new Answer<Void>() {
109
110      @Override
111      public Void answer(InvocationOnMock invocation) throws Throwable {
112        ClientProtos.MultiResponse resp =
113          ClientProtos.MultiResponse.newBuilder()
114            .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
115              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
116            .build();
117        RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
118        done.run(resp);
119        return null;
120      }
121    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
122    doAnswer(new Answer<Void>() {
123
124      @Override
125      public Void answer(InvocationOnMock invocation) throws Throwable {
126        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
127        MutateResponse resp;
128        switch (req.getMutateType()) {
129          case INCREMENT:
130            ColumnValue value = req.getColumnValue(0);
131            QualifierValue qvalue = value.getQualifierValue(0);
132            Cell cell =
133              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
134                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
135                .setQualifier(qvalue.getQualifier().toByteArray())
136                .setValue(qvalue.getValue().toByteArray()).build();
137            resp = MutateResponse.newBuilder()
138              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
139            break;
140          default:
141            resp = MutateResponse.getDefaultInstance();
142            break;
143        }
144        RpcCallback<MutateResponse> done = invocation.getArgument(2);
145        done.run(resp);
146        return null;
147      }
148    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
149    doAnswer(new Answer<Void>() {
150
151      @Override
152      public Void answer(InvocationOnMock invocation) throws Throwable {
153        RpcCallback<GetResponse> done = invocation.getArgument(2);
154        done.run(GetResponse.getDefaultInstance());
155        return null;
156      }
157    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
158    User user = UserProvider.instantiate(CONF).getCurrent();
159    conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", null,
160      user) {
161
162      @Override
163      AsyncRegionLocator getLocator() {
164        AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
165        Answer<CompletableFuture<HRegionLocation>> answer =
166          new Answer<CompletableFuture<HRegionLocation>>() {
167
168            @Override
169            public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
170              throws Throwable {
171              TableName tableName = invocation.getArgument(0);
172              RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
173              ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
174              HRegionLocation loc = new HRegionLocation(info, serverName);
175              return CompletableFuture.completedFuture(loc);
176            }
177          };
178        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
179          any(RegionLocateType.class), anyLong());
180        doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
181          anyInt(), any(RegionLocateType.class), anyLong());
182        return locator;
183      }
184
185      @Override
186      ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
187        return stub;
188      }
189    };
190  }
191
192  private HBaseRpcController assertPriority(int priority) {
193    return argThat(new ArgumentMatcher<HBaseRpcController>() {
194
195      @Override
196      public boolean matches(HBaseRpcController controller) {
197        return controller.getPriority() == priority;
198      }
199    });
200  }
201
202  private ScanRequest assertScannerCloseRequest() {
203    return argThat(new ArgumentMatcher<ScanRequest>() {
204
205      @Override
206      public boolean matches(ScanRequest request) {
207        return request.hasCloseScanner() && request.getCloseScanner();
208      }
209    });
210  }
211
212  @Test
213  public void testGet() {
214    conn.getTable(TableName.valueOf(name)).get(new Get(Bytes.toBytes(0)).setPriority(11)).join();
215    verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any());
216  }
217
218  @Test
219  public void testGetNormalTable() {
220    conn.getTable(TableName.valueOf(name)).get(new Get(Bytes.toBytes(0))).join();
221    verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any());
222  }
223
224  @Test
225  public void testGetSystemTable() {
226    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)).get(new Get(Bytes.toBytes(0)))
227      .join();
228    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
229  }
230
231  @Test
232  public void testGetMetaTable() {
233    conn.getTable(TableName.META_TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
234    verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any());
235  }
236
237  @Test
238  public void testPut() {
239    conn.getTable(TableName.valueOf(name)).put(new Put(Bytes.toBytes(0)).setPriority(12)
240      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
241    verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any());
242  }
243
244  @Test
245  public void testPutNormalTable() {
246    conn.getTable(TableName.valueOf(name)).put(new Put(Bytes.toBytes(0))
247      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
248    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
249  }
250
251  @Test
252  public void testPutSystemTable() {
253    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name)).put(new Put(Bytes.toBytes(0))
254      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
255    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
256  }
257
258  @Test
259  public void testPutMetaTable() {
260    conn.getTable(TableName.META_TABLE_NAME).put(new Put(Bytes.toBytes(0))
261      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
262    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
263  }
264
265  @Test
266  public void testDelete() {
267    conn.getTable(TableName.valueOf(name)).delete(new Delete(Bytes.toBytes(0)).setPriority(13))
268      .join();
269    verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any());
270  }
271
272  @Test
273  public void testDeleteNormalTable() {
274    conn.getTable(TableName.valueOf(name)).delete(new Delete(Bytes.toBytes(0))).join();
275    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
276  }
277
278  @Test
279  public void testDeleteSystemTable() {
280    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name))
281      .delete(new Delete(Bytes.toBytes(0))).join();
282    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
283  }
284
285  @Test
286  public void testDeleteMetaTable() {
287    conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join();
288    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
289  }
290
291  @Test
292  public void testAppend() {
293    conn.getTable(TableName.valueOf(name)).append(new Append(Bytes.toBytes(0)).setPriority(14)
294      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
295    verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any());
296  }
297
298  @Test
299  public void testAppendNormalTable() {
300    conn.getTable(TableName.valueOf(name)).append(new Append(Bytes.toBytes(0))
301      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
302    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
303  }
304
305  @Test
306  public void testAppendSystemTable() {
307    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name))
308      .append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
309        Bytes.toBytes("v")))
310      .join();
311    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
312  }
313
314  @Test
315  public void testAppendMetaTable() {
316    conn.getTable(TableName.META_TABLE_NAME).append(new Append(Bytes.toBytes(0))
317      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join();
318    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
319  }
320
321  @Test
322  public void testIncrement() {
323    conn.getTable(TableName.valueOf(name)).increment(new Increment(Bytes.toBytes(0))
324      .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15)).join();
325    verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any());
326  }
327
328  @Test
329  public void testIncrementNormalTable() {
330    conn.getTable(TableName.valueOf(name))
331      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
332    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
333  }
334
335  @Test
336  public void testIncrementSystemTable() {
337    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name))
338      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
339    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
340  }
341
342  @Test
343  public void testIncrementMetaTable() {
344    conn.getTable(TableName.META_TABLE_NAME)
345      .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join();
346    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
347  }
348
349  @Test
350  public void testCheckAndPut() {
351    conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
352      .qualifier(Bytes.toBytes("cq")).ifNotExists()
353      .thenPut(new Put(Bytes.toBytes(0))
354        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16))
355      .join();
356    verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any());
357  }
358
359  @Test
360  public void testCheckAndPutNormalTable() {
361    conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
362      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
363        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
364      .join();
365    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
366  }
367
368  @Test
369  public void testCheckAndPutSystemTable() {
370    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name))
371      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
372      .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
373        Bytes.toBytes("cq"), Bytes.toBytes("v")))
374      .join();
375    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
376  }
377
378  @Test
379  public void testCheckAndPutMetaTable() {
380    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
381      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
382        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
383      .join();
384    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
385  }
386
387  @Test
388  public void testCheckAndDelete() {
389    conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
390      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
391      .thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join();
392    verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any());
393  }
394
395  @Test
396  public void testCheckAndDeleteNormalTable() {
397    conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
398      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
399      .thenDelete(new Delete(Bytes.toBytes(0))).join();
400    verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any());
401  }
402
403  @Test
404  public void testCheckAndDeleteSystemTable() {
405    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name))
406      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
407      .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join();
408    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
409  }
410
411  @Test
412  public void testCheckAndDeleteMetaTable() {
413    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
414      .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0))
415        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
416      .join();
417    verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any());
418  }
419
420  @Test
421  public void testCheckAndMutate() throws IOException {
422    conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
423      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
424      .thenMutate(new RowMutations(Bytes.toBytes(0))
425        .add((Mutation) new Delete(Bytes.toBytes(0)).setPriority(18)))
426      .join();
427    verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any());
428  }
429
430  @Test
431  public void testCheckAndMutateNormalTable() throws IOException {
432    conn.getTable(TableName.valueOf(name)).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
433      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
434      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
435      .join();
436    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
437      any());
438  }
439
440  @Test
441  public void testCheckAndMutateSystemTable() throws IOException {
442    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name))
443      .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
444      .ifEquals(Bytes.toBytes("v"))
445      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
446      .join();
447    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
448      any(ClientProtos.MultiRequest.class), any());
449  }
450
451  @Test
452  public void testCheckAndMutateMetaTable() throws IOException {
453    conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
454      .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v"))
455      .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0))))
456      .join();
457    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
458      any(ClientProtos.MultiRequest.class), any());
459  }
460
461  private CompletableFuture<Void> mockScanReturnRenewFuture(int scanPriority) {
462    int scannerId = 1;
463    CompletableFuture<Void> future = new CompletableFuture<>();
464    AtomicInteger scanNextCalled = new AtomicInteger(0);
465    doAnswer(new Answer<Void>() {
466
467      @SuppressWarnings("FutureReturnValueIgnored")
468      @Override
469      public Void answer(InvocationOnMock invocation) throws Throwable {
470        threadPool.submit(() -> {
471          ScanRequest req = invocation.getArgument(1);
472          RpcCallback<ScanResponse> done = invocation.getArgument(2);
473          if (!req.hasScannerId()) {
474            done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
475              .setMoreResultsInRegion(true).setMoreResults(true).build());
476          } else {
477            if (req.hasRenew() && req.getRenew()) {
478              future.complete(null);
479            }
480
481            assertFalse(req.hasCloseScanner() && req.getCloseScanner(),
482              "close scanner should not come in with scan priority " + scanPriority);
483
484            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
485              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
486              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
487              .setValue(Bytes.toBytes("v")).build();
488            Result result = Result.create(Arrays.asList(cell));
489            done.run(ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
490              .setMoreResultsInRegion(true).setMoreResults(true)
491              .addResults(ProtobufUtil.toResult(result)).build());
492          }
493        });
494        return null;
495      }
496    }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any());
497
498    doAnswer(new Answer<Void>() {
499
500      @SuppressWarnings("FutureReturnValueIgnored")
501      @Override
502      public Void answer(InvocationOnMock invocation) throws Throwable {
503        threadPool.submit(() -> {
504          ScanRequest req = invocation.getArgument(1);
505          RpcCallback<ScanResponse> done = invocation.getArgument(2);
506          assertTrue(req.hasScannerId(), "close request should have scannerId");
507          assertEquals(scannerId, req.getScannerId(), "close request's scannerId should match");
508          assertTrue(req.hasCloseScanner() && req.getCloseScanner(),
509            "close request should have closerScanner set");
510
511          done.run(ScanResponse.getDefaultInstance());
512        });
513        return null;
514      }
515    }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
516    return future;
517  }
518
519  @Test
520  public void testScan() throws Exception {
521    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(19);
522    testForTable(TableName.valueOf(name), renewFuture, Optional.of(19));
523  }
524
525  @Test
526  public void testScanNormalTable() throws Exception {
527    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(NORMAL_QOS);
528    testForTable(TableName.valueOf(name), renewFuture, Optional.of(NORMAL_QOS));
529  }
530
531  @Test
532  public void testScanSystemTable() throws Exception {
533    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
534    testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name), renewFuture, Optional.empty());
535  }
536
537  @Test
538  public void testScanMetaTable() throws Exception {
539    CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
540    testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty());
541  }
542
543  private void testForTable(TableName tableName, CompletableFuture<Void> renewFuture,
544    Optional<Integer> priority) throws Exception {
545    Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
546    priority.ifPresent(scan::setPriority);
547
548    try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
549      assertNotNull(scanner.next());
550      // wait for at least one renew to come in before closing
551      renewFuture.join();
552    }
553
554    // ensures the close thread has time to finish before asserting
555    threadPool.shutdown();
556    threadPool.awaitTermination(5, TimeUnit.SECONDS);
557
558    // just verify that the calls happened. verification of priority occurred in the mocking
559    // open, next, then one or more lease renewals, then close
560    verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any());
561    // additionally, explicitly check for a close request
562    verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
563  }
564
565  @Test
566  public void testBatchNormalTable() {
567    conn.getTable(TableName.valueOf(name)).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0))))
568      .join();
569    verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class),
570      any());
571  }
572
573  @Test
574  public void testBatchSystemTable() {
575    conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name))
576      .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
577    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
578      any(ClientProtos.MultiRequest.class), any());
579  }
580
581  @Test
582  public void testBatchMetaTable() {
583    conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0))))
584      .join();
585    verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
586      any(ClientProtos.MultiRequest.class), any());
587  }
588}