001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.lang.reflect.Modifier;
029import java.net.SocketTimeoutException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Set;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.SynchronousQueue;
035import java.util.concurrent.ThreadLocalRandom;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.stream.Collectors;
042import java.util.stream.IntStream;
043
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.HRegionLocation;
050import org.apache.hadoop.hbase.RegionLocations;
051import org.apache.hadoop.hbase.ServerName;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.Waiter;
054import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
055import org.apache.hadoop.hbase.exceptions.DeserializationException;
056import org.apache.hadoop.hbase.exceptions.RegionMovedException;
057import org.apache.hadoop.hbase.filter.Filter;
058import org.apache.hadoop.hbase.filter.FilterBase;
059import org.apache.hadoop.hbase.ipc.RpcClient;
060import org.apache.hadoop.hbase.master.HMaster;
061import org.apache.hadoop.hbase.regionserver.HRegion;
062import org.apache.hadoop.hbase.regionserver.HRegionServer;
063import org.apache.hadoop.hbase.regionserver.Region;
064import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
065import org.apache.hadoop.hbase.testclassification.LargeTests;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
068import org.apache.hadoop.hbase.util.JVMClusterUtil;
069import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
070import org.apache.hadoop.hbase.util.Threads;
071import org.junit.After;
072import org.junit.AfterClass;
073import org.junit.Assert;
074import org.junit.BeforeClass;
075import org.junit.ClassRule;
076import org.junit.Ignore;
077import org.junit.Rule;
078import org.junit.Test;
079import org.junit.experimental.categories.Category;
080import org.junit.rules.TestName;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
085import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
086import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
087
/**
 * Tests the connection-management features of {@link ConnectionImplementation}.
 */
091@Category({LargeTests.class})
092public class TestConnectionImplementation {
093
094  @ClassRule
095  public static final HBaseClassTestRule CLASS_RULE =
096      HBaseClassTestRule.forClass(TestConnectionImplementation.class);
097
098  private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class);
099  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
100  private static final TableName TABLE_NAME =
101      TableName.valueOf("test");
102  private static final TableName TABLE_NAME1 =
103      TableName.valueOf("test1");
104  private static final TableName TABLE_NAME2 =
105      TableName.valueOf("test2");
106  private static final TableName TABLE_NAME3 =
107      TableName.valueOf("test3");
108  private static final byte[] FAM_NAM = Bytes.toBytes("f");
109  private static final byte[] ROW = Bytes.toBytes("bbb");
110  private static final byte[] ROW_X = Bytes.toBytes("xxx");
111  private static final int RPC_RETRY = 5;
112
113  @Rule
114  public TestName name = new TestName();
115
116  @BeforeClass
117  public static void setUpBeforeClass() throws Exception {
118    ResourceLeakDetector.setLevel(Level.PARANOID);
119    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
120    // Up the handlers; this test needs more than usual.
121    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
122    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
123    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
124    TEST_UTIL.startMiniCluster(2);
125
126  }
127
  /**
   * Shuts down the mini cluster started in {@link #setUpBeforeClass()}.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
132
  /**
   * Switches the balancer back on after every test; several tests in this class switch it off
   * while they run.
   */
  @After
  public void tearDown() throws IOException {
    TEST_UTIL.getAdmin().balancerSwitch(true, true);
  }
137
138  @Test
139  public void testClusterConnection() throws IOException {
140    ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
141        5, TimeUnit.SECONDS,
142        new SynchronousQueue<>(),
143        Threads.newDaemonThreadFactory("test-hcm"));
144
145    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
146    Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool);
147    // make sure the internally created ExecutorService is the one passed
148    assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool());
149
150    final TableName tableName = TableName.valueOf(name.getMethodName());
151    TEST_UTIL.createTable(tableName, FAM_NAM).close();
152    Table table = con1.getTable(tableName, otherPool);
153
154    ExecutorService pool = null;
155
156    if(table instanceof HTable) {
157      HTable t = (HTable) table;
158      // make sure passing a pool to the getTable does not trigger creation of an internal pool
159      assertNull("Internal Thread pool should be null",
160        ((ConnectionImplementation) con1).getCurrentBatchPool());
161      // table should use the pool passed
162      assertTrue(otherPool == t.getPool());
163      t.close();
164
165      t = (HTable) con2.getTable(tableName);
166      // table should use the connectin's internal pool
167      assertTrue(otherPool == t.getPool());
168      t.close();
169
170      t = (HTable) con2.getTable(tableName);
171      // try other API too
172      assertTrue(otherPool == t.getPool());
173      t.close();
174
175      t = (HTable) con2.getTable(tableName);
176      // try other API too
177      assertTrue(otherPool == t.getPool());
178      t.close();
179
180      t = (HTable) con1.getTable(tableName);
181      pool = ((ConnectionImplementation) con1).getCurrentBatchPool();
182      // make sure an internal pool was created
183      assertNotNull("An internal Thread pool should have been created", pool);
184      // and that the table is using it
185      assertTrue(t.getPool() == pool);
186      t.close();
187
188      t = (HTable) con1.getTable(tableName);
189      // still using the *same* internal pool
190      assertTrue(t.getPool() == pool);
191      t.close();
192    } else {
193      table.close();
194    }
195
196    con1.close();
197
198    // if the pool was created on demand it should be closed upon connection close
199    if(pool != null) {
200      assertTrue(pool.isShutdown());
201    }
202
203    con2.close();
204    // if the pool is passed, it is not closed
205    assertFalse(otherPool.isShutdown());
206    otherPool.shutdownNow();
207  }
208
209  /**
210   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
211   * @throws IOException Unable to construct admin
212   */
213  @Test
214  public void testAdminFactory() throws IOException {
215    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
216    Admin admin = con1.getAdmin();
217    assertTrue(admin.getConnection() == con1);
218    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
219    con1.close();
220  }
221
222  // Fails too often!  Needs work.  HBASE-12558
223  // May only fail on non-linux machines? E.g. macosx.
224  @Ignore @Test (expected = RegionServerStoppedException.class)
225  // Depends on mulitcast messaging facility that seems broken in hbase2
226  // See  HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of
227  // dead servers is broke"
228  public void testClusterStatus() throws Exception {
229    final TableName tableName = TableName.valueOf(name.getMethodName());
230    byte[] cf = "cf".getBytes();
231    byte[] rk = "rk1".getBytes();
232
233    JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
234    rs.waitForServerOnline();
235    final ServerName sn = rs.getRegionServer().getServerName();
236
237    Table t = TEST_UTIL.createTable(tableName, cf);
238    TEST_UTIL.waitTableAvailable(tableName);
239    TEST_UTIL.waitUntilNoRegionsInTransition();
240
241    final ConnectionImplementation hci =  (ConnectionImplementation)TEST_UTIL.getConnection();
242    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
243      while (l.getRegionLocation(rk).getPort() != sn.getPort()) {
244        TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().
245            getEncodedNameAsBytes(), Bytes.toBytes(sn.toString()));
246        TEST_UTIL.waitUntilNoRegionsInTransition();
247        hci.clearRegionCache(tableName);
248      }
249      Assert.assertNotNull(hci.clusterStatusListener);
250      TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000);
251    }
252
253    Put p1 = new Put(rk);
254    p1.addColumn(cf, "qual".getBytes(), "val".getBytes());
255    t.put(p1);
256
257    rs.getRegionServer().abort("I'm dead");
258
259    // We want the status to be updated. That's a least 10 second
260    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
261      @Override
262      public boolean evaluate() throws Exception {
263        return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
264            getDeadServers().isDeadServer(sn);
265      }
266    });
267
268    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
269      @Override
270      public boolean evaluate() throws Exception {
271        return hci.clusterStatusListener.isDeadServer(sn);
272      }
273    });
274
275    t.close();
276    hci.getClient(sn);  // will throw an exception: RegionServerStoppedException
277  }
278
  /**
   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
   * Runs {@link #testConnectionClose(boolean)} with {@code allowsInterrupt = true}.
   */
  @Test
  public void testConnectionCloseAllowsInterrupt() throws Exception {
    testConnectionClose(true);
  }
286
  /**
   * Same scenario as {@link #testConnectionCloseAllowsInterrupt()}, but runs
   * {@link #testConnectionClose(boolean)} with {@code allowsInterrupt = false}.
   */
  @Test
  public void testConnectionNotAllowsInterrupt() throws Exception {
    testConnectionClose(false);
  }
291
292  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
293    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
294    TEST_UTIL.createTable(tableName, FAM_NAM).close();
295
296    TEST_UTIL.getAdmin().balancerSwitch(false, true);
297
298    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
299    // We want to work on a separate connection.
300    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
301    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
302    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
303    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
304    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
305    // to avoid the client to be stuck when do the Get
306    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
307    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
308    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
309
310    Connection connection = ConnectionFactory.createConnection(c2);
311    final Table table = connection.getTable(tableName);
312
313    Put put = new Put(ROW);
314    put.addColumn(FAM_NAM, ROW, ROW);
315    table.put(put);
316
317    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
318    final AtomicInteger step = new AtomicInteger(0);
319
320    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
321    Thread t = new Thread("testConnectionCloseThread") {
322      @Override
323      public void run() {
324        int done = 0;
325        try {
326          step.set(1);
327          while (step.get() == 1) {
328            Get get = new Get(ROW);
329            table.get(get);
330            done++;
331            if (done % 100 == 0)
332              LOG.info("done=" + done);
333            // without the sleep, will cause the exception for too many files in
334            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
335            Thread.sleep(100);
336          }
337        } catch (Throwable t) {
338          failed.set(t);
339          LOG.error(t.toString(), t);
340        }
341        step.set(3);
342      }
343    };
344    t.start();
345    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
346      @Override
347      public boolean evaluate() throws Exception {
348        return step.get() == 1;
349      }
350    });
351
352    ServerName sn;
353    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
354      sn = rl.getRegionLocation(ROW).getServerName();
355    }
356    ConnectionImplementation conn =
357        (ConnectionImplementation) connection;
358    RpcClient rpcClient = conn.getRpcClient();
359
360    LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
361    for (int i = 0; i < 500; i++) {
362      rpcClient.cancelConnections(sn);
363      Thread.sleep(50);
364    }
365
366    step.compareAndSet(1, 2);
367    // The test may fail here if the thread doing the gets is stuck. The way to find
368    //  out what's happening is to look for the thread named 'testConnectionCloseThread'
369    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
370      @Override
371      public boolean evaluate() throws Exception {
372        return step.get() == 3;
373      }
374    });
375    table.close();
376    connection.close();
377    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
378  }
379
380  /**
381   * Test that connection can become idle without breaking everything.
382   */
383  @Test
384  public void testConnectionIdle() throws Exception {
385    final TableName tableName = TableName.valueOf(name.getMethodName());
386    TEST_UTIL.createTable(tableName, FAM_NAM).close();
387    int idleTime =  20000;
388    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
389
390    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
391    // We want to work on a separate connection.
392    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
393    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
394    c2.setInt(RpcClient.IDLE_TIME, idleTime);
395
396    Connection connection = ConnectionFactory.createConnection(c2);
397    final Table table = connection.getTable(tableName);
398
399    Put put = new Put(ROW);
400    put.addColumn(FAM_NAM, ROW, ROW);
401    table.put(put);
402
403    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
404    mee.setValue(System.currentTimeMillis());
405    EnvironmentEdgeManager.injectEdge(mee);
406    LOG.info("first get");
407    table.get(new Get(ROW));
408
409    LOG.info("first get - changing the time & sleeping");
410    mee.incValue(idleTime + 1000);
411    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
412                        // 1500 = sleep time in RpcClient#waitForWork + a margin
413
414    LOG.info("second get - connection has been marked idle in the middle");
415    // To check that the connection actually became idle would need to read some private
416    //  fields of RpcClient.
417    table.get(new Get(ROW));
418    mee.incValue(idleTime + 1000);
419
420    LOG.info("third get - connection is idle, but the reader doesn't know yet");
421    // We're testing here a special case:
422    //  time limit reached BUT connection not yet reclaimed AND a new call.
423    //  in this situation, we don't close the connection, instead we use it immediately.
424    // If we're very unlucky we can have a race condition in the test: the connection is already
425    //  under closing when we do the get, so we have an exception, and we don't retry as the
426    //  retry number is 1. The probability is very very low, and seems acceptable for now. It's
427    //  a test issue only.
428    table.get(new Get(ROW));
429
430    LOG.info("we're done - time will change back");
431
432    table.close();
433
434    connection.close();
435    EnvironmentEdgeManager.reset();
436    TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
437  }
438
439    /**
440     * Test that the connection to the dead server is cut immediately when we receive the
441     *  notification.
442     * @throws Exception
443     */
444  @Test
445  public void testConnectionCut() throws Exception {
446    final TableName tableName = TableName.valueOf(name.getMethodName());
447
448    TEST_UTIL.createTable(tableName, FAM_NAM).close();
449    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
450
451    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
452    // We want to work on a separate connection.
453    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
454    // try only once w/o any retry
455    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
456    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
457
458    final Connection connection = ConnectionFactory.createConnection(c2);
459    final Table table = connection.getTable(tableName);
460
461    Put p = new Put(FAM_NAM);
462    p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
463    table.put(p);
464
465    final ConnectionImplementation hci =  (ConnectionImplementation) connection;
466
467    final HRegionLocation loc;
468    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
469      loc = rl.getRegionLocation(FAM_NAM);
470    }
471
472    Get get = new Get(FAM_NAM);
473    Assert.assertNotNull(table.get(get));
474
475    get = new Get(FAM_NAM);
476    get.setFilter(new BlockingFilter());
477
478    // This thread will mark the server as dead while we're waiting during a get.
479    Thread t = new Thread() {
480      @Override
481      public void run() {
482        synchronized (syncBlockingFilter) {
483          try {
484            syncBlockingFilter.wait();
485          } catch (InterruptedException e) {
486            throw new RuntimeException(e);
487          }
488        }
489        hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
490      }
491    };
492
493    t.start();
494    try {
495      table.get(get);
496      Assert.fail();
497    } catch (IOException expected) {
498      LOG.debug("Received: " + expected);
499      Assert.assertFalse(expected instanceof SocketTimeoutException);
500      Assert.assertFalse(syncBlockingFilter.get());
501    } finally {
502      syncBlockingFilter.set(true);
503      t.join();
504      TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
505    }
506
507    table.close();
508    connection.close();
509  }
510
511  protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
512
513  public static class BlockingFilter extends FilterBase {
514    @Override
515    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
516      int i = 0;
517      while (i++ < 1000 && !syncBlockingFilter.get()) {
518        synchronized (syncBlockingFilter) {
519          syncBlockingFilter.notifyAll();
520        }
521        Threads.sleep(100);
522      }
523      syncBlockingFilter.set(true);
524      return false;
525    }
526    @Override
527    public ReturnCode filterCell(final Cell ignored) throws IOException {
528      return ReturnCode.INCLUDE;
529    }
530
531    public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
532      return new BlockingFilter();
533    }
534  }
535
536  /**
537   * Test that when we delete a location using the first row of a region
538   * that we really delete it.
539   * @throws Exception
540   */
541  @Test
542  public void testRegionCaching() throws Exception {
543    TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
544    Configuration conf =  new Configuration(TEST_UTIL.getConfiguration());
545    // test with no retry, or client cache will get updated after the first failure
546    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
547    Connection connection = ConnectionFactory.createConnection(conf);
548    final Table table = connection.getTable(TABLE_NAME);
549
550    TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
551    Put put = new Put(ROW);
552    put.addColumn(FAM_NAM, ROW, ROW);
553    table.put(put);
554
555    ConnectionImplementation conn = (ConnectionImplementation) connection;
556
557    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
558
559    // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at
560    // a location where the port is current port number +1 -- i.e. a non-existent location.
561    HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
562    final int nextPort = loc.getPort() + 1;
563    conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
564        ServerName.valueOf("127.0.0.1", nextPort,
565        HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
566    Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW)
567      .getRegionLocation().getPort(), nextPort);
568
569    conn.clearRegionCache(TABLE_NAME, ROW.clone());
570    RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
571    assertNull("What is this location?? " + rl, rl);
572
573    // We're now going to move the region and check that it works for the client
574    // First a new put to add the location in the cache
575    conn.clearRegionCache(TABLE_NAME);
576    Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
577    Put put2 = new Put(ROW);
578    put2.addColumn(FAM_NAM, ROW, ROW);
579    table.put(put2);
580    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
581    assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
582
583    TEST_UTIL.getAdmin().setBalancerRunning(false, false);
584    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
585
586    // We can wait for all regions to be online, that makes log reading easier when debugging
587    TEST_UTIL.waitUntilNoRegionsInTransition();
588
589    // Now moving the region to the second server
590    HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
591    byte[] regionName = toMove.getRegionInfo().getRegionName();
592    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
593
594    // Choose the other server.
595    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
596    int destServerId = curServerId == 0? 1: 0;
597
598    HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
599    HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
600
601    ServerName destServerName = destServer.getServerName();
602
603    // Check that we are in the expected state
604    Assert.assertTrue(curServer != destServer);
605    Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
606    Assert.assertFalse( toMove.getPort() == destServerName.getPort());
607    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
608    Assert.assertNull(destServer.getOnlineRegion(regionName));
609    Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
610        getAssignmentManager().hasRegionsInTransition());
611
612    // Moving. It's possible that we don't have all the regions online at this point, so
613    //  the test must depend only on the region we're looking at.
614    LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
615    TEST_UTIL.getAdmin().move(
616      toMove.getRegionInfo().getEncodedNameAsBytes(),
617      destServerName.getServerName().getBytes()
618    );
619
620    while (destServer.getOnlineRegion(regionName) == null ||
621        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
622        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
623        master.getAssignmentManager().hasRegionsInTransition()) {
624      // wait for the move to be finished
625      Thread.sleep(1);
626    }
627
628    LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
629
630    // Check our new state.
631    Assert.assertNull(curServer.getOnlineRegion(regionName));
632    Assert.assertNotNull(destServer.getOnlineRegion(regionName));
633    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
634    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
635
636
637    // Cache was NOT updated and points to the wrong server
638    Assert.assertFalse(
639        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation()
640          .getPort() == destServerName.getPort());
641
642    // This part relies on a number of tries equals to 1.
643    // We do a put and expect the cache to be updated, even if we don't retry
644    LOG.info("Put starting");
645    Put put3 = new Put(ROW);
646    put3.addColumn(FAM_NAM, ROW, ROW);
647    try {
648      table.put(put3);
649      Assert.fail("Unreachable point");
650    } catch (RetriesExhaustedWithDetailsException e) {
651      LOG.info("Put done, exception caught: " + e.getClass());
652      Assert.assertEquals(1, e.getNumExceptions());
653      Assert.assertEquals(1, e.getCauses().size());
654      Assert.assertArrayEquals(ROW, e.getRow(0).getRow());
655
656      // Check that we unserialized the exception as expected
657      Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
658      Assert.assertNotNull(cause);
659      Assert.assertTrue(cause instanceof RegionMovedException);
660    } catch (RetriesExhaustedException ree) {
661      // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException
662      // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue.
663      LOG.info("Put done, exception caught: " + ree.getClass());
664      Throwable cause = ClientExceptionsUtil.findException(ree.getCause());
665      Assert.assertNotNull(cause);
666      Assert.assertTrue(cause instanceof RegionMovedException);
667    }
668    Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
669    Assert.assertEquals(
670        "Previous server was " + curServer.getServerName().getHostAndPort(),
671        destServerName.getPort(),
672        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
673
674    Assert.assertFalse(destServer.getRegionsInTransitionInRS()
675      .containsKey(encodedRegionNameBytes));
676    Assert.assertFalse(curServer.getRegionsInTransitionInRS()
677      .containsKey(encodedRegionNameBytes));
678
679    // We move it back to do another test with a scan
680    LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
681    TEST_UTIL.getAdmin().move(
682      toMove.getRegionInfo().getEncodedNameAsBytes(),
683      curServer.getServerName().getServerName().getBytes()
684    );
685
686    while (curServer.getOnlineRegion(regionName) == null ||
687        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
688        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
689        master.getAssignmentManager().hasRegionsInTransition()) {
690      // wait for the move to be finished
691      Thread.sleep(1);
692    }
693
694    // Check our new state.
695    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
696    Assert.assertNull(destServer.getOnlineRegion(regionName));
697    LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
698
699    // Cache was NOT updated and points to the wrong server
700    Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() ==
701      curServer.getServerName().getPort());
702
703    Scan sc = new Scan();
704    sc.setStopRow(ROW);
705    sc.setStartRow(ROW);
706
707    // The scanner takes the max retries from the connection configuration, not the table as
708    // the put.
709    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
710
711    try {
712      ResultScanner rs = table.getScanner(sc);
713      while (rs.next() != null) {
714      }
715      Assert.fail("Unreachable point");
716    } catch (RetriesExhaustedException e) {
717      LOG.info("Scan done, expected exception caught: " + e.getClass());
718    }
719
720    // Cache is updated with the right value.
721    Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
722    Assert.assertEquals(
723      "Previous server was "+destServer.getServerName().getHostAndPort(),
724      curServer.getServerName().getPort(),
725      conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
726
727    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
728    table.close();
729    connection.close();
730  }
731
732  /**
733   * Test that Connection or Pool are not closed when managed externally
734   * @throws Exception
735   */
736  @Test
737  public void testConnectionManagement() throws Exception{
738    Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
739    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
740    Table table = conn.getTable(TABLE_NAME1);
741    table.close();
742    assertFalse(conn.isClosed());
743    if(table instanceof HTable) {
744      assertFalse(((HTable) table).getPool().isShutdown());
745    }
746    table = conn.getTable(TABLE_NAME1);
747    table.close();
748    if(table instanceof HTable) {
749      assertFalse(((HTable) table).getPool().isShutdown());
750    }
751    conn.close();
752    if(table instanceof HTable) {
753      assertTrue(((HTable) table).getPool().isShutdown());
754    }
755    table0.close();
756  }
757
758  /**
759   * Test that stale cache updates don't override newer cached values.
760   */
761  @Test
762  public void testCacheSeqNums() throws Exception{
763    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
764    Put put = new Put(ROW);
765    put.addColumn(FAM_NAM, ROW, ROW);
766    table.put(put);
767    ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
768
769    HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
770    assertNotNull(location);
771
772    ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
773
774    // Same server as already in cache reporting - overwrites any value despite seqNum.
775    int nextPort = location.getPort() + 1;
776    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
777        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
778    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
779    Assert.assertEquals(nextPort, location.getPort());
780
781    // No source specified - same.
782    nextPort = location.getPort() + 1;
783    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
784        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
785    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
786    Assert.assertEquals(nextPort, location.getPort());
787
788    // Higher seqNum - overwrites lower seqNum.
789    nextPort = location.getPort() + 1;
790    conn.updateCachedLocation(location.getRegionInfo(), anySource,
791        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
792    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
793    Assert.assertEquals(nextPort, location.getPort());
794
795    // Lower seqNum - does not overwrite higher seqNum.
796    nextPort = location.getPort() + 1;
797    conn.updateCachedLocation(location.getRegionInfo(), anySource,
798        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
799    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
800    Assert.assertEquals(nextPort - 1, location.getPort());
801    table.close();
802  }
803
804  @Test
805  public void testClosing() throws Exception {
806    Configuration configuration =
807      new Configuration(TEST_UTIL.getConfiguration());
808    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
809      String.valueOf(ThreadLocalRandom.current().nextInt()));
810
811    // as connection caching is going away, now we're just testing
812    // that closed connection does actually get closed.
813
814    Connection c1 = ConnectionFactory.createConnection(configuration);
815    Connection c2 = ConnectionFactory.createConnection(configuration);
816    // no caching, different connections
817    assertTrue(c1 != c2);
818
819    // closing independently
820    c1.close();
821    assertTrue(c1.isClosed());
822    assertFalse(c2.isClosed());
823
824    c2.close();
825    assertTrue(c2.isClosed());
826  }
827
828  /**
829   * Trivial test to verify that nobody messes with
830   * {@link ConnectionFactory#createConnection(Configuration)}
831   */
832  @Test
833  public void testCreateConnection() throws Exception {
834    Configuration configuration = TEST_UTIL.getConfiguration();
835    Connection c1 = ConnectionFactory.createConnection(configuration);
836    Connection c2 = ConnectionFactory.createConnection(configuration);
837    // created from the same configuration, yet they are different
838    assertTrue(c1 != c2);
839    assertTrue(c1.getConfiguration() == c2.getConfiguration());
840  }
841
842  /**
843   * This test checks that one can connect to the cluster with only the
844   *  ZooKeeper quorum set. Other stuff like master address will be read
845   *  from ZK by the client.
846   */
847  @Test
848  public void testConnection() throws Exception{
849    // We create an empty config and add the ZK address.
850    Configuration c = new Configuration();
851    c.set(HConstants.ZOOKEEPER_QUORUM,
852      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
853    c.set(HConstants.ZOOKEEPER_CLIENT_PORT,
854      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
855
856    // This should be enough to connect
857    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c);
858    assertTrue(conn.isMasterRunning());
859    conn.close();
860  }
861
862  private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception {
863    Field numTries = hci.getClass().getDeclaredField("numTries");
864    numTries.setAccessible(true);
865    Field modifiersField = Field.class.getDeclaredField("modifiers");
866    modifiersField.setAccessible(true);
867    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
868    final int prevNumRetriesVal = (Integer)numTries.get(hci);
869    numTries.set(hci, newVal);
870
871    return prevNumRetriesVal;
872  }
873
874  @Test
875  public void testMulti() throws Exception {
876    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
877    try {
878      ConnectionImplementation conn = (ConnectionImplementation)TEST_UTIL.getConnection();
879
880      // We're now going to move the region and check that it works for the client
881      // First a new put to add the location in the cache
882      conn.clearRegionCache(TABLE_NAME3);
883      Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
884
885      TEST_UTIL.getAdmin().setBalancerRunning(false, false);
886      HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
887
888      // We can wait for all regions to be online, that makes log reading easier when debugging
889      TEST_UTIL.waitUntilNoRegionsInTransition();
890
891      Put put = new Put(ROW_X);
892      put.addColumn(FAM_NAM, ROW_X, ROW_X);
893      table.put(put);
894
895      // Now moving the region to the second server
896      HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
897      byte[] regionName = toMove.getRegionInfo().getRegionName();
898      byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
899
900      // Choose the other server.
901      int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
902      int destServerId = (curServerId == 0 ? 1 : 0);
903
904      HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
905      HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
906
907      ServerName destServerName = destServer.getServerName();
908      ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
909
910       //find another row in the cur server that is less than ROW_X
911      List<HRegion> regions = curServer.getRegions(TABLE_NAME3);
912      byte[] otherRow = null;
913       for (Region region : regions) {
914         if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
915             && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
916           otherRow = region.getRegionInfo().getStartKey();
917           break;
918         }
919       }
920      assertNotNull(otherRow);
921      // If empty row, set it to first row.-f
922      if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
923      Put put2 = new Put(otherRow);
924      put2.addColumn(FAM_NAM, otherRow, otherRow);
925      table.put(put2); //cache put2's location
926
927      // Check that we are in the expected state
928      Assert.assertTrue(curServer != destServer);
929      Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
930      Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
931      Assert.assertNotNull(curServer.getOnlineRegion(regionName));
932      Assert.assertNull(destServer.getOnlineRegion(regionName));
933      Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
934          getAssignmentManager().hasRegionsInTransition());
935
936       // Moving. It's possible that we don't have all the regions online at this point, so
937      //  the test depends only on the region we're looking at.
938      LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
939      TEST_UTIL.getAdmin().move(
940          toMove.getRegionInfo().getEncodedNameAsBytes(),
941          destServerName.getServerName().getBytes()
942      );
943
944      while (destServer.getOnlineRegion(regionName) == null ||
945          destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
946          curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
947          master.getAssignmentManager().hasRegionsInTransition()) {
948        // wait for the move to be finished
949        Thread.sleep(1);
950      }
951
952      LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
953
954      // Check our new state.
955      Assert.assertNull(curServer.getOnlineRegion(regionName));
956      Assert.assertNotNull(destServer.getOnlineRegion(regionName));
957      Assert.assertFalse(destServer.getRegionsInTransitionInRS()
958          .containsKey(encodedRegionNameBytes));
959      Assert.assertFalse(curServer.getRegionsInTransitionInRS()
960          .containsKey(encodedRegionNameBytes));
961
962
963       // Cache was NOT updated and points to the wrong server
964      Assert.assertFalse(
965          conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
966              .getPort() == destServerName.getPort());
967
968      // Hijack the number of retry to fail after 2 tries
969      final int prevNumRetriesVal = setNumTries(conn, 2);
970
971      Put put3 = new Put(ROW_X);
972      put3.addColumn(FAM_NAM, ROW_X, ROW_X);
973      Put put4 = new Put(otherRow);
974      put4.addColumn(FAM_NAM, otherRow, otherRow);
975
976      // do multi
977      ArrayList<Put> actions = Lists.newArrayList(put4, put3);
978      table.batch(actions, null); // first should be a valid row,
979      // second we get RegionMovedException.
980
981      setNumTries(conn, prevNumRetriesVal);
982    } finally {
983      table.close();
984    }
985  }
986
987  @Test
988  public void testErrorBackoffTimeCalculation() throws Exception {
989    // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
990    final long ANY_PAUSE = 100;
991    ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
992    ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
993
994    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
995    EnvironmentEdgeManager.injectEdge(timeMachine);
996    try {
997      long largeAmountOfTime = ANY_PAUSE * 1000;
998      ConnectionImplementation.ServerErrorTracker tracker =
999          new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);
1000
1001      // The default backoff is 0.
1002      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
1003
1004      // Check some backoff values from HConstants sequence.
1005      tracker.reportServerError(location);
1006      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1007        tracker.calculateBackoffTime(location, ANY_PAUSE));
1008      tracker.reportServerError(location);
1009      tracker.reportServerError(location);
1010      tracker.reportServerError(location);
1011      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
1012        tracker.calculateBackoffTime(location, ANY_PAUSE));
1013
1014      // All of this shouldn't affect backoff for different location.
1015      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1016      tracker.reportServerError(diffLocation);
1017      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1018        tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1019
1020      // Check with different base.
1021      assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
1022          tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1023    } finally {
1024      EnvironmentEdgeManager.reset();
1025    }
1026  }
1027
1028  private static void assertEqualsWithJitter(long expected, long actual) {
1029    assertEqualsWithJitter(expected, actual, expected);
1030  }
1031
1032  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1033    assertTrue("Value not within jitter: " + expected + " vs " + actual,
1034      Math.abs(actual - expected) <= (0.01f * jitterBase));
1035  }
1036
1037  @Test
1038  public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1039    Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1040
1041    final TableName tableName = TableName.valueOf(name.getMethodName());
1042    TEST_UTIL.createTable(tableName, new byte[][] {FAM_NAM}).close();
1043
1044    Connection connection = ConnectionFactory.createConnection(config);
1045    Table table = connection.getTable(tableName);
1046
1047    // this will cache the meta location and table's region location
1048    table.get(new Get(Bytes.toBytes("foo")));
1049
1050    // restart HBase
1051    TEST_UTIL.shutdownMiniHBaseCluster();
1052    TEST_UTIL.restartHBaseCluster(2);
1053    // this should be able to discover new locations for meta and table's region
1054    table.get(new Get(Bytes.toBytes("foo")));
1055    TEST_UTIL.deleteTable(tableName);
1056    table.close();
1057    connection.close();
1058  }
1059
1060  @Test
1061  public void testLocateRegionsWithRegionReplicas() throws IOException {
1062    int regionReplication = 3;
1063    byte[] family = Bytes.toBytes("cf");
1064    TableName tableName = TableName.valueOf(name.getMethodName());
1065
1066    // Create a table with region replicas
1067    TableDescriptorBuilder builder = TableDescriptorBuilder
1068      .newBuilder(tableName)
1069      .setRegionReplication(regionReplication)
1070      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
1071    TEST_UTIL.getAdmin().createTable(builder.build());
1072
1073    try (ConnectionImplementation con = (ConnectionImplementation) ConnectionFactory.
1074      createConnection(TEST_UTIL.getConfiguration())) {
1075      // Get locations of the regions of the table
1076      List<HRegionLocation> locations = con.locateRegions(tableName, false, false);
1077
1078      // The size of the returned locations should be 3
1079      assertEquals(regionReplication, locations.size());
1080
1081      // The replicaIds of the returned locations should be 0, 1 and 2
1082      Set<Integer> expectedReplicaIds = IntStream.range(0, regionReplication).
1083        boxed().collect(Collectors.toSet());
1084      for (HRegionLocation location : locations) {
1085        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
1086      }
1087    } finally {
1088      TEST_UTIL.deleteTable(tableName);
1089    }
1090  }
1091
1092  @Test
1093  public void testMetaLookupThreadPoolCreated() throws Exception {
1094    final TableName tableName = TableName.valueOf(name.getMethodName());
1095    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
1096    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
1097      TEST_UTIL.getAdmin().disableTable(tableName);
1098      TEST_UTIL.getAdmin().deleteTable(tableName);
1099    }
1100    try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
1101      byte[] row = Bytes.toBytes("test");
1102      ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
1103      // check that metalookup pool would get created
1104      c.relocateRegion(tableName, row);
1105      ExecutorService ex = c.getCurrentMetaLookupPool();
1106      assertNotNull(ex);
1107    }
1108  }
1109
1110  // There is no assertion, but you need to confirm that there is no resource leak output from netty
1111  @Test
1112  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
1113    TableName tableName = TableName.valueOf(name.getMethodName());
1114    TEST_UTIL.createTable(tableName, FAM_NAM).close();
1115    TEST_UTIL.getAdmin().balancerSwitch(false, true);
1116    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
1117      Table table = connection.getTable(tableName)) {
1118      table.get(new Get(Bytes.toBytes("1")));
1119      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
1120      RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient();
1121      rpcClient.cancelConnections(sn);
1122      Thread.sleep(1000);
1123      System.gc();
1124      Thread.sleep(1000);
1125    }
1126  }
1127}