View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapred;
21  
22  import java.io.IOException;
23  import java.io.UnsupportedEncodingException;
24  import java.util.ArrayList;
25  import java.util.Map;
26  
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.mapred.JobConf;
33  import org.apache.hadoop.mapred.MapReduceBase;
34  import org.apache.hadoop.mapred.OutputCollector;
35  import org.apache.hadoop.mapred.Reporter;
36  
37  
38  /**
39   * Extract grouping columns from input record
40   */
41  @Deprecated
42  public class GroupingTableMap
43  extends MapReduceBase
44  implements TableMap<ImmutableBytesWritable,Result> {
45  
46    /**
47     * JobConf parameter to specify the columns used to produce the key passed to
48     * collect from the map phase
49     */
50    public static final String GROUP_COLUMNS =
51      "hbase.mapred.groupingtablemap.columns";
52  
53    protected byte [][] columns;
54  
55    /**
56     * Use this before submitting a TableMap job. It will appropriately set up the
57     * JobConf.
58     *
59     * @param table table to be processed
60     * @param columns space separated list of columns to fetch
61     * @param groupColumns space separated list of columns used to form the key
62     * used in collect
63     * @param mapper map class
64     * @param job job configuration object
65     */
66    @SuppressWarnings("unchecked")
67    public static void initJob(String table, String columns, String groupColumns,
68      Class<? extends TableMap> mapper, JobConf job) {
69  
70      TableMapReduceUtil.initTableMapJob(table, columns, mapper,
71          ImmutableBytesWritable.class, Result.class, job);
72      job.set(GROUP_COLUMNS, groupColumns);
73    }
74  
75    @Override
76    public void configure(JobConf job) {
77      super.configure(job);
78      String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
79      columns = new byte[cols.length][];
80      for(int i = 0; i < cols.length; i++) {
81        columns[i] = Bytes.toBytes(cols[i]);
82      }
83    }
84  
85    /**
86     * Extract the grouping columns from value to construct a new key.
87     *
88     * Pass the new key and value to reduce.
89     * If any of the grouping columns are not found in the value, the record is skipped.
90     * @param key
91     * @param value
92     * @param output
93     * @param reporter
94     * @throws IOException
95     */
96    public void map(ImmutableBytesWritable key, Result value,
97        OutputCollector<ImmutableBytesWritable,Result> output,
98        Reporter reporter) throws IOException {
99  
100     byte[][] keyVals = extractKeyValues(value);
101     if(keyVals != null) {
102       ImmutableBytesWritable tKey = createGroupKey(keyVals);
103       output.collect(tKey, value);
104     }
105   }
106 
107   /**
108    * Extract columns values from the current record. This method returns
109    * null if any of the columns are not found.
110    *
111    * Override this method if you want to deal with nulls differently.
112    *
113    * @param r
114    * @return array of byte values
115    */
116   protected byte[][] extractKeyValues(Result r) {
117     byte[][] keyVals = null;
118     ArrayList<byte[]> foundList = new ArrayList<byte[]>();
119     int numCols = columns.length;
120     if (numCols > 0) {
121       for (KeyValue value: r.list()) {
122         byte [] column = KeyValue.makeColumn(value.getFamily(),
123             value.getQualifier());
124         for (int i = 0; i < numCols; i++) {
125           if (Bytes.equals(column, columns[i])) {
126             foundList.add(value.getValue());
127             break;
128           }
129         }
130       }
131       if(foundList.size() == numCols) {
132         keyVals = foundList.toArray(new byte[numCols][]);
133       }
134     }
135     return keyVals;
136   }
137 
138   /**
139    * Create a key by concatenating multiple column values.
140    * Override this function in order to produce different types of keys.
141    *
142    * @param vals
143    * @return key generated by concatenating multiple column values
144    */
145   protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
146     if(vals == null) {
147       return null;
148     }
149     StringBuilder sb =  new StringBuilder();
150     for(int i = 0; i < vals.length; i++) {
151       if(i > 0) {
152         sb.append(" ");
153       }
154       try {
155         sb.append(new String(vals[i], HConstants.UTF8_ENCODING));
156       } catch (UnsupportedEncodingException e) {
157         throw new RuntimeException(e);
158       }
159     }
160     return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
161   }
162 }