/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;

/**
 * This class is for testing {@link Connection}.
 */
@Category({ LargeTests.class })
public class TestConnection {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestConnection.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestConnection.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final byte[] FAM_NAM = Bytes.toBytes("f");
  private static final byte[] ROW = Bytes.toBytes("bbb");
  private static final int RPC_RETRY = 5;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
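    // PARANOID leak detection makes Netty log any leaked buffers;
    // testCancelConnectionMemoryLeak relies on that output staying clean.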
    ResourceLeakDetector.setLevel(Level.PARANOID);
    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
    // Up the handlers; this test needs more than usual.
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
    TEST_UTIL.startMiniCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @After
  public void tearDown() throws IOException {
    TEST_UTIL.getAdmin().balancerSwitch(true, true);
  }

  /**
   * Naive test to check that Connection#getAdmin returns a properly constructed Admin instance
   * @throws IOException Unable to construct the admin
   */
  @Test
  public void testAdminFactory() throws IOException {
    Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    Admin admin = con1.getAdmin();
    assertTrue(admin.getConnection() == con1);
    assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
    con1.close();
  }

  /**
   * Test that we can handle connection close: it will trigger a retry, but the calls will finish.
   */
  @Test
  public void testConnectionCloseAllowsInterrupt() throws Exception {
    testConnectionClose(true);
  }

  @Test
  public void testConnectionNotAllowsInterrupt() throws Exception {
    testConnectionClose(false);
  }

  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
    TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
    TEST_UTIL.createTable(tableName, FAM_NAM).close();

    TEST_UTIL.getAdmin().balancerSwitch(false, true);

    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
    // We want to work on a separate connection.
    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Servers do not really expire
    c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
    // To avoid the client getting stuck when doing the Get
    c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
    c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);

    Connection connection = ConnectionFactory.createConnection(c2);
    final Table table = connection.getTable(tableName);

    Put put = new Put(ROW);
    put.addColumn(FAM_NAM, ROW, ROW);
    table.put(put);

    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
    final AtomicInteger step = new AtomicInteger(0);

    final AtomicReference<Throwable> failed = new AtomicReference<>(null);
    Thread t = new Thread("testConnectionCloseThread") {
      @Override
      public void run() {
        int done = 0;
        try {
          step.set(1);
          while (step.get() == 1) {
            Get get = new Get(ROW);
            table.get(get);
            done++;
            if (done % 100 == 0) {
              LOG.info("done=" + done);
            }
            // Without the sleep, we would hit the "too many files" exception in
            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
            Thread.sleep(100);
          }
        } catch (Throwable t) {
          failed.set(t);
          LOG.error(t.toString(), t);
        }
        step.set(3);
      }
    };
    t.start();
    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return step.get() == 1;
      }
    });

    ServerName sn;
    try (RegionLocator rl = connection.getRegionLocator(tableName)) {
      sn = rl.getRegionLocation(ROW).getServerName();
    }

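    // Grab the underlying RpcClient so we can repeatedly cancel the connections to the server
    // hosting ROW while the background thread keeps issuing Gets.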
    RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;

    LOG.info("Going to cancel connections. connection=" + connection.toString() + ", sn=" + sn);
    for (int i = 0; i < 500; i++) {
      rpcClient.cancelConnections(sn);
      Thread.sleep(50);
    }

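    // Signal the getter thread to stop; it sets step to 3 once it exits its loop.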
    step.compareAndSet(1, 2);
    // The test may fail here if the thread doing the gets is stuck. The way to find
    // out what's happening is to look for the thread named 'testConnectionCloseThread'
    TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return step.get() == 3;
      }
    });
    table.close();
    connection.close();
    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
  }

  /**
   * Test that a connection can become idle without breaking everything.
   */
  @Test
  public void testConnectionIdle() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, FAM_NAM).close();
    int idleTime = 20000;
    boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);

    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
    // We want to work on a separate connection.
    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
    c2.setInt(RpcClient.IDLE_TIME, idleTime);

    Connection connection = ConnectionFactory.createConnection(c2);
    final Table table = connection.getTable(tableName);

    Put put = new Put(ROW);
    put.addColumn(FAM_NAM, ROW, ROW);
    table.put(put);

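    // Inject a manual clock so the test can advance time past the idle threshold without
    // actually waiting idleTime milliseconds.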
    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
    mee.setValue(EnvironmentEdgeManager.currentTime());
    EnvironmentEdgeManager.injectEdge(mee);
    LOG.info("first get");
    table.get(new Get(ROW));

    LOG.info("first get - changing the time & sleeping");
    mee.incValue(idleTime + 1000);
    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
                        // 1500 = sleep time in RpcClient#waitForWork + a margin

    LOG.info("second get - connection has been marked idle in the middle");
    // Checking that the connection actually became idle would require reading some private
    // fields of RpcClient.
    table.get(new Get(ROW));
    mee.incValue(idleTime + 1000);

    LOG.info("third get - connection is idle, but the reader doesn't know yet");
    // We're testing a special case here:
    // the time limit is reached BUT the connection is not yet reclaimed AND a new call arrives.
    // In this situation, we don't close the connection; instead we use it immediately.
    // If we're very unlucky we can have a race condition in the test: the connection is already
    // being closed when we do the get, so we get an exception and we don't retry, as the
    // retry number is 1. The probability is very low and seems acceptable for now; it's
    // a test issue only.
    table.get(new Get(ROW));

    LOG.info("we're done - time will change back");

    table.close();

    connection.close();
    EnvironmentEdgeManager.reset();
    TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
  }

  @Test
  public void testClosing() throws Exception {
    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
    configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
      String.valueOf(ThreadLocalRandom.current().nextInt()));

    // As connection caching is going away, we're now just testing
    // that a closed connection actually gets closed.

    Connection c1 = ConnectionFactory.createConnection(configuration);
    Connection c2 = ConnectionFactory.createConnection(configuration);
    // no caching, different connections
    assertTrue(c1 != c2);

    // closing independently
    c1.close();
    assertTrue(c1.isClosed());
    assertFalse(c2.isClosed());

    c2.close();
    assertTrue(c2.isClosed());
  }

  /**
   * Trivial test to verify that nobody messes with
   * {@link ConnectionFactory#createConnection(Configuration)}
   */
  @Test
  public void testCreateConnection() throws Exception {
    Configuration configuration = TEST_UTIL.getConfiguration();
    Connection c1 = ConnectionFactory.createConnection(configuration);
    Connection c2 = ConnectionFactory.createConnection(configuration);
    // created from the same configuration, yet they are different
    assertTrue(c1 != c2);
    assertTrue(c1.getConfiguration() == c2.getConfiguration());
  }

  /*
   * ====> With MasterRegistry, connections cannot outlast the masters' lifetime.
   *
   * @Test
   * public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
   *   Configuration config = new Configuration(TEST_UTIL.getConfiguration());
   *   final TableName tableName = TableName.valueOf(name.getMethodName());
   *   TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close();
   *   Connection connection = ConnectionFactory.createConnection(config);
   *   Table table = connection.getTable(tableName);
   *   // this will cache the meta location and table's region location
   *   table.get(new Get(Bytes.toBytes("foo")));
   *   // restart HBase
   *   TEST_UTIL.shutdownMiniHBaseCluster();
   *   TEST_UTIL.restartHBaseCluster(2);
   *   // this should be able to discover new locations for meta and table's region
   *   table.get(new Get(Bytes.toBytes("foo")));
   *   TEST_UTIL.deleteTable(tableName);
   *   table.close();
   *   connection.close();
   * }
   */

  @Test
  public void testLocateRegionsWithRegionReplicas() throws IOException {
    int regionReplication = 3;
    byte[] family = Bytes.toBytes("cf");
    TableName tableName = TableName.valueOf(name.getMethodName());

    // Create a table with region replicas
    TableDescriptorBuilder builder =
      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(regionReplication)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    TEST_UTIL.getAdmin().createTable(builder.build());

    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Get locations of the regions of the table
      List<HRegionLocation> locations = locator.getAllRegionLocations();

      // The size of the returned locations should be 3
      assertEquals(regionReplication, locations.size());

      // The replicaIds of the returned locations should be 0, 1 and 2
      Set<Integer> expectedReplicaIds =
        IntStream.range(0, regionReplication).boxed().collect(Collectors.toSet());
      for (HRegionLocation location : locations) {
        assertTrue(expectedReplicaIds.remove(location.getRegion().getReplicaId()));
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

  @Test(expected = DoNotRetryIOException.class)
  public void testClosedConnection() throws ServiceException, Throwable {
    byte[] family = Bytes.toBytes("cf");
    TableName tableName = TableName.valueOf(name.getMethodName());
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
    TEST_UTIL.getAdmin().createTable(builder.build());

    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    // cache the location
    try (Table table = conn.getTable(tableName)) {
      table.get(new Get(Bytes.toBytes(0)));
    } finally {
      conn.close();
    }
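    // The connection is closed at this point, so the coprocessor call below is expected to fail
    // with DoNotRetryIOException (the exception this test expects) before the callable ever runs.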
    Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = service -> {
      throw new RuntimeException("Should not arrive here");
    };
    conn.getTable(tableName).coprocessorService(MultiRowMutationService.class,
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable);
  }

  // There is no assertion, but you need to confirm that there is no resource leak output from
  // Netty.
  @Test
  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    TEST_UTIL.createTable(tableName, FAM_NAM).close();
    TEST_UTIL.getAdmin().balancerSwitch(false, true);
    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
      Table table = connection.getTable(tableName)) {
      table.get(new Get(Bytes.toBytes("1")));
      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
      RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
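      // Cancel the connections to that server, then give Netty a moment and force a GC so any
      // leaked buffers show up in the leak detector output.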
      rpcClient.cancelConnections(sn);
      Thread.sleep(1000);
      System.gc();
      Thread.sleep(1000);
    }
  }
}