/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * A base for {@link MultiTableInputFormat}s. Receives a list of
 * {@link Scan} instances that define the input tables, filters, and so on.
 * Subclasses may use other TableRecordReader implementations.
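 * <p>
 * For example, a job can define its input with one {@link Scan} per table
 * (a minimal sketch; the table names, row keys, and {@code MyTableMapper}
 * are illustrative):
 *
 * <pre>
 * List&lt;Scan&gt; scans = new ArrayList&lt;Scan&gt;();
 *
 * Scan scan1 = new Scan();
 * scan1.setStartRow(firstRow1);
 * scan1.setStopRow(lastRow1);
 * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table1"));
 * scans.add(scan1);
 *
 * Scan scan2 = new Scan();
 * scan2.setStartRow(firstRow2);
 * scan2.setStopRow(lastRow2);
 * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table2"));
 * scans.add(scan2);
 *
 * TableMapReduceUtil.initMultiTableMapperJob(scans, MyTableMapper.class,
 *     Text.class, IntWritable.class, job);
 * </pre>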
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class MultiTableInputFormatBase extends
    InputFormat<ImmutableBytesWritable, Result> {

  private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);

  /** Holds the set of scans used to define the input. */
  private List<Scan> scans;

  /** The reader scanning the table; can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
   * default.
   *
   * @param split The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @throws InterruptedException When the record reader initialization fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *      org.apache.hadoop.mapreduce.InputSplit,
   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    TableSplit tSplit = (TableSplit) split;
    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));

    if (tSplit.getTable() == null) {
      throw new IOException("Cannot create a record reader because of a"
          + " previous error. Please look at the previous log lines from"
          + " the task's full log for more details.");
    }
    final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
    Table table = connection.getTable(tSplit.getTable());

    if (this.tableRecordReader == null) {
      this.tableRecordReader = new TableRecordReader();
    }
    final TableRecordReader trr = this.tableRecordReader;

    try {
      Scan sc = tSplit.getScan();
      sc.setStartRow(tSplit.getStartRow());
      sc.setStopRow(tSplit.getEndRow());
      trr.setScan(sc);
      trr.setTable(table);
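      // Wrap the delegate reader so that closing the returned reader also
      // closes the Connection opened above for this split.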
      return new RecordReader<ImmutableBytesWritable, Result>() {

        @Override
        public void close() throws IOException {
          trr.close();
          connection.close();
        }

        @Override
        public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
          return trr.getCurrentKey();
        }

        @Override
        public Result getCurrentValue() throws IOException, InterruptedException {
          return trr.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return trr.getProgress();
        }

        @Override
        public void initialize(InputSplit inputsplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
          trr.initialize(inputsplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return trr.nextKeyValue();
        }
      };
    } catch (IOException ioe) {
      // If there is an exception, make sure that all
      // resources are closed and released.
      trr.close();
      connection.close();
      throw ioe;
    }
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. One
   * split is created per region of each table, for every {@link Scan} that
   * overlaps that region.
   *
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (scans.isEmpty()) {
      throw new IOException("No scans were provided.");
    }

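    // Group the scans by their target table so that each table's region
    // boundaries and sizes are looked up only once.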
    Map<TableName, List<Scan>> tableMaps = new HashMap<TableName, List<Scan>>();
    for (Scan scan : scans) {
      byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
      if (tableNameBytes == null) {
        throw new IOException("A scan object did not have a table name");
      }

      TableName tableName = TableName.valueOf(tableNameBytes);

      List<Scan> scanList = tableMaps.get(tableName);
      if (scanList == null) {
        scanList = new ArrayList<Scan>();
        tableMaps.put(tableName, scanList);
      }
      scanList.add(scan);
    }

    List<InputSplit> splits = new ArrayList<InputSplit>();
    // Make a single Connection to the cluster and use it across all tables.
    try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) {
      for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
        TableName tableName = entry.getKey();
        List<Scan> scanList = entry.getValue();

        try (Table table = conn.getTable(tableName);
             RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
          RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
                  regionLocator, conn.getAdmin());
          Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
          for (Scan scan : scanList) {
            if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
              throw new IOException("Expecting at least one region for table: "
                      + tableName.getNameAsString());
            }
            int count = 0;

            byte[] startRow = scan.getStartRow();
            byte[] stopRow = scan.getStopRow();

            for (int i = 0; i < keys.getFirst().length; i++) {
              if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
                continue;
              }

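              // Keep this region only if the scan's [startRow, stopRow) range
              // overlaps it, then clamp the split boundaries to the
              // intersection of the two ranges.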
              if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
                      Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
                      (stopRow.length == 0 || Bytes.compareTo(stopRow,
                              keys.getFirst()[i]) > 0)) {
                byte[] splitStart = startRow.length == 0 ||
                        Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
                        keys.getFirst()[i] : startRow;
                byte[] splitStop = (stopRow.length == 0 ||
                        Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
                        keys.getSecond()[i].length > 0 ?
                        keys.getSecond()[i] : stopRow;

                HRegionLocation hregionLocation = regionLocator.getRegionLocation(
                        keys.getFirst()[i], false);
                String regionHostname = hregionLocation.getHostname();
                HRegionInfo regionInfo = hregionLocation.getRegionInfo();
                long regionSize = sizeCalculator.getRegionSize(
                        regionInfo.getRegionName());

                TableSplit split = new TableSplit(table.getName(),
                        scan, splitStart, splitStop, regionHostname, regionSize);

                splits.add(split);

                if (LOG.isDebugEnabled()) {
                  LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
                }
              }
            }
          }
        }
      }
    }

    return splits;
  }

  /**
   * Test if the given region is to be included in the InputSplit while
   * splitting the regions of a table.
   * <p>
   * This optimization is effective when there is a specific reason to exclude
   * an entire region from the M-R job (and hence it contributes no
   * InputSplit), given the region's start and end keys. <br>
   * Useful when we need to remember the last-processed top record and revisit
   * the [last, current) interval for M-R processing, continuously. In addition
   * to reducing InputSplits, this reduces the load on the region server as
   * well, due to the ordering of the keys. <br>
   * <br>
   * Note: It is possible that <code>endKey.length == 0</code>, for the last
   * (recent) region. <br>
   * Override this method if you want to bulk exclude regions altogether from
   * M-R. By default, no region is excluded (i.e. all regions are included).
   *
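   * <p>
   * For example, a subclass could skip every region that ends at or before a
   * remembered checkpoint key (a minimal sketch; {@code checkpointKey} is an
   * illustrative field of the subclass):
   *
   * <pre>
   * &#64;Override
   * protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
   *   // Keep the region if it has no end key (it is the last region) or if
   *   // its end key sorts after the checkpoint, i.e. it may hold new rows.
   *   return endKey.length == 0 || Bytes.compareTo(endKey, checkpointKey) &gt; 0;
   * }
   * </pre>
   *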
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true, if this region needs to be included as part of the input
   *         (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey,
      final byte[] endKey) {
    return true;
  }

  /**
   * Allows subclasses to get the list of {@link Scan} objects.
   *
   * @return The list of {@link Scan} objects used to define the input.
   */
  protected List<Scan> getScans() {
    return this.scans;
  }

  /**
   * Allows subclasses to set the list of {@link Scan} objects.
   *
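   * <p>
   * A subclass that implements {@code org.apache.hadoop.conf.Configurable}
   * can populate the scans in {@code setConf}. A minimal sketch, assuming the
   * scans were serialized into the job configuration under the illustrative
   * key {@code illustrative.scans}:
   *
   * <pre>
   * public void setConf(Configuration conf) {
   *   List&lt;Scan&gt; scans = new ArrayList&lt;Scan&gt;();
   *   try {
   *     for (String s : conf.getStrings("illustrative.scans")) {
   *       scans.add(TableMapReduceUtil.convertStringToScan(s));
   *     }
   *   } catch (IOException e) {
   *     throw new RuntimeException("Failed to deserialize scans", e);
   *   }
   *   setScans(scans);
   * }
   * </pre>
   *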
   * @param scans The list of {@link Scan} used to define the input
   */
  protected void setScans(List<Scan> scans) {
    this.scans = scans;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *          implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
}