001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.junit.Assert.fail;
021
022import java.io.IOException;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.HBaseTestingUtil;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
027import org.apache.hadoop.hbase.testclassification.MediumTests;
028import org.apache.hadoop.mapred.JobConf;
029import org.apache.hadoop.mapred.RecordWriter;
030import org.junit.AfterClass;
031import org.junit.Before;
032import org.junit.BeforeClass;
033import org.junit.ClassRule;
034import org.junit.Test;
035import org.junit.experimental.categories.Category;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * Spark creates many instances of TableOutputFormat within a single process. We need to make sure
041 * we can have many instances and not leak connections. This test creates a few TableOutputFormats
042 * and shouldn't fail due to ZK connection exhaustion.
043 */
044@Category(MediumTests.class)
045public class TestTableOutputFormatConnectionExhaust {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049    HBaseClassTestRule.forClass(TestTableOutputFormatConnectionExhaust.class);
050
051  private static final Logger LOG =
052    LoggerFactory.getLogger(TestTableOutputFormatConnectionExhaust.class);
053
054  private final static HBaseTestingUtil UTIL = new HBaseTestingUtil();
055  static final String TABLE = "TestTableOutputFormatConnectionExhaust";
056  static final String FAMILY = "family";
057
058  @BeforeClass
059  public static void beforeClass() throws Exception {
060    // Default in ZookeeperMiniCluster is 1000, setting artificially low to trigger exhaustion.
061    // need min of 7 to properly start the default mini HBase cluster
062    UTIL.getConfiguration().setInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 10);
063    UTIL.startMiniCluster();
064  }
065
066  @AfterClass
067  public static void afterClass() throws Exception {
068    UTIL.shutdownMiniCluster();
069  }
070
071  @Before
072  public void before() throws IOException {
073    LOG.info("before");
074    UTIL.ensureSomeRegionServersAvailable(1);
075    LOG.info("before done");
076  }
077
078  /**
079   * Open and close a TableOutputFormat. The closing the RecordWriter should release HBase
080   * Connection (ZK) resources, and will throw exception if they are exhausted.
081   */
082  static void openCloseTableOutputFormat(int iter) throws IOException {
083    LOG.info("Instantiating TableOutputFormat connection  " + iter);
084    JobConf conf = new JobConf();
085    conf.addResource(UTIL.getConfiguration());
086    conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE);
087    TableMapReduceUtil.initTableMapJob(TABLE, FAMILY, TableMap.class, ImmutableBytesWritable.class,
088      ImmutableBytesWritable.class, conf);
089    TableOutputFormat tof = new TableOutputFormat();
090    RecordWriter rw = tof.getRecordWriter(null, conf, TABLE, null);
091    rw.close(null);
092  }
093
094  @Test
095  public void testConnectionExhaustion() throws IOException {
096    int MAX_INSTANCES = 5; // fails on iteration 3 if zk connections leak
097    for (int i = 0; i < MAX_INSTANCES; i++) {
098      final int iter = i;
099      try {
100        openCloseTableOutputFormat(iter);
101      } catch (Exception e) {
102        LOG.error("Exception encountered", e);
103        fail("Failed on iteration " + i);
104      }
105    }
106  }
107
108}