001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.Random;
034import java.util.Set;
035import java.util.TreeSet;
036import java.util.concurrent.BlockingQueue;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ExecutorService;
039import java.util.concurrent.Executors;
040import java.util.concurrent.Future;
041import java.util.concurrent.LinkedBlockingQueue;
042import java.util.concurrent.SynchronousQueue;
043import java.util.concurrent.ThreadFactory;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.atomic.AtomicBoolean;
047import java.util.concurrent.atomic.AtomicInteger;
048import java.util.concurrent.atomic.AtomicLong;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CallDroppedException;
051import org.apache.hadoop.hbase.CallQueueTooBigException;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.HBaseClassTestRule;
054import org.apache.hadoop.hbase.HBaseServerException;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HRegionInfo;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.RegionLocations;
059import org.apache.hadoop.hbase.ServerName;
060import org.apache.hadoop.hbase.TableName;
061import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess;
062import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
063import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
064import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
065import org.apache.hadoop.hbase.client.coprocessor.Batch;
066import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
067import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
068import org.apache.hadoop.hbase.testclassification.ClientTests;
069import org.apache.hadoop.hbase.testclassification.LargeTests;
070import org.apache.hadoop.hbase.util.Bytes;
071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
072import org.apache.hadoop.hbase.util.Threads;
073import org.junit.Assert;
074import org.junit.Before;
075import org.junit.ClassRule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.mockito.Mockito;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
083
084@Category({ ClientTests.class, LargeTests.class })
085public class TestAsyncProcess {
086
087  @ClassRule
088  public static final HBaseClassTestRule CLASS_RULE =
089    HBaseClassTestRule.forClass(TestAsyncProcess.class);
090
091  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class);
092  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
093  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
094  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
095  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
096  private static final byte[] FAILS = Bytes.toBytes("FAILS");
097  private Configuration CONF;
098  private ConnectionConfiguration CONNECTION_CONFIG;
099  private static final ServerName sn = ServerName.valueOf("s1,1,1");
100  private static final ServerName sn2 = ServerName.valueOf("s2,2,2");
101  private static final ServerName sn3 = ServerName.valueOf("s3,3,3");
102  private static final HRegionInfo hri1 =
103    new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
104  private static final HRegionInfo hri2 =
105    new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
106  private static final HRegionInfo hri3 =
107    new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
108  private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn);
109  private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn);
110  private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
111
112  // Replica stuff
113  private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1);
114  private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
115  private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
116  private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
117    new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
118  private static final RegionLocations hrls2 =
119    new RegionLocations(new HRegionLocation(hri2, sn2), new HRegionLocation(hri2r1, sn3));
120  private static final RegionLocations hrls3 =
121    new RegionLocations(new HRegionLocation(hri3, sn3), null);
122
123  private static final String success = "success";
124  private static Exception failure = new Exception("failure");
125
126  private static final int NB_RETRIES = 3;
127
128  private int RPC_TIMEOUT;
129  private int OPERATION_TIMEOUT;
130
131  @Before
132  public void beforeEach() {
133    this.CONF = new Configuration();
134    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
135    this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF);
136    this.RPC_TIMEOUT =
137      CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
138    this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
139      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
140  }
141
142  static class CountingThreadFactory implements ThreadFactory {
143    final AtomicInteger nbThreads;
144    ThreadFactory realFactory =
145      new ThreadFactoryBuilder().setNameFormat("test-TestAsyncProcess-pool-%d")
146        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build();
147
148    @Override
149    public Thread newThread(Runnable r) {
150      nbThreads.incrementAndGet();
151      return realFactory.newThread(r);
152    }
153
154    CountingThreadFactory(AtomicInteger nbThreads) {
155      this.nbThreads = nbThreads;
156    }
157  }
158
159  static class MyAsyncProcess extends AsyncProcess {
160    final AtomicInteger nbMultiResponse = new AtomicInteger();
161    final AtomicInteger nbActions = new AtomicInteger();
162    public List<AsyncRequestFuture> allReqs = new ArrayList<>();
163    public AtomicInteger callsCt = new AtomicInteger();
164    private Configuration conf;
165
166    private long previousTimeout = -1;
167    final ExecutorService service;
168
169    @Override
170    protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(AsyncProcessTask task,
171      List<Action> actions, long nonceGroup) {
172      // Test HTable has tableName of null, so pass DUMMY_TABLE
173      AsyncProcessTask wrap = new AsyncProcessTask(task) {
174        @Override
175        public TableName getTableName() {
176          return DUMMY_TABLE;
177        }
178      };
179      AsyncRequestFutureImpl<Res> r =
180        new MyAsyncRequestFutureImpl<>(wrap, actions, nonceGroup, this);
181      allReqs.add(r);
182      return r;
183    }
184
185    public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
186      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
187      service = Executors.newFixedThreadPool(5);
188      this.conf = conf;
189    }
190
191    public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
192      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
193      service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
194        new CountingThreadFactory(nbThreads));
195    }
196
197    public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
198      List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
199      boolean needResults) throws InterruptedIOException {
200      AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
201        .setPool(pool == null ? service : pool).setTableName(tableName).setRowAccess(rows)
202        .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL)
203        .setNeedResults(needResults)
204        .setRpcTimeout(
205          conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT))
206        .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
207          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))
208        .build();
209      return submit(task);
210    }
211
212    public <CResult> AsyncRequestFuture submit(TableName tableName, final List<? extends Row> rows,
213      boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
214      throws InterruptedIOException {
215      return submit(null, tableName, rows, atLeastOne, callback, needResults);
216    }
217
218    @Override
219    public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task)
220      throws InterruptedIOException {
221      previousTimeout = task.getRpcTimeout();
222      // We use results in tests to check things, so override to always save them.
223      AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) {
224        @Override
225        public boolean getNeedResults() {
226          return true;
227        }
228      };
229      return super.submit(wrap);
230    }
231
232    @Override
233    protected RpcRetryingCaller<AbstractResponse>
234      createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
235      callsCt.incrementAndGet();
236      MultiServerCallable callable1 = (MultiServerCallable) callable;
237      final MultiResponse mr = createMultiResponse(callable1.getMulti(), nbMultiResponse, nbActions,
238        new ResponseGenerator() {
239          @Override
240          public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
241            if (Arrays.equals(FAILS, a.getAction().getRow())) {
242              mr.add(regionName, a.getOriginalIndex(), failure);
243            } else {
244              mr.add(regionName, a.getOriginalIndex(), success);
245            }
246          }
247        });
248
249      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
250        @Override
251        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
252          int callTimeout) throws IOException, RuntimeException {
253          try {
254            // sleep one second in order for threadpool to start another thread instead of reusing
255            // existing one.
256            Thread.sleep(1000);
257          } catch (InterruptedException e) {
258            // ignore error
259          }
260          return mr;
261        }
262      };
263    }
264
265  }
266
267  static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
268    private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
269
270    public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup,
271      AsyncProcess asyncProcess) {
272      super(task, actions, nonceGroup, asyncProcess);
273    }
274
275    @Override
276    protected void updateStats(ServerName server, MultiResponse resp) {
277      // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
278    }
279
280    Map<ServerName, List<Long>> getRequestHeapSize() {
281      return heapSizesByServer;
282    }
283
284    @Override
285    SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt,
286      ServerName server, Set<CancellableRegionServerCallable> callsInProgress) {
287      SingleServerRequestRunnable rq =
288        new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
289      List<Long> heapCount = heapSizesByServer.get(server);
290      if (heapCount == null) {
291        heapCount = new ArrayList<>();
292        heapSizesByServer.put(server, heapCount);
293      }
294      heapCount.add(heapSizeOf(multiAction));
295      return rq;
296    }
297
298    private long heapSizeOf(MultiAction multiAction) {
299      return multiAction.actions.values().stream().flatMap(v -> v.stream())
300        .map(action -> action.getAction()).filter(row -> row instanceof Mutation)
301        .mapToLong(row -> ((Mutation) row).heapSize()).sum();
302    }
303  }
304
305  static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse> {
306
307    private final IOException e;
308
309    public CallerWithFailure(IOException e) {
310      super(100, 500, 100, 9);
311      this.e = e;
312    }
313
314    @Override
315    public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
316      int callTimeout) throws IOException, RuntimeException {
317      throw e;
318    }
319  }
320
321  static class AsyncProcessWithFailure extends MyAsyncProcess {
322
323    private final IOException ioe;
324
325    public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
326      super(hc, conf);
327      this.ioe = ioe;
328      serverTrackerTimeout = 1L;
329    }
330
331    @Override
332    protected RpcRetryingCaller<AbstractResponse>
333      createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
334      callsCt.incrementAndGet();
335      return new CallerWithFailure(ioe);
336    }
337  }
338
339  /**
340   * Make the backoff time always different on each call.
341   */
342  static class MyClientBackoffPolicy implements ClientBackoffPolicy {
343    private final Map<ServerName, AtomicInteger> count = new HashMap<>();
344
345    @Override
346    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
347      AtomicInteger inc = count.get(serverName);
348      if (inc == null) {
349        inc = new AtomicInteger(0);
350        count.put(serverName, inc);
351      }
352      return inc.getAndIncrement();
353    }
354  }
355
356  static class MyAsyncProcessWithReplicas extends MyAsyncProcess {
357    private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator());
358    private long primarySleepMs = 0, replicaSleepMs = 0;
359    private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>();
360    private final AtomicLong replicaCalls = new AtomicLong(0);
361
362    public void addFailures(RegionInfo... hris) {
363      for (RegionInfo hri : hris) {
364        failures.add(hri.getRegionName());
365      }
366    }
367
368    public long getReplicaCallCount() {
369      return replicaCalls.get();
370    }
371
372    public void setPrimaryCallDelay(ServerName server, long primaryMs) {
373      customPrimarySleepMs.put(server, primaryMs);
374    }
375
376    public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
377      super(hc, conf);
378    }
379
380    public void setCallDelays(long primaryMs, long replicaMs) {
381      this.primarySleepMs = primaryMs;
382      this.replicaSleepMs = replicaMs;
383    }
384
385    @Override
386    protected RpcRetryingCaller<AbstractResponse>
387      createCaller(CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
388      MultiServerCallable callable = (MultiServerCallable) payloadCallable;
389      final MultiResponse mr = createMultiResponse(callable.getMulti(), nbMultiResponse, nbActions,
390        new ResponseGenerator() {
391          @Override
392          public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
393            if (failures.contains(regionName)) {
394              mr.add(regionName, a.getOriginalIndex(), failure);
395            } else {
396              boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
397              mr.add(regionName, a.getOriginalIndex(), Result.create(new Cell[0], null, isStale));
398            }
399          }
400        });
401      // Currently AsyncProcess either sends all-replica, or all-primary request.
402      final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
403        callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
404      final ServerName server = ((MultiServerCallable) callable).getServerName();
405      String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
406        + callable.getMulti().actions.size() + " entries: ";
407      for (byte[] region : callable.getMulti().actions.keySet()) {
408        debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
409      }
410      LOG.debug(debugMsg);
411      if (!isDefault) {
412        replicaCalls.incrementAndGet();
413      }
414
415      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
416        @Override
417        public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
418          int callTimeout) throws IOException, RuntimeException {
419          long sleep = -1;
420          if (isDefault) {
421            Long customSleep = customPrimarySleepMs.get(server);
422            sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
423          } else {
424            sleep = replicaSleepMs;
425          }
426          if (sleep != 0) {
427            try {
428              Thread.sleep(sleep);
429            } catch (InterruptedException e) {
430              // Restore interrupt status
431              Thread.currentThread().interrupt();
432            }
433          }
434          return mr;
435        }
436      };
437    }
438  }
439
440  static MultiResponse createMultiResponse(final MultiAction multi, AtomicInteger nbMultiResponse,
441    AtomicInteger nbActions, ResponseGenerator gen) {
442    final MultiResponse mr = new MultiResponse();
443    nbMultiResponse.incrementAndGet();
444    for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) {
445      byte[] regionName = entry.getKey();
446      for (Action a : entry.getValue()) {
447        nbActions.incrementAndGet();
448        gen.addResponse(mr, regionName, a);
449      }
450    }
451    return mr;
452  }
453
454  private static interface ResponseGenerator {
455    void addResponse(final MultiResponse mr, byte[] regionName, Action a);
456  }
457
458  /**
459   * Returns our async process.
460   */
461  static class MyConnectionImpl extends ConnectionImplementation {
462    public static class TestRegistry extends DoNothingConnectionRegistry {
463
464      public TestRegistry(Configuration conf) {
465        super(conf);
466      }
467
468      @Override
469      public CompletableFuture<String> getClusterId() {
470        return CompletableFuture.completedFuture("testClusterId");
471      }
472    }
473
474    final AtomicInteger nbThreads = new AtomicInteger(0);
475
476    protected MyConnectionImpl(Configuration conf) throws IOException {
477      super(setupConf(conf), null, null);
478    }
479
480    private static Configuration setupConf(Configuration conf) {
481      conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, TestRegistry.class,
482        ConnectionRegistry.class);
483      return conf;
484    }
485
486    @Override
487    public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
488      boolean retry, int replicaId) throws IOException {
489      return new RegionLocations(loc1);
490    }
491
492    @Override
493    public boolean hasCellBlockSupport() {
494      return false;
495    }
496  }
497
498  /**
499   * Returns our async process.
500   */
501  static class MyConnectionImpl2 extends MyConnectionImpl {
502    List<HRegionLocation> hrl;
503    final boolean usedRegions[];
504
505    protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException {
506      super(conf);
507      this.hrl = hrl;
508      this.usedRegions = new boolean[hrl.size()];
509    }
510
511    @Override
512    public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
513      boolean retry, int replicaId) throws IOException {
514      int i = 0;
515      for (HRegionLocation hr : hrl) {
516        if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
517          usedRegions[i] = true;
518          return new RegionLocations(hr);
519        }
520        i++;
521      }
522      return null;
523    }
524  }
525
526  @Test
527  public void testListRowAccess() {
528    int count = 10;
529    List<String> values = new ArrayList<>();
530    for (int i = 0; i != count; ++i) {
531      values.add(String.valueOf(i));
532    }
533
534    ListRowAccess<String> taker = new ListRowAccess<>(values);
535    assertEquals(count, taker.size());
536
537    int takeCount = 0;
538    Iterator<String> it = taker.iterator();
539    while (it.hasNext()) {
540      String v = it.next();
541      assertEquals(String.valueOf(takeCount), v);
542      ++takeCount;
543      it.remove();
544      if (Math.random() >= 0.5) {
545        break;
546      }
547    }
548    assertEquals(count, taker.size() + takeCount);
549
550    it = taker.iterator();
551    while (it.hasNext()) {
552      String v = it.next();
553      assertEquals(String.valueOf(takeCount), v);
554      ++takeCount;
555      it.remove();
556    }
557    assertEquals(0, taker.size());
558    assertEquals(count, takeCount);
559  }
560
561  private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) {
562    if (putSizePerServer <= maxHeapSizePerRequest) {
563      return 1;
564    } else if (putSizePerServer % maxHeapSizePerRequest == 0) {
565      return putSizePerServer / maxHeapSizePerRequest;
566    } else {
567      return putSizePerServer / maxHeapSizePerRequest + 1;
568    }
569  }
570
571  @Test
572  public void testSubmitSameSizeOfRequest() throws Exception {
573    long writeBuffer = 2 * 1024 * 1024;
574    long putsHeapSize = writeBuffer;
575    doSubmitRequest(writeBuffer, putsHeapSize);
576  }
577
578  @Test
579  public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
580    long maxHeapSizePerRequest = Long.MAX_VALUE;
581    long putsHeapSize = 2 * 1024 * 1024;
582    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
583  }
584
585  @Test
586  public void testSubmitRandomSizeRequest() throws Exception {
587    Random rn = new Random();
588    final long limit = 10 * 1024 * 1024;
589    final int requestCount = 1 + (int) (rn.nextDouble() * 3);
590    long n = rn.nextLong();
591    if (n < 0) {
592      n = -n;
593    } else if (n == 0) {
594      n = 1;
595    }
596    long putsHeapSize = n % limit;
597    long maxHeapSizePerRequest = putsHeapSize / requestCount;
598    LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest
599      + ", putsHeapSize=" + putsHeapSize);
600    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
601  }
602
603  @Test
604  public void testSubmitSmallRequest() throws Exception {
605    long maxHeapSizePerRequest = 2 * 1024 * 1024;
606    long putsHeapSize = 100;
607    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
608  }
609
610  @Test
611  public void testSubmitLargeRequest() throws Exception {
612    long maxHeapSizePerRequest = 2 * 1024 * 1024;
613    long putsHeapSize = maxHeapSizePerRequest * 2;
614    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
615  }
616
617  private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
618    ClusterConnection conn = createHConnection();
619    final String defaultClazz =
620      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
621    final long defaultHeapSizePerRequest =
622      conn.getConfiguration().getLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
623        SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
624    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
625      SimpleRequestController.class.getName());
626    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
627      maxHeapSizePerRequest);
628
629    // sn has two regions
630    long putSizeSN = 0;
631    long putSizeSN2 = 0;
632    List<Put> puts = new ArrayList<>();
633    while ((putSizeSN + putSizeSN2) <= putsHeapSize) {
634      Put put1 = new Put(DUMMY_BYTES_1);
635      put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
636      Put put2 = new Put(DUMMY_BYTES_2);
637      put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
638      Put put3 = new Put(DUMMY_BYTES_3);
639      put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3);
640      putSizeSN += (put1.heapSize() + put2.heapSize());
641      putSizeSN2 += put3.heapSize();
642      puts.add(put1);
643      puts.add(put2);
644      puts.add(put3);
645    }
646
647    int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest);
648    int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest);
649    LOG.info("Total put count:" + puts.size() + ", putSizeSN:" + putSizeSN + ", putSizeSN2:"
650      + putSizeSN2 + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest + ", minCountSnRequest:"
651      + minCountSnRequest + ", minCountSn2Request:" + minCountSn2Request);
652
653    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
654    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
655    try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) {
656      mutator.mutate(puts);
657      mutator.flush();
658      List<AsyncRequestFuture> reqs = ap.allReqs;
659
660      int actualSnReqCount = 0;
661      int actualSn2ReqCount = 0;
662      for (AsyncRequestFuture req : reqs) {
663        if (!(req instanceof AsyncRequestFutureImpl)) {
664          continue;
665        }
666        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
667        if (ars.getRequestHeapSize().containsKey(sn)) {
668          ++actualSnReqCount;
669        }
670        if (ars.getRequestHeapSize().containsKey(sn2)) {
671          ++actualSn2ReqCount;
672        }
673      }
674      // If the server is busy, the actual count may be incremented.
675      assertEquals(true, minCountSnRequest <= actualSnReqCount);
676      assertEquals(true, minCountSn2Request <= actualSn2ReqCount);
677      Map<ServerName, Long> sizePerServers = new HashMap<>();
678      for (AsyncRequestFuture req : reqs) {
679        if (!(req instanceof AsyncRequestFutureImpl)) {
680          continue;
681        }
682        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
683        Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
684        for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
685          long sum = 0;
686          for (long size : entry.getValue()) {
687            assertEquals(true, size <= maxHeapSizePerRequest);
688            sum += size;
689          }
690          assertEquals(true, sum <= maxHeapSizePerRequest);
691          long value = sizePerServers.getOrDefault(entry.getKey(), 0L);
692          sizePerServers.put(entry.getKey(), value + sum);
693        }
694      }
695      assertEquals(true, sizePerServers.containsKey(sn));
696      assertEquals(true, sizePerServers.containsKey(sn2));
697      assertEquals(false, sizePerServers.containsKey(sn3));
698      assertEquals(putSizeSN, (long) sizePerServers.get(sn));
699      assertEquals(putSizeSN2, (long) sizePerServers.get(sn2));
700    }
701    // restore config.
702    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
703      defaultHeapSizePerRequest);
704    if (defaultClazz != null) {
705      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
706        defaultClazz);
707    }
708  }
709
710  @Test
711  public void testSubmit() throws Exception {
712    ClusterConnection hc = createHConnection();
713    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
714
715    List<Put> puts = new ArrayList<>(1);
716    puts.add(createPut(1, true));
717
718    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
719    Assert.assertTrue(puts.isEmpty());
720  }
721
722  @Test
723  public void testSubmitWithCB() throws Exception {
724    ClusterConnection hc = createHConnection();
725    final AtomicInteger updateCalled = new AtomicInteger(0);
726    Batch.Callback<Object> cb = new Batch.Callback<Object>() {
727      @Override
728      public void update(byte[] region, byte[] row, Object result) {
729        updateCalled.incrementAndGet();
730      }
731    };
732    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
733
734    List<Put> puts = new ArrayList<>(1);
735    puts.add(createPut(1, true));
736
737    final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false);
738    Assert.assertTrue(puts.isEmpty());
739    ars.waitUntilDone();
740    Assert.assertEquals(1, updateCalled.get());
741  }
742
743  @Test
744  public void testSubmitBusyRegion() throws Exception {
745    ClusterConnection conn = createHConnection();
746    final String defaultClazz =
747      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
748    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
749      SimpleRequestController.class.getName());
750    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
751    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
752    List<Put> puts = new ArrayList<>(1);
753    puts.add(createPut(1, true));
754
755    for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) {
756      ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
757    }
758    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
759    Assert.assertEquals(puts.size(), 1);
760
761    ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
762    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
763    Assert.assertEquals(0, puts.size());
764    if (defaultClazz != null) {
765      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
766        defaultClazz);
767    }
768  }
769
770  @Test
771  public void testSubmitBusyRegionServer() throws Exception {
772    ClusterConnection conn = createHConnection();
773    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
774    final String defaultClazz =
775      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
776    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
777      SimpleRequestController.class.getName());
778    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
779    controller.taskCounterPerServer.put(sn2,
780      new AtomicInteger(controller.maxConcurrentTasksPerServer));
781
782    List<Put> puts = new ArrayList<>(4);
783    puts.add(createPut(1, true));
784    puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
785    puts.add(createPut(1, true)); // <== this one will make it, the region is already in
786    puts.add(createPut(2, true)); // <== new region, but the rs is ok
787
788    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
789    Assert.assertEquals(" puts=" + puts, 1, puts.size());
790
791    controller.taskCounterPerServer.put(sn2,
792      new AtomicInteger(controller.maxConcurrentTasksPerServer - 1));
793    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
794    Assert.assertTrue(puts.isEmpty());
795    if (defaultClazz != null) {
796      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
797        defaultClazz);
798    }
799  }
800
801  @Test
802  public void testFail() throws Exception {
803    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
804
805    List<Put> puts = new ArrayList<>(1);
806    Put p = createPut(1, false);
807    puts.add(p);
808
809    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
810    Assert.assertEquals(0, puts.size());
811    ars.waitUntilDone();
812    verifyResult(ars, false);
813    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
814
815    Assert.assertEquals(1, ars.getErrors().exceptions.size());
816    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
817      failure.equals(ars.getErrors().exceptions.get(0)));
818    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
819      failure.equals(ars.getErrors().exceptions.get(0)));
820
821    Assert.assertEquals(1, ars.getFailedOperations().size());
822    Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
823      p.equals(ars.getFailedOperations().get(0)));
824  }
825
826  @Test
827  public void testSubmitTrue() throws IOException {
828    ClusterConnection conn = createHConnection();
829    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
830    final String defaultClazz =
831      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
832    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
833      SimpleRequestController.class.getName());
834    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
835    controller.tasksInProgress.incrementAndGet();
836    final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion);
837    controller.taskCounterPerRegion.put(hri1.getRegionName(), ai);
838
839    final AtomicBoolean checkPoint = new AtomicBoolean(false);
840    final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
841
842    Thread t = new Thread() {
843      @Override
844      public void run() {
845        Threads.sleep(1000);
846        Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
847        ai.decrementAndGet();
848        controller.tasksInProgress.decrementAndGet();
849        checkPoint2.set(true);
850      }
851    };
852
853    List<Put> puts = new ArrayList<>(1);
854    Put p = createPut(1, true);
855    puts.add(p);
856
857    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
858    Assert.assertFalse(puts.isEmpty());
859
860    t.start();
861
862    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
863    Assert.assertTrue(puts.isEmpty());
864
865    checkPoint.set(true);
866    while (!checkPoint2.get()) {
867      Threads.sleep(1);
868    }
869    if (defaultClazz != null) {
870      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
871        defaultClazz);
872    }
873  }
874
875  @Test
876  public void testFailAndSuccess() throws Exception {
877    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
878
879    List<Put> puts = new ArrayList<>(3);
880    puts.add(createPut(1, false));
881    puts.add(createPut(1, true));
882    puts.add(createPut(1, true));
883
884    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
885    Assert.assertTrue(puts.isEmpty());
886    ars.waitUntilDone();
887    verifyResult(ars, false, true, true);
888    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
889    ap.callsCt.set(0);
890    Assert.assertEquals(1, ars.getErrors().actions.size());
891
892    puts.add(createPut(1, true));
893    // Wait for AP to be free. While ars might have the result, ap counters are decreased later.
894    ap.waitForMaximumCurrentTasks(0, null);
895    ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
896    Assert.assertEquals(0, puts.size());
897    ars.waitUntilDone();
898    Assert.assertEquals(1, ap.callsCt.get());
899    verifyResult(ars, true);
900  }
901
902  @Test
903  public void testFlush() throws Exception {
904    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
905
906    List<Put> puts = new ArrayList<>(3);
907    puts.add(createPut(1, false));
908    puts.add(createPut(1, true));
909    puts.add(createPut(1, true));
910
911    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
912    ars.waitUntilDone();
913    verifyResult(ars, false, true, true);
914    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
915
916    Assert.assertEquals(1, ars.getFailedOperations().size());
917  }
918
919  @Test
920  public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
921    ClusterConnection hc = createHConnection();
922    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
923    testTaskCount(ap);
924  }
925
926  @Test
927  public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
928    Configuration copyConf = new Configuration(CONF);
929    copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
930    MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
931    ClusterConnection conn = createHConnection();
932    Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
933    Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
934    Mockito.when(conn.getBackoffPolicy()).thenReturn(bp);
935    final String defaultClazz =
936      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
937    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
938      SimpleRequestController.class.getName());
939    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
940    testTaskCount(ap);
941    if (defaultClazz != null) {
942      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
943        defaultClazz);
944    }
945  }
946
947  private void testTaskCount(MyAsyncProcess ap)
948    throws InterruptedIOException, InterruptedException {
949    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
950    List<Put> puts = new ArrayList<>();
951    for (int i = 0; i != 3; ++i) {
952      puts.add(createPut(1, true));
953      puts.add(createPut(2, true));
954      puts.add(createPut(3, true));
955    }
956    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
957    ap.waitForMaximumCurrentTasks(0, null);
958    // More time to wait if there are incorrect task count.
959    TimeUnit.SECONDS.sleep(1);
960    assertEquals(0, controller.tasksInProgress.get());
961    for (AtomicInteger count : controller.taskCounterPerRegion.values()) {
962      assertEquals(0, count.get());
963    }
964    for (AtomicInteger count : controller.taskCounterPerServer.values()) {
965      assertEquals(0, count.get());
966    }
967  }
968
969  @Test
970  public void testMaxTask() throws Exception {
971    ClusterConnection conn = createHConnection();
972    final String defaultClazz =
973      conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
974    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
975      SimpleRequestController.class.getName());
976    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
977    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
978
979    for (int i = 0; i < 1000; i++) {
980      ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
981    }
982
983    final Thread myThread = Thread.currentThread();
984
985    Thread t = new Thread() {
986      @Override
987      public void run() {
988        Threads.sleep(2000);
989        myThread.interrupt();
990      }
991    };
992
993    List<Put> puts = new ArrayList<>(1);
994    puts.add(createPut(1, true));
995
996    t.start();
997
998    try {
999      ap.submit(null, DUMMY_TABLE, puts, false, null, false);
1000      Assert.fail("We should have been interrupted.");
1001    } catch (InterruptedIOException expected) {
1002    }
1003
1004    final long sleepTime = 2000;
1005
1006    Thread t2 = new Thread() {
1007      @Override
1008      public void run() {
1009        Threads.sleep(sleepTime);
1010        while (controller.tasksInProgress.get() > 0) {
1011          ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
1012        }
1013      }
1014    };
1015    t2.start();
1016
1017    long start = EnvironmentEdgeManager.currentTime();
1018    ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false);
1019    long end = EnvironmentEdgeManager.currentTime();
1020
1021    // Adds 100 to secure us against approximate timing.
1022    Assert.assertTrue(start + 100L + sleepTime > end);
1023    if (defaultClazz != null) {
1024      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
1025        defaultClazz);
1026    }
1027  }
1028
1029  private ClusterConnection createHConnection() throws IOException {
1030    return createHConnection(CONNECTION_CONFIG);
1031  }
1032
1033  private ClusterConnection createHConnection(ConnectionConfiguration configuration)
1034    throws IOException {
1035    ClusterConnection hc = createHConnectionCommon(configuration);
1036    setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
1037    setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
1038    setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
1039    Mockito
1040      .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
1041      .thenReturn(Arrays.asList(loc1, loc2, loc3));
1042    setMockLocation(hc, FAILS, new RegionLocations(loc2));
1043    return hc;
1044  }
1045
1046  private ClusterConnection createHConnectionWithReplicas(ConnectionConfiguration configuration)
1047    throws IOException {
1048    ClusterConnection hc = createHConnectionCommon(configuration);
1049    setMockLocation(hc, DUMMY_BYTES_1, hrls1);
1050    setMockLocation(hc, DUMMY_BYTES_2, hrls2);
1051    setMockLocation(hc, DUMMY_BYTES_3, hrls3);
1052    List<HRegionLocation> locations = new ArrayList<>();
1053    for (HRegionLocation loc : hrls1.getRegionLocations()) {
1054      locations.add(loc);
1055    }
1056    for (HRegionLocation loc : hrls2.getRegionLocations()) {
1057      locations.add(loc);
1058    }
1059    for (HRegionLocation loc : hrls3.getRegionLocations()) {
1060      locations.add(loc);
1061    }
1062    Mockito
1063      .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
1064      .thenReturn(locations);
1065    return hc;
1066  }
1067
1068  private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
1069    throws IOException {
1070    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
1071      Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
1072    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
1073      Mockito.anyBoolean())).thenReturn(result);
1074  }
1075
1076  private ClusterConnection
1077    createHConnectionCommon(ConnectionConfiguration connectionConfiguration) {
1078    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
1079    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
1080    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
1081    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
1082    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
1083    Mockito.when(hc.getConnectionConfiguration()).thenReturn(connectionConfiguration);
1084    return hc;
1085  }
1086
1087  @Test
1088  public void testHTablePutSuccess() throws Exception {
1089    ClusterConnection conn = createHConnection();
1090    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1091    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1092    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1093
1094    Put put = createPut(1, true);
1095
1096    Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(),
1097      ht.getWriteBufferSize());
1098    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1099    ht.mutate(put);
1100    ht.flush();
1101    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1102  }
1103
1104  @Test
1105  public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
1106    ClusterConnection conn = createHConnection();
1107    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1108
1109    checkPeriodicFlushParameters(conn, ap, 1234, 1234, 1234, 1234);
1110    checkPeriodicFlushParameters(conn, ap, 0, 0, 0,
1111      BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1112    checkPeriodicFlushParameters(conn, ap, -1234, 0, -1234,
1113      BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1114    checkPeriodicFlushParameters(conn, ap, 1, 1, 1,
1115      BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1116  }
1117
1118  private void checkPeriodicFlushParameters(ClusterConnection conn, MyAsyncProcess ap, long setTO,
1119    long expectTO, long setTT, long expectTT) {
1120    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1121
1122    // The BufferedMutatorParams does nothing with the value
1123    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
1124    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
1125    Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
1126    Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
1127
1128    // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams)
1129    BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap);
1130    Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs());
1131    Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs());
1132
1133    // The BufferedMutatorImpl corrects illegal values (direct via setter)
1134    BufferedMutatorImpl ht2 =
1135      new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap);
1136    ht2.setWriteBufferPeriodicFlush(setTO, setTT);
1137    Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs());
1138    Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs());
1139
1140  }
1141
1142  @Test
1143  public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
1144    ClusterConnection conn = createHConnection();
1145    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1146    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1147
1148    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP
1149    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms
1150    bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record
1151
1152    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1153
1154    // Verify if BufferedMutator has the right settings.
1155    Assert.assertEquals(10000, ht.getWriteBufferSize());
1156    Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs());
1157    Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
1158      ht.getWriteBufferPeriodicFlushTimerTickMs());
1159
1160    Put put = createPut(1, true);
1161
1162    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1163    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1164
1165    // ----- Insert, flush immediately, MUST NOT flush automatically
1166    ht.mutate(put);
1167    ht.flush();
1168
1169    Thread.sleep(1000);
1170    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1171    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1172
1173    // ----- Insert, NO flush, MUST flush automatically
1174    ht.mutate(put);
1175    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1176    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1177
1178    // The timerTick should fire every 100ms, so after twice that we must have
1179    // seen at least 1 tick and we should see an automatic flush
1180    Thread.sleep(200);
1181    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1182    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1183
1184    // Ensure it does not flush twice
1185    Thread.sleep(200);
1186    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1187    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1188
1189    // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically
1190    ht.disableWriteBufferPeriodicFlush();
1191    ht.mutate(put);
1192    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1193    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1194
1195    // Wait for at least 1 timerTick, we should see NO flushes.
1196    Thread.sleep(200);
1197    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1198    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1199
1200    // Reenable periodic flushing, a flush seems to take about 1 second
1201    // so we wait for 2 seconds and it should have finished the flush.
1202    ht.setWriteBufferPeriodicFlush(1, 100);
1203    Thread.sleep(2000);
1204    Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes());
1205    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1206  }
1207
1208  @Test
1209  public void testBufferedMutatorImplWithSharedPool() throws Exception {
1210    ClusterConnection conn = createHConnection();
1211    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1212    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1213    BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1214
1215    ht.close();
1216    assertFalse(ap.service.isShutdown());
1217  }
1218
1219  @Test
1220  public void testFailedPutAndNewPut() throws Exception {
1221    ClusterConnection conn = createHConnection();
1222    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1223    BufferedMutatorParams bufferParam =
1224      createBufferedMutatorParams(ap, DUMMY_TABLE).writeBufferSize(0);
1225    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1226
1227    Put p = createPut(1, false);
1228    try {
1229      mutator.mutate(p);
1230      Assert.fail();
1231    } catch (RetriesExhaustedWithDetailsException expected) {
1232      assertEquals(1, expected.getNumExceptions());
1233      assertTrue(expected.getRow(0) == p);
1234    }
1235    // Let's do all the retries.
1236    ap.waitForMaximumCurrentTasks(0, null);
1237    Assert.assertEquals(0, mutator.size());
1238
1239    // There is no global error so the new put should not fail
1240    mutator.mutate(createPut(1, true));
1241    Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
1242  }
1243
1244  @SuppressWarnings("SelfComparison")
1245  @Test
1246  public void testAction() {
1247    Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10);
1248    Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1249    Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1250    Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10);
1251    assertFalse(action_0.equals(action_1));
1252    assertTrue(action_0.equals(action_0));
1253    assertTrue(action_1.equals(action_2));
1254    assertTrue(action_2.equals(action_1));
1255    assertFalse(action_0.equals(new Put(Bytes.toBytes("abc"))));
1256    assertTrue(action_2.equals(action_3));
1257    assertFalse(action_0.equals(action_3));
1258    assertEquals(0, action_0.compareTo(action_0));
1259    assertTrue(action_0.compareTo(action_1) < 0);
1260    assertTrue(action_1.compareTo(action_0) > 0);
1261    assertEquals(0, action_1.compareTo(action_2));
1262  }
1263
1264  @Test
1265  public void testBatch() throws IOException, InterruptedException {
1266    ClusterConnection conn = new MyConnectionImpl(CONF);
1267    HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
1268    ht.multiAp = new MyAsyncProcess(conn, CONF);
1269
1270    List<Put> puts = new ArrayList<>(7);
1271    puts.add(createPut(1, true));
1272    puts.add(createPut(1, true));
1273    puts.add(createPut(1, true));
1274    puts.add(createPut(1, true));
1275    puts.add(createPut(1, false)); // <=== the bad apple, position 4
1276    puts.add(createPut(1, true));
1277    puts.add(createPut(1, false)); // <=== another bad apple, position 6
1278
1279    Object[] res = new Object[puts.size()];
1280    try {
1281      ht.batch(puts, res);
1282      Assert.fail();
1283    } catch (RetriesExhaustedException expected) {
1284    }
1285
1286    Assert.assertEquals(success, res[0]);
1287    Assert.assertEquals(success, res[1]);
1288    Assert.assertEquals(success, res[2]);
1289    Assert.assertEquals(success, res[3]);
1290    Assert.assertEquals(failure, res[4]);
1291    Assert.assertEquals(success, res[5]);
1292    Assert.assertEquals(failure, res[6]);
1293  }
1294
1295  @Test
1296  public void testErrorsServers() throws IOException {
1297    Configuration configuration = new Configuration(CONF);
1298    ClusterConnection conn = new MyConnectionImpl(configuration);
1299    MyAsyncProcess ap = new MyAsyncProcess(conn, configuration);
1300    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1301    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1302    configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
1303
1304    Assert.assertNotNull(ap.createServerErrorTracker());
1305    Assert.assertTrue(ap.serverTrackerTimeout > 200L);
1306    ap.serverTrackerTimeout = 1L;
1307
1308    Put p = createPut(1, false);
1309    mutator.mutate(p);
1310
1311    try {
1312      mutator.flush();
1313      Assert.fail();
1314    } catch (RetriesExhaustedWithDetailsException expected) {
1315      assertEquals(1, expected.getNumExceptions());
1316      assertTrue(expected.getRow(0) == p);
1317    }
1318    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1319    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1320  }
1321
1322  @Test
1323  public void testReadAndWriteTimeout() throws IOException {
1324    final long readTimeout = 10 * 1000;
1325    final long writeTimeout = 20 * 1000;
1326    Configuration copyConf = new Configuration(CONF);
1327    copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
1328    copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
1329    ClusterConnection conn = new MyConnectionImpl(copyConf);
1330    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
1331    try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
1332      ht.multiAp = ap;
1333      List<Get> gets = new ArrayList<>();
1334      gets.add(new Get(DUMMY_BYTES_1));
1335      gets.add(new Get(DUMMY_BYTES_2));
1336      try {
1337        ht.get(gets);
1338      } catch (ClassCastException e) {
1339        // No result response on this test.
1340      }
1341      assertEquals(readTimeout, ap.previousTimeout);
1342      ap.previousTimeout = -1;
1343
1344      try {
1345        ht.existsAll(gets);
1346      } catch (ClassCastException e) {
1347        // No result response on this test.
1348      }
1349      assertEquals(readTimeout, ap.previousTimeout);
1350      ap.previousTimeout = -1;
1351
1352      List<Delete> deletes = new ArrayList<>();
1353      deletes.add(new Delete(DUMMY_BYTES_1));
1354      deletes.add(new Delete(DUMMY_BYTES_2));
1355      ht.delete(deletes);
1356      assertEquals(writeTimeout, ap.previousTimeout);
1357    }
1358  }
1359
1360  @Test
1361  public void testErrors() throws IOException {
1362    ClusterConnection conn = new MyConnectionImpl(CONF);
1363    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
1364    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1365    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1366
1367    Assert.assertNotNull(ap.createServerErrorTracker());
1368
1369    Put p = createPut(1, true);
1370    mutator.mutate(p);
1371
1372    try {
1373      mutator.flush();
1374      Assert.fail();
1375    } catch (RetriesExhaustedWithDetailsException expected) {
1376      assertEquals(1, expected.getNumExceptions());
1377      assertTrue(expected.getRow(0) == p);
1378    }
1379    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1380    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1381  }
1382
1383  @Test
1384  public void testCallQueueTooLarge() throws IOException {
1385    ClusterConnection conn = new MyConnectionImpl(CONF);
1386    AsyncProcessWithFailure ap =
1387      new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException());
1388    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1389    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1390    Assert.assertNotNull(ap.createServerErrorTracker());
1391    Put p = createPut(1, true);
1392    mutator.mutate(p);
1393
1394    try {
1395      mutator.flush();
1396      Assert.fail();
1397    } catch (RetriesExhaustedWithDetailsException expected) {
1398      assertEquals(1, expected.getNumExceptions());
1399      assertTrue(expected.getRow(0) == p);
1400    }
1401    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1402    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1403  }
1404
1405  /**
1406   * This test simulates multiple regions on 2 servers. We should have 2 multi requests and 2
1407   * threads: 1 per server, this whatever the number of regions.
1408   */
1409  @Test
1410  public void testThreadCreation() throws Exception {
1411    final int NB_REGS = 100;
1412    List<HRegionLocation> hrls = new ArrayList<>(NB_REGS);
1413    List<Get> gets = new ArrayList<>(NB_REGS);
1414    for (int i = 0; i < NB_REGS; i++) {
1415      HRegionInfo hri =
1416        new HRegionInfo(DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
1417      HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
1418      hrls.add(hrl);
1419
1420      Get get = new Get(Bytes.toBytes(i * 10L));
1421      gets.add(get);
1422    }
1423
1424    MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF);
1425    MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
1426    HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service);
1427    ht.multiAp = ap;
1428    ht.batch(gets, null);
1429
1430    Assert.assertEquals(NB_REGS, ap.nbActions.get());
1431    Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
1432    Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
1433
1434    int nbReg = 0;
1435    for (int i = 0; i < NB_REGS; i++) {
1436      if (con.usedRegions[i]) nbReg++;
1437    }
1438    Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg);
1439  }
1440
1441  @Test
1442  public void testReplicaReplicaSuccess() throws Exception {
1443    // Main call takes too long so replicas succeed, except for one region w/o replicas.
1444    // One region has no replica, so the main call succeeds for it.
1445    MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
1446    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1447    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1448      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1449      .setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(SubmittedRows.ALL).build();
1450    AsyncRequestFuture ars = ap.submit(task);
1451    verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
1452    Assert.assertEquals(2, ap.getReplicaCallCount());
1453  }
1454
1455  @Test
1456  public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
1457    // Main call succeeds before replica calls are kicked off.
1458    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
1459    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1460    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1461      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1462      .setRowAccess(rows).setResults(new Object[3]).setSubmittedRows(SubmittedRows.ALL).build();
1463    AsyncRequestFuture ars = ap.submit(task);
1464    verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
1465    Assert.assertEquals(0, ap.getReplicaCallCount());
1466  }
1467
1468  @Test
1469  public void testReplicaParallelCallsSucceed() throws Exception {
1470    // Either main or replica can succeed.
1471    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
1472    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1473    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1474      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1475      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1476    AsyncRequestFuture ars = ap.submit(task);
1477    verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
1478    long replicaCalls = ap.getReplicaCallCount();
1479    Assert.assertTrue(replicaCalls >= 0);
1480    Assert.assertTrue(replicaCalls <= 2);
1481  }
1482
1483  @Test
1484  public void testReplicaPartialReplicaCall() throws Exception {
1485    // One server is slow, so the result for its region comes from replica, whereas
1486    // the result for other region comes from primary before replica calls happen.
1487    // There should be no replica call for that region at all.
1488    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
1489    ap.setPrimaryCallDelay(sn2, 2000);
1490    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1491    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1492      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1493      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1494    AsyncRequestFuture ars = ap.submit(task);
1495    verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
1496    Assert.assertEquals(1, ap.getReplicaCallCount());
1497  }
1498
1499  @Test
1500  public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
1501    // Main calls fail before replica calls can start - this is currently not handled.
1502    // It would probably never happen if we can get location (due to retries),
1503    // and it would require additional synchronization.
1504    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
1505    ap.addFailures(hri1, hri2);
1506    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1507    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1508      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1509      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1510    AsyncRequestFuture ars = ap.submit(task);
1511    verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
1512    Assert.assertEquals(0, ap.getReplicaCallCount());
1513  }
1514
1515  @Test
1516  public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
1517    // Main calls fails after replica calls start. For two-replica region, one replica call
1518    // also fails. Regardless, we get replica results for both regions.
1519    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
1520    ap.addFailures(hri1, hri1r2, hri2);
1521    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1522    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1523      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1524      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1525    AsyncRequestFuture ars = ap.submit(task);
1526    verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
1527    Assert.assertEquals(2, ap.getReplicaCallCount());
1528  }
1529
1530  @Test
1531  public void testReplicaAllCallsFailForOneRegion() throws Exception {
1532    // For one of the region, all 3, main and replica, calls fail. For the other, replica
1533    // call fails but its exception should not be visible as it did succeed.
1534    MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
1535    ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
1536    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1537    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(ap.service)
1538      .setRpcTimeout(RPC_TIMEOUT).setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE)
1539      .setRowAccess(rows).setResults(new Object[2]).setSubmittedRows(SubmittedRows.ALL).build();
1540    AsyncRequestFuture ars = ap.submit(task);
1541    verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
1542    // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
1543    Assert.assertEquals(3, ars.getErrors().getNumExceptions());
1544    for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
1545      Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
1546    }
1547  }
1548
1549  private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs,
1550    int replicaMs) throws Exception {
1551    return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
1552  }
1553
1554  private MyAsyncProcessWithReplicas createReplicaAp(int replicaAfterMs, int primaryMs,
1555    int replicaMs, int retries) throws Exception {
1556    // TODO: this is kind of timing dependent... perhaps it should detect from createCaller
1557    // that the replica call has happened and that way control the ordering.
1558    Configuration conf = new Configuration();
1559    conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
1560    if (retries >= 0) {
1561      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1562    }
1563    ClusterConnection conn = createHConnectionWithReplicas(new ConnectionConfiguration(conf));
1564    MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
1565    ap.setCallDelays(primaryMs, replicaMs);
1566    return ap;
1567  }
1568
1569  private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) {
1570    return new BufferedMutatorParams(name).pool(ap.service).rpcTimeout(RPC_TIMEOUT)
1571      .opertationTimeout(OPERATION_TIMEOUT);
1572  }
1573
1574  private static List<Get> makeTimelineGets(byte[]... rows) {
1575    List<Get> result = new ArrayList<>(rows.length);
1576    for (byte[] row : rows) {
1577      Get get = new Get(row);
1578      get.setConsistency(Consistency.TIMELINE);
1579      result.add(get);
1580    }
1581    return result;
1582  }
1583
1584  private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
1585    Object[] actual = ars.getResults();
1586    Assert.assertEquals(expected.length, actual.length);
1587    for (int i = 0; i < expected.length; ++i) {
1588      Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
1589    }
1590  }
1591
1592  /** After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this! */
1593  private enum RR {
1594    TRUE,
1595    FALSE,
1596    DONT_CARE,
1597    FAILED
1598  }
1599
1600  private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
1601    Object[] actuals = ars.getResults();
1602    Assert.assertEquals(expecteds.length, actuals.length);
1603    for (int i = 0; i < expecteds.length; ++i) {
1604      Object actual = actuals[i];
1605      RR expected = expecteds[i];
1606      Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
1607      if (expected != RR.FAILED && expected != RR.DONT_CARE) {
1608        Assert.assertEquals(expected == RR.TRUE, ((Result) actual).isStale());
1609      }
1610    }
1611  }
1612
1613  /**
1614   * @param regCnt  the region: 1 to 3.
1615   * @param success if true, the put will succeed.
1616   * @return a put
1617   */
1618  private Put createPut(int regCnt, boolean success) {
1619    Put p;
1620    if (!success) {
1621      p = new Put(FAILS);
1622    } else switch (regCnt) {
1623      case 1:
1624        p = new Put(DUMMY_BYTES_1);
1625        break;
1626      case 2:
1627        p = new Put(DUMMY_BYTES_2);
1628        break;
1629      case 3:
1630        p = new Put(DUMMY_BYTES_3);
1631        break;
1632      default:
1633        throw new IllegalArgumentException("unknown " + regCnt);
1634    }
1635
1636    p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1637
1638    return p;
1639  }
1640
1641  static class MyThreadPoolExecutor extends ThreadPoolExecutor {
1642    public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime,
1643      TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) {
1644      super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue);
1645    }
1646
1647    @Override
1648    public Future submit(Runnable runnable) {
1649      throw new OutOfMemoryError("OutOfMemory error thrown by means");
1650    }
1651  }
1652
1653  static class AsyncProcessForThrowableCheck extends AsyncProcess {
1654    public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
1655      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
1656    }
1657  }
1658
1659  @Test
1660  public void testUncheckedException() throws Exception {
1661    // Test the case pool.submit throws unchecked exception
1662    ClusterConnection hc = createHConnection();
1663    MyThreadPoolExecutor myPool =
1664      new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
1665    AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF);
1666
1667    List<Put> puts = new ArrayList<>(1);
1668    puts.add(createPut(1, true));
1669    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(myPool).setRpcTimeout(RPC_TIMEOUT)
1670      .setOperationTimeout(OPERATION_TIMEOUT).setTableName(DUMMY_TABLE).setRowAccess(puts)
1671      .setSubmittedRows(SubmittedRows.NORMAL).build();
1672    ap.submit(task);
1673    Assert.assertTrue(puts.isEmpty());
1674  }
1675
1676  /**
1677   * Below tests make sure we could use a special pause setting when retry an exception where
1678   * {@link HBaseServerException#isServerOverloaded(Throwable)} is true, see HBASE-17114
1679   */
1680
1681  @Test
1682  public void testRetryPauseWhenServerOverloadedDueToCQTBE() throws Exception {
1683    testRetryPauseWhenServerIsOverloaded(new CallQueueTooBigException());
1684  }
1685
1686  @Test
1687  public void testRetryPauseWhenServerOverloadedDueToCDE() throws Exception {
1688    testRetryPauseWhenServerIsOverloaded(new CallDroppedException());
1689  }
1690
1691  private void testRetryPauseWhenServerIsOverloaded(HBaseServerException exception)
1692    throws IOException {
1693    Configuration conf = new Configuration(CONF);
1694    final long specialPause = 500L;
1695    final int retries = 1;
1696    conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, specialPause);
1697    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1698
1699    ClusterConnection conn = new MyConnectionImpl(conf);
1700    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, exception);
1701    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1702    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1703
1704    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1705
1706    Put p = createPut(1, true);
1707    mutator.mutate(p);
1708
1709    long startTime = EnvironmentEdgeManager.currentTime();
1710    try {
1711      mutator.flush();
1712      Assert.fail();
1713    } catch (RetriesExhaustedWithDetailsException expected) {
1714      assertEquals(1, expected.getNumExceptions());
1715      assertTrue(expected.getRow(0) == p);
1716    }
1717    long actualSleep = EnvironmentEdgeManager.currentTime() - startTime;
1718    long expectedSleep = 0L;
1719    for (int i = 0; i < retries; i++) {
1720      expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
1721      // Prevent jitter in ConcurrentMapUtils#getPauseTime to affect result
1722      actualSleep += (long) (specialPause * 0.01f);
1723    }
1724    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
1725    Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms",
1726      actualSleep >= expectedSleep);
1727
1728    // check and confirm normal IOE will use the normal pause
1729    final long normalPause =
1730      conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1731    ap = new AsyncProcessWithFailure(conn, conf, new IOException());
1732    bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1733    mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1734    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1735    mutator.mutate(p);
1736    startTime = EnvironmentEdgeManager.currentTime();
1737    try {
1738      mutator.flush();
1739      Assert.fail();
1740    } catch (RetriesExhaustedWithDetailsException expected) {
1741      assertEquals(1, expected.getNumExceptions());
1742      assertTrue(expected.getRow(0) == p);
1743    }
1744    actualSleep = EnvironmentEdgeManager.currentTime() - startTime;
1745    expectedSleep = 0L;
1746    for (int i = 0; i < retries; i++) {
1747      expectedSleep += ConnectionUtils.getPauseTime(normalPause, i);
1748    }
1749    // plus an additional pause to balance the program execution time
1750    expectedSleep += normalPause;
1751    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
1752    Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
1753  }
1754
1755  @Test
1756  public void testRetryWithExceptionClearsMetaCache() throws Exception {
1757    Configuration myConf = new Configuration(CONF);
1758    myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
1759    ClusterConnection conn = createHConnection(new ConnectionConfiguration(myConf));
1760
1761    AsyncProcessWithFailure ap =
1762      new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test"));
1763    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1764    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1765
1766    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1767
1768    Assert.assertEquals(conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(),
1769      new RegionLocations(loc1).toString());
1770
1771    Mockito.verify(conn, Mockito.times(0)).clearCaches(Mockito.any());
1772
1773    Put p = createPut(1, true);
1774    mutator.mutate(p);
1775
1776    try {
1777      mutator.flush();
1778      Assert.fail();
1779    } catch (RetriesExhaustedWithDetailsException expected) {
1780      assertEquals(1, expected.getNumExceptions());
1781      assertTrue(expected.getRow(0) == p);
1782    }
1783
1784    Mockito.verify(conn, Mockito.times(1)).clearCaches(loc1.getServerName());
1785  }
1786
1787  @Test
1788  public void testQueueRowAccess() throws Exception {
1789    ClusterConnection conn = createHConnection();
1790    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
1791      new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
1792    Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1793    Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
1794    mutator.mutate(p0);
1795    BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
1796    // QueueRowAccess should take all undealt mutations
1797    assertEquals(0, mutator.size());
1798    mutator.mutate(p1);
1799    assertEquals(1, mutator.size());
1800    BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
1801    // QueueRowAccess should take all undealt mutations
1802    assertEquals(0, mutator.size());
1803    assertEquals(1, ra0.size());
1804    assertEquals(1, ra1.size());
1805    Iterator<Row> iter0 = ra0.iterator();
1806    Iterator<Row> iter1 = ra1.iterator();
1807    assertTrue(iter0.hasNext());
1808    assertTrue(iter1.hasNext());
1809    // the next() will poll the mutation from inner buffer and update the buffer count
1810    assertTrue(iter0.next() == p0);
1811    assertEquals(1, mutator.getUnflushedSize());
1812    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
1813    assertTrue(iter1.next() == p1);
1814    assertEquals(0, mutator.getUnflushedSize());
1815    assertEquals(0, mutator.getCurrentWriteBufferSize());
1816    assertFalse(iter0.hasNext());
1817    assertFalse(iter1.hasNext());
1818    // ra0 doest handle the mutation so the mutation won't be pushed back to buffer
1819    iter0.remove();
1820    ra0.close();
1821    assertEquals(0, mutator.size());
1822    assertEquals(0, mutator.getUnflushedSize());
1823    assertEquals(0, mutator.getCurrentWriteBufferSize());
1824    // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
1825    ra1.close();
1826    assertEquals(1, mutator.size());
1827    assertEquals(1, mutator.getUnflushedSize());
1828    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
1829  }
1830}