/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.Assert.fail;
021
022import java.io.IOException;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.HBaseTestingUtility;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
027import org.apache.hadoop.hbase.testclassification.MediumTests;
028import org.apache.hadoop.mapred.JobConf;
029import org.apache.hadoop.mapred.RecordWriter;
030import org.junit.AfterClass;
031import org.junit.Before;
032import org.junit.BeforeClass;
033import org.junit.ClassRule;
034import org.junit.Test;
035import org.junit.experimental.categories.Category;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * Spark creates many instances of TableOutputFormat within a single process.  We need to make
041 * sure we can have many instances and not leak connections.
042 *
043 * This test creates a few TableOutputFormats and shouldn't fail due to ZK connection exhaustion.
044 */
045@Category(MediumTests.class)
046public class TestTableOutputFormatConnectionExhaust {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050      HBaseClassTestRule.forClass(TestTableOutputFormatConnectionExhaust.class);
051
052  private static final Logger LOG =
053      LoggerFactory.getLogger(TestTableOutputFormatConnectionExhaust.class);
054
055  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
056  static final String TABLE = "TestTableOutputFormatConnectionExhaust";
057  static final String FAMILY = "family";
058
059  @BeforeClass
060  public static void beforeClass() throws Exception {
061    // Default in ZookeeperMiniCluster is 1000, setting artificially low to trigger exhaustion.
062    // need min of 7 to properly start the default mini HBase cluster
063    UTIL.getConfiguration().setInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 10);
064    UTIL.startMiniCluster();
065  }
066
067  @AfterClass
068  public static void afterClass() throws Exception {
069    UTIL.shutdownMiniCluster();
070  }
071
072  @Before
073  public void before() throws IOException {
074    LOG.info("before");
075    UTIL.ensureSomeRegionServersAvailable(1);
076    LOG.info("before done");
077  }
078
079  /**
080   * Open and close a TableOutputFormat.  The closing the RecordWriter should release HBase
081   * Connection (ZK) resources, and will throw exception if they are exhausted.
082   */
083  static void openCloseTableOutputFormat(int iter)  throws IOException {
084    LOG.info("Instantiating TableOutputFormat connection  " + iter);
085    JobConf conf = new JobConf();
086    conf.addResource(UTIL.getConfiguration());
087    conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE);
088    TableMapReduceUtil.initTableMapJob(TABLE, FAMILY, TableMap.class,
089        ImmutableBytesWritable.class, ImmutableBytesWritable.class, conf);
090    TableOutputFormat tof = new TableOutputFormat();
091    RecordWriter rw = tof.getRecordWriter(null, conf, TABLE, null);
092    rw.close(null);
093  }
094
095  @Test
096  public void testConnectionExhaustion() throws IOException {
097    int MAX_INSTANCES = 5; // fails on iteration 3 if zk connections leak
098    for (int i = 0; i < MAX_INSTANCES; i++) {
099      final int iter = i;
100      try {
101        openCloseTableOutputFormat(iter);
102      } catch (Exception e) {
103        LOG.error("Exception encountered", e);
104        fail("Failed on iteration " + i);
105      }
106    }
107  }
108
109}