View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.text.MessageFormat;
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.classification.InterfaceStability;
29  import org.apache.hadoop.hbase.HRegionInfo;
30  import org.apache.hadoop.hbase.HRegionLocation;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.Connection;
33  import org.apache.hadoop.hbase.client.ConnectionFactory;
34  import org.apache.hadoop.hbase.client.RegionLocator;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.client.Scan;
37  import org.apache.hadoop.hbase.client.Table;
38  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.Pair;
41  import org.apache.hadoop.hbase.util.RegionSizeCalculator;
42  import org.apache.hadoop.mapreduce.InputFormat;
43  import org.apache.hadoop.mapreduce.InputSplit;
44  import org.apache.hadoop.mapreduce.JobContext;
45  import org.apache.hadoop.mapreduce.RecordReader;
46  import org.apache.hadoop.mapreduce.TaskAttemptContext;
47  
48  import java.util.Map;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  /**
52   * A base for {@link MultiTableInputFormat}s. Receives a list of
53   * {@link Scan} instances that define the input tables and
54   * filters etc. Subclasses may use other TableRecordReader implementations.
55   */
56  @InterfaceAudience.Public
57  @InterfaceStability.Evolving
58  public abstract class MultiTableInputFormatBase extends
59      InputFormat<ImmutableBytesWritable, Result> {
60  
61    private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
62  
63    /** Holds the set of scans used to define the input. */
64    private List<Scan> scans;
65  
66    /** The reader scanning the table, can be a custom one. */
67    private TableRecordReader tableRecordReader = null;
68  
69    /**
70     * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
71     * default.
72     *
73     * @param split The split to work with.
74     * @param context The current context.
75     * @return The newly created record reader.
76     * @throws IOException When creating the reader fails.
77     * @throws InterruptedException when record reader initialization fails
78     * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
79     *      org.apache.hadoop.mapreduce.InputSplit,
80     *      org.apache.hadoop.mapreduce.TaskAttemptContext)
81     */
82    @Override
83    public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
84        InputSplit split, TaskAttemptContext context)
85        throws IOException, InterruptedException {
86      TableSplit tSplit = (TableSplit) split;
87      LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));
88  
89      if (tSplit.getTable() == null) {
90        throw new IOException("Cannot create a record reader because of a"
91            + " previous error. Please look at the previous logs lines from"
92            + " the task's full log for more details.");
93      }
94      final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
95      Table table = connection.getTable(tSplit.getTable());
96  
97      if (this.tableRecordReader == null) {
98        this.tableRecordReader = new TableRecordReader();
99      }
100     final TableRecordReader trr = this.tableRecordReader;
101 
102     try {
103       Scan sc = tSplit.getScan();
104       sc.setStartRow(tSplit.getStartRow());
105       sc.setStopRow(tSplit.getEndRow());
106       trr.setScan(sc);
107       trr.setTable(table);
108       return new RecordReader<ImmutableBytesWritable, Result>() {
109 
110         @Override
111         public void close() throws IOException {
112           trr.close();
113           connection.close();
114         }
115 
116         @Override
117         public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
118           return trr.getCurrentKey();
119         }
120 
121         @Override
122         public Result getCurrentValue() throws IOException, InterruptedException {
123           return trr.getCurrentValue();
124         }
125 
126         @Override
127         public float getProgress() throws IOException, InterruptedException {
128           return trr.getProgress();
129         }
130 
131         @Override
132         public void initialize(InputSplit inputsplit, TaskAttemptContext context)
133             throws IOException, InterruptedException {
134           trr.initialize(inputsplit, context);
135         }
136 
137         @Override
138         public boolean nextKeyValue() throws IOException, InterruptedException {
139           return trr.nextKeyValue();
140         }
141       };
142     } catch (IOException ioe) {
143       // If there is an exception make sure that all
144       // resources are closed and released.
145       trr.close();
146       connection.close();
147       throw ioe;
148     }
149   }
150 
151   /**
152    * Calculates the splits that will serve as input for the map tasks. The
153    * number of splits matches the number of regions in a table.
154    *
155    * @param context The current job context.
156    * @return The list of input splits.
157    * @throws IOException When creating the list of splits fails.
158    * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
159    */
160   @Override
161   public List<InputSplit> getSplits(JobContext context) throws IOException {
162     if (scans.isEmpty()) {
163       throw new IOException("No scans were provided.");
164     }
165 
166     Map<TableName, List<Scan>> tableMaps = new HashMap<TableName, List<Scan>>();
167     for (Scan scan : scans) {
168       byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
169       if (tableNameBytes == null)
170         throw new IOException("A scan object did not have a table name");
171 
172       TableName tableName = TableName.valueOf(tableNameBytes);
173 
174       List<Scan> scanList = tableMaps.get(tableName);
175       if (scanList == null) {
176         scanList = new ArrayList<Scan>();
177         tableMaps.put(tableName, scanList);
178       }
179       scanList.add(scan);
180     }
181 
182     List<InputSplit> splits = new ArrayList<InputSplit>();
183     Iterator iter = tableMaps.entrySet().iterator();
184     while (iter.hasNext()) {
185       Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
186       TableName tableName = entry.getKey();
187       List<Scan> scanList = entry.getValue();
188 
189       try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
190         Table table = conn.getTable(tableName);
191         RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
192         RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
193                 regionLocator, conn.getAdmin());
194         Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
195         for (Scan scan : scanList) {
196           if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
197             throw new IOException("Expecting at least one region for table : "
198                     + tableName.getNameAsString());
199           }
200           int count = 0;
201 
202           byte[] startRow = scan.getStartRow();
203           byte[] stopRow = scan.getStopRow();
204 
205           for (int i = 0; i < keys.getFirst().length; i++) {
206             if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
207               continue;
208             }
209 
210             if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
211                     Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
212                     (stopRow.length == 0 || Bytes.compareTo(stopRow,
213                             keys.getFirst()[i]) > 0)) {
214               byte[] splitStart = startRow.length == 0 ||
215                       Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
216                       keys.getFirst()[i] : startRow;
217               byte[] splitStop = (stopRow.length == 0 ||
218                       Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
219                       keys.getSecond()[i].length > 0 ?
220                       keys.getSecond()[i] : stopRow;
221 
222               HRegionLocation hregionLocation = regionLocator.getRegionLocation(
223                       keys.getFirst()[i], false);
224               String regionHostname = hregionLocation.getHostname();
225               HRegionInfo regionInfo = hregionLocation.getRegionInfo();
226               String encodedRegionName = regionInfo.getEncodedName();
227               long regionSize = sizeCalculator.getRegionSize(
228                       regionInfo.getRegionName());
229 
230               TableSplit split = new TableSplit(table.getName(),
231                       scan, splitStart, splitStop, regionHostname,
232                       encodedRegionName, regionSize);
233 
234               splits.add(split);
235 
236               if (LOG.isDebugEnabled())
237                 LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
238             }
239           }
240         }
241       }
242     }
243 
244     return splits;
245   }
246 
247   /**
248    * Test if the given region is to be included in the InputSplit while
249    * splitting the regions of a table.
250    * <p>
251    * This optimization is effective when there is a specific reasoning to
252    * exclude an entire region from the M-R job, (and hence, not contributing to
253    * the InputSplit), given the start and end keys of the same. <br>
254    * Useful when we need to remember the last-processed top record and revisit
255    * the [last, current) interval for M-R processing, continuously. In addition
256    * to reducing InputSplits, reduces the load on the region server as well, due
257    * to the ordering of the keys. <br>
258    * <br>
259    * Note: It is possible that <code>endKey.length() == 0 </code> , for the last
260    * (recent) region. <br>
261    * Override this method, if you want to bulk exclude regions altogether from
262    * M-R. By default, no region is excluded( i.e. all regions are included).
263    *
264    * @param startKey Start key of the region
265    * @param endKey End key of the region
266    * @return true, if this region needs to be included as part of the input
267    *         (default).
268    */
269   protected boolean includeRegionInSplit(final byte[] startKey,
270       final byte[] endKey) {
271     return true;
272   }
273 
274   /**
275    * Allows subclasses to get the list of {@link Scan} objects.
276    */
277   protected List<Scan> getScans() {
278     return this.scans;
279   }
280 
281   /**
282    * Allows subclasses to set the list of {@link Scan} objects.
283    *
284    * @param scans The list of {@link Scan} used to define the input
285    */
286   protected void setScans(List<Scan> scans) {
287     this.scans = scans;
288   }
289 
290   /**
291    * Allows subclasses to set the {@link TableRecordReader}.
292    *
293    * @param tableRecordReader A different {@link TableRecordReader}
294    *          implementation.
295    */
296   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
297     this.tableRecordReader = tableRecordReader;
298   }
299 }