001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapred; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import org.apache.hadoop.hbase.Cell; 023import org.apache.hadoop.hbase.CellUtil; 024import org.apache.hadoop.hbase.client.Result; 025import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 026import org.apache.hadoop.hbase.util.Bytes; 027import org.apache.hadoop.mapred.JobConf; 028import org.apache.hadoop.mapred.MapReduceBase; 029import org.apache.hadoop.mapred.OutputCollector; 030import org.apache.hadoop.mapred.Reporter; 031import org.apache.yetus.audience.InterfaceAudience; 032 033/** 034 * Extract grouping columns from input record 035 */ 036@InterfaceAudience.Public 037public class GroupingTableMap extends MapReduceBase 038 implements TableMap<ImmutableBytesWritable, Result> { 039 040 /** 041 * JobConf parameter to specify the columns used to produce the key passed to collect from the map 042 * phase 043 */ 044 public static final String GROUP_COLUMNS = "hbase.mapred.groupingtablemap.columns"; 045 046 protected byte[][] columns; 047 048 /** 049 * Use this before submitting a TableMap job. It will appropriately set up the JobConf. 050 * @param table table to be processed 051 * @param columns space separated list of columns to fetch 052 * @param groupColumns space separated list of columns used to form the key used in collect 053 * @param mapper map class 054 * @param job job configuration object 055 */ 056 @SuppressWarnings("unchecked") 057 public static void initJob(String table, String columns, String groupColumns, 058 Class<? extends TableMap> mapper, JobConf job) { 059 060 TableMapReduceUtil.initTableMapJob(table, columns, mapper, ImmutableBytesWritable.class, 061 Result.class, job); 062 job.set(GROUP_COLUMNS, groupColumns); 063 } 064 065 @Override 066 public void configure(JobConf job) { 067 super.configure(job); 068 String[] cols = job.get(GROUP_COLUMNS, "").split(" "); 069 columns = new byte[cols.length][]; 070 for (int i = 0; i < cols.length; i++) { 071 columns[i] = Bytes.toBytes(cols[i]); 072 } 073 } 074 075 /** 076 * Extract the grouping columns from value to construct a new key. Pass the new key and value to 077 * reduce. If any of the grouping columns are not found in the value, the record is skipped. 078 */ 079 public void map(ImmutableBytesWritable key, Result value, 080 OutputCollector<ImmutableBytesWritable, Result> output, Reporter reporter) throws IOException { 081 082 byte[][] keyVals = extractKeyValues(value); 083 if (keyVals != null) { 084 ImmutableBytesWritable tKey = createGroupKey(keyVals); 085 output.collect(tKey, value); 086 } 087 } 088 089 /** 090 * Extract columns values from the current record. This method returns null if any of the columns 091 * are not found. Override this method if you want to deal with nulls differently. 092 * @return array of byte values 093 */ 094 protected byte[][] extractKeyValues(Result r) { 095 byte[][] keyVals = null; 096 ArrayList<byte[]> foundList = new ArrayList<>(); 097 int numCols = columns.length; 098 if (numCols > 0) { 099 for (Cell value : r.listCells()) { 100 byte[] column = 101 CellUtil.makeColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value)); 102 for (int i = 0; i < numCols; i++) { 103 if (Bytes.equals(column, columns[i])) { 104 foundList.add(CellUtil.cloneValue(value)); 105 break; 106 } 107 } 108 } 109 if (foundList.size() == numCols) { 110 keyVals = foundList.toArray(new byte[numCols][]); 111 } 112 } 113 return keyVals; 114 } 115 116 /** 117 * Create a key by concatenating multiple column values. Override this function in order to 118 * produce different types of keys. 119 * @return key generated by concatenating multiple column values 120 */ 121 protected ImmutableBytesWritable createGroupKey(byte[][] vals) { 122 if (vals == null) { 123 return null; 124 } 125 StringBuilder sb = new StringBuilder(); 126 for (int i = 0; i < vals.length; i++) { 127 if (i > 0) { 128 sb.append(" "); 129 } 130 sb.append(Bytes.toString(vals[i])); 131 } 132 return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString())); 133 } 134}