/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * A base for {@link MultiTableInputFormat}s. Receives a list of
 * {@link Scan} instances that define the input tables, key ranges, filters,
 * and so on. Subclasses may use other TableRecordReader implementations.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class MultiTableInputFormatBase extends
    InputFormat<ImmutableBytesWritable, Result> {

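  // A minimal, hypothetical usage sketch (illustration only; the table name
  // "my_table" is an assumption): a subclass tags each Scan with its target
  // table and hands the list to setScans(). The shipped subclasses, e.g.
  // MultiTableInputFormat, do this from the job configuration.
  //
  //   List<Scan> scans = new ArrayList<Scan>();
  //   Scan scan = new Scan();
  //   scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME,
  //       TableName.valueOf("my_table").getName());
  //   scans.add(scan);
  //   setScans(scans);
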
  private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);

  /** Holds the set of scans used to define the input. */
  private List<Scan> scans;

  /** The reader scanning the table; can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
   * default.
   *
   * @param split The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @throws InterruptedException When record reader initialization fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *      org.apache.hadoop.mapreduce.InputSplit,
   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    TableSplit tSplit = (TableSplit) split;
    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));

    if (tSplit.getTable() == null) {
      throw new IOException("Cannot create a record reader because of a"
          + " previous error. Please look at the previous log lines from"
          + " the task's full log for more details.");
    }
    final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
    Table table = connection.getTable(tSplit.getTable());

    if (this.tableRecordReader == null) {
      this.tableRecordReader = new TableRecordReader();
    }
    final TableRecordReader trr = this.tableRecordReader;

    try {
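      // Restrict the scan to exactly this split's row range before handing it
      // to the record reader.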
      Scan sc = tSplit.getScan();
      sc.setStartRow(tSplit.getStartRow());
      sc.setStopRow(tSplit.getEndRow());
      trr.setScan(sc);
      trr.setTable(table);
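      // Return an anonymous wrapper rather than trr itself, so that closing
      // the reader also closes the Connection opened above for this split.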
      return new RecordReader<ImmutableBytesWritable, Result>() {

        @Override
        public void close() throws IOException {
          trr.close();
          if (connection != null) {
            connection.close();
          }
        }

        @Override
        public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
          return trr.getCurrentKey();
        }

        @Override
        public Result getCurrentValue() throws IOException, InterruptedException {
          return trr.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return trr.getProgress();
        }

        @Override
        public void initialize(InputSplit inputsplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
          trr.initialize(inputsplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return trr.nextKeyValue();
        }
      };
    } catch (IOException ioe) {
      // If there is an exception, make sure that all
      // resources are closed and released.
      trr.close();
      if (connection != null) {
        connection.close();
      }
      throw ioe;
    }
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. One
   * split is created per region of each table that overlaps the row range of
   * a scan targeting that table.
   *
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (scans.isEmpty()) {
      throw new IOException("No scans were provided.");
    }

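    // Group the scans by target table so that each table's region boundaries
    // and sizes are looked up only once, however many scans reference it.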
    Map<TableName, List<Scan>> tableMaps = new HashMap<TableName, List<Scan>>();
    for (Scan scan : scans) {
      byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
      if (tableNameBytes == null) {
        throw new IOException("A scan object did not have a table name");
      }

      TableName tableName = TableName.valueOf(tableNameBytes);

      List<Scan> scanList = tableMaps.get(tableName);
      if (scanList == null) {
        scanList = new ArrayList<Scan>();
        tableMaps.put(tableName, scanList);
      }
      scanList.add(scan);
    }

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
      TableName tableName = entry.getKey();
      List<Scan> scanList = entry.getValue();

      try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
          Table table = conn.getTable(tableName);
          Admin admin = conn.getAdmin();
          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        // RegionSizeCalculator reads the region sizes eagerly, so the Admin
        // can be closed with the other resources when the try block exits.
        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin);
        Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
        for (Scan scan : scanList) {
          if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
            throw new IOException("Expecting at least one region for table: "
                + tableName.getNameAsString());
          }
          int count = 0;

          byte[] startRow = scan.getStartRow();
          byte[] stopRow = scan.getStopRow();

          for (int i = 0; i < keys.getFirst().length; i++) {
            if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
              continue;
            }

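            // Keep this region only if the scan's [startRow, stopRow) range
            // overlaps the region's [startKey, endKey); zero-length keys mean
            // "unbounded" on that side.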
            if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
                Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
                (stopRow.length == 0 ||
                Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
              byte[] splitStart = startRow.length == 0 ||
                  Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
                  keys.getFirst()[i] : startRow;
              byte[] splitStop = (stopRow.length == 0 ||
                  Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
                  keys.getSecond()[i].length > 0 ?
                  keys.getSecond()[i] : stopRow;

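              // Record the region's host and approximate size on the split so
              // the framework can schedule the task close to the data.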
              HRegionLocation hregionLocation = regionLocator.getRegionLocation(
                  keys.getFirst()[i], false);
              String regionHostname = hregionLocation.getHostname();
              HRegionInfo regionInfo = hregionLocation.getRegionInfo();
              long regionSize = sizeCalculator.getRegionSize(
                  regionInfo.getRegionName());

              TableSplit split = new TableSplit(table.getName(),
                  scan, splitStart, splitStop, regionHostname, regionSize);

              splits.add(split);

              if (LOG.isDebugEnabled()) {
                LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
              }
            }
          }
        }
      }
    }

    return splits;
  }

  /**
   * Tests whether the given region should be included in the InputSplits
   * generated while splitting the regions of a table.
   * <p>
   * This optimization helps when, given the start and end keys of a region,
   * there is a specific reason to exclude that entire region from the
   * MapReduce job (and hence from the InputSplits). It is useful, for
   * example, when a job remembers the last-processed row and repeatedly
   * revisits the [last, current) interval: because the keys are ordered,
   * whole regions can be skipped, which reduces both the number of
   * InputSplits and the load on the region servers. <br>
   * <br>
   * Note: it is possible that <code>endKey.length == 0</code> for the last
   * (most recent) region of a table. <br>
   * Override this method to exclude regions from the job in bulk. By default
   * no region is excluded, i.e. all regions are included.
   *
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true if this region should be included as part of the input
   *         (the default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey,
      final byte[] endKey) {
    return true;
  }
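
  // A hypothetical override sketch: skip every region that ends at or before
  // a remembered checkpoint row ("checkpoint" is an assumed byte[] field, not
  // part of this class), so already-processed regions yield no splits.
  //
  //   @Override
  //   protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
  //     return endKey.length == 0 || Bytes.compareTo(endKey, checkpoint) > 0;
  //   }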

  /**
   * Allows subclasses to get the list of {@link Scan} objects.
   */
  protected List<Scan> getScans() {
    return this.scans;
  }

  /**
   * Allows subclasses to set the list of {@link Scan} objects.
   *
   * @param scans The list of {@link Scan} used to define the input
   */
  protected void setScans(List<Scan> scans) {
    this.scans = scans;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *          implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
}