/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test bulk load and MR on a distributed cluster, with the file-based StoreFileTracker enabled.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * Row Key -> Long
 *
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random Data.
 *
 * All chains start on row 0. All row keys are > 0.
 *
 * After creating the linked lists they are walked over using a TableMapper-based MapReduce job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run. Each mapper creates one linked list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * hbase.IntegrationTestBulkLoad.replicaCount
 * How many region replicas to configure for the table under test.
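 *
 * A minimal sketch of a programmatic invocation, mirroring the main method below; the numMaps
 * override is illustrative, and any of the options above can equally be passed as -D flags on the
 * command line via ToolRunner:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * // Illustrative override; the other options above can be set the same way.
 * conf.setInt("hbase.IntegrationTestBulkLoad.numMaps", 20);
 * IntegrationTestingUtility.setUseDistributedCluster(conf);
 * int status = ToolRunner.run(conf, new IntegrationTestFileBasedSFTBulkLoad(), new String[0]);
 * </pre>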
 */
@Category(IntegrationTests.class)
public class IntegrationTestFileBasedSFTBulkLoad extends IntegrationTestBulkLoad {

  private static final Logger LOG =
    LoggerFactory.getLogger(IntegrationTestFileBasedSFTBulkLoad.class);

  private static final String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static final String NUM_IMPORT_ROUNDS_KEY =
    "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static final String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static final int NUM_REPLICA_COUNT_DEFAULT = 1;

  /** Runs the parent bulk load test against the file-based tracker set up in setUpCluster(). */
  @Test
  public void testFileBasedSFTBulkLoad() throws Exception {
    super.testBulkLoad();
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    // Force the file-based store file tracker before the cluster starts so that tables
    // created by the test pick it up.
    util.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
      "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster: one chain per mapper, ten mappers per region server,
    // unless the caller has already set the options.
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
        Integer.toString(util.getAdmin().getRegionServers().size() * 10));
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestFileBasedSFTBulkLoad(), args);
    System.exit(status);
  }
}