001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.reflect.Field;
028import java.lang.reflect.Modifier;
029import java.net.SocketTimeoutException;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.Set;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.SynchronousQueue;
035import java.util.concurrent.ThreadLocalRandom;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.stream.Collectors;
042import java.util.stream.IntStream;
043
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.HRegionLocation;
050import org.apache.hadoop.hbase.RegionLocations;
051import org.apache.hadoop.hbase.ServerName;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.Waiter;
054import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
055import org.apache.hadoop.hbase.exceptions.DeserializationException;
056import org.apache.hadoop.hbase.exceptions.RegionMovedException;
057import org.apache.hadoop.hbase.filter.Filter;
058import org.apache.hadoop.hbase.filter.FilterBase;
059import org.apache.hadoop.hbase.ipc.RpcClient;
060import org.apache.hadoop.hbase.master.HMaster;
061import org.apache.hadoop.hbase.regionserver.HRegion;
062import org.apache.hadoop.hbase.regionserver.HRegionServer;
063import org.apache.hadoop.hbase.regionserver.Region;
064import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
065import org.apache.hadoop.hbase.testclassification.LargeTests;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
068import org.apache.hadoop.hbase.util.JVMClusterUtil;
069import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
070import org.apache.hadoop.hbase.util.Threads;
071import org.junit.AfterClass;
072import org.junit.Assert;
073import org.junit.BeforeClass;
074import org.junit.ClassRule;
075import org.junit.Ignore;
076import org.junit.Rule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.junit.rules.TestName;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
084
085/**
086 * This class is for testing HBaseConnectionManager features
087 */
088@Category({LargeTests.class})
089public class TestConnectionImplementation {
090
091  @ClassRule
092  public static final HBaseClassTestRule CLASS_RULE =
093      HBaseClassTestRule.forClass(TestConnectionImplementation.class);
094
095  private static final Logger LOG = LoggerFactory.getLogger(TestConnectionImplementation.class);
096  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
097  private static final TableName TABLE_NAME =
098      TableName.valueOf("test");
099  private static final TableName TABLE_NAME1 =
100      TableName.valueOf("test1");
101  private static final TableName TABLE_NAME2 =
102      TableName.valueOf("test2");
103  private static final TableName TABLE_NAME3 =
104      TableName.valueOf("test3");
105  private static final byte[] FAM_NAM = Bytes.toBytes("f");
106  private static final byte[] ROW = Bytes.toBytes("bbb");
107  private static final byte[] ROW_X = Bytes.toBytes("xxx");
108  private static final int RPC_RETRY = 5;
109
110  @Rule
111  public TestName name = new TestName();
112
113  @BeforeClass
114  public static void setUpBeforeClass() throws Exception {
115    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
116    // Up the handlers; this test needs more than usual.
117    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
118    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
119    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
120    TEST_UTIL.startMiniCluster(2);
121
122  }
123
124  @AfterClass
125  public static void tearDownAfterClass() throws Exception {
126    TEST_UTIL.shutdownMiniCluster();
127  }
128
129  @Test
130  public void testClusterConnection() throws IOException {
131    ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
132        5, TimeUnit.SECONDS,
133        new SynchronousQueue<>(),
134        Threads.newDaemonThreadFactory("test-hcm"));
135
136    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
137    Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool);
138    // make sure the internally created ExecutorService is the one passed
139    assertTrue(otherPool == ((ConnectionImplementation) con2).getCurrentBatchPool());
140
141    final TableName tableName = TableName.valueOf(name.getMethodName());
142    TEST_UTIL.createTable(tableName, FAM_NAM).close();
143    Table table = con1.getTable(tableName, otherPool);
144
145    ExecutorService pool = null;
146
147    if(table instanceof HTable) {
148      HTable t = (HTable) table;
149      // make sure passing a pool to the getTable does not trigger creation of an internal pool
150      assertNull("Internal Thread pool should be null",
151        ((ConnectionImplementation) con1).getCurrentBatchPool());
152      // table should use the pool passed
153      assertTrue(otherPool == t.getPool());
154      t.close();
155
156      t = (HTable) con2.getTable(tableName);
157      // table should use the connectin's internal pool
158      assertTrue(otherPool == t.getPool());
159      t.close();
160
161      t = (HTable) con2.getTable(tableName);
162      // try other API too
163      assertTrue(otherPool == t.getPool());
164      t.close();
165
166      t = (HTable) con2.getTable(tableName);
167      // try other API too
168      assertTrue(otherPool == t.getPool());
169      t.close();
170
171      t = (HTable) con1.getTable(tableName);
172      pool = ((ConnectionImplementation) con1).getCurrentBatchPool();
173      // make sure an internal pool was created
174      assertNotNull("An internal Thread pool should have been created", pool);
175      // and that the table is using it
176      assertTrue(t.getPool() == pool);
177      t.close();
178
179      t = (HTable) con1.getTable(tableName);
180      // still using the *same* internal pool
181      assertTrue(t.getPool() == pool);
182      t.close();
183    } else {
184      table.close();
185    }
186
187    con1.close();
188
189    // if the pool was created on demand it should be closed upon connection close
190    if(pool != null) {
191      assertTrue(pool.isShutdown());
192    }
193
194    con2.close();
195    // if the pool is passed, it is not closed
196    assertFalse(otherPool.isShutdown());
197    otherPool.shutdownNow();
198  }
199
200  /**
201   * Naive test to check that Connection#getAdmin returns a properly constructed HBaseAdmin object
202   * @throws IOException Unable to construct admin
203   */
204  @Test
205  public void testAdminFactory() throws IOException {
206    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
207    Admin admin = con1.getAdmin();
208    assertTrue(admin.getConnection() == con1);
209    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
210    con1.close();
211  }
212
213  // Fails too often!  Needs work.  HBASE-12558
214  // May only fail on non-linux machines? E.g. macosx.
215  @Ignore @Test (expected = RegionServerStoppedException.class)
216  // Depends on mulitcast messaging facility that seems broken in hbase2
217  // See  HBASE-19261 "ClusterStatusPublisher where Master could optionally broadcast notice of
218  // dead servers is broke"
219  public void testClusterStatus() throws Exception {
220    final TableName tableName = TableName.valueOf(name.getMethodName());
221    byte[] cf = "cf".getBytes();
222    byte[] rk = "rk1".getBytes();
223
224    JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
225    rs.waitForServerOnline();
226    final ServerName sn = rs.getRegionServer().getServerName();
227
228    Table t = TEST_UTIL.createTable(tableName, cf);
229    TEST_UTIL.waitTableAvailable(tableName);
230    TEST_UTIL.waitUntilNoRegionsInTransition();
231
232    final ConnectionImplementation hci =  (ConnectionImplementation)TEST_UTIL.getConnection();
233    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
234      while (l.getRegionLocation(rk).getPort() != sn.getPort()) {
235        TEST_UTIL.getAdmin().move(l.getRegionLocation(rk).getRegionInfo().
236            getEncodedNameAsBytes(), Bytes.toBytes(sn.toString()));
237        TEST_UTIL.waitUntilNoRegionsInTransition();
238        hci.clearRegionCache(tableName);
239      }
240      Assert.assertNotNull(hci.clusterStatusListener);
241      TEST_UTIL.assertRegionOnServer(l.getRegionLocation(rk).getRegionInfo(), sn, 20000);
242    }
243
244    Put p1 = new Put(rk);
245    p1.addColumn(cf, "qual".getBytes(), "val".getBytes());
246    t.put(p1);
247
248    rs.getRegionServer().abort("I'm dead");
249
250    // We want the status to be updated. That's a least 10 second
251    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
252      @Override
253      public boolean evaluate() throws Exception {
254        return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
255            getDeadServers().isDeadServer(sn);
256      }
257    });
258
259    TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
260      @Override
261      public boolean evaluate() throws Exception {
262        return hci.clusterStatusListener.isDeadServer(sn);
263      }
264    });
265
266    t.close();
267    hci.getClient(sn);  // will throw an exception: RegionServerStoppedException
268  }
269
270  /**
271   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
272   */
273  @Test
274  public void testConnectionCloseAllowsInterrupt() throws Exception {
275    testConnectionClose(true);
276  }
277
278  @Test
279  public void testConnectionNotAllowsInterrupt() throws Exception {
280    testConnectionClose(false);
281  }
282
283  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
284    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
285    TEST_UTIL.createTable(tableName, FAM_NAM).close();
286
287    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
288
289    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
290    // We want to work on a separate connection.
291    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
292    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
293    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
294    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
295    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
296    // to avoid the client to be stuck when do the Get
297    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
298    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
299    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
300
301    Connection connection = ConnectionFactory.createConnection(c2);
302    final Table table = connection.getTable(tableName);
303
304    Put put = new Put(ROW);
305    put.addColumn(FAM_NAM, ROW, ROW);
306    table.put(put);
307
308    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
309    final AtomicInteger step = new AtomicInteger(0);
310
311    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
312    Thread t = new Thread("testConnectionCloseThread") {
313      @Override
314      public void run() {
315        int done = 0;
316        try {
317          step.set(1);
318          while (step.get() == 1) {
319            Get get = new Get(ROW);
320            table.get(get);
321            done++;
322            if (done % 100 == 0)
323              LOG.info("done=" + done);
324            // without the sleep, will cause the exception for too many files in
325            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
326            Thread.sleep(100);
327          }
328        } catch (Throwable t) {
329          failed.set(t);
330          LOG.error(t.toString(), t);
331        }
332        step.set(3);
333      }
334    };
335    t.start();
336    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
337      @Override
338      public boolean evaluate() throws Exception {
339        return step.get() == 1;
340      }
341    });
342
343    ServerName sn;
344    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
345      sn = rl.getRegionLocation(ROW).getServerName();
346    }
347    ConnectionImplementation conn =
348        (ConnectionImplementation) connection;
349    RpcClient rpcClient = conn.getRpcClient();
350
351    LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
352    for (int i = 0; i < 5000; i++) {
353      rpcClient.cancelConnections(sn);
354      Thread.sleep(5);
355    }
356
357    step.compareAndSet(1, 2);
358    // The test may fail here if the thread doing the gets is stuck. The way to find
359    //  out what's happening is to look for the thread named 'testConnectionCloseThread'
360    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
361      @Override
362      public boolean evaluate() throws Exception {
363        return step.get() == 3;
364      }
365    });
366    table.close();
367    connection.close();
368    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
369    TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
370  }
371
372  /**
373   * Test that connection can become idle without breaking everything.
374   */
375  @Test
376  public void testConnectionIdle() throws Exception {
377    final TableName tableName = TableName.valueOf(name.getMethodName());
378    TEST_UTIL.createTable(tableName, FAM_NAM).close();
379    int idleTime =  20000;
380    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
381
382    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
383    // We want to work on a separate connection.
384    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
385    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
386    c2.setInt(RpcClient.IDLE_TIME, idleTime);
387
388    Connection connection = ConnectionFactory.createConnection(c2);
389    final Table table = connection.getTable(tableName);
390
391    Put put = new Put(ROW);
392    put.addColumn(FAM_NAM, ROW, ROW);
393    table.put(put);
394
395    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
396    mee.setValue(System.currentTimeMillis());
397    EnvironmentEdgeManager.injectEdge(mee);
398    LOG.info("first get");
399    table.get(new Get(ROW));
400
401    LOG.info("first get - changing the time & sleeping");
402    mee.incValue(idleTime + 1000);
403    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
404                        // 1500 = sleep time in RpcClient#waitForWork + a margin
405
406    LOG.info("second get - connection has been marked idle in the middle");
407    // To check that the connection actually became idle would need to read some private
408    //  fields of RpcClient.
409    table.get(new Get(ROW));
410    mee.incValue(idleTime + 1000);
411
412    LOG.info("third get - connection is idle, but the reader doesn't know yet");
413    // We're testing here a special case:
414    //  time limit reached BUT connection not yet reclaimed AND a new call.
415    //  in this situation, we don't close the connection, instead we use it immediately.
416    // If we're very unlucky we can have a race condition in the test: the connection is already
417    //  under closing when we do the get, so we have an exception, and we don't retry as the
418    //  retry number is 1. The probability is very very low, and seems acceptable for now. It's
419    //  a test issue only.
420    table.get(new Get(ROW));
421
422    LOG.info("we're done - time will change back");
423
424    table.close();
425
426    connection.close();
427    EnvironmentEdgeManager.reset();
428    TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
429  }
430
431    /**
432     * Test that the connection to the dead server is cut immediately when we receive the
433     *  notification.
434     * @throws Exception
435     */
436  @Test
437  public void testConnectionCut() throws Exception {
438    final TableName tableName = TableName.valueOf(name.getMethodName());
439
440    TEST_UTIL.createTable(tableName, FAM_NAM).close();
441    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
442
443    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
444    // We want to work on a separate connection.
445    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
446    // try only once w/o any retry
447    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
448    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
449
450    final Connection connection = ConnectionFactory.createConnection(c2);
451    final Table table = connection.getTable(tableName);
452
453    Put p = new Put(FAM_NAM);
454    p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
455    table.put(p);
456
457    final ConnectionImplementation hci =  (ConnectionImplementation) connection;
458
459    final HRegionLocation loc;
460    try(RegionLocator rl = connection.getRegionLocator(tableName)) {
461      loc = rl.getRegionLocation(FAM_NAM);
462    }
463
464    Get get = new Get(FAM_NAM);
465    Assert.assertNotNull(table.get(get));
466
467    get = new Get(FAM_NAM);
468    get.setFilter(new BlockingFilter());
469
470    // This thread will mark the server as dead while we're waiting during a get.
471    Thread t = new Thread() {
472      @Override
473      public void run() {
474        synchronized (syncBlockingFilter) {
475          try {
476            syncBlockingFilter.wait();
477          } catch (InterruptedException e) {
478            throw new RuntimeException(e);
479          }
480        }
481        hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
482      }
483    };
484
485    t.start();
486    try {
487      table.get(get);
488      Assert.fail();
489    } catch (IOException expected) {
490      LOG.debug("Received: " + expected);
491      Assert.assertFalse(expected instanceof SocketTimeoutException);
492      Assert.assertFalse(syncBlockingFilter.get());
493    } finally {
494      syncBlockingFilter.set(true);
495      t.join();
496      TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
497    }
498
499    table.close();
500    connection.close();
501  }
502
  /**
   * Coordination flag shared by {@link BlockingFilter} and {@code testConnectionCut}. While it is
   * {@code false}, the filter keeps blocking and calls {@code notifyAll()} on this object every
   * 100 ms. Once it is {@code true}, the filter stops blocking. The filter also sets it to
   * {@code true} itself when it finishes.
   */
  protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
504
505  public static class BlockingFilter extends FilterBase {
506    @Override
507    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
508      int i = 0;
509      while (i++ < 1000 && !syncBlockingFilter.get()) {
510        synchronized (syncBlockingFilter) {
511          syncBlockingFilter.notifyAll();
512        }
513        Threads.sleep(100);
514      }
515      syncBlockingFilter.set(true);
516      return false;
517    }
518    @Override
519    public ReturnCode filterCell(final Cell ignored) throws IOException {
520      return ReturnCode.INCLUDE;
521    }
522
523    public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
524      return new BlockingFilter();
525    }
526  }
527
528  /**
529   * Test that when we delete a location using the first row of a region
530   * that we really delete it.
531   * @throws Exception
532   */
533  @Test
534  public void testRegionCaching() throws Exception {
535    TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
536    Configuration conf =  new Configuration(TEST_UTIL.getConfiguration());
537    // test with no retry, or client cache will get updated after the first failure
538    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
539    Connection connection = ConnectionFactory.createConnection(conf);
540    final Table table = connection.getTable(TABLE_NAME);
541
542    TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
543    Put put = new Put(ROW);
544    put.addColumn(FAM_NAM, ROW, ROW);
545    table.put(put);
546
547    ConnectionImplementation conn = (ConnectionImplementation) connection;
548
549    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
550
551    // Here we mess with the cached location making it so the region at TABLE_NAME, ROW is at
552    // a location where the port is current port number +1 -- i.e. a non-existent location.
553    HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
554    final int nextPort = loc.getPort() + 1;
555    conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
556        ServerName.valueOf("127.0.0.1", nextPort,
557        HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
558    Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW)
559      .getRegionLocation().getPort(), nextPort);
560
561    conn.clearRegionCache(TABLE_NAME, ROW.clone());
562    RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
563    assertNull("What is this location?? " + rl, rl);
564
565    // We're now going to move the region and check that it works for the client
566    // First a new put to add the location in the cache
567    conn.clearRegionCache(TABLE_NAME);
568    Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
569    Put put2 = new Put(ROW);
570    put2.addColumn(FAM_NAM, ROW, ROW);
571    table.put(put2);
572    assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
573    assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
574
575    TEST_UTIL.getAdmin().setBalancerRunning(false, false);
576    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
577
578    // We can wait for all regions to be online, that makes log reading easier when debugging
579    TEST_UTIL.waitUntilNoRegionsInTransition();
580
581    // Now moving the region to the second server
582    HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
583    byte[] regionName = toMove.getRegionInfo().getRegionName();
584    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
585
586    // Choose the other server.
587    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
588    int destServerId = curServerId == 0? 1: 0;
589
590    HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
591    HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
592
593    ServerName destServerName = destServer.getServerName();
594
595    // Check that we are in the expected state
596    Assert.assertTrue(curServer != destServer);
597    Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
598    Assert.assertFalse( toMove.getPort() == destServerName.getPort());
599    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
600    Assert.assertNull(destServer.getOnlineRegion(regionName));
601    Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
602        getAssignmentManager().hasRegionsInTransition());
603
604    // Moving. It's possible that we don't have all the regions online at this point, so
605    //  the test must depend only on the region we're looking at.
606    LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
607    TEST_UTIL.getAdmin().move(
608      toMove.getRegionInfo().getEncodedNameAsBytes(),
609      destServerName.getServerName().getBytes()
610    );
611
612    while (destServer.getOnlineRegion(regionName) == null ||
613        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
614        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
615        master.getAssignmentManager().hasRegionsInTransition()) {
616      // wait for the move to be finished
617      Thread.sleep(1);
618    }
619
620    LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
621
622    // Check our new state.
623    Assert.assertNull(curServer.getOnlineRegion(regionName));
624    Assert.assertNotNull(destServer.getOnlineRegion(regionName));
625    Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
626    Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
627
628
629    // Cache was NOT updated and points to the wrong server
630    Assert.assertFalse(
631        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation()
632          .getPort() == destServerName.getPort());
633
634    // This part relies on a number of tries equals to 1.
635    // We do a put and expect the cache to be updated, even if we don't retry
636    LOG.info("Put starting");
637    Put put3 = new Put(ROW);
638    put3.addColumn(FAM_NAM, ROW, ROW);
639    try {
640      table.put(put3);
641      Assert.fail("Unreachable point");
642    } catch (RetriesExhaustedWithDetailsException e) {
643      LOG.info("Put done, exception caught: " + e.getClass());
644      Assert.assertEquals(1, e.getNumExceptions());
645      Assert.assertEquals(1, e.getCauses().size());
646      Assert.assertArrayEquals(ROW, e.getRow(0).getRow());
647
648      // Check that we unserialized the exception as expected
649      Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
650      Assert.assertNotNull(cause);
651      Assert.assertTrue(cause instanceof RegionMovedException);
652    } catch (RetriesExhaustedException ree) {
653      // hbase2 throws RetriesExhaustedException instead of RetriesExhaustedWithDetailsException
654      // as hbase1 used to do. Keep an eye on this to see if this changed behavior is an issue.
655      LOG.info("Put done, exception caught: " + ree.getClass());
656      Throwable cause = ClientExceptionsUtil.findException(ree.getCause());
657      Assert.assertNotNull(cause);
658      Assert.assertTrue(cause instanceof RegionMovedException);
659    }
660    Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
661    Assert.assertEquals(
662        "Previous server was " + curServer.getServerName().getHostAndPort(),
663        destServerName.getPort(),
664        conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
665
666    Assert.assertFalse(destServer.getRegionsInTransitionInRS()
667      .containsKey(encodedRegionNameBytes));
668    Assert.assertFalse(curServer.getRegionsInTransitionInRS()
669      .containsKey(encodedRegionNameBytes));
670
671    // We move it back to do another test with a scan
672    LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
673    TEST_UTIL.getAdmin().move(
674      toMove.getRegionInfo().getEncodedNameAsBytes(),
675      curServer.getServerName().getServerName().getBytes()
676    );
677
678    while (curServer.getOnlineRegion(regionName) == null ||
679        destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
680        curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
681        master.getAssignmentManager().hasRegionsInTransition()) {
682      // wait for the move to be finished
683      Thread.sleep(1);
684    }
685
686    // Check our new state.
687    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
688    Assert.assertNull(destServer.getOnlineRegion(regionName));
689    LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
690
691    // Cache was NOT updated and points to the wrong server
692    Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() ==
693      curServer.getServerName().getPort());
694
695    Scan sc = new Scan();
696    sc.setStopRow(ROW);
697    sc.setStartRow(ROW);
698
699    // The scanner takes the max retries from the connection configuration, not the table as
700    // the put.
701    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
702
703    try {
704      ResultScanner rs = table.getScanner(sc);
705      while (rs.next() != null) {
706      }
707      Assert.fail("Unreachable point");
708    } catch (RetriesExhaustedException e) {
709      LOG.info("Scan done, expected exception caught: " + e.getClass());
710    }
711
712    // Cache is updated with the right value.
713    Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
714    Assert.assertEquals(
715      "Previous server was "+destServer.getServerName().getHostAndPort(),
716      curServer.getServerName().getPort(),
717      conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
718
719    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
720    table.close();
721    connection.close();
722  }
723
724  /**
725   * Test that Connection or Pool are not closed when managed externally
726   * @throws Exception
727   */
728  @Test
729  public void testConnectionManagement() throws Exception{
730    Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
731    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
732    Table table = conn.getTable(TABLE_NAME1);
733    table.close();
734    assertFalse(conn.isClosed());
735    if(table instanceof HTable) {
736      assertFalse(((HTable) table).getPool().isShutdown());
737    }
738    table = conn.getTable(TABLE_NAME1);
739    table.close();
740    if(table instanceof HTable) {
741      assertFalse(((HTable) table).getPool().isShutdown());
742    }
743    conn.close();
744    if(table instanceof HTable) {
745      assertTrue(((HTable) table).getPool().isShutdown());
746    }
747    table0.close();
748  }
749
750  /**
751   * Test that stale cache updates don't override newer cached values.
752   */
753  @Test
754  public void testCacheSeqNums() throws Exception{
755    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
756    Put put = new Put(ROW);
757    put.addColumn(FAM_NAM, ROW, ROW);
758    table.put(put);
759    ConnectionImplementation conn = (ConnectionImplementation) TEST_UTIL.getConnection();
760
761    HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
762    assertNotNull(location);
763
764    ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
765
766    // Same server as already in cache reporting - overwrites any value despite seqNum.
767    int nextPort = location.getPort() + 1;
768    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
769        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
770    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
771    Assert.assertEquals(nextPort, location.getPort());
772
773    // No source specified - same.
774    nextPort = location.getPort() + 1;
775    conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
776        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
777    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
778    Assert.assertEquals(nextPort, location.getPort());
779
780    // Higher seqNum - overwrites lower seqNum.
781    nextPort = location.getPort() + 1;
782    conn.updateCachedLocation(location.getRegionInfo(), anySource,
783        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
784    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
785    Assert.assertEquals(nextPort, location.getPort());
786
787    // Lower seqNum - does not overwrite higher seqNum.
788    nextPort = location.getPort() + 1;
789    conn.updateCachedLocation(location.getRegionInfo(), anySource,
790        ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
791    location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
792    Assert.assertEquals(nextPort - 1, location.getPort());
793    table.close();
794  }
795
796  @Test
797  public void testClosing() throws Exception {
798    Configuration configuration =
799      new Configuration(TEST_UTIL.getConfiguration());
800    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
801      String.valueOf(ThreadLocalRandom.current().nextInt()));
802
803    // as connection caching is going away, now we're just testing
804    // that closed connection does actually get closed.
805
806    Connection c1 = ConnectionFactory.createConnection(configuration);
807    Connection c2 = ConnectionFactory.createConnection(configuration);
808    // no caching, different connections
809    assertTrue(c1 != c2);
810
811    // closing independently
812    c1.close();
813    assertTrue(c1.isClosed());
814    assertFalse(c2.isClosed());
815
816    c2.close();
817    assertTrue(c2.isClosed());
818  }
819
820  /**
821   * Trivial test to verify that nobody messes with
822   * {@link ConnectionFactory#createConnection(Configuration)}
823   */
824  @Test
825  public void testCreateConnection() throws Exception {
826    Configuration configuration = TEST_UTIL.getConfiguration();
827    Connection c1 = ConnectionFactory.createConnection(configuration);
828    Connection c2 = ConnectionFactory.createConnection(configuration);
829    // created from the same configuration, yet they are different
830    assertTrue(c1 != c2);
831    assertTrue(c1.getConfiguration() == c2.getConfiguration());
832  }
833
834  /**
835   * This test checks that one can connect to the cluster with only the
836   *  ZooKeeper quorum set. Other stuff like master address will be read
837   *  from ZK by the client.
838   */
839  @Test
840  public void testConnection() throws Exception{
841    // We create an empty config and add the ZK address.
842    Configuration c = new Configuration();
843    c.set(HConstants.ZOOKEEPER_QUORUM,
844      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
845    c.set(HConstants.ZOOKEEPER_CLIENT_PORT,
846      TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
847
848    // This should be enough to connect
849    ClusterConnection conn = (ClusterConnection) ConnectionFactory.createConnection(c);
850    assertTrue(conn.isMasterRunning());
851    conn.close();
852  }
853
854  private int setNumTries(ConnectionImplementation hci, int newVal) throws Exception {
855    Field numTries = hci.getClass().getDeclaredField("numTries");
856    numTries.setAccessible(true);
857    Field modifiersField = Field.class.getDeclaredField("modifiers");
858    modifiersField.setAccessible(true);
859    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
860    final int prevNumRetriesVal = (Integer)numTries.get(hci);
861    numTries.set(hci, newVal);
862
863    return prevNumRetriesVal;
864  }
865
866  @Test
867  public void testMulti() throws Exception {
868    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
869    try {
870      ConnectionImplementation conn = (ConnectionImplementation)TEST_UTIL.getConnection();
871
872      // We're now going to move the region and check that it works for the client
873      // First a new put to add the location in the cache
874      conn.clearRegionCache(TABLE_NAME3);
875      Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
876
877      TEST_UTIL.getAdmin().setBalancerRunning(false, false);
878      HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
879
880      // We can wait for all regions to be online, that makes log reading easier when debugging
881      TEST_UTIL.waitUntilNoRegionsInTransition();
882
883      Put put = new Put(ROW_X);
884      put.addColumn(FAM_NAM, ROW_X, ROW_X);
885      table.put(put);
886
887      // Now moving the region to the second server
888      HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
889      byte[] regionName = toMove.getRegionInfo().getRegionName();
890      byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
891
892      // Choose the other server.
893      int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
894      int destServerId = (curServerId == 0 ? 1 : 0);
895
896      HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
897      HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
898
899      ServerName destServerName = destServer.getServerName();
900      ServerName metaServerName = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
901
902       //find another row in the cur server that is less than ROW_X
903      List<HRegion> regions = curServer.getRegions(TABLE_NAME3);
904      byte[] otherRow = null;
905       for (Region region : regions) {
906         if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
907             && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
908           otherRow = region.getRegionInfo().getStartKey();
909           break;
910         }
911       }
912      assertNotNull(otherRow);
913      // If empty row, set it to first row.-f
914      if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
915      Put put2 = new Put(otherRow);
916      put2.addColumn(FAM_NAM, otherRow, otherRow);
917      table.put(put2); //cache put2's location
918
919      // Check that we are in the expected state
920      Assert.assertTrue(curServer != destServer);
921      Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
922      Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
923      Assert.assertNotNull(curServer.getOnlineRegion(regionName));
924      Assert.assertNull(destServer.getOnlineRegion(regionName));
925      Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
926          getAssignmentManager().hasRegionsInTransition());
927
928       // Moving. It's possible that we don't have all the regions online at this point, so
929      //  the test depends only on the region we're looking at.
930      LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
931      TEST_UTIL.getAdmin().move(
932          toMove.getRegionInfo().getEncodedNameAsBytes(),
933          destServerName.getServerName().getBytes()
934      );
935
936      while (destServer.getOnlineRegion(regionName) == null ||
937          destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
938          curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
939          master.getAssignmentManager().hasRegionsInTransition()) {
940        // wait for the move to be finished
941        Thread.sleep(1);
942      }
943
944      LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
945
946      // Check our new state.
947      Assert.assertNull(curServer.getOnlineRegion(regionName));
948      Assert.assertNotNull(destServer.getOnlineRegion(regionName));
949      Assert.assertFalse(destServer.getRegionsInTransitionInRS()
950          .containsKey(encodedRegionNameBytes));
951      Assert.assertFalse(curServer.getRegionsInTransitionInRS()
952          .containsKey(encodedRegionNameBytes));
953
954
955       // Cache was NOT updated and points to the wrong server
956      Assert.assertFalse(
957          conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
958              .getPort() == destServerName.getPort());
959
960      // Hijack the number of retry to fail after 2 tries
961      final int prevNumRetriesVal = setNumTries(conn, 2);
962
963      Put put3 = new Put(ROW_X);
964      put3.addColumn(FAM_NAM, ROW_X, ROW_X);
965      Put put4 = new Put(otherRow);
966      put4.addColumn(FAM_NAM, otherRow, otherRow);
967
968      // do multi
969      ArrayList<Put> actions = Lists.newArrayList(put4, put3);
970      table.batch(actions, null); // first should be a valid row,
971      // second we get RegionMovedException.
972
973      setNumTries(conn, prevNumRetriesVal);
974    } finally {
975      table.close();
976    }
977  }
978
979  @Test
980  public void testErrorBackoffTimeCalculation() throws Exception {
981    // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
982    final long ANY_PAUSE = 100;
983    ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
984    ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
985
986    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
987    EnvironmentEdgeManager.injectEdge(timeMachine);
988    try {
989      long largeAmountOfTime = ANY_PAUSE * 1000;
990      ConnectionImplementation.ServerErrorTracker tracker =
991          new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);
992
993      // The default backoff is 0.
994      assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
995
996      // Check some backoff values from HConstants sequence.
997      tracker.reportServerError(location);
998      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
999        tracker.calculateBackoffTime(location, ANY_PAUSE));
1000      tracker.reportServerError(location);
1001      tracker.reportServerError(location);
1002      tracker.reportServerError(location);
1003      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
1004        tracker.calculateBackoffTime(location, ANY_PAUSE));
1005
1006      // All of this shouldn't affect backoff for different location.
1007      assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1008      tracker.reportServerError(diffLocation);
1009      assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1010        tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1011
1012      // Check with different base.
1013      assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
1014          tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1015    } finally {
1016      EnvironmentEdgeManager.reset();
1017    }
1018  }
1019
1020  private static void assertEqualsWithJitter(long expected, long actual) {
1021    assertEqualsWithJitter(expected, actual, expected);
1022  }
1023
1024  private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1025    assertTrue("Value not within jitter: " + expected + " vs " + actual,
1026      Math.abs(actual - expected) <= (0.01f * jitterBase));
1027  }
1028
1029  @Test
1030  public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1031    Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1032
1033    final TableName tableName = TableName.valueOf(name.getMethodName());
1034    TEST_UTIL.createTable(tableName, new byte[][] {FAM_NAM}).close();
1035
1036    Connection connection = ConnectionFactory.createConnection(config);
1037    Table table = connection.getTable(tableName);
1038
1039    // this will cache the meta location and table's region location
1040    table.get(new Get(Bytes.toBytes("foo")));
1041
1042    // restart HBase
1043    TEST_UTIL.shutdownMiniHBaseCluster();
1044    TEST_UTIL.restartHBaseCluster(2);
1045    // this should be able to discover new locations for meta and table's region
1046    table.get(new Get(Bytes.toBytes("foo")));
1047    TEST_UTIL.deleteTable(tableName);
1048    table.close();
1049    connection.close();
1050  }
1051
1052  @Test
1053  public void testLocateRegionsWithRegionReplicas() throws IOException {
1054    int regionReplication = 3;
1055    byte[] family = Bytes.toBytes("cf");
1056    TableName tableName = TableName.valueOf(name.getMethodName());
1057
1058    // Create a table with region replicas
1059    TableDescriptorBuilder builder = TableDescriptorBuilder
1060      .newBuilder(tableName)
1061      .setRegionReplication(regionReplication)
1062      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
1063    TEST_UTIL.getAdmin().createTable(builder.build());
1064
1065    try (ConnectionImplementation con = (ConnectionImplementation) ConnectionFactory.
1066      createConnection(TEST_UTIL.getConfiguration())) {
1067      // Get locations of the regions of the table
1068      List<HRegionLocation> locations = con.locateRegions(tableName, false, false);
1069
1070      // The size of the returned locations should be 3
1071      assertEquals(regionReplication, locations.size());
1072
1073      // The replicaIds of the returned locations should be 0, 1 and 2
1074      Set<Integer> expectedReplicaIds = IntStream.range(0, regionReplication).
1075        boxed().collect(Collectors.toSet());
1076      for (HRegionLocation location : locations) {
1077        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
1078      }
1079    } finally {
1080      TEST_UTIL.deleteTable(tableName);
1081    }
1082  }
1083}