001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import org.apache.hadoop.hbase.Cell;
023import org.apache.hadoop.hbase.CellUtil;
024import org.apache.hadoop.hbase.client.Result;
025import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
026import org.apache.hadoop.hbase.util.Bytes;
027import org.apache.hadoop.mapred.JobConf;
028import org.apache.hadoop.mapred.MapReduceBase;
029import org.apache.hadoop.mapred.OutputCollector;
030import org.apache.hadoop.mapred.Reporter;
031import org.apache.yetus.audience.InterfaceAudience;
032
033/**
034 * Extract grouping columns from input record
035 */
036@InterfaceAudience.Public
037public class GroupingTableMap extends MapReduceBase
038  implements TableMap<ImmutableBytesWritable, Result> {
039
040  /**
041   * JobConf parameter to specify the columns used to produce the key passed to collect from the map
042   * phase
043   */
044  public static final String GROUP_COLUMNS = "hbase.mapred.groupingtablemap.columns";
045
046  protected byte[][] columns;
047
048  /**
049   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
050   * @param table        table to be processed
051   * @param columns      space separated list of columns to fetch
052   * @param groupColumns space separated list of columns used to form the key used in collect
053   * @param mapper       map class
054   * @param job          job configuration object
055   */
056  @SuppressWarnings("unchecked")
057  public static void initJob(String table, String columns, String groupColumns,
058    Class<? extends TableMap> mapper, JobConf job) {
059
060    TableMapReduceUtil.initTableMapJob(table, columns, mapper, ImmutableBytesWritable.class,
061      Result.class, job);
062    job.set(GROUP_COLUMNS, groupColumns);
063  }
064
065  @Override
066  public void configure(JobConf job) {
067    super.configure(job);
068    String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
069    columns = new byte[cols.length][];
070    for (int i = 0; i < cols.length; i++) {
071      columns[i] = Bytes.toBytes(cols[i]);
072    }
073  }
074
075  /**
076   * Extract the grouping columns from value to construct a new key. Pass the new key and value to
077   * reduce. If any of the grouping columns are not found in the value, the record is skipped.
078   */
079  public void map(ImmutableBytesWritable key, Result value,
080    OutputCollector<ImmutableBytesWritable, Result> output, Reporter reporter) throws IOException {
081
082    byte[][] keyVals = extractKeyValues(value);
083    if (keyVals != null) {
084      ImmutableBytesWritable tKey = createGroupKey(keyVals);
085      output.collect(tKey, value);
086    }
087  }
088
089  /**
090   * Extract columns values from the current record. This method returns null if any of the columns
091   * are not found. Override this method if you want to deal with nulls differently.
092   * @return array of byte values
093   */
094  protected byte[][] extractKeyValues(Result r) {
095    byte[][] keyVals = null;
096    ArrayList<byte[]> foundList = new ArrayList<>();
097    int numCols = columns.length;
098    if (numCols > 0) {
099      for (Cell value : r.listCells()) {
100        byte[] column =
101          CellUtil.makeColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value));
102        for (int i = 0; i < numCols; i++) {
103          if (Bytes.equals(column, columns[i])) {
104            foundList.add(CellUtil.cloneValue(value));
105            break;
106          }
107        }
108      }
109      if (foundList.size() == numCols) {
110        keyVals = foundList.toArray(new byte[numCols][]);
111      }
112    }
113    return keyVals;
114  }
115
116  /**
117   * Create a key by concatenating multiple column values. Override this function in order to
118   * produce different types of keys.
119   * @return key generated by concatenating multiple column values
120   */
121  protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
122    if (vals == null) {
123      return null;
124    }
125    StringBuilder sb = new StringBuilder();
126    for (int i = 0; i < vals.length; i++) {
127      if (i > 0) {
128        sb.append(" ");
129      }
130      sb.append(Bytes.toString(vals[i]));
131    }
132    return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString()));
133  }
134}