/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;


/**
 * Extract grouping columns from input record.
 *
 * <p>For every input record the values of the configured grouping columns are
 * joined into a new key, and the record is emitted under that key. Records
 * that do not yield exactly one value per configured column are skipped.
 */
@InterfaceAudience.Public
public class GroupingTableMap
extends MapReduceBase
implements TableMap<ImmutableBytesWritable,Result> {

  /**
   * JobConf parameter to specify the columns used to produce the key passed to
   * collect from the map phase
   */
  public static final String GROUP_COLUMNS =
    "hbase.mapred.groupingtablemap.columns";

  /** Grouping column names as bytes; populated by {@link #configure(JobConf)}. */
  protected byte [][] columns;

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the
   * JobConf.
   *
   * @param table table to be processed
   * @param columns space separated list of columns to fetch
   * @param groupColumns space separated list of columns used to form the key
   * used in collect
   * @param mapper map class
   * @param job job configuration object
   */
  @SuppressWarnings("unchecked")
  public static void initJob(String table, String columns, String groupColumns,
    Class<? extends TableMap> mapper, JobConf job) {

    TableMapReduceUtil.initTableMapJob(table, columns, mapper,
        ImmutableBytesWritable.class, Result.class, job);
    job.set(GROUP_COLUMNS, groupColumns);
  }

  /**
   * Reads the space separated {@link #GROUP_COLUMNS} parameter from the job
   * configuration and stores each entry as a byte array in {@link #columns}.
   *
   * <p>When the parameter is not set, the default empty string is split into a
   * single empty column name, so {@code columns} then has length one.
   *
   * @param job job configuration object
   */
  @Override
  public void configure(JobConf job) {
    super.configure(job);
    String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
    columns = new byte[cols.length][];
    for(int i = 0; i < cols.length; i++) {
      columns[i] = Bytes.toBytes(cols[i]);
    }
  }

  /**
   * Extract the grouping columns from value to construct a new key.
   *
   * Pass the new key and value to reduce.
   * If any of the grouping columns are not found in the value, the record is skipped.
   *
   * @param key input key of the record; not used when building the group key
   * @param value record from which the grouping column values are extracted
   * @param output collector that receives the group key together with the
   * unchanged value
   * @param reporter progress reporter; not used by this method
   * @throws IOException if the collector fails to accept the pair
   */
  @Override
  public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable,Result> output,
      Reporter reporter) throws IOException {

    byte[][] keyVals = extractKeyValues(value);
    if(keyVals != null) {
      ImmutableBytesWritable tKey = createGroupKey(keyVals);
      output.collect(tKey, value);
    }
  }

  /**
   * Extract columns values from the current record. This method returns
   * null if any of the columns are not found.
   *
   * <p>Each cell whose column name (built with {@code CellUtil.makeColumn} from
   * the cell's family and qualifier) equals one of the configured grouping
   * columns contributes one value. Values are collected in the order the cells
   * appear in the record. The result is non-null only when the number of
   * matching cells equals the number of configured columns.
   *
   * Override this method if you want to deal with nulls differently.
   *
   * @param r record to read the grouping column values from
   * @return array of byte values, or null when no grouping columns are
   * configured, the record has no cells, or the number of matching cells
   * differs from the number of configured columns
   */
  protected byte[][] extractKeyValues(Result r) {
    int numCols = columns.length;
    if (numCols == 0) {
      return null;
    }
    // Result.listCells() is documented to return null for a record without
    // cells; such a record cannot contain the grouping columns, so skip it
    // instead of failing with a NullPointerException.
    List<Cell> cells = r.listCells();
    if (cells == null) {
      return null;
    }
    List<byte[]> foundList = new ArrayList<>();
    for (Cell value : cells) {
      byte [] column = CellUtil.makeColumn(CellUtil.cloneFamily(value),
          CellUtil.cloneQualifier(value));
      for (int i = 0; i < numCols; i++) {
        if (Bytes.equals(column, columns[i])) {
          foundList.add(CellUtil.cloneValue(value));
          // A cell matches at most one configured column; stop at the first.
          break;
        }
      }
    }
    if (foundList.size() != numCols) {
      return null;
    }
    return foundList.toArray(new byte[numCols][]);
  }

  /**
   * Create a key by concatenating multiple column values.
   * Override this function in order to produce different types of keys.
   *
   * <p>Each value is converted with {@code Bytes.toString}, the strings are
   * joined with a single space, and the joined string is converted back to
   * bytes with {@code Bytes.toBytesBinary}.
   *
   * @param vals column values to concatenate; may be null
   * @return key generated by concatenating multiple column values, or null
   * when {@code vals} is null
   */
  protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
    if(vals == null) {
      return null;
    }
    StringBuilder sb =  new StringBuilder();
    for(int i = 0; i < vals.length; i++) {
      if(i > 0) {
        sb.append(" ");
      }
      sb.append(Bytes.toString(vals[i]));
    }
    return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString()));
  }
}