View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import org.apache.hadoop.io.LongWritable;
21  import org.apache.hadoop.io.Text;
22  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23  import org.apache.hadoop.hbase.util.Base64;
24  import org.apache.hadoop.hbase.util.Pair;
25  import org.apache.hadoop.mapreduce.Mapper;
26  import org.apache.hadoop.mapreduce.Counter;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.classification.InterfaceStability;
29  import org.apache.hadoop.conf.Configuration;
30  
31  import java.io.IOException;
32  
33  /**
34   * Write table content out to map output files.
35   */
36  @InterfaceAudience.Public
37  @InterfaceStability.Evolving
38  public class TsvImporterTextMapper
39  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
40  {
41  
42    /** Column seperator */
43    private String separator;
44  
45    /** Should skip bad lines */
46    private boolean skipBadLines;
47    private Counter badLineCount;
48  
49    private ImportTsv.TsvParser parser;
50  
51    public boolean getSkipBadLines() {
52      return skipBadLines;
53    }
54  
55    public Counter getBadLineCount() {
56      return badLineCount;
57    }
58  
59    public void incrementBadLineCount(int count) {
60      this.badLineCount.increment(count);
61    }
62  
63    /**
64     * Handles initializing this class with objects specific to it (i.e., the parser).
65     * Common initialization that might be leveraged by a subsclass is done in
66     * <code>doSetup</code>. Hence a subclass may choose to override this method
67     * and call <code>doSetup</code> as well before handling it's own custom params.
68     *
69     * @param context
70     */
71    @Override
72    protected void setup(Context context) {
73      doSetup(context);
74  
75      Configuration conf = context.getConfiguration();
76  
77      parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
78      if (parser.getRowKeyColumnIndex() == -1) {
79        throw new RuntimeException("No row key column specified");
80      }
81    }
82  
83    /**
84     * Handles common parameter initialization that a subclass might want to leverage.
85     * @param context
86     */
87    protected void doSetup(Context context) {
88      Configuration conf = context.getConfiguration();
89  
90      // If a custom separator has been used,
91      // decode it back from Base64 encoding.
92      separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
93      if (separator == null) {
94        separator = ImportTsv.DEFAULT_SEPARATOR;
95      } else {
96        separator = new String(Base64.decode(separator));
97      }
98  
99      skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
100     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
101   }
102 
103   /**
104    * Convert a line of TSV text into an HBase table row.
105    */
106   @Override
107   public void map(LongWritable offset, Text value, Context context) throws IOException {
108     try {
109       Pair<Integer,Integer> rowKeyOffests = parser.parseRowKey(value.getBytes(), value.getLength());
110       ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
111           value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond());
112       context.write(rowKey, value);
113     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
114       if (skipBadLines) {
115         System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
116         incrementBadLineCount(1);
117         return;
118       } 
119       throw new IOException(badLine);
120     } catch (IllegalArgumentException e) {
121       if (skipBadLines) {
122         System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
123         incrementBadLineCount(1);
124         return;
125       } else {
126         throw new IOException(e);
127       }
128     } catch (InterruptedException e) {
129       e.printStackTrace();
130       Thread.currentThread().interrupt();
131     } 
132   }
133 }