001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import org.apache.hadoop.io.LongWritable;
021import org.apache.hadoop.io.Text;
022import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
023import org.apache.hadoop.hbase.client.Put;
024import org.apache.hadoop.hbase.client.Durability;
025import org.apache.hadoop.hbase.util.Bytes;
026import org.apache.hadoop.hbase.KeyValue;
027
028import java.io.IOException;
029
030/**
031 * Dummy mapper used for unit tests to verify that the mapper can be injected.
032 * This approach would be used if a custom transformation needed to be done after
033 * reading the input data before writing it to HFiles.
034 */
035public class TsvImporterCustomTestMapper extends TsvImporterMapper {
036
037  @Override
038  protected void setup(Context context) {
039    doSetup(context);
040  }
041
042  /**
043   * Convert a line of TSV text into an HBase table row after transforming the
044   * values by multiplying them by 3.
045   */
046  @Override
047  public void map(LongWritable offset, Text value, Context context)
048        throws IOException {
049    byte[] family = Bytes.toBytes("FAM");
050    final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
051
052    // do some basic line parsing
053    byte[] lineBytes = value.getBytes();
054    String[] valueTokens = new String(lineBytes, "UTF-8").split("\u001b");
055
056    // create the rowKey and Put
057    ImmutableBytesWritable rowKey =
058      new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
059    Put put = new Put(rowKey.copyBytes());
060    put.setDurability(Durability.SKIP_WAL);
061
062    //The value should look like this: VALUE1 or VALUE2. Let's multiply
063    //the integer by 3
064    for(int i = 1; i < valueTokens.length; i++) {
065      String prefix = valueTokens[i].substring(0, "VALUE".length());
066      String suffix = valueTokens[i].substring("VALUE".length());
067      String newValue = prefix + Integer.parseInt(suffix) * 3;
068
069      KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
070          qualifiers[i-1], Bytes.toBytes(newValue));
071      put.add(kv);
072    }
073
074    try {
075      context.write(rowKey, put);
076    } catch (InterruptedException e) {
077      e.printStackTrace();
078    }
079  }
080}