001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.lang.reflect.Modifier;
029import java.net.SocketTimeoutException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Set;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.SynchronousQueue;
035import java.util.concurrent.ThreadLocalRandom;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.stream.Collectors;
042import java.util.stream.IntStream;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.RegionLocations;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.Waiter;
053import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
054import org.apache.hadoop.hbase.exceptions.DeserializationException;
055import org.apache.hadoop.hbase.exceptions.RegionMovedException;
056import org.apache.hadoop.hbase.filter.Filter;
057import org.apache.hadoop.hbase.filter.FilterBase;
058import org.apache.hadoop.hbase.ipc.RpcClient;
059import org.apache.hadoop.hbase.master.HMaster;
060import org.apache.hadoop.hbase.regionserver.HRegion;
061import org.apache.hadoop.hbase.regionserver.HRegionServer;
062import org.apache.hadoop.hbase.regionserver.Region;
063import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
064import org.apache.hadoop.hbase.testclassification.LargeTests;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.util.JVMClusterUtil;
068import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
069import org.apache.hadoop.hbase.util.Threads;
070import org.junit.After;
071import org.junit.AfterClass;
072import org.junit.Assert;
073import org.junit.BeforeClass;
074import org.junit.ClassRule;
075import org.junit.Ignore;
076import org.junit.Rule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.junit.rules.TestName;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
084import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
085import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
086
087/**
088 * This class is for testing HBaseConnectionManager features
089 */
090@Category({LargeTests.class})
091public class TestConnectionImplementation {
092
093  @ClassRule
094  public static final HBaseClassTestRule CLASS_RULE =
095      HBaseClassTestRule.forClass(TestConnectionImplementation.class);
096
097  private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class);
098  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
099  private static final TableName TABLE_NAME =
100      TableName.valueOf("test");
101  private static final TableName TABLE_NAME1 =
102      TableName.valueOf("test1");
103  private static final TableName TABLE_NAME2 =
104      TableName.valueOf("test2");
105  private static final TableName TABLE_NAME3 =
106      TableName.valueOf("test3");
107  private static final byte[] FAM_NAM = Bytes.toBytes("f");
108  private static final byte[] ROW = Bytes.toBytes("bbb");
109  private static final byte[] ROW_X = Bytes.toBytes("xxx");
110  private static final int RPC_RETRY = 5;
111
112  @Rule
113  public TestName name = new TestName();
114
115  @BeforeClass
116  public static void setUpBeforeClass() throws Exception {
117    ResourceLeakDetector.setLevel(Level.PARANOID);
118    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
119    // Up the handlers; this test needs more than usual.
120    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
121    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
122    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
123    TEST_UTIL.startMiniCluster(2);
124
125  }
126
127  @AfterClass
128  public static void tearDownAfterClass() throws Exception {
129    TEST_UTIL.shutdownMiniCluster();
130  }
131
132  @After
133  public void tearDown() throws IOException {
134    TEST_UTIL.getAdmin().balancerSwitch(true, true);
135  }
136
137  @Test
138  public void testClusterConnection() throws IOException {
139    ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
140        5, TimeUnit.SECONDS,
141        new SynchronousQueue<>(),
142        Threads.newDaemonThreadFactory("test-hcm"));
143
144    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
145    Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool);
146    // make sure the internally created ExecutorService is the one passed
147    assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool());
148
149    final TableName tableName = TableName.valueOf(name.getMethodName());
150    TEST_UTIL.createTable(tableName, FAM_NAM).close();
151    Table table = con1.getTable(tableName, otherPool);
152
153    ExecutorService pool = null;
154
155    if(table instanceof HTable) {
156      HTable t = (HTable) table;
157      // make sure passing a pool to the getTable does not trigger creation of an internal pool
158      assertNull("Internal Thread pool should be null",
159        ((ConnectionImplementation) con1).getCurrentBatchPool());
160      // table should use the pool passed
161      assertTrue(otherPool == t.getPool());
162      t.close();
163
164      t = (HTable) con2.getTable(tableName);
165      // table should use the connectin's internal pool
166      assertTrue(otherPool == t.getPool());
167      t.close();
168
169      t = (HTable) con2.getTable(tableName);
170      // try other API too
171      assertTrue(otherPool == t.getPool());
172      t.close();
173
174      t = (HTable) con2.getTable(tableName);
175      // try other API too
176      assertTrue(otherPool == t.getPool());
177      t.close();
178
179      t = (HTable) con1.getTable(tableName);
180      pool = ((ConnectionImplementation) con1).getCurrentBatchPool();
181      // make sure an internal pool was created
182      assertNotNull("An internal Thread pool should have been created", pool);
183      // and that the table is using it
184      assertTrue(t.getPool() == pool);
185      t.close();
186
187      t = (HTable) con1.getTable(tableName);
188      // still using the *same* internal pool
189      assertTrue(t.getPool() == pool);
190      t.close();
191    } else {
192      table.close();
193    }
194
195    con1.close();
196
197    // if the pool was created on demand it should be closed upon connection close
198    if(pool != null) {
199      assertTrue(pool.isShutdown());
200    }
201
202    con2.close();
203    // if the pool is passed, it is not closed
204    assertFalse(otherPool.isShutdown());
205    otherPool.shutdownNow();
206  }
207
208  /**
209   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
210   * @throws IOException Unable to construct admin
211   */
212  @Test
213  public void testAdminFactory() throws IOException {
214    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
215    Admin admin = con1.getAdmin();
216    assertTrue(admin.getConnection() == con1);
217    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
218    con1.close();
219  }
220
221  // Fails too often!  Needs work.  HBASE-12558
222  // May only fail on non-linux machines? E.g. macosx.
223  @Ignore @Test (expected = RegionServerStoppedException.class)
224  // Depends on mulitcast messaging facility that seems broken in hbase2
225  // See  HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of
226  // dead servers is broke"
227  public void testClusterStatus() throws Exception {
228    final TableName tableName = TableName.valueOf(name.getMethodName());
229    byte[] cf = "cf".getBytes();
230    byte[] rk = "rk1".getBytes();
231
232    JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
233    rs.waitForServerOnline();
234    final ServerName sn = rs.getRegionServer().getServerName();
235
236    Table t = TEST_UTIL.createTable(tableName, cf);
237    TEST_UTIL.waitTableAvailable(tableName);
238    TEST_UTIL.waitUntilNoRegionsInTransition();
239
240    final ConnectionImplementation hci =  (ConnectionImplementation)TEST_UTIL.getConnection();
241    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
242      while (l.getRegionLocation(rk).getPort() != sn.getPort()) {
243        TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().getEncodedNameAsBytes(),
244          sn);
245        TEST_UTIL.waitUntilNoRegionsInTransition();
246        hci.clearRegionCache(tableName);
247      }
248      Assert.assertNotNull(hci.clusterStatusListener);
249      TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000);
250    }
251
252    Put p1 = new Put(rk);
253    p1.addColumn(cf, "qual".getBytes(), "val".getBytes());
254    t.put(p1);
255
256    rs.getRegionServer().abort("I'm dead");
257
258    // We want the status to be updated. That's a least 10 second
259    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
260      @Override
261      public boolean evaluate() throws Exception {
262        return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
263            getDeadServers().isDeadServer(sn);
264      }
265    });
266
267    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
268      @Override
269      public boolean evaluate() throws Exception {
270        return hci.clusterStatusListener.isDeadServer(sn);
271      }
272    });
273
274    t.close();
275    hci.getClient(sn);  // will throw an exception: RegionServerStoppedException
276  }
277
278  /**
279   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
280   */
281  @Test
282  public void testConnectionCloseAllowsInterrupt() throws Exception {
283    testConnectionClose(true);
284  }
285
286  @Test
287  public void testConnectionNotAllowsInterrupt() throws Exception {
288    testConnectionClose(false);
289  }
290
291  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
292    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
293    TEST_UTIL.createTable(tableName, FAM_NAM).close();
294
295    TEST_UTIL.getAdmin().balancerSwitch(false, true);
296
297    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
298    // We want to work on a separate connection.
299    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
300    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
301    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
302    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
303    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
304    // to avoid the client to be stuck when do the Get
305    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
306    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
307    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
308
309    Connection connection = ConnectionFactory.createConnection(c2);
310    final Table table = connection.getTable(tableName);
311
312    Put put = new Put(ROW);
313    put.addColumn(FAM_NAM, ROW, ROW);
314    table.put(put);
315
316    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
317    final AtomicInteger step = new AtomicInteger(0);
318
319    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
320    Thread t = new Thread("testConnectionCloseThread") {
321      @Override
322      public void run() {
323        int done = 0;
324        try {
325          step.set(1);
326          while (step.get() == 1) {
327            Get get = new Get(ROW);
328            table.get(get);
329            done++;
330            if (done % 100 == 0)
331              LOG.info("done=" + done);
332            // without the sleep, will cause the exception for too many files in
333            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
334            Thread.sleep(100);
335          }
336        } catch (Throwable t) {
337          failed.set(t);
338          LOG.error(t.toString(), t);
339        }
340        step.set(3);
341      }
342    };
343    t.start();
344    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
345      @Override
346      public boolean evaluate() throws Exception {
347        return step.get() == 1;
348      }
349    });
350
351    ServerName sn;
352    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
353      sn = rl.getRegionLocation(ROW).getServerName();
354    }
355    ConnectionImplementation conn =
356        (ConnectionImplementation) connection;
357    RpcClient rpcClient = conn.getRpcClient();
358
359    LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
360    for (int i = 0; i < 500; i++) {
361      rpcClient.cancelConnections(sn);
362      Thread.sleep(50);
363    }
364
365    step.compareAndSet(1, 2);
366    // The test may fail here if the thread doing the gets is stuck. The way to find
367    //  out what's happening is to look for the thread named 'testConnectionCloseThread'
368    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
369      @Override
370      public boolean evaluate() throws Exception {
371        return step.get() == 3;
372      }
373    });
374    table.close();
375    connection.close();
376    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
377  }
378
379  /**
380   * Test that connection can become idle without breaking everything.
381   */
382  @Test
383  public void testConnectionIdle() throws Exception {
384    final TableName tableName = TableName.valueOf(name.getMethodName());
385    TEST_UTIL.createTable(tableName, FAM_NAM).close();
386    int idleTime =  20000;
387    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
388
389    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
390    // We want to work on a separate connection.
391    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
392    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
393    c2.setInt(RpcClient.IDLE_TIME, idleTime);
394
395    Connection connection = ConnectionFactory.createConnection(c2);
396    final Table table = connection.getTable(tableName);
397
398    Put put = new Put(ROW);
399    put.addColumn(FAM_NAM, ROW, ROW);
400    table.put(put);
401
402    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
403    mee.setValue(System.currentTimeMillis());
404    EnvironmentEdgeManager.injectEdge(mee);
405    LOG.info("first get");
406    table.get(new Get(ROW));
407
408    LOG.info("first get - changing the time & sleeping");
409    mee.incValue(idleTime + 1000);
410    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
411                        // 1500 = sleep time in RpcClient#waitForWork + a margin
412
413    LOG.info("second get - connection has been marked idle in the middle");
414    // To check that the connection actually became idle would need to read some private
415    //  fields of RpcClient.
416    table.get(new Get(ROW));
417    mee.incValue(idleTime + 1000);
418
419    LOG.info("third get - connection is idle, but the reader doesn't know yet");
420    // We're testing here a special case:
421    //  time limit reached BUT connection not yet reclaimed AND a new call.
422    //  in this situation, we don't close the connection, instead we use it immediately.
423    // If we're very unlucky we can have a race condition in the test: the connection is already
424    //  under closing when we do the get, so we have an exception, and we don't retry as the
425    //  retry number is 1. The probability is very very low, and seems acceptable for now. It's
426    //  a test issue only.
427    table.get(new Get(ROW));
428
429    LOG.info("we're done - time will change back");
430
431    table.close();
432
433    connection.close();
434    EnvironmentEdgeManager.reset();
435    TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
436  }
437
438    /**
439     * Test that the connection to the dead server is cut immediately when we receive the
440     *  notification.
441     * @throws Exception
442     */
443  @Test
444  public void testConnectionCut() throws Exception {
445    final TableName tableName = TableName.valueOf(name.getMethodName());
446
447    TEST_UTIL.createTable(tableName, FAM_NAM).close();
448    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
449
450    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
451    // We want to work on a separate connection.
452    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
453    // try only once w/o any retry
454    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
455    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
456
457    final Connection connection = ConnectionFactory.createConnection(c2);
458    final Table table = connection.getTable(tableName);
459
460    Put p = new Put(FAM_NAM);
461    p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
462    table.put(p);
463
464    final ConnectionImplementation hci =  (ConnectionImplementation) connection;
465
466    final HRegionLocation loc;
467    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
468      loc = rl.getRegionLocation(FAM_NAM);
469    }
470
471    Get get = new Get(FAM_NAM);
472    Assert.assertNotNull(table.get(get));
473
474    get = new Get(FAM_NAM);
475    get.setFilter(new BlockingFilter());
476
477    // This thread will mark the server as dead while we're waiting during a get.
478    Thread t = new Thread() {
479      @Override
480      public void run() {
481        synchronized (syncBlockingFilter) {
482          try {
483            syncBlockingFilter.wait();
484          } catch (InterruptedException e) {
485            throw new RuntimeException(e);
486          }
487        }
488        hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
489      }
490    };
491
492    t.start();
493    try {
494      table.get(get);
495      Assert.fail();
496    } catch (IOException expected) {
497      LOG.debug("Received: " + expected);
498      Assert.assertFalse(expected instanceof SocketTimeoutException);
499      Assert.assertFalse(syncBlockingFilter.get());
500    } finally {
501      syncBlockingFilter.set(true);
502      t.join();
503      TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
504    }
505
506    table.close();
507    connection.close();
508  }
509
510  protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
511
512  public static class BlockingFilter extends FilterBase {
513    @Override
514    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
515      int i = 0;
516      while (i++ < 1000 && !syncBlockingFilter.get()) {
517        synchronized (syncBlockingFilter) {
518          syncBlockingFilter.notifyAll();
519        }
520        Threads.sleep(100);
521      }
522      syncBlockingFilter.set(true);
523      return false;
524    }
525    @Override
526    public ReturnCode filterCell(final Cell ignored) throws IOException {
527      return ReturnCode.INCLUDE;
528    }
529
530    public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
531      return new BlockingFilter();
532    }
533  }
534
535  /**
536   * Test that when we delete a location using the first row of a region
537   * that we really delete it.
538   * @throws Exception
539   */
540  @Test
541  public void testRegionCaching() throws Exception {
542    TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
543    Configuration conf =  new Configuration(TEST_UTIL.getConfiguration());
544    // test with no retry, or client cache will get updated after the first failure
545    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
546    Connection connection = ConnectionFactory.createConnection(conf);
547    final Table table = connection.getTable(TABLE_NAME);
548
549    TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
550    Put put = new Put(ROW);
551    put.addColumn(FAM_NAM, ROW, ROW);
552    table.put(put);
553
554    ConnectionImplementation conn = (ConnectionImplementation) connection;
555
556    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
557
558    // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at
559    // a location where the port is current port number +1 -- i.e. a non-existent location.
560    HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
561    final int nextPort = loc.getPort() + 1;
562    conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
563        ServerName.valueOf("127.0.0.1", nextPort,
564        HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
565    Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW)
566      .getRegionLocation().getPort(), nextPort);
567
568    conn.clearRegionCache(TABLE_NAME, ROW.clone());
569    RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
570    assertNull("What is this location?? " + rl, rl);
571
572    // We're now going to move the region and check that it works for the client
573    // First a new put to add the location in the cache
574    conn.clearRegionCache(TABLE_NAME);
575    Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
576    Put put2 = new Put(ROW);
577    put2.addColumn(FAM_NAM, ROW, ROW);
578    table.put(put2);
579    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
580    assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
581
582    TEST_UTIL.getAdmin().setBalancerRunning(false, false);
583    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
584
585    // We can wait for all regions to be online, that makes log reading easier when debugging
586    TEST_UTIL.waitUntilNoRegionsInTransition();
587
588    // Now moving the region to the second server
589    HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
590    byte[] regionName = toMove.getRegionInfo().getRegionName();
591    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
592
593    // Choose the other server.
594    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
595    int destServerId = curServerId == 0? 1: 0;
596
597    HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
598    HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
599
600    ServerName destServerName = destServer.getServerName();
601
602    // Check that we are in the expected state
603    Assert.assertTrue(curServer != destServer);
604    Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
605    Assert.assertFalse( toMove.getPort() == destServerName.getPort());
606    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
607    Assert.assertNull(destServer.getOnlineRegion(regionName));
608    Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
609        getAssignmentManager().hasRegionsInTransition());
610
611    // Moving. It's possible that we don't have all the regions online at this point, so
612    //  the test must depend only on the region we're looking at.
613    LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
614    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
615
616    while (destServer.getOnlineRegion(regionName) == null ||
617        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
618        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
619        master.getAssignmentManager().hasRegionsInTransition()) {
620      // wait for the move to be finished
621      Thread.sleep(1);
622    }
623
624    LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
625
626    // Check our new state.
627    Assert.assertNull(curServer.getOnlineRegion(regionName));
628    Assert.assertNotNull(destServer.getOnlineRegion(regionName));
629    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
630    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
631
632
633    // Cache was NOT updated and points to the wrong server
634    Assert.assertFalse(
635        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation()
636          .getPort() == destServerName.getPort());
637
638    // This part relies on a number of tries equals to 1.
639    // We do a put and expect the cache to be updated, even if we don't retry
640    LOG.info("Put starting");
641    Put put3 = new Put(ROW);
642    put3.addColumn(FAM_NAM, ROW, ROW);
643    try {
644      table.put(put3);
645      Assert.fail("Unreachable point");
646    } catch (RetriesExhaustedWithDetailsException e) {
647      LOG.info("Put done, exception caught: " + e.getClass());
648      Assert.assertEquals(1, e.getNumExceptions());
649      Assert.assertEquals(1, e.getCauses().size());
650      Assert.assertArrayEquals(ROW, e.getRow(0).getRow());
651
652      // Check that we unserialized the exception as expected
653      Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
654      Assert.assertNotNull(cause);
655      Assert.assertTrue(cause instanceof RegionMovedException);
656    } catch (RetriesExhaustedException ree) {
657      // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException
658      // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue.
659      LOG.info("Put done, exception caught: " + ree.getClass());
660      Throwable cause = ClientExceptionsUtil.findException(ree.getCause());
661      Assert.assertNotNull(cause);
662      Assert.assertTrue(cause instanceof RegionMovedException);
663    }
664    Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
665    Assert.assertEquals(
666        "Previous server was " + curServer.getServerName().getHostAndPort(),
667        destServerName.getPort(),
668        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
669
670    Assert.assertFalse(destServer.getRegionsInTransitionInRS()
671      .containsKey(encodedRegionNameBytes));
672    Assert.assertFalse(curServer.getRegionsInTransitionInRS()
673      .containsKey(encodedRegionNameBytes));
674
675    // We move it back to do another test with a scan
676    LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
677    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(),
678      curServer.getServerName());
679
680    while (curServer.getOnlineRegion(regionName) == null ||
681        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
682        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
683        master.getAssignmentManager().hasRegionsInTransition()) {
684      // wait for the move to be finished
685      Thread.sleep(1);
686    }
687
688    // Check our new state.
689    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
690    Assert.assertNull(destServer.getOnlineRegion(regionName));
691    LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
692
693    // Cache was NOT updated and points to the wrong server
694    Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() ==
695      curServer.getServerName().getPort());
696
697    Scan sc = new Scan();
698    sc.setStopRow(ROW);
699    sc.setStartRow(ROW);
700
701    // The scanner takes the max retries from the connection configuration, not the table as
702    // the put.
703    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
704
705    try {
706      ResultScanner rs = table.getScanner(sc);
707      while (rs.next() != null) {
708      }
709      Assert.fail("Unreachable point");
710    } catch (RetriesExhaustedException e) {
711      LOG.info("Scan done, expected exception caught: " + e.getClass());
712    }
713
714    // Cache is updated with the right value.
715    Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
716    Assert.assertEquals(
717      "Previous server was "+destServer.getServerName().getHostAndPort(),
718      curServer.getServerName().getPort(),
719      conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
720
721    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
722    table.close();
723    connection.close();
724  }
725
726  /**
727   * Test that Connection or Pool are not closed when managed externally
728   * @throws Exception
729   */
730  @Test
731  public void testConnectionManagement() throws Exception{
732    Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
733    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
734    Table table = conn.getTable(TABLE_NAME1);
735    table.close();
736    assertFalse(conn.isClosed());
737    if(table instanceof HTable) {
738      assertFalse(((HTable) table).getPool().isShutdown());
739    }
740    table = conn.getTable(TABLE_NAME1);
741    table.close();
742    if(table instanceof HTable) {
743      assertFalse(((HTable) table).getPool().isShutdown());
744    }
745    conn.close();
746    if(table instanceof HTable) {
747      assertTrue(((HTable) table).getPool().isShutdown());
748    }
749    table0.close();
750  }
751
752  /**
753   * Test that stale cache updates don't override newer cached values.
754   */
755  @Test
756  public void testCacheSeqNums() throws Exception{
757    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
758    Put put = new Put(ROW);
759    put.addColumn(FAM_NAM, ROW, ROW);
760    table.put(put);
761    ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
762
763    HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
764    assertNotNull(location);
765
766    ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
767
768    // Same server as already in cache reporting - overwrites any value despite seqNum.
769    int nextPort = location.getPort() + 1;
770    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
771        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
772    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
773    Assert.assertEquals(nextPort, location.getPort());
774
775    // No source specified - same.
776    nextPort = location.getPort() + 1;
777    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
778        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
779    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
780    Assert.assertEquals(nextPort, location.getPort());
781
782    // Higher seqNum - overwrites lower seqNum.
783    nextPort = location.getPort() + 1;
784    conn.updateCachedLocation(location.getRegionInfo(), anySource,
785        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
786    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
787    Assert.assertEquals(nextPort, location.getPort());
788
789    // Lower seqNum - does not overwrite higher seqNum.
790    nextPort = location.getPort() + 1;
791    conn.updateCachedLocation(location.getRegionInfo(), anySource,
792        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
793    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
794    Assert.assertEquals(nextPort - 1, location.getPort());
795    table.close();
796  }
797
798  @Test
799  public void testClosing() throws Exception {
800    Configuration configuration =
801      new Configuration(TEST_UTIL.getConfiguration());
802    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
803      String.valueOf(ThreadLocalRandom.current().nextInt()));
804
805    // as connection caching is going away, now we're just testing
806    // that closed connection does actually get closed.
807
808    Connection c1 = ConnectionFactory.createConnection(configuration);
809    Connection c2 = ConnectionFactory.createConnection(configuration);
810    // no caching, different connections
811    assertTrue(c1 != c2);
812
813    // closing independently
814    c1.close();
815    assertTrue(c1.isClosed());
816    assertFalse(c2.isClosed());
817
818    c2.close();
819    assertTrue(c2.isClosed());
820  }
821
822  /**
823   * Trivial test to verify that nobody messes with
824   * {@link ConnectionFactory#createConnection(Configuration)}
825   */
826  @Test
827  public void testCreateConnection() throws Exception {
828    Configuration configuration = TEST_UTIL.getConfiguration();
829    Connection c1 = ConnectionFactory.createConnection(configuration);
830    Connection c2 = ConnectionFactory.createConnection(configuration);
831    // created from the same configuration, yet they are different
832    assertTrue(c1 != c2);
833    assertTrue(c1.getConfiguration() == c2.getConfiguration());
834  }
835
836  /**
837   * This test checks that one can connect to the cluster with only the
838   *  ZooKeeper quorum set. Other stuff like master address will be read
839   *  from ZK by the client.
840   */
841  @Test
842  public void testConnection() throws Exception{
843    // We create an empty config and add the ZK address.
844    Configuration c = new Configuration();
845    // This test only makes sense for ZK based connection registry.
846    c.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
847        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
848    c.set(HConstants.ZOOKEEPER_QUORUM,
849      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
850    c.set(HConstants.ZOOKEEPER_CLIENT_PORT,
851      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
852    // This should be enough to connect
853    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c);
854    assertTrue(conn.isMasterRunning());
855    conn.close();
856  }
857
858  private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception {
859    Field numTries = hci.getClass().getDeclaredField("numTries");
860    numTries.setAccessible(true);
861    Field modifiersField = Field.class.getDeclaredField("modifiers");
862    modifiersField.setAccessible(true);
863    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
864    final int prevNumRetriesVal = (Integer)numTries.get(hci);
865    numTries.set(hci, newVal);
866
867    return prevNumRetriesVal;
868  }
869
870  @Test
871  public void testMulti() throws Exception {
872    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
873    try {
874      ConnectionImplementation conn = (ConnectionImplementation)TEST_UTIL.getConnection();
875
876      // We're now going to move the region and check that it works for the client
877      // First a new put to add the location in the cache
878      conn.clearRegionCache(TABLE_NAME3);
879      Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
880
881      TEST_UTIL.getAdmin().setBalancerRunning(false, false);
882      HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
883
884      // We can wait for all regions to be online, that makes log reading easier when debugging
885      TEST_UTIL.waitUntilNoRegionsInTransition();
886
887      Put put = new Put(ROW_X);
888      put.addColumn(FAM_NAM, ROW_X, ROW_X);
889      table.put(put);
890
891      // Now moving the region to the second server
892      HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
893      byte[] regionName = toMove.getRegionInfo().getRegionName();
894      byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
895
896      // Choose the other server.
897      int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
898      int destServerId = (curServerId == 0 ? 1 : 0);
899
900      HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
901      HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
902
903      ServerName destServerName = destServer.getServerName();
904      ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
905
906       //find another row in the cur server that is less than ROW_X
907      List<HRegion> regions = curServer.getRegions(TABLE_NAME3);
908      byte[] otherRow = null;
909       for (Region region : regions) {
910         if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
911             && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
912           otherRow = region.getRegionInfo().getStartKey();
913           break;
914         }
915       }
916      assertNotNull(otherRow);
917      // If empty row, set it to first row.-f
918      if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
919      Put put2 = new Put(otherRow);
920      put2.addColumn(FAM_NAM, otherRow, otherRow);
921      table.put(put2); //cache put2's location
922
923      // Check that we are in the expected state
924      Assert.assertTrue(curServer != destServer);
925      Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
926      Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
927      Assert.assertNotNull(curServer.getOnlineRegion(regionName));
928      Assert.assertNull(destServer.getOnlineRegion(regionName));
929      Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
930          getAssignmentManager().hasRegionsInTransition());
931
932       // Moving. It's possible that we don't have all the regions online at this point, so
933      //  the test depends only on the region we're looking at.
934      LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
935      TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
936
937      while (destServer.getOnlineRegion(regionName) == null ||
938          destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
939          curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
940          master.getAssignmentManager().hasRegionsInTransition()) {
941        // wait for the move to be finished
942        Thread.sleep(1);
943      }
944
945      LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
946
947      // Check our new state.
948      Assert.assertNull(curServer.getOnlineRegion(regionName));
949      Assert.assertNotNull(destServer.getOnlineRegion(regionName));
950      Assert.assertFalse(destServer.getRegionsInTransitionInRS()
951          .containsKey(encodedRegionNameBytes));
952      Assert.assertFalse(curServer.getRegionsInTransitionInRS()
953          .containsKey(encodedRegionNameBytes));
954
955
956       // Cache was NOT updated and points to the wrong server
957      Assert.assertFalse(
958          conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
959              .getPort() == destServerName.getPort());
960
961      // Hijack the number of retry to fail after 2 tries
962      final int prevNumRetriesVal = setNumTries(conn, 2);
963
964      Put put3 = new Put(ROW_X);
965      put3.addColumn(FAM_NAM, ROW_X, ROW_X);
966      Put put4 = new Put(otherRow);
967      put4.addColumn(FAM_NAM, otherRow, otherRow);
968
969      // do multi
970      ArrayList<Put> actions = Lists.newArrayList(put4, put3);
971      table.batch(actions, null); // first should be a valid row,
972      // second we get RegionMovedException.
973
974      setNumTries(conn, prevNumRetriesVal);
975    } finally {
976      table.close();
977    }
978  }
979
980  @Test
981  public void testErrorBackoffTimeCalculation() throws Exception {
982    // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
983    final long ANY_PAUSE = 100;
984    ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
985    ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
986
987    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
988    EnvironmentEdgeManager.injectEdge(timeMachine);
989    try {
990      long largeAmountOfTime = ANY_PAUSE * 1000;
991      ConnectionImplementation.ServerErrorTracker tracker =
992          new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);
993
994      // The default backoff is 0.
995      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
996
997      // Check some backoff values from HConstants sequence.
998      tracker.reportServerError(location);
999      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1000        tracker.calculateBackoffTime(location, ANY_PAUSE));
1001      tracker.reportServerError(location);
1002      tracker.reportServerError(location);
1003      tracker.reportServerError(location);
1004      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
1005        tracker.calculateBackoffTime(location, ANY_PAUSE));
1006
1007      // All of this shouldn't affect backoff for different location.
1008      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1009      tracker.reportServerError(diffLocation);
1010      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1011        tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1012
1013      // Check with different base.
1014      assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
1015          tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1016    } finally {
1017      EnvironmentEdgeManager.reset();
1018    }
1019  }
1020
1021  private static void assertEqualsWithJitter(long expected, long actual) {
1022    assertEqualsWithJitter(expected, actual, expected);
1023  }
1024
1025  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1026    assertTrue("Value not within jitter: " + expected + " vs " + actual,
1027      Math.abs(actual - expected) <= (0.01f * jitterBase));
1028  }
1029
1030  @Test
1031  public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1032    Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1033    // This test only makes sense for ZK based connection registry.
1034    config.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
1035        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
1036
1037    final TableName tableName = TableName.valueOf(name.getMethodName());
1038    TEST_UTIL.createTable(tableName, new byte[][] {FAM_NAM}).close();
1039
1040    Connection connection = ConnectionFactory.createConnection(config);
1041    Table table = connection.getTable(tableName);
1042
1043    // this will cache the meta location and table's region location
1044    table.get(new Get(Bytes.toBytes("foo")));
1045
1046    // restart HBase
1047    TEST_UTIL.shutdownMiniHBaseCluster();
1048    TEST_UTIL.restartHBaseCluster(2);
1049    // this should be able to discover new locations for meta and table's region
1050    table.get(new Get(Bytes.toBytes("foo")));
1051    TEST_UTIL.deleteTable(tableName);
1052    table.close();
1053    connection.close();
1054  }
1055
1056  @Test
1057  public void testLocateRegionsWithRegionReplicas() throws IOException {
1058    int regionReplication = 3;
1059    byte[] family = Bytes.toBytes("cf");
1060    TableName tableName = TableName.valueOf(name.getMethodName());
1061
1062    // Create a table with region replicas
1063    TableDescriptorBuilder builder = TableDescriptorBuilder
1064      .newBuilder(tableName)
1065      .setRegionReplication(regionReplication)
1066      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
1067    TEST_UTIL.getAdmin().createTable(builder.build());
1068
1069    try (ConnectionImplementation con = (ConnectionImplementation) ConnectionFactory.
1070      createConnection(TEST_UTIL.getConfiguration())) {
1071      // Get locations of the regions of the table
1072      List<HRegionLocation> locations = con.locateRegions(tableName, false, false);
1073
1074      // The size of the returned locations should be 3
1075      assertEquals(regionReplication, locations.size());
1076
1077      // The replicaIds of the returned locations should be 0, 1 and 2
1078      Set<Integer> expectedReplicaIds = IntStream.range(0, regionReplication).
1079        boxed().collect(Collectors.toSet());
1080      for (HRegionLocation location : locations) {
1081        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
1082      }
1083    } finally {
1084      TEST_UTIL.deleteTable(tableName);
1085    }
1086  }
1087
1088  @Test
1089  public void testMetaLookupThreadPoolCreated() throws Exception {
1090    final TableName tableName = TableName.valueOf(name.getMethodName());
1091    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
1092    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
1093      TEST_UTIL.getAdmin().disableTable(tableName);
1094      TEST_UTIL.getAdmin().deleteTable(tableName);
1095    }
1096    try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
1097      byte[] row = Bytes.toBytes("test");
1098      ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
1099      // check that metalookup pool would get created
1100      c.relocateRegion(tableName, row);
1101      ExecutorService ex = c.getCurrentMetaLookupPool();
1102      assertNotNull(ex);
1103    }
1104  }
1105
1106  // There is no assertion, but you need to confirm that there is no resource leak output from netty
1107  @Test
1108  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
1109    TableName tableName = TableName.valueOf(name.getMethodName());
1110    TEST_UTIL.createTable(tableName, FAM_NAM).close();
1111    TEST_UTIL.getAdmin().balancerSwitch(false, true);
1112    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
1113      Table table = connection.getTable(tableName)) {
1114      table.get(new Get(Bytes.toBytes("1")));
1115      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
1116      RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient();
1117      rpcClient.cancelConnections(sn);
1118      Thread.sleep(1000);
1119      System.gc();
1120      Thread.sleep(1000);
1121    }
1122  }
1123}