/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
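 * <p>
 * A job is usually wired up through {@link TableMapReduceUtil}; the sketch
 * below is illustrative only (the table name "mytable" and MyMapper are
 * placeholders):
 *
 * <pre>
 * Scan scan = new Scan();
 * scan.addFamily(Bytes.toBytes("data"));
 * TableMapReduceUtil.initTableMapperJob("mytable", scan, MyMapper.class,
 *   ImmutableBytesWritable.class, Result.class, job);
 * </pre>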
 */
public class TableInputFormat extends TableInputFormatBase
implements Configurable {

  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /** Base-64 encoded scanner. All other SCAN_* confs are ignored if this is
   * specified. See {@link TableMapReduceUtil#convertScanToString(Scan)} for
   * more details.
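   * <p>
   * A pre-configured scan is typically serialized into the job configuration
   * along these lines (a sketch; the visibility of convertScanToString varies
   * across versions):
   *
   * <pre>
   * conf.set(TableInputFormat.SCAN,
   *   TableMapReduceUtil.convertScanToString(scan));
   * </pre>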
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row. */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row. */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column family to scan. */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space delimited list of columns to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
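
  // These keys are read from the job configuration, set either in code or on
  // the command line; the values below are illustrative only:
  //
  //   -Dhbase.mapreduce.inputtable=mytable
  //   -Dhbase.mapreduce.scan.column.family=data
  //   -Dhbase.mapreduce.scan.maxversions=1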

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
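   * <p>
   * For example (the key values shown are illustrative):
   *
   * <pre>
   * Configuration conf = job.getConfiguration();
   * conf.set(TableInputFormat.INPUT_TABLE, "mytable");
   * conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "data");
   * conf.set(TableInputFormat.SCAN_CACHEDROWS, "500");
   * </pre>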
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
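    // Connect to the table named by INPUT_TABLE; the table gets its own copy
    // of the configuration.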
    String tableName = conf.get(INPUT_TABLE);
    try {
      setHTable(new HTable(new Configuration(conf), tableName));
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }

    Scan scan = null;

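    // A fully serialized Scan (see SCAN) takes precedence; otherwise the scan
    // is assembled from the individual SCAN_* properties below.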
    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred decoding the serialized scan.", e);
      }
    } else {
      try {
        scan = new Scan();

        if (conf.get(SCAN_ROW_START) != null) {
          scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
        }

        if (conf.get(SCAN_ROW_STOP) != null) {
          scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
        }

        if (conf.get(SCAN_COLUMNS) != null) {
          addColumns(scan, conf.get(SCAN_COLUMNS));
        }

        if (conf.get(SCAN_COLUMN_FAMILY) != null) {
          scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
        }

        if (conf.get(SCAN_TIMESTAMP) != null) {
          scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
        }

        if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
          scan.setTimeRange(
              Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
              Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
        }

        if (conf.get(SCAN_MAXVERSIONS) != null) {
          scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
        }

        if (conf.get(SCAN_CACHEDROWS) != null) {
          scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
        }

        if (conf.get(SCAN_BATCHSIZE) != null) {
          scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
        }

        // false by default; full table scans generate too much block cache churn
        scan.setCacheBlocks(conf.getBoolean(SCAN_CACHEBLOCKS, false));
      } catch (Exception e) {
        LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

  /**
   * Parses a combined family and qualifier and adds either both or just the
   * family in case there is no qualifier. This assumes the older
   * colon-divided notation, e.g. "data:contents" or "meta:".
   * <p>
   * Note: It will throw an error when the colon is missing.
   *
   * @param scan The scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When the colon is missing.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte[][] fq = KeyValue.parseColumn(familyAndQualifier);
    if (fq.length > 1 && fq[1] != null && fq[1].length > 0) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      scan.addFamily(fq[0]);
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to addFamily for any families in the input.
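   * <p>
   * For example (an illustrative sketch):
   *
   * <pre>
   * TableInputFormat.addColumns(scan, new byte[][] {
   *   Bytes.toBytes("data:contents"), Bytes.toBytes("meta:") });
   * </pre>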
   *
   * @param scan The scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   */
  public static void addColumns(Scan scan, byte[][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }

  /**
   * Convenience method to help parse old style (or rather, user-entered on
   * the command line) column definitions, e.g. "data:contents mime:". The
   * columns must be space delimited and always have a colon (":") to denote
   * family and qualifier.
   *
   * @param scan The scan to update.
   * @param columns The columns to parse.
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }

}