/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An integration test for {@link TableSnapshotInputFormat}, which enables reading directly from
 * snapshot files without going through the hbase servers. The test creates a table, loads it with
 * rows ranging from 'aaa' to 'zzz', and for each row sets the columns f1:(null) and f2:(null) to
 * the row value itself.
 *
 * <pre>
 * aaa, f1: => aaa
 * aaa, f2: => aaa
 * aab, f1: => aab
 * ....
 * zzz, f2: => zzz
 * </pre>
 *
 * Then the test creates a snapshot from this table, and overwrites the values in the original
 * table with the value 'after_snapshot_value'. The test then runs a mapreduce job over the
 * snapshot with scan start row 'bbb' and stop row 'yyy'. The data is saved in a single reduce
 * output file and inspected afterwards to verify that the MR job has seen all the values from the
 * snapshot.
 * <p>
 * These parameters can be used to configure the job: <br>
 * "IntegrationTestTableSnapshotInputFormat.table" =&gt; the name of the table <br>
 * "IntegrationTestTableSnapshotInputFormat.snapshot" =&gt; the name of the snapshot <br>
 * "IntegrationTestTableSnapshotInputFormat.numRegions" =&gt; number of regions in the table to be
 * created (default: 32) <br>
 * "IntegrationTestTableSnapshotInputFormat.tableDir" =&gt; temporary directory in which to
 * restore the snapshot files <br>
 * "IntegrationTestTableSnapshotInputFormat.API" =&gt; the API to run the job with, either
 * "mapred" or "mapreduce" (default: "mapreduce")
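 * <p>
 * A hypothetical invocation, run through the hbase launcher script (the -D values below are
 * illustrative only):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestTableSnapshotInputFormat \
 *   -DIntegrationTestTableSnapshotInputFormat.table=mytable \
 *   -DIntegrationTestTableSnapshotInputFormat.numRegions=16 \
 *   -DIntegrationTestTableSnapshotInputFormat.API=mapreduce
 * </pre>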
 */
@Category(IntegrationTests.class)
// Not runnable as a unit test. See TestTableSnapshotInputFormat
public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase {
  private static final Logger LOG =
    LoggerFactory.getLogger(IntegrationTestTableSnapshotInputFormat.class);

  private static final String TABLE_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.table";
  private static final String DEFAULT_TABLE_NAME = "IntegrationTestTableSnapshotInputFormat";

  private static final String SNAPSHOT_NAME_KEY =
    "IntegrationTestTableSnapshotInputFormat.snapshot";
  private static final String NUM_REGIONS_KEY =
    "IntegrationTestTableSnapshotInputFormat.numRegions";

  private static final String MR_IMPLEMENTATION_KEY = "IntegrationTestTableSnapshotInputFormat.API";
  private static final String MAPRED_IMPLEMENTATION = "mapred";
  private static final String MAPREDUCE_IMPLEMENTATION = "mapreduce";

  private static final int DEFAULT_NUM_REGIONS = 32;
  private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir";

  private static final byte[] START_ROW = Bytes.toBytes("bbb");
  private static final byte[] END_ROW = Bytes.toBytes("yyy");

  // The mapred API is missing feature parity with mapreduce. See comments in
  // mapred.TestTableSnapshotInputFormat
  private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa");
  // 'z' + 1 => '{'; the stop row is exclusive, so "zz{" sorts just after "zzz" and the whole
  // 'aaa'..'zzz' range is covered
  private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{");

  private IntegrationTestingUtility util;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    util = getTestingUtil(conf);
  }

  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    this.setConf(util.getConfiguration());
  }

  @Override
  @After
  public void cleanUp() throws Exception {
    util.restoreCluster();
  }

  @Override
  public void setUpCluster() throws Exception {
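    // Intentionally a no-op: the cluster is initialized in setUp() via util.initializeCluster(1).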
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    Configuration conf = getConf();
    TableName tableName = TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY,
      tableName.getQualifierAsString() + "_snapshot_" + EnvironmentEdgeManager.currentTime());
    int numRegions = conf.getInt(NUM_REGIONS_KEY, DEFAULT_NUM_REGIONS);
    String tableDirStr = conf.get(TABLE_DIR_KEY);
    Path tableDir;
    if (tableDirStr == null) {
      tableDir = util.getDataTestDirOnTestFS(tableName.getQualifierAsString());
    } else {
      tableDir = new Path(tableDirStr);
    }

    final String mr = conf.get(MR_IMPLEMENTATION_KEY, MAPREDUCE_IMPLEMENTATION);
    if (mr.equalsIgnoreCase(MAPREDUCE_IMPLEMENTATION)) {
      /*
       * We create the table using HBaseAdmin#createTable(), which creates it with the desired
       * number of regions. We pass bbb as startKey and yyy as endKey, so if numRegions > 2 we get
       * the boundary regions (empty, bbb) and (yyy, empty) plus numRegions - 2 regions between
       * bbb and yyy. The test uses a Scan with startRow bbb and stopRow yyy, so we expect the
       * first and last regions to be filtered out by the input format, leaving numRegions - 2
       * splits between bbb and yyy.
       */
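      // For example, with numRegions = 4 the regions are (empty, bbb), [bbb, m), [m, yyy) and
      // [yyy, empty) for some intermediate split point m; the scan [bbb, yyy) intersects only
      // the two middle regions, so 2 splits are expected.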
      LOG.debug("Running job with mapreduce API.");
      int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;

      org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
        tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions, 1, expectedNumSplits,
        false);
    } else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) {
      /*
       * Similar considerations to the above apply. The difference is that the mapred API does not
       * support specifying start/end rows (or a scan object at all), so the first and last
       * regions are not omitted. See comments in mapred.TestTableSnapshotInputFormat for details
       * of how that test works around the problem. This feature should be added in follow-on
       * work.
       */
      LOG.debug("Running job with mapred API.");
      int expectedNumSplits = numRegions;

      org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
        tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions, 1,
        expectedNumSplits, false);
    } else {
      throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr + ".");
    }

    return 0;
  }

  @Override // Chaos Monkey is not intended to be run with this test
  public TableName getTablename() {
    return null;
  }

  @Override // Chaos Monkey is not intended to be run with this test
  protected Set<String> getColumnFamilies() {
    return null;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), args);
    System.exit(ret);
  }

}