View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.List;
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.classification.InterfaceStability;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.Tag;
30  import org.apache.hadoop.hbase.TagType;
31  import org.apache.hadoop.hbase.client.Put;
32  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33  import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
34  import org.apache.hadoop.hbase.security.visibility.CellVisibility;
35  import org.apache.hadoop.hbase.util.Base64;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.io.LongWritable;
38  import org.apache.hadoop.io.Text;
39  import org.apache.hadoop.mapreduce.Counter;
40  import org.apache.hadoop.mapreduce.Mapper;
41  
42  /**
43   * Write table content out to files in hdfs.
44   */
45  @InterfaceAudience.Public
46  @InterfaceStability.Stable
47  public class TsvImporterMapper
48  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
49  {
50  
51    /** Timestamp for all inserted rows */
52    protected long ts;
53  
54    /** Column seperator */
55    private String separator;
56  
57    /** Should skip bad lines */
58    private boolean skipBadLines;
59    private Counter badLineCount;
60  
61    protected ImportTsv.TsvParser parser;
62  
63    protected Configuration conf;
64  
65    protected String cellVisibilityExpr;
66  
67    protected long ttl;
68  
69    protected CellCreator kvCreator;
70  
71    private String hfileOutPath;
72  
73    public long getTs() {
74      return ts;
75    }
76  
77    public boolean getSkipBadLines() {
78      return skipBadLines;
79    }
80  
81    public Counter getBadLineCount() {
82      return badLineCount;
83    }
84  
85    public void incrementBadLineCount(int count) {
86      this.badLineCount.increment(count);
87    }
88  
89    /**
90     * Handles initializing this class with objects specific to it (i.e., the parser).
91     * Common initialization that might be leveraged by a subsclass is done in
92     * <code>doSetup</code>. Hence a subclass may choose to override this method
93     * and call <code>doSetup</code> as well before handling it's own custom params.
94     *
95     * @param context
96     */
97    @Override
98    protected void setup(Context context) {
99      doSetup(context);
100 
101     conf = context.getConfiguration();
102     parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
103                            separator);
104     if (parser.getRowKeyColumnIndex() == -1) {
105       throw new RuntimeException("No row key column specified");
106     }
107     this.kvCreator = new CellCreator(conf);
108   }
109 
110   /**
111    * Handles common parameter initialization that a subclass might want to leverage.
112    * @param context
113    */
114   protected void doSetup(Context context) {
115     Configuration conf = context.getConfiguration();
116 
117     // If a custom separator has been used,
118     // decode it back from Base64 encoding.
119     separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
120     if (separator == null) {
121       separator = ImportTsv.DEFAULT_SEPARATOR;
122     } else {
123       separator = new String(Base64.decode(separator));
124     }
125     // Should never get 0 as we are setting this to a valid value in job
126     // configuration.
127     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
128 
129     skipBadLines = context.getConfiguration().getBoolean(
130         ImportTsv.SKIP_LINES_CONF_KEY, true);
131     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
132     hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
133   }
134 
135   /**
136    * Convert a line of TSV text into an HBase table row.
137    */
138   @Override
139   public void map(LongWritable offset, Text value,
140     Context context)
141   throws IOException {
142     byte[] lineBytes = value.getBytes();
143 
144     try {
145       ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
146           lineBytes, value.getLength());
147       ImmutableBytesWritable rowKey =
148         new ImmutableBytesWritable(lineBytes,
149             parsed.getRowKeyOffset(),
150             parsed.getRowKeyLength());
151       // Retrieve timestamp if exists
152       ts = parsed.getTimestamp(ts);
153       cellVisibilityExpr = parsed.getCellVisibility();
154       ttl = parsed.getCellTTL();
155 
156       Put put = new Put(rowKey.copyBytes());
157       for (int i = 0; i < parsed.getColumnCount(); i++) {
158         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
159             || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
160             || i == parser.getCellTTLColumnIndex()) {
161           continue;
162         }
163         populatePut(lineBytes, parsed, put, i);
164       }
165       context.write(rowKey, put);
166     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
167       if (skipBadLines) {
168         System.err.println(
169             "Bad line at offset: " + offset.get() + ":\n" +
170             badLine.getMessage());
171         incrementBadLineCount(1);
172         return;
173       } else {
174         throw new IOException(badLine);
175       }
176     } catch (IllegalArgumentException e) {
177       if (skipBadLines) {
178         System.err.println(
179             "Bad line at offset: " + offset.get() + ":\n" +
180             e.getMessage());
181         incrementBadLineCount(1);
182         return;
183       } else {
184         throw new IOException(e);
185       }
186     } catch (InterruptedException e) {
187       e.printStackTrace();
188     }
189   }
190 
191   protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
192       int i) throws BadTsvLineException, IOException {
193     Cell cell = null;
194     if (hfileOutPath == null) {
195       cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
196           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
197           parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
198           parsed.getColumnOffset(i), parsed.getColumnLength(i));
199       if (cellVisibilityExpr != null) {
200         // We won't be validating the expression here. The Visibility CP will do
201         // the validation
202         put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
203       }
204       if (ttl > 0) {
205         put.setTTL(ttl);
206       }
207     } else {
208       // Creating the KV which needs to be directly written to HFiles. Using the Facade
209       // KVCreator for creation of kvs.
210       List<Tag> tags = new ArrayList<Tag>();
211       if (cellVisibilityExpr != null) {
212         tags.addAll(kvCreator.getVisibilityExpressionResolver()
213           .createVisibilityExpTags(cellVisibilityExpr));
214       }
215       // Add TTL directly to the KV so we can vary them when packing more than one KV
216       // into puts
217       if (ttl > 0) {
218         tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
219       }
220       cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
221           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
222           parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
223           parsed.getColumnLength(i), tags);
224     }
225     put.add(cell);
226   }
227 }