001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.LinkedList;
032import java.util.List;
033import java.util.Map;
034import java.util.Random;
035import java.util.Set;
036import java.util.TreeSet;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ExecutorService;
040import java.util.concurrent.Executors;
041import java.util.concurrent.Future;
042import java.util.concurrent.LinkedBlockingQueue;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.CallQueueTooBigException;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.HBaseClassTestRule;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.HRegionInfo;
056import org.apache.hadoop.hbase.HRegionLocation;
057import org.apache.hadoop.hbase.RegionLocations;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess;
061import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
063import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
064import org.apache.hadoop.hbase.client.coprocessor.Batch;
065import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
066import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
067import org.apache.hadoop.hbase.testclassification.ClientTests;
068import org.apache.hadoop.hbase.testclassification.LargeTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.Threads;
071import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
072import org.junit.Assert;
073import org.junit.Before;
074import org.junit.ClassRule;
075import org.junit.Test;
076import org.junit.experimental.categories.Category;
077import org.mockito.Mockito;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081@Category({ClientTests.class, LargeTests.class})
082public class TestAsyncProcess {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086      HBaseClassTestRule.forClass(TestAsyncProcess.class);
087
088  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class);
089  private static final TableName DUMMY_TABLE =
090      TableName.valueOf("DUMMY_TABLE");
091  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
092  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
093  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
094  private static final byte[] FAILS = Bytes.toBytes("FAILS");
095  private Configuration CONF;
096  private ConnectionConfiguration CONNECTION_CONFIG;
097  private static final ServerName sn = ServerName.valueOf("s1,1,1");
098  private static final ServerName sn2 = ServerName.valueOf("s2,2,2");
099  private static final ServerName sn3 = ServerName.valueOf("s3,3,3");
100  private static final HRegionInfo hri1 =
101      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
102  private static final HRegionInfo hri2 =
103      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
104  private static final HRegionInfo hri3 =
105      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
106  private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn);
107  private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn);
108  private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
109
110  // Replica stuff
111  private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1);
112  private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
113  private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
114  private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
115      new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
116  private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
117      new HRegionLocation(hri2r1, sn3));
118  private static final RegionLocations hrls3 =
119      new RegionLocations(new HRegionLocation(hri3, sn3), null);
120
121  private static final String success = "success";
122  private static Exception failure = new Exception("failure");
123
124  private static final int NB_RETRIES = 3;
125
126  private int RPC_TIMEOUT;
127  private int OPERATION_TIMEOUT;
128
129  @Before
130  public void beforeEach() {
131    this.CONF = new Configuration();
132    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
133    this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF);
134    this.RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
135        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
136    this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
137        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
138  }
139
140  static class CountingThreadFactory implements ThreadFactory {
141    final AtomicInteger nbThreads;
142    ThreadFactory realFactory =
143      new ThreadFactoryBuilder().setNameFormat("test-TestAsyncProcess-pool-%d")
144        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build();
145    @Override
146    public Thread newThread(Runnable r) {
147      nbThreads.incrementAndGet();
148      return realFactory.newThread(r);
149    }
150
151    CountingThreadFactory(AtomicInteger nbThreads){
152      this.nbThreads = nbThreads;
153    }
154  }
155
156  static class MyAsyncProcess extends AsyncProcess {
157    final AtomicInteger nbMultiResponse = new AtomicInteger();
158    final AtomicInteger nbActions = new AtomicInteger();
159    public List<AsyncRequestFuture> allReqs = new ArrayList<>();
160    public AtomicInteger callsCt = new AtomicInteger();
161    private Configuration conf;
162
163    private long previousTimeout = -1;
164    final ExecutorService service;
165    @Override
166    protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(
167      AsyncProcessTask task, List<Action> actions, long nonceGroup) {
168      // Test HTable has tableName of null, so pass DUMMY_TABLE
169      AsyncProcessTask wrap = new AsyncProcessTask(task){
170        @Override
171        public TableName getTableName() {
172          return DUMMY_TABLE;
173        }
174      };
175      AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<>(
176          wrap, actions, nonceGroup, this);
177      allReqs.add(r);
178      return r;
179    }
180
181    public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
182      super(hc, conf,
183          new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
184      service = Executors.newFixedThreadPool(5);
185      this.conf = conf;
186    }
187
188    public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
189      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
190      service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
191          new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
192    }
193
194    public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
195        List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
196        boolean needResults) throws InterruptedIOException {
197      AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
198              .setPool(pool == null ? service : pool)
199              .setTableName(tableName)
200              .setRowAccess(rows)
201              .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL)
202              .setNeedResults(needResults)
203              .setRpcTimeout(conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
204                    HConstants.DEFAULT_HBASE_RPC_TIMEOUT))
205              .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
206                    HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))
207              .build();
208      return submit(task);
209    }
210
211    public <CResult> AsyncRequestFuture submit(TableName tableName,
212        final List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
213        boolean needResults) throws InterruptedIOException {
214      return submit(null, tableName, rows, atLeastOne, callback, needResults);
215    }
216
217    @Override
218    public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task)
219            throws InterruptedIOException {
220      previousTimeout = task.getRpcTimeout();
221      // We use results in tests to check things, so override to always save them.
222      AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) {
223        @Override
224        public boolean getNeedResults() {
225          return true;
226        }
227      };
228      return super.submit(wrap);
229    }
230
231    @Override
232    protected RpcRetryingCaller<AbstractResponse> createCaller(
233        CancellableRegionServerCallable callable, int rpcTimeout) {
234      callsCt.incrementAndGet();
235      MultiServerCallable callable1 = (MultiServerCallable) callable;
236      final MultiResponse mr = createMultiResponse(
237          callable1.getMulti(), nbMultiResponse, nbActions,
238          new ResponseGenerator() {
239            @Override
240            public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
241              if (Arrays.equals(FAILS, a.getAction().getRow())) {
242                mr.add(regionName, a.getOriginalIndex(), failure);
243              } else {
244                mr.add(regionName, a.getOriginalIndex(), success);
245              }
246            }
247          });
248
249      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
250        @Override
251        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
252                                                   int callTimeout)
253            throws IOException, RuntimeException {
254          try {
255            // sleep one second in order for threadpool to start another thread instead of reusing
256            // existing one.
257            Thread.sleep(1000);
258          } catch (InterruptedException e) {
259            // ignore error
260          }
261          return mr;
262        }
263      };
264    }
265
266
267  }
268
269  static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
270    private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
271    public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
272      long nonceGroup, AsyncProcess asyncProcess) {
273      super(task, actions, nonceGroup, asyncProcess);
274    }
275
276    @Override
277    protected void updateStats(ServerName server, MultiResponse resp) {
278      // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
279    }
280
281    Map<ServerName, List<Long>> getRequestHeapSize() {
282      return heapSizesByServer;
283    }
284
285    @Override
286    SingleServerRequestRunnable createSingleServerRequest(
287          MultiAction multiAction, int numAttempt, ServerName server,
288        Set<CancellableRegionServerCallable> callsInProgress) {
289      SingleServerRequestRunnable rq = new SingleServerRequestRunnable(
290              multiAction, numAttempt, server, callsInProgress);
291      List<Long> heapCount = heapSizesByServer.get(server);
292      if (heapCount == null) {
293        heapCount = new ArrayList<>();
294        heapSizesByServer.put(server, heapCount);
295      }
296      heapCount.add(heapSizeOf(multiAction));
297      return rq;
298    }
299
300    private long heapSizeOf(MultiAction multiAction) {
301      return multiAction.actions.values().stream()
302              .flatMap(v -> v.stream())
303              .map(action -> action.getAction())
304              .filter(row -> row instanceof Mutation)
305              .mapToLong(row -> ((Mutation) row).heapSize())
306              .sum();
307    }
308  }
309
310  static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
311
312    private final IOException e;
313
314    public CallerWithFailure(IOException e) {
315      super(100, 500, 100, 9);
316      this.e = e;
317    }
318
319    @Override
320    public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
321                                            int callTimeout)
322        throws IOException, RuntimeException {
323      throw e;
324    }
325  }
326
327
328  static class AsyncProcessWithFailure extends MyAsyncProcess {
329
330    private final IOException ioe;
331
332    public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
333      super(hc, conf);
334      this.ioe = ioe;
335      serverTrackerTimeout = 1L;
336    }
337
338    @Override
339    protected RpcRetryingCaller<AbstractResponse> createCaller(
340      CancellableRegionServerCallable callable, int rpcTimeout) {
341      callsCt.incrementAndGet();
342      return new CallerWithFailure(ioe);
343    }
344  }
345
346  /**
347   * Make the backoff time always different on each call.
348   */
349  static class MyClientBackoffPolicy implements ClientBackoffPolicy {
350    private final Map<ServerName, AtomicInteger> count = new HashMap<>();
351    @Override
352    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
353      AtomicInteger inc = count.get(serverName);
354      if (inc == null) {
355        inc = new AtomicInteger(0);
356        count.put(serverName, inc);
357      }
358      return inc.getAndIncrement();
359    }
360  }
361
362  static class MyAsyncProcessWithReplicas extends MyAsyncProcess {
363    private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator());
364    private long primarySleepMs = 0, replicaSleepMs = 0;
365    private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>();
366    private final AtomicLong replicaCalls = new AtomicLong(0);
367
368    public void addFailures(RegionInfo... hris) {
369      for (RegionInfo hri : hris) {
370        failures.add(hri.getRegionName());
371      }
372    }
373
374    public long getReplicaCallCount() {
375      return replicaCalls.get();
376    }
377
378    public void setPrimaryCallDelay(ServerName server, long primaryMs) {
379      customPrimarySleepMs.put(server, primaryMs);
380    }
381
382    public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
383      super(hc, conf);
384    }
385
386    public void setCallDelays(long primaryMs, long replicaMs) {
387      this.primarySleepMs = primaryMs;
388      this.replicaSleepMs = replicaMs;
389    }
390
391    @Override
392    protected RpcRetryingCaller<AbstractResponse> createCaller(
393        CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
394      MultiServerCallable callable = (MultiServerCallable) payloadCallable;
395      final MultiResponse mr = createMultiResponse(
396          callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
397            @Override
398            public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
399              if (failures.contains(regionName)) {
400                mr.add(regionName, a.getOriginalIndex(), failure);
401              } else {
402                boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
403                mr.add(regionName, a.getOriginalIndex(),
404                    Result.create(new Cell[0], null, isStale));
405              }
406            }
407          });
408      // Currently AsyncProcess either sends all-replica, or all-primary request.
409      final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
410          callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
411      final ServerName server = ((MultiServerCallable)callable).getServerName();
412      String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
413          + callable.getMulti().actions.size() + " entries: ";
414      for (byte[] region : callable.getMulti().actions.keySet()) {
415        debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
416      }
417      LOG.debug(debugMsg);
418      if (!isDefault) {
419        replicaCalls.incrementAndGet();
420      }
421
422      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
423        @Override
424        public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
425                                                int callTimeout)
426        throws IOException, RuntimeException {
427          long sleep = -1;
428          if (isDefault) {
429            Long customSleep = customPrimarySleepMs.get(server);
430            sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
431          } else {
432            sleep = replicaSleepMs;
433          }
434          if (sleep != 0) {
435            try {
436              Thread.sleep(sleep);
437            } catch (InterruptedException e) {
438            }
439          }
440          return mr;
441        }
442      };
443    }
444  }
445
446  static MultiResponse createMultiResponse(final MultiAction multi,
447      AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
448    final MultiResponse mr = new MultiResponse();
449    nbMultiResponse.incrementAndGet();
450    for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) {
451      byte[] regionName = entry.getKey();
452      for (Action a : entry.getValue()) {
453        nbActions.incrementAndGet();
454        gen.addResponse(mr, regionName, a);
455      }
456    }
457    return mr;
458  }
459
460  private static interface ResponseGenerator {
461    void addResponse(final MultiResponse mr, byte[] regionName, Action a);
462  }
463
464  /**
465   * Returns our async process.
466   */
467  static class MyConnectionImpl extends ConnectionImplementation {
468    public static class TestRegistry extends DoNothingConnectionRegistry {
469
470      public TestRegistry(Configuration conf) {
471        super(conf);
472      }
473
474      @Override
475      public CompletableFuture<String> getClusterId() {
476        return CompletableFuture.completedFuture("testClusterId");
477      }
478    }
479
480    final AtomicInteger nbThreads = new AtomicInteger(0);
481
482    protected MyConnectionImpl(Configuration conf) throws IOException {
483      super(setupConf(conf), null, null);
484    }
485
486    private static Configuration setupConf(Configuration conf) {
487      conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
488          TestRegistry.class, ConnectionRegistry.class);
489      return conf;
490    }
491
492    @Override
493    public RegionLocations locateRegion(TableName tableName,
494        byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
495      return new RegionLocations(loc1);
496    }
497
498    @Override
499    public boolean hasCellBlockSupport() {
500      return false;
501    }
502  }
503
504  /**
505   * Returns our async process.
506   */
507  static class MyConnectionImpl2 extends MyConnectionImpl {
508    List<HRegionLocation> hrl;
509    final boolean usedRegions[];
510
511    protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException {
512      super(conf);
513      this.hrl = hrl;
514      this.usedRegions = new boolean[hrl.size()];
515    }
516
517    @Override
518    public RegionLocations locateRegion(TableName tableName,
519        byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
520      int i = 0;
521      for (HRegionLocation hr : hrl){
522        if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
523          usedRegions[i] = true;
524          return new RegionLocations(hr);
525        }
526        i++;
527      }
528      return null;
529    }
530  }
531  @Test
532  public void testListRowAccess() {
533    int count = 10;
534    List<String> values = new LinkedList<>();
535    for (int i = 0; i != count; ++i) {
536      values.add(String.valueOf(i));
537    }
538
539    ListRowAccess<String> taker = new ListRowAccess(values);
540    assertEquals(count, taker.size());
541
542    int restoreCount = 0;
543    int takeCount = 0;
544    Iterator<String> it = taker.iterator();
545    while (it.hasNext()) {
546      String v = it.next();
547      assertEquals(String.valueOf(takeCount), v);
548      ++takeCount;
549      it.remove();
550      if (Math.random() >= 0.5) {
551        break;
552      }
553    }
554    assertEquals(count, taker.size() + takeCount);
555
556    it = taker.iterator();
557    while (it.hasNext()) {
558      String v = it.next();
559      assertEquals(String.valueOf(takeCount), v);
560      ++takeCount;
561      it.remove();
562    }
563    assertEquals(0, taker.size());
564    assertEquals(count, takeCount);
565  }
566  private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) {
567    if (putSizePerServer <= maxHeapSizePerRequest) {
568      return 1;
569    } else if (putSizePerServer % maxHeapSizePerRequest == 0) {
570      return putSizePerServer / maxHeapSizePerRequest;
571    } else {
572      return putSizePerServer / maxHeapSizePerRequest + 1;
573    }
574  }
575
576  @Test
577  public void testSubmitSameSizeOfRequest() throws Exception {
578    long writeBuffer = 2 * 1024 * 1024;
579    long putsHeapSize = writeBuffer;
580    doSubmitRequest(writeBuffer, putsHeapSize);
581  }
582
583  @Test
584  public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
585    long maxHeapSizePerRequest = Long.MAX_VALUE;
586    long putsHeapSize = 2 * 1024 * 1024;
587    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
588  }
589
590  @Test
591  public void testSubmitRandomSizeRequest() throws Exception {
592    Random rn = new Random();
593    final long limit = 10 * 1024 * 1024;
594    final int requestCount = 1 + (int) (rn.nextDouble() * 3);
595    long n = rn.nextLong();
596    if (n < 0) {
597      n = -n;
598    } else if (n == 0) {
599      n = 1;
600    }
601    long putsHeapSize = n % limit;
602    long maxHeapSizePerRequest = putsHeapSize / requestCount;
603    LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest +
604        ", putsHeapSize=" + putsHeapSize);
605    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
606  }
607
608  @Test
609  public void testSubmitSmallRequest() throws Exception {
610    long maxHeapSizePerRequest = 2 * 1024 * 1024;
611    long putsHeapSize = 100;
612    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
613  }
614
615  @Test
616  public void testSubmitLargeRequest() throws Exception {
617    long maxHeapSizePerRequest = 2 * 1024 * 1024;
618    long putsHeapSize = maxHeapSizePerRequest * 2;
619    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
620  }
621
622  private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
623    ClusterConnection conn = createHConnection();
624    final String defaultClazz =
625        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
626    final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(
627      SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
628      SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
629    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
630      SimpleRequestController.class.getName());
631    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
632        maxHeapSizePerRequest);
633
634    // sn has two regions
635    long putSizeSN = 0;
636    long putSizeSN2 = 0;
637    List<Put> puts = new ArrayList<>();
638    while ((putSizeSN + putSizeSN2) <= putsHeapSize) {
639      Put put1 = new Put(DUMMY_BYTES_1);
640      put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
641      Put put2 = new Put(DUMMY_BYTES_2);
642      put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
643      Put put3 = new Put(DUMMY_BYTES_3);
644      put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3);
645      putSizeSN += (put1.heapSize() + put2.heapSize());
646      putSizeSN2 += put3.heapSize();
647      puts.add(put1);
648      puts.add(put2);
649      puts.add(put3);
650    }
651
652    int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest);
653    int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest);
654    LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN
655        + ", putSizeSN2:" + putSizeSN2
656        + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest
657        + ", minCountSnRequest:" + minCountSnRequest
658        + ", minCountSn2Request:" + minCountSn2Request);
659
660    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
661    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
662    try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) {
663      mutator.mutate(puts);
664      mutator.flush();
665      List<AsyncRequestFuture> reqs = ap.allReqs;
666
667      int actualSnReqCount = 0;
668      int actualSn2ReqCount = 0;
669      for (AsyncRequestFuture req : reqs) {
670        if (!(req instanceof AsyncRequestFutureImpl)) {
671          continue;
672        }
673        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
674        if (ars.getRequestHeapSize().containsKey(sn)) {
675          ++actualSnReqCount;
676        }
677        if (ars.getRequestHeapSize().containsKey(sn2)) {
678          ++actualSn2ReqCount;
679        }
680      }
681      // If the server is busy, the actual count may be incremented.
682      assertEquals(true, minCountSnRequest <= actualSnReqCount);
683      assertEquals(true, minCountSn2Request <= actualSn2ReqCount);
684      Map<ServerName, Long> sizePerServers = new HashMap<>();
685      for (AsyncRequestFuture req : reqs) {
686        if (!(req instanceof AsyncRequestFutureImpl)) {
687          continue;
688        }
689        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
690        Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
691        for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
692          long sum = 0;
693          for (long size : entry.getValue()) {
694            assertEquals(true, size <= maxHeapSizePerRequest);
695            sum += size;
696          }
697          assertEquals(true, sum <= maxHeapSizePerRequest);
698          long value = sizePerServers.getOrDefault(entry.getKey(), 0L);
699          sizePerServers.put(entry.getKey(), value + sum);
700        }
701      }
702      assertEquals(true, sizePerServers.containsKey(sn));
703      assertEquals(true, sizePerServers.containsKey(sn2));
704      assertEquals(false, sizePerServers.containsKey(sn3));
705      assertEquals(putSizeSN, (long) sizePerServers.get(sn));
706      assertEquals(putSizeSN2, (long) sizePerServers.get(sn2));
707    }
708    // restore config.
709    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
710        defaultHeapSizePerRequest);
711    if (defaultClazz != null) {
712      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
713        defaultClazz);
714    }
715  }
716
717  @Test
718  public void testSubmit() throws Exception {
719    ClusterConnection hc = createHConnection();
720    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
721
722    List<Put> puts = new ArrayList<>(1);
723    puts.add(createPut(1, true));
724
725    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
726    Assert.assertTrue(puts.isEmpty());
727  }
728
729  @Test
730  public void testSubmitWithCB() throws Exception {
731    ClusterConnection hc = createHConnection();
732    final AtomicInteger updateCalled = new AtomicInteger(0);
733    Batch.Callback<Object> cb = new Batch.Callback<Object>() {
734      @Override
735      public void update(byte[] region, byte[] row, Object result) {
736        updateCalled.incrementAndGet();
737      }
738    };
739    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
740
741    List<Put> puts = new ArrayList<>(1);
742    puts.add(createPut(1, true));
743
744    final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false);
745    Assert.assertTrue(puts.isEmpty());
746    ars.waitUntilDone();
747    Assert.assertEquals(1, updateCalled.get());
748  }
749
750  @Test
751  public void testSubmitBusyRegion() throws Exception {
752    ClusterConnection conn = createHConnection();
753    final String defaultClazz =
754        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
755    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
756      SimpleRequestController.class.getName());
757    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
758    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
759    List<Put> puts = new ArrayList<>(1);
760    puts.add(createPut(1, true));
761
762    for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) {
763      ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
764    }
765    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
766    Assert.assertEquals(puts.size(), 1);
767
768    ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
769    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
770    Assert.assertEquals(0, puts.size());
771    if (defaultClazz != null) {
772      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
773        defaultClazz);
774    }
775  }
776
777
778  @Test
779  public void testSubmitBusyRegionServer() throws Exception {
780    ClusterConnection conn = createHConnection();
781    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
782    final String defaultClazz =
783        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
784    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
785      SimpleRequestController.class.getName());
786    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
787    controller.taskCounterPerServer.put(sn2,
788        new AtomicInteger(controller.maxConcurrentTasksPerServer));
789
790    List<Put> puts = new ArrayList<>(4);
791    puts.add(createPut(1, true));
792    puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
793    puts.add(createPut(1, true)); // <== this one will make it, the region is already in
794    puts.add(createPut(2, true)); // <== new region, but the rs is ok
795
796    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
797    Assert.assertEquals(" puts=" + puts, 1, puts.size());
798
799    controller.taskCounterPerServer.put(sn2,
800        new AtomicInteger(controller.maxConcurrentTasksPerServer - 1));
801    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
802    Assert.assertTrue(puts.isEmpty());
803    if (defaultClazz != null) {
804      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
805        defaultClazz);
806    }
807  }
808
809  @Test
810  public void testFail() throws Exception {
811    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
812
813    List<Put> puts = new ArrayList<>(1);
814    Put p = createPut(1, false);
815    puts.add(p);
816
817    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
818    Assert.assertEquals(0, puts.size());
819    ars.waitUntilDone();
820    verifyResult(ars, false);
821    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
822
823    Assert.assertEquals(1, ars.getErrors().exceptions.size());
824    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
825        failure.equals(ars.getErrors().exceptions.get(0)));
826    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
827        failure.equals(ars.getErrors().exceptions.get(0)));
828
829    Assert.assertEquals(1, ars.getFailedOperations().size());
830    Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
831        p.equals(ars.getFailedOperations().get(0)));
832  }
833
834
835  @Test
836  public void testSubmitTrue() throws IOException {
837    ClusterConnection conn = createHConnection();
838    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
839    final String defaultClazz =
840        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
841    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
842      SimpleRequestController.class.getName());
843    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
844    controller.tasksInProgress.incrementAndGet();
845    final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion);
846    controller.taskCounterPerRegion.put(hri1.getRegionName(), ai);
847
848    final AtomicBoolean checkPoint = new AtomicBoolean(false);
849    final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
850
851    Thread t = new Thread(){
852      @Override
853      public void run(){
854        Threads.sleep(1000);
855        Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
856        ai.decrementAndGet();
857        controller.tasksInProgress.decrementAndGet();
858        checkPoint2.set(true);
859      }
860    };
861
862    List<Put> puts = new ArrayList<>(1);
863    Put p = createPut(1, true);
864    puts.add(p);
865
866    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
867    Assert.assertFalse(puts.isEmpty());
868
869    t.start();
870
871    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
872    Assert.assertTrue(puts.isEmpty());
873
874    checkPoint.set(true);
875    while (!checkPoint2.get()){
876      Threads.sleep(1);
877    }
878    if (defaultClazz != null) {
879      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
880        defaultClazz);
881    }
882  }
883
884  @Test
885  public void testFailAndSuccess() throws Exception {
886    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
887
888    List<Put> puts = new ArrayList<>(3);
889    puts.add(createPut(1, false));
890    puts.add(createPut(1, true));
891    puts.add(createPut(1, true));
892
893    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
894    Assert.assertTrue(puts.isEmpty());
895    ars.waitUntilDone();
896    verifyResult(ars, false, true, true);
897    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
898    ap.callsCt.set(0);
899    Assert.assertEquals(1, ars.getErrors().actions.size());
900
901    puts.add(createPut(1, true));
902    // Wait for AP to be free. While ars might have the result, ap counters are decreased later.
903    ap.waitForMaximumCurrentTasks(0, null);
904    ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
905    Assert.assertEquals(0, puts.size());
906    ars.waitUntilDone();
907    Assert.assertEquals(1, ap.callsCt.get());
908    verifyResult(ars, true);
909  }
910
911  @Test
912  public void testFlush() throws Exception {
913    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
914
915    List<Put> puts = new ArrayList<>(3);
916    puts.add(createPut(1, false));
917    puts.add(createPut(1, true));
918    puts.add(createPut(1, true));
919
920    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
921    ars.waitUntilDone();
922    verifyResult(ars, false, true, true);
923    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
924
925    Assert.assertEquals(1, ars.getFailedOperations().size());
926  }
927
928  @Test
929  public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
930    ClusterConnection hc = createHConnection();
931    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
932    testTaskCount(ap);
933  }
934
935  @Test
936  public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
937    Configuration copyConf = new Configuration(CONF);
938    copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
939    MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
940    ClusterConnection conn = createHConnection();
941    Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
942    Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
943    Mockito.when(conn.getBackoffPolicy()).thenReturn(bp);
944    final String defaultClazz =
945        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
946    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
947      SimpleRequestController.class.getName());
948    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
949    testTaskCount(ap);
950    if (defaultClazz != null) {
951      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
952        defaultClazz);
953    }
954  }
955
956  private void testTaskCount(MyAsyncProcess ap)
957      throws InterruptedIOException, InterruptedException {
958    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
959    List<Put> puts = new ArrayList<>();
960    for (int i = 0; i != 3; ++i) {
961      puts.add(createPut(1, true));
962      puts.add(createPut(2, true));
963      puts.add(createPut(3, true));
964    }
965    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
966    ap.waitForMaximumCurrentTasks(0, null);
967    // More time to wait if there are incorrect task count.
968    TimeUnit.SECONDS.sleep(1);
969    assertEquals(0, controller.tasksInProgress.get());
970    for (AtomicInteger count : controller.taskCounterPerRegion.values()) {
971      assertEquals(0, count.get());
972    }
973    for (AtomicInteger count : controller.taskCounterPerServer.values()) {
974      assertEquals(0, count.get());
975    }
976  }
977
978  @Test
979  public void testMaxTask() throws Exception {
980    ClusterConnection conn = createHConnection();
981    final String defaultClazz =
982        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
983    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
984      SimpleRequestController.class.getName());
985    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
986    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
987
988
989    for (int i = 0; i < 1000; i++) {
990      ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
991    }
992
993    final Thread myThread = Thread.currentThread();
994
995    Thread t = new Thread() {
996      @Override
997      public void run() {
998        Threads.sleep(2000);
999        myThread.interrupt();
1000      }
1001    };
1002
1003    List<Put> puts = new ArrayList<>(1);
1004    puts.add(createPut(1, true));
1005
1006    t.start();
1007
1008    try {
1009      ap.submit(null, DUMMY_TABLE, puts, false, null, false);
1010      Assert.fail("We should have been interrupted.");
1011    } catch (InterruptedIOException expected) {
1012    }
1013
1014    final long sleepTime = 2000;
1015
1016    Thread t2 = new Thread() {
1017      @Override
1018      public void run() {
1019        Threads.sleep(sleepTime);
1020        while (controller.tasksInProgress.get() > 0) {
1021          ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
1022        }
1023      }
1024    };
1025    t2.start();
1026
1027    long start = System.currentTimeMillis();
1028    ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false);
1029    long end = System.currentTimeMillis();
1030
1031    //Adds 100 to secure us against approximate timing.
1032    Assert.assertTrue(start + 100L + sleepTime > end);
1033    if (defaultClazz != null) {
1034      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
1035        defaultClazz);
1036    }
1037  }
1038
1039  private ClusterConnection createHConnection() throws IOException {
1040    ClusterConnection hc = createHConnectionCommon();
1041    setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
1042    setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
1043    setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
1044    Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(),
1045        Mockito.anyBoolean())).thenReturn(Arrays.asList(loc1, loc2, loc3));
1046    setMockLocation(hc, FAILS, new RegionLocations(loc2));
1047    return hc;
1048  }
1049
1050  private ClusterConnection createHConnectionWithReplicas() throws IOException {
1051    ClusterConnection hc = createHConnectionCommon();
1052    setMockLocation(hc, DUMMY_BYTES_1, hrls1);
1053    setMockLocation(hc, DUMMY_BYTES_2, hrls2);
1054    setMockLocation(hc, DUMMY_BYTES_3, hrls3);
1055    List<HRegionLocation> locations = new ArrayList<>();
1056    for (HRegionLocation loc : hrls1.getRegionLocations()) {
1057      locations.add(loc);
1058    }
1059    for (HRegionLocation loc : hrls2.getRegionLocations()) {
1060      locations.add(loc);
1061    }
1062    for (HRegionLocation loc : hrls3.getRegionLocations()) {
1063      locations.add(loc);
1064    }
1065    Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(),
1066        Mockito.anyBoolean())).thenReturn(locations);
1067    return hc;
1068  }
1069
1070  private static void setMockLocation(ClusterConnection hc, byte[] row,
1071      RegionLocations result) throws IOException {
1072    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
1073        Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
1074    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
1075        Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result);
1076  }
1077
1078  private ClusterConnection createHConnectionCommon() {
1079    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
1080    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
1081    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
1082    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
1083    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
1084    Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
1085    return hc;
1086  }
1087
1088  @Test
1089  public void testHTablePutSuccess() throws Exception {
1090    ClusterConnection conn = createHConnection();
1091    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1092    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1093    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1094
1095    Put put = createPut(1, true);
1096
1097    Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(),
1098        ht.getWriteBufferSize());
1099    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1100    ht.mutate(put);
1101    ht.flush();
1102    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1103  }
1104
1105  @Test
1106  public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
1107    ClusterConnection conn = createHConnection();
1108    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1109
1110    checkPeriodicFlushParameters(conn, ap,
1111            1234, 1234,
1112            1234, 1234);
1113    checkPeriodicFlushParameters(conn, ap,
1114               0,    0,
1115               0,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1116    checkPeriodicFlushParameters(conn, ap,
1117           -1234,    0,
1118           -1234,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1119    checkPeriodicFlushParameters(conn, ap,
1120               1,    1,
1121               1,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1122  }
1123
1124  private void checkPeriodicFlushParameters(ClusterConnection conn,
1125                                            MyAsyncProcess ap,
1126                                            long setTO, long expectTO,
1127                                            long setTT, long expectTT
1128                                            ) {
1129    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1130
1131    // The BufferedMutatorParams does nothing with the value
1132    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
1133    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
1134    Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
1135    Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
1136
1137    // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams)
1138    BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap);
1139    Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs());
1140    Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs());
1141
1142    // The BufferedMutatorImpl corrects illegal values (direct via setter)
1143    BufferedMutatorImpl ht2 =
1144            new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap);
1145    ht2.setWriteBufferPeriodicFlush(setTO, setTT);
1146    Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs());
1147    Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs());
1148
1149  }
1150
1151  @Test
1152  public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
1153    ClusterConnection conn = createHConnection();
1154    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1155    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1156
1157    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1);     // Flush ASAP
1158    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms
1159    bufferParam.writeBufferSize(10000);  // Write buffer set to much larger than the single record
1160
1161    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1162
1163    // Verify if BufferedMutator has the right settings.
1164    Assert.assertEquals(10000, ht.getWriteBufferSize());
1165    Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs());
1166    Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
1167            ht.getWriteBufferPeriodicFlushTimerTickMs());
1168
1169    Put put = createPut(1, true);
1170
1171    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1172    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1173
1174    // ----- Insert, flush immediately, MUST NOT flush automatically
1175    ht.mutate(put);
1176    ht.flush();
1177
1178    Thread.sleep(1000);
1179    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1180    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1181
1182    // ----- Insert, NO flush, MUST flush automatically
1183    ht.mutate(put);
1184    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1185    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1186
1187    // The timerTick should fire every 100ms, so after twice that we must have
1188    // seen at least 1 tick and we should see an automatic flush
1189    Thread.sleep(200);
1190    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1191    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1192
1193    // Ensure it does not flush twice
1194    Thread.sleep(200);
1195    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1196    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1197
1198    // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically
1199    ht.disableWriteBufferPeriodicFlush();
1200    ht.mutate(put);
1201    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1202    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1203
1204    // Wait for at least 1 timerTick, we should see NO flushes.
1205    Thread.sleep(200);
1206    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1207    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1208
1209    // Reenable periodic flushing, a flush seems to take about 1 second
1210    // so we wait for 2 seconds and it should have finished the flush.
1211    ht.setWriteBufferPeriodicFlush(1, 100);
1212    Thread.sleep(2000);
1213    Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes());
1214    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1215  }
1216
1217
1218  @Test
1219  public void testBufferedMutatorImplWithSharedPool() throws Exception {
1220    ClusterConnection conn = createHConnection();
1221    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1222    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1223    BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1224
1225    ht.close();
1226    assertFalse(ap.service.isShutdown());
1227  }
1228
1229  @Test
1230  public void testFailedPutAndNewPut() throws Exception {
1231    ClusterConnection conn = createHConnection();
1232    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1233    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
1234            .writeBufferSize(0);
1235    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1236
1237    Put p = createPut(1, false);
1238    try {
1239      mutator.mutate(p);
1240      Assert.fail();
1241    } catch (RetriesExhaustedWithDetailsException expected) {
1242      assertEquals(1, expected.getNumExceptions());
1243      assertTrue(expected.getRow(0) == p);
1244    }
1245    // Let's do all the retries.
1246    ap.waitForMaximumCurrentTasks(0, null);
1247    Assert.assertEquals(0, mutator.size());
1248
1249    // There is no global error so the new put should not fail
1250    mutator.mutate(createPut(1, true));
1251    Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
1252  }
1253
1254  @SuppressWarnings("SelfComparison")
1255  @Test
1256  public void testAction() {
1257    Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10);
1258    Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1259    Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1260    Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10);
1261    assertFalse(action_0.equals(action_1));
1262    assertTrue(action_0.equals(action_0));
1263    assertTrue(action_1.equals(action_2));
1264    assertTrue(action_2.equals(action_1));
1265    assertFalse(action_0.equals(new Put(Bytes.toBytes("abc"))));
1266    assertTrue(action_2.equals(action_3));
1267    assertFalse(action_0.equals(action_3));
1268    assertEquals(0, action_0.compareTo(action_0));
1269    assertTrue(action_0.compareTo(action_1) < 0);
1270    assertTrue(action_1.compareTo(action_0) > 0);
1271    assertEquals(0, action_1.compareTo(action_2));
1272  }
1273
1274  @Test
1275  public void testBatch() throws IOException, InterruptedException {
1276    ClusterConnection conn = new MyConnectionImpl(CONF);
1277    HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
1278    ht.multiAp = new MyAsyncProcess(conn, CONF);
1279
1280    List<Put> puts = new ArrayList<>(7);
1281    puts.add(createPut(1, true));
1282    puts.add(createPut(1, true));
1283    puts.add(createPut(1, true));
1284    puts.add(createPut(1, true));
1285    puts.add(createPut(1, false)); // <=== the bad apple, position 4
1286    puts.add(createPut(1, true));
1287    puts.add(createPut(1, false)); // <=== another bad apple, position 6
1288
1289    Object[] res = new Object[puts.size()];
1290    try {
1291      ht.batch(puts, res);
1292      Assert.fail();
1293    } catch (RetriesExhaustedException expected) {
1294    }
1295
1296    Assert.assertEquals(success, res[0]);
1297    Assert.assertEquals(success, res[1]);
1298    Assert.assertEquals(success, res[2]);
1299    Assert.assertEquals(success, res[3]);
1300    Assert.assertEquals(failure, res[4]);
1301    Assert.assertEquals(success, res[5]);
1302    Assert.assertEquals(failure, res[6]);
1303  }
1304  @Test
1305  public void testErrorsServers() throws IOException {
1306    Configuration configuration = new Configuration(CONF);
1307    ClusterConnection conn = new MyConnectionImpl(configuration);
1308    MyAsyncProcess ap = new MyAsyncProcess(conn, configuration);
1309    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1310    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1311    configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
1312
1313    Assert.assertNotNull(ap.createServerErrorTracker());
1314    Assert.assertTrue(ap.serverTrackerTimeout > 200L);
1315    ap.serverTrackerTimeout = 1L;
1316
1317    Put p = createPut(1, false);
1318    mutator.mutate(p);
1319
1320    try {
1321      mutator.flush();
1322      Assert.fail();
1323    } catch (RetriesExhaustedWithDetailsException expected) {
1324      assertEquals(1, expected.getNumExceptions());
1325      assertTrue(expected.getRow(0) == p);
1326    }
1327    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1328    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1329  }
1330
1331  @Test
1332  public void testReadAndWriteTimeout() throws IOException {
1333    final long readTimeout = 10 * 1000;
1334    final long writeTimeout = 20 * 1000;
1335    Configuration copyConf = new Configuration(CONF);
1336    copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
1337    copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
1338    ClusterConnection conn = new MyConnectionImpl(copyConf);
1339    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
1340    try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
1341      ht.multiAp = ap;
1342      List<Get> gets = new LinkedList<>();
1343      gets.add(new Get(DUMMY_BYTES_1));
1344      gets.add(new Get(DUMMY_BYTES_2));
1345      try {
1346        ht.get(gets);
1347      } catch (ClassCastException e) {
1348        // No result response on this test.
1349      }
1350      assertEquals(readTimeout, ap.previousTimeout);
1351      ap.previousTimeout = -1;
1352
1353      try {
1354        ht.existsAll(gets);
1355      } catch (ClassCastException e) {
1356        // No result response on this test.
1357      }
1358      assertEquals(readTimeout, ap.previousTimeout);
1359      ap.previousTimeout = -1;
1360
1361      List<Delete> deletes = new LinkedList<>();
1362      deletes.add(new Delete(DUMMY_BYTES_1));
1363      deletes.add(new Delete(DUMMY_BYTES_2));
1364      ht.delete(deletes);
1365      assertEquals(writeTimeout, ap.previousTimeout);
1366    }
1367  }
1368
1369  @Test
1370  public void testErrors() throws IOException {
1371    ClusterConnection conn = new MyConnectionImpl(CONF);
1372    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
1373    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1374    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1375
1376    Assert.assertNotNull(ap.createServerErrorTracker());
1377
1378    Put p = createPut(1, true);
1379    mutator.mutate(p);
1380
1381    try {
1382      mutator.flush();
1383      Assert.fail();
1384    } catch (RetriesExhaustedWithDetailsException expected) {
1385      assertEquals(1, expected.getNumExceptions());
1386      assertTrue(expected.getRow(0) == p);
1387    }
1388    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1389    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1390  }
1391
1392
1393  @Test
1394  public void testCallQueueTooLarge() throws IOException {
1395    ClusterConnection conn = new MyConnectionImpl(CONF);
1396    AsyncProcessWithFailure ap =
1397        new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException());
1398    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1399    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1400    Assert.assertNotNull(ap.createServerErrorTracker());
1401    Put p = createPut(1, true);
1402    mutator.mutate(p);
1403
1404    try {
1405      mutator.flush();
1406      Assert.fail();
1407    } catch (RetriesExhaustedWithDetailsException expected) {
1408      assertEquals(1, expected.getNumExceptions());
1409      assertTrue(expected.getRow(0) == p);
1410    }
1411    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1412    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1413  }
1414  /**
1415   * This test simulates multiple regions on 2 servers. We should have 2 multi requests and
1416   *  2 threads: 1 per server, this whatever the number of regions.
1417   */
1418  @Test
1419  public void testThreadCreation() throws Exception {
1420    final int NB_REGS = 100;
1421    List<HRegionLocation> hrls = new ArrayList<>(NB_REGS);
1422    List<Get> gets = new ArrayList<>(NB_REGS);
1423    for (int i = 0; i < NB_REGS; i++) {
1424      HRegionInfo hri = new HRegionInfo(
1425          DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
1426      HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
1427      hrls.add(hrl);
1428
1429      Get get = new Get(Bytes.toBytes(i * 10L));
1430      gets.add(get);
1431    }
1432
1433    MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF);
1434    MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
1435    HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service);
1436    ht.multiAp = ap;
1437    ht.batch(gets, null);
1438
1439    Assert.assertEquals(NB_REGS, ap.nbActions.get());
1440    Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
1441    Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
1442
1443    int nbReg = 0;
1444    for (int i =0; i<NB_REGS; i++){
1445      if (con.usedRegions[i]) nbReg++;
1446    }
1447    Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg);
1448  }
1449
1450  @Test
1451  public void testReplicaReplicaSuccess() throws Exception {
1452    // Main call takes too long so replicas succeed, except for one region w/o replicas.
1453    // One region has no replica, so the main call succeeds for it.
1454    MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
1455    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1456    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1457            .setPool(ap.service)
1458            .setRpcTimeout(RPC_TIMEOUT)
1459            .setOperationTimeout(OPERATION_TIMEOUT)
1460            .setTableName(DUMMY_TABLE)
1461            .setRowAccess(rows)
1462            .setResults(new Object[3])
1463            .setSubmittedRows(SubmittedRows.ALL)
1464            .build();
1465    AsyncRequestFuture ars = ap.submit(task);
1466    verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
1467    Assert.assertEquals(2, ap.getReplicaCallCount());
1468  }
1469
1470  @Test
1471  public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
1472    // Main call succeeds before replica calls are kicked off.
1473    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
1474    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1475    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1476            .setPool(ap.service)
1477            .setRpcTimeout(RPC_TIMEOUT)
1478            .setOperationTimeout(OPERATION_TIMEOUT)
1479            .setTableName(DUMMY_TABLE)
1480            .setRowAccess(rows)
1481            .setResults(new Object[3])
1482            .setSubmittedRows(SubmittedRows.ALL)
1483            .build();
1484    AsyncRequestFuture ars = ap.submit(task);
1485    verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
1486    Assert.assertEquals(0, ap.getReplicaCallCount());
1487  }
1488
1489  @Test
1490  public void testReplicaParallelCallsSucceed() throws Exception {
1491    // Either main or replica can succeed.
1492    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
1493    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1494    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1495            .setPool(ap.service)
1496            .setRpcTimeout(RPC_TIMEOUT)
1497            .setOperationTimeout(OPERATION_TIMEOUT)
1498            .setTableName(DUMMY_TABLE)
1499            .setRowAccess(rows)
1500            .setResults(new Object[2])
1501            .setSubmittedRows(SubmittedRows.ALL)
1502            .build();
1503    AsyncRequestFuture ars = ap.submit(task);
1504    verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
1505    long replicaCalls = ap.getReplicaCallCount();
1506    Assert.assertTrue(replicaCalls >= 0);
1507    Assert.assertTrue(replicaCalls <= 2);
1508  }
1509
1510  @Test
1511  public void testReplicaPartialReplicaCall() throws Exception {
1512    // One server is slow, so the result for its region comes from replica, whereas
1513    // the result for other region comes from primary before replica calls happen.
1514    // There should be no replica call for that region at all.
1515    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
1516    ap.setPrimaryCallDelay(sn2, 2000);
1517    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1518    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1519            .setPool(ap.service)
1520            .setRpcTimeout(RPC_TIMEOUT)
1521            .setOperationTimeout(OPERATION_TIMEOUT)
1522            .setTableName(DUMMY_TABLE)
1523            .setRowAccess(rows)
1524            .setResults(new Object[2])
1525            .setSubmittedRows(SubmittedRows.ALL)
1526            .build();
1527    AsyncRequestFuture ars = ap.submit(task);
1528    verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
1529    Assert.assertEquals(1, ap.getReplicaCallCount());
1530  }
1531
1532  @Test
1533  public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
1534    // Main calls fail before replica calls can start - this is currently not handled.
1535    // It would probably never happen if we can get location (due to retries),
1536    // and it would require additional synchronization.
1537    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
1538    ap.addFailures(hri1, hri2);
1539    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1540    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1541            .setPool(ap.service)
1542            .setRpcTimeout(RPC_TIMEOUT)
1543            .setOperationTimeout(OPERATION_TIMEOUT)
1544            .setTableName(DUMMY_TABLE)
1545            .setRowAccess(rows)
1546            .setResults(new Object[2])
1547            .setSubmittedRows(SubmittedRows.ALL)
1548            .build();
1549    AsyncRequestFuture ars = ap.submit(task);
1550    verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
1551    Assert.assertEquals(0, ap.getReplicaCallCount());
1552  }
1553
1554  @Test
1555  public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
1556    // Main calls fails after replica calls start. For two-replica region, one replica call
1557    // also fails. Regardless, we get replica results for both regions.
1558    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
1559    ap.addFailures(hri1, hri1r2, hri2);
1560    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1561    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1562            .setPool(ap.service)
1563            .setRpcTimeout(RPC_TIMEOUT)
1564            .setOperationTimeout(OPERATION_TIMEOUT)
1565            .setTableName(DUMMY_TABLE)
1566            .setRowAccess(rows)
1567            .setResults(new Object[2])
1568            .setSubmittedRows(SubmittedRows.ALL)
1569            .build();
1570    AsyncRequestFuture ars = ap.submit(task);
1571    verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
1572    Assert.assertEquals(2, ap.getReplicaCallCount());
1573  }
1574
1575  @Test
1576  public void testReplicaAllCallsFailForOneRegion() throws Exception {
1577    // For one of the region, all 3, main and replica, calls fail. For the other, replica
1578    // call fails but its exception should not be visible as it did succeed.
1579    MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
1580    ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
1581    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1582    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1583            .setPool(ap.service)
1584            .setRpcTimeout(RPC_TIMEOUT)
1585            .setOperationTimeout(OPERATION_TIMEOUT)
1586            .setTableName(DUMMY_TABLE)
1587            .setRowAccess(rows)
1588            .setResults(new Object[2])
1589            .setSubmittedRows(SubmittedRows.ALL)
1590            .build();
1591    AsyncRequestFuture ars = ap.submit(task);
1592    verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
1593    // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
1594    Assert.assertEquals(3, ars.getErrors().getNumExceptions());
1595    for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
1596      Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
1597    }
1598  }
1599
1600  private MyAsyncProcessWithReplicas createReplicaAp(
1601      int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
1602    return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
1603  }
1604
1605  private MyAsyncProcessWithReplicas createReplicaAp(
1606      int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
1607    // TODO: this is kind of timing dependent... perhaps it should detect from createCaller
1608    //       that the replica call has happened and that way control the ordering.
1609    Configuration conf = new Configuration();
1610    ClusterConnection conn = createHConnectionWithReplicas();
1611    conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
1612    if (retries >= 0) {
1613      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1614    }
1615    MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
1616    ap.setCallDelays(primaryMs, replicaMs);
1617    return ap;
1618  }
1619
1620  private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap,
1621      TableName name) {
1622    return new BufferedMutatorParams(name)
1623            .pool(ap.service)
1624            .rpcTimeout(RPC_TIMEOUT)
1625            .opertationTimeout(OPERATION_TIMEOUT);
1626  }
1627
1628  private static List<Get> makeTimelineGets(byte[]... rows) {
1629    List<Get> result = new ArrayList<>(rows.length);
1630    for (byte[] row : rows) {
1631      Get get = new Get(row);
1632      get.setConsistency(Consistency.TIMELINE);
1633      result.add(get);
1634    }
1635    return result;
1636  }
1637
1638  private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
1639    Object[] actual = ars.getResults();
1640    Assert.assertEquals(expected.length, actual.length);
1641    for (int i = 0; i < expected.length; ++i) {
1642      Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
1643    }
1644  }
1645
  /**
   * Expected outcome for a single row as checked by {@code verifyReplicaResult}.
   * (After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this!)
   */
  private enum RR {
    /** The result must not be a Throwable and must be flagged stale. */
    TRUE,
    /** The result must not be a Throwable and must not be flagged stale. */
    FALSE,
    /** The result must not be a Throwable; its staleness is not checked. */
    DONT_CARE,
    /** The result must be a Throwable. */
    FAILED
  }
1653
1654  private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
1655    Object[] actuals = ars.getResults();
1656    Assert.assertEquals(expecteds.length, actuals.length);
1657    for (int i = 0; i < expecteds.length; ++i) {
1658      Object actual = actuals[i];
1659      RR expected = expecteds[i];
1660      Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
1661      if (expected != RR.FAILED && expected != RR.DONT_CARE) {
1662        Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
1663      }
1664    }
1665  }
1666
1667  /**
1668   * @param regCnt  the region: 1 to 3.
1669   * @param success if true, the put will succeed.
1670   * @return a put
1671   */
1672  private Put createPut(int regCnt, boolean success) {
1673    Put p;
1674    if (!success) {
1675      p = new Put(FAILS);
1676    } else switch (regCnt){
1677      case 1 :
1678        p = new Put(DUMMY_BYTES_1);
1679        break;
1680      case 2:
1681        p = new Put(DUMMY_BYTES_2);
1682        break;
1683      case 3:
1684        p = new Put(DUMMY_BYTES_3);
1685        break;
1686      default:
1687        throw new IllegalArgumentException("unknown " + regCnt);
1688    }
1689
1690    p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1691
1692    return p;
1693  }
1694
1695  static class MyThreadPoolExecutor extends ThreadPoolExecutor {
1696    public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime,
1697        TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) {
1698      super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue);
1699    }
1700
1701    @Override
1702    public Future submit(Runnable runnable) {
1703      throw new OutOfMemoryError("OutOfMemory error thrown by means");
1704    }
1705  }
1706
  /**
   * {@code AsyncProcess} subclass that adds no behavior of its own; it only supplies an
   * {@code RpcRetryingCallerFactory} and an {@code RpcControllerFactory} built from the
   * given configuration. Used by {@code testUncheckedException}.
   */
  static class AsyncProcessForThrowableCheck extends AsyncProcess {
    public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(
          conf));
    }
  }
1713
1714  @Test
1715  public void testUncheckedException() throws Exception {
1716    // Test the case pool.submit throws unchecked exception
1717    ClusterConnection hc = createHConnection();
1718    MyThreadPoolExecutor myPool =
1719        new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
1720            new LinkedBlockingQueue<>(200));
1721    AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF);
1722
1723    List<Put> puts = new ArrayList<>(1);
1724    puts.add(createPut(1, true));
1725    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1726            .setPool(myPool)
1727            .setRpcTimeout(RPC_TIMEOUT)
1728            .setOperationTimeout(OPERATION_TIMEOUT)
1729            .setTableName(DUMMY_TABLE)
1730            .setRowAccess(puts)
1731            .setSubmittedRows(SubmittedRows.NORMAL)
1732            .build();
1733    ap.submit(task);
1734    Assert.assertTrue(puts.isEmpty());
1735  }
1736
1737  /**
1738   * Test and make sure we could use a special pause setting when retry with
1739   * CallQueueTooBigException, see HBASE-17114
1740   * @throws Exception if unexpected error happened during test
1741   */
1742  @Test
1743  public void testRetryPauseWithCallQueueTooBigException() throws Exception {
1744    Configuration myConf = new Configuration(CONF);
1745    final long specialPause = 500L;
1746    final int retries = 1;
1747    myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
1748    myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1749    ClusterConnection conn = new MyConnectionImpl(myConf);
1750    AsyncProcessWithFailure ap =
1751        new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
1752    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1753    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1754
1755    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1756
1757    Put p = createPut(1, true);
1758    mutator.mutate(p);
1759
1760    long startTime = System.currentTimeMillis();
1761    try {
1762      mutator.flush();
1763      Assert.fail();
1764    } catch (RetriesExhaustedWithDetailsException expected) {
1765      assertEquals(1, expected.getNumExceptions());
1766      assertTrue(expected.getRow(0) == p);
1767    }
1768    long actualSleep = System.currentTimeMillis() - startTime;
1769    long expectedSleep = 0L;
1770    for (int i = 0; i < retries; i++) {
1771      expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
1772      // Prevent jitter in ConcurrentMapUtils#getPauseTime to affect result
1773      actualSleep += (long) (specialPause * 0.01f);
1774    }
1775    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
1776    Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms",
1777      actualSleep >= expectedSleep);
1778
1779    // check and confirm normal IOE will use the normal pause
1780    final long normalPause =
1781        myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1782    ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
1783    bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1784    mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1785    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1786    mutator.mutate(p);
1787    startTime = System.currentTimeMillis();
1788    try {
1789      mutator.flush();
1790      Assert.fail();
1791    } catch (RetriesExhaustedWithDetailsException expected) {
1792      assertEquals(1, expected.getNumExceptions());
1793      assertTrue(expected.getRow(0) == p);
1794    }
1795    actualSleep = System.currentTimeMillis() - startTime;
1796    expectedSleep = 0L;
1797    for (int i = 0; i < retries; i++) {
1798      expectedSleep += ConnectionUtils.getPauseTime(normalPause, i);
1799    }
1800    // plus an additional pause to balance the program execution time
1801    expectedSleep += normalPause;
1802    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
1803    Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
1804  }
1805
1806  @Test
1807  public void testRetryWithExceptionClearsMetaCache() throws Exception {
1808    ClusterConnection conn = createHConnection();
1809    Configuration myConf = conn.getConfiguration();
1810    myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
1811
1812    AsyncProcessWithFailure ap =
1813        new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test"));
1814    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1815    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1816
1817    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1818
1819    Assert.assertEquals(
1820        conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(),
1821        new RegionLocations(loc1).toString());
1822
1823    Mockito.verify(conn, Mockito.times(0)).clearCaches(Mockito.any());
1824
1825    Put p = createPut(1, true);
1826    mutator.mutate(p);
1827
1828    try {
1829      mutator.flush();
1830      Assert.fail();
1831    } catch (RetriesExhaustedWithDetailsException expected) {
1832      assertEquals(1, expected.getNumExceptions());
1833      assertTrue(expected.getRow(0) == p);
1834    }
1835
1836    Mockito.verify(conn, Mockito.times(1)).clearCaches(loc1.getServerName());
1837  }
1838
  /**
   * Exercises {@code BufferedMutatorImpl.QueueRowAccess}: creating an accessor drains the
   * mutator's queue, iterating updates the unflushed count and write buffer size, and
   * closing an accessor pushes back a mutation that was iterated but not removed.
   * @throws Exception if unexpected error happened during test
   */
  @Test
  public void testQueueRowAccess() throws Exception {
    ClusterConnection conn = createHConnection();
    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
      new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
    Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
    Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
    mutator.mutate(p0);
    BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
    // QueueRowAccess should take all pending mutations out of the mutator's queue
    assertEquals(0, mutator.size());
    mutator.mutate(p1);
    assertEquals(1, mutator.size());
    BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
    // QueueRowAccess should take all pending mutations out of the mutator's queue
    assertEquals(0, mutator.size());
    assertEquals(1, ra0.size());
    assertEquals(1, ra1.size());
    Iterator<Row> iter0 = ra0.iterator();
    Iterator<Row> iter1 = ra1.iterator();
    assertTrue(iter0.hasNext());
    assertTrue(iter1.hasNext());
    // the next() will poll the mutation from inner buffer and update the buffer count
    assertTrue(iter0.next() == p0);
    assertEquals(1, mutator.getUnflushedSize());
    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
    assertTrue(iter1.next() == p1);
    assertEquals(0, mutator.getUnflushedSize());
    assertEquals(0, mutator.getCurrentWriteBufferSize());
    assertFalse(iter0.hasNext());
    assertFalse(iter1.hasNext());
    // ra0 handles its mutation (iter0.remove()), so closing ra0 pushes nothing back to
    // the buffer
    iter0.remove();
    ra0.close();
    assertEquals(0, mutator.size());
    assertEquals(0, mutator.getUnflushedSize());
    assertEquals(0, mutator.getCurrentWriteBufferSize());
    // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
    ra1.close();
    assertEquals(1, mutator.size());
    assertEquals(1, mutator.getUnflushedSize());
    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
  }
1882}