001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.NavigableMap;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.ResultScanner;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.Test;
044import org.slf4j.Logger;
045
046/**
047 * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
048 * on our tables is simple - take every row in the table, reverse the value of a particular cell,
049 * and write it back to the table. Implements common components between mapred and mapreduce
050 * implementations.
051 */
052public abstract class TestTableMapReduceBase {
053  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
054  protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
055  protected static final TableName TABLE_FOR_NEGATIVE_TESTS = TableName.valueOf("testfailuretable");
056  protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
057  protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
058
059  protected static final byte[][] columns = new byte[][] {
060    INPUT_FAMILY,
061    OUTPUT_FAMILY
062  };
063
064  /**
065   * Retrieve my logger instance.
066   */
067  protected abstract Logger getLog();
068
069  /**
070   * Handles API-specifics for setting up and executing the job.
071   */
072  protected abstract void runTestOnTable(Table table) throws IOException;
073
074  @BeforeClass
075  public static void beforeClass() throws Exception {
076    UTIL.startMiniCluster();
077    Table table =
078        UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
079            OUTPUT_FAMILY });
080    UTIL.loadTable(table, INPUT_FAMILY, false);
081    UTIL.createTable(TABLE_FOR_NEGATIVE_TESTS, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
082  }
083
084  @AfterClass
085  public static void afterClass() throws Exception {
086    UTIL.deleteTable(TABLE_FOR_NEGATIVE_TESTS);
087    UTIL.shutdownMiniCluster();
088  }
089
090  /**
091   * Test a map/reduce against a multi-region table
092   * @throws IOException
093   */
094  @Test
095  public void testMultiRegionTable() throws IOException {
096    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
097  }
098
099  @Test
100  public void testCombiner() throws IOException {
101    Configuration conf = new Configuration(UTIL.getConfiguration());
102    // force use of combiner for testing purposes
103    conf.setInt("mapreduce.map.combine.minspills", 1);
104    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
105  }
106
107  /**
108   * Implements mapper logic for use across APIs.
109   */
110  protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
111    if (value.size() != 1) {
112      throw new IOException("There should only be one input column");
113    }
114    Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
115      cf = value.getMap();
116    if(!cf.containsKey(INPUT_FAMILY)) {
117      throw new IOException("Wrong input columns. Missing: '" +
118        Bytes.toString(INPUT_FAMILY) + "'.");
119    }
120
121    // Get the original value and reverse it
122
123    String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
124    StringBuilder newValue = new StringBuilder(originalValue);
125    newValue.reverse();
126
127    // Now set the value to be collected
128
129    Put outval = new Put(key.get());
130    outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
131    return outval;
132  }
133
134  protected void verify(TableName tableName) throws IOException {
135    Table table = UTIL.getConnection().getTable(tableName);
136    boolean verified = false;
137    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
138    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
139    for (int i = 0; i < numRetries; i++) {
140      try {
141        getLog().info("Verification attempt #" + i);
142        verifyAttempt(table);
143        verified = true;
144        break;
145      } catch (NullPointerException e) {
146        // If here, a cell was empty. Presume its because updates came in
147        // after the scanner had been opened. Wait a while and retry.
148        getLog().debug("Verification attempt failed: " + e.getMessage());
149      }
150      try {
151        Thread.sleep(pause);
152      } catch (InterruptedException e) {
153        // continue
154      }
155    }
156    assertTrue(verified);
157  }
158
159  /**
160   * Looks at every value of the mapreduce output and verifies that indeed
161   * the values have been reversed.
162   * @param table Table to scan.
163   * @throws IOException
164   * @throws NullPointerException if we failed to find a cell value
165   */
166  private void verifyAttempt(final Table table) throws IOException, NullPointerException {
167    Scan scan = new Scan();
168    TableInputFormat.addColumns(scan, columns);
169    ResultScanner scanner = table.getScanner(scan);
170    try {
171      Iterator<Result> itr = scanner.iterator();
172      assertTrue(itr.hasNext());
173      while(itr.hasNext()) {
174        Result r = itr.next();
175        if (getLog().isDebugEnabled()) {
176          if (r.size() > 2 ) {
177            throw new IOException("Too many results, expected 2 got " +
178              r.size());
179          }
180        }
181        byte[] firstValue = null;
182        byte[] secondValue = null;
183        int count = 0;
184         for(Cell kv : r.listCells()) {
185          if (count == 0) {
186            firstValue = CellUtil.cloneValue(kv);
187          }
188          if (count == 1) {
189            secondValue = CellUtil.cloneValue(kv);
190          }
191          count++;
192          if (count == 2) {
193            break;
194          }
195        }
196
197
198        if (firstValue == null) {
199          throw new NullPointerException(Bytes.toString(r.getRow()) +
200            ": first value is null");
201        }
202        String first = Bytes.toString(firstValue);
203
204        if (secondValue == null) {
205          throw new NullPointerException(Bytes.toString(r.getRow()) +
206            ": second value is null");
207        }
208        byte[] secondReversed = new byte[secondValue.length];
209        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
210          secondReversed[i] = secondValue[j];
211        }
212        String second = Bytes.toString(secondReversed);
213
214        if (first.compareTo(second) != 0) {
215          if (getLog().isDebugEnabled()) {
216            getLog().debug("second key is not the reverse of first. row=" +
217                Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
218                ", second value=" + second);
219          }
220          fail();
221        }
222      }
223    } finally {
224      scanner.close();
225    }
226  }
227}