/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * A base for {@link MultiTableInputFormat}s. Receives a list of
 * {@link Scan} instances that define the input tables, key ranges, and
 * filters. Subclasses may use other TableRecordReader implementations.
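 * <p>
 * A minimal sketch of how a subclass might supply its scans; the table and
 * column family names below are illustrative only:
 *
 * <pre>{@code
 * List<Scan> scans = new ArrayList<>();
 * Scan scan = new Scan();
 * // Each scan must carry the name of the table it targets.
 * scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("exampleTable"));
 * scan.addFamily(Bytes.toBytes("cf")); // hypothetical column family
 * scans.add(scan);
 * setScans(scans);
 * }</pre>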
 */
@InterfaceAudience.Public
public abstract class MultiTableInputFormatBase extends
    InputFormat<ImmutableBytesWritable, Result> {

  private static final Logger LOG = LoggerFactory.getLogger(MultiTableInputFormatBase.class);

  /** Holds the set of scans used to define the input. */
  private List<Scan> scans;

  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
   * default.
   *
   * @param split The split to work with.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @throws InterruptedException when record reader initialization fails
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *      org.apache.hadoop.mapreduce.InputSplit,
   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    TableSplit tSplit = (TableSplit) split;
    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));

    if (tSplit.getTable() == null) {
      throw new IOException("Cannot create a record reader because of a"
          + " previous error. Please look at the previous log lines from"
          + " the task's full log for more details.");
    }
    final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
    Table table = connection.getTable(tSplit.getTable());

    if (this.tableRecordReader == null) {
      this.tableRecordReader = new TableRecordReader();
    }
    final TableRecordReader trr = this.tableRecordReader;

    try {
      Scan sc = tSplit.getScan();
      sc.setStartRow(tSplit.getStartRow());
      sc.setStopRow(tSplit.getEndRow());
      trr.setScan(sc);
      trr.setTable(table);
      return new RecordReader<ImmutableBytesWritable, Result>() {

        @Override
        public void close() throws IOException {
          trr.close();
          connection.close();
        }

        @Override
        public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
          return trr.getCurrentKey();
        }

        @Override
        public Result getCurrentValue() throws IOException, InterruptedException {
          return trr.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return trr.getProgress();
        }

        @Override
        public void initialize(InputSplit inputsplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
          trr.initialize(inputsplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return trr.nextKeyValue();
        }
      };
    } catch (IOException ioe) {
      // If there is an exception make sure that all
      // resources are closed and released.
      trr.close();
      connection.close();
      throw ioe;
    }
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. One
   * split is created per region that a scan's row range intersects, for
   * every scan in the configured list.
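   * <p>
   * For instance, a scan over {@code [bbb, fff)} against a table with
   * regions {@code [aaa, ccc)}, {@code [ccc, eee)} and {@code [eee, )}
   * (empty end key) yields three splits, each clamped to the scan range:
   * {@code [bbb, ccc)}, {@code [ccc, eee)} and {@code [eee, fff)}.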
   *
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (scans.isEmpty()) {
      throw new IOException("No scans were provided.");
    }

    // Group the scans by target table so each table is opened only once.
    Map<TableName, List<Scan>> tableMaps = new HashMap<>();
    for (Scan scan : scans) {
      byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
      if (tableNameBytes == null) {
        throw new IOException("A scan object did not have a table name");
      }

      TableName tableName = TableName.valueOf(tableNameBytes);
      tableMaps.computeIfAbsent(tableName, k -> new ArrayList<>()).add(scan);
    }

    List<InputSplit> splits = new ArrayList<>();
    // Make a single Connection to the cluster and use it across all tables.
    try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) {
      for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
        TableName tableName = entry.getKey();
        List<Scan> scanList = entry.getValue();
        try (Table table = conn.getTable(tableName);
             RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
          RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
              regionLocator, conn.getAdmin());
          Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
          for (Scan scan : scanList) {
            if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
              throw new IOException("Expecting at least one region for table: "
                  + tableName.getNameAsString());
            }
            int count = 0;

            byte[] startRow = scan.getStartRow();
            byte[] stopRow = scan.getStopRow();

            for (int i = 0; i < keys.getFirst().length; i++) {
              if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
                continue;
              }

              // Emit a split only if the scan's [startRow, stopRow) range
              // intersects the region's [startKey, endKey) range; an empty
              // key means the range is unbounded on that side.
              if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
                  Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
                  (stopRow.length == 0 || Bytes.compareTo(stopRow,
                      keys.getFirst()[i]) > 0)) {
                // Clamp the split's boundaries to the intersection of the
                // scan range and the region range.
                byte[] splitStart = startRow.length == 0 ||
                    Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
                    keys.getFirst()[i] : startRow;
                byte[] splitStop = (stopRow.length == 0 ||
                    Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
                    keys.getSecond()[i].length > 0 ?
                    keys.getSecond()[i] : stopRow;

                HRegionLocation hregionLocation = regionLocator.getRegionLocation(
                    keys.getFirst()[i], false);
                String regionHostname = hregionLocation.getHostname();
                HRegionInfo regionInfo = hregionLocation.getRegionInfo();
                String encodedRegionName = regionInfo.getEncodedName();
                long regionSize = sizeCalculator.getRegionSize(
                    regionInfo.getRegionName());

                TableSplit split = new TableSplit(table.getName(),
                    scan, splitStart, splitStop, regionHostname,
                    encodedRegionName, regionSize);

                splits.add(split);

                if (LOG.isDebugEnabled()) {
                  LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
                }
              }
            }
          }
        }
      }
    }

    return splits;
  }

  /**
   * Test if the given region is to be included in the InputSplits while
   * splitting the regions of a table.
   * <p>
   * This optimization is effective when there is a specific reason to
   * exclude an entire region from the M-R job (so that it does not
   * contribute an InputSplit), given the start and end keys of that region.
   * <br>
   * Useful when we need to remember the last-processed top record and
   * continuously revisit the [last, current) interval for M-R processing.
   * In addition to reducing the number of InputSplits, this reduces the
   * load on the region server as well, due to the ordering of the keys.
   * <br>
   * <br>
   * Note: It is possible that <code>endKey.length() == 0</code> for the
   * last (most recent) region. <br>
   * Override this method if you want to bulk exclude regions from M-R.
   * By default, no region is excluded (i.e. all regions are included).
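   * <p>
   * A sketch of a possible override that skips regions ending at or before a
   * remembered row; the {@code lastProcessedRow} field is illustrative:
   *
   * <pre>{@code
   * protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
   *   // An empty end key marks the last region, which is always kept.
   *   return endKey.length == 0 || Bytes.compareTo(endKey, lastProcessedRow) > 0;
   * }
   * }</pre>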
   *
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true, if this region needs to be included as part of the input
   *         (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey,
      final byte[] endKey) {
    return true;
  }

  /**
   * Allows subclasses to get the list of {@link Scan} objects.
   */
  protected List<Scan> getScans() {
    return this.scans;
  }

  /**
   * Allows subclasses to set the list of {@link Scan} objects.
   *
   * @param scans The list of {@link Scan} used to define the input
   */
  protected void setScans(List<Scan> scans) {
    this.scans = scans;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
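   * <p>
   * For example, a subclass could install a custom reader before the job
   * runs; {@code MyTableRecordReader} here is a hypothetical subclass:
   *
   * <pre>{@code
   * setTableRecordReader(new MyTableRecordReader());
   * }</pre>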
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *          implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
}