001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import java.io.IOException; 022import java.util.ArrayList; 023 024import org.apache.yetus.audience.InterfaceAudience; 025import org.apache.hadoop.conf.Configurable; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.CellUtil; 029import org.apache.hadoop.hbase.client.Result; 030import org.apache.hadoop.hbase.client.Scan; 031import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.mapreduce.Job; 034 035/** 036 * Extract grouping columns from input record. 037 */ 038@InterfaceAudience.Public 039public class GroupingTableMapper 040extends TableMapper<ImmutableBytesWritable,Result> implements Configurable { 041 042 /** 043 * JobConf parameter to specify the columns used to produce the key passed to 044 * collect from the map phase. 045 */ 046 public static final String GROUP_COLUMNS = 047 "hbase.mapred.groupingtablemap.columns"; 048 049 /** The grouping columns. */ 050 protected byte [][] columns; 051 /** The current configuration. */ 052 private Configuration conf = null; 053 054 /** 055 * Use this before submitting a TableMap job. It will appropriately set up 056 * the job. 057 * 058 * @param table The table to be processed. 059 * @param scan The scan with the columns etc. 060 * @param groupColumns A space separated list of columns used to form the 061 * key used in collect. 062 * @param mapper The mapper class. 063 * @param job The current job. 064 * @throws IOException When setting up the job fails. 065 */ 066 @SuppressWarnings("unchecked") 067 public static void initJob(String table, Scan scan, String groupColumns, 068 Class<? extends TableMapper> mapper, Job job) throws IOException { 069 TableMapReduceUtil.initTableMapperJob(table, scan, mapper, 070 ImmutableBytesWritable.class, Result.class, job); 071 job.getConfiguration().set(GROUP_COLUMNS, groupColumns); 072 } 073 074 /** 075 * Extract the grouping columns from value to construct a new key. Pass the 076 * new key and value to reduce. If any of the grouping columns are not found 077 * in the value, the record is skipped. 078 * 079 * @param key The current key. 080 * @param value The current value. 081 * @param context The current context. 082 * @throws IOException When writing the record fails. 083 * @throws InterruptedException When the job is aborted. 084 */ 085 @Override 086 public void map(ImmutableBytesWritable key, Result value, Context context) 087 throws IOException, InterruptedException { 088 byte[][] keyVals = extractKeyValues(value); 089 if(keyVals != null) { 090 ImmutableBytesWritable tKey = createGroupKey(keyVals); 091 context.write(tKey, value); 092 } 093 } 094 095 /** 096 * Extract columns values from the current record. This method returns 097 * null if any of the columns are not found. 098 * <p> 099 * Override this method if you want to deal with nulls differently. 100 * 101 * @param r The current values. 102 * @return Array of byte values. 103 */ 104 protected byte[][] extractKeyValues(Result r) { 105 byte[][] keyVals = null; 106 ArrayList<byte[]> foundList = new ArrayList<>(); 107 int numCols = columns.length; 108 if (numCols > 0) { 109 for (Cell value: r.listCells()) { 110 byte [] column = CellUtil.makeColumn(CellUtil.cloneFamily(value), 111 CellUtil.cloneQualifier(value)); 112 for (int i = 0; i < numCols; i++) { 113 if (Bytes.equals(column, columns[i])) { 114 foundList.add(CellUtil.cloneValue(value)); 115 break; 116 } 117 } 118 } 119 if(foundList.size() == numCols) { 120 keyVals = foundList.toArray(new byte[numCols][]); 121 } 122 } 123 return keyVals; 124 } 125 126 /** 127 * Create a key by concatenating multiple column values. 128 * <p> 129 * Override this function in order to produce different types of keys. 130 * 131 * @param vals The current key/values. 132 * @return A key generated by concatenating multiple column values. 133 */ 134 protected ImmutableBytesWritable createGroupKey(byte[][] vals) { 135 if(vals == null) { 136 return null; 137 } 138 StringBuilder sb = new StringBuilder(); 139 for(int i = 0; i < vals.length; i++) { 140 if(i > 0) { 141 sb.append(" "); 142 } 143 sb.append(Bytes.toString(vals[i])); 144 } 145 return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString())); 146 } 147 148 /** 149 * Returns the current configuration. 150 * 151 * @return The current configuration. 152 * @see org.apache.hadoop.conf.Configurable#getConf() 153 */ 154 @Override 155 public Configuration getConf() { 156 return conf; 157 } 158 159 /** 160 * Sets the configuration. This is used to set up the grouping details. 161 * 162 * @param configuration The configuration to set. 163 * @see org.apache.hadoop.conf.Configurable#setConf( 164 * org.apache.hadoop.conf.Configuration) 165 */ 166 @Override 167 public void setConf(Configuration configuration) { 168 this.conf = configuration; 169 String[] cols = conf.get(GROUP_COLUMNS, "").split(" "); 170 columns = new byte[cols.length][]; 171 for(int i = 0; i < cols.length; i++) { 172 columns[i] = Bytes.toBytes(cols[i]); 173 } 174 } 175 176}