001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.lang.reflect.Modifier;
029import java.net.SocketTimeoutException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Set;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.SynchronousQueue;
035import java.util.concurrent.ThreadLocalRandom;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.stream.Collectors;
042import java.util.stream.IntStream;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.RegionLocations;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.Waiter;
053import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
054import org.apache.hadoop.hbase.exceptions.DeserializationException;
055import org.apache.hadoop.hbase.exceptions.RegionMovedException;
056import org.apache.hadoop.hbase.filter.Filter;
057import org.apache.hadoop.hbase.filter.FilterBase;
058import org.apache.hadoop.hbase.ipc.RpcClient;
059import org.apache.hadoop.hbase.master.HMaster;
060import org.apache.hadoop.hbase.regionserver.HRegion;
061import org.apache.hadoop.hbase.regionserver.HRegionServer;
062import org.apache.hadoop.hbase.regionserver.Region;
063import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
064import org.apache.hadoop.hbase.testclassification.LargeTests;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.util.JVMClusterUtil;
068import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
069import org.apache.hadoop.hbase.util.Threads;
070import org.junit.AfterClass;
071import org.junit.Assert;
072import org.junit.BeforeClass;
073import org.junit.ClassRule;
074import org.junit.Ignore;
075import org.junit.Rule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.junit.rules.TestName;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
083
084/**
085 * This class is for testing HBaseConnectionManager features
086 */
087@Category({LargeTests.class})
088public class TestConnectionImplementation {
089
090  @ClassRule
091  public static final HBaseClassTestRule CLASS_RULE =
092      HBaseClassTestRule.forClass(TestConnectionImplementation.class);
093
094  private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class);
095  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
096  private static final TableName TABLE_NAME =
097      TableName.valueOf("test");
098  private static final TableName TABLE_NAME1 =
099      TableName.valueOf("test1");
100  private static final TableName TABLE_NAME2 =
101      TableName.valueOf("test2");
102  private static final TableName TABLE_NAME3 =
103      TableName.valueOf("test3");
104  private static final byte[] FAM_NAM = Bytes.toBytes("f");
105  private static final byte[] ROW = Bytes.toBytes("bbb");
106  private static final byte[] ROW_X = Bytes.toBytes("xxx");
107  private static final int RPC_RETRY = 5;
108
109  @Rule
110  public TestName name = new TestName();
111
112  @BeforeClass
113  public static void setUpBeforeClass() throws Exception {
114    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
115    // Up the handlers; this test needs more than usual.
116    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
117    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
118    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
119    TEST_UTIL.startMiniCluster(2);
120
121  }
122
123  @AfterClass
124  public static void tearDownAfterClass() throws Exception {
125    TEST_UTIL.shutdownMiniCluster();
126  }
127
128  @Test
129  public void testClusterConnection() throws IOException {
130    ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
131        5, TimeUnit.SECONDS,
132        new SynchronousQueue<>(),
133        Threads.newDaemonThreadFactory("test-hcm"));
134
135    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
136    Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool);
137    // make sure the internally created ExecutorService is the one passed
138    assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool());
139
140    final TableName tableName = TableName.valueOf(name.getMethodName());
141    TEST_UTIL.createTable(tableName, FAM_NAM).close();
142    Table table = con1.getTable(tableName, otherPool);
143
144    ExecutorService pool = null;
145
146    if(table instanceof HTable) {
147      HTable t = (HTable) table;
148      // make sure passing a pool to the getTable does not trigger creation of an internal pool
149      assertNull("Internal Thread pool should be null",
150        ((ConnectionImplementation) con1).getCurrentBatchPool());
151      // table should use the pool passed
152      assertTrue(otherPool == t.getPool());
153      t.close();
154
155      t = (HTable) con2.getTable(tableName);
156      // table should use the connectin's internal pool
157      assertTrue(otherPool == t.getPool());
158      t.close();
159
160      t = (HTable) con2.getTable(tableName);
161      // try other API too
162      assertTrue(otherPool == t.getPool());
163      t.close();
164
165      t = (HTable) con2.getTable(tableName);
166      // try other API too
167      assertTrue(otherPool == t.getPool());
168      t.close();
169
170      t = (HTable) con1.getTable(tableName);
171      pool = ((ConnectionImplementation) con1).getCurrentBatchPool();
172      // make sure an internal pool was created
173      assertNotNull("An internal Thread pool should have been created", pool);
174      // and that the table is using it
175      assertTrue(t.getPool() == pool);
176      t.close();
177
178      t = (HTable) con1.getTable(tableName);
179      // still using the *same* internal pool
180      assertTrue(t.getPool() == pool);
181      t.close();
182    } else {
183      table.close();
184    }
185
186    con1.close();
187
188    // if the pool was created on demand it should be closed upon connection close
189    if(pool != null) {
190      assertTrue(pool.isShutdown());
191    }
192
193    con2.close();
194    // if the pool is passed, it is not closed
195    assertFalse(otherPool.isShutdown());
196    otherPool.shutdownNow();
197  }
198
199  /**
200   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
201   * @throws IOException Unable to construct admin
202   */
203  @Test
204  public void testAdminFactory() throws IOException {
205    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
206    Admin admin = con1.getAdmin();
207    assertTrue(admin.getConnection() == con1);
208    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
209    con1.close();
210  }
211
212  // Fails too often!  Needs work.  HBASE-12558
213  // May only fail on non-linux machines? E.g. macosx.
214  @Ignore @Test (expected = RegionServerStoppedException.class)
215  // Depends on mulitcast messaging facility that seems broken in hbase2
216  // See  HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of
217  // dead servers is broke"
218  public void testClusterStatus() throws Exception {
219    final TableName tableName = TableName.valueOf(name.getMethodName());
220    byte[] cf = Bytes.toBytes("cf");
221    byte[] rk = Bytes.toBytes("rk1");
222
223    JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
224    rs.waitForServerOnline();
225    final ServerName sn = rs.getRegionServer().getServerName();
226
227    Table t = TEST_UTIL.createTable(tableName, cf);
228    TEST_UTIL.waitTableAvailable(tableName);
229    TEST_UTIL.waitUntilNoRegionsInTransition();
230
231    final ConnectionImplementation hci =  (ConnectionImplementation)TEST_UTIL.getConnection();
232    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
233      while (l.getRegionLocation(rk).getPort() != sn.getPort()) {
234        TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().getEncodedNameAsBytes(),
235          sn);
236        TEST_UTIL.waitUntilNoRegionsInTransition();
237        hci.clearRegionCache(tableName);
238      }
239      Assert.assertNotNull(hci.clusterStatusListener);
240      TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000);
241    }
242
243    Put p1 = new Put(rk);
244    p1.addColumn(cf, Bytes.toBytes("qual"), Bytes.toBytes("val"));
245    t.put(p1);
246
247    rs.getRegionServer().abort("I'm dead");
248
249    // We want the status to be updated. That's a least 10 second
250    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
251      @Override
252      public boolean evaluate() throws Exception {
253        return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
254            getDeadServers().isDeadServer(sn);
255      }
256    });
257
258    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
259      @Override
260      public boolean evaluate() throws Exception {
261        return hci.clusterStatusListener.isDeadServer(sn);
262      }
263    });
264
265    t.close();
266    hci.getClient(sn);  // will throw an exception: RegionServerStoppedException
267  }
268
269  /**
270   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
271   */
272  @Test
273  public void testConnectionCloseAllowsInterrupt() throws Exception {
274    testConnectionClose(true);
275  }
276
277  @Test
278  public void testConnectionNotAllowsInterrupt() throws Exception {
279    testConnectionClose(false);
280  }
281
282  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
283    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
284    TEST_UTIL.createTable(tableName, FAM_NAM).close();
285
286    boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);
287
288    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
289    // We want to work on a separate connection.
290    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
291    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
292    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
293    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
294    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
295    // to avoid the client to be stuck when do the Get
296    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
297    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
298    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
299
300    Connection connection = ConnectionFactory.createConnection(c2);
301    final Table table = connection.getTable(tableName);
302
303    Put put = new Put(ROW);
304    put.addColumn(FAM_NAM, ROW, ROW);
305    table.put(put);
306
307    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
308    final AtomicInteger step = new AtomicInteger(0);
309
310    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
311    Thread t = new Thread("testConnectionCloseThread") {
312      @Override
313      public void run() {
314        int done = 0;
315        try {
316          step.set(1);
317          while (step.get() == 1) {
318            Get get = new Get(ROW);
319            table.get(get);
320            done++;
321            if (done % 100 == 0)
322              LOG.info("done=" + done);
323            // without the sleep, will cause the exception for too many files in
324            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
325            Thread.sleep(100);
326          }
327        } catch (Throwable t) {
328          failed.set(t);
329          LOG.error(t.toString(), t);
330        }
331        step.set(3);
332      }
333    };
334    t.start();
335    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
336      @Override
337      public boolean evaluate() throws Exception {
338        return step.get() == 1;
339      }
340    });
341
342    ServerName sn;
343    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
344      sn = rl.getRegionLocation(ROW).getServerName();
345    }
346    ConnectionImplementation conn =
347        (ConnectionImplementation) connection;
348    RpcClient rpcClient = conn.getRpcClient();
349
350    LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
351    for (int i = 0; i < 5000; i++) {
352      rpcClient.cancelConnections(sn);
353      Thread.sleep(5);
354    }
355
356    step.compareAndSet(1, 2);
357    // The test may fail here if the thread doing the gets is stuck. The way to find
358    //  out what's happening is to look for the thread named 'testConnectionCloseThread'
359    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
360      @Override
361      public boolean evaluate() throws Exception {
362        return step.get() == 3;
363      }
364    });
365    table.close();
366    connection.close();
367    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
368    TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
369  }
370
371  /**
372   * Test that connection can become idle without breaking everything.
373   */
374  @Test
375  public void testConnectionIdle() throws Exception {
376    final TableName tableName = TableName.valueOf(name.getMethodName());
377    TEST_UTIL.createTable(tableName, FAM_NAM).close();
378    int idleTime =  20000;
379    boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);
380
381    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
382    // We want to work on a separate connection.
383    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
384    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
385    c2.setInt(RpcClient.IDLE_TIME, idleTime);
386
387    Connection connection = ConnectionFactory.createConnection(c2);
388    final Table table = connection.getTable(tableName);
389
390    Put put = new Put(ROW);
391    put.addColumn(FAM_NAM, ROW, ROW);
392    table.put(put);
393
394    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
395    mee.setValue(System.currentTimeMillis());
396    EnvironmentEdgeManager.injectEdge(mee);
397    LOG.info("first get");
398    table.get(new Get(ROW));
399
400    LOG.info("first get - changing the time & sleeping");
401    mee.incValue(idleTime + 1000);
402    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
403                        // 1500 = sleep time in RpcClient#waitForWork + a margin
404
405    LOG.info("second get - connection has been marked idle in the middle");
406    // To check that the connection actually became idle would need to read some private
407    //  fields of RpcClient.
408    table.get(new Get(ROW));
409    mee.incValue(idleTime + 1000);
410
411    LOG.info("third get - connection is idle, but the reader doesn't know yet");
412    // We're testing here a special case:
413    //  time limit reached BUT connection not yet reclaimed AND a new call.
414    //  in this situation, we don't close the connection, instead we use it immediately.
415    // If we're very unlucky we can have a race condition in the test: the connection is already
416    //  under closing when we do the get, so we have an exception, and we don't retry as the
417    //  retry number is 1. The probability is very very low, and seems acceptable for now. It's
418    //  a test issue only.
419    table.get(new Get(ROW));
420
421    LOG.info("we're done - time will change back");
422
423    table.close();
424
425    connection.close();
426    EnvironmentEdgeManager.reset();
427    TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
428  }
429
430    /**
431     * Test that the connection to the dead server is cut immediately when we receive the
432     *  notification.
433     * @throws Exception
434     */
435  @Test
436  public void testConnectionCut() throws Exception {
437    final TableName tableName = TableName.valueOf(name.getMethodName());
438
439    TEST_UTIL.createTable(tableName, FAM_NAM).close();
440    boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);
441
442    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
443    // We want to work on a separate connection.
444    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
445    // try only once w/o any retry
446    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
447    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
448
449    final Connection connection = ConnectionFactory.createConnection(c2);
450    final Table table = connection.getTable(tableName);
451
452    Put p = new Put(FAM_NAM);
453    p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
454    table.put(p);
455
456    final ConnectionImplementation hci =  (ConnectionImplementation) connection;
457
458    final HRegionLocation loc;
459    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
460      loc = rl.getRegionLocation(FAM_NAM);
461    }
462
463    Get get = new Get(FAM_NAM);
464    Assert.assertNotNull(table.get(get));
465
466    get = new Get(FAM_NAM);
467    get.setFilter(new BlockingFilter());
468
469    // This thread will mark the server as dead while we're waiting during a get.
470    Thread t = new Thread() {
471      @Override
472      public void run() {
473        synchronized (syncBlockingFilter) {
474          try {
475            syncBlockingFilter.wait();
476          } catch (InterruptedException e) {
477            throw new RuntimeException(e);
478          }
479        }
480        hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
481      }
482    };
483
484    t.start();
485    try {
486      table.get(get);
487      Assert.fail();
488    } catch (IOException expected) {
489      LOG.debug("Received: " + expected);
490      Assert.assertFalse(expected instanceof SocketTimeoutException);
491      Assert.assertFalse(syncBlockingFilter.get());
492    } finally {
493      syncBlockingFilter.set(true);
494      t.join();
495      TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
496    }
497
498    table.close();
499    connection.close();
500  }
501
502  protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
503
504  public static class BlockingFilter extends FilterBase {
505    @Override
506    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
507      int i = 0;
508      while (i++ < 1000 && !syncBlockingFilter.get()) {
509        synchronized (syncBlockingFilter) {
510          syncBlockingFilter.notifyAll();
511        }
512        Threads.sleep(100);
513      }
514      syncBlockingFilter.set(true);
515      return false;
516    }
517    @Override
518    public ReturnCode filterCell(final Cell ignored) throws IOException {
519      return ReturnCode.INCLUDE;
520    }
521
522    public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
523      return new BlockingFilter();
524    }
525  }
526
527  /**
528   * Test that when we delete a location using the first row of a region
529   * that we really delete it.
530   * @throws Exception
531   */
532  @Test
533  public void testRegionCaching() throws Exception {
534    TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
535    Configuration conf =  new Configuration(TEST_UTIL.getConfiguration());
536    // test with no retry, or client cache will get updated after the first failure
537    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
538    Connection connection = ConnectionFactory.createConnection(conf);
539    final Table table = connection.getTable(TABLE_NAME);
540
541    TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
542    Put put = new Put(ROW);
543    put.addColumn(FAM_NAM, ROW, ROW);
544    table.put(put);
545
546    ConnectionImplementation conn = (ConnectionImplementation) connection;
547
548    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
549
550    // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at
551    // a location where the port is current port number +1 -- i.e. a non-existent location.
552    HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
553    final int nextPort = loc.getPort() + 1;
554    conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
555        ServerName.valueOf("127.0.0.1", nextPort,
556        HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
557    Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW)
558      .getRegionLocation().getPort(), nextPort);
559
560    conn.clearRegionCache(TABLE_NAME, ROW.clone());
561    RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
562    assertNull("What is this location?? " + rl, rl);
563
564    // We're now going to move the region and check that it works for the client
565    // First a new put to add the location in the cache
566    conn.clearRegionCache(TABLE_NAME);
567    Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
568    Put put2 = new Put(ROW);
569    put2.addColumn(FAM_NAM, ROW, ROW);
570    table.put(put2);
571    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
572    assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
573
574    TEST_UTIL.getAdmin().balancerSwitch(false, false);
575    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
576
577    // We can wait for all regions to be online, that makes log reading easier when debugging
578    TEST_UTIL.waitUntilNoRegionsInTransition();
579
580    // Now moving the region to the second server
581    HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
582    byte[] regionName = toMove.getRegionInfo().getRegionName();
583    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
584
585    // Choose the other server.
586    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
587    int destServerId = curServerId == 0? 1: 0;
588
589    HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
590    HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
591
592    ServerName destServerName = destServer.getServerName();
593
594    // Check that we are in the expected state
595    Assert.assertTrue(curServer != destServer);
596    Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
597    Assert.assertFalse( toMove.getPort() == destServerName.getPort());
598    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
599    Assert.assertNull(destServer.getOnlineRegion(regionName));
600    Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
601        getAssignmentManager().hasRegionsInTransition());
602
603    // Moving. It's possible that we don't have all the regions online at this point, so
604    //  the test must depend only on the region we're looking at.
605    LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
606    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
607
608    while (destServer.getOnlineRegion(regionName) == null ||
609        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
610        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
611        master.getAssignmentManager().hasRegionsInTransition()) {
612      // wait for the move to be finished
613      Thread.sleep(1);
614    }
615
616    LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
617
618    // Check our new state.
619    Assert.assertNull(curServer.getOnlineRegion(regionName));
620    Assert.assertNotNull(destServer.getOnlineRegion(regionName));
621    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
622    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
623
624
625    // Cache was NOT updated and points to the wrong server
626    Assert.assertFalse(
627        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation()
628          .getPort() == destServerName.getPort());
629
630    // This part relies on a number of tries equals to 1.
631    // We do a put and expect the cache to be updated, even if we don't retry
632    LOG.info("Put starting");
633    Put put3 = new Put(ROW);
634    put3.addColumn(FAM_NAM, ROW, ROW);
635    try {
636      table.put(put3);
637      Assert.fail("Unreachable point");
638    } catch (RetriesExhaustedWithDetailsException e) {
639      LOG.info("Put done, exception caught: " + e.getClass());
640      Assert.assertEquals(1, e.getNumExceptions());
641      Assert.assertEquals(1, e.getCauses().size());
642      Assert.assertArrayEquals(ROW, e.getRow(0).getRow());
643
644      // Check that we unserialized the exception as expected
645      Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
646      Assert.assertNotNull(cause);
647      Assert.assertTrue(cause instanceof RegionMovedException);
648    } catch (RetriesExhaustedException ree) {
649      // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException
650      // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue.
651      LOG.info("Put done, exception caught: " + ree.getClass());
652      Throwable cause = ClientExceptionsUtil.findException(ree.getCause());
653      Assert.assertNotNull(cause);
654      Assert.assertTrue(cause instanceof RegionMovedException);
655    }
656    Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
657    Assert.assertEquals(
658        "Previous server was " + curServer.getServerName().getHostAndPort(),
659        destServerName.getPort(),
660        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
661
662    Assert.assertFalse(destServer.getRegionsInTransitionInRS()
663      .containsKey(encodedRegionNameBytes));
664    Assert.assertFalse(curServer.getRegionsInTransitionInRS()
665      .containsKey(encodedRegionNameBytes));
666
667    // We move it back to do another test with a scan
668    LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
669    TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(),
670      curServer.getServerName());
671
672    while (curServer.getOnlineRegion(regionName) == null ||
673        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
674        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
675        master.getAssignmentManager().hasRegionsInTransition()) {
676      // wait for the move to be finished
677      Thread.sleep(1);
678    }
679
680    // Check our new state.
681    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
682    Assert.assertNull(destServer.getOnlineRegion(regionName));
683    LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
684
685    // Cache was NOT updated and points to the wrong server
686    Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() ==
687      curServer.getServerName().getPort());
688
689    Scan sc = new Scan();
690    sc.setStopRow(ROW);
691    sc.setStartRow(ROW);
692
693    // The scanner takes the max retries from the connection configuration, not the table as
694    // the put.
695    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
696
697    try {
698      ResultScanner rs = table.getScanner(sc);
699      while (rs.next() != null) {
700      }
701      Assert.fail("Unreachable point");
702    } catch (RetriesExhaustedException e) {
703      LOG.info("Scan done, expected exception caught: " + e.getClass());
704    }
705
706    // Cache is updated with the right value.
707    Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
708    Assert.assertEquals(
709      "Previous server was "+destServer.getServerName().getHostAndPort(),
710      curServer.getServerName().getPort(),
711      conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
712
713    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
714    table.close();
715    connection.close();
716  }
717
718  /**
719   * Test that Connection or Pool are not closed when managed externally
720   * @throws Exception
721   */
722  @Test
723  public void testConnectionManagement() throws Exception{
724    Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
725    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
726    Table table = conn.getTable(TABLE_NAME1);
727    table.close();
728    assertFalse(conn.isClosed());
729    if(table instanceof HTable) {
730      assertFalse(((HTable) table).getPool().isShutdown());
731    }
732    table = conn.getTable(TABLE_NAME1);
733    table.close();
734    if(table instanceof HTable) {
735      assertFalse(((HTable) table).getPool().isShutdown());
736    }
737    conn.close();
738    if(table instanceof HTable) {
739      assertTrue(((HTable) table).getPool().isShutdown());
740    }
741    table0.close();
742  }
743
744  /**
745   * Test that stale cache updates don't override newer cached values.
746   */
747  @Test
748  public void testCacheSeqNums() throws Exception{
749    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
750    Put put = new Put(ROW);
751    put.addColumn(FAM_NAM, ROW, ROW);
752    table.put(put);
753    ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
754
755    HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
756    assertNotNull(location);
757
758    ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
759
760    // Same server as already in cache reporting - overwrites any value despite seqNum.
761    int nextPort = location.getPort() + 1;
762    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
763        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
764    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
765    Assert.assertEquals(nextPort, location.getPort());
766
767    // No source specified - same.
768    nextPort = location.getPort() + 1;
769    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
770        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
771    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
772    Assert.assertEquals(nextPort, location.getPort());
773
774    // Higher seqNum - overwrites lower seqNum.
775    nextPort = location.getPort() + 1;
776    conn.updateCachedLocation(location.getRegionInfo(), anySource,
777        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
778    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
779    Assert.assertEquals(nextPort, location.getPort());
780
781    // Lower seqNum - does not overwrite higher seqNum.
782    nextPort = location.getPort() + 1;
783    conn.updateCachedLocation(location.getRegionInfo(), anySource,
784        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
785    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
786    Assert.assertEquals(nextPort - 1, location.getPort());
787    table.close();
788  }
789
790  @Test
791  public void testClosing() throws Exception {
792    Configuration configuration =
793      new Configuration(TEST_UTIL.getConfiguration());
794    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
795      String.valueOf(ThreadLocalRandom.current().nextInt()));
796
797    // as connection caching is going away, now we're just testing
798    // that closed connection does actually get closed.
799
800    Connection c1 = ConnectionFactory.createConnection(configuration);
801    Connection c2 = ConnectionFactory.createConnection(configuration);
802    // no caching, different connections
803    assertTrue(c1 != c2);
804
805    // closing independently
806    c1.close();
807    assertTrue(c1.isClosed());
808    assertFalse(c2.isClosed());
809
810    c2.close();
811    assertTrue(c2.isClosed());
812  }
813
814  /**
815   * Trivial test to verify that nobody messes with
816   * {@link ConnectionFactory#createConnection(Configuration)}
817   */
818  @Test
819  public void testCreateConnection() throws Exception {
820    Configuration configuration = TEST_UTIL.getConfiguration();
821    Connection c1 = ConnectionFactory.createConnection(configuration);
822    Connection c2 = ConnectionFactory.createConnection(configuration);
823    // created from the same configuration, yet they are different
824    assertTrue(c1 != c2);
825    assertTrue(c1.getConfiguration() == c2.getConfiguration());
826  }
827
828  /**
829   * This test checks that one can connect to the cluster with only the
830   *  ZooKeeper quorum set. Other stuff like master address will be read
831   *  from ZK by the client.
832   */
833  @Test
834  public void testConnection() throws Exception{
835    // We create an empty config and add the ZK address.
836    Configuration c = new Configuration();
837    c.set(HConstants.ZOOKEEPER_QUORUM,
838      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
839    c.set(HConstants.ZOOKEEPER_CLIENT_PORT,
840      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
841
842    // This should be enough to connect
843    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c);
844    assertTrue(conn.isMasterRunning());
845    conn.close();
846  }
847
848  private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception {
849    Field numTries = hci.getClass().getDeclaredField("numTries");
850    numTries.setAccessible(true);
851    Field modifiersField = Field.class.getDeclaredField("modifiers");
852    modifiersField.setAccessible(true);
853    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
854    final int prevNumRetriesVal = (Integer)numTries.get(hci);
855    numTries.set(hci, newVal);
856
857    return prevNumRetriesVal;
858  }
859
860  @Test
861  public void testMulti() throws Exception {
862    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
863    try {
864      ConnectionImplementation conn = (ConnectionImplementation)TEST_UTIL.getConnection();
865
866      // We're now going to move the region and check that it works for the client
867      // First a new put to add the location in the cache
868      conn.clearRegionCache(TABLE_NAME3);
869      Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
870
871      TEST_UTIL.getAdmin().balancerSwitch(false, false);
872      HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
873
874      // We can wait for all regions to be online, that makes log reading easier when debugging
875      TEST_UTIL.waitUntilNoRegionsInTransition();
876
877      Put put = new Put(ROW_X);
878      put.addColumn(FAM_NAM, ROW_X, ROW_X);
879      table.put(put);
880
881      // Now moving the region to the second server
882      HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
883      byte[] regionName = toMove.getRegionInfo().getRegionName();
884      byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
885
886      // Choose the other server.
887      int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
888      int destServerId = (curServerId == 0 ? 1 : 0);
889
890      HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
891      HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
892
893      ServerName destServerName = destServer.getServerName();
894      ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
895
896       //find another row in the cur server that is less than ROW_X
897      List<HRegion> regions = curServer.getRegions(TABLE_NAME3);
898      byte[] otherRow = null;
899       for (Region region : regions) {
900         if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
901             && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
902           otherRow = region.getRegionInfo().getStartKey();
903           break;
904         }
905       }
906      assertNotNull(otherRow);
907      // If empty row, set it to first row.-f
908      if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
909      Put put2 = new Put(otherRow);
910      put2.addColumn(FAM_NAM, otherRow, otherRow);
911      table.put(put2); //cache put2's location
912
913      // Check that we are in the expected state
914      Assert.assertTrue(curServer != destServer);
915      Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
916      Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
917      Assert.assertNotNull(curServer.getOnlineRegion(regionName));
918      Assert.assertNull(destServer.getOnlineRegion(regionName));
919      Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
920          getAssignmentManager().hasRegionsInTransition());
921
922       // Moving. It's possible that we don't have all the regions online at this point, so
923      //  the test depends only on the region we're looking at.
924      LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
925      TEST_UTIL.getAdmin().move(toMove.getRegionInfo().getEncodedNameAsBytes(), destServerName);
926
927      while (destServer.getOnlineRegion(regionName) == null ||
928          destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
929          curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
930          master.getAssignmentManager().hasRegionsInTransition()) {
931        // wait for the move to be finished
932        Thread.sleep(1);
933      }
934
935      LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
936
937      // Check our new state.
938      Assert.assertNull(curServer.getOnlineRegion(regionName));
939      Assert.assertNotNull(destServer.getOnlineRegion(regionName));
940      Assert.assertFalse(destServer.getRegionsInTransitionInRS()
941          .containsKey(encodedRegionNameBytes));
942      Assert.assertFalse(curServer.getRegionsInTransitionInRS()
943          .containsKey(encodedRegionNameBytes));
944
945
946       // Cache was NOT updated and points to the wrong server
947      Assert.assertFalse(
948          conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
949              .getPort() == destServerName.getPort());
950
951      // Hijack the number of retry to fail after 2 tries
952      final int prevNumRetriesVal = setNumTries(conn, 2);
953
954      Put put3 = new Put(ROW_X);
955      put3.addColumn(FAM_NAM, ROW_X, ROW_X);
956      Put put4 = new Put(otherRow);
957      put4.addColumn(FAM_NAM, otherRow, otherRow);
958
959      // do multi
960      ArrayList<Put> actions = Lists.newArrayList(put4, put3);
961      table.batch(actions, null); // first should be a valid row,
962      // second we get RegionMovedException.
963
964      setNumTries(conn, prevNumRetriesVal);
965    } finally {
966      table.close();
967    }
968  }
969
970  @Test
971  public void testErrorBackoffTimeCalculation() throws Exception {
972    // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
973    final long ANY_PAUSE = 100;
974    ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
975    ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
976
977    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
978    EnvironmentEdgeManager.injectEdge(timeMachine);
979    try {
980      long largeAmountOfTime = ANY_PAUSE * 1000;
981      ConnectionImplementation.ServerErrorTracker tracker =
982          new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);
983
984      // The default backoff is 0.
985      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
986
987      // Check some backoff values from HConstants sequence.
988      tracker.reportServerError(location);
989      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
990        tracker.calculateBackoffTime(location, ANY_PAUSE));
991      tracker.reportServerError(location);
992      tracker.reportServerError(location);
993      tracker.reportServerError(location);
994      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
995        tracker.calculateBackoffTime(location, ANY_PAUSE));
996
997      // All of this shouldn't affect backoff for different location.
998      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
999      tracker.reportServerError(diffLocation);
1000      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1001        tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1002
1003      // Check with different base.
1004      assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
1005          tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1006    } finally {
1007      EnvironmentEdgeManager.reset();
1008    }
1009  }
1010
1011  private static void assertEqualsWithJitter(long expected, long actual) {
1012    assertEqualsWithJitter(expected, actual, expected);
1013  }
1014
1015  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1016    assertTrue("Value not within jitter: " + expected + " vs " + actual,
1017      Math.abs(actual - expected) <= (0.01f * jitterBase));
1018  }
1019
1020  @Test
1021  public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1022    Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1023
1024    final TableName tableName = TableName.valueOf(name.getMethodName());
1025    TEST_UTIL.createTable(tableName, new byte[][] {FAM_NAM}).close();
1026
1027    Connection connection = ConnectionFactory.createConnection(config);
1028    Table table = connection.getTable(tableName);
1029
1030    // this will cache the meta location and table's region location
1031    table.get(new Get(Bytes.toBytes("foo")));
1032
1033    // restart HBase
1034    TEST_UTIL.shutdownMiniHBaseCluster();
1035    TEST_UTIL.restartHBaseCluster(2);
1036    // this should be able to discover new locations for meta and table's region
1037    table.get(new Get(Bytes.toBytes("foo")));
1038    TEST_UTIL.deleteTable(tableName);
1039    table.close();
1040    connection.close();
1041  }
1042
1043  @Test
1044  public void testLocateRegionsWithRegionReplicas() throws IOException {
1045    int regionReplication = 3;
1046    byte[] family = Bytes.toBytes("cf");
1047    TableName tableName = TableName.valueOf(name.getMethodName());
1048
1049    // Create a table with region replicas
1050    TableDescriptorBuilder builder = TableDescriptorBuilder
1051      .newBuilder(tableName)
1052      .setRegionReplication(regionReplication)
1053      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
1054    TEST_UTIL.getAdmin().createTable(builder.build());
1055
1056    try (ConnectionImplementation con = (ConnectionImplementation) ConnectionFactory.
1057      createConnection(TEST_UTIL.getConfiguration())) {
1058      // Get locations of the regions of the table
1059      List<HRegionLocation> locations = con.locateRegions(tableName, false, false);
1060
1061      // The size of the returned locations should be 3
1062      assertEquals(regionReplication, locations.size());
1063
1064      // The replicaIds of the returned locations should be 0, 1 and 2
1065      Set<Integer> expectedReplicaIds = IntStream.range(0, regionReplication).
1066        boxed().collect(Collectors.toSet());
1067      for (HRegionLocation location : locations) {
1068        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
1069      }
1070    } finally {
1071      TEST_UTIL.deleteTable(tableName);
1072    }
1073  }
1074
1075  @Test
1076  public void testMetaLookupThreadPoolCreated() throws Exception {
1077    final TableName tableName = TableName.valueOf(name.getMethodName());
1078    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
1079    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
1080      TEST_UTIL.getAdmin().disableTable(tableName);
1081      TEST_UTIL.getAdmin().deleteTable(tableName);
1082    }
1083    try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
1084      byte[] row = Bytes.toBytes("test");
1085      ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
1086      // check that metalookup pool would get created
1087      c.relocateRegion(tableName, row);
1088      ExecutorService ex = c.getCurrentMetaLookupPool();
1089      assertNotNull(ex);
1090    }
1091  }
1092}