/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * A base for {@link MultiTableInputFormat}s. Receives a list of
 * {@link Scan} instances that define the input tables, filters, etc.
 * Subclasses may use other TableRecordReader implementations.
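 * <p>
 * A minimal usage sketch (the table names here are hypothetical, not part of
 * this API): each {@link Scan} names its target table through the
 * {@link Scan#SCAN_ATTRIBUTES_TABLE_NAME} attribute, and subclasses hand the
 * list to {@link #setScans(List)}.
 * </p>
 * <pre>
 * List&lt;Scan&gt; scans = new ArrayList&lt;Scan&gt;();
 *
 * Scan scan1 = new Scan();
 * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table1"));
 * scans.add(scan1);
 *
 * Scan scan2 = new Scan();
 * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table2"));
 * scans.add(scan2);
 *
 * setScans(scans);
 * </pre>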
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class MultiTableInputFormatBase extends
    InputFormat<ImmutableBytesWritable, Result> {

  private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);

  /** Holds the set of scans used to define the input. */
  private List<Scan> scans;

  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
   * default.
   *
   * @param split The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @throws InterruptedException when record reader initialization fails
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *      org.apache.hadoop.mapreduce.InputSplit,
   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    TableSplit tSplit = (TableSplit) split;
    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));

    if (tSplit.getTable() == null) {
      throw new IOException("Cannot create a record reader because of a"
          + " previous error. Please look at the previous log lines from"
          + " the task's full log for more details.");
    }
    Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
    Table table = connection.getTable(tSplit.getTable());

    TableRecordReader trr = this.tableRecordReader;

    try {
      // if no table record reader was provided use default
      if (trr == null) {
        trr = new TableRecordReader();
      }
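      // TableSplit keeps its Scan in serialized form, so getScan() returns a
      // fresh copy; narrowing the row range below only affects this split.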
      Scan sc = tSplit.getScan();
      sc.setStartRow(tSplit.getStartRow());
      sc.setStopRow(tSplit.getEndRow());
      trr.setScan(sc);
      trr.setTable(table);
    } catch (IOException ioe) {
      // If there is an exception make sure that all
      // resources are closed and released, in reverse
      // order of their creation.
      trr.close();
      table.close();
      connection.close();
      throw ioe;
    }
    return trr;
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. One
   * split is created per region per {@link Scan} whose row range overlaps
   * that region.
   *
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (scans.isEmpty()) {
      throw new IOException("No scans were provided.");
    }

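    // Group the scans by target table so that each table's region layout is
    // looked up (and its connection opened) only once.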
    Map<TableName, List<Scan>> tableMaps = new HashMap<TableName, List<Scan>>();
    for (Scan scan : scans) {
      byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
      if (tableNameBytes == null) {
        throw new IOException("A scan object did not have a table name");
      }

      TableName tableName = TableName.valueOf(tableNameBytes);

      List<Scan> scanList = tableMaps.get(tableName);
      if (scanList == null) {
        scanList = new ArrayList<Scan>();
        tableMaps.put(tableName, scanList);
      }
      scanList.add(scan);
    }

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
      TableName tableName = entry.getKey();
      List<Scan> scanList = entry.getValue();

      try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
           Table table = conn.getTable(tableName);
           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
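        // RegionSizeCalculator estimates each region's data size from region
        // load metrics, letting every split report a meaningful length.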
        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
                regionLocator, conn.getAdmin());
        Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
        for (Scan scan : scanList) {
          if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
            throw new IOException("Expecting at least one region for table: "
                    + tableName.getNameAsString());
          }
          int count = 0;

          byte[] startRow = scan.getStartRow();
          byte[] stopRow = scan.getStopRow();

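          // A region [start, end) overlaps the scan's [startRow, stopRow)
          // range iff startRow sorts before the region's end key and stopRow
          // sorts after its start key; an empty key means "unbounded".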
          for (int i = 0; i < keys.getFirst().length; i++) {
            if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
              continue;
            }

            if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
                    Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
                    (stopRow.length == 0 || Bytes.compareTo(stopRow,
                            keys.getFirst()[i]) > 0)) {
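              // Clamp the split to the intersection of the scan's row range
              // and this region's key range.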
              byte[] splitStart = startRow.length == 0 ||
                      Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
                      keys.getFirst()[i] : startRow;
              byte[] splitStop = (stopRow.length == 0 ||
                      Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
                      keys.getSecond()[i].length > 0 ?
                      keys.getSecond()[i] : stopRow;

              HRegionLocation hregionLocation = regionLocator.getRegionLocation(
                      keys.getFirst()[i], false);
              String regionHostname = hregionLocation.getHostname();
              HRegionInfo regionInfo = hregionLocation.getRegionInfo();
              long regionSize = sizeCalculator.getRegionSize(
                      regionInfo.getRegionName());

              TableSplit split = new TableSplit(table.getName(),
                      scan, splitStart, splitStop, regionHostname, regionSize);

              splits.add(split);

              if (LOG.isDebugEnabled()) {
                LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
              }
            }
          }
        }
      }
    }

    return splits;
  }

  /**
   * Test if the given region is to be included in the InputSplits while
   * splitting the regions of a table.
   * <p>
   * This optimization is useful when there is a specific reason to exclude an
   * entire region from the M-R job (and hence from the InputSplits), based on
   * the region's start and end keys. It is handy, for example, when we need to
   * remember the last-processed row and repeatedly revisit only the
   * [last, current) interval: besides reducing the number of InputSplits, it
   * also reduces the load on the region servers, thanks to the ordering of
   * the keys.
   * <br>
   * Note: It is possible that <code>endKey.length() == 0</code> for the last
   * (most recent) region.
   * <br>
   * Override this method if you want to exclude regions from the M-R job in
   * bulk. By default, no region is excluded (i.e. all regions are included).
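   * <p>
   * A minimal sketch of an override (the {@code lastProcessedKey} marker is
   * hypothetical, not part of this class): skip every region whose end key
   * sorts at or before a row that has already been processed.
   * </p>
   * <pre>
   * &#64;Override
   * protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
   *   // An empty end key marks the last region of the table; keep it.
   *   if (endKey.length == 0) {
   *     return true;
   *   }
   *   return Bytes.compareTo(endKey, lastProcessedKey) &gt; 0;
   * }
   * </pre>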
   *
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true, if this region needs to be included as part of the input
   *         (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey,
      final byte[] endKey) {
    return true;
  }

  /**
   * Allows subclasses to get the list of {@link Scan} objects.
   */
  protected List<Scan> getScans() {
    return this.scans;
  }

  /**
   * Allows subclasses to set the list of {@link Scan} objects.
   *
   * @param scans The list of {@link Scan} objects used to define the input
   */
  protected void setScans(List<Scan> scans) {
    this.scans = scans;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *          implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
}