View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  
22  import org.apache.hadoop.classification.InterfaceAudience;
23  import org.apache.hadoop.classification.InterfaceStability;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.hbase.client.Put;
27  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
28  import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
29  import org.apache.hadoop.hbase.security.visibility.CellVisibility;
30  import org.apache.hadoop.hbase.util.Base64;
31  import org.apache.hadoop.io.LongWritable;
32  import org.apache.hadoop.io.Text;
33  import org.apache.hadoop.mapreduce.Counter;
34  import org.apache.hadoop.mapreduce.Mapper;
35  
36  /**
37   * Write table content out to files in hdfs.
38   */
39  @InterfaceAudience.Public
40  @InterfaceStability.Stable
41  public class TsvImporterMapper
42  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
43  {
44  
45    /** Timestamp for all inserted rows */
46    protected long ts;
47  
48    /** Column seperator */
49    private String separator;
50  
51    /** Should skip bad lines */
52    private boolean skipBadLines;
53    private Counter badLineCount;
54  
55    protected ImportTsv.TsvParser parser;
56  
57    protected Configuration conf;
58  
59    protected String cellVisibilityExpr;
60  
61    private String hfileOutPath;
62  
63    private LabelExpander labelExpander;
64  
65    public long getTs() {
66      return ts;
67    }
68  
69    public boolean getSkipBadLines() {
70      return skipBadLines;
71    }
72  
73    public Counter getBadLineCount() {
74      return badLineCount;
75    }
76  
77    public void incrementBadLineCount(int count) {
78      this.badLineCount.increment(count);
79    }
80  
81    /**
82     * Handles initializing this class with objects specific to it (i.e., the parser).
83     * Common initialization that might be leveraged by a subsclass is done in
84     * <code>doSetup</code>. Hence a subclass may choose to override this method
85     * and call <code>doSetup</code> as well before handling it's own custom params.
86     *
87     * @param context
88     */
89    @Override
90    protected void setup(Context context) {
91      doSetup(context);
92  
93      conf = context.getConfiguration();
94      parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
95                             separator);
96      if (parser.getRowKeyColumnIndex() == -1) {
97        throw new RuntimeException("No row key column specified");
98      }
99      labelExpander = new LabelExpander(conf);
100   }
101 
102   /**
103    * Handles common parameter initialization that a subclass might want to leverage.
104    * @param context
105    */
106   protected void doSetup(Context context) {
107     Configuration conf = context.getConfiguration();
108 
109     // If a custom separator has been used,
110     // decode it back from Base64 encoding.
111     separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
112     if (separator == null) {
113       separator = ImportTsv.DEFAULT_SEPARATOR;
114     } else {
115       separator = new String(Base64.decode(separator));
116     }
117     // Should never get 0 as we are setting this to a valid value in job
118     // configuration.
119     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
120 
121     skipBadLines = context.getConfiguration().getBoolean(
122         ImportTsv.SKIP_LINES_CONF_KEY, true);
123     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
124     hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
125   }
126 
127   /**
128    * Convert a line of TSV text into an HBase table row.
129    */
130   @Override
131   public void map(LongWritable offset, Text value,
132     Context context)
133   throws IOException {
134     byte[] lineBytes = value.getBytes();
135 
136     try {
137       ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
138           lineBytes, value.getLength());
139       ImmutableBytesWritable rowKey =
140         new ImmutableBytesWritable(lineBytes,
141             parsed.getRowKeyOffset(),
142             parsed.getRowKeyLength());
143       // Retrieve timestamp if exists
144       ts = parsed.getTimestamp(ts);
145       cellVisibilityExpr = parsed.getCellVisibility();
146 
147       Put put = new Put(rowKey.copyBytes());
148       for (int i = 0; i < parsed.getColumnCount(); i++) {
149         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
150             || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
151           continue;
152         }
153         KeyValue kv = createPuts(lineBytes, parsed, put, i);
154       }
155       context.write(rowKey, put);
156     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
157       if (skipBadLines) {
158         System.err.println(
159             "Bad line at offset: " + offset.get() + ":\n" +
160             badLine.getMessage());
161         incrementBadLineCount(1);
162         return;
163       } else {
164         throw new IOException(badLine);
165       }
166     } catch (IllegalArgumentException e) {
167       if (skipBadLines) {
168         System.err.println(
169             "Bad line at offset: " + offset.get() + ":\n" +
170             e.getMessage());
171         incrementBadLineCount(1);
172         return;
173       } else {
174         throw new IOException(e);
175       }
176     } catch (InterruptedException e) {
177       e.printStackTrace();
178     }
179   }
180 
181   protected KeyValue createPuts(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
182       int i) throws BadTsvLineException, IOException {
183     KeyValue kv = null;
184     if (hfileOutPath == null) {
185       kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
186           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
187           parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
188           parsed.getColumnOffset(i), parsed.getColumnLength(i));
189       if (cellVisibilityExpr != null) {
190         // We won't be validating the expression here. The Visibility CP will do
191         // the validation
192         put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
193       }
194     } else {
195       kv = labelExpander.createKVFromCellVisibilityExpr(
196           parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0,
197           parser.getFamily(i).length, parser.getQualifier(i), 0,
198           parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
199           parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
200     }
201     put.add(kv);
202     return kv;
203   }
204 }