001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.lang.reflect.Modifier;
029import java.net.SocketTimeoutException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Set;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.SynchronousQueue;
035import java.util.concurrent.ThreadLocalRandom;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.stream.Collectors;
042import java.util.stream.IntStream;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.RegionLocations;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.Waiter;
053import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
054import org.apache.hadoop.hbase.exceptions.DeserializationException;
055import org.apache.hadoop.hbase.exceptions.RegionMovedException;
056import org.apache.hadoop.hbase.filter.Filter;
057import org.apache.hadoop.hbase.filter.FilterBase;
058import org.apache.hadoop.hbase.ipc.RpcClient;
059import org.apache.hadoop.hbase.master.HMaster;
060import org.apache.hadoop.hbase.regionserver.HRegion;
061import org.apache.hadoop.hbase.regionserver.HRegionServer;
062import org.apache.hadoop.hbase.regionserver.Region;
063import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
064import org.apache.hadoop.hbase.testclassification.LargeTests;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.util.JVMClusterUtil;
068import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
069import org.apache.hadoop.hbase.util.Threads;
070import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
071import org.junit.After;
072import org.junit.AfterClass;
073import org.junit.Assert;
074import org.junit.BeforeClass;
075import org.junit.ClassRule;
076import org.junit.Ignore;
077import org.junit.Rule;
078import org.junit.Test;
079import org.junit.experimental.categories.Category;
080import org.junit.rules.TestName;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
085import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
086import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
087
088/**
089 * This class is for testing HBaseConnectionManager features
090 */
091@Category({LargeTests.class})
092public class TestConnectionImplementation {
093
094  @ClassRule
095  public static final HBaseClassTestRule CLASS_RULE =
096      HBaseClassTestRule.forClass(TestConnectionImplementation.class);
097
098  private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class);
099  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
100  private static final TableName TABLE_NAME =
101      TableName.valueOf("test");
102  private static final TableName TABLE_NAME1 =
103      TableName.valueOf("test1");
104  private static final TableName TABLE_NAME2 =
105      TableName.valueOf("test2");
106  private static final TableName TABLE_NAME3 =
107      TableName.valueOf("test3");
108  private static final byte[] FAM_NAM = Bytes.toBytes("f");
109  private static final byte[] ROW = Bytes.toBytes("bbb");
110  private static final byte[] ROW_X = Bytes.toBytes("xxx");
111  private static final int RPC_RETRY = 5;
112
113  @Rule
114  public TestName name = new TestName();
115
116  @BeforeClass
117  public static void setUpBeforeClass() throws Exception {
118    ResourceLeakDetector.setLevel(Level.PARANOID);
119    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
120    // Up the handlers; this test needs more than usual.
121    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
122    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
123    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
124    TEST_UTIL.startMiniCluster(2);
125
126  }
127
128  @AfterClass
129  public static void tearDownAfterClass() throws Exception {
130    TEST_UTIL.shutdownMiniCluster();
131  }
132
133  @After
134  public void tearDown() throws IOException {
135    TEST_UTIL.getAdmin().balancerSwitch(true, true);
136  }
137
138  @Test
139  public void testClusterConnection() throws IOException {
140    ThreadPoolExecutor otherPool =
141      new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<>(),
142        new ThreadFactoryBuilder().setNameFormat("test-hcm-pool-%d")
143          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
144
145    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
146    Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool);
147    // make sure the internally created ExecutorService is the one passed
148    assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool());
149
150    final TableName tableName = TableName.valueOf(name.getMethodName());
151    TEST_UTIL.createTable(tableName, FAM_NAM).close();
152    Table table = con1.getTable(tableName, otherPool);
153
154    ExecutorService pool = null;
155
156    if(table instanceof HTable) {
157      HTable t = (HTable) table;
158      // make sure passing a pool to the getTable does not trigger creation of an internal pool
159      assertNull("Internal Thread pool should be null",
160        ((ConnectionImplementation) con1).getCurrentBatchPool());
161      // table should use the pool passed
162      assertTrue(otherPool == t.getPool());
163      t.close();
164
165      t = (HTable) con2.getTable(tableName);
166      // table should use the connectin's internal pool
167      assertTrue(otherPool == t.getPool());
168      t.close();
169
170      t = (HTable) con2.getTable(tableName);
171      // try other API too
172      assertTrue(otherPool == t.getPool());
173      t.close();
174
175      t = (HTable) con2.getTable(tableName);
176      // try other API too
177      assertTrue(otherPool == t.getPool());
178      t.close();
179
180      t = (HTable) con1.getTable(tableName);
181      pool = ((ConnectionImplementation) con1).getCurrentBatchPool();
182      // make sure an internal pool was created
183      assertNotNull("An internal Thread pool should have been created", pool);
184      // and that the table is using it
185      assertTrue(t.getPool() == pool);
186      t.close();
187
188      t = (HTable) con1.getTable(tableName);
189      // still using the *same* internal pool
190      assertTrue(t.getPool() == pool);
191      t.close();
192    } else {
193      table.close();
194    }
195
196    con1.close();
197
198    // if the pool was created on demand it should be closed upon connection close
199    if(pool != null) {
200      assertTrue(pool.isShutdown());
201    }
202
203    con2.close();
204    // if the pool is passed, it is not closed
205    assertFalse(otherPool.isShutdown());
206    otherPool.shutdownNow();
207  }
208
209  /**
210   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
211   * @throws IOException Unable to construct admin
212   */
213  @Test
214  public void testAdminFactory() throws IOException {
215    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
216    Admin admin = con1.getAdmin();
217    assertTrue(admin.getConnection() == con1);
218    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
219    con1.close();
220  }
221
222  // Fails too often!  Needs work.  HBASE-12558
223  // May only fail on non-linux machines? E.g. macosx.
224  @Ignore @Test (expected = RegionServerStoppedException.class)
225  // Depends on mulitcast messaging facility that seems broken in hbase2
226  // See  HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of
227  // dead servers is broke"
228  public void testClusterStatus() throws Exception {
229    final TableName tableName = TableName.valueOf(name.getMethodName());
230    byte[] cf = "cf".getBytes();
231    byte[] rk = "rk1".getBytes();
232
233    JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
234    rs.waitForServerOnline();
235    final ServerName sn = rs.getRegionServer().getServerName();
236
237    Table t = TEST_UTIL.createTable(tableName, cf);
238    TEST_UTIL.waitTableAvailable(tableName);
239    TEST_UTIL.waitUntilNoRegionsInTransition();
240
241    final ConnectionImplementation hci =  (ConnectionImplementation)TEST_UTIL.getConnection();
242    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
243      while (l.getRegionLocation(rk).getPort() != sn.getPort()) {
244        TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().getEncodedNameAsBytes(),
245          sn);
246        TEST_UTIL.waitUntilNoRegionsInTransition();
247        hci.clearRegionCache(tableName);
248      }
249      Assert.assertNotNull(hci.clusterStatusListener);
250      TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000);
251    }
252
253    Put p1 = new Put(rk);
254    p1.addColumn(cf, "qual".getBytes(), "val".getBytes());
255    t.put(p1);
256
257    rs.getRegionServer().abort("I'm dead");
258
259    // We want the status to be updated. That's a least 10 second
260    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
261      @Override
262      public boolean evaluate() throws Exception {
263        return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
264            getDeadServers().isDeadServer(sn);
265      }
266    });
267
268    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
269      @Override
270      public boolean evaluate() throws Exception {
271        return hci.clusterStatusListener.isDeadServer(sn);
272      }
273    });
274
275    t.close();
276    hci.getClient(sn);  // will throw an exception: RegionServerStoppedException
277  }
278
279  /**
280   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
281   */
282  @Test
283  public void testConnectionCloseAllowsInterrupt() throws Exception {
284    testConnectionClose(true);
285  }
286
287  @Test
288  public void testConnectionNotAllowsInterrupt() throws Exception {
289    testConnectionClose(false);
290  }
291
292  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
293    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
294    TEST_UTIL.createTable(tableName, FAM_NAM).close();
295
296    TEST_UTIL.getAdmin().balancerSwitch(false, true);
297
298    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
299    // We want to work on a separate connection.
300    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
301    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
302    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
303    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
304    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
305    // to avoid the client to be stuck when do the Get
306    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
307    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
308    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
309
310    Connection connection = ConnectionFactory.createConnection(c2);
311    final Table table = connection.getTable(tableName);
312
313    Put put = new Put(ROW);
314    put.addColumn(FAM_NAM, ROW, ROW);
315    table.put(put);
316
317    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
318    final AtomicInteger step = new AtomicInteger(0);
319
320    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
321    Thread t = new Thread("testConnectionCloseThread") {
322      @Override
323      public void run() {
324        int done = 0;
325        try {
326          step.set(1);
327          while (step.get() == 1) {
328            Get get = new Get(ROW);
329            table.get(get);
330            done++;
331            if (done % 100 == 0)
332              LOG.info("done=" + done);
333            // without the sleep, will cause the exception for too many files in
334            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
335            Thread.sleep(100);
336          }
337        } catch (Throwable t) {
338          failed.set(t);
339          LOG.error(t.toString(), t);
340        }
341        step.set(3);
342      }
343    };
344    t.start();
345    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
346      @Override
347      public boolean evaluate() throws Exception {
348        return step.get() == 1;
349      }
350    });
351
352    ServerName sn;
353    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
354      sn = rl.getRegionLocation(ROW).getServerName();
355    }
356    ConnectionImplementation conn =
357        (ConnectionImplementation) connection;
358    RpcClient rpcClient = conn.getRpcClient();
359
360    LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
361    for (int i = 0; i < 500; i++) {
362      rpcClient.cancelConnections(sn);
363      Thread.sleep(50);
364    }
365
366    step.compareAndSet(1, 2);
367    // The test may fail here if the thread doing the gets is stuck. The way to find
368    //  out what's happening is to look for the thread named 'testConnectionCloseThread'
369    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
370      @Override
371      public boolean evaluate() throws Exception {
372        return step.get() == 3;
373      }
374    });
375    table.close();
376    connection.close();
377    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
378  }
379
380  /**
381   * Test that connection can become idle without breaking everything.
382   */
383  @Test
384  public void testConnectionIdle() throws Exception {
385    final TableName tableName = TableName.valueOf(name.getMethodName());
386    TEST_UTIL.createTable(tableName, FAM_NAM).close();
387    int idleTime =  20000;
388    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
389
390    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
391    // We want to work on a separate connection.
392    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
393    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
394    c2.setInt(RpcClient.IDLE_TIME, idleTime);
395
396    Connection connection = ConnectionFactory.createConnection(c2);
397    final Table table = connection.getTable(tableName);
398
399    Put put = new Put(ROW);
400    put.addColumn(FAM_NAM, ROW, ROW);
401    table.put(put);
402
403    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
404    mee.setValue(System.currentTimeMillis());
405    EnvironmentEdgeManager.injectEdge(mee);
406    LOG.info("first get");
407    table.get(new Get(ROW));
408
409    LOG.info("first get - changing the time & sleeping");
410    mee.incValue(idleTime + 1000);
411    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
412                        // 1500 = sleep time in RpcClient#waitForWork + a margin
413
414    LOG.info("second get - connection has been marked idle in the middle");
415    // To check that the connection actually became idle would need to read some private
416    //  fields of RpcClient.
417    table.get(new Get(ROW));
418    mee.incValue(idleTime + 1000);
419
420    LOG.info("third get - connection is idle, but the reader doesn't know yet");
421    // We're testing here a special case:
422    //  time limit reached BUT connection not yet reclaimed AND a new call.
423    //  in this situation, we don't close the connection, instead we use it immediately.
424    // If we're very unlucky we can have a race condition in the test: the connection is already
425    //  under closing when we do the get, so we have an exception, and we don't retry as the
426    //  retry number is 1. The probability is very very low, and seems acceptable for now. It's
427    //  a test issue only.
428    table.get(new Get(ROW));
429
430    LOG.info("we're done - time will change back");
431
432    table.close();
433
434    connection.close();
435    EnvironmentEdgeManager.reset();
436    TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
437  }
438
439    /**
440     * Test that the connection to the dead server is cut immediately when we receive the
441     *  notification.
442     * @throws Exception
443     */
444  @Test
445  public void testConnectionCut() throws Exception {
446    final TableName tableName = TableName.valueOf(name.getMethodName());
447
448    TEST_UTIL.createTable(tableName, FAM_NAM).close();
449    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
450
451    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
452    // We want to work on a separate connection.
453    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
454    // try only once w/o any retry
455    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
456    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
457
458    final Connection connection = ConnectionFactory.createConnection(c2);
459    final Table table = connection.getTable(tableName);
460
461    Put p = new Put(FAM_NAM);
462    p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
463    table.put(p);
464
465    final ConnectionImplementation hci =  (ConnectionImplementation) connection;
466
467    final HRegionLocation loc;
468    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
469      loc = rl.getRegionLocation(FAM_NAM);
470    }
471
472    Get get = new Get(FAM_NAM);
473    Assert.assertNotNull(table.get(get));
474
475    get = new Get(FAM_NAM);
476    get.setFilter(new BlockingFilter());
477
478    // This thread will mark the server as dead while we're waiting during a get.
479    Thread t = new Thread() {
480      @Override
481      public void run() {
482        synchronized (syncBlockingFilter) {
483          try {
484            syncBlockingFilter.wait();
485          } catch (InterruptedException e) {
486            throw new RuntimeException(e);
487          }
488        }
489        hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
490      }
491    };
492
493    t.start();
494    try {
495      table.get(get);
496      Assert.fail();
497    } catch (IOException expected) {
498      LOG.debug("Received: " + expected);
499      Assert.assertFalse(expected instanceof SocketTimeoutException);
500      Assert.assertFalse(syncBlockingFilter.get());
501    } finally {
502      syncBlockingFilter.set(true);
503      t.join();
504      TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
505    }
506
507    table.close();
508    connection.close();
509  }
510
511  protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
512
513  public static class BlockingFilter extends FilterBase {
514    @Override
515    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
516      int i = 0;
517      while (i++ < 1000 && !syncBlockingFilter.get()) {
518        synchronized (syncBlockingFilter) {
519          syncBlockingFilter.notifyAll();
520        }
521        Threads.sleep(100);
522      }
523      syncBlockingFilter.set(true);
524      return false;
525    }
526    @Override
527    public ReturnCode filterCell(final Cell ignored) throws IOException {
528      return ReturnCode.INCLUDE;
529    }
530
531    public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
532      return new BlockingFilter();
533    }
534  }
535
536  /**
537   * Test that when we delete a location using the first row of a region
538   * that we really delete it.
539   * @throws Exception
540   */
541  @Test
542  public void testRegionCaching() throws Exception {
543    TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
544    Configuration conf =  new Configuration(TEST_UTIL.getConfiguration());
545    // test with no retry, or client cache will get updated after the first failure
546    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
547    Connection connection = ConnectionFactory.createConnection(conf);
548    final Table table = connection.getTable(TABLE_NAME);
549
550    TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
551    Put put = new Put(ROW);
552    put.addColumn(FAM_NAM, ROW, ROW);
553    table.put(put);
554
555    ConnectionImplementation conn = (ConnectionImplementation) connection;
556
557    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
558
559    // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at
560    // a location where the port is current port number +1 -- i.e. a non-existent location.
561    HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
562    final int nextPort = loc.getPort() + 1;
563    conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
564        ServerName.valueOf("127.0.0.1", nextPort,
565        HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
566    Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW)
567      .getRegionLocation().getPort(), nextPort);
568
569    conn.clearRegionCache(TABLE_NAME, ROW.clone());
570    RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
571    assertNull("What is this location?? " + rl, rl);
572
573    // We're now going to move the region and check that it works for the client
574    // First a new put to add the location in the cache
575    conn.clearRegionCache(TABLE_NAME);
576    Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
577    Put put2 = new Put(ROW);
578    put2.addColumn(FAM_NAM, ROW, ROW);
579    table.put(put2);
580    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
581    assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
582
583    TEST_UTIL.getAdmin().setBalancerRunning(false, false);
584    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
585
586    // We can wait for all regions to be online, that makes log reading easier when debugging
587    TEST_UTIL.waitUntilNoRegionsInTransition();
588
589    // Now moving the region to the second server
590    HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
591    byte[] regionName = toMove.getRegionInfo().getRegionName();
592    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
593
594    // Choose the other server.
595    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
596    int destServerId = curServerId == 0? 1: 0;
597
598    HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
599    HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
600
601    ServerName destServerName = destServer.getServerName();
602
603    // Check that we are in the expected state
604    Assert.assertTrue(curServer != destServer);
605    Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
606    Assert.assertFalse( toMove.getPort() == destServerName.getPort());
607    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
608    Assert.assertNull(destServer.getOnlineRegion(regionName));
609    Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
610        getAssignmentManager().hasRegionsInTransition());
611
612    // Moving. It's possible that we don't have all the regions online at this point, so
613    //  the test must depend only on the region we're looking at.
614    LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
615    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
616
617    while (destServer.getOnlineRegion(regionName) == null ||
618        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
619        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
620        master.getAssignmentManager().hasRegionsInTransition()) {
621      // wait for the move to be finished
622      Thread.sleep(1);
623    }
624
625    LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
626
627    // Check our new state.
628    Assert.assertNull(curServer.getOnlineRegion(regionName));
629    Assert.assertNotNull(destServer.getOnlineRegion(regionName));
630    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
631    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
632
633
634    // Cache was NOT updated and points to the wrong server
635    Assert.assertFalse(
636        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation()
637          .getPort() == destServerName.getPort());
638
639    // This part relies on a number of tries equals to 1.
640    // We do a put and expect the cache to be updated, even if we don't retry
641    LOG.info("Put starting");
642    Put put3 = new Put(ROW);
643    put3.addColumn(FAM_NAM, ROW, ROW);
644    try {
645      table.put(put3);
646      Assert.fail("Unreachable point");
647    } catch (RetriesExhaustedWithDetailsException e) {
648      LOG.info("Put done, exception caught: " + e.getClass());
649      Assert.assertEquals(1, e.getNumExceptions());
650      Assert.assertEquals(1, e.getCauses().size());
651      Assert.assertArrayEquals(ROW, e.getRow(0).getRow());
652
653      // Check that we unserialized the exception as expected
654      Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
655      Assert.assertNotNull(cause);
656      Assert.assertTrue(cause instanceof RegionMovedException);
657    } catch (RetriesExhaustedException ree) {
658      // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException
659      // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue.
660      LOG.info("Put done, exception caught: " + ree.getClass());
661      Throwable cause = ClientExceptionsUtil.findException(ree.getCause());
662      Assert.assertNotNull(cause);
663      Assert.assertTrue(cause instanceof RegionMovedException);
664    }
665    Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
666    Assert.assertEquals(
667        "Previous server was " + curServer.getServerName().getHostAndPort(),
668        destServerName.getPort(),
669        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
670
671    Assert.assertFalse(destServer.getRegionsInTransitionInRS()
672      .containsKey(encodedRegionNameBytes));
673    Assert.assertFalse(curServer.getRegionsInTransitionInRS()
674      .containsKey(encodedRegionNameBytes));
675
676    // We move it back to do another test with a scan
677    LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
678    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(),
679      curServer.getServerName());
680
681    while (curServer.getOnlineRegion(regionName) == null ||
682        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
683        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
684        master.getAssignmentManager().hasRegionsInTransition()) {
685      // wait for the move to be finished
686      Thread.sleep(1);
687    }
688
689    // Check our new state.
690    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
691    Assert.assertNull(destServer.getOnlineRegion(regionName));
692    LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
693
694    // Cache was NOT updated and points to the wrong server
695    Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() ==
696      curServer.getServerName().getPort());
697
698    Scan sc = new Scan();
699    sc.setStopRow(ROW);
700    sc.setStartRow(ROW);
701
702    // The scanner takes the max retries from the connection configuration, not the table as
703    // the put.
704    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
705
706    try {
707      ResultScanner rs = table.getScanner(sc);
708      while (rs.next() != null) {
709      }
710      Assert.fail("Unreachable point");
711    } catch (RetriesExhaustedException e) {
712      LOG.info("Scan done, expected exception caught: " + e.getClass());
713    }
714
715    // Cache is updated with the right value.
716    Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
717    Assert.assertEquals(
718      "Previous server was "+destServer.getServerName().getHostAndPort(),
719      curServer.getServerName().getPort(),
720      conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
721
722    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
723    table.close();
724    connection.close();
725  }
726
727  /**
728   * Test that Connection or Pool are not closed when managed externally
729   * @throws Exception
730   */
731  @Test
732  public void testConnectionManagement() throws Exception{
733    Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
734    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
735    Table table = conn.getTable(TABLE_NAME1);
736    table.close();
737    assertFalse(conn.isClosed());
738    if(table instanceof HTable) {
739      assertFalse(((HTable) table).getPool().isShutdown());
740    }
741    table = conn.getTable(TABLE_NAME1);
742    table.close();
743    if(table instanceof HTable) {
744      assertFalse(((HTable) table).getPool().isShutdown());
745    }
746    conn.close();
747    if(table instanceof HTable) {
748      assertTrue(((HTable) table).getPool().isShutdown());
749    }
750    table0.close();
751  }
752
753  /**
754   * Test that stale cache updates don't override newer cached values.
755   */
756  @Test
757  public void testCacheSeqNums() throws Exception{
758    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
759    Put put = new Put(ROW);
760    put.addColumn(FAM_NAM, ROW, ROW);
761    table.put(put);
762    ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
763
764    HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
765    assertNotNull(location);
766
767    ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
768
769    // Same server as already in cache reporting - overwrites any value despite seqNum.
770    int nextPort = location.getPort() + 1;
771    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
772        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
773    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
774    Assert.assertEquals(nextPort, location.getPort());
775
776    // No source specified - same.
777    nextPort = location.getPort() + 1;
778    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
779        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
780    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
781    Assert.assertEquals(nextPort, location.getPort());
782
783    // Higher seqNum - overwrites lower seqNum.
784    nextPort = location.getPort() + 1;
785    conn.updateCachedLocation(location.getRegionInfo(), anySource,
786        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
787    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
788    Assert.assertEquals(nextPort, location.getPort());
789
790    // Lower seqNum - does not overwrite higher seqNum.
791    nextPort = location.getPort() + 1;
792    conn.updateCachedLocation(location.getRegionInfo(), anySource,
793        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
794    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
795    Assert.assertEquals(nextPort - 1, location.getPort());
796    table.close();
797  }
798
799  @Test
800  public void testClosing() throws Exception {
801    Configuration configuration =
802      new Configuration(TEST_UTIL.getConfiguration());
803    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
804      String.valueOf(ThreadLocalRandom.current().nextInt()));
805
806    // as connection caching is going away, now we're just testing
807    // that closed connection does actually get closed.
808
809    Connection c1 = ConnectionFactory.createConnection(configuration);
810    Connection c2 = ConnectionFactory.createConnection(configuration);
811    // no caching, different connections
812    assertTrue(c1 != c2);
813
814    // closing independently
815    c1.close();
816    assertTrue(c1.isClosed());
817    assertFalse(c2.isClosed());
818
819    c2.close();
820    assertTrue(c2.isClosed());
821  }
822
823  /**
824   * Trivial test to verify that nobody messes with
825   * {@link ConnectionFactory#createConnection(Configuration)}
826   */
827  @Test
828  public void testCreateConnection() throws Exception {
829    Configuration configuration = TEST_UTIL.getConfiguration();
830    Connection c1 = ConnectionFactory.createConnection(configuration);
831    Connection c2 = ConnectionFactory.createConnection(configuration);
832    // created from the same configuration, yet they are different
833    assertTrue(c1 != c2);
834    assertTrue(c1.getConfiguration() == c2.getConfiguration());
835  }
836
837  /**
838   * This test checks that one can connect to the cluster with only the
839   *  ZooKeeper quorum set. Other stuff like master address will be read
840   *  from ZK by the client.
841   */
842  @Test
843  public void testConnection() throws Exception{
844    // We create an empty config and add the ZK address.
845    Configuration c = new Configuration();
846    // This test only makes sense for ZK based connection registry.
847    c.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
848        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
849    c.set(HConstants.ZOOKEEPER_QUORUM,
850      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
851    c.set(HConstants.ZOOKEEPER_CLIENT_PORT,
852      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
853    // This should be enough to connect
854    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c);
855    assertTrue(conn.isMasterRunning());
856    conn.close();
857  }
858
859  private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception {
860    Field numTries = hci.getClass().getDeclaredField("numTries");
861    numTries.setAccessible(true);
862    Field modifiersField = Field.class.getDeclaredField("modifiers");
863    modifiersField.setAccessible(true);
864    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
865    final int prevNumRetriesVal = (Integer)numTries.get(hci);
866    numTries.set(hci, newVal);
867
868    return prevNumRetriesVal;
869  }
870
871  @Test
872  public void testMulti() throws Exception {
873    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
874    try {
875      ConnectionImplementation conn = (ConnectionImplementation)TEST_UTIL.getConnection();
876
877      // We're now going to move the region and check that it works for the client
878      // First a new put to add the location in the cache
879      conn.clearRegionCache(TABLE_NAME3);
880      Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
881
882      TEST_UTIL.getAdmin().setBalancerRunning(false, false);
883      HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
884
885      // We can wait for all regions to be online, that makes log reading easier when debugging
886      TEST_UTIL.waitUntilNoRegionsInTransition();
887
888      Put put = new Put(ROW_X);
889      put.addColumn(FAM_NAM, ROW_X, ROW_X);
890      table.put(put);
891
892      // Now moving the region to the second server
893      HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
894      byte[] regionName = toMove.getRegionInfo().getRegionName();
895      byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
896
897      // Choose the other server.
898      int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
899      int destServerId = (curServerId == 0 ? 1 : 0);
900
901      HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
902      HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
903
904      ServerName destServerName = destServer.getServerName();
905      ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
906
907       //find another row in the cur server that is less than ROW_X
908      List<HRegion> regions = curServer.getRegions(TABLE_NAME3);
909      byte[] otherRow = null;
910       for (Region region : regions) {
911         if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
912             && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
913           otherRow = region.getRegionInfo().getStartKey();
914           break;
915         }
916       }
917      assertNotNull(otherRow);
918      // If empty row, set it to first row.-f
919      if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
920      Put put2 = new Put(otherRow);
921      put2.addColumn(FAM_NAM, otherRow, otherRow);
922      table.put(put2); //cache put2's location
923
924      // Check that we are in the expected state
925      Assert.assertTrue(curServer != destServer);
926      Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
927      Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
928      Assert.assertNotNull(curServer.getOnlineRegion(regionName));
929      Assert.assertNull(destServer.getOnlineRegion(regionName));
930      Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
931          getAssignmentManager().hasRegionsInTransition());
932
933       // Moving. It's possible that we don't have all the regions online at this point, so
934      //  the test depends only on the region we're looking at.
935      LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
936      TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
937
938      while (destServer.getOnlineRegion(regionName) == null ||
939          destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
940          curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
941          master.getAssignmentManager().hasRegionsInTransition()) {
942        // wait for the move to be finished
943        Thread.sleep(1);
944      }
945
946      LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
947
948      // Check our new state.
949      Assert.assertNull(curServer.getOnlineRegion(regionName));
950      Assert.assertNotNull(destServer.getOnlineRegion(regionName));
951      Assert.assertFalse(destServer.getRegionsInTransitionInRS()
952          .containsKey(encodedRegionNameBytes));
953      Assert.assertFalse(curServer.getRegionsInTransitionInRS()
954          .containsKey(encodedRegionNameBytes));
955
956
957       // Cache was NOT updated and points to the wrong server
958      Assert.assertFalse(
959          conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
960              .getPort() == destServerName.getPort());
961
962      // Hijack the number of retry to fail after 2 tries
963      final int prevNumRetriesVal = setNumTries(conn, 2);
964
965      Put put3 = new Put(ROW_X);
966      put3.addColumn(FAM_NAM, ROW_X, ROW_X);
967      Put put4 = new Put(otherRow);
968      put4.addColumn(FAM_NAM, otherRow, otherRow);
969
970      // do multi
971      ArrayList<Put> actions = Lists.newArrayList(put4, put3);
972      table.batch(actions, null); // first should be a valid row,
973      // second we get RegionMovedException.
974
975      setNumTries(conn, prevNumRetriesVal);
976    } finally {
977      table.close();
978    }
979  }
980
981  @Test
982  public void testErrorBackoffTimeCalculation() throws Exception {
983    // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
984    final long ANY_PAUSE = 100;
985    ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
986    ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
987
988    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
989    EnvironmentEdgeManager.injectEdge(timeMachine);
990    try {
991      long largeAmountOfTime = ANY_PAUSE * 1000;
992      ConnectionImplementation.ServerErrorTracker tracker =
993          new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);
994
995      // The default backoff is 0.
996      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
997
998      // Check some backoff values from HConstants sequence.
999      tracker.reportServerError(location);
1000      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1001        tracker.calculateBackoffTime(location, ANY_PAUSE));
1002      tracker.reportServerError(location);
1003      tracker.reportServerError(location);
1004      tracker.reportServerError(location);
1005      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
1006        tracker.calculateBackoffTime(location, ANY_PAUSE));
1007
1008      // All of this shouldn't affect backoff for different location.
1009      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1010      tracker.reportServerError(diffLocation);
1011      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1012        tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1013
1014      // Check with different base.
1015      assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
1016          tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1017    } finally {
1018      EnvironmentEdgeManager.reset();
1019    }
1020  }
1021
1022  private static void assertEqualsWithJitter(long expected, long actual) {
1023    assertEqualsWithJitter(expected, actual, expected);
1024  }
1025
1026  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1027    assertTrue("Value not within jitter: " + expected + " vs " + actual,
1028      Math.abs(actual - expected) <= (0.01f * jitterBase));
1029  }
1030
1031  @Test
1032  public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1033    Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1034    // This test only makes sense for ZK based connection registry.
1035    config.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
1036        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
1037
1038    final TableName tableName = TableName.valueOf(name.getMethodName());
1039    TEST_UTIL.createTable(tableName, new byte[][] {FAM_NAM}).close();
1040
1041    Connection connection = ConnectionFactory.createConnection(config);
1042    Table table = connection.getTable(tableName);
1043
1044    // this will cache the meta location and table's region location
1045    table.get(new Get(Bytes.toBytes("foo")));
1046
1047    // restart HBase
1048    TEST_UTIL.shutdownMiniHBaseCluster();
1049    TEST_UTIL.restartHBaseCluster(2);
1050    // this should be able to discover new locations for meta and table's region
1051    table.get(new Get(Bytes.toBytes("foo")));
1052    TEST_UTIL.deleteTable(tableName);
1053    table.close();
1054    connection.close();
1055  }
1056
1057  @Test
1058  public void testLocateRegionsWithRegionReplicas() throws IOException {
1059    int regionReplication = 3;
1060    byte[] family = Bytes.toBytes("cf");
1061    TableName tableName = TableName.valueOf(name.getMethodName());
1062
1063    // Create a table with region replicas
1064    TableDescriptorBuilder builder = TableDescriptorBuilder
1065      .newBuilder(tableName)
1066      .setRegionReplication(regionReplication)
1067      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
1068    TEST_UTIL.getAdmin().createTable(builder.build());
1069
1070    try (ConnectionImplementation con = (ConnectionImplementation) ConnectionFactory.
1071      createConnection(TEST_UTIL.getConfiguration())) {
1072      // Get locations of the regions of the table
1073      List<HRegionLocation> locations = con.locateRegions(tableName, false, false);
1074
1075      // The size of the returned locations should be 3
1076      assertEquals(regionReplication, locations.size());
1077
1078      // The replicaIds of the returned locations should be 0, 1 and 2
1079      Set<Integer> expectedReplicaIds = IntStream.range(0, regionReplication).
1080        boxed().collect(Collectors.toSet());
1081      for (HRegionLocation location : locations) {
1082        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
1083      }
1084    } finally {
1085      TEST_UTIL.deleteTable(tableName);
1086    }
1087  }
1088
1089  @Test
1090  public void testMetaLookupThreadPoolCreated() throws Exception {
1091    final TableName tableName = TableName.valueOf(name.getMethodName());
1092    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
1093    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
1094      TEST_UTIL.getAdmin().disableTable(tableName);
1095      TEST_UTIL.getAdmin().deleteTable(tableName);
1096    }
1097    try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
1098      byte[] row = Bytes.toBytes("test");
1099      ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
1100      // check that metalookup pool would get created
1101      c.relocateRegion(tableName, row);
1102      ExecutorService ex = c.getCurrentMetaLookupPool();
1103      assertNotNull(ex);
1104    }
1105  }
1106
1107  // There is no assertion, but you need to confirm that there is no resource leak output from netty
1108  @Test
1109  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
1110    TableName tableName = TableName.valueOf(name.getMethodName());
1111    TEST_UTIL.createTable(tableName, FAM_NAM).close();
1112    TEST_UTIL.getAdmin().balancerSwitch(false, true);
1113    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
1114      Table table = connection.getTable(tableName)) {
1115      table.get(new Get(Bytes.toBytes("1")));
1116      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
1117      RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient();
1118      rpcClient.cancelConnections(sn);
1119      Thread.sleep(1000);
1120      System.gc();
1121      Thread.sleep(1000);
1122    }
1123  }
1124}