001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapred;
020
021import java.io.IOException;
022import java.util.ArrayList;
023
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.client.Result;
028import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.mapred.JobConf;
031import org.apache.hadoop.mapred.MapReduceBase;
032import org.apache.hadoop.mapred.OutputCollector;
033import org.apache.hadoop.mapred.Reporter;
034
035
036/**
037 * Extract grouping columns from input record
038 */
039@InterfaceAudience.Public
040public class GroupingTableMap
041extends MapReduceBase
042implements TableMap<ImmutableBytesWritable,Result> {
043
044  /**
045   * JobConf parameter to specify the columns used to produce the key passed to
046   * collect from the map phase
047   */
048  public static final String GROUP_COLUMNS =
049    "hbase.mapred.groupingtablemap.columns";
050
051  protected byte [][] columns;
052
053  /**
054   * Use this before submitting a TableMap job. It will appropriately set up the
055   * JobConf.
056   *
057   * @param table table to be processed
058   * @param columns space separated list of columns to fetch
059   * @param groupColumns space separated list of columns used to form the key
060   * used in collect
061   * @param mapper map class
062   * @param job job configuration object
063   */
064  @SuppressWarnings("unchecked")
065  public static void initJob(String table, String columns, String groupColumns,
066    Class<? extends TableMap> mapper, JobConf job) {
067
068    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
069        ImmutableBytesWritable.class, Result.class, job);
070    job.set(GROUP_COLUMNS, groupColumns);
071  }
072
073  @Override
074  public void configure(JobConf job) {
075    super.configure(job);
076    String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
077    columns = new byte[cols.length][];
078    for(int i = 0; i < cols.length; i++) {
079      columns[i] = Bytes.toBytes(cols[i]);
080    }
081  }
082
083  /**
084   * Extract the grouping columns from value to construct a new key.
085   *
086   * Pass the new key and value to reduce.
087   * If any of the grouping columns are not found in the value, the record is skipped.
088   * @param key
089   * @param value
090   * @param output
091   * @param reporter
092   * @throws IOException
093   */
094  public void map(ImmutableBytesWritable key, Result value,
095      OutputCollector<ImmutableBytesWritable,Result> output,
096      Reporter reporter) throws IOException {
097
098    byte[][] keyVals = extractKeyValues(value);
099    if(keyVals != null) {
100      ImmutableBytesWritable tKey = createGroupKey(keyVals);
101      output.collect(tKey, value);
102    }
103  }
104
105  /**
106   * Extract columns values from the current record. This method returns
107   * null if any of the columns are not found.
108   *
109   * Override this method if you want to deal with nulls differently.
110   *
111   * @param r
112   * @return array of byte values
113   */
114  protected byte[][] extractKeyValues(Result r) {
115    byte[][] keyVals = null;
116    ArrayList<byte[]> foundList = new ArrayList<>();
117    int numCols = columns.length;
118    if (numCols > 0) {
119      for (Cell value: r.listCells()) {
120        byte [] column = CellUtil.makeColumn(CellUtil.cloneFamily(value),
121            CellUtil.cloneQualifier(value));
122        for (int i = 0; i < numCols; i++) {
123          if (Bytes.equals(column, columns[i])) {
124            foundList.add(CellUtil.cloneValue(value));
125            break;
126          }
127        }
128      }
129      if(foundList.size() == numCols) {
130        keyVals = foundList.toArray(new byte[numCols][]);
131      }
132    }
133    return keyVals;
134  }
135
136  /**
137   * Create a key by concatenating multiple column values.
138   * Override this function in order to produce different types of keys.
139   *
140   * @param vals
141   * @return key generated by concatenating multiple column values
142   */
143  protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
144    if(vals == null) {
145      return null;
146    }
147    StringBuilder sb =  new StringBuilder();
148    for(int i = 0; i < vals.length; i++) {
149      if(i > 0) {
150        sb.append(" ");
151      }
152      sb.append(Bytes.toString(vals[i]));
153    }
154    return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString()));
155  }
156}