001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.Map.Entry;
034import java.util.Random;
035import java.util.Set;
036import java.util.TreeSet;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ExecutorService;
040import java.util.concurrent.Executors;
041import java.util.concurrent.Future;
042import java.util.concurrent.LinkedBlockingQueue;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.CallDroppedException;
052import org.apache.hadoop.hbase.CallQueueTooBigException;
053import org.apache.hadoop.hbase.Cell;
054import org.apache.hadoop.hbase.HBaseClassTestRule;
055import org.apache.hadoop.hbase.HBaseIOException;
056import org.apache.hadoop.hbase.HBaseServerException;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionInfo;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.RegionLocations;
061import org.apache.hadoop.hbase.ServerName;
062import org.apache.hadoop.hbase.TableName;
063import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess;
064import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
065import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
066import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
067import org.apache.hadoop.hbase.client.coprocessor.Batch;
068import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
069import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
070import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
071import org.apache.hadoop.hbase.security.User;
072import org.apache.hadoop.hbase.testclassification.ClientTests;
073import org.apache.hadoop.hbase.testclassification.LargeTests;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.apache.hadoop.hbase.util.Threads;
077import org.junit.Assert;
078import org.junit.Before;
079import org.junit.ClassRule;
080import org.junit.Test;
081import org.junit.experimental.categories.Category;
082import org.mockito.Mockito;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
087
088@Category({ ClientTests.class, LargeTests.class })
089public class TestAsyncProcess {
090
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncProcess.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class);
  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
  // Row keys used by the tests; they also serve as region boundaries for hri1..hri3 below.
  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
  // Actions on this row are answered with `failure` (instead of `success`) by MyAsyncProcess.
  private static final byte[] FAILS = Bytes.toBytes("FAILS");
  // Per-test configuration state, recreated by beforeEach().
  private Configuration CONF;
  private ConnectionConfiguration CONNECTION_CONFIG;
  private static final ServerName sn = ServerName.valueOf("s1,1,1");
  private static final ServerName sn2 = ServerName.valueOf("s2,2,2");
  private static final ServerName sn3 = ServerName.valueOf("s3,3,3");
  // hri1 = [DUMMY_BYTES_1, DUMMY_BYTES_2), hri2 = [DUMMY_BYTES_2, end). Note that hri3 starts at
  // DUMMY_BYTES_3, inside hri2's key range, and also runs to the end row.
  private static final HRegionInfo hri1 =
    new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
  private static final HRegionInfo hri2 =
    new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
  private static final HRegionInfo hri3 =
    new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
  // Single-replica placement: sn hosts hri1 and hri2, sn2 hosts hri3.
  private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn);
  private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn);
  private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2);

  // Replica stuff
  // hrls1: hri1 primary on sn, replica 1 on sn2, replica 2 on sn3.
  // hrls2: hri2 primary on sn2, replica 1 on sn3.
  // hrls3: hri3 primary on sn3 followed by a null (missing) location.
  private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1);
  private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
  private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
  private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
    new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
  private static final RegionLocations hrls2 =
    new RegionLocations(new HRegionLocation(hri2, sn2), new HRegionLocation(hri2r1, sn3));
  private static final RegionLocations hrls3 =
    new RegionLocations(new HRegionLocation(hri3, sn3), null);

  // Canned per-action results that the fake callers put into their MultiResponses.
  private static final String success = "success";
  private static Exception failure = new Exception("failure");

  // Client retry count written into CONF by beforeEach().
  private static final int NB_RETRIES = 3;

  // Timeouts read back from CONF by beforeEach().
  private int RPC_TIMEOUT;
  private int OPERATION_TIMEOUT;
134
135  @Before
136  public void beforeEach() {
137    this.CONF = new Configuration();
138    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
139    this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF);
140    this.RPC_TIMEOUT =
141      CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
142    this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
143      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
144  }
145
146  static class CountingThreadFactory implements ThreadFactory {
147    final AtomicInteger nbThreads;
148    ThreadFactory realFactory =
149      new ThreadFactoryBuilder().setNameFormat("test-TestAsyncProcess-pool-%d")
150        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build();
151
152    @Override
153    public Thread newThread(Runnable r) {
154      nbThreads.incrementAndGet();
155      return realFactory.newThread(r);
156    }
157
158    CountingThreadFactory(AtomicInteger nbThreads) {
159      this.nbThreads = nbThreads;
160    }
161  }
162
163  static class MyAsyncProcess extends AsyncProcess {
164    final AtomicInteger nbMultiResponse = new AtomicInteger();
165    final AtomicInteger nbActions = new AtomicInteger();
166    public List<AsyncRequestFuture> allReqs = new ArrayList<>();
167    public AtomicInteger callsCt = new AtomicInteger();
168    private Configuration conf;
169
170    private long previousTimeout = -1;
171    final ExecutorService service;
172
173    @Override
174    protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(AsyncProcessTask task,
175      List<Action> actions, long nonceGroup) {
176      // Test HTable has tableName of null, so pass DUMMY_TABLE
177      AsyncProcessTask wrap = new AsyncProcessTask(task) {
178        @Override
179        public TableName getTableName() {
180          return DUMMY_TABLE;
181        }
182      };
183      AsyncRequestFutureImpl<Res> r =
184        new MyAsyncRequestFutureImpl<>(wrap, actions, nonceGroup, this);
185      allReqs.add(r);
186      return r;
187    }
188
189    public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
190      super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
191        new RpcControllerFactory(conf));
192      service = Executors.newFixedThreadPool(5);
193      this.conf = conf;
194    }
195
196    public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
197      super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
198        new RpcControllerFactory(conf));
199      service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
200        new CountingThreadFactory(nbThreads));
201    }
202
203    public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
204      List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
205      boolean needResults) throws InterruptedIOException {
206      AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
207        .setPool(pool == null ? service : pool).setTableName(tableName).setRowAccess(rows)
208        .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL)
209        .setNeedResults(needResults)
210        .setRpcTimeout(
211          conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT))
212        .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
213          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))
214        .build();
215      return submit(task);
216    }
217
218    public <CResult> AsyncRequestFuture submit(TableName tableName, final List<? extends Row> rows,
219      boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
220      throws InterruptedIOException {
221      return submit(null, tableName, rows, atLeastOne, callback, needResults);
222    }
223
224    @Override
225    public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task)
226      throws InterruptedIOException {
227      previousTimeout = task.getRpcTimeout();
228      // We use results in tests to check things, so override to always save them.
229      AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) {
230        @Override
231        public boolean getNeedResults() {
232          return true;
233        }
234      };
235      return super.submit(wrap);
236    }
237
238    @Override
239    protected RpcRetryingCaller<AbstractResponse>
240      createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
241      callsCt.incrementAndGet();
242      MultiServerCallable callable1 = (MultiServerCallable) callable;
243      final MultiResponse mr = createMultiResponse(callable1.getMulti(), nbMultiResponse, nbActions,
244        new ResponseGenerator() {
245          @Override
246          public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
247            if (Arrays.equals(FAILS, a.getAction().getRow())) {
248              mr.add(regionName, a.getOriginalIndex(), failure);
249            } else {
250              mr.add(regionName, a.getOriginalIndex(), success);
251            }
252          }
253        });
254
255      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10,
256        RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
257        @Override
258        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
259          int callTimeout) throws IOException, RuntimeException {
260          try {
261            // sleep one second in order for threadpool to start another thread instead of reusing
262            // existing one.
263            Thread.sleep(1000);
264          } catch (InterruptedException e) {
265            // ignore error
266          }
267          return mr;
268        }
269      };
270    }
271
272  }
273
274  static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
275    private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
276
277    public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup,
278      AsyncProcess asyncProcess) {
279      super(task, actions, nonceGroup, asyncProcess);
280    }
281
282    @Override
283    protected void updateStats(ServerName server, MultiResponse resp) {
284      // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
285    }
286
287    Map<ServerName, List<Long>> getRequestHeapSize() {
288      return heapSizesByServer;
289    }
290
291    @Override
292    SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt,
293      ServerName server, Set<CancellableRegionServerCallable> callsInProgress) {
294      SingleServerRequestRunnable rq =
295        new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
296      List<Long> heapCount = heapSizesByServer.get(server);
297      if (heapCount == null) {
298        heapCount = new ArrayList<>();
299        heapSizesByServer.put(server, heapCount);
300      }
301      heapCount.add(heapSizeOf(multiAction));
302      return rq;
303    }
304
305    private long heapSizeOf(MultiAction multiAction) {
306      return multiAction.actions.values().stream().flatMap(v -> v.stream())
307        .map(action -> action.getAction()).filter(row -> row instanceof Mutation)
308        .mapToLong(row -> ((Mutation) row).heapSize()).sum();
309    }
310  }
311
312  static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse> {
313
314    private final IOException e;
315
316    public CallerWithFailure(IOException e) {
317      super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null);
318      this.e = e;
319    }
320
321    @Override
322    public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
323      int callTimeout) throws IOException, RuntimeException {
324      throw e;
325    }
326  }
327
328  /**
329   * Used to simulate the case where a RegionServer responds to a multi request, but some or all of
330   * the actions have an Exception instead of Result. These responses go through receiveMultiAction,
331   * which has handling for individual action failures.
332   */
333  static class CallerWithRegionException extends RpcRetryingCallerImpl<AbstractResponse> {
334
335    private final IOException e;
336    private MultiAction multi;
337
338    public CallerWithRegionException(IOException e, MultiAction multi) {
339      super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null);
340      this.e = e;
341      this.multi = multi;
342    }
343
344    @Override
345    public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
346      int callTimeout) throws IOException, RuntimeException {
347      MultiResponse response = new MultiResponse();
348      for (Entry<byte[], List<Action>> entry : multi.actions.entrySet()) {
349        response.addException(entry.getKey(), e);
350      }
351      return response;
352    }
353  }
354
355  static class AsyncProcessWithFailure extends MyAsyncProcess {
356
357    private final IOException ioe;
358    private final ServerName failingServer;
359    private final boolean returnAsRegionException;
360
361    public AsyncProcessWithFailure(ClusterConnection hc, Configuration myConf, IOException ioe,
362      ServerName failingServer, boolean returnAsRegionException) {
363      super(hc, myConf);
364      this.ioe = ioe;
365      this.failingServer = failingServer;
366      this.returnAsRegionException = returnAsRegionException;
367      serverTrackerTimeout = 1L;
368    }
369
370    public AsyncProcessWithFailure(ClusterConnection hc, Configuration myConf, IOException ioe) {
371      this(hc, myConf, ioe, null, false);
372    }
373
374    @Override
375    protected RpcRetryingCaller<AbstractResponse>
376      createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
377      MultiServerCallable msc = (MultiServerCallable) callable;
378      if (failingServer != null) {
379        if (!msc.getServerName().equals(failingServer)) {
380          return super.createCaller(callable, rpcTimeout);
381        }
382      }
383
384      if (returnAsRegionException) {
385        return new CallerWithRegionException(ioe, msc.getMulti());
386      }
387
388      callsCt.incrementAndGet();
389      return new CallerWithFailure(ioe);
390    }
391  }
392
393  /**
394   * Make the backoff time always different on each call.
395   */
396  static class MyClientBackoffPolicy implements ClientBackoffPolicy {
397    private final Map<ServerName, AtomicInteger> count = new HashMap<>();
398
399    @Override
400    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
401      AtomicInteger inc = count.get(serverName);
402      if (inc == null) {
403        inc = new AtomicInteger(0);
404        count.put(serverName, inc);
405      }
406      return inc.getAndIncrement();
407    }
408  }
409
410  static class MyAsyncProcessWithReplicas extends MyAsyncProcess {
411    private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator());
412    private long primarySleepMs = 0, replicaSleepMs = 0;
413    private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>();
414    private final AtomicLong replicaCalls = new AtomicLong(0);
415
416    public void addFailures(RegionInfo... hris) {
417      for (RegionInfo hri : hris) {
418        failures.add(hri.getRegionName());
419      }
420    }
421
422    public long getReplicaCallCount() {
423      return replicaCalls.get();
424    }
425
426    public void setPrimaryCallDelay(ServerName server, long primaryMs) {
427      customPrimarySleepMs.put(server, primaryMs);
428    }
429
430    public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
431      super(hc, conf);
432    }
433
434    public void setCallDelays(long primaryMs, long replicaMs) {
435      this.primarySleepMs = primaryMs;
436      this.replicaSleepMs = replicaMs;
437    }
438
439    @Override
440    protected RpcRetryingCaller<AbstractResponse>
441      createCaller(CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
442      MultiServerCallable callable = (MultiServerCallable) payloadCallable;
443      final MultiResponse mr = createMultiResponse(callable.getMulti(), nbMultiResponse, nbActions,
444        new ResponseGenerator() {
445          @Override
446          public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
447            if (failures.contains(regionName)) {
448              mr.add(regionName, a.getOriginalIndex(), failure);
449            } else {
450              boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
451              mr.add(regionName, a.getOriginalIndex(), Result.create(new Cell[0], null, isStale));
452            }
453          }
454        });
455      // Currently AsyncProcess either sends all-replica, or all-primary request.
456      final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
457        callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
458      final ServerName server = ((MultiServerCallable) callable).getServerName();
459      String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
460        + callable.getMulti().actions.size() + " entries: ";
461      for (byte[] region : callable.getMulti().actions.keySet()) {
462        debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
463      }
464      LOG.debug(debugMsg);
465      if (!isDefault) {
466        replicaCalls.incrementAndGet();
467      }
468
469      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10,
470        RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
471        @Override
472        public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
473          int callTimeout) throws IOException, RuntimeException {
474          long sleep = -1;
475          if (isDefault) {
476            Long customSleep = customPrimarySleepMs.get(server);
477            sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
478          } else {
479            sleep = replicaSleepMs;
480          }
481          if (sleep != 0) {
482            try {
483              Thread.sleep(sleep);
484            } catch (InterruptedException e) {
485              // Restore interrupt status
486              Thread.currentThread().interrupt();
487            }
488          }
489          return mr;
490        }
491      };
492    }
493  }
494
495  static MultiResponse createMultiResponse(final MultiAction multi, AtomicInteger nbMultiResponse,
496    AtomicInteger nbActions, ResponseGenerator gen) {
497    final MultiResponse mr = new MultiResponse();
498    nbMultiResponse.incrementAndGet();
499    for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) {
500      byte[] regionName = entry.getKey();
501      for (Action a : entry.getValue()) {
502        nbActions.incrementAndGet();
503        gen.addResponse(mr, regionName, a);
504      }
505    }
506    return mr;
507  }
508
509  private static interface ResponseGenerator {
510    void addResponse(final MultiResponse mr, byte[] regionName, Action a);
511  }
512
513  /**
514   * Returns our async process.
515   */
516  static class MyConnectionImpl extends ConnectionImplementation {
517    public static class TestRegistry extends DoNothingConnectionRegistry {
518
519      public TestRegistry(Configuration conf, User user) {
520        super(conf, user);
521      }
522
523      @Override
524      public CompletableFuture<String> getClusterId() {
525        return CompletableFuture.completedFuture("testClusterId");
526      }
527    }
528
529    final AtomicInteger nbThreads = new AtomicInteger(0);
530
531    protected MyConnectionImpl(Configuration conf) throws IOException {
532      super(setupConf(conf), null, null, Collections.emptyMap());
533    }
534
535    private static Configuration setupConf(Configuration conf) {
536      conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, TestRegistry.class,
537        ConnectionRegistry.class);
538      return conf;
539    }
540
541    @Override
542    public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
543      boolean retry, int replicaId) throws IOException {
544      return new RegionLocations(loc1);
545    }
546
547    @Override
548    public boolean hasCellBlockSupport() {
549      return false;
550    }
551  }
552
553  /**
554   * Returns our async process.
555   */
556  static class MyConnectionImpl2 extends MyConnectionImpl {
557    List<HRegionLocation> hrl;
558    final boolean usedRegions[];
559
560    protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException {
561      super(conf);
562      this.hrl = hrl;
563      this.usedRegions = new boolean[hrl.size()];
564    }
565
566    @Override
567    public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
568      boolean retry, int replicaId) throws IOException {
569      int i = 0;
570      for (HRegionLocation hr : hrl) {
571        if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
572          usedRegions[i] = true;
573          return new RegionLocations(hr);
574        }
575        i++;
576      }
577      return null;
578    }
579  }
580
581  @Test
582  public void testListRowAccess() {
583    int count = 10;
584    List<String> values = new ArrayList<>();
585    for (int i = 0; i != count; ++i) {
586      values.add(String.valueOf(i));
587    }
588
589    ListRowAccess<String> taker = new ListRowAccess<>(values);
590    assertEquals(count, taker.size());
591
592    int takeCount = 0;
593    Iterator<String> it = taker.iterator();
594    while (it.hasNext()) {
595      String v = it.next();
596      assertEquals(String.valueOf(takeCount), v);
597      ++takeCount;
598      it.remove();
599      if (Math.random() >= 0.5) {
600        break;
601      }
602    }
603    assertEquals(count, taker.size() + takeCount);
604
605    it = taker.iterator();
606    while (it.hasNext()) {
607      String v = it.next();
608      assertEquals(String.valueOf(takeCount), v);
609      ++takeCount;
610      it.remove();
611    }
612    assertEquals(0, taker.size());
613    assertEquals(count, takeCount);
614  }
615
616  private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) {
617    if (putSizePerServer <= maxHeapSizePerRequest) {
618      return 1;
619    } else if (putSizePerServer % maxHeapSizePerRequest == 0) {
620      return putSizePerServer / maxHeapSizePerRequest;
621    } else {
622      return putSizePerServer / maxHeapSizePerRequest + 1;
623    }
624  }
625
626  @Test
627  public void testSubmitSameSizeOfRequest() throws Exception {
628    long writeBuffer = 2 * 1024 * 1024;
629    long putsHeapSize = writeBuffer;
630    doSubmitRequest(writeBuffer, putsHeapSize);
631  }
632
633  @Test
634  public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
635    long maxHeapSizePerRequest = Long.MAX_VALUE;
636    long putsHeapSize = 2 * 1024 * 1024;
637    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
638  }
639
640  @Test
641  public void testSubmitRandomSizeRequest() throws Exception {
642    Random rn = new Random();
643    final long limit = 10 * 1024 * 1024;
644    final int requestCount = 1 + (int) (rn.nextDouble() * 3);
645    long n = rn.nextLong();
646    if (n < 0) {
647      n = -n;
648    } else if (n == 0) {
649      n = 1;
650    }
651    long putsHeapSize = n % limit;
652    long maxHeapSizePerRequest = putsHeapSize / requestCount;
653    LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest
654      + ", putsHeapSize=" + putsHeapSize);
655    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
656  }
657
658  @Test
659  public void testSubmitSmallRequest() throws Exception {
660    long maxHeapSizePerRequest = 2 * 1024 * 1024;
661    long putsHeapSize = 100;
662    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
663  }
664
665  @Test
666  public void testSubmitLargeRequest() throws Exception {
667    long maxHeapSizePerRequest = 2 * 1024 * 1024;
668    long putsHeapSize = maxHeapSizePerRequest * 2;
669    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
670  }
671
672  private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
673    ClusterConnection conn = createHConnection();
674    final String defaultClazz =
675      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
676    final long defaultHeapSizePerRequest =
677      conn.getConfiguration().getLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
678        SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
679    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
680      SimpleRequestController.class.getName());
681    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
682      maxHeapSizePerRequest);
683
684    // sn has two regions
685    long putSizeSN = 0;
686    long putSizeSN2 = 0;
687    List<Put> puts = new ArrayList<>();
688    while ((putSizeSN + putSizeSN2) <= putsHeapSize) {
689      Put put1 = new Put(DUMMY_BYTES_1);
690      put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
691      Put put2 = new Put(DUMMY_BYTES_2);
692      put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
693      Put put3 = new Put(DUMMY_BYTES_3);
694      put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3);
695      putSizeSN += (put1.heapSize() + put2.heapSize());
696      putSizeSN2 += put3.heapSize();
697      puts.add(put1);
698      puts.add(put2);
699      puts.add(put3);
700    }
701
702    int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest);
703    int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest);
704    LOG.info("Total put count:" + puts.size() + ", putSizeSN:" + putSizeSN + ", putSizeSN2:"
705      + putSizeSN2 + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + ", minCountSnRequest:"
706      + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request);
707
708    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
709    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
710    try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) {
711      mutator.mutate(puts);
712      mutator.flush();
713      List<AsyncRequestFuture> reqs = ap.allReqs;
714
715      int actualSnReqCount = 0;
716      int actualSn2ReqCount = 0;
717      for (AsyncRequestFuture req : reqs) {
718        if (!(req instanceof AsyncRequestFutureImpl)) {
719          continue;
720        }
721        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
722        if (ars.getRequestHeapSize().containsKey(sn)) {
723          ++actualSnReqCount;
724        }
725        if (ars.getRequestHeapSize().containsKey(sn2)) {
726          ++actualSn2ReqCount;
727        }
728      }
729      // If the server is busy, the actual count may be incremented.
730      assertEquals(true, minCountSnRequest <= actualSnReqCount);
731      assertEquals(true, minCountSn2Request <= actualSn2ReqCount);
732      Map<ServerName, Long> sizePerServers = new HashMap<>();
733      for (AsyncRequestFuture req : reqs) {
734        if (!(req instanceof AsyncRequestFutureImpl)) {
735          continue;
736        }
737        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
738        Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
739        for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
740          long sum = 0;
741          for (long size : entry.getValue()) {
742            assertEquals(true, size <= maxHeapSizePerRequest);
743            sum += size;
744          }
745          assertEquals(true, sum <= maxHeapSizePerRequest);
746          long value = sizePerServers.getOrDefault(entry.getKey(), 0L);
747          sizePerServers.put(entry.getKey(), value + sum);
748        }
749      }
750      assertEquals(true, sizePerServers.containsKey(sn));
751      assertEquals(true, sizePerServers.containsKey(sn2));
752      assertEquals(false, sizePerServers.containsKey(sn3));
753      assertEquals(putSizeSN, (long) sizePerServers.get(sn));
754      assertEquals(putSizeSN2, (long) sizePerServers.get(sn2));
755    }
756    // restore config.
757    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
758      defaultHeapSizePerRequest);
759    if (defaultClazz != null) {
760      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
761        defaultClazz);
762    }
763  }
764
765  @Test
766  public void testSubmit() throws Exception {
767    ClusterConnection hc = createHConnection();
768    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
769
770    List<Put> puts = new ArrayList<>(1);
771    puts.add(createPut(1, true));
772
773    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
774    Assert.assertTrue(puts.isEmpty());
775  }
776
777  @Test
778  public void testSubmitWithCB() throws Exception {
779    ClusterConnection hc = createHConnection();
780    final AtomicInteger updateCalled = new AtomicInteger(0);
781    Batch.Callback<Object> cb = new Batch.Callback<Object>() {
782      @Override
783      public void update(byte[] region, byte[] row, Object result) {
784        updateCalled.incrementAndGet();
785      }
786    };
787    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
788
789    List<Put> puts = new ArrayList<>(1);
790    puts.add(createPut(1, true));
791
792    final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false);
793    Assert.assertTrue(puts.isEmpty());
794    ars.waitUntilDone();
795    Assert.assertEquals(1, updateCalled.get());
796  }
797
798  @Test
799  public void testSubmitBusyRegion() throws Exception {
800    ClusterConnection conn = createHConnection();
801    final String defaultClazz =
802      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
803    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
804      SimpleRequestController.class.getName());
805    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
806    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
807    List<Put> puts = new ArrayList<>(1);
808    puts.add(createPut(1, true));
809
810    for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) {
811      ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
812    }
813    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
814    Assert.assertEquals(puts.size(), 1);
815
816    ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
817    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
818    Assert.assertEquals(0, puts.size());
819    if (defaultClazz != null) {
820      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
821        defaultClazz);
822    }
823  }
824
825  @Test
826  public void testSubmitBusyRegionServer() throws Exception {
827    ClusterConnection conn = createHConnection();
828    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
829    final String defaultClazz =
830      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
831    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
832      SimpleRequestController.class.getName());
833    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
834    controller.taskCounterPerServer.put(sn2,
835      new AtomicInteger(controller.maxConcurrentTasksPerServer));
836
837    List<Put> puts = new ArrayList<>(4);
838    puts.add(createPut(1, true));
839    puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
840    puts.add(createPut(1, true)); // <== this one will make it, the region is already in
841    puts.add(createPut(2, true)); // <== new region, but the rs is ok
842
843    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
844    Assert.assertEquals(" puts=" + puts, 1, puts.size());
845
846    controller.taskCounterPerServer.put(sn2,
847      new AtomicInteger(controller.maxConcurrentTasksPerServer - 1));
848    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
849    Assert.assertTrue(puts.isEmpty());
850    if (defaultClazz != null) {
851      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
852        defaultClazz);
853    }
854  }
855
856  @Test
857  public void testFail() throws Exception {
858    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
859
860    List<Put> puts = new ArrayList<>(1);
861    Put p = createPut(1, false);
862    puts.add(p);
863
864    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
865    Assert.assertEquals(0, puts.size());
866    ars.waitUntilDone();
867    verifyResult(ars, false);
868    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
869
870    Assert.assertEquals(1, ars.getErrors().exceptions.size());
871    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
872      failure.equals(ars.getErrors().exceptions.get(0)));
873    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
874      failure.equals(ars.getErrors().exceptions.get(0)));
875
876    Assert.assertEquals(1, ars.getFailedOperations().size());
877    Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
878      p.equals(ars.getFailedOperations().get(0)));
879  }
880
881  @Test
882  public void testSubmitTrue() throws IOException {
883    ClusterConnection conn = createHConnection();
884    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
885    final String defaultClazz =
886      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
887    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
888      SimpleRequestController.class.getName());
889    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
890    controller.tasksInProgress.incrementAndGet();
891    final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion);
892    controller.taskCounterPerRegion.put(hri1.getRegionName(), ai);
893
894    final AtomicBoolean checkPoint = new AtomicBoolean(false);
895    final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
896
897    Thread t = new Thread() {
898      @Override
899      public void run() {
900        Threads.sleep(1000);
901        Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
902        ai.decrementAndGet();
903        controller.tasksInProgress.decrementAndGet();
904        checkPoint2.set(true);
905      }
906    };
907
908    List<Put> puts = new ArrayList<>(1);
909    Put p = createPut(1, true);
910    puts.add(p);
911
912    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
913    Assert.assertFalse(puts.isEmpty());
914
915    t.start();
916
917    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
918    Assert.assertTrue(puts.isEmpty());
919
920    checkPoint.set(true);
921    while (!checkPoint2.get()) {
922      Threads.sleep(1);
923    }
924    if (defaultClazz != null) {
925      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
926        defaultClazz);
927    }
928  }
929
930  @Test
931  public void testFailAndSuccess() throws Exception {
932    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
933
934    List<Put> puts = new ArrayList<>(3);
935    puts.add(createPut(1, false));
936    puts.add(createPut(1, true));
937    puts.add(createPut(1, true));
938
939    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
940    Assert.assertTrue(puts.isEmpty());
941    ars.waitUntilDone();
942    verifyResult(ars, false, true, true);
943    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
944    ap.callsCt.set(0);
945    Assert.assertEquals(1, ars.getErrors().actions.size());
946
947    puts.add(createPut(1, true));
948    // Wait for AP to be free. While ars might have the result, ap counters are decreased later.
949    ap.waitForMaximumCurrentTasks(0, null);
950    ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
951    Assert.assertEquals(0, puts.size());
952    ars.waitUntilDone();
953    Assert.assertEquals(1, ap.callsCt.get());
954    verifyResult(ars, true);
955  }
956
957  @Test
958  public void testFlush() throws Exception {
959    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
960
961    List<Put> puts = new ArrayList<>(3);
962    puts.add(createPut(1, false));
963    puts.add(createPut(1, true));
964    puts.add(createPut(1, true));
965
966    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
967    ars.waitUntilDone();
968    verifyResult(ars, false, true, true);
969    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
970
971    Assert.assertEquals(1, ars.getFailedOperations().size());
972  }
973
974  @Test
975  public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
976    ClusterConnection hc = createHConnection();
977    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
978    testTaskCount(ap);
979  }
980
981  @Test
982  public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
983    Configuration copyConf = new Configuration(CONF);
984    copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
985    MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
986    ClusterConnection conn = createHConnection();
987    Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
988    Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
989    Mockito.when(conn.getBackoffPolicy()).thenReturn(bp);
990    final String defaultClazz =
991      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
992    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
993      SimpleRequestController.class.getName());
994    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
995    testTaskCount(ap);
996    if (defaultClazz != null) {
997      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
998        defaultClazz);
999    }
1000  }
1001
1002  private void testTaskCount(MyAsyncProcess ap)
1003    throws InterruptedIOException, InterruptedException {
1004    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
1005    List<Put> puts = new ArrayList<>();
1006    for (int i = 0; i != 3; ++i) {
1007      puts.add(createPut(1, true));
1008      puts.add(createPut(2, true));
1009      puts.add(createPut(3, true));
1010    }
1011    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
1012    ap.waitForMaximumCurrentTasks(0, null);
1013    // More time to wait if there are incorrect task count.
1014    TimeUnit.SECONDS.sleep(1);
1015    assertEquals(0, controller.tasksInProgress.get());
1016    for (AtomicInteger count : controller.taskCounterPerRegion.values()) {
1017      assertEquals(0, count.get());
1018    }
1019    for (AtomicInteger count : controller.taskCounterPerServer.values()) {
1020      assertEquals(0, count.get());
1021    }
1022  }
1023
1024  @Test
1025  public void testMaxTask() throws Exception {
1026    ClusterConnection conn = createHConnection();
1027    final String defaultClazz =
1028      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
1029    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
1030      SimpleRequestController.class.getName());
1031    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1032    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
1033
1034    for (int i = 0; i < 1000; i++) {
1035      ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
1036    }
1037
1038    final Thread myThread = Thread.currentThread();
1039
1040    Thread t = new Thread() {
1041      @Override
1042      public void run() {
1043        Threads.sleep(2000);
1044        myThread.interrupt();
1045      }
1046    };
1047
1048    List<Put> puts = new ArrayList<>(1);
1049    puts.add(createPut(1, true));
1050
1051    t.start();
1052
1053    try {
1054      ap.submit(null, DUMMY_TABLE, puts, false, null, false);
1055      Assert.fail("We should have been interrupted.");
1056    } catch (InterruptedIOException expected) {
1057    }
1058
1059    final long sleepTime = 2000;
1060
1061    Thread t2 = new Thread() {
1062      @Override
1063      public void run() {
1064        Threads.sleep(sleepTime);
1065        while (controller.tasksInProgress.get() > 0) {
1066          ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
1067        }
1068      }
1069    };
1070    t2.start();
1071
1072    long start = EnvironmentEdgeManager.currentTime();
1073    ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false);
1074    long end = EnvironmentEdgeManager.currentTime();
1075
1076    // Adds 100 to secure us against approximate timing.
1077    Assert.assertTrue(start + 100L + sleepTime > end);
1078    if (defaultClazz != null) {
1079      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
1080        defaultClazz);
1081    }
1082  }
1083
1084  private ClusterConnection createHConnection() throws IOException {
1085    return createHConnection(CONNECTION_CONFIG);
1086  }
1087
1088  private ClusterConnection createHConnection(ConnectionConfiguration configuration)
1089    throws IOException {
1090    ClusterConnection hc = createHConnectionCommon(configuration);
1091    setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
1092    setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
1093    setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
1094    Mockito
1095      .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
1096      .thenReturn(Arrays.asList(loc1, loc2, loc3));
1097    setMockLocation(hc, FAILS, new RegionLocations(loc2));
1098    return hc;
1099  }
1100
1101  private ClusterConnection createHConnectionWithReplicas(ConnectionConfiguration configuration)
1102    throws IOException {
1103    ClusterConnection hc = createHConnectionCommon(configuration);
1104    setMockLocation(hc, DUMMY_BYTES_1, hrls1);
1105    setMockLocation(hc, DUMMY_BYTES_2, hrls2);
1106    setMockLocation(hc, DUMMY_BYTES_3, hrls3);
1107    List<HRegionLocation> locations = new ArrayList<>();
1108    for (HRegionLocation loc : hrls1.getRegionLocations()) {
1109      locations.add(loc);
1110    }
1111    for (HRegionLocation loc : hrls2.getRegionLocations()) {
1112      locations.add(loc);
1113    }
1114    for (HRegionLocation loc : hrls3.getRegionLocations()) {
1115      locations.add(loc);
1116    }
1117    Mockito
1118      .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
1119      .thenReturn(locations);
1120    return hc;
1121  }
1122
1123  private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
1124    throws IOException {
1125    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
1126      Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
1127    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
1128      Mockito.anyBoolean())).thenReturn(result);
1129  }
1130
1131  private ClusterConnection
1132    createHConnectionCommon(ConnectionConfiguration connectionConfiguration) {
1133    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
1134    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
1135    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
1136    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
1137    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
1138    Mockito.when(hc.getConnectionConfiguration()).thenReturn(connectionConfiguration);
1139    return hc;
1140  }
1141
1142  @Test
1143  public void testHTablePutSuccess() throws Exception {
1144    ClusterConnection conn = createHConnection();
1145    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1146    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1147    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1148
1149    Put put = createPut(1, true);
1150
1151    Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(),
1152      ht.getWriteBufferSize());
1153    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1154    ht.mutate(put);
1155    ht.flush();
1156    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1157  }
1158
1159  @Test
1160  public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
1161    ClusterConnection conn = createHConnection();
1162    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1163
1164    checkPeriodicFlushParameters(conn, ap, 1234, 1234, 1234, 1234);
1165    checkPeriodicFlushParameters(conn, ap, 0, 0, 0,
1166      BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1167    checkPeriodicFlushParameters(conn, ap, -1234, 0, -1234,
1168      BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1169    checkPeriodicFlushParameters(conn, ap, 1, 1, 1,
1170      BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1171  }
1172
1173  private void checkPeriodicFlushParameters(ClusterConnection conn, MyAsyncProcess ap, long setTO,
1174    long expectTO, long setTT, long expectTT) {
1175    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1176
1177    // The BufferedMutatorParams does nothing with the value
1178    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
1179    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
1180    Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
1181    Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
1182
1183    // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams)
1184    BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap);
1185    Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs());
1186    Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs());
1187
1188    // The BufferedMutatorImpl corrects illegal values (direct via setter)
1189    BufferedMutatorImpl ht2 =
1190      new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap);
1191    ht2.setWriteBufferPeriodicFlush(setTO, setTT);
1192    Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs());
1193    Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs());
1194
1195  }
1196
1197  @Test
1198  public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
1199    ClusterConnection conn = createHConnection();
1200    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1201    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1202
1203    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP
1204    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms
1205    bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record
1206
1207    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1208
1209    // Verify if BufferedMutator has the right settings.
1210    Assert.assertEquals(10000, ht.getWriteBufferSize());
1211    Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs());
1212    Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
1213      ht.getWriteBufferPeriodicFlushTimerTickMs());
1214
1215    Put put = createPut(1, true);
1216
1217    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1218    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1219
1220    // ----- Insert, flush immediately, MUST NOT flush automatically
1221    ht.mutate(put);
1222    ht.flush();
1223
1224    Thread.sleep(1000);
1225    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1226    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1227
1228    // ----- Insert, NO flush, MUST flush automatically
1229    ht.mutate(put);
1230    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1231    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1232
1233    // The timerTick should fire every 100ms, so after twice that we must have
1234    // seen at least 1 tick and we should see an automatic flush
1235    Thread.sleep(200);
1236    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1237    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1238
1239    // Ensure it does not flush twice
1240    Thread.sleep(200);
1241    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1242    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1243
1244    // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically
1245    ht.disableWriteBufferPeriodicFlush();
1246    ht.mutate(put);
1247    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1248    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1249
1250    // Wait for at least 1 timerTick, we should see NO flushes.
1251    Thread.sleep(200);
1252    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1253    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1254
1255    // Reenable periodic flushing, a flush seems to take about 1 second
1256    // so we wait for 2 seconds and it should have finished the flush.
1257    ht.setWriteBufferPeriodicFlush(1, 100);
1258    Thread.sleep(2000);
1259    Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes());
1260    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1261  }
1262
1263  @Test
1264  public void testBufferedMutatorImplWithSharedPool() throws Exception {
1265    ClusterConnection conn = createHConnection();
1266    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1267    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1268    BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1269
1270    ht.close();
1271    assertFalse(ap.service.isShutdown());
1272  }
1273
1274  @Test
1275  public void testFailedPutAndNewPut() throws Exception {
1276    ClusterConnection conn = createHConnection();
1277    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1278    BufferedMutatorParams bufferParam =
1279      createBufferedMutatorParams(ap, DUMMY_TABLE).writeBufferSize(0);
1280    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1281
1282    Put p = createPut(1, false);
1283    try {
1284      mutator.mutate(p);
1285      Assert.fail();
1286    } catch (RetriesExhaustedWithDetailsException expected) {
1287      assertEquals(1, expected.getNumExceptions());
1288      assertTrue(expected.getRow(0) == p);
1289    }
1290    // Let's do all the retries.
1291    ap.waitForMaximumCurrentTasks(0, null);
1292    Assert.assertEquals(0, mutator.size());
1293
1294    // There is no global error so the new put should not fail
1295    mutator.mutate(createPut(1, true));
1296    Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
1297  }
1298
1299  @SuppressWarnings("SelfComparison")
1300  @Test
1301  public void testAction() {
1302    Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10);
1303    Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1304    Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1305    Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10);
1306    assertFalse(action_0.equals(action_1));
1307    assertTrue(action_0.equals(action_0));
1308    assertTrue(action_1.equals(action_2));
1309    assertTrue(action_2.equals(action_1));
1310    assertFalse(action_0.equals(new Put(Bytes.toBytes("abc"))));
1311    assertTrue(action_2.equals(action_3));
1312    assertFalse(action_0.equals(action_3));
1313    assertEquals(0, action_0.compareTo(action_0));
1314    assertTrue(action_0.compareTo(action_1) < 0);
1315    assertTrue(action_1.compareTo(action_0) > 0);
1316    assertEquals(0, action_1.compareTo(action_2));
1317  }
1318
1319  @Test
1320  public void testBatch() throws IOException, InterruptedException {
1321    ClusterConnection conn = new MyConnectionImpl(CONF);
1322    HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
1323    ht.multiAp = new MyAsyncProcess(conn, CONF);
1324
1325    List<Put> puts = new ArrayList<>(7);
1326    puts.add(createPut(1, true));
1327    puts.add(createPut(1, true));
1328    puts.add(createPut(1, true));
1329    puts.add(createPut(1, true));
1330    puts.add(createPut(1, false)); // <=== the bad apple, position 4
1331    puts.add(createPut(1, true));
1332    puts.add(createPut(1, false)); // <=== another bad apple, position 6
1333
1334    Object[] res = new Object[puts.size()];
1335    try {
1336      ht.batch(puts, res);
1337      Assert.fail();
1338    } catch (RetriesExhaustedException expected) {
1339    }
1340
1341    Assert.assertEquals(success, res[0]);
1342    Assert.assertEquals(success, res[1]);
1343    Assert.assertEquals(success, res[2]);
1344    Assert.assertEquals(success, res[3]);
1345    Assert.assertEquals(failure, res[4]);
1346    Assert.assertEquals(success, res[5]);
1347    Assert.assertEquals(failure, res[6]);
1348  }
1349
1350  @Test
1351  public void testErrorsServers() throws IOException {
1352    Configuration configuration = new Configuration(CONF);
1353    ClusterConnection conn = new MyConnectionImpl(configuration);
1354    MyAsyncProcess ap = new MyAsyncProcess(conn, configuration);
1355    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1356    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1357    configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
1358
1359    Assert.assertNotNull(ap.createServerErrorTracker());
1360    Assert.assertTrue(ap.serverTrackerTimeout > 200L);
1361    ap.serverTrackerTimeout = 1L;
1362
1363    Put p = createPut(1, false);
1364    mutator.mutate(p);
1365
1366    try {
1367      mutator.flush();
1368      Assert.fail();
1369    } catch (RetriesExhaustedWithDetailsException expected) {
1370      assertEquals(1, expected.getNumExceptions());
1371      assertTrue(expected.getRow(0) == p);
1372    }
1373    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1374    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1375  }
1376
1377  @Test
1378  public void testReadAndWriteTimeout() throws IOException {
1379    final long readTimeout = 10 * 1000;
1380    final long writeTimeout = 20 * 1000;
1381    Configuration copyConf = new Configuration(CONF);
1382    copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
1383    copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
1384    ClusterConnection conn = new MyConnectionImpl(copyConf);
1385    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
1386    try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
1387      ht.multiAp = ap;
1388      List<Get> gets = new ArrayList<>();
1389      gets.add(new Get(DUMMY_BYTES_1));
1390      gets.add(new Get(DUMMY_BYTES_2));
1391      try {
1392        ht.get(gets);
1393      } catch (ClassCastException e) {
1394        // No result response on this test.
1395      }
1396      assertEquals(readTimeout, ap.previousTimeout);
1397      ap.previousTimeout = -1;
1398
1399      try {
1400        ht.existsAll(gets);
1401      } catch (ClassCastException e) {
1402        // No result response on this test.
1403      }
1404      assertEquals(readTimeout, ap.previousTimeout);
1405      ap.previousTimeout = -1;
1406
1407      List<Delete> deletes = new ArrayList<>();
1408      deletes.add(new Delete(DUMMY_BYTES_1));
1409      deletes.add(new Delete(DUMMY_BYTES_2));
1410      ht.delete(deletes);
1411      assertEquals(writeTimeout, ap.previousTimeout);
1412    }
1413  }
1414
1415  @Test
1416  public void testErrors() throws IOException {
1417    ClusterConnection conn = new MyConnectionImpl(CONF);
1418    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
1419    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1420    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1421
1422    Assert.assertNotNull(ap.createServerErrorTracker());
1423
1424    Put p = createPut(1, true);
1425    mutator.mutate(p);
1426
1427    try {
1428      mutator.flush();
1429      Assert.fail();
1430    } catch (RetriesExhaustedWithDetailsException expected) {
1431      assertEquals(1, expected.getNumExceptions());
1432      assertTrue(expected.getRow(0) == p);
1433    }
1434    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1435    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1436  }
1437
1438  @Test
1439  public void testCallQueueTooLarge() throws IOException {
1440    ClusterConnection conn = new MyConnectionImpl(CONF);
1441    AsyncProcessWithFailure ap =
1442      new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException());
1443    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1444    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1445    Assert.assertNotNull(ap.createServerErrorTracker());
1446    Put p = createPut(1, true);
1447    mutator.mutate(p);
1448
1449    try {
1450      mutator.flush();
1451      Assert.fail();
1452    } catch (RetriesExhaustedWithDetailsException expected) {
1453      assertEquals(1, expected.getNumExceptions());
1454      assertTrue(expected.getRow(0) == p);
1455    }
1456    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1457    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1458  }
1459
1460  /**
1461   * This test simulates multiple regions on 2 servers. We should have 2 multi requests and 2
1462   * threads: 1 per server, this whatever the number of regions.
1463   */
1464  @Test
1465  public void testThreadCreation() throws Exception {
1466    final int NB_REGS = 100;
1467    List<HRegionLocation> hrls = new ArrayList<>(NB_REGS);
1468    List<Get> gets = new ArrayList<>(NB_REGS);
1469    for (int i = 0; i < NB_REGS; i++) {
1470      HRegionInfo hri =
1471        new HRegionInfo(DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
1472      HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
1473      hrls.add(hrl);
1474
1475      Get get = new Get(Bytes.toBytes(i * 10L));
1476      gets.add(get);
1477    }
1478
1479    MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF);
1480    MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
1481    HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service);
1482    ht.multiAp = ap;
1483    ht.batch(gets, null);
1484
1485    Assert.assertEquals(NB_REGS, ap.nbActions.get());
1486    Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
1487    Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
1488
1489    int nbReg = 0;
1490    for (int i = 0; i < NB_REGS; i++) {
1491      if (con.usedRegions[i]) nbReg++;
1492    }
1493    Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg);
1494  }
1495
1496  @Test
1497  public void testReplicaReplicaSuccess() throws Exception {
1498    // Main call takes too long so replicas succeed, except for one region w/o replicas.
1499    // One region has no replica, so the main call succeeds for it.
1500    MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
1501    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1502    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1503      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1504      .setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(SubmittedRows.ALL).build();
1505    AsyncRequestFuture ars = ap.submit(task);
1506    verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
1507    Assert.assertEquals(2, ap.getReplicaCallCount());
1508  }
1509
1510  @Test
1511  public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
1512    // Main call succeeds before replica calls are kicked off.
1513    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
1514    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1515    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1516      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1517      .setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(SubmittedRows.ALL).build();
1518    AsyncRequestFuture ars = ap.submit(task);
1519    verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
1520    Assert.assertEquals(0, ap.getReplicaCallCount());
1521  }
1522
1523  @Test
1524  public void testReplicaParallelCallsSucceed() throws Exception {
1525    // Either main or replica can succeed.
1526    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
1527    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1528    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1529      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1530      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1531    AsyncRequestFuture ars = ap.submit(task);
1532    verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
1533    long replicaCalls = ap.getReplicaCallCount();
1534    Assert.assertTrue(replicaCalls >= 0);
1535    Assert.assertTrue(replicaCalls <= 2);
1536  }
1537
1538  @Test
1539  public void testReplicaPartialReplicaCall() throws Exception {
1540    // One server is slow, so the result for its region comes from replica, whereas
1541    // the result for other region comes from primary before replica calls happen.
1542    // There should be no replica call for that region at all.
1543    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
1544    ap.setPrimaryCallDelay(sn2, 2000);
1545    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1546    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1547      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1548      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1549    AsyncRequestFuture ars = ap.submit(task);
1550    verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
1551    Assert.assertEquals(1, ap.getReplicaCallCount());
1552  }
1553
1554  @Test
1555  public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
1556    // Main calls fail before replica calls can start - this is currently not handled.
1557    // It would probably never happen if we can get location (due to retries),
1558    // and it would require additional synchronization.
1559    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
1560    ap.addFailures(hri1, hri2);
1561    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1562    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1563      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1564      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1565    AsyncRequestFuture ars = ap.submit(task);
1566    verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
1567    Assert.assertEquals(0, ap.getReplicaCallCount());
1568  }
1569
1570  @Test
1571  public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
1572    // Main calls fails after replica calls start. For two-replica region, one replica call
1573    // also fails. Regardless, we get replica results for both regions.
1574    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
1575    ap.addFailures(hri1, hri1r2, hri2);
1576    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1577    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1578      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1579      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1580    AsyncRequestFuture ars = ap.submit(task);
1581    verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
1582    Assert.assertEquals(2, ap.getReplicaCallCount());
1583  }
1584
1585  @Test
1586  public void testReplicaAllCallsFailForOneRegion() throws Exception {
1587    // For one of the region, all 3, main and replica, calls fail. For the other, replica
1588    // call fails but its exception should not be visible as it did succeed.
1589    MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
1590    ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
1591    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1592    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1593      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1594      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1595    AsyncRequestFuture ars = ap.submit(task);
1596    verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
1597    // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
1598    Assert.assertEquals(3, ars.getErrors().getNumExceptions());
1599    for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
1600      Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
1601    }
1602  }
1603
1604  private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs,
1605    int replicaMs) throws Exception {
1606    return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
1607  }
1608
1609  private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs,
1610    int replicaMs, int retries) throws Exception {
1611    // TODO: this is kind of timing dependent... perhaps it should detect from createCaller
1612    // that the replica call has happened and that way control the ordering.
1613    Configuration conf = new Configuration();
1614    conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
1615    if (retries >= 0) {
1616      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1617    }
1618    ClusterConnection conn = createHConnectionWithReplicas(new ConnectionConfiguration(conf));
1619    MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
1620    ap.setCallDelays(primaryMs, replicaMs);
1621    return ap;
1622  }
1623
1624  private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) {
1625    return new BufferedMutatorParams(name).pool(ap.service).rpcTimeout(RPC_TIMEOUT)
1626      .opertationTimeout(OPERATION_TIMEOUT);
1627  }
1628
1629  private static List<Get> makeTimelineGets(byte[]... rows) {
1630    List<Get> result = new ArrayList<>(rows.length);
1631    for (byte[] row : rows) {
1632      Get get = new Get(row);
1633      get.setConsistency(Consistency.TIMELINE);
1634      result.add(get);
1635    }
1636    return result;
1637  }
1638
1639  private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
1640    Object[] actual = ars.getResults();
1641    Assert.assertEquals(expected.length, actual.length);
1642    for (int i = 0; i < expected.length; ++i) {
1643      Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
1644    }
1645  }
1646
1647  /** After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this! */
1648  private enum RR {
1649    TRUE,
1650    FALSE,
1651    DONT_CARE,
1652    FAILED
1653  }
1654
1655  private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
1656    Object[] actuals = ars.getResults();
1657    Assert.assertEquals(expecteds.length, actuals.length);
1658    for (int i = 0; i < expecteds.length; ++i) {
1659      Object actual = actuals[i];
1660      RR expected = expecteds[i];
1661      Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
1662      if (expected != RR.FAILED && expected != RR.DONT_CARE) {
1663        Assert.assertEquals(expected == RR.TRUE, ((Result) actual).isStale());
1664      }
1665    }
1666  }
1667
1668  /**
1669   * @param regCnt  the region: 1 to 3.
1670   * @param success if true, the put will succeed.
1671   * @return a put
1672   */
1673  private Put createPut(int regCnt, boolean success) {
1674    Put p;
1675    if (!success) {
1676      p = new Put(FAILS);
1677    } else switch (regCnt) {
1678      case 1:
1679        p = new Put(DUMMY_BYTES_1);
1680        break;
1681      case 2:
1682        p = new Put(DUMMY_BYTES_2);
1683        break;
1684      case 3:
1685        p = new Put(DUMMY_BYTES_3);
1686        break;
1687      default:
1688        throw new IllegalArgumentException("unknown " + regCnt);
1689    }
1690
1691    p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1692
1693    return p;
1694  }
1695
1696  static class MyThreadPoolExecutor extends ThreadPoolExecutor {
1697    public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime,
1698      TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) {
1699      super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue);
1700    }
1701
1702    @Override
1703    public Future submit(Runnable runnable) {
1704      throw new OutOfMemoryError("OutOfMemory error thrown by means");
1705    }
1706  }
1707
1708  static class AsyncProcessForThrowableCheck extends AsyncProcess {
1709    public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
1710      super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
1711        new RpcControllerFactory(conf));
1712    }
1713  }
1714
1715  @Test
1716  public void testUncheckedException() throws Exception {
1717    // Test the case pool.submit throws unchecked exception
1718    ClusterConnection hc = createHConnection();
1719    MyThreadPoolExecutor myPool =
1720      new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
1721    AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF);
1722
1723    List<Put> puts = new ArrayList<>(1);
1724    puts.add(createPut(1, true));
1725    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(myPool).setRpcTimeout(RPC_TIMEOUT)
1726      .setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(puts)
1727      .setSubmittedRows(SubmittedRows.NORMAL).build();
1728    ap.submit(task);
1729    Assert.assertTrue(puts.isEmpty());
1730  }
1731
1732  /**
1733   * Below tests make sure we could use a special pause setting when retry an exception where
1734   * {@link HBaseServerException#isServerOverloaded(Throwable)} is true, see HBASE-17114
1735   */
1736
1737  @Test
1738  public void testRetryPauseWhenServerOverloadedDueToCQTBE() throws Exception {
1739    testRetryPauseWhenServerIsOverloaded(new CallQueueTooBigException());
1740  }
1741
1742  @Test
1743  public void testRetryPauseWhenServerOverloadedDueToCDE() throws Exception {
1744    testRetryPauseWhenServerIsOverloaded(new CallDroppedException());
1745  }
1746
1747  @Test
1748  public void testRetryPauseForRpcThrottling() throws IOException {
1749    long waitInterval = 500L;
1750    testRetryPause(new Configuration(CONF), waitInterval, new RpcThrottlingException(
1751      RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For test"));
1752  }
1753
1754  private void testRetryPauseWhenServerIsOverloaded(HBaseServerException exception)
1755    throws IOException {
1756    Configuration testConf = new Configuration(CONF);
1757    long specialPause = 500L;
1758    testConf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
1759      specialPause);
1760    testRetryPause(testConf, specialPause, exception);
1761  }
1762
1763  private void testRetryPause(Configuration testConf, long expectedPause,
1764    HBaseIOException exception) throws IOException {
1765
1766    final int retries = 1;
1767    testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1768
1769    ClusterConnection conn = new MyConnectionImpl(testConf);
1770    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf, exception);
1771    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1772    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1773
1774    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1775
1776    Put p = createPut(1, true);
1777    mutator.mutate(p);
1778
1779    long startTime = EnvironmentEdgeManager.currentTime();
1780    try {
1781      mutator.flush();
1782      Assert.fail();
1783    } catch (RetriesExhaustedWithDetailsException expected) {
1784      assertEquals(1, expected.getNumExceptions());
1785      assertTrue(expected.getRow(0) == p);
1786    }
1787    long actualSleep = EnvironmentEdgeManager.currentTime() - startTime;
1788    long expectedSleep = 0L;
1789    for (int i = 0; i < retries; i++) {
1790      expectedSleep += ConnectionUtils.getPauseTime(expectedPause, i);
1791      // Prevent jitter in ConcurrentMapUtils#getPauseTime to affect result
1792      actualSleep += (long) (expectedPause * 0.01f);
1793    }
1794    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
1795    Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms",
1796      actualSleep >= expectedSleep);
1797
1798    // check and confirm normal IOE will use the normal pause
1799    final long normalPause =
1800      testConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1801    ap = new AsyncProcessWithFailure(conn, testConf, new IOException());
1802    bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1803    mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1804    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1805    mutator.mutate(p);
1806    startTime = EnvironmentEdgeManager.currentTime();
1807    try {
1808      mutator.flush();
1809      Assert.fail();
1810    } catch (RetriesExhaustedWithDetailsException expected) {
1811      assertEquals(1, expected.getNumExceptions());
1812      assertTrue(expected.getRow(0) == p);
1813    }
1814    actualSleep = EnvironmentEdgeManager.currentTime() - startTime;
1815    expectedSleep = 0L;
1816    for (int i = 0; i < retries; i++) {
1817      expectedSleep += ConnectionUtils.getPauseTime(normalPause, i);
1818    }
1819    // plus an additional pause to balance the program execution time
1820    expectedSleep += normalPause;
1821    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
1822    Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
1823  }
1824
1825  @Test
1826  public void testFastFailIfBackoffGreaterThanRemaining() throws IOException {
1827    Configuration testConf = new Configuration(CONF);
1828    testConf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100);
1829    long waitInterval = 500L;
1830    HBaseIOException exception = new RpcThrottlingException(
1831      RpcThrottlingException.Type.NumReadRequestsExceeded, waitInterval, "For test");
1832
1833    final int retries = 1;
1834    testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1835
1836    ClusterConnection conn = new MyConnectionImpl(testConf);
1837    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, testConf, exception);
1838    BufferedMutatorParams bufferParam =
1839      createBufferedMutatorParams(ap, DUMMY_TABLE).operationTimeout(100);
1840    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1841
1842    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1843
1844    Put p = createPut(1, true);
1845    mutator.mutate(p);
1846
1847    try {
1848      mutator.flush();
1849      Assert.fail();
1850    } catch (RetriesExhaustedWithDetailsException expected) {
1851      assertEquals(1, expected.getNumExceptions());
1852      assertTrue(expected.getCause(0) instanceof OperationTimeoutExceededException);
1853      assertTrue(expected.getCause(0).getMessage().startsWith("Backoff"));
1854    }
1855  }
1856
1857  /**
1858   * Tests that we properly recover from exceptions that DO NOT go through receiveGlobalFailure, due
1859   * to updating the meta cache for the region which failed. Successful multigets can include region
1860   * exceptions in the MultiResponse. In that case, it skips receiveGlobalFailure and instead
1861   * handles in receiveMultiAction
1862   */
1863  @Test
1864  public void testRetryWithExceptionClearsMetaCacheUsingRegionException() throws Exception {
1865    testRetryWithExceptionClearsMetaCache(true);
1866  }
1867
1868  /**
1869   * Tests that we properly recover from exceptions that go through receiveGlobalFailure, due to
1870   * updating the meta cache for the region which failed.
1871   */
1872  @Test
1873  public void testRetryWithExceptionClearsMetaCacheUsingServerException() throws Exception {
1874    testRetryWithExceptionClearsMetaCache(false);
1875  }
1876
1877  private void testRetryWithExceptionClearsMetaCache(boolean useRegionException)
1878    throws IOException {
1879    Configuration myConf = new Configuration(CONF);
1880    myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
1881    ClusterConnection conn = createHConnection(new ConnectionConfiguration(myConf));
1882
1883    // we pass in loc1.getServerName here so that only calls to that server will fail
1884    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, myConf,
1885      new RegionOpeningException("test"), loc1.getServerName(), useRegionException);
1886    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1887    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1888
1889    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1890
1891    Assert.assertEquals(conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(),
1892      new RegionLocations(loc1).toString());
1893
1894    // simulate updateCachedLocations, by changing the loc for this row to loc3. only loc1 fails,
1895    // so this means retry will succeed
1896    Mockito.doAnswer(invocation -> {
1897      setMockLocation(conn, DUMMY_BYTES_1, new RegionLocations(loc3));
1898      return null;
1899    }).when(conn).updateCachedLocations(Mockito.eq(DUMMY_TABLE),
1900      Mockito.eq(loc1.getRegion().getRegionName()), Mockito.eq(DUMMY_BYTES_1), Mockito.any(),
1901      Mockito.eq(loc1.getServerName()));
1902
1903    // Ensure we haven't called updateCachedLocations yet
1904    Mockito.verify(conn, Mockito.times(0)).updateCachedLocations(Mockito.any(), Mockito.any(),
1905      Mockito.any(), Mockito.any(), Mockito.any());
1906
1907    Put p = createPut(1, true);
1908    mutator.mutate(p);
1909
1910    // we expect this to succeed because the bad region location should be updated upon
1911    // the initial failure causing retries to succeed.
1912    mutator.flush();
1913
1914    // validate that we updated the location, as we expected
1915    Assert.assertEquals(conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(),
1916      new RegionLocations(loc3).toString());
1917    // this is a given since the location updated, but validate that we called updateCachedLocations
1918    Mockito.verify(conn, Mockito.atLeastOnce()).updateCachedLocations(Mockito.eq(DUMMY_TABLE),
1919      Mockito.eq(loc1.getRegion().getRegionName()), Mockito.eq(DUMMY_BYTES_1), Mockito.any(),
1920      Mockito.eq(loc1.getServerName()));
1921  }
1922
1923  @Test
1924  public void testQueueRowAccess() throws Exception {
1925    ClusterConnection conn = createHConnection();
1926    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
1927      new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
1928    Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1929    Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
1930    mutator.mutate(p0);
1931    BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
1932    // QueueRowAccess should take all undealt mutations
1933    assertEquals(0, mutator.size());
1934    mutator.mutate(p1);
1935    assertEquals(1, mutator.size());
1936    BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
1937    // QueueRowAccess should take all undealt mutations
1938    assertEquals(0, mutator.size());
1939    assertEquals(1, ra0.size());
1940    assertEquals(1, ra1.size());
1941    Iterator<Row> iter0 = ra0.iterator();
1942    Iterator<Row> iter1 = ra1.iterator();
1943    assertTrue(iter0.hasNext());
1944    assertTrue(iter1.hasNext());
1945    // the next() will poll the mutation from inner buffer and update the buffer count
1946    assertTrue(iter0.next() == p0);
1947    assertEquals(1, mutator.getUnflushedSize());
1948    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
1949    assertTrue(iter1.next() == p1);
1950    assertEquals(0, mutator.getUnflushedSize());
1951    assertEquals(0, mutator.getCurrentWriteBufferSize());
1952    assertFalse(iter0.hasNext());
1953    assertFalse(iter1.hasNext());
1954    // ra0 doest handle the mutation so the mutation won't be pushed back to buffer
1955    iter0.remove();
1956    ra0.close();
1957    assertEquals(0, mutator.size());
1958    assertEquals(0, mutator.getUnflushedSize());
1959    assertEquals(0, mutator.getCurrentWriteBufferSize());
1960    // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
1961    ra1.close();
1962    assertEquals(1, mutator.size());
1963    assertEquals(1, mutator.getUnflushedSize());
1964    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
1965  }
1966}