001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.lang.reflect.InvocationTargetException;
029import java.lang.reflect.Modifier;
030import java.net.SocketTimeoutException;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.Set;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.SynchronousQueue;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.stream.Collectors;
043import java.util.stream.IntStream;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.CallDroppedException;
046import org.apache.hadoop.hbase.CallQueueTooBigException;
047import org.apache.hadoop.hbase.Cell;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseServerException;
050import org.apache.hadoop.hbase.HBaseTestingUtility;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.RegionLocations;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.Waiter;
057import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
058import org.apache.hadoop.hbase.exceptions.DeserializationException;
059import org.apache.hadoop.hbase.exceptions.RegionMovedException;
060import org.apache.hadoop.hbase.filter.Filter;
061import org.apache.hadoop.hbase.filter.FilterBase;
062import org.apache.hadoop.hbase.ipc.RpcClient;
063import org.apache.hadoop.hbase.master.HMaster;
064import org.apache.hadoop.hbase.regionserver.HRegion;
065import org.apache.hadoop.hbase.regionserver.HRegionServer;
066import org.apache.hadoop.hbase.regionserver.Region;
067import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
068import org.apache.hadoop.hbase.testclassification.LargeTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
071import org.apache.hadoop.hbase.util.JVMClusterUtil;
072import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
073import org.apache.hadoop.hbase.util.Threads;
074import org.junit.After;
075import org.junit.AfterClass;
076import org.junit.Assert;
077import org.junit.BeforeClass;
078import org.junit.ClassRule;
079import org.junit.Ignore;
080import org.junit.Rule;
081import org.junit.Test;
082import org.junit.experimental.categories.Category;
083import org.junit.rules.TestName;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
088import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
089import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
090import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
091
092/**
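 * Tests for {@link ConnectionImplementation}: connection and batch-pool lifecycle, connection
 * close/idle/cut handling, and the client-side region location cache.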
094 */
095@Category({ LargeTests.class })
096public class TestConnectionImplementation {
097
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestConnectionImplementation.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class);
  // Shared testing utility: its mini cluster is started in setUpBeforeClass() and shut down in
  // tearDownAfterClass().
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Fixed table names for tests that do not derive their table name from the test method name.
  private static final TableName TABLE_NAME = TableName.valueOf("test");
  private static final TableName TABLE_NAME1 = TableName.valueOf("test1");
  private static final TableName TABLE_NAME2 = TableName.valueOf("test2");
  private static final TableName TABLE_NAME3 = TableName.valueOf("test3");
  // Column family used by the test tables.
  private static final byte[] FAM_NAM = Bytes.toBytes("f");
  private static final byte[] ROW = Bytes.toBytes("bbb");
  private static final byte[] ROW_X = Bytes.toBytes("xxx");
  // Client retry count written to the shared configuration in setUpBeforeClass();
  // testRegionCaching() writes it back after temporarily changing the retry setting.
  private static final int RPC_RETRY = 5;

  // Exposes the running test method's name; several tests use it as their table name.
  @Rule
  public TestName name = new TestName();
115
116  @BeforeClass
117  public static void setUpBeforeClass() throws Exception {
118    ResourceLeakDetector.setLevel(Level.PARANOID);
119    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
120    // Up the handlers; this test needs more than usual.
121    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
122    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
123    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
124    TEST_UTIL.startMiniCluster(2);
125
126  }
127
128  @AfterClass
129  public static void tearDownAfterClass() throws Exception {
130    TEST_UTIL.shutdownMiniCluster();
131  }
132
133  @After
134  public void tearDown() throws IOException {
135    TEST_UTIL.getAdmin().balancerSwitch(true, true);
136  }
137
138  @Test
139  public void testClusterConnection() throws IOException {
140    ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS,
141      new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("test-hcm-pool-%d")
142        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
143
144    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
145    Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool);
146    // make sure the internally created ExecutorService is the one passed
147    assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool());
148
149    final TableName tableName = TableName.valueOf(name.getMethodName());
150    TEST_UTIL.createTable(tableName, FAM_NAM).close();
151    Table table = con1.getTable(tableName, otherPool);
152
153    ExecutorService pool = null;
154
155    if (table instanceof HTable) {
156      HTable t = (HTable) table;
157      // make sure passing a pool to the getTable does not trigger creation of an internal pool
158      assertNull("Internal Thread pool should be null",
159        ((ConnectionImplementation) con1).getCurrentBatchPool());
160      // table should use the pool passed
161      assertTrue(otherPool == t.getPool());
162      t.close();
163
164      t = (HTable) con2.getTable(tableName);
165      // table should use the connectin's internal pool
166      assertTrue(otherPool == t.getPool());
167      t.close();
168
169      t = (HTable) con2.getTable(tableName);
170      // try other API too
171      assertTrue(otherPool == t.getPool());
172      t.close();
173
174      t = (HTable) con2.getTable(tableName);
175      // try other API too
176      assertTrue(otherPool == t.getPool());
177      t.close();
178
179      t = (HTable) con1.getTable(tableName);
180      pool = ((ConnectionImplementation) con1).getCurrentBatchPool();
181      // make sure an internal pool was created
182      assertNotNull("An internal Thread pool should have been created", pool);
183      // and that the table is using it
184      assertTrue(t.getPool() == pool);
185      t.close();
186
187      t = (HTable) con1.getTable(tableName);
188      // still using the *same* internal pool
189      assertTrue(t.getPool() == pool);
190      t.close();
191    } else {
192      table.close();
193    }
194
195    con1.close();
196
197    // if the pool was created on demand it should be closed upon connection close
198    if (pool != null) {
199      assertTrue(pool.isShutdown());
200    }
201
202    con2.close();
203    // if the pool is passed, it is not closed
204    assertFalse(otherPool.isShutdown());
205    otherPool.shutdownNow();
206  }
207
208  /**
209   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
210   * @throws IOException Unable to construct admin
211   */
212  @Test
213  public void testAdminFactory() throws IOException {
214    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
215    Admin admin = con1.getAdmin();
216    assertTrue(admin.getConnection() == con1);
217    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
218    con1.close();
219  }
220
221  // Fails too often! Needs work. HBASE-12558
222  // May only fail on non-linux machines? E.g. macosx.
223  @Ignore
224  @Test(expected = RegionServerStoppedException.class)
225  // Depends on mulitcast messaging facility that seems broken in hbase2
226  // See HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of
227  // dead servers is broke"
228  public void testClusterStatus() throws Exception {
229    final TableName tableName = TableName.valueOf(name.getMethodName());
230    byte[] cf = "cf".getBytes();
231    byte[] rk = "rk1".getBytes();
232
233    JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
234    rs.waitForServerOnline();
235    final ServerName sn = rs.getRegionServer().getServerName();
236
237    Table t = TEST_UTIL.createTable(tableName, cf);
238    TEST_UTIL.waitTableAvailable(tableName);
239    TEST_UTIL.waitUntilNoRegionsInTransition();
240
241    final ConnectionImplementation hci = (ConnectionImplementation) TEST_UTIL.getConnection();
242    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
243      while (l.getRegionLocation(rk).getPort() != sn.getPort()) {
244        TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().getEncodedNameAsBytes(),
245          sn);
246        TEST_UTIL.waitUntilNoRegionsInTransition();
247        hci.clearRegionCache(tableName);
248      }
249      Assert.assertNotNull(hci.clusterStatusListener);
250      TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000);
251    }
252
253    Put p1 = new Put(rk);
254    p1.addColumn(cf, "qual".getBytes(), "val".getBytes());
255    t.put(p1);
256
257    rs.getRegionServer().abort("I'm dead");
258
259    // We want the status to be updated. That's a least 10 second
260    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
261      @Override
262      public boolean evaluate() throws Exception {
263        return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().getDeadServers()
264          .isDeadServer(sn);
265      }
266    });
267
268    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
269      @Override
270      public boolean evaluate() throws Exception {
271        return hci.clusterStatusListener.isDeadServer(sn);
272      }
273    });
274
275    t.close();
276    hci.getClient(sn); // will throw an exception: RegionServerStoppedException
277  }
278
279  /**
280   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
281   */
282  @Test
283  public void testConnectionCloseAllowsInterrupt() throws Exception {
284    testConnectionClose(true);
285  }
286
287  @Test
288  public void testConnectionNotAllowsInterrupt() throws Exception {
289    testConnectionClose(false);
290  }
291
292  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
293    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
294    TEST_UTIL.createTable(tableName, FAM_NAM).close();
295
296    TEST_UTIL.getAdmin().balancerSwitch(false, true);
297
298    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
299    // We want to work on a separate connection.
300    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
301    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
302    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
303    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
304    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
305    // to avoid the client to be stuck when do the Get
306    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
307    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
308    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
309
310    Connection connection = ConnectionFactory.createConnection(c2);
311    final Table table = connection.getTable(tableName);
312
313    Put put = new Put(ROW);
314    put.addColumn(FAM_NAM, ROW, ROW);
315    table.put(put);
316
317    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
318    final AtomicInteger step = new AtomicInteger(0);
319
320    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
321    Thread t = new Thread("testConnectionCloseThread") {
322      @Override
323      public void run() {
324        int done = 0;
325        try {
326          step.set(1);
327          while (step.get() == 1) {
328            Get get = new Get(ROW);
329            table.get(get);
330            done++;
331            if (done % 100 == 0) LOG.info("done=" + done);
332            // without the sleep, will cause the exception for too many files in
333            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
334            Thread.sleep(100);
335          }
336        } catch (Throwable t) {
337          failed.set(t);
338          LOG.error(t.toString(), t);
339        }
340        step.set(3);
341      }
342    };
343    t.start();
344    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
345      @Override
346      public boolean evaluate() throws Exception {
347        return step.get() == 1;
348      }
349    });
350
351    ServerName sn;
352    try (RegionLocator rl = connection.getRegionLocator(tableName)) {
353      sn = rl.getRegionLocation(ROW).getServerName();
354    }
355    ConnectionImplementation conn = (ConnectionImplementation) connection;
356    RpcClient rpcClient = conn.getRpcClient();
357
358    LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
359    for (int i = 0; i < 500; i++) {
360      rpcClient.cancelConnections(sn);
361      Thread.sleep(50);
362    }
363
364    step.compareAndSet(1, 2);
365    // The test may fail here if the thread doing the gets is stuck. The way to find
366    // out what's happening is to look for the thread named 'testConnectionCloseThread'
367    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
368      @Override
369      public boolean evaluate() throws Exception {
370        return step.get() == 3;
371      }
372    });
373    table.close();
374    connection.close();
375    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
376  }
377
378  /**
379   * Test that connection can become idle without breaking everything.
380   */
381  @Test
382  public void testConnectionIdle() throws Exception {
383    final TableName tableName = TableName.valueOf(name.getMethodName());
384    TEST_UTIL.createTable(tableName, FAM_NAM).close();
385    int idleTime = 20000;
386    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
387
388    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
389    // We want to work on a separate connection.
390    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
391    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
392    c2.setInt(RpcClient.IDLE_TIME, idleTime);
393
394    Connection connection = ConnectionFactory.createConnection(c2);
395    final Table table = connection.getTable(tableName);
396
397    Put put = new Put(ROW);
398    put.addColumn(FAM_NAM, ROW, ROW);
399    table.put(put);
400
401    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
402    mee.setValue(EnvironmentEdgeManager.currentTime());
403    EnvironmentEdgeManager.injectEdge(mee);
404    LOG.info("first get");
405    table.get(new Get(ROW));
406
407    LOG.info("first get - changing the time & sleeping");
408    mee.incValue(idleTime + 1000);
409    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
410                        // 1500 = sleep time in RpcClient#waitForWork + a margin
411
412    LOG.info("second get - connection has been marked idle in the middle");
413    // To check that the connection actually became idle would need to read some private
414    // fields of RpcClient.
415    table.get(new Get(ROW));
416    mee.incValue(idleTime + 1000);
417
418    LOG.info("third get - connection is idle, but the reader doesn't know yet");
419    // We're testing here a special case:
420    // time limit reached BUT connection not yet reclaimed AND a new call.
421    // in this situation, we don't close the connection, instead we use it immediately.
422    // If we're very unlucky we can have a race condition in the test: the connection is already
423    // under closing when we do the get, so we have an exception, and we don't retry as the
424    // retry number is 1. The probability is very very low, and seems acceptable for now. It's
425    // a test issue only.
426    table.get(new Get(ROW));
427
428    LOG.info("we're done - time will change back");
429
430    table.close();
431
432    connection.close();
433    EnvironmentEdgeManager.reset();
434    TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
435  }
436
437  /**
438   * Test that the connection to the dead server is cut immediately when we receive the
439   * notification. n
440   */
441  @Test
442  public void testConnectionCut() throws Exception {
443    final TableName tableName = TableName.valueOf(name.getMethodName());
444
445    TEST_UTIL.createTable(tableName, FAM_NAM).close();
446    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
447
448    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
449    // We want to work on a separate connection.
450    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
451    // try only once w/o any retry
452    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
453    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
454
455    final Connection connection = ConnectionFactory.createConnection(c2);
456    final Table table = connection.getTable(tableName);
457
458    Put p = new Put(FAM_NAM);
459    p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
460    table.put(p);
461
462    final ConnectionImplementation hci = (ConnectionImplementation) connection;
463
464    final HRegionLocation loc;
465    try (RegionLocator rl = connection.getRegionLocator(tableName)) {
466      loc = rl.getRegionLocation(FAM_NAM);
467    }
468
469    Get get = new Get(FAM_NAM);
470    Assert.assertNotNull(table.get(get));
471
472    get = new Get(FAM_NAM);
473    get.setFilter(new BlockingFilter());
474
475    // This thread will mark the server as dead while we're waiting during a get.
476    Thread t = new Thread() {
477      @Override
478      public void run() {
479        synchronized (syncBlockingFilter) {
480          try {
481            syncBlockingFilter.wait();
482          } catch (InterruptedException e) {
483            throw new RuntimeException(e);
484          }
485        }
486        hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
487      }
488    };
489
490    t.start();
491    try {
492      table.get(get);
493      Assert.fail();
494    } catch (IOException expected) {
495      LOG.debug("Received: " + expected);
496      Assert.assertFalse(expected instanceof SocketTimeoutException);
497      Assert.assertFalse(syncBlockingFilter.get());
498    } finally {
499      syncBlockingFilter.set(true);
500      t.join();
501      TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
502    }
503
504    table.close();
505    connection.close();
506  }
507
508  protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
509
510  public static class BlockingFilter extends FilterBase {
511    @Override
512    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
513      int i = 0;
514      while (i++ < 1000 && !syncBlockingFilter.get()) {
515        synchronized (syncBlockingFilter) {
516          syncBlockingFilter.notifyAll();
517        }
518        Threads.sleep(100);
519      }
520      syncBlockingFilter.set(true);
521      return false;
522    }
523
524    @Override
525    public ReturnCode filterCell(final Cell ignored) throws IOException {
526      return ReturnCode.INCLUDE;
527    }
528
529    public static Filter parseFrom(final byte[] pbBytes) throws DeserializationException {
530      return new BlockingFilter();
531    }
532  }
533
534  /**
535   * Test that when we delete a location using the first row of a region that we really delete it. n
536   */
537  @Test
538  public void testRegionCaching() throws Exception {
539    TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
540    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
541    // test with no retry, or client cache will get updated after the first failure
542    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
543    Connection connection = ConnectionFactory.createConnection(conf);
544    final Table table = connection.getTable(TABLE_NAME);
545
546    TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
547    Put put = new Put(ROW);
548    put.addColumn(FAM_NAM, ROW, ROW);
549    table.put(put);
550
551    ConnectionImplementation conn = (ConnectionImplementation) connection;
552
553    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
554
555    // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at
556    // a location where the port is current port number +1 -- i.e. a non-existent location.
557    HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
558    final int nextPort = loc.getPort() + 1;
559    conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
560      ServerName.valueOf("127.0.0.1", nextPort, HConstants.LATEST_TIMESTAMP),
561      HConstants.LATEST_TIMESTAMP);
562    Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort(),
563      nextPort);
564
565    conn.clearRegionCache(TABLE_NAME, ROW.clone());
566    RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
567    assertNull("What is this location?? " + rl, rl);
568
569    // We're now going to move the region and check that it works for the client
570    // First a new put to add the location in the cache
571    conn.clearRegionCache(TABLE_NAME);
572    Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
573    Put put2 = new Put(ROW);
574    put2.addColumn(FAM_NAM, ROW, ROW);
575    table.put(put2);
576    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
577    assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
578
579    TEST_UTIL.getAdmin().setBalancerRunning(false, false);
580    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
581
582    // We can wait for all regions to be online, that makes log reading easier when debugging
583    TEST_UTIL.waitUntilNoRegionsInTransition();
584
585    // Now moving the region to the second server
586    HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
587    byte[] regionName = toMove.getRegionInfo().getRegionName();
588    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
589
590    // Choose the other server.
591    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
592    int destServerId = curServerId == 0 ? 1 : 0;
593
594    HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
595    HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
596
597    ServerName destServerName = destServer.getServerName();
598
599    // Check that we are in the expected state
600    Assert.assertTrue(curServer != destServer);
601    Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
602    Assert.assertFalse(toMove.getPort() == destServerName.getPort());
603    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
604    Assert.assertNull(destServer.getOnlineRegion(regionName));
605    Assert.assertFalse(
606      TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().hasRegionsInTransition());
607
608    // Moving. It's possible that we don't have all the regions online at this point, so
609    // the test must depend only on the region we're looking at.
610    LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
611    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
612
613    while (
614      destServer.getOnlineRegion(regionName) == null
615        || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
616        || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
617        || master.getAssignmentManager().hasRegionsInTransition()
618    ) {
619      // wait for the move to be finished
620      Thread.sleep(1);
621    }
622
623    LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
624
625    // Check our new state.
626    Assert.assertNull(curServer.getOnlineRegion(regionName));
627    Assert.assertNotNull(destServer.getOnlineRegion(regionName));
628    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
629    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
630
631    // Cache was NOT updated and points to the wrong server
632    Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()
633        == destServerName.getPort());
634
635    // This part relies on a number of tries equals to 1.
636    // We do a put and expect the cache to be updated, even if we don't retry
637    LOG.info("Put starting");
638    Put put3 = new Put(ROW);
639    put3.addColumn(FAM_NAM, ROW, ROW);
640    try {
641      table.put(put3);
642      Assert.fail("Unreachable point");
643    } catch (RetriesExhaustedWithDetailsException e) {
644      LOG.info("Put done, exception caught: " + e.getClass());
645      Assert.assertEquals(1, e.getNumExceptions());
646      Assert.assertEquals(1, e.getCauses().size());
647      Assert.assertArrayEquals(ROW, e.getRow(0).getRow());
648
649      // Check that we unserialized the exception as expected
650      Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
651      Assert.assertNotNull(cause);
652      Assert.assertTrue(cause instanceof RegionMovedException);
653    } catch (RetriesExhaustedException ree) {
654      // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException
655      // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue.
656      LOG.info("Put done, exception caught: " + ree.getClass());
657      Throwable cause = ClientExceptionsUtil.findException(ree.getCause());
658      Assert.assertNotNull(cause);
659      Assert.assertTrue(cause instanceof RegionMovedException);
660    }
661    Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
662    Assert.assertEquals("Previous server was " + curServer.getServerName().getAddress(),
663      destServerName.getPort(),
664      conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
665
666    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
667    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
668
669    // We move it back to do another test with a scan
670    LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
671    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(),
672      curServer.getServerName());
673
674    while (
675      curServer.getOnlineRegion(regionName) == null
676        || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
677        || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
678        || master.getAssignmentManager().hasRegionsInTransition()
679    ) {
680      // wait for the move to be finished
681      Thread.sleep(1);
682    }
683
684    // Check our new state.
685    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
686    Assert.assertNull(destServer.getOnlineRegion(regionName));
687    LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
688
689    // Cache was NOT updated and points to the wrong server
690    Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()
691        == curServer.getServerName().getPort());
692
693    Scan sc = new Scan();
694    sc.setStopRow(ROW);
695    sc.setStartRow(ROW);
696
697    // The scanner takes the max retries from the connection configuration, not the table as
698    // the put.
699    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
700
701    try {
702      ResultScanner rs = table.getScanner(sc);
703      while (rs.next() != null) {
704      }
705      Assert.fail("Unreachable point");
706    } catch (RetriesExhaustedException e) {
707      LOG.info("Scan done, expected exception caught: " + e.getClass());
708    }
709
710    // Cache is updated with the right value.
711    Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
712    Assert.assertEquals("Previous server was " + destServer.getServerName().getAddress(),
713      curServer.getServerName().getPort(),
714      conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
715
716    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
717    table.close();
718    connection.close();
719  }
720
721  /**
722   * Test that Connection or Pool are not closed when managed externally n
723   */
724  @Test
725  public void testConnectionManagement() throws Exception {
726    Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
727    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
728    Table table = conn.getTable(TABLE_NAME1);
729    table.close();
730    assertFalse(conn.isClosed());
731    if (table instanceof HTable) {
732      assertFalse(((HTable) table).getPool().isShutdown());
733    }
734    table = conn.getTable(TABLE_NAME1);
735    table.close();
736    if (table instanceof HTable) {
737      assertFalse(((HTable) table).getPool().isShutdown());
738    }
739    conn.close();
740    if (table instanceof HTable) {
741      assertTrue(((HTable) table).getPool().isShutdown());
742    }
743    table0.close();
744  }
745
746  /**
747   * Test that stale cache updates don't override newer cached values.
748   */
749  @Test
750  public void testCacheSeqNums() throws Exception {
751    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
752    Put put = new Put(ROW);
753    put.addColumn(FAM_NAM, ROW, ROW);
754    table.put(put);
755    ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
756
757    HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
758    assertNotNull(location);
759
760    ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
761
762    // Same server as already in cache reporting - overwrites any value despite seqNum.
763    int nextPort = location.getPort() + 1;
764    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
765      ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
766    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
767    Assert.assertEquals(nextPort, location.getPort());
768
769    // No source specified - same.
770    nextPort = location.getPort() + 1;
771    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
772      ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
773    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
774    Assert.assertEquals(nextPort, location.getPort());
775
776    // Higher seqNum - overwrites lower seqNum.
777    nextPort = location.getPort() + 1;
778    conn.updateCachedLocation(location.getRegionInfo(), anySource,
779      ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
780    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
781    Assert.assertEquals(nextPort, location.getPort());
782
783    // Lower seqNum - does not overwrite higher seqNum.
784    nextPort = location.getPort() + 1;
785    conn.updateCachedLocation(location.getRegionInfo(), anySource,
786      ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
787    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
788    Assert.assertEquals(nextPort - 1, location.getPort());
789    table.close();
790  }
791
792  @Test
793  public void testClosing() throws Exception {
794    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
795    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
796      String.valueOf(ThreadLocalRandom.current().nextInt()));
797
798    // as connection caching is going away, now we're just testing
799    // that closed connection does actually get closed.
800
801    Connection c1 = ConnectionFactory.createConnection(configuration);
802    Connection c2 = ConnectionFactory.createConnection(configuration);
803    // no caching, different connections
804    assertTrue(c1 != c2);
805
806    // closing independently
807    c1.close();
808    assertTrue(c1.isClosed());
809    assertFalse(c2.isClosed());
810
811    c2.close();
812    assertTrue(c2.isClosed());
813  }
814
815  /**
816   * Trivial test to verify that nobody messes with
817   * {@link ConnectionFactory#createConnection(Configuration)}
818   */
819  @Test
820  public void testCreateConnection() throws Exception {
821    Configuration configuration = TEST_UTIL.getConfiguration();
822    Connection c1 = ConnectionFactory.createConnection(configuration);
823    Connection c2 = ConnectionFactory.createConnection(configuration);
824    // created from the same configuration, yet they are different
825    assertTrue(c1 != c2);
826    assertTrue(c1.getConfiguration() == c2.getConfiguration());
827  }
828
829  /**
830   * This test checks that one can connect to the cluster with only the ZooKeeper quorum set. Other
831   * stuff like master address will be read from ZK by the client.
832   */
833  @Test
834  public void testConnection() throws Exception {
835    // We create an empty config and add the ZK address.
836    Configuration c = new Configuration();
837    // This test only makes sense for ZK based connection registry.
838    c.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
839      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
840    c.set(HConstants.ZOOKEEPER_QUORUM,
841      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
842    c.set(HConstants.ZOOKEEPER_CLIENT_PORT,
843      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
844    // This should be enough to connect
845    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c);
846    assertTrue(conn.isMasterRunning());
847    conn.close();
848  }
849
850  private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception {
851    Field numTries = hci.getClass().getDeclaredField("numTries");
852    numTries.setAccessible(true);
853    Field modifiersField = Field.class.getDeclaredField("modifiers");
854    modifiersField.setAccessible(true);
855    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
856    final int prevNumRetriesVal = (Integer) numTries.get(hci);
857    numTries.set(hci, newVal);
858
859    return prevNumRetriesVal;
860  }
861
862  @Test
863  public void testMulti() throws Exception {
864    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
865    try {
866      ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
867
868      // We're now going to move the region and check that it works for the client
869      // First a new put to add the location in the cache
870      conn.clearRegionCache(TABLE_NAME3);
871      Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
872
873      TEST_UTIL.getAdmin().setBalancerRunning(false, false);
874      HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
875
876      // We can wait for all regions to be online, that makes log reading easier when debugging
877      TEST_UTIL.waitUntilNoRegionsInTransition();
878
879      Put put = new Put(ROW_X);
880      put.addColumn(FAM_NAM, ROW_X, ROW_X);
881      table.put(put);
882
883      // Now moving the region to the second server
884      HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
885      byte[] regionName = toMove.getRegionInfo().getRegionName();
886      byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
887
888      // Choose the other server.
889      int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
890      int destServerId = (curServerId == 0 ? 1 : 0);
891
892      HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
893      HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
894
895      ServerName destServerName = destServer.getServerName();
896      ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
897
898      // find another row in the cur server that is less than ROW_X
899      List<HRegion> regions = curServer.getRegions(TABLE_NAME3);
900      byte[] otherRow = null;
901      for (Region region : regions) {
902        if (
903          !region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
904            && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0
905        ) {
906          otherRow = region.getRegionInfo().getStartKey();
907          break;
908        }
909      }
910      assertNotNull(otherRow);
911      // If empty row, set it to first row.-f
912      if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
913      Put put2 = new Put(otherRow);
914      put2.addColumn(FAM_NAM, otherRow, otherRow);
915      table.put(put2); // cache put2's location
916
917      // Check that we are in the expected state
918      Assert.assertTrue(curServer != destServer);
919      Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
920      Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
921      Assert.assertNotNull(curServer.getOnlineRegion(regionName));
922      Assert.assertNull(destServer.getOnlineRegion(regionName));
923      Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager()
924        .hasRegionsInTransition());
925
926      // Moving. It's possible that we don't have all the regions online at this point, so
927      // the test depends only on the region we're looking at.
928      LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
929      TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
930
931      while (
932        destServer.getOnlineRegion(regionName) == null
933          || destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
934          || curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)
935          || master.getAssignmentManager().hasRegionsInTransition()
936      ) {
937        // wait for the move to be finished
938        Thread.sleep(1);
939      }
940
941      LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
942
943      // Check our new state.
944      Assert.assertNull(curServer.getOnlineRegion(regionName));
945      Assert.assertNotNull(destServer.getOnlineRegion(regionName));
946      Assert
947        .assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
948      Assert
949        .assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
950
951      // Cache was NOT updated and points to the wrong server
952      Assert.assertFalse(conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation().getPort()
953          == destServerName.getPort());
954
955      // Hijack the number of retry to fail after 2 tries
956      final int prevNumRetriesVal = setNumTries(conn, 2);
957
958      Put put3 = new Put(ROW_X);
959      put3.addColumn(FAM_NAM, ROW_X, ROW_X);
960      Put put4 = new Put(otherRow);
961      put4.addColumn(FAM_NAM, otherRow, otherRow);
962
963      // do multi
964      ArrayList<Put> actions = Lists.newArrayList(put4, put3);
965      table.batch(actions, null); // first should be a valid row,
966      // second we get RegionMovedException.
967
968      setNumTries(conn, prevNumRetriesVal);
969    } finally {
970      table.close();
971    }
972  }
973
974  @Test
975  public void testErrorBackoffTimeCalculation() throws Exception {
976    // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
977    final long ANY_PAUSE = 100;
978    ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
979    ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
980
981    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
982    EnvironmentEdgeManager.injectEdge(timeMachine);
983    try {
984      long largeAmountOfTime = ANY_PAUSE * 1000;
985      ConnectionImplementation.ServerErrorTracker tracker =
986        new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);
987
988      // The default backoff is 0.
989      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
990
991      // Check some backoff values from HConstants sequence.
992      tracker.reportServerError(location);
993      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
994        tracker.calculateBackoffTime(location, ANY_PAUSE));
995      tracker.reportServerError(location);
996      tracker.reportServerError(location);
997      tracker.reportServerError(location);
998      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
999        tracker.calculateBackoffTime(location, ANY_PAUSE));
1000
1001      // All of this shouldn't affect backoff for different location.
1002      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1003      tracker.reportServerError(diffLocation);
1004      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1005        tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1006
1007      // Check with different base.
1008      assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
1009        tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1010    } finally {
1011      EnvironmentEdgeManager.reset();
1012    }
1013  }
1014
1015  private static void assertEqualsWithJitter(long expected, long actual) {
1016    assertEqualsWithJitter(expected, actual, expected);
1017  }
1018
1019  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1020    assertTrue("Value not within jitter: " + expected + " vs " + actual,
1021      Math.abs(actual - expected) <= (0.01f * jitterBase));
1022  }
1023
1024  @Test
1025  public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1026    Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1027    // This test only makes sense for ZK based connection registry.
1028    config.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
1029      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
1030
1031    final TableName tableName = TableName.valueOf(name.getMethodName());
1032    TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close();
1033
1034    Connection connection = ConnectionFactory.createConnection(config);
1035    Table table = connection.getTable(tableName);
1036
1037    // this will cache the meta location and table's region location
1038    table.get(new Get(Bytes.toBytes("foo")));
1039
1040    // restart HBase
1041    TEST_UTIL.shutdownMiniHBaseCluster();
1042    TEST_UTIL.restartHBaseCluster(2);
1043    // this should be able to discover new locations for meta and table's region
1044    table.get(new Get(Bytes.toBytes("foo")));
1045    TEST_UTIL.deleteTable(tableName);
1046    table.close();
1047    connection.close();
1048  }
1049
1050  @Test
1051  public void testLocateRegionsWithRegionReplicas() throws IOException {
1052    int regionReplication = 3;
1053    byte[] family = Bytes.toBytes("cf");
1054    TableName tableName = TableName.valueOf(name.getMethodName());
1055
1056    // Create a table with region replicas
1057    TableDescriptorBuilder builder =
1058      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
1059        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
1060    TEST_UTIL.getAdmin().createTable(builder.build());
1061
1062    try (ConnectionImplementation con =
1063      (ConnectionImplementation) ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
1064      // Get locations of the regions of the table
1065      List<HRegionLocation> locations = con.locateRegions(tableName, false, false);
1066
1067      // The size of the returned locations should be 3
1068      assertEquals(regionReplication, locations.size());
1069
1070      // The replicaIds of the returned locations should be 0, 1 and 2
1071      Set<Integer> expectedReplicaIds =
1072        IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet());
1073      for (HRegionLocation location : locations) {
1074        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
1075      }
1076    } finally {
1077      TEST_UTIL.deleteTable(tableName);
1078    }
1079  }
1080
1081  @Test
1082  public void testLocateRegionsRetrySpecialPauseCQTBE() throws IOException {
1083    testLocateRegionsRetrySpecialPause(CallQueueTooBigException.class);
1084  }
1085
1086  @Test
1087  public void testLocateRegionsRetrySpecialPauseCDE() throws IOException {
1088    testLocateRegionsRetrySpecialPause(CallDroppedException.class);
1089  }
1090
1091  private void testLocateRegionsRetrySpecialPause(
1092    Class<? extends HBaseServerException> exceptionClass) throws IOException {
1093
1094    int regionReplication = 3;
1095    byte[] family = Bytes.toBytes("cf");
1096    TableName tableName = TableName.valueOf(name.getMethodName());
1097
1098    // Create a table with region replicas
1099    TableDescriptorBuilder builder =
1100      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
1101        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
1102    TEST_UTIL.getAdmin().createTable(builder.build());
1103
1104    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
1105
1106    conf.setClass(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, ThrowingCallerFactory.class,
1107      RpcRetryingCallerFactory.class);
1108    conf.setClass("testSpecialPauseException", exceptionClass, HBaseServerException.class);
1109
1110    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
1111    // normal pause very short, 10 millis
1112    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10);
1113
1114    // special pause 10x longer, so we can detect it
1115    long specialPauseMillis = 1000;
1116    conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
1117      specialPauseMillis);
1118
1119    try (ConnectionImplementation con =
1120      (ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
1121      // Get locations of the regions of the table
1122
1123      long start = System.nanoTime();
1124      try {
1125        con.locateRegion(tableName, new byte[0], false, true, 0);
1126      } catch (HBaseServerException e) {
1127        assertTrue(e.isServerOverloaded());
1128        // pass: expected
1129      }
1130      assertTrue(System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(specialPauseMillis));
1131    } finally {
1132      TEST_UTIL.deleteTable(tableName);
1133    }
1134  }
1135
1136  private static class ThrowingCallerFactory extends RpcRetryingCallerFactory {
1137
1138    private final Class<? extends HBaseServerException> exceptionClass;
1139
1140    public ThrowingCallerFactory(Configuration conf) {
1141      super(conf);
1142      this.exceptionClass =
1143        conf.getClass("testSpecialPauseException", null, HBaseServerException.class);
1144    }
1145
1146    @Override
1147    public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
1148      return newCaller();
1149    }
1150
1151    @Override
1152    public <T> RpcRetryingCaller<T> newCaller() {
1153      return new RpcRetryingCaller<T>() {
1154        @Override
1155        public void cancel() {
1156
1157        }
1158
1159        @Override
1160        public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
1161          throws IOException, RuntimeException {
1162          return callWithoutRetries(null, 0);
1163        }
1164
1165        @Override
1166        public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
1167          throws IOException, RuntimeException {
1168          try {
1169            throw exceptionClass.getConstructor().newInstance();
1170          } catch (IllegalAccessException | InstantiationException | InvocationTargetException
1171            | NoSuchMethodException e) {
1172            throw new RuntimeException(e);
1173          }
1174        }
1175      };
1176    }
1177  }
1178
1179  @Test
1180  public void testMetaLookupThreadPoolCreated() throws Exception {
1181    final TableName tableName = TableName.valueOf(name.getMethodName());
1182    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
1183    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
1184      TEST_UTIL.getAdmin().disableTable(tableName);
1185      TEST_UTIL.getAdmin().deleteTable(tableName);
1186    }
1187    try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
1188      byte[] row = Bytes.toBytes("test");
1189      ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
1190      // check that metalookup pool would get created
1191      c.relocateRegion(tableName, row);
1192      ExecutorService ex = c.getCurrentMetaLookupPool();
1193      assertNotNull(ex);
1194    }
1195  }
1196
1197  // There is no assertion, but you need to confirm that there is no resource leak output from netty
1198  @Test
1199  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
1200    TableName tableName = TableName.valueOf(name.getMethodName());
1201    TEST_UTIL.createTable(tableName, FAM_NAM).close();
1202    TEST_UTIL.getAdmin().balancerSwitch(false, true);
1203    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
1204      Table table = connection.getTable(tableName)) {
1205      table.get(new Get(Bytes.toBytes("1")));
1206      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
1207      RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient();
1208      rpcClient.cancelConnections(sn);
1209      Thread.sleep(1000);
1210      System.gc();
1211      Thread.sleep(1000);
1212    }
1213  }
1214}