001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.LinkedList;
032import java.util.List;
033import java.util.Map;
034import java.util.Random;
035import java.util.Set;
036import java.util.TreeSet;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ExecutorService;
040import java.util.concurrent.Executors;
041import java.util.concurrent.Future;
042import java.util.concurrent.LinkedBlockingQueue;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.CallQueueTooBigException;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.HBaseClassTestRule;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.HRegionInfo;
056import org.apache.hadoop.hbase.HRegionLocation;
057import org.apache.hadoop.hbase.RegionLocations;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess;
061import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
063import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
064import org.apache.hadoop.hbase.client.coprocessor.Batch;
065import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
066import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
067import org.apache.hadoop.hbase.testclassification.ClientTests;
068import org.apache.hadoop.hbase.testclassification.MediumTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.Threads;
071import org.junit.Assert;
072import org.junit.Before;
073import org.junit.ClassRule;
074import org.junit.Test;
075import org.junit.experimental.categories.Category;
076import org.mockito.Mockito;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080@Category({ClientTests.class, MediumTests.class})
081public class TestAsyncProcess {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE =
085      HBaseClassTestRule.forClass(TestAsyncProcess.class);
086
087  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class);
088  private static final TableName DUMMY_TABLE =
089      TableName.valueOf("DUMMY_TABLE");
090  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
091  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
092  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
093  private static final byte[] FAILS = Bytes.toBytes("FAILS");
094  private Configuration CONF;
095  private ConnectionConfiguration CONNECTION_CONFIG;
096  private static final ServerName sn = ServerName.valueOf("s1,1,1");
097  private static final ServerName sn2 = ServerName.valueOf("s2,2,2");
098  private static final ServerName sn3 = ServerName.valueOf("s3,3,3");
099  private static final HRegionInfo hri1 =
100      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
101  private static final HRegionInfo hri2 =
102      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
103  private static final HRegionInfo hri3 =
104      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
105  private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn);
106  private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn);
107  private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
108
109  // Replica stuff
110  private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1);
111  private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
112  private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
113  private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
114      new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
115  private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
116      new HRegionLocation(hri2r1, sn3));
117  private static final RegionLocations hrls3 =
118      new RegionLocations(new HRegionLocation(hri3, sn3), null);
119
120  private static final String success = "success";
121  private static Exception failure = new Exception("failure");
122
123  private static final int NB_RETRIES = 3;
124
125  private int RPC_TIMEOUT;
126  private int OPERATION_TIMEOUT;
127
128  @Before
129  public void beforeEach() {
130    this.CONF = new Configuration();
131    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
132    this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF);
133    this.RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
134        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
135    this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
136        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
137  }
138
139  static class CountingThreadFactory implements ThreadFactory {
140    final AtomicInteger nbThreads;
141    ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
142    @Override
143    public Thread newThread(Runnable r) {
144      nbThreads.incrementAndGet();
145      return realFactory.newThread(r);
146    }
147
148    CountingThreadFactory(AtomicInteger nbThreads){
149      this.nbThreads = nbThreads;
150    }
151  }
152
153  static class MyAsyncProcess extends AsyncProcess {
154    final AtomicInteger nbMultiResponse = new AtomicInteger();
155    final AtomicInteger nbActions = new AtomicInteger();
156    public List<AsyncRequestFuture> allReqs = new ArrayList<>();
157    public AtomicInteger callsCt = new AtomicInteger();
158    private Configuration conf;
159
160    private long previousTimeout = -1;
161    final ExecutorService service;
162    @Override
163    protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(
164      AsyncProcessTask task, List<Action> actions, long nonceGroup) {
165      // Test HTable has tableName of null, so pass DUMMY_TABLE
166      AsyncProcessTask wrap = new AsyncProcessTask(task){
167        @Override
168        public TableName getTableName() {
169          return DUMMY_TABLE;
170        }
171      };
172      AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<>(
173          wrap, actions, nonceGroup, this);
174      allReqs.add(r);
175      return r;
176    }
177
178    public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
179      super(hc, conf,
180          new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
181      service = Executors.newFixedThreadPool(5);
182      this.conf = conf;
183    }
184
185    public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
186      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
187      service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
188          new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
189    }
190
191    public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
192        List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
193        boolean needResults) throws InterruptedIOException {
194      AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
195              .setPool(pool == null ? service : pool)
196              .setTableName(tableName)
197              .setRowAccess(rows)
198              .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL)
199              .setNeedResults(needResults)
200              .setRpcTimeout(conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
201                    HConstants.DEFAULT_HBASE_RPC_TIMEOUT))
202              .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
203                    HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))
204              .build();
205      return submit(task);
206    }
207
208    public <CResult> AsyncRequestFuture submit(TableName tableName,
209        final List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
210        boolean needResults) throws InterruptedIOException {
211      return submit(null, tableName, rows, atLeastOne, callback, needResults);
212    }
213
214    @Override
215    public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task)
216            throws InterruptedIOException {
217      previousTimeout = task.getRpcTimeout();
218      // We use results in tests to check things, so override to always save them.
219      AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) {
220        @Override
221        public boolean getNeedResults() {
222          return true;
223        }
224      };
225      return super.submit(wrap);
226    }
227
228    @Override
229    protected RpcRetryingCaller<AbstractResponse> createCaller(
230        CancellableRegionServerCallable callable, int rpcTimeout) {
231      callsCt.incrementAndGet();
232      MultiServerCallable callable1 = (MultiServerCallable) callable;
233      final MultiResponse mr = createMultiResponse(
234          callable1.getMulti(), nbMultiResponse, nbActions,
235          new ResponseGenerator() {
236            @Override
237            public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
238              if (Arrays.equals(FAILS, a.getAction().getRow())) {
239                mr.add(regionName, a.getOriginalIndex(), failure);
240              } else {
241                mr.add(regionName, a.getOriginalIndex(), success);
242              }
243            }
244          });
245
246      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
247        @Override
248        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
249                                                   int callTimeout)
250            throws IOException, RuntimeException {
251          try {
252            // sleep one second in order for threadpool to start another thread instead of reusing
253            // existing one.
254            Thread.sleep(1000);
255          } catch (InterruptedException e) {
256            // ignore error
257          }
258          return mr;
259        }
260      };
261    }
262
263
264  }
265
266  static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
267    private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
268    public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
269      long nonceGroup, AsyncProcess asyncProcess) {
270      super(task, actions, nonceGroup, asyncProcess);
271    }
272
273    @Override
274    protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
275      // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
276    }
277
278    Map<ServerName, List<Long>> getRequestHeapSize() {
279      return heapSizesByServer;
280    }
281
282    @Override
283    SingleServerRequestRunnable createSingleServerRequest(
284          MultiAction multiAction, int numAttempt, ServerName server,
285        Set<CancellableRegionServerCallable> callsInProgress) {
286      SingleServerRequestRunnable rq = new SingleServerRequestRunnable(
287              multiAction, numAttempt, server, callsInProgress);
288      List<Long> heapCount = heapSizesByServer.get(server);
289      if (heapCount == null) {
290        heapCount = new ArrayList<>();
291        heapSizesByServer.put(server, heapCount);
292      }
293      heapCount.add(heapSizeOf(multiAction));
294      return rq;
295    }
296
297    private long heapSizeOf(MultiAction multiAction) {
298      return multiAction.actions.values().stream()
299              .flatMap(v -> v.stream())
300              .map(action -> action.getAction())
301              .filter(row -> row instanceof Mutation)
302              .mapToLong(row -> ((Mutation) row).heapSize())
303              .sum();
304    }
305  }
306
307  static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
308
309    private final IOException e;
310
311    public CallerWithFailure(IOException e) {
312      super(100, 500, 100, 9);
313      this.e = e;
314    }
315
316    @Override
317    public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
318                                            int callTimeout)
319        throws IOException, RuntimeException {
320      throw e;
321    }
322  }
323
324
325  static class AsyncProcessWithFailure extends MyAsyncProcess {
326
327    private final IOException ioe;
328
329    public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
330      super(hc, conf);
331      this.ioe = ioe;
332      serverTrackerTimeout = 1L;
333    }
334
335    @Override
336    protected RpcRetryingCaller<AbstractResponse> createCaller(
337      CancellableRegionServerCallable callable, int rpcTimeout) {
338      callsCt.incrementAndGet();
339      return new CallerWithFailure(ioe);
340    }
341  }
342
343  /**
344   * Make the backoff time always different on each call.
345   */
346  static class MyClientBackoffPolicy implements ClientBackoffPolicy {
347    private final Map<ServerName, AtomicInteger> count = new HashMap<>();
348    @Override
349    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
350      AtomicInteger inc = count.get(serverName);
351      if (inc == null) {
352        inc = new AtomicInteger(0);
353        count.put(serverName, inc);
354      }
355      return inc.getAndIncrement();
356    }
357  }
358
359  static class MyAsyncProcessWithReplicas extends MyAsyncProcess {
360    private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator());
361    private long primarySleepMs = 0, replicaSleepMs = 0;
362    private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>();
363    private final AtomicLong replicaCalls = new AtomicLong(0);
364
365    public void addFailures(RegionInfo... hris) {
366      for (RegionInfo hri : hris) {
367        failures.add(hri.getRegionName());
368      }
369    }
370
371    public long getReplicaCallCount() {
372      return replicaCalls.get();
373    }
374
375    public void setPrimaryCallDelay(ServerName server, long primaryMs) {
376      customPrimarySleepMs.put(server, primaryMs);
377    }
378
379    public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
380      super(hc, conf);
381    }
382
383    public void setCallDelays(long primaryMs, long replicaMs) {
384      this.primarySleepMs = primaryMs;
385      this.replicaSleepMs = replicaMs;
386    }
387
388    @Override
389    protected RpcRetryingCaller<AbstractResponse> createCaller(
390        CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
391      MultiServerCallable callable = (MultiServerCallable) payloadCallable;
392      final MultiResponse mr = createMultiResponse(
393          callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
394            @Override
395            public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
396              if (failures.contains(regionName)) {
397                mr.add(regionName, a.getOriginalIndex(), failure);
398              } else {
399                boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
400                mr.add(regionName, a.getOriginalIndex(),
401                    Result.create(new Cell[0], null, isStale));
402              }
403            }
404          });
405      // Currently AsyncProcess either sends all-replica, or all-primary request.
406      final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
407          callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
408      final ServerName server = ((MultiServerCallable)callable).getServerName();
409      String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
410          + callable.getMulti().actions.size() + " entries: ";
411      for (byte[] region : callable.getMulti().actions.keySet()) {
412        debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
413      }
414      LOG.debug(debugMsg);
415      if (!isDefault) {
416        replicaCalls.incrementAndGet();
417      }
418
419      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
420        @Override
421        public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
422                                                int callTimeout)
423        throws IOException, RuntimeException {
424          long sleep = -1;
425          if (isDefault) {
426            Long customSleep = customPrimarySleepMs.get(server);
427            sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
428          } else {
429            sleep = replicaSleepMs;
430          }
431          if (sleep != 0) {
432            try {
433              Thread.sleep(sleep);
434            } catch (InterruptedException e) {
435            }
436          }
437          return mr;
438        }
439      };
440    }
441  }
442
443  static MultiResponse createMultiResponse(final MultiAction multi,
444      AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
445    final MultiResponse mr = new MultiResponse();
446    nbMultiResponse.incrementAndGet();
447    for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) {
448      byte[] regionName = entry.getKey();
449      for (Action a : entry.getValue()) {
450        nbActions.incrementAndGet();
451        gen.addResponse(mr, regionName, a);
452      }
453    }
454    return mr;
455  }
456
457  private static interface ResponseGenerator {
458    void addResponse(final MultiResponse mr, byte[] regionName, Action a);
459  }
460
461  /**
462   * Returns our async process.
463   */
464  static class MyConnectionImpl extends ConnectionImplementation {
465    public static class TestRegistry extends DoNothingAsyncRegistry {
466
467      public TestRegistry(Configuration conf) {
468        super(conf);
469      }
470
471      @Override
472      public CompletableFuture<String> getClusterId() {
473        return CompletableFuture.completedFuture("testClusterId");
474      }
475    }
476
477    final AtomicInteger nbThreads = new AtomicInteger(0);
478
479    protected MyConnectionImpl(Configuration conf) throws IOException {
480      super(setupConf(conf), null, null);
481    }
482
483    private static Configuration setupConf(Configuration conf) {
484      conf.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class,
485        AsyncRegistry.class);
486      return conf;
487    }
488
489    @Override
490    public RegionLocations locateRegion(TableName tableName,
491        byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
492      return new RegionLocations(loc1);
493    }
494
495    @Override
496    public boolean hasCellBlockSupport() {
497      return false;
498    }
499  }
500
501  /**
502   * Returns our async process.
503   */
504  static class MyConnectionImpl2 extends MyConnectionImpl {
505    List<HRegionLocation> hrl;
506    final boolean usedRegions[];
507
508    protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException {
509      super(conf);
510      this.hrl = hrl;
511      this.usedRegions = new boolean[hrl.size()];
512    }
513
514    @Override
515    public RegionLocations locateRegion(TableName tableName,
516        byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
517      int i = 0;
518      for (HRegionLocation hr : hrl){
519        if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
520          usedRegions[i] = true;
521          return new RegionLocations(hr);
522        }
523        i++;
524      }
525      return null;
526    }
527  }
528  @Test
529  public void testListRowAccess() {
530    int count = 10;
531    List<String> values = new LinkedList<>();
532    for (int i = 0; i != count; ++i) {
533      values.add(String.valueOf(i));
534    }
535
536    ListRowAccess<String> taker = new ListRowAccess(values);
537    assertEquals(count, taker.size());
538
539    int restoreCount = 0;
540    int takeCount = 0;
541    Iterator<String> it = taker.iterator();
542    while (it.hasNext()) {
543      String v = it.next();
544      assertEquals(String.valueOf(takeCount), v);
545      ++takeCount;
546      it.remove();
547      if (Math.random() >= 0.5) {
548        break;
549      }
550    }
551    assertEquals(count, taker.size() + takeCount);
552
553    it = taker.iterator();
554    while (it.hasNext()) {
555      String v = it.next();
556      assertEquals(String.valueOf(takeCount), v);
557      ++takeCount;
558      it.remove();
559    }
560    assertEquals(0, taker.size());
561    assertEquals(count, takeCount);
562  }
563  private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) {
564    if (putSizePerServer <= maxHeapSizePerRequest) {
565      return 1;
566    } else if (putSizePerServer % maxHeapSizePerRequest == 0) {
567      return putSizePerServer / maxHeapSizePerRequest;
568    } else {
569      return putSizePerServer / maxHeapSizePerRequest + 1;
570    }
571  }
572
573  @Test
574  public void testSubmitSameSizeOfRequest() throws Exception {
575    long writeBuffer = 2 * 1024 * 1024;
576    long putsHeapSize = writeBuffer;
577    doSubmitRequest(writeBuffer, putsHeapSize);
578  }
579
580  @Test
581  public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
582    long maxHeapSizePerRequest = Long.MAX_VALUE;
583    long putsHeapSize = 2 * 1024 * 1024;
584    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
585  }
586
587  @Test
588  public void testSubmitRandomSizeRequest() throws Exception {
589    Random rn = new Random();
590    final long limit = 10 * 1024 * 1024;
591    final int requestCount = 1 + (int) (rn.nextDouble() * 3);
592    long n = rn.nextLong();
593    if (n < 0) {
594      n = -n;
595    } else if (n == 0) {
596      n = 1;
597    }
598    long putsHeapSize = n % limit;
599    long maxHeapSizePerRequest = putsHeapSize / requestCount;
600    LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest +
601        ", putsHeapSize=" + putsHeapSize);
602    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
603  }
604
605  @Test
606  public void testSubmitSmallRequest() throws Exception {
607    long maxHeapSizePerRequest = 2 * 1024 * 1024;
608    long putsHeapSize = 100;
609    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
610  }
611
612  @Test
613  public void testSubmitLargeRequest() throws Exception {
614    long maxHeapSizePerRequest = 2 * 1024 * 1024;
615    long putsHeapSize = maxHeapSizePerRequest * 2;
616    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
617  }
618
619  private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
620    ClusterConnection conn = createHConnection();
621    final String defaultClazz =
622        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
623    final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(
624      SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
625      SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
626    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
627      SimpleRequestController.class.getName());
628    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
629        maxHeapSizePerRequest);
630
631    // sn has two regions
632    long putSizeSN = 0;
633    long putSizeSN2 = 0;
634    List<Put> puts = new ArrayList<>();
635    while ((putSizeSN + putSizeSN2) <= putsHeapSize) {
636      Put put1 = new Put(DUMMY_BYTES_1);
637      put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
638      Put put2 = new Put(DUMMY_BYTES_2);
639      put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
640      Put put3 = new Put(DUMMY_BYTES_3);
641      put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3);
642      putSizeSN += (put1.heapSize() + put2.heapSize());
643      putSizeSN2 += put3.heapSize();
644      puts.add(put1);
645      puts.add(put2);
646      puts.add(put3);
647    }
648
649    int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest);
650    int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest);
651    LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN
652        + ", putSizeSN2:" + putSizeSN2
653        + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest
654        + ", minCountSnRequest:" + minCountSnRequest
655        + ", minCountSn2Request:" + minCountSn2Request);
656
657    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
658    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
659    try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) {
660      mutator.mutate(puts);
661      mutator.flush();
662      List<AsyncRequestFuture> reqs = ap.allReqs;
663
664      int actualSnReqCount = 0;
665      int actualSn2ReqCount = 0;
666      for (AsyncRequestFuture req : reqs) {
667        if (!(req instanceof AsyncRequestFutureImpl)) {
668          continue;
669        }
670        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
671        if (ars.getRequestHeapSize().containsKey(sn)) {
672          ++actualSnReqCount;
673        }
674        if (ars.getRequestHeapSize().containsKey(sn2)) {
675          ++actualSn2ReqCount;
676        }
677      }
678      // If the server is busy, the actual count may be incremented.
679      assertEquals(true, minCountSnRequest <= actualSnReqCount);
680      assertEquals(true, minCountSn2Request <= actualSn2ReqCount);
681      Map<ServerName, Long> sizePerServers = new HashMap<>();
682      for (AsyncRequestFuture req : reqs) {
683        if (!(req instanceof AsyncRequestFutureImpl)) {
684          continue;
685        }
686        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
687        Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
688        for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
689          long sum = 0;
690          for (long size : entry.getValue()) {
691            assertEquals(true, size <= maxHeapSizePerRequest);
692            sum += size;
693          }
694          assertEquals(true, sum <= maxHeapSizePerRequest);
695          long value = sizePerServers.getOrDefault(entry.getKey(), 0L);
696          sizePerServers.put(entry.getKey(), value + sum);
697        }
698      }
699      assertEquals(true, sizePerServers.containsKey(sn));
700      assertEquals(true, sizePerServers.containsKey(sn2));
701      assertEquals(false, sizePerServers.containsKey(sn3));
702      assertEquals(putSizeSN, (long) sizePerServers.get(sn));
703      assertEquals(putSizeSN2, (long) sizePerServers.get(sn2));
704    }
705    // restore config.
706    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
707        defaultHeapSizePerRequest);
708    if (defaultClazz != null) {
709      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
710        defaultClazz);
711    }
712  }
713
714  @Test
715  public void testSubmit() throws Exception {
716    ClusterConnection hc = createHConnection();
717    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
718
719    List<Put> puts = new ArrayList<>(1);
720    puts.add(createPut(1, true));
721
722    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
723    Assert.assertTrue(puts.isEmpty());
724  }
725
726  @Test
727  public void testSubmitWithCB() throws Exception {
728    ClusterConnection hc = createHConnection();
729    final AtomicInteger updateCalled = new AtomicInteger(0);
730    Batch.Callback<Object> cb = new Batch.Callback<Object>() {
731      @Override
732      public void update(byte[] region, byte[] row, Object result) {
733        updateCalled.incrementAndGet();
734      }
735    };
736    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
737
738    List<Put> puts = new ArrayList<>(1);
739    puts.add(createPut(1, true));
740
741    final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false);
742    Assert.assertTrue(puts.isEmpty());
743    ars.waitUntilDone();
744    Assert.assertEquals(1, updateCalled.get());
745  }
746
747  @Test
748  public void testSubmitBusyRegion() throws Exception {
749    ClusterConnection conn = createHConnection();
750    final String defaultClazz =
751        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
752    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
753      SimpleRequestController.class.getName());
754    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
755    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
756    List<Put> puts = new ArrayList<>(1);
757    puts.add(createPut(1, true));
758
759    for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) {
760      ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
761    }
762    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
763    Assert.assertEquals(puts.size(), 1);
764
765    ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
766    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
767    Assert.assertEquals(0, puts.size());
768    if (defaultClazz != null) {
769      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
770        defaultClazz);
771    }
772  }
773
774
775  @Test
776  public void testSubmitBusyRegionServer() throws Exception {
777    ClusterConnection conn = createHConnection();
778    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
779    final String defaultClazz =
780        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
781    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
782      SimpleRequestController.class.getName());
783    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
784    controller.taskCounterPerServer.put(sn2,
785        new AtomicInteger(controller.maxConcurrentTasksPerServer));
786
787    List<Put> puts = new ArrayList<>(4);
788    puts.add(createPut(1, true));
789    puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
790    puts.add(createPut(1, true)); // <== this one will make it, the region is already in
791    puts.add(createPut(2, true)); // <== new region, but the rs is ok
792
793    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
794    Assert.assertEquals(" puts=" + puts, 1, puts.size());
795
796    controller.taskCounterPerServer.put(sn2,
797        new AtomicInteger(controller.maxConcurrentTasksPerServer - 1));
798    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
799    Assert.assertTrue(puts.isEmpty());
800    if (defaultClazz != null) {
801      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
802        defaultClazz);
803    }
804  }
805
806  @Test
807  public void testFail() throws Exception {
808    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
809
810    List<Put> puts = new ArrayList<>(1);
811    Put p = createPut(1, false);
812    puts.add(p);
813
814    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
815    Assert.assertEquals(0, puts.size());
816    ars.waitUntilDone();
817    verifyResult(ars, false);
818    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
819
820    Assert.assertEquals(1, ars.getErrors().exceptions.size());
821    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
822        failure.equals(ars.getErrors().exceptions.get(0)));
823    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
824        failure.equals(ars.getErrors().exceptions.get(0)));
825
826    Assert.assertEquals(1, ars.getFailedOperations().size());
827    Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
828        p.equals(ars.getFailedOperations().get(0)));
829  }
830
831
832  @Test
833  public void testSubmitTrue() throws IOException {
834    ClusterConnection conn = createHConnection();
835    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
836    final String defaultClazz =
837        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
838    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
839      SimpleRequestController.class.getName());
840    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
841    controller.tasksInProgress.incrementAndGet();
842    final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion);
843    controller.taskCounterPerRegion.put(hri1.getRegionName(), ai);
844
845    final AtomicBoolean checkPoint = new AtomicBoolean(false);
846    final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
847
848    Thread t = new Thread(){
849      @Override
850      public void run(){
851        Threads.sleep(1000);
852        Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
853        ai.decrementAndGet();
854        controller.tasksInProgress.decrementAndGet();
855        checkPoint2.set(true);
856      }
857    };
858
859    List<Put> puts = new ArrayList<>(1);
860    Put p = createPut(1, true);
861    puts.add(p);
862
863    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
864    Assert.assertFalse(puts.isEmpty());
865
866    t.start();
867
868    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
869    Assert.assertTrue(puts.isEmpty());
870
871    checkPoint.set(true);
872    while (!checkPoint2.get()){
873      Threads.sleep(1);
874    }
875    if (defaultClazz != null) {
876      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
877        defaultClazz);
878    }
879  }
880
881  @Test
882  public void testFailAndSuccess() throws Exception {
883    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
884
885    List<Put> puts = new ArrayList<>(3);
886    puts.add(createPut(1, false));
887    puts.add(createPut(1, true));
888    puts.add(createPut(1, true));
889
890    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
891    Assert.assertTrue(puts.isEmpty());
892    ars.waitUntilDone();
893    verifyResult(ars, false, true, true);
894    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
895    ap.callsCt.set(0);
896    Assert.assertEquals(1, ars.getErrors().actions.size());
897
898    puts.add(createPut(1, true));
899    // Wait for AP to be free. While ars might have the result, ap counters are decreased later.
900    ap.waitForMaximumCurrentTasks(0, null);
901    ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
902    Assert.assertEquals(0, puts.size());
903    ars.waitUntilDone();
904    Assert.assertEquals(1, ap.callsCt.get());
905    verifyResult(ars, true);
906  }
907
908  @Test
909  public void testFlush() throws Exception {
910    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
911
912    List<Put> puts = new ArrayList<>(3);
913    puts.add(createPut(1, false));
914    puts.add(createPut(1, true));
915    puts.add(createPut(1, true));
916
917    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
918    ars.waitUntilDone();
919    verifyResult(ars, false, true, true);
920    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
921
922    Assert.assertEquals(1, ars.getFailedOperations().size());
923  }
924
925  @Test
926  public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
927    ClusterConnection hc = createHConnection();
928    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
929    testTaskCount(ap);
930  }
931
932  @Test
933  public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
934    Configuration copyConf = new Configuration(CONF);
935    copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
936    MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
937    ClusterConnection conn = createHConnection();
938    Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
939    Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
940    Mockito.when(conn.getBackoffPolicy()).thenReturn(bp);
941    final String defaultClazz =
942        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
943    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
944      SimpleRequestController.class.getName());
945    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
946    testTaskCount(ap);
947    if (defaultClazz != null) {
948      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
949        defaultClazz);
950    }
951  }
952
953  private void testTaskCount(MyAsyncProcess ap)
954      throws InterruptedIOException, InterruptedException {
955    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
956    List<Put> puts = new ArrayList<>();
957    for (int i = 0; i != 3; ++i) {
958      puts.add(createPut(1, true));
959      puts.add(createPut(2, true));
960      puts.add(createPut(3, true));
961    }
962    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
963    ap.waitForMaximumCurrentTasks(0, null);
964    // More time to wait if there are incorrect task count.
965    TimeUnit.SECONDS.sleep(1);
966    assertEquals(0, controller.tasksInProgress.get());
967    for (AtomicInteger count : controller.taskCounterPerRegion.values()) {
968      assertEquals(0, count.get());
969    }
970    for (AtomicInteger count : controller.taskCounterPerServer.values()) {
971      assertEquals(0, count.get());
972    }
973  }
974
975  @Test
976  public void testMaxTask() throws Exception {
977    ClusterConnection conn = createHConnection();
978    final String defaultClazz =
979        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
980    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
981      SimpleRequestController.class.getName());
982    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
983    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
984
985
986    for (int i = 0; i < 1000; i++) {
987      ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
988    }
989
990    final Thread myThread = Thread.currentThread();
991
992    Thread t = new Thread() {
993      @Override
994      public void run() {
995        Threads.sleep(2000);
996        myThread.interrupt();
997      }
998    };
999
1000    List<Put> puts = new ArrayList<>(1);
1001    puts.add(createPut(1, true));
1002
1003    t.start();
1004
1005    try {
1006      ap.submit(null, DUMMY_TABLE, puts, false, null, false);
1007      Assert.fail("We should have been interrupted.");
1008    } catch (InterruptedIOException expected) {
1009    }
1010
1011    final long sleepTime = 2000;
1012
1013    Thread t2 = new Thread() {
1014      @Override
1015      public void run() {
1016        Threads.sleep(sleepTime);
1017        while (controller.tasksInProgress.get() > 0) {
1018          ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
1019        }
1020      }
1021    };
1022    t2.start();
1023
1024    long start = System.currentTimeMillis();
1025    ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false);
1026    long end = System.currentTimeMillis();
1027
1028    //Adds 100 to secure us against approximate timing.
1029    Assert.assertTrue(start + 100L + sleepTime > end);
1030    if (defaultClazz != null) {
1031      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
1032        defaultClazz);
1033    }
1034  }
1035
1036  private ClusterConnection createHConnection() throws IOException {
1037    ClusterConnection hc = createHConnectionCommon();
1038    setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
1039    setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
1040    setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
1041    Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(),
1042        Mockito.anyBoolean())).thenReturn(Arrays.asList(loc1, loc2, loc3));
1043    setMockLocation(hc, FAILS, new RegionLocations(loc2));
1044    return hc;
1045  }
1046
1047  private ClusterConnection createHConnectionWithReplicas() throws IOException {
1048    ClusterConnection hc = createHConnectionCommon();
1049    setMockLocation(hc, DUMMY_BYTES_1, hrls1);
1050    setMockLocation(hc, DUMMY_BYTES_2, hrls2);
1051    setMockLocation(hc, DUMMY_BYTES_3, hrls3);
1052    List<HRegionLocation> locations = new ArrayList<>();
1053    for (HRegionLocation loc : hrls1.getRegionLocations()) {
1054      locations.add(loc);
1055    }
1056    for (HRegionLocation loc : hrls2.getRegionLocations()) {
1057      locations.add(loc);
1058    }
1059    for (HRegionLocation loc : hrls3.getRegionLocations()) {
1060      locations.add(loc);
1061    }
1062    Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(),
1063        Mockito.anyBoolean())).thenReturn(locations);
1064    return hc;
1065  }
1066
  /**
   * Stubs the four- and five-argument {@code locateRegion} overloads on the given mock so that
   * a lookup of {@code row} in {@code DUMMY_TABLE} returns {@code result}, whatever the
   * remaining boolean/int arguments are.
   *
   * @param hc the Mockito mock to stub
   * @param row the row key the stub matches (compared with {@code Mockito.eq})
   * @param result the locations returned by the stubbed calls
   * @throws IOException not thrown by the stubbing itself; presumably declared because the
   *     stubbed methods declare it
   */
  private static void setMockLocation(ClusterConnection hc, byte[] row,
      RegionLocations result) throws IOException {
    // Five-argument overload (two booleans plus an int).
    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
        Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
    // Four-argument overload (two booleans).
    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
        Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result);
  }
1074
1075  private ClusterConnection createHConnectionCommon() {
1076    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
1077    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
1078    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
1079    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
1080    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
1081    Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
1082    return hc;
1083  }
1084
1085  @Test
1086  public void testHTablePutSuccess() throws Exception {
1087    ClusterConnection conn = createHConnection();
1088    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1089    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1090    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1091
1092    Put put = createPut(1, true);
1093
1094    Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(),
1095        ht.getWriteBufferSize());
1096    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1097    ht.mutate(put);
1098    ht.flush();
1099    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1100  }
1101
1102  @Test
1103  public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
1104    ClusterConnection conn = createHConnection();
1105    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1106
1107    checkPeriodicFlushParameters(conn, ap,
1108            1234, 1234,
1109            1234, 1234);
1110    checkPeriodicFlushParameters(conn, ap,
1111               0,    0,
1112               0,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1113    checkPeriodicFlushParameters(conn, ap,
1114           -1234,    0,
1115           -1234,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1116    checkPeriodicFlushParameters(conn, ap,
1117               1,    1,
1118               1,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1119  }
1120
1121  private void checkPeriodicFlushParameters(ClusterConnection conn,
1122                                            MyAsyncProcess ap,
1123                                            long setTO, long expectTO,
1124                                            long setTT, long expectTT
1125                                            ) {
1126    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1127
1128    // The BufferedMutatorParams does nothing with the value
1129    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
1130    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
1131    Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
1132    Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
1133
1134    // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams)
1135    BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap);
1136    Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs());
1137    Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs());
1138
1139    // The BufferedMutatorImpl corrects illegal values (direct via setter)
1140    BufferedMutatorImpl ht2 =
1141            new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap);
1142    ht2.setWriteBufferPeriodicFlush(setTO, setTT);
1143    Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs());
1144    Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs());
1145
1146  }
1147
  /**
   * Exercises the periodic (timer-driven) flush of {@link BufferedMutatorImpl}: an explicit
   * flush must not count as a periodic flush, an unflushed mutation must be flushed
   * automatically, disabling periodic flushing must stop automatic flushes, and re-enabling
   * it must resume them.
   *
   * <p>NOTE(review): the test relies on fixed sleeps, so it is timing-dependent.
   */
  @Test
  public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
    ClusterConnection conn = createHConnection();
    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);

    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1);     // Flush ASAP
    // A tick of 1 is requested; the assertion below expects the mutator to report
    // MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS instead (presumably 100ms - TODO confirm).
    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1);
    bufferParam.writeBufferSize(10000);  // Write buffer set to much larger than the single record

    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);

    // Verify if BufferedMutator has the right settings.
    Assert.assertEquals(10000, ht.getWriteBufferSize());
    Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs());
    Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
            ht.getWriteBufferPeriodicFlushTimerTickMs());

    Put put = createPut(1, true);

    // Nothing has been written yet: no periodic flush and an empty buffer.
    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());

    // ----- Insert, flush immediately, MUST NOT flush automatically
    ht.mutate(put);
    ht.flush();

    // Leave the timer plenty of time; the explicit flush must not be counted as periodic.
    Thread.sleep(1000);
    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());

    // ----- Insert, NO flush, MUST flush automatically
    ht.mutate(put);
    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);

    // Assuming the timer tick is 100ms (TODO confirm), after twice that we must have
    // seen at least 1 tick and we should see an automatic flush
    Thread.sleep(200);
    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());

    // Ensure it does not flush twice
    Thread.sleep(200);
    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());

    // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically
    ht.disableWriteBufferPeriodicFlush();
    ht.mutate(put);
    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);

    // Wait for at least 1 timerTick, we should see NO flushes.
    Thread.sleep(200);
    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);

    // Re-enable periodic flushing (timeout 1ms, tick 100ms). A flush seems to take about
    // 1 second, so we wait for 2 seconds and it should have finished the flush.
    ht.setWriteBufferPeriodicFlush(1, 100);
    Thread.sleep(2000);
    Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes());
    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
  }
1213
1214
1215  @Test
1216  public void testBufferedMutatorImplWithSharedPool() throws Exception {
1217    ClusterConnection conn = createHConnection();
1218    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1219    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1220    BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1221
1222    ht.close();
1223    assertFalse(ap.service.isShutdown());
1224  }
1225
1226  @Test
1227  public void testFailedPutAndNewPut() throws Exception {
1228    ClusterConnection conn = createHConnection();
1229    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1230    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
1231            .writeBufferSize(0);
1232    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1233
1234    Put p = createPut(1, false);
1235    try {
1236      mutator.mutate(p);
1237      Assert.fail();
1238    } catch (RetriesExhaustedWithDetailsException expected) {
1239      assertEquals(1, expected.getNumExceptions());
1240      assertTrue(expected.getRow(0) == p);
1241    }
1242    // Let's do all the retries.
1243    ap.waitForMaximumCurrentTasks(0, null);
1244    Assert.assertEquals(0, mutator.size());
1245
1246    // There is no global error so the new put should not fail
1247    mutator.mutate(createPut(1, true));
1248    Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
1249  }
1250
1251  @SuppressWarnings("SelfComparison")
1252  @Test
1253  public void testAction() {
1254    Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10);
1255    Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1256    Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1257    Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10);
1258    assertFalse(action_0.equals(action_1));
1259    assertTrue(action_0.equals(action_0));
1260    assertTrue(action_1.equals(action_2));
1261    assertTrue(action_2.equals(action_1));
1262    assertFalse(action_0.equals(new Put(Bytes.toBytes("abc"))));
1263    assertTrue(action_2.equals(action_3));
1264    assertFalse(action_0.equals(action_3));
1265    assertEquals(0, action_0.compareTo(action_0));
1266    assertTrue(action_0.compareTo(action_1) < 0);
1267    assertTrue(action_1.compareTo(action_0) > 0);
1268    assertEquals(0, action_1.compareTo(action_2));
1269  }
1270
1271  @Test
1272  public void testBatch() throws IOException, InterruptedException {
1273    ClusterConnection conn = new MyConnectionImpl(CONF);
1274    HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
1275    ht.multiAp = new MyAsyncProcess(conn, CONF);
1276
1277    List<Put> puts = new ArrayList<>(7);
1278    puts.add(createPut(1, true));
1279    puts.add(createPut(1, true));
1280    puts.add(createPut(1, true));
1281    puts.add(createPut(1, true));
1282    puts.add(createPut(1, false)); // <=== the bad apple, position 4
1283    puts.add(createPut(1, true));
1284    puts.add(createPut(1, false)); // <=== another bad apple, position 6
1285
1286    Object[] res = new Object[puts.size()];
1287    try {
1288      ht.batch(puts, res);
1289      Assert.fail();
1290    } catch (RetriesExhaustedException expected) {
1291    }
1292
1293    Assert.assertEquals(success, res[0]);
1294    Assert.assertEquals(success, res[1]);
1295    Assert.assertEquals(success, res[2]);
1296    Assert.assertEquals(success, res[3]);
1297    Assert.assertEquals(failure, res[4]);
1298    Assert.assertEquals(success, res[5]);
1299    Assert.assertEquals(failure, res[6]);
1300  }
1301  @Test
1302  public void testErrorsServers() throws IOException {
1303    Configuration configuration = new Configuration(CONF);
1304    ClusterConnection conn = new MyConnectionImpl(configuration);
1305    MyAsyncProcess ap = new MyAsyncProcess(conn, configuration);
1306    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1307    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1308    configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
1309
1310    Assert.assertNotNull(ap.createServerErrorTracker());
1311    Assert.assertTrue(ap.serverTrackerTimeout > 200L);
1312    ap.serverTrackerTimeout = 1L;
1313
1314    Put p = createPut(1, false);
1315    mutator.mutate(p);
1316
1317    try {
1318      mutator.flush();
1319      Assert.fail();
1320    } catch (RetriesExhaustedWithDetailsException expected) {
1321      assertEquals(1, expected.getNumExceptions());
1322      assertTrue(expected.getRow(0) == p);
1323    }
1324    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1325    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1326  }
1327
1328  @Test
1329  public void testReadAndWriteTimeout() throws IOException {
1330    final long readTimeout = 10 * 1000;
1331    final long writeTimeout = 20 * 1000;
1332    Configuration copyConf = new Configuration(CONF);
1333    copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
1334    copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
1335    ClusterConnection conn = new MyConnectionImpl(copyConf);
1336    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
1337    try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
1338      ht.multiAp = ap;
1339      List<Get> gets = new LinkedList<>();
1340      gets.add(new Get(DUMMY_BYTES_1));
1341      gets.add(new Get(DUMMY_BYTES_2));
1342      try {
1343        ht.get(gets);
1344      } catch (ClassCastException e) {
1345        // No result response on this test.
1346      }
1347      assertEquals(readTimeout, ap.previousTimeout);
1348      ap.previousTimeout = -1;
1349
1350      try {
1351        ht.existsAll(gets);
1352      } catch (ClassCastException e) {
1353        // No result response on this test.
1354      }
1355      assertEquals(readTimeout, ap.previousTimeout);
1356      ap.previousTimeout = -1;
1357
1358      List<Delete> deletes = new LinkedList<>();
1359      deletes.add(new Delete(DUMMY_BYTES_1));
1360      deletes.add(new Delete(DUMMY_BYTES_2));
1361      ht.delete(deletes);
1362      assertEquals(writeTimeout, ap.previousTimeout);
1363    }
1364  }
1365
1366  @Test
1367  public void testErrors() throws IOException {
1368    ClusterConnection conn = new MyConnectionImpl(CONF);
1369    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
1370    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1371    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1372
1373    Assert.assertNotNull(ap.createServerErrorTracker());
1374
1375    Put p = createPut(1, true);
1376    mutator.mutate(p);
1377
1378    try {
1379      mutator.flush();
1380      Assert.fail();
1381    } catch (RetriesExhaustedWithDetailsException expected) {
1382      assertEquals(1, expected.getNumExceptions());
1383      assertTrue(expected.getRow(0) == p);
1384    }
1385    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1386    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1387  }
1388
1389
1390  @Test
1391  public void testCallQueueTooLarge() throws IOException {
1392    ClusterConnection conn = new MyConnectionImpl(CONF);
1393    AsyncProcessWithFailure ap =
1394        new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException());
1395    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1396    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1397    Assert.assertNotNull(ap.createServerErrorTracker());
1398    Put p = createPut(1, true);
1399    mutator.mutate(p);
1400
1401    try {
1402      mutator.flush();
1403      Assert.fail();
1404    } catch (RetriesExhaustedWithDetailsException expected) {
1405      assertEquals(1, expected.getNumExceptions());
1406      assertTrue(expected.getRow(0) == p);
1407    }
1408    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1409    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1410  }
1411  /**
1412   * This test simulates multiple regions on 2 servers. We should have 2 multi requests and
1413   *  2 threads: 1 per server, this whatever the number of regions.
1414   */
1415  @Test
1416  public void testThreadCreation() throws Exception {
1417    final int NB_REGS = 100;
1418    List<HRegionLocation> hrls = new ArrayList<>(NB_REGS);
1419    List<Get> gets = new ArrayList<>(NB_REGS);
1420    for (int i = 0; i < NB_REGS; i++) {
1421      HRegionInfo hri = new HRegionInfo(
1422          DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
1423      HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
1424      hrls.add(hrl);
1425
1426      Get get = new Get(Bytes.toBytes(i * 10L));
1427      gets.add(get);
1428    }
1429
1430    MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF);
1431    MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
1432    HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service);
1433    ht.multiAp = ap;
1434    ht.batch(gets, null);
1435
1436    Assert.assertEquals(NB_REGS, ap.nbActions.get());
1437    Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
1438    Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
1439
1440    int nbReg = 0;
1441    for (int i =0; i<NB_REGS; i++){
1442      if (con.usedRegions[i]) nbReg++;
1443    }
1444    Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg);
1445  }
1446
1447  @Test
1448  public void testReplicaReplicaSuccess() throws Exception {
1449    // Main call takes too long so replicas succeed, except for one region w/o replicas.
1450    // One region has no replica, so the main call succeeds for it.
1451    MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
1452    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1453    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1454            .setPool(ap.service)
1455            .setRpcTimeout(RPC_TIMEOUT)
1456            .setOperationTimeout(OPERATION_TIMEOUT)
1457            .setTableName(DUMMY_TABLE)
1458            .setRowAccess(rows)
1459            .setResults(new Object[3])
1460            .setSubmittedRows(SubmittedRows.ALL)
1461            .build();
1462    AsyncRequestFuture ars = ap.submit(task);
1463    verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
1464    Assert.assertEquals(2, ap.getReplicaCallCount());
1465  }
1466
1467  @Test
1468  public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
1469    // Main call succeeds before replica calls are kicked off.
1470    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
1471    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1472    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1473            .setPool(ap.service)
1474            .setRpcTimeout(RPC_TIMEOUT)
1475            .setOperationTimeout(OPERATION_TIMEOUT)
1476            .setTableName(DUMMY_TABLE)
1477            .setRowAccess(rows)
1478            .setResults(new Object[3])
1479            .setSubmittedRows(SubmittedRows.ALL)
1480            .build();
1481    AsyncRequestFuture ars = ap.submit(task);
1482    verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
1483    Assert.assertEquals(0, ap.getReplicaCallCount());
1484  }
1485
1486  @Test
1487  public void testReplicaParallelCallsSucceed() throws Exception {
1488    // Either main or replica can succeed.
1489    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
1490    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1491    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1492            .setPool(ap.service)
1493            .setRpcTimeout(RPC_TIMEOUT)
1494            .setOperationTimeout(OPERATION_TIMEOUT)
1495            .setTableName(DUMMY_TABLE)
1496            .setRowAccess(rows)
1497            .setResults(new Object[2])
1498            .setSubmittedRows(SubmittedRows.ALL)
1499            .build();
1500    AsyncRequestFuture ars = ap.submit(task);
1501    verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
1502    long replicaCalls = ap.getReplicaCallCount();
1503    Assert.assertTrue(replicaCalls >= 0);
1504    Assert.assertTrue(replicaCalls <= 2);
1505  }
1506
1507  @Test
1508  public void testReplicaPartialReplicaCall() throws Exception {
1509    // One server is slow, so the result for its region comes from replica, whereas
1510    // the result for other region comes from primary before replica calls happen.
1511    // There should be no replica call for that region at all.
1512    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
1513    ap.setPrimaryCallDelay(sn2, 2000);
1514    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1515    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1516            .setPool(ap.service)
1517            .setRpcTimeout(RPC_TIMEOUT)
1518            .setOperationTimeout(OPERATION_TIMEOUT)
1519            .setTableName(DUMMY_TABLE)
1520            .setRowAccess(rows)
1521            .setResults(new Object[2])
1522            .setSubmittedRows(SubmittedRows.ALL)
1523            .build();
1524    AsyncRequestFuture ars = ap.submit(task);
1525    verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
1526    Assert.assertEquals(1, ap.getReplicaCallCount());
1527  }
1528
1529  @Test
1530  public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
1531    // Main calls fail before replica calls can start - this is currently not handled.
1532    // It would probably never happen if we can get location (due to retries),
1533    // and it would require additional synchronization.
1534    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
1535    ap.addFailures(hri1, hri2);
1536    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1537    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1538            .setPool(ap.service)
1539            .setRpcTimeout(RPC_TIMEOUT)
1540            .setOperationTimeout(OPERATION_TIMEOUT)
1541            .setTableName(DUMMY_TABLE)
1542            .setRowAccess(rows)
1543            .setResults(new Object[2])
1544            .setSubmittedRows(SubmittedRows.ALL)
1545            .build();
1546    AsyncRequestFuture ars = ap.submit(task);
1547    verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
1548    Assert.assertEquals(0, ap.getReplicaCallCount());
1549  }
1550
1551  @Test
1552  public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
1553    // Main calls fails after replica calls start. For two-replica region, one replica call
1554    // also fails. Regardless, we get replica results for both regions.
1555    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
1556    ap.addFailures(hri1, hri1r2, hri2);
1557    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1558    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1559            .setPool(ap.service)
1560            .setRpcTimeout(RPC_TIMEOUT)
1561            .setOperationTimeout(OPERATION_TIMEOUT)
1562            .setTableName(DUMMY_TABLE)
1563            .setRowAccess(rows)
1564            .setResults(new Object[2])
1565            .setSubmittedRows(SubmittedRows.ALL)
1566            .build();
1567    AsyncRequestFuture ars = ap.submit(task);
1568    verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
1569    Assert.assertEquals(2, ap.getReplicaCallCount());
1570  }
1571
1572  @Test
1573  public void testReplicaAllCallsFailForOneRegion() throws Exception {
1574    // For one of the region, all 3, main and replica, calls fail. For the other, replica
1575    // call fails but its exception should not be visible as it did succeed.
1576    MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
1577    ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
1578    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1579    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1580            .setPool(ap.service)
1581            .setRpcTimeout(RPC_TIMEOUT)
1582            .setOperationTimeout(OPERATION_TIMEOUT)
1583            .setTableName(DUMMY_TABLE)
1584            .setRowAccess(rows)
1585            .setResults(new Object[2])
1586            .setSubmittedRows(SubmittedRows.ALL)
1587            .build();
1588    AsyncRequestFuture ars = ap.submit(task);
1589    verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
1590    // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
1591    Assert.assertEquals(3, ars.getErrors().getNumExceptions());
1592    for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
1593      Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
1594    }
1595  }
1596
1597  private MyAsyncProcessWithReplicas createReplicaAp(
1598      int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
1599    return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
1600  }
1601
1602  private MyAsyncProcessWithReplicas createReplicaAp(
1603      int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
1604    // TODO: this is kind of timing dependent... perhaps it should detect from createCaller
1605    //       that the replica call has happened and that way control the ordering.
1606    Configuration conf = new Configuration();
1607    ClusterConnection conn = createHConnectionWithReplicas();
1608    conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
1609    if (retries >= 0) {
1610      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1611    }
1612    MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
1613    ap.setCallDelays(primaryMs, replicaMs);
1614    return ap;
1615  }
1616
1617  private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap,
1618      TableName name) {
1619    return new BufferedMutatorParams(name)
1620            .pool(ap.service)
1621            .rpcTimeout(RPC_TIMEOUT)
1622            .opertationTimeout(OPERATION_TIMEOUT);
1623  }
1624
1625  private static List<Get> makeTimelineGets(byte[]... rows) {
1626    List<Get> result = new ArrayList<>(rows.length);
1627    for (byte[] row : rows) {
1628      Get get = new Get(row);
1629      get.setConsistency(Consistency.TIMELINE);
1630      result.add(get);
1631    }
1632    return result;
1633  }
1634
1635  private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
1636    Object[] actual = ars.getResults();
1637    Assert.assertEquals(expected.length, actual.length);
1638    for (int i = 0; i < expected.length; ++i) {
1639      Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
1640    }
1641  }
1642
  /**
   * After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this!
   * <p>
   * Per-row expectation consumed by {@code verifyReplicaResult}.
   */
  private enum RR {
    /** The result must not be a {@code Throwable} and must be stale. */
    TRUE,
    /** The result must not be a {@code Throwable} and must not be stale. */
    FALSE,
    /** The result must not be a {@code Throwable}; staleness is not checked. */
    DONT_CARE,
    /** The result must be a {@code Throwable}. */
    FAILED
  }
1650
1651  private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
1652    Object[] actuals = ars.getResults();
1653    Assert.assertEquals(expecteds.length, actuals.length);
1654    for (int i = 0; i < expecteds.length; ++i) {
1655      Object actual = actuals[i];
1656      RR expected = expecteds[i];
1657      Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
1658      if (expected != RR.FAILED && expected != RR.DONT_CARE) {
1659        Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
1660      }
1661    }
1662  }
1663
1664  /**
1665   * @param regCnt  the region: 1 to 3.
1666   * @param success if true, the put will succeed.
1667   * @return a put
1668   */
1669  private Put createPut(int regCnt, boolean success) {
1670    Put p;
1671    if (!success) {
1672      p = new Put(FAILS);
1673    } else switch (regCnt){
1674      case 1 :
1675        p = new Put(DUMMY_BYTES_1);
1676        break;
1677      case 2:
1678        p = new Put(DUMMY_BYTES_2);
1679        break;
1680      case 3:
1681        p = new Put(DUMMY_BYTES_3);
1682        break;
1683      default:
1684        throw new IllegalArgumentException("unknown " + regCnt);
1685    }
1686
1687    p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1688
1689    return p;
1690  }
1691
1692  static class MyThreadPoolExecutor extends ThreadPoolExecutor {
1693    public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime,
1694        TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) {
1695      super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue);
1696    }
1697
1698    @Override
1699    public Future submit(Runnable runnable) {
1700      throw new OutOfMemoryError("OutOfMemory error thrown by means");
1701    }
1702  }
1703
1704  static class AsyncProcessForThrowableCheck extends AsyncProcess {
1705    public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
1706      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(
1707          conf));
1708    }
1709  }
1710
1711  @Test
1712  public void testUncheckedException() throws Exception {
1713    // Test the case pool.submit throws unchecked exception
1714    ClusterConnection hc = createHConnection();
1715    MyThreadPoolExecutor myPool =
1716        new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
1717            new LinkedBlockingQueue<>(200));
1718    AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF);
1719
1720    List<Put> puts = new ArrayList<>(1);
1721    puts.add(createPut(1, true));
1722    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1723            .setPool(myPool)
1724            .setRpcTimeout(RPC_TIMEOUT)
1725            .setOperationTimeout(OPERATION_TIMEOUT)
1726            .setTableName(DUMMY_TABLE)
1727            .setRowAccess(puts)
1728            .setSubmittedRows(SubmittedRows.NORMAL)
1729            .build();
1730    ap.submit(task);
1731    Assert.assertTrue(puts.isEmpty());
1732  }
1733
  /**
   * Test and make sure we could use a special pause setting when retry with
   * CallQueueTooBigException, see HBASE-17114.
   * <p>
   * The test has two phases, both measured with wall-clock time around {@code mutator.flush()}:
   * first a flush failing with {@code CallQueueTooBigException} must take at least the pause
   * computed from the special pause setting; then a flush failing with a plain
   * {@code IOException} must take no longer than the pause computed from the normal pause
   * setting plus one extra normal pause.
   * @throws Exception if unexpected error happened during test
   */
  @Test
  public void testRetryPauseWithCallQueueTooBigException() throws Exception {
    Configuration myConf = new Configuration(CONF);
    final long specialPause = 500L;
    final int retries = 1;
    myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
    myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
    ClusterConnection conn = new MyConnectionImpl(myConf);
    AsyncProcessWithFailure ap =
        new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);

    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());

    Put p = createPut(1, true);
    mutator.mutate(p);

    // Phase 1: time the flush that fails with CallQueueTooBigException.
    long startTime = System.currentTimeMillis();
    try {
      mutator.flush();
      Assert.fail();
    } catch (RetriesExhaustedWithDetailsException expected) {
      // Exactly one failure is reported, and it is for the very put that was buffered.
      assertEquals(1, expected.getNumExceptions());
      assertTrue(expected.getRow(0) == p);
    }
    long actualSleep = System.currentTimeMillis() - startTime;
    long expectedSleep = 0L;
    for (int i = 0; i < retries; i++) {
      expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
      // Prevent jitter in ConnectionUtils#getPauseTime from affecting the result: credit 1% of
      // the special pause per retry to the measured time before the lower-bound check
      actualSleep += (long) (specialPause * 0.01f);
    }
    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
    // Lower bound: the special pause must actually have been applied.
    Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms",
      actualSleep >= expectedSleep);

    // check and confirm normal IOE will use the normal pause
    final long normalPause =
        myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
    bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
    mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
    mutator.mutate(p);
    // Phase 2: time the flush that fails with a plain IOException.
    startTime = System.currentTimeMillis();
    try {
      mutator.flush();
      Assert.fail();
    } catch (RetriesExhaustedWithDetailsException expected) {
      assertEquals(1, expected.getNumExceptions());
      assertTrue(expected.getRow(0) == p);
    }
    actualSleep = System.currentTimeMillis() - startTime;
    expectedSleep = 0L;
    for (int i = 0; i < retries; i++) {
      expectedSleep += ConnectionUtils.getPauseTime(normalPause, i);
    }
    // plus an additional pause to balance the program execution time
    expectedSleep += normalPause;
    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
    // Upper bound: the flush must not have slept longer than the normal pause allows.
    Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
  }
1802
1803  @Test
1804  public void testRetryWithExceptionClearsMetaCache() throws Exception {
1805    ClusterConnection conn = createHConnection();
1806    Configuration myConf = conn.getConfiguration();
1807    myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
1808
1809    AsyncProcessWithFailure ap =
1810        new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test"));
1811    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1812    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1813
1814    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1815
1816    Assert.assertEquals(
1817        conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(),
1818        new RegionLocations(loc1).toString());
1819
1820    Mockito.verify(conn, Mockito.times(0)).clearCaches(Mockito.any());
1821
1822    Put p = createPut(1, true);
1823    mutator.mutate(p);
1824
1825    try {
1826      mutator.flush();
1827      Assert.fail();
1828    } catch (RetriesExhaustedWithDetailsException expected) {
1829      assertEquals(1, expected.getNumExceptions());
1830      assertTrue(expected.getRow(0) == p);
1831    }
1832
1833    Mockito.verify(conn, Mockito.times(1)).clearCaches(loc1.getServerName());
1834  }
1835
  /**
   * Walks two {@code QueueRowAccess} instances through their life cycle and checks the
   * mutator's {@code size()}, {@code getUnflushedSize()} and
   * {@code getCurrentWriteBufferSize()} after each step: creation, iteration, removal and
   * close.
   */
  @Test
  public void testQueueRowAccess() throws Exception {
    ClusterConnection conn = createHConnection();
    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
      new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
    Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
    Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
    mutator.mutate(p0);
    BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
    // QueueRowAccess should take all pending (not yet handled) mutations
    assertEquals(0, mutator.size());
    mutator.mutate(p1);
    assertEquals(1, mutator.size());
    BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
    // QueueRowAccess should take all pending (not yet handled) mutations
    assertEquals(0, mutator.size());
    // Each access holds exactly the one mutation that was pending when it was created.
    assertEquals(1, ra0.size());
    assertEquals(1, ra1.size());
    Iterator<Row> iter0 = ra0.iterator();
    Iterator<Row> iter1 = ra1.iterator();
    assertTrue(iter0.hasNext());
    assertTrue(iter1.hasNext());
    // the next() will poll the mutation from inner buffer and update the buffer count
    assertTrue(iter0.next() == p0);
    assertEquals(1, mutator.getUnflushedSize());
    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
    assertTrue(iter1.next() == p1);
    assertEquals(0, mutator.getUnflushedSize());
    assertEquals(0, mutator.getCurrentWriteBufferSize());
    assertFalse(iter0.hasNext());
    assertFalse(iter1.hasNext());
    // ra0 does handle the mutation (iter0.remove()), so the mutation won't be pushed back to
    // the buffer when ra0 is closed
    iter0.remove();
    ra0.close();
    assertEquals(0, mutator.size());
    assertEquals(0, mutator.getUnflushedSize());
    assertEquals(0, mutator.getCurrentWriteBufferSize());
    // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
    ra1.close();
    assertEquals(1, mutator.size());
    assertEquals(1, mutator.getUnflushedSize());
    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
  }
1879}