View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.List;
23  
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.ArrayBackedTag;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.Tag;
29  import org.apache.hadoop.hbase.TagType;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.classification.InterfaceStability;
32  import org.apache.hadoop.hbase.client.Put;
33  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34  import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
35  import org.apache.hadoop.hbase.security.visibility.CellVisibility;
36  import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
37  import org.apache.hadoop.hbase.util.Base64;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.io.LongWritable;
40  import org.apache.hadoop.io.Text;
41  import org.apache.hadoop.mapreduce.Counter;
42  import org.apache.hadoop.mapreduce.Mapper;
43  
44  /**
45   * Write table content out to files in hdfs.
46   */
47  @InterfaceAudience.Public
48  @InterfaceStability.Stable
49  public class TsvImporterMapper
50  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
51  {
52  
53    /** Timestamp for all inserted rows */
54    protected long ts;
55  
56    /** Column seperator */
57    private String separator;
58  
59    /** Should skip bad lines */
60    private boolean skipBadLines;
61    /** Should skip empty columns*/
62    private boolean skipEmptyColumns;
63    private Counter badLineCount;
64    private boolean logBadLines;
65  
66    protected ImportTsv.TsvParser parser;
67  
68    protected Configuration conf;
69  
70    protected String cellVisibilityExpr;
71  
72    protected long ttl;
73  
74    protected CellCreator kvCreator;
75  
76    private String hfileOutPath;
77  
78    /** List of cell tags */
79    private List<Tag> tags;
80    
81    public long getTs() {
82      return ts;
83    }
84  
85    public boolean getSkipBadLines() {
86      return skipBadLines;
87    }
88  
89    public Counter getBadLineCount() {
90      return badLineCount;
91    }
92  
93    public void incrementBadLineCount(int count) {
94      this.badLineCount.increment(count);
95    }
96  
97    /**
98     * Handles initializing this class with objects specific to it (i.e., the parser).
99     * Common initialization that might be leveraged by a subsclass is done in
100    * <code>doSetup</code>. Hence a subclass may choose to override this method
101    * and call <code>doSetup</code> as well before handling it's own custom params.
102    *
103    * @param context
104    */
105   @Override
106   protected void setup(Context context) {
107     doSetup(context);
108 
109     conf = context.getConfiguration();
110     parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
111                            separator);
112     if (parser.getRowKeyColumnIndex() == -1) {
113       throw new RuntimeException("No row key column specified");
114     }
115     this.kvCreator = new CellCreator(conf);
116     tags = new ArrayList<Tag>();
117   }
118 
119   /**
120    * Handles common parameter initialization that a subclass might want to leverage.
121    * @param context
122    */
123   protected void doSetup(Context context) {
124     Configuration conf = context.getConfiguration();
125 
126     // If a custom separator has been used,
127     // decode it back from Base64 encoding.
128     separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
129     if (separator == null) {
130       separator = ImportTsv.DEFAULT_SEPARATOR;
131     } else {
132       separator = new String(Base64.decode(separator));
133     }
134     // Should never get 0 as we are setting this to a valid value in job
135     // configuration.
136     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
137 
138     skipEmptyColumns = context.getConfiguration().getBoolean(
139         ImportTsv.SKIP_EMPTY_COLUMNS, false);
140     skipBadLines = context.getConfiguration().getBoolean(
141         ImportTsv.SKIP_LINES_CONF_KEY, true);
142     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
143     logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
144     hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
145   }
146 
147   /**
148    * Convert a line of TSV text into an HBase table row.
149    */
150   @Override
151   public void map(LongWritable offset, Text value,
152     Context context)
153   throws IOException {
154     byte[] lineBytes = value.getBytes();
155 
156     try {
157       ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
158           lineBytes, value.getLength());
159       ImmutableBytesWritable rowKey =
160         new ImmutableBytesWritable(lineBytes,
161             parsed.getRowKeyOffset(),
162             parsed.getRowKeyLength());
163       // Retrieve timestamp if exists
164       ts = parsed.getTimestamp(ts);
165       cellVisibilityExpr = parsed.getCellVisibility();
166       ttl = parsed.getCellTTL();
167 
168       // create tags for the parsed line
169       if (hfileOutPath != null) {
170         tags.clear();
171         if (cellVisibilityExpr != null) {
172           tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
173             cellVisibilityExpr));
174         }
175         // Add TTL directly to the KV so we can vary them when packing more than one KV
176         // into puts
177         if (ttl > 0) {
178           tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
179         }
180       }
181       Put put = new Put(rowKey.copyBytes());
182       for (int i = 0; i < parsed.getColumnCount(); i++) {
183         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
184             || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
185             || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns 
186             && parsed.getColumnLength(i) == 0)) {
187           continue;
188         }
189         populatePut(lineBytes, parsed, put, i);
190       }
191       context.write(rowKey, put);
192     } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
193         | InvalidLabelException badLine) {
194       if (logBadLines) {
195         System.err.println(value);
196       }
197       System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
198       if (skipBadLines) {
199         incrementBadLineCount(1);
200         return;
201       }
202       throw new IOException(badLine);
203     } catch (InterruptedException e) {
204       e.printStackTrace();
205     }
206   }
207 
208   protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
209       int i) throws BadTsvLineException, IOException {
210     Cell cell = null;
211     if (hfileOutPath == null) {
212       cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
213           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
214           parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
215           parsed.getColumnOffset(i), parsed.getColumnLength(i));
216       if (cellVisibilityExpr != null) {
217         // We won't be validating the expression here. The Visibility CP will do
218         // the validation
219         put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
220       }
221       if (ttl > 0) {
222         put.setTTL(ttl);
223       }
224     } else {
225       // Creating the KV which needs to be directly written to HFiles. Using the Facade
226       // KVCreator for creation of kvs.
227       cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
228           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
229           parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
230           parsed.getColumnLength(i), tags);
231     }
232     put.add(cell);
233   }
234 }