001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.util.Set;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.HBaseConfiguration;
026import org.apache.hadoop.hbase.IntegrationTestBase;
027import org.apache.hadoop.hbase.IntegrationTestingUtility;
028import org.apache.hadoop.hbase.testclassification.IntegrationTests;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.hadoop.util.ToolRunner;
032import org.junit.After;
033import org.junit.Before;
034import org.junit.experimental.categories.Category;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * An integration test to test {@link TableSnapshotInputFormat} which enables
040 * reading directly from snapshot files without going through hbase servers.
041 *
042 * This test creates a table and loads the table with the rows ranging from
043 * 'aaa' to 'zzz', and for each row, sets the columns f1:(null) and f2:(null) to be
044 * the the same as the row value.
045 * <pre>
046 * aaa, f1: => aaa
047 * aaa, f2: => aaa
048 * aab, f1: => aab
049 * ....
050 * zzz, f2: => zzz
051 * </pre>
052 *
053 * Then the test creates a snapshot from this table, and overrides the values in the original
054 * table with values 'after_snapshot_value'. The test, then runs a mapreduce job over the snapshot
055 * with a scan start row 'bbb' and stop row 'yyy'. The data is saved in a single reduce output
056 * file, and
057 * inspected later to verify that the MR job has seen all the values from the snapshot.
058 *
059 * <p> These parameters can be used to configure the job:
060 * <br>"IntegrationTestTableSnapshotInputFormat.table" =&gt; the name of the table
061 * <br>"IntegrationTestTableSnapshotInputFormat.snapshot" =&gt; the name of the snapshot
062 * <br>"IntegrationTestTableSnapshotInputFormat.numRegions" =&gt; number of regions in the table
063 * to be created (default, 32).
064 * <br>"IntegrationTestTableSnapshotInputFormat.tableDir" =&gt; temporary directory to restore the
065 * snapshot files
066 *
067 */
068@Category(IntegrationTests.class)
069// Not runnable as a unit test. See TestTableSnapshotInputFormat
070public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase {
071  private static final Logger LOG =
072      LoggerFactory.getLogger(IntegrationTestTableSnapshotInputFormat.class);
073
074  private static final String TABLE_NAME_KEY = "IntegrationTestTableSnapshotInputFormat.table";
075  private static final String DEFAULT_TABLE_NAME = "IntegrationTestTableSnapshotInputFormat";
076
077  private static final String SNAPSHOT_NAME_KEY =
078      "IntegrationTestTableSnapshotInputFormat.snapshot";
079  private static final String NUM_REGIONS_KEY =
080      "IntegrationTestTableSnapshotInputFormat.numRegions";
081
082  private static final String MR_IMPLEMENTATION_KEY =
083    "IntegrationTestTableSnapshotInputFormat.API";
084  private static final String MAPRED_IMPLEMENTATION = "mapred";
085  private static final String MAPREDUCE_IMPLEMENTATION = "mapreduce";
086
087  private static final int DEFAULT_NUM_REGIONS = 32;
088  private static final String TABLE_DIR_KEY = "IntegrationTestTableSnapshotInputFormat.tableDir";
089
090  private static final byte[] START_ROW = Bytes.toBytes("bbb");
091  private static final byte[] END_ROW = Bytes.toBytes("yyy");
092
093  // mapred API missing feature pairity with mapreduce. See comments in
094  // mapred.TestTableSnapshotInputFormat
095  private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa");
096  private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
097
098  private IntegrationTestingUtility util;
099
100  @Override
101  public void setConf(Configuration conf) {
102    super.setConf(conf);
103    util = getTestingUtil(conf);
104  }
105
106  @Override
107  @Before
108  public void setUp() throws Exception {
109    super.setUp();
110    util = getTestingUtil(getConf());
111    util.initializeCluster(1);
112    this.setConf(util.getConfiguration());
113  }
114
115  @Override
116  @After
117  public void cleanUp() throws Exception {
118    util.restoreCluster();
119  }
120
121  @Override
122  public void setUpCluster() throws Exception {
123  }
124
125  @Override
126  public int runTestFromCommandLine() throws Exception {
127    Configuration conf = getConf();
128    TableName tableName = TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
129    String snapshotName = conf.get(SNAPSHOT_NAME_KEY, tableName.getQualifierAsString()
130      + "_snapshot_" + System.currentTimeMillis());
131    int numRegions = conf.getInt(NUM_REGIONS_KEY, DEFAULT_NUM_REGIONS);
132    String tableDirStr = conf.get(TABLE_DIR_KEY);
133    Path tableDir;
134    if (tableDirStr == null) {
135      tableDir = util.getDataTestDirOnTestFS(tableName.getQualifierAsString());
136    } else {
137      tableDir = new Path(tableDirStr);
138    }
139
140    final String mr = conf.get(MR_IMPLEMENTATION_KEY, MAPREDUCE_IMPLEMENTATION);
141    if (mr.equalsIgnoreCase(MAPREDUCE_IMPLEMENTATION)) {
142      /*
143       * We create the table using HBaseAdmin#createTable(), which will create the table
144       * with desired number of regions. We pass bbb as startKey and yyy as endKey, so if
145       * desiredNumRegions is > 2, we create regions empty - bbb and yyy - empty, and we
146       * create numRegions - 2 regions between bbb - yyy. The test uses a Scan with startRow
147       * bbb and endRow yyy, so, we expect the first and last region to be filtered out in
148       * the input format, and we expect numRegions - 2 splits between bbb and yyy.
149       */
150      LOG.debug("Running job with mapreduce API.");
151      int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
152
153      org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
154        tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions, 1,
155        expectedNumSplits, false);
156    } else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) {
157      /*
158       * Similar considerations to above. The difference is that mapred API does not support
159       * specifying start/end rows (or a scan object at all). Thus the omission of first and
160       * last regions are not performed. See comments in mapred.TestTableSnapshotInputFormat
161       * for details of how that test works around the problem. This feature should be added
162       * in follow-on work.
163       */
164      LOG.debug("Running job with mapred API.");
165      int expectedNumSplits = numRegions;
166
167      org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
168        tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions, 1,
169        expectedNumSplits, false);
170    } else {
171      throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr +".");
172    }
173
174    return 0;
175  }
176
177  @Override // Chaos Monkey is not intended to be run with this test
178  public TableName getTablename() {
179    return null;
180  }
181
182  @Override // Chaos Monkey is not inteded to be run with this test
183  protected Set<String> getColumnFamilies() { return null; }
184
185  public static void main(String[] args) throws Exception {
186    Configuration conf = HBaseConfiguration.create();
187    IntegrationTestingUtility.setUseDistributedCluster(conf);
188    int ret = ToolRunner.run(conf, new IntegrationTestTableSnapshotInputFormat(), args);
189    System.exit(ret);
190  }
191
192}