001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.LinkedList;
032import java.util.List;
033import java.util.Map;
034import java.util.Random;
035import java.util.Set;
036import java.util.TreeSet;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ExecutorService;
040import java.util.concurrent.Executors;
041import java.util.concurrent.Future;
042import java.util.concurrent.LinkedBlockingQueue;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.hbase.CallQueueTooBigException;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.HBaseClassTestRule;
054import org.apache.hadoop.hbase.HConstants;
055import org.apache.hadoop.hbase.HRegionInfo;
056import org.apache.hadoop.hbase.HRegionLocation;
057import org.apache.hadoop.hbase.RegionLocations;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess;
061import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
063import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
064import org.apache.hadoop.hbase.client.coprocessor.Batch;
065import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
066import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
067import org.apache.hadoop.hbase.testclassification.ClientTests;
068import org.apache.hadoop.hbase.testclassification.MediumTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.Threads;
071import org.junit.Assert;
072import org.junit.Before;
073import org.junit.ClassRule;
074import org.junit.Test;
075import org.junit.experimental.categories.Category;
076import org.mockito.Mockito;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080@Category({ClientTests.class, MediumTests.class})
081public class TestAsyncProcess {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE =
085      HBaseClassTestRule.forClass(TestAsyncProcess.class);
086
087  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncProcess.class);
088  private static final TableName DUMMY_TABLE =
089      TableName.valueOf("DUMMY_TABLE");
090  private static final byte[] DUMMY_BYTES_1 = Bytes.toBytes("DUMMY_BYTES_1");
091  private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2");
092  private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3");
093  private static final byte[] FAILS = Bytes.toBytes("FAILS");
094  private Configuration CONF;
095  private ConnectionConfiguration CONNECTION_CONFIG;
096  private static final ServerName sn = ServerName.valueOf("s1,1,1");
097  private static final ServerName sn2 = ServerName.valueOf("s2,2,2");
098  private static final ServerName sn3 = ServerName.valueOf("s3,3,3");
099  private static final HRegionInfo hri1 =
100      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
101  private static final HRegionInfo hri2 =
102      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
103  private static final HRegionInfo hri3 =
104      new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
105  private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn);
106  private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn);
107  private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
108
109  // Replica stuff
110  private static final RegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1);
111  private static final RegionInfo hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
112  private static final RegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
113  private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
114      new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
115  private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
116      new HRegionLocation(hri2r1, sn3));
117  private static final RegionLocations hrls3 =
118      new RegionLocations(new HRegionLocation(hri3, sn3), null);
119
120  private static final String success = "success";
121  private static Exception failure = new Exception("failure");
122
123  private static final int NB_RETRIES = 3;
124
125  private int RPC_TIMEOUT;
126  private int OPERATION_TIMEOUT;
127
128  @Before
129  public void beforeEach() {
130    this.CONF = new Configuration();
131    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
132    this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF);
133    this.RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
134        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
135    this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
136        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
137  }
138
139  static class CountingThreadFactory implements ThreadFactory {
140    final AtomicInteger nbThreads;
141    ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
142    @Override
143    public Thread newThread(Runnable r) {
144      nbThreads.incrementAndGet();
145      return realFactory.newThread(r);
146    }
147
148    CountingThreadFactory(AtomicInteger nbThreads){
149      this.nbThreads = nbThreads;
150    }
151  }
152
153  static class MyAsyncProcess extends AsyncProcess {
154    final AtomicInteger nbMultiResponse = new AtomicInteger();
155    final AtomicInteger nbActions = new AtomicInteger();
156    public List<AsyncRequestFuture> allReqs = new ArrayList<>();
157    public AtomicInteger callsCt = new AtomicInteger();
158    private Configuration conf;
159
160    private long previousTimeout = -1;
161    final ExecutorService service;
162    @Override
163    protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(
164      AsyncProcessTask task, List<Action> actions, long nonceGroup) {
165      // Test HTable has tableName of null, so pass DUMMY_TABLE
166      AsyncProcessTask wrap = new AsyncProcessTask(task){
167        @Override
168        public TableName getTableName() {
169          return DUMMY_TABLE;
170        }
171      };
172      AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<>(
173          wrap, actions, nonceGroup, this);
174      allReqs.add(r);
175      return r;
176    }
177
178    public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
179      super(hc, conf,
180          new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
181      service = Executors.newFixedThreadPool(5);
182      this.conf = conf;
183    }
184
185    public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
186      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
187      service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
188          new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
189    }
190
191    public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
192        List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
193        boolean needResults) throws InterruptedIOException {
194      AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
195              .setPool(pool == null ? service : pool)
196              .setTableName(tableName)
197              .setRowAccess(rows)
198              .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL)
199              .setNeedResults(needResults)
200              .setRpcTimeout(conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
201                    HConstants.DEFAULT_HBASE_RPC_TIMEOUT))
202              .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
203                    HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))
204              .build();
205      return submit(task);
206    }
207
208    public <CResult> AsyncRequestFuture submit(TableName tableName,
209        final List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
210        boolean needResults) throws InterruptedIOException {
211      return submit(null, tableName, rows, atLeastOne, callback, needResults);
212    }
213
214    @Override
215    public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task)
216            throws InterruptedIOException {
217      previousTimeout = task.getRpcTimeout();
218      // We use results in tests to check things, so override to always save them.
219      AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) {
220        @Override
221        public boolean getNeedResults() {
222          return true;
223        }
224      };
225      return super.submit(wrap);
226    }
227
228    @Override
229    protected RpcRetryingCaller<AbstractResponse> createCaller(
230        CancellableRegionServerCallable callable, int rpcTimeout) {
231      callsCt.incrementAndGet();
232      MultiServerCallable callable1 = (MultiServerCallable) callable;
233      final MultiResponse mr = createMultiResponse(
234          callable1.getMulti(), nbMultiResponse, nbActions,
235          new ResponseGenerator() {
236            @Override
237            public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
238              if (Arrays.equals(FAILS, a.getAction().getRow())) {
239                mr.add(regionName, a.getOriginalIndex(), failure);
240              } else {
241                mr.add(regionName, a.getOriginalIndex(), success);
242              }
243            }
244          });
245
246      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
247        @Override
248        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
249                                                   int callTimeout)
250            throws IOException, RuntimeException {
251          try {
252            // sleep one second in order for threadpool to start another thread instead of reusing
253            // existing one.
254            Thread.sleep(1000);
255          } catch (InterruptedException e) {
256            // ignore error
257          }
258          return mr;
259        }
260      };
261    }
262
263
264  }
265
266  static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
267    private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
268    public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
269      long nonceGroup, AsyncProcess asyncProcess) {
270      super(task, actions, nonceGroup, asyncProcess);
271    }
272
273    @Override
274    protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
275      // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
276    }
277
278    Map<ServerName, List<Long>> getRequestHeapSize() {
279      return heapSizesByServer;
280    }
281
282    @Override
283    SingleServerRequestRunnable createSingleServerRequest(
284          MultiAction multiAction, int numAttempt, ServerName server,
285        Set<CancellableRegionServerCallable> callsInProgress) {
286      SingleServerRequestRunnable rq = new SingleServerRequestRunnable(
287              multiAction, numAttempt, server, callsInProgress);
288      List<Long> heapCount = heapSizesByServer.get(server);
289      if (heapCount == null) {
290        heapCount = new ArrayList<>();
291        heapSizesByServer.put(server, heapCount);
292      }
293      heapCount.add(heapSizeOf(multiAction));
294      return rq;
295    }
296
297    private long heapSizeOf(MultiAction multiAction) {
298      return multiAction.actions.values().stream()
299              .flatMap(v -> v.stream())
300              .map(action -> action.getAction())
301              .filter(row -> row instanceof Mutation)
302              .mapToLong(row -> ((Mutation) row).heapSize())
303              .sum();
304    }
305  }
306
307  static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
308
309    private final IOException e;
310
311    public CallerWithFailure(IOException e) {
312      super(100, 500, 100, 9);
313      this.e = e;
314    }
315
316    @Override
317    public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
318                                            int callTimeout)
319        throws IOException, RuntimeException {
320      throw e;
321    }
322  }
323
324
325  static class AsyncProcessWithFailure extends MyAsyncProcess {
326
327    private final IOException ioe;
328
329    public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
330      super(hc, conf);
331      this.ioe = ioe;
332      serverTrackerTimeout = 1L;
333    }
334
335    @Override
336    protected RpcRetryingCaller<AbstractResponse> createCaller(
337      CancellableRegionServerCallable callable, int rpcTimeout) {
338      callsCt.incrementAndGet();
339      return new CallerWithFailure(ioe);
340    }
341  }
342
343  /**
344   * Make the backoff time always different on each call.
345   */
346  static class MyClientBackoffPolicy implements ClientBackoffPolicy {
347    private final Map<ServerName, AtomicInteger> count = new HashMap<>();
348    @Override
349    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
350      AtomicInteger inc = count.get(serverName);
351      if (inc == null) {
352        inc = new AtomicInteger(0);
353        count.put(serverName, inc);
354      }
355      return inc.getAndIncrement();
356    }
357  }
358
359  static class MyAsyncProcessWithReplicas extends MyAsyncProcess {
360    private Set<byte[]> failures = new TreeSet<>(new Bytes.ByteArrayComparator());
361    private long primarySleepMs = 0, replicaSleepMs = 0;
362    private Map<ServerName, Long> customPrimarySleepMs = new HashMap<>();
363    private final AtomicLong replicaCalls = new AtomicLong(0);
364
365    public void addFailures(RegionInfo... hris) {
366      for (RegionInfo hri : hris) {
367        failures.add(hri.getRegionName());
368      }
369    }
370
371    public long getReplicaCallCount() {
372      return replicaCalls.get();
373    }
374
375    public void setPrimaryCallDelay(ServerName server, long primaryMs) {
376      customPrimarySleepMs.put(server, primaryMs);
377    }
378
379    public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
380      super(hc, conf);
381    }
382
383    public void setCallDelays(long primaryMs, long replicaMs) {
384      this.primarySleepMs = primaryMs;
385      this.replicaSleepMs = replicaMs;
386    }
387
388    @Override
389    protected RpcRetryingCaller<AbstractResponse> createCaller(
390        CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
391      MultiServerCallable callable = (MultiServerCallable) payloadCallable;
392      final MultiResponse mr = createMultiResponse(
393          callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
394            @Override
395            public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
396              if (failures.contains(regionName)) {
397                mr.add(regionName, a.getOriginalIndex(), failure);
398              } else {
399                boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
400                mr.add(regionName, a.getOriginalIndex(),
401                    Result.create(new Cell[0], null, isStale));
402              }
403            }
404          });
405      // Currently AsyncProcess either sends all-replica, or all-primary request.
406      final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
407          callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
408      final ServerName server = ((MultiServerCallable)callable).getServerName();
409      String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
410          + callable.getMulti().actions.size() + " entries: ";
411      for (byte[] region : callable.getMulti().actions.keySet()) {
412        debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
413      }
414      LOG.debug(debugMsg);
415      if (!isDefault) {
416        replicaCalls.incrementAndGet();
417      }
418
419      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
420        @Override
421        public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
422                                                int callTimeout)
423        throws IOException, RuntimeException {
424          long sleep = -1;
425          if (isDefault) {
426            Long customSleep = customPrimarySleepMs.get(server);
427            sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
428          } else {
429            sleep = replicaSleepMs;
430          }
431          if (sleep != 0) {
432            try {
433              Thread.sleep(sleep);
434            } catch (InterruptedException e) {
435            }
436          }
437          return mr;
438        }
439      };
440    }
441  }
442
443  static MultiResponse createMultiResponse(final MultiAction multi,
444      AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
445    final MultiResponse mr = new MultiResponse();
446    nbMultiResponse.incrementAndGet();
447    for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) {
448      byte[] regionName = entry.getKey();
449      for (Action a : entry.getValue()) {
450        nbActions.incrementAndGet();
451        gen.addResponse(mr, regionName, a);
452      }
453    }
454    return mr;
455  }
456
457  private static interface ResponseGenerator {
458    void addResponse(final MultiResponse mr, byte[] regionName, Action a);
459  }
460
461  /**
462   * Returns our async process.
463   */
464  static class MyConnectionImpl extends ConnectionImplementation {
465    public static class TestRegistry extends DoNothingAsyncRegistry {
466
467      public TestRegistry(Configuration conf) {
468        super(conf);
469      }
470
471      @Override
472      public CompletableFuture<String> getClusterId() {
473        return CompletableFuture.completedFuture("testClusterId");
474      }
475
476      @Override
477      public CompletableFuture<Integer> getCurrentNrHRS() {
478        return CompletableFuture.completedFuture(1);
479      }
480    }
481
482    final AtomicInteger nbThreads = new AtomicInteger(0);
483
484    protected MyConnectionImpl(Configuration conf) throws IOException {
485      super(setupConf(conf), null, null);
486    }
487
488    private static Configuration setupConf(Configuration conf) {
489      conf.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class,
490        AsyncRegistry.class);
491      return conf;
492    }
493
494    @Override
495    public RegionLocations locateRegion(TableName tableName,
496        byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
497      return new RegionLocations(loc1);
498    }
499
500    @Override
501    public boolean hasCellBlockSupport() {
502      return false;
503    }
504  }
505
506  /**
507   * Returns our async process.
508   */
509  static class MyConnectionImpl2 extends MyConnectionImpl {
510    List<HRegionLocation> hrl;
511    final boolean usedRegions[];
512
513    protected MyConnectionImpl2(List<HRegionLocation> hrl, Configuration conf) throws IOException {
514      super(conf);
515      this.hrl = hrl;
516      this.usedRegions = new boolean[hrl.size()];
517    }
518
519    @Override
520    public RegionLocations locateRegion(TableName tableName,
521        byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
522      int i = 0;
523      for (HRegionLocation hr : hrl){
524        if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
525          usedRegions[i] = true;
526          return new RegionLocations(hr);
527        }
528        i++;
529      }
530      return null;
531    }
532  }
533  @Test
534  public void testListRowAccess() {
535    int count = 10;
536    List<String> values = new LinkedList<>();
537    for (int i = 0; i != count; ++i) {
538      values.add(String.valueOf(i));
539    }
540
541    ListRowAccess<String> taker = new ListRowAccess(values);
542    assertEquals(count, taker.size());
543
544    int restoreCount = 0;
545    int takeCount = 0;
546    Iterator<String> it = taker.iterator();
547    while (it.hasNext()) {
548      String v = it.next();
549      assertEquals(String.valueOf(takeCount), v);
550      ++takeCount;
551      it.remove();
552      if (Math.random() >= 0.5) {
553        break;
554      }
555    }
556    assertEquals(count, taker.size() + takeCount);
557
558    it = taker.iterator();
559    while (it.hasNext()) {
560      String v = it.next();
561      assertEquals(String.valueOf(takeCount), v);
562      ++takeCount;
563      it.remove();
564    }
565    assertEquals(0, taker.size());
566    assertEquals(count, takeCount);
567  }
568  private static long calculateRequestCount(long putSizePerServer, long maxHeapSizePerRequest) {
569    if (putSizePerServer <= maxHeapSizePerRequest) {
570      return 1;
571    } else if (putSizePerServer % maxHeapSizePerRequest == 0) {
572      return putSizePerServer / maxHeapSizePerRequest;
573    } else {
574      return putSizePerServer / maxHeapSizePerRequest + 1;
575    }
576  }
577
578  @Test
579  public void testSubmitSameSizeOfRequest() throws Exception {
580    long writeBuffer = 2 * 1024 * 1024;
581    long putsHeapSize = writeBuffer;
582    doSubmitRequest(writeBuffer, putsHeapSize);
583  }
584
585  @Test
586  public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
587    long maxHeapSizePerRequest = Long.MAX_VALUE;
588    long putsHeapSize = 2 * 1024 * 1024;
589    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
590  }
591
592  @Test
593  public void testSubmitRandomSizeRequest() throws Exception {
594    Random rn = new Random();
595    final long limit = 10 * 1024 * 1024;
596    final int requestCount = 1 + (int) (rn.nextDouble() * 3);
597    long n = rn.nextLong();
598    if (n < 0) {
599      n = -n;
600    } else if (n == 0) {
601      n = 1;
602    }
603    long putsHeapSize = n % limit;
604    long maxHeapSizePerRequest = putsHeapSize / requestCount;
605    LOG.info("[testSubmitRandomSizeRequest] maxHeapSizePerRequest=" + maxHeapSizePerRequest +
606        ", putsHeapSize=" + putsHeapSize);
607    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
608  }
609
610  @Test
611  public void testSubmitSmallRequest() throws Exception {
612    long maxHeapSizePerRequest = 2 * 1024 * 1024;
613    long putsHeapSize = 100;
614    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
615  }
616
617  @Test
618  public void testSubmitLargeRequest() throws Exception {
619    long maxHeapSizePerRequest = 2 * 1024 * 1024;
620    long putsHeapSize = maxHeapSizePerRequest * 2;
621    doSubmitRequest(maxHeapSizePerRequest, putsHeapSize);
622  }
623
624  private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
625    ClusterConnection conn = createHConnection();
626    final String defaultClazz =
627        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
628    final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(
629      SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
630      SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
631    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
632      SimpleRequestController.class.getName());
633    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
634        maxHeapSizePerRequest);
635
636    // sn has two regions
637    long putSizeSN = 0;
638    long putSizeSN2 = 0;
639    List<Put> puts = new ArrayList<>();
640    while ((putSizeSN + putSizeSN2) <= putsHeapSize) {
641      Put put1 = new Put(DUMMY_BYTES_1);
642      put1.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
643      Put put2 = new Put(DUMMY_BYTES_2);
644      put2.addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
645      Put put3 = new Put(DUMMY_BYTES_3);
646      put3.addColumn(DUMMY_BYTES_3, DUMMY_BYTES_3, DUMMY_BYTES_3);
647      putSizeSN += (put1.heapSize() + put2.heapSize());
648      putSizeSN2 += put3.heapSize();
649      puts.add(put1);
650      puts.add(put2);
651      puts.add(put3);
652    }
653
654    int minCountSnRequest = (int) calculateRequestCount(putSizeSN, maxHeapSizePerRequest);
655    int minCountSn2Request = (int) calculateRequestCount(putSizeSN2, maxHeapSizePerRequest);
656    LOG.info("Total put count:" + puts.size() + ", putSizeSN:"+ putSizeSN
657        + ", putSizeSN2:" + putSizeSN2
658        + ", maxHeapSizePerRequest:" + maxHeapSizePerRequest
659        + ", minCountSnRequest:" + minCountSnRequest
660        + ", minCountSn2Request:" + minCountSn2Request);
661
662    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
663    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
664    try (BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap)) {
665      mutator.mutate(puts);
666      mutator.flush();
667      List<AsyncRequestFuture> reqs = ap.allReqs;
668
669      int actualSnReqCount = 0;
670      int actualSn2ReqCount = 0;
671      for (AsyncRequestFuture req : reqs) {
672        if (!(req instanceof AsyncRequestFutureImpl)) {
673          continue;
674        }
675        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
676        if (ars.getRequestHeapSize().containsKey(sn)) {
677          ++actualSnReqCount;
678        }
679        if (ars.getRequestHeapSize().containsKey(sn2)) {
680          ++actualSn2ReqCount;
681        }
682      }
683      // If the server is busy, the actual count may be incremented.
684      assertEquals(true, minCountSnRequest <= actualSnReqCount);
685      assertEquals(true, minCountSn2Request <= actualSn2ReqCount);
686      Map<ServerName, Long> sizePerServers = new HashMap<>();
687      for (AsyncRequestFuture req : reqs) {
688        if (!(req instanceof AsyncRequestFutureImpl)) {
689          continue;
690        }
691        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
692        Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
693        for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
694          long sum = 0;
695          for (long size : entry.getValue()) {
696            assertEquals(true, size <= maxHeapSizePerRequest);
697            sum += size;
698          }
699          assertEquals(true, sum <= maxHeapSizePerRequest);
700          long value = sizePerServers.getOrDefault(entry.getKey(), 0L);
701          sizePerServers.put(entry.getKey(), value + sum);
702        }
703      }
704      assertEquals(true, sizePerServers.containsKey(sn));
705      assertEquals(true, sizePerServers.containsKey(sn2));
706      assertEquals(false, sizePerServers.containsKey(sn3));
707      assertEquals(putSizeSN, (long) sizePerServers.get(sn));
708      assertEquals(putSizeSN2, (long) sizePerServers.get(sn2));
709    }
710    // restore config.
711    conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
712        defaultHeapSizePerRequest);
713    if (defaultClazz != null) {
714      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
715        defaultClazz);
716    }
717  }
718
719  @Test
720  public void testSubmit() throws Exception {
721    ClusterConnection hc = createHConnection();
722    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
723
724    List<Put> puts = new ArrayList<>(1);
725    puts.add(createPut(1, true));
726
727    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
728    Assert.assertTrue(puts.isEmpty());
729  }
730
731  @Test
732  public void testSubmitWithCB() throws Exception {
733    ClusterConnection hc = createHConnection();
734    final AtomicInteger updateCalled = new AtomicInteger(0);
735    Batch.Callback<Object> cb = new Batch.Callback<Object>() {
736      @Override
737      public void update(byte[] region, byte[] row, Object result) {
738        updateCalled.incrementAndGet();
739      }
740    };
741    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
742
743    List<Put> puts = new ArrayList<>(1);
744    puts.add(createPut(1, true));
745
746    final AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, cb, false);
747    Assert.assertTrue(puts.isEmpty());
748    ars.waitUntilDone();
749    Assert.assertEquals(1, updateCalled.get());
750  }
751
752  @Test
753  public void testSubmitBusyRegion() throws Exception {
754    ClusterConnection conn = createHConnection();
755    final String defaultClazz =
756        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
757    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
758      SimpleRequestController.class.getName());
759    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
760    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
761    List<Put> puts = new ArrayList<>(1);
762    puts.add(createPut(1, true));
763
764    for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) {
765      ap.incTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
766    }
767    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
768    Assert.assertEquals(puts.size(), 1);
769
770    ap.decTaskCounters(Collections.singleton(hri1.getRegionName()), sn);
771    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
772    Assert.assertEquals(0, puts.size());
773    if (defaultClazz != null) {
774      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
775        defaultClazz);
776    }
777  }
778
779
780  @Test
781  public void testSubmitBusyRegionServer() throws Exception {
782    ClusterConnection conn = createHConnection();
783    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
784    final String defaultClazz =
785        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
786    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
787      SimpleRequestController.class.getName());
788    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
789    controller.taskCounterPerServer.put(sn2,
790        new AtomicInteger(controller.maxConcurrentTasksPerServer));
791
792    List<Put> puts = new ArrayList<>(4);
793    puts.add(createPut(1, true));
794    puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy
795    puts.add(createPut(1, true)); // <== this one will make it, the region is already in
796    puts.add(createPut(2, true)); // <== new region, but the rs is ok
797
798    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
799    Assert.assertEquals(" puts=" + puts, 1, puts.size());
800
801    controller.taskCounterPerServer.put(sn2,
802        new AtomicInteger(controller.maxConcurrentTasksPerServer - 1));
803    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
804    Assert.assertTrue(puts.isEmpty());
805    if (defaultClazz != null) {
806      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
807        defaultClazz);
808    }
809  }
810
811  @Test
812  public void testFail() throws Exception {
813    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
814
815    List<Put> puts = new ArrayList<>(1);
816    Put p = createPut(1, false);
817    puts.add(p);
818
819    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
820    Assert.assertEquals(0, puts.size());
821    ars.waitUntilDone();
822    verifyResult(ars, false);
823    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
824
825    Assert.assertEquals(1, ars.getErrors().exceptions.size());
826    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
827        failure.equals(ars.getErrors().exceptions.get(0)));
828    Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
829        failure.equals(ars.getErrors().exceptions.get(0)));
830
831    Assert.assertEquals(1, ars.getFailedOperations().size());
832    Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
833        p.equals(ars.getFailedOperations().get(0)));
834  }
835
836
837  @Test
838  public void testSubmitTrue() throws IOException {
839    ClusterConnection conn = createHConnection();
840    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
841    final String defaultClazz =
842        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
843    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
844      SimpleRequestController.class.getName());
845    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
846    controller.tasksInProgress.incrementAndGet();
847    final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion);
848    controller.taskCounterPerRegion.put(hri1.getRegionName(), ai);
849
850    final AtomicBoolean checkPoint = new AtomicBoolean(false);
851    final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
852
853    Thread t = new Thread(){
854      @Override
855      public void run(){
856        Threads.sleep(1000);
857        Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
858        ai.decrementAndGet();
859        controller.tasksInProgress.decrementAndGet();
860        checkPoint2.set(true);
861      }
862    };
863
864    List<Put> puts = new ArrayList<>(1);
865    Put p = createPut(1, true);
866    puts.add(p);
867
868    ap.submit(null, DUMMY_TABLE, puts, false, null, false);
869    Assert.assertFalse(puts.isEmpty());
870
871    t.start();
872
873    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
874    Assert.assertTrue(puts.isEmpty());
875
876    checkPoint.set(true);
877    while (!checkPoint2.get()){
878      Threads.sleep(1);
879    }
880    if (defaultClazz != null) {
881      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
882        defaultClazz);
883    }
884  }
885
886  @Test
887  public void testFailAndSuccess() throws Exception {
888    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
889
890    List<Put> puts = new ArrayList<>(3);
891    puts.add(createPut(1, false));
892    puts.add(createPut(1, true));
893    puts.add(createPut(1, true));
894
895    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
896    Assert.assertTrue(puts.isEmpty());
897    ars.waitUntilDone();
898    verifyResult(ars, false, true, true);
899    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
900    ap.callsCt.set(0);
901    Assert.assertEquals(1, ars.getErrors().actions.size());
902
903    puts.add(createPut(1, true));
904    // Wait for AP to be free. While ars might have the result, ap counters are decreased later.
905    ap.waitForMaximumCurrentTasks(0, null);
906    ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
907    Assert.assertEquals(0, puts.size());
908    ars.waitUntilDone();
909    Assert.assertEquals(1, ap.callsCt.get());
910    verifyResult(ars, true);
911  }
912
913  @Test
914  public void testFlush() throws Exception {
915    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
916
917    List<Put> puts = new ArrayList<>(3);
918    puts.add(createPut(1, false));
919    puts.add(createPut(1, true));
920    puts.add(createPut(1, true));
921
922    AsyncRequestFuture ars = ap.submit(null, DUMMY_TABLE, puts, false, null, true);
923    ars.waitUntilDone();
924    verifyResult(ars, false, true, true);
925    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
926
927    Assert.assertEquals(1, ars.getFailedOperations().size());
928  }
929
930  @Test
931  public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
932    ClusterConnection hc = createHConnection();
933    MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
934    testTaskCount(ap);
935  }
936
937  @Test
938  public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
939    Configuration copyConf = new Configuration(CONF);
940    copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
941    MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
942    ClusterConnection conn = createHConnection();
943    Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
944    Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
945    Mockito.when(conn.getBackoffPolicy()).thenReturn(bp);
946    final String defaultClazz =
947        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
948    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
949      SimpleRequestController.class.getName());
950    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
951    testTaskCount(ap);
952    if (defaultClazz != null) {
953      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
954        defaultClazz);
955    }
956  }
957
958  private void testTaskCount(MyAsyncProcess ap)
959      throws InterruptedIOException, InterruptedException {
960    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
961    List<Put> puts = new ArrayList<>();
962    for (int i = 0; i != 3; ++i) {
963      puts.add(createPut(1, true));
964      puts.add(createPut(2, true));
965      puts.add(createPut(3, true));
966    }
967    ap.submit(null, DUMMY_TABLE, puts, true, null, false);
968    ap.waitForMaximumCurrentTasks(0, null);
969    // More time to wait if there are incorrect task count.
970    TimeUnit.SECONDS.sleep(1);
971    assertEquals(0, controller.tasksInProgress.get());
972    for (AtomicInteger count : controller.taskCounterPerRegion.values()) {
973      assertEquals(0, count.get());
974    }
975    for (AtomicInteger count : controller.taskCounterPerServer.values()) {
976      assertEquals(0, count.get());
977    }
978  }
979
980  @Test
981  public void testMaxTask() throws Exception {
982    ClusterConnection conn = createHConnection();
983    final String defaultClazz =
984        conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
985    conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
986      SimpleRequestController.class.getName());
987    final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
988    SimpleRequestController controller = (SimpleRequestController) ap.requestController;
989
990
991    for (int i = 0; i < 1000; i++) {
992      ap.incTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
993    }
994
995    final Thread myThread = Thread.currentThread();
996
997    Thread t = new Thread() {
998      @Override
999      public void run() {
1000        Threads.sleep(2000);
1001        myThread.interrupt();
1002      }
1003    };
1004
1005    List<Put> puts = new ArrayList<>(1);
1006    puts.add(createPut(1, true));
1007
1008    t.start();
1009
1010    try {
1011      ap.submit(null, DUMMY_TABLE, puts, false, null, false);
1012      Assert.fail("We should have been interrupted.");
1013    } catch (InterruptedIOException expected) {
1014    }
1015
1016    final long sleepTime = 2000;
1017
1018    Thread t2 = new Thread() {
1019      @Override
1020      public void run() {
1021        Threads.sleep(sleepTime);
1022        while (controller.tasksInProgress.get() > 0) {
1023          ap.decTaskCounters(Collections.singleton(Bytes.toBytes("dummy")), sn);
1024        }
1025      }
1026    };
1027    t2.start();
1028
1029    long start = System.currentTimeMillis();
1030    ap.submit(null, DUMMY_TABLE, new ArrayList<>(), false, null, false);
1031    long end = System.currentTimeMillis();
1032
1033    //Adds 100 to secure us against approximate timing.
1034    Assert.assertTrue(start + 100L + sleepTime > end);
1035    if (defaultClazz != null) {
1036      conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
1037        defaultClazz);
1038    }
1039  }
1040
1041  private ClusterConnection createHConnection() throws IOException {
1042    ClusterConnection hc = createHConnectionCommon();
1043    setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
1044    setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
1045    setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
1046    Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(),
1047        Mockito.anyBoolean())).thenReturn(Arrays.asList(loc1, loc2, loc3));
1048    setMockLocation(hc, FAILS, new RegionLocations(loc2));
1049    return hc;
1050  }
1051
1052  private ClusterConnection createHConnectionWithReplicas() throws IOException {
1053    ClusterConnection hc = createHConnectionCommon();
1054    setMockLocation(hc, DUMMY_BYTES_1, hrls1);
1055    setMockLocation(hc, DUMMY_BYTES_2, hrls2);
1056    setMockLocation(hc, DUMMY_BYTES_3, hrls3);
1057    List<HRegionLocation> locations = new ArrayList<>();
1058    for (HRegionLocation loc : hrls1.getRegionLocations()) {
1059      locations.add(loc);
1060    }
1061    for (HRegionLocation loc : hrls2.getRegionLocations()) {
1062      locations.add(loc);
1063    }
1064    for (HRegionLocation loc : hrls3.getRegionLocations()) {
1065      locations.add(loc);
1066    }
1067    Mockito.when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(),
1068        Mockito.anyBoolean())).thenReturn(locations);
1069    return hc;
1070  }
1071
1072  private static void setMockLocation(ClusterConnection hc, byte[] row,
1073      RegionLocations result) throws IOException {
1074    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
1075        Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
1076    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
1077        Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result);
1078  }
1079
1080  private ClusterConnection createHConnectionCommon() {
1081    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
1082    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
1083    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
1084    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
1085    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
1086    Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
1087    return hc;
1088  }
1089
1090  @Test
1091  public void testHTablePutSuccess() throws Exception {
1092    ClusterConnection conn = createHConnection();
1093    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1094    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1095    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1096
1097    Put put = createPut(1, true);
1098
1099    Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(),
1100        ht.getWriteBufferSize());
1101    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1102    ht.mutate(put);
1103    ht.flush();
1104    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1105  }
1106
1107  @Test
1108  public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
1109    ClusterConnection conn = createHConnection();
1110    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1111
1112    checkPeriodicFlushParameters(conn, ap,
1113            1234, 1234,
1114            1234, 1234);
1115    checkPeriodicFlushParameters(conn, ap,
1116               0,    0,
1117               0,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1118    checkPeriodicFlushParameters(conn, ap,
1119           -1234,    0,
1120           -1234,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1121    checkPeriodicFlushParameters(conn, ap,
1122               1,    1,
1123               1,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
1124  }
1125
1126  private void checkPeriodicFlushParameters(ClusterConnection conn,
1127                                            MyAsyncProcess ap,
1128                                            long setTO, long expectTO,
1129                                            long setTT, long expectTT
1130                                            ) {
1131    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1132
1133    // The BufferedMutatorParams does nothing with the value
1134    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
1135    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
1136    Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
1137    Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
1138
1139    // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams)
1140    BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap);
1141    Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs());
1142    Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs());
1143
1144    // The BufferedMutatorImpl corrects illegal values (direct via setter)
1145    BufferedMutatorImpl ht2 =
1146            new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap);
1147    ht2.setWriteBufferPeriodicFlush(setTO, setTT);
1148    Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs());
1149    Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs());
1150
1151  }
1152
1153  @Test
1154  public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
1155    ClusterConnection conn = createHConnection();
1156    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1157    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1158
1159    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1);     // Flush ASAP
1160    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Check every 100ms
1161    bufferParam.writeBufferSize(10000);  // Write buffer set to much larger than the single record
1162
1163    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1164
1165    // Verify if BufferedMutator has the right settings.
1166    Assert.assertEquals(10000, ht.getWriteBufferSize());
1167    Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs());
1168    Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
1169            ht.getWriteBufferPeriodicFlushTimerTickMs());
1170
1171    Put put = createPut(1, true);
1172
1173    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1174    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1175
1176    // ----- Insert, flush immediately, MUST NOT flush automatically
1177    ht.mutate(put);
1178    ht.flush();
1179
1180    Thread.sleep(1000);
1181    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1182    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1183
1184    // ----- Insert, NO flush, MUST flush automatically
1185    ht.mutate(put);
1186    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
1187    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1188
1189    // The timerTick should fire every 100ms, so after twice that we must have
1190    // seen at least 1 tick and we should see an automatic flush
1191    Thread.sleep(200);
1192    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1193    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1194
1195    // Ensure it does not flush twice
1196    Thread.sleep(200);
1197    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1198    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1199
1200    // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically
1201    ht.disableWriteBufferPeriodicFlush();
1202    ht.mutate(put);
1203    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1204    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1205
1206    // Wait for at least 1 timerTick, we should see NO flushes.
1207    Thread.sleep(200);
1208    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
1209    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
1210
1211    // Reenable periodic flushing, a flush seems to take about 1 second
1212    // so we wait for 2 seconds and it should have finished the flush.
1213    ht.setWriteBufferPeriodicFlush(1, 100);
1214    Thread.sleep(2000);
1215    Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes());
1216    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
1217  }
1218
1219
1220  @Test
1221  public void testBufferedMutatorImplWithSharedPool() throws Exception {
1222    ClusterConnection conn = createHConnection();
1223    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1224    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1225    BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
1226
1227    ht.close();
1228    assertFalse(ap.service.isShutdown());
1229  }
1230
1231  @Test
1232  public void testFailedPutAndNewPut() throws Exception {
1233    ClusterConnection conn = createHConnection();
1234    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
1235    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
1236            .writeBufferSize(0);
1237    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1238
1239    Put p = createPut(1, false);
1240    try {
1241      mutator.mutate(p);
1242      Assert.fail();
1243    } catch (RetriesExhaustedWithDetailsException expected) {
1244      assertEquals(1, expected.getNumExceptions());
1245      assertTrue(expected.getRow(0) == p);
1246    }
1247    // Let's do all the retries.
1248    ap.waitForMaximumCurrentTasks(0, null);
1249    Assert.assertEquals(0, mutator.size());
1250
1251    // There is no global error so the new put should not fail
1252    mutator.mutate(createPut(1, true));
1253    Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
1254  }
1255
1256  @SuppressWarnings("SelfComparison")
1257  @Test
1258  public void testAction() {
1259    Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10);
1260    Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1261    Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10);
1262    Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10);
1263    assertFalse(action_0.equals(action_1));
1264    assertTrue(action_0.equals(action_0));
1265    assertTrue(action_1.equals(action_2));
1266    assertTrue(action_2.equals(action_1));
1267    assertFalse(action_0.equals(new Put(Bytes.toBytes("abc"))));
1268    assertTrue(action_2.equals(action_3));
1269    assertFalse(action_0.equals(action_3));
1270    assertEquals(0, action_0.compareTo(action_0));
1271    assertTrue(action_0.compareTo(action_1) < 0);
1272    assertTrue(action_1.compareTo(action_0) > 0);
1273    assertEquals(0, action_1.compareTo(action_2));
1274  }
1275
1276  @Test
1277  public void testBatch() throws IOException, InterruptedException {
1278    ClusterConnection conn = new MyConnectionImpl(CONF);
1279    HTable ht = (HTable) conn.getTable(DUMMY_TABLE);
1280    ht.multiAp = new MyAsyncProcess(conn, CONF);
1281
1282    List<Put> puts = new ArrayList<>(7);
1283    puts.add(createPut(1, true));
1284    puts.add(createPut(1, true));
1285    puts.add(createPut(1, true));
1286    puts.add(createPut(1, true));
1287    puts.add(createPut(1, false)); // <=== the bad apple, position 4
1288    puts.add(createPut(1, true));
1289    puts.add(createPut(1, false)); // <=== another bad apple, position 6
1290
1291    Object[] res = new Object[puts.size()];
1292    try {
1293      ht.batch(puts, res);
1294      Assert.fail();
1295    } catch (RetriesExhaustedException expected) {
1296    }
1297
1298    Assert.assertEquals(success, res[0]);
1299    Assert.assertEquals(success, res[1]);
1300    Assert.assertEquals(success, res[2]);
1301    Assert.assertEquals(success, res[3]);
1302    Assert.assertEquals(failure, res[4]);
1303    Assert.assertEquals(success, res[5]);
1304    Assert.assertEquals(failure, res[6]);
1305  }
1306  @Test
1307  public void testErrorsServers() throws IOException {
1308    Configuration configuration = new Configuration(CONF);
1309    ClusterConnection conn = new MyConnectionImpl(configuration);
1310    MyAsyncProcess ap = new MyAsyncProcess(conn, configuration);
1311    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1312    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1313    configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
1314
1315    Assert.assertNotNull(ap.createServerErrorTracker());
1316    Assert.assertTrue(ap.serverTrackerTimeout > 200L);
1317    ap.serverTrackerTimeout = 1L;
1318
1319    Put p = createPut(1, false);
1320    mutator.mutate(p);
1321
1322    try {
1323      mutator.flush();
1324      Assert.fail();
1325    } catch (RetriesExhaustedWithDetailsException expected) {
1326      assertEquals(1, expected.getNumExceptions());
1327      assertTrue(expected.getRow(0) == p);
1328    }
1329    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1330    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1331  }
1332
1333  @Test
1334  public void testReadAndWriteTimeout() throws IOException {
1335    final long readTimeout = 10 * 1000;
1336    final long writeTimeout = 20 * 1000;
1337    Configuration copyConf = new Configuration(CONF);
1338    copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
1339    copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
1340    ClusterConnection conn = new MyConnectionImpl(copyConf);
1341    MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf);
1342    try (HTable ht = (HTable) conn.getTable(DUMMY_TABLE)) {
1343      ht.multiAp = ap;
1344      List<Get> gets = new LinkedList<>();
1345      gets.add(new Get(DUMMY_BYTES_1));
1346      gets.add(new Get(DUMMY_BYTES_2));
1347      try {
1348        ht.get(gets);
1349      } catch (ClassCastException e) {
1350        // No result response on this test.
1351      }
1352      assertEquals(readTimeout, ap.previousTimeout);
1353      ap.previousTimeout = -1;
1354
1355      try {
1356        ht.existsAll(gets);
1357      } catch (ClassCastException e) {
1358        // No result response on this test.
1359      }
1360      assertEquals(readTimeout, ap.previousTimeout);
1361      ap.previousTimeout = -1;
1362
1363      List<Delete> deletes = new LinkedList<>();
1364      deletes.add(new Delete(DUMMY_BYTES_1));
1365      deletes.add(new Delete(DUMMY_BYTES_2));
1366      ht.delete(deletes);
1367      assertEquals(writeTimeout, ap.previousTimeout);
1368    }
1369  }
1370
1371  @Test
1372  public void testErrors() throws IOException {
1373    ClusterConnection conn = new MyConnectionImpl(CONF);
1374    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
1375    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1376    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1377
1378    Assert.assertNotNull(ap.createServerErrorTracker());
1379
1380    Put p = createPut(1, true);
1381    mutator.mutate(p);
1382
1383    try {
1384      mutator.flush();
1385      Assert.fail();
1386    } catch (RetriesExhaustedWithDetailsException expected) {
1387      assertEquals(1, expected.getNumExceptions());
1388      assertTrue(expected.getRow(0) == p);
1389    }
1390    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1391    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1392  }
1393
1394
1395  @Test
1396  public void testCallQueueTooLarge() throws IOException {
1397    ClusterConnection conn = new MyConnectionImpl(CONF);
1398    AsyncProcessWithFailure ap =
1399        new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException());
1400    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1401    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1402    Assert.assertNotNull(ap.createServerErrorTracker());
1403    Put p = createPut(1, true);
1404    mutator.mutate(p);
1405
1406    try {
1407      mutator.flush();
1408      Assert.fail();
1409    } catch (RetriesExhaustedWithDetailsException expected) {
1410      assertEquals(1, expected.getNumExceptions());
1411      assertTrue(expected.getRow(0) == p);
1412    }
1413    // Checking that the ErrorsServers came into play and didn't make us stop immediately
1414    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
1415  }
1416  /**
1417   * This test simulates multiple regions on 2 servers. We should have 2 multi requests and
1418   *  2 threads: 1 per server, this whatever the number of regions.
1419   */
1420  @Test
1421  public void testThreadCreation() throws Exception {
1422    final int NB_REGS = 100;
1423    List<HRegionLocation> hrls = new ArrayList<>(NB_REGS);
1424    List<Get> gets = new ArrayList<>(NB_REGS);
1425    for (int i = 0; i < NB_REGS; i++) {
1426      HRegionInfo hri = new HRegionInfo(
1427          DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
1428      HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
1429      hrls.add(hrl);
1430
1431      Get get = new Get(Bytes.toBytes(i * 10L));
1432      gets.add(get);
1433    }
1434
1435    MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF);
1436    MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
1437    HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service);
1438    ht.multiAp = ap;
1439    ht.batch(gets, null);
1440
1441    Assert.assertEquals(NB_REGS, ap.nbActions.get());
1442    Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
1443    Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
1444
1445    int nbReg = 0;
1446    for (int i =0; i<NB_REGS; i++){
1447      if (con.usedRegions[i]) nbReg++;
1448    }
1449    Assert.assertEquals("nbReg=" + nbReg, NB_REGS, nbReg);
1450  }
1451
1452  @Test
1453  public void testReplicaReplicaSuccess() throws Exception {
1454    // Main call takes too long so replicas succeed, except for one region w/o replicas.
1455    // One region has no replica, so the main call succeeds for it.
1456    MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
1457    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1458    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1459            .setPool(ap.service)
1460            .setRpcTimeout(RPC_TIMEOUT)
1461            .setOperationTimeout(OPERATION_TIMEOUT)
1462            .setTableName(DUMMY_TABLE)
1463            .setRowAccess(rows)
1464            .setResults(new Object[3])
1465            .setSubmittedRows(SubmittedRows.ALL)
1466            .build();
1467    AsyncRequestFuture ars = ap.submit(task);
1468    verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
1469    Assert.assertEquals(2, ap.getReplicaCallCount());
1470  }
1471
1472  @Test
1473  public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
1474    // Main call succeeds before replica calls are kicked off.
1475    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
1476    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
1477    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1478            .setPool(ap.service)
1479            .setRpcTimeout(RPC_TIMEOUT)
1480            .setOperationTimeout(OPERATION_TIMEOUT)
1481            .setTableName(DUMMY_TABLE)
1482            .setRowAccess(rows)
1483            .setResults(new Object[3])
1484            .setSubmittedRows(SubmittedRows.ALL)
1485            .build();
1486    AsyncRequestFuture ars = ap.submit(task);
1487    verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
1488    Assert.assertEquals(0, ap.getReplicaCallCount());
1489  }
1490
1491  @Test
1492  public void testReplicaParallelCallsSucceed() throws Exception {
1493    // Either main or replica can succeed.
1494    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
1495    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1496    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1497            .setPool(ap.service)
1498            .setRpcTimeout(RPC_TIMEOUT)
1499            .setOperationTimeout(OPERATION_TIMEOUT)
1500            .setTableName(DUMMY_TABLE)
1501            .setRowAccess(rows)
1502            .setResults(new Object[2])
1503            .setSubmittedRows(SubmittedRows.ALL)
1504            .build();
1505    AsyncRequestFuture ars = ap.submit(task);
1506    verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
1507    long replicaCalls = ap.getReplicaCallCount();
1508    Assert.assertTrue(replicaCalls >= 0);
1509    Assert.assertTrue(replicaCalls <= 2);
1510  }
1511
1512  @Test
1513  public void testReplicaPartialReplicaCall() throws Exception {
1514    // One server is slow, so the result for its region comes from replica, whereas
1515    // the result for other region comes from primary before replica calls happen.
1516    // There should be no replica call for that region at all.
1517    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
1518    ap.setPrimaryCallDelay(sn2, 2000);
1519    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1520    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1521            .setPool(ap.service)
1522            .setRpcTimeout(RPC_TIMEOUT)
1523            .setOperationTimeout(OPERATION_TIMEOUT)
1524            .setTableName(DUMMY_TABLE)
1525            .setRowAccess(rows)
1526            .setResults(new Object[2])
1527            .setSubmittedRows(SubmittedRows.ALL)
1528            .build();
1529    AsyncRequestFuture ars = ap.submit(task);
1530    verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
1531    Assert.assertEquals(1, ap.getReplicaCallCount());
1532  }
1533
1534  @Test
1535  public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
1536    // Main calls fail before replica calls can start - this is currently not handled.
1537    // It would probably never happen if we can get location (due to retries),
1538    // and it would require additional synchronization.
1539    MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
1540    ap.addFailures(hri1, hri2);
1541    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1542    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1543            .setPool(ap.service)
1544            .setRpcTimeout(RPC_TIMEOUT)
1545            .setOperationTimeout(OPERATION_TIMEOUT)
1546            .setTableName(DUMMY_TABLE)
1547            .setRowAccess(rows)
1548            .setResults(new Object[2])
1549            .setSubmittedRows(SubmittedRows.ALL)
1550            .build();
1551    AsyncRequestFuture ars = ap.submit(task);
1552    verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
1553    Assert.assertEquals(0, ap.getReplicaCallCount());
1554  }
1555
1556  @Test
1557  public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
1558    // Main calls fails after replica calls start. For two-replica region, one replica call
1559    // also fails. Regardless, we get replica results for both regions.
1560    MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
1561    ap.addFailures(hri1, hri1r2, hri2);
1562    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1563    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1564            .setPool(ap.service)
1565            .setRpcTimeout(RPC_TIMEOUT)
1566            .setOperationTimeout(OPERATION_TIMEOUT)
1567            .setTableName(DUMMY_TABLE)
1568            .setRowAccess(rows)
1569            .setResults(new Object[2])
1570            .setSubmittedRows(SubmittedRows.ALL)
1571            .build();
1572    AsyncRequestFuture ars = ap.submit(task);
1573    verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
1574    Assert.assertEquals(2, ap.getReplicaCallCount());
1575  }
1576
1577  @Test
1578  public void testReplicaAllCallsFailForOneRegion() throws Exception {
1579    // For one of the region, all 3, main and replica, calls fail. For the other, replica
1580    // call fails but its exception should not be visible as it did succeed.
1581    MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
1582    ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
1583    List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
1584    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1585            .setPool(ap.service)
1586            .setRpcTimeout(RPC_TIMEOUT)
1587            .setOperationTimeout(OPERATION_TIMEOUT)
1588            .setTableName(DUMMY_TABLE)
1589            .setRowAccess(rows)
1590            .setResults(new Object[2])
1591            .setSubmittedRows(SubmittedRows.ALL)
1592            .build();
1593    AsyncRequestFuture ars = ap.submit(task);
1594    verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
1595    // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
1596    Assert.assertEquals(3, ars.getErrors().getNumExceptions());
1597    for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
1598      Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
1599    }
1600  }
1601
1602  private MyAsyncProcessWithReplicas createReplicaAp(
1603      int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
1604    return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
1605  }
1606
1607  private MyAsyncProcessWithReplicas createReplicaAp(
1608      int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
1609    // TODO: this is kind of timing dependent... perhaps it should detect from createCaller
1610    //       that the replica call has happened and that way control the ordering.
1611    Configuration conf = new Configuration();
1612    ClusterConnection conn = createHConnectionWithReplicas();
1613    conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
1614    if (retries >= 0) {
1615      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1616    }
1617    MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
1618    ap.setCallDelays(primaryMs, replicaMs);
1619    return ap;
1620  }
1621
1622  private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap,
1623      TableName name) {
1624    return new BufferedMutatorParams(name)
1625            .pool(ap.service)
1626            .rpcTimeout(RPC_TIMEOUT)
1627            .opertationTimeout(OPERATION_TIMEOUT);
1628  }
1629
1630  private static List<Get> makeTimelineGets(byte[]... rows) {
1631    List<Get> result = new ArrayList<>(rows.length);
1632    for (byte[] row : rows) {
1633      Get get = new Get(row);
1634      get.setConsistency(Consistency.TIMELINE);
1635      result.add(get);
1636    }
1637    return result;
1638  }
1639
1640  private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
1641    Object[] actual = ars.getResults();
1642    Assert.assertEquals(expected.length, actual.length);
1643    for (int i = 0; i < expected.length; ++i) {
1644      Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
1645    }
1646  }
1647
  /**
   * Expected outcome of a single result slot, as checked by {@code verifyReplicaResult}.
   * (After reading TheDailyWtf, I always wanted to create a MyBoolean enum like this!)
   */
  private enum RR {
    /** The slot must hold a non-error result whose {@code isStale()} is true. */
    TRUE,
    /** The slot must hold a non-error result whose {@code isStale()} is false. */
    FALSE,
    /** The slot must hold a non-error result; its staleness is not checked. */
    DONT_CARE,
    /** The slot must hold a {@code Throwable}. */
    FAILED
  }
1655
1656  private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
1657    Object[] actuals = ars.getResults();
1658    Assert.assertEquals(expecteds.length, actuals.length);
1659    for (int i = 0; i < expecteds.length; ++i) {
1660      Object actual = actuals[i];
1661      RR expected = expecteds[i];
1662      Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
1663      if (expected != RR.FAILED && expected != RR.DONT_CARE) {
1664        Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
1665      }
1666    }
1667  }
1668
1669  /**
1670   * @param regCnt  the region: 1 to 3.
1671   * @param success if true, the put will succeed.
1672   * @return a put
1673   */
1674  private Put createPut(int regCnt, boolean success) {
1675    Put p;
1676    if (!success) {
1677      p = new Put(FAILS);
1678    } else switch (regCnt){
1679      case 1 :
1680        p = new Put(DUMMY_BYTES_1);
1681        break;
1682      case 2:
1683        p = new Put(DUMMY_BYTES_2);
1684        break;
1685      case 3:
1686        p = new Put(DUMMY_BYTES_3);
1687        break;
1688      default:
1689        throw new IllegalArgumentException("unknown " + regCnt);
1690    }
1691
1692    p.addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1693
1694    return p;
1695  }
1696
1697  static class MyThreadPoolExecutor extends ThreadPoolExecutor {
1698    public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime,
1699        TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) {
1700      super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue);
1701    }
1702
1703    @Override
1704    public Future submit(Runnable runnable) {
1705      throw new OutOfMemoryError("OutOfMemory error thrown by means");
1706    }
1707  }
1708
  /**
   * {@code AsyncProcess} subclass that adds nothing but a two-argument constructor; the
   * caller factory and the controller factory are created from the given configuration.
   */
  static class AsyncProcessForThrowableCheck extends AsyncProcess {
    /**
     * @param hc   the connection passed through to {@code AsyncProcess}
     * @param conf the configuration passed through to {@code AsyncProcess} and used to build
     *             the {@code RpcRetryingCallerFactory} and {@code RpcControllerFactory}
     */
    public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
      super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(
          conf));
    }
  }
1715
1716  @Test
1717  public void testUncheckedException() throws Exception {
1718    // Test the case pool.submit throws unchecked exception
1719    ClusterConnection hc = createHConnection();
1720    MyThreadPoolExecutor myPool =
1721        new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
1722            new LinkedBlockingQueue<>(200));
1723    AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF);
1724
1725    List<Put> puts = new ArrayList<>(1);
1726    puts.add(createPut(1, true));
1727    AsyncProcessTask task = AsyncProcessTask.newBuilder()
1728            .setPool(myPool)
1729            .setRpcTimeout(RPC_TIMEOUT)
1730            .setOperationTimeout(OPERATION_TIMEOUT)
1731            .setTableName(DUMMY_TABLE)
1732            .setRowAccess(puts)
1733            .setSubmittedRows(SubmittedRows.NORMAL)
1734            .build();
1735    ap.submit(task);
1736    Assert.assertTrue(puts.isEmpty());
1737  }
1738
1739  /**
1740   * Test and make sure we could use a special pause setting when retry with
1741   * CallQueueTooBigException, see HBASE-17114
1742   * @throws Exception if unexpected error happened during test
1743   */
1744  @Test
1745  public void testRetryPauseWithCallQueueTooBigException() throws Exception {
1746    Configuration myConf = new Configuration(CONF);
1747    final long specialPause = 500L;
1748    final int retries = 1;
1749    myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
1750    myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
1751    ClusterConnection conn = new MyConnectionImpl(myConf);
1752    AsyncProcessWithFailure ap =
1753        new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
1754    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1755    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1756
1757    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1758
1759    Put p = createPut(1, true);
1760    mutator.mutate(p);
1761
1762    long startTime = System.currentTimeMillis();
1763    try {
1764      mutator.flush();
1765      Assert.fail();
1766    } catch (RetriesExhaustedWithDetailsException expected) {
1767      assertEquals(1, expected.getNumExceptions());
1768      assertTrue(expected.getRow(0) == p);
1769    }
1770    long actualSleep = System.currentTimeMillis() - startTime;
1771    long expectedSleep = 0L;
1772    for (int i = 0; i < retries; i++) {
1773      expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
1774      // Prevent jitter in CollectionUtils#getPauseTime to affect result
1775      actualSleep += (long) (specialPause * 0.01f);
1776    }
1777    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
1778    Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms",
1779      actualSleep >= expectedSleep);
1780
1781    // check and confirm normal IOE will use the normal pause
1782    final long normalPause =
1783        myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1784    ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
1785    bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1786    mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1787    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1788    mutator.mutate(p);
1789    startTime = System.currentTimeMillis();
1790    try {
1791      mutator.flush();
1792      Assert.fail();
1793    } catch (RetriesExhaustedWithDetailsException expected) {
1794      assertEquals(1, expected.getNumExceptions());
1795      assertTrue(expected.getRow(0) == p);
1796    }
1797    actualSleep = System.currentTimeMillis() - startTime;
1798    expectedSleep = 0L;
1799    for (int i = 0; i < retries; i++) {
1800      expectedSleep += ConnectionUtils.getPauseTime(normalPause, i);
1801    }
1802    // plus an additional pause to balance the program execution time
1803    expectedSleep += normalPause;
1804    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
1805    Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
1806  }
1807
1808  @Test
1809  public void testRetryWithExceptionClearsMetaCache() throws Exception {
1810    ClusterConnection conn = createHConnection();
1811    Configuration myConf = conn.getConfiguration();
1812    myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
1813
1814    AsyncProcessWithFailure ap =
1815        new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test"));
1816    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
1817    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
1818
1819    Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
1820
1821    Assert.assertEquals(
1822        conn.locateRegion(DUMMY_TABLE, DUMMY_BYTES_1, true, true).toString(),
1823        new RegionLocations(loc1).toString());
1824
1825    Mockito.verify(conn, Mockito.times(0)).clearCaches(Mockito.any());
1826
1827    Put p = createPut(1, true);
1828    mutator.mutate(p);
1829
1830    try {
1831      mutator.flush();
1832      Assert.fail();
1833    } catch (RetriesExhaustedWithDetailsException expected) {
1834      assertEquals(1, expected.getNumExceptions());
1835      assertTrue(expected.getRow(0) == p);
1836    }
1837
1838    Mockito.verify(conn, Mockito.times(1)).clearCaches(loc1.getServerName());
1839  }
1840
1841  @Test
1842  public void testQueueRowAccess() throws Exception {
1843    ClusterConnection conn = createHConnection();
1844    BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
1845      new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000));
1846    Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1847    Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2);
1848    mutator.mutate(p0);
1849    BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess();
1850    // QueueRowAccess should take all undealt mutations
1851    assertEquals(0, mutator.size());
1852    mutator.mutate(p1);
1853    assertEquals(1, mutator.size());
1854    BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess();
1855    // QueueRowAccess should take all undealt mutations
1856    assertEquals(0, mutator.size());
1857    assertEquals(1, ra0.size());
1858    assertEquals(1, ra1.size());
1859    Iterator<Row> iter0 = ra0.iterator();
1860    Iterator<Row> iter1 = ra1.iterator();
1861    assertTrue(iter0.hasNext());
1862    assertTrue(iter1.hasNext());
1863    // the next() will poll the mutation from inner buffer and update the buffer count
1864    assertTrue(iter0.next() == p0);
1865    assertEquals(1, mutator.getUnflushedSize());
1866    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
1867    assertTrue(iter1.next() == p1);
1868    assertEquals(0, mutator.getUnflushedSize());
1869    assertEquals(0, mutator.getCurrentWriteBufferSize());
1870    assertFalse(iter0.hasNext());
1871    assertFalse(iter1.hasNext());
1872    // ra0 doest handle the mutation so the mutation won't be pushed back to buffer
1873    iter0.remove();
1874    ra0.close();
1875    assertEquals(0, mutator.size());
1876    assertEquals(0, mutator.getUnflushedSize());
1877    assertEquals(0, mutator.getCurrentWriteBufferSize());
1878    // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer
1879    ra1.close();
1880    assertEquals(1, mutator.size());
1881    assertEquals(1, mutator.getUnflushedSize());
1882    assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize());
1883  }
1884}