/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test bulk load and MapReduce on a distributed cluster, with the FileBased StoreFileTracker
 * enabled. It starts an MR job that creates linked chains.
 * <p>
 * The format of rows is like this:
 * <ul>
 * <li>Row Key -&gt; Long</li>
 * <li>L:&lt;&lt; Chain Id &gt;&gt; -&gt; Row Key of the next link in the chain</li>
 * <li>S:&lt;&lt; Chain Id &gt;&gt; -&gt; The step in the chain that this link is</li>
 * <li>D:&lt;&lt; Chain Id &gt;&gt; -&gt; Random Data</li>
 * </ul>
 * All chains start on row 0. All row keys are &gt; 0. After the linked lists are created, they are
 * walked over using a TableMapper-based MapReduce job.
 * <p>
 * There are a few options exposed:
 * <ul>
 * <li>hbase.IntegrationTestBulkLoad.chainLength - The number of rows that will be part of each and
 * every chain.</li>
 * <li>hbase.IntegrationTestBulkLoad.numMaps - The number of mappers that will be run. Each mapper
 * creates one linked-list chain.</li>
 * <li>hbase.IntegrationTestBulkLoad.numImportRounds - How many jobs will be run to create linked
 * lists.</li>
 * <li>hbase.IntegrationTestBulkLoad.tableName - The name of the table.</li>
 * <li>hbase.IntegrationTestBulkLoad.replicaCount - How many region replicas to configure for the
 * table under test.</li>
 * </ul>
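 * <p>
 * Example invocation (a sketch; the option values shown are only illustrative, and any of the keys
 * above can be passed the same way as -D options, which ToolRunner parses in {@code main}):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestFileBasedSFTBulkLoad \
 *     -Dhbase.IntegrationTestBulkLoad.numMaps=10 \
 *     -Dhbase.IntegrationTestBulkLoad.numImportRounds=2 \
 *     -Dhbase.IntegrationTestBulkLoad.tableName=IntegrationTestFileBasedSFTBulkLoad
 * </pre>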
 */
@Category(IntegrationTests.class)
public class IntegrationTestFileBasedSFTBulkLoad extends IntegrationTestBulkLoad {

  private static final Logger LOG =
    LoggerFactory.getLogger(IntegrationTestFileBasedSFTBulkLoad.class);

  private static final String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static final String NUM_IMPORT_ROUNDS_KEY =
    "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static final String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static final int NUM_REPLICA_COUNT_DEFAULT = 1;

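  // Delegates to the parent class's bulk load test; the only behavioral difference is the
  // FileBased store file tracker configured in setUpCluster() below.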
  @Test
  public void testFileBasedSFTBulkLoad() throws Exception {
    super.testBulkLoad();
  }

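  /**
   * Same cluster setup as the parent test, except that the FileBased store file tracker is set as
   * the cluster-wide tracker implementation before the cluster is initialized.
   */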
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    // Use the FileBased tracker, which records the store file list in a tracker file rather than
    // relying on renames when committing store files. Must be set before the cluster starts.
    util.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
      "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: {}", replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
        Integer.toString(util.getAdmin().getRegionServers().size() * 10));
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

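  /**
   * Entry point so the test can be launched from the command line via ToolRunner, as in the
   * example invocation in the class Javadoc.
   */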
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestFileBasedSFTBulkLoad(), args);
    System.exit(status);
  }
}