View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import org.apache.hadoop.io.LongWritable;
21  import org.apache.hadoop.io.Text;
22  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23  import org.apache.hadoop.hbase.client.Put;
24  import org.apache.hadoop.hbase.util.Base64;
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.mapreduce.Mapper;
27  import org.apache.hadoop.mapreduce.Counter;
28  import org.apache.hadoop.conf.Configuration;
29  
30  import java.io.IOException;
31  
32  /**
33   * Write table content out to files in hdfs.
34   */
35  public class TsvImporterMapper
36  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
37  {
38  
39    /** Timestamp for all inserted rows */
40    private long ts;
41  
42    /** Column seperator */
43    private String separator;
44  
45    /** Should skip bad lines */
46    private boolean skipBadLines;
47    private Counter badLineCount;
48  
49    private ImportTsv.TsvParser parser;
50  
51    public long getTs() {
52      return ts;
53    }
54  
55    public boolean getSkipBadLines() {
56      return skipBadLines;
57    }
58  
59    public Counter getBadLineCount() {
60      return badLineCount;
61    }
62  
63    public void incrementBadLineCount(int count) {
64      this.badLineCount.increment(count);
65    }
66  
67    /**
68     * Handles initializing this class with objects specific to it (i.e., the parser).
69     * Common initialization that might be leveraged by a subsclass is done in
70     * <code>doSetup</code>. Hence a subclass may choose to override this method
71     * and call <code>doSetup</code> as well before handling it's own custom params.
72     *
73     * @param context
74     */
75    @Override
76    protected void setup(Context context) {
77      doSetup(context);
78  
79      Configuration conf = context.getConfiguration();
80  
81      parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
82                             separator);
83      if (parser.getRowKeyColumnIndex() == -1) {
84        throw new RuntimeException("No row key column specified");
85      }
86    }
87  
88    /**
89     * Handles common parameter initialization that a subclass might want to leverage.
90     * @param context
91     */
92    protected void doSetup(Context context) {
93      Configuration conf = context.getConfiguration();
94  
95      // If a custom separator has been used,
96      // decode it back from Base64 encoding.
97      separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
98      if (separator == null) {
99        separator = ImportTsv.DEFAULT_SEPARATOR;
100     } else {
101       separator = new String(Base64.decode(separator));
102     }
103 
104     // Should never get 0 as we are setting this to a valid value in job
105     // configuration.
106     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
107 
108     skipBadLines = context.getConfiguration().getBoolean(
109         ImportTsv.SKIP_LINES_CONF_KEY, true);
110     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
111   }
112 
113   /**
114    * Convert a line of TSV text into an HBase table row.
115    */
116   @Override
117   public void map(LongWritable offset, Text value,
118     Context context)
119   throws IOException {
120     byte[] lineBytes = value.getBytes();
121 
122     try {
123       ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
124           lineBytes, value.getLength());
125       ImmutableBytesWritable rowKey =
126         new ImmutableBytesWritable(lineBytes,
127             parsed.getRowKeyOffset(),
128             parsed.getRowKeyLength());
129       // Retrieve timestamp if exists
130       ts = parsed.getTimestamp(ts);
131 
132       Put put = new Put(rowKey.copyBytes());
133       for (int i = 0; i < parsed.getColumnCount(); i++) {
134         if (i == parser.getRowKeyColumnIndex()
135             || i == parser.getTimestampKeyColumnIndex()) {
136           continue;
137         }
138         KeyValue kv = new KeyValue(
139             lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
140             parser.getFamily(i), 0, parser.getFamily(i).length,
141             parser.getQualifier(i), 0, parser.getQualifier(i).length,
142             ts,
143             KeyValue.Type.Put,
144             lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
145         put.add(kv);
146       }
147       context.write(rowKey, put);
148     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
149       if (skipBadLines) {
150         System.err.println(
151             "Bad line at offset: " + offset.get() + ":\n" +
152             badLine.getMessage());
153         incrementBadLineCount(1);
154         return;
155       } else {
156         throw new IOException(badLine);
157       }
158     } catch (IllegalArgumentException e) {
159       if (skipBadLines) {
160         System.err.println(
161             "Bad line at offset: " + offset.get() + ":\n" +
162             e.getMessage());
163         incrementBadLineCount(1);
164         return;
165       } else {
166         throw new IOException(e);
167       }
168     } catch (InterruptedException e) {
169       e.printStackTrace();
170     }
171   }
172 }