View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  
22  import org.apache.hadoop.classification.InterfaceAudience;
23  import org.apache.hadoop.classification.InterfaceStability;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.KeyValue;
27  import org.apache.hadoop.hbase.client.Put;
28  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
29  import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
30  import org.apache.hadoop.hbase.security.visibility.CellVisibility;
31  import org.apache.hadoop.hbase.util.Base64;
32  import org.apache.hadoop.io.LongWritable;
33  import org.apache.hadoop.io.Text;
34  import org.apache.hadoop.mapreduce.Counter;
35  import org.apache.hadoop.mapreduce.Mapper;
36  
37  /**
38   * Write table content out to files in hdfs.
39   */
40  @InterfaceAudience.Public
41  @InterfaceStability.Stable
42  public class TsvImporterMapper
43  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
44  {
45  
46    /** Timestamp for all inserted rows */
47    protected long ts;
48  
49    /** Column seperator */
50    private String separator;
51  
52    /** Should skip bad lines */
53    private boolean skipBadLines;
54    private Counter badLineCount;
55  
56    protected ImportTsv.TsvParser parser;
57  
58    protected Configuration conf;
59  
60    protected String cellVisibilityExpr;
61  
62    protected CellCreator kvCreator;
63  
64    private String hfileOutPath;
65  
66    public long getTs() {
67      return ts;
68    }
69  
70    public boolean getSkipBadLines() {
71      return skipBadLines;
72    }
73  
74    public Counter getBadLineCount() {
75      return badLineCount;
76    }
77  
78    public void incrementBadLineCount(int count) {
79      this.badLineCount.increment(count);
80    }
81  
82    /**
83     * Handles initializing this class with objects specific to it (i.e., the parser).
84     * Common initialization that might be leveraged by a subsclass is done in
85     * <code>doSetup</code>. Hence a subclass may choose to override this method
86     * and call <code>doSetup</code> as well before handling it's own custom params.
87     *
88     * @param context
89     */
90    @Override
91    protected void setup(Context context) {
92      doSetup(context);
93  
94      conf = context.getConfiguration();
95      parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
96                             separator);
97      if (parser.getRowKeyColumnIndex() == -1) {
98        throw new RuntimeException("No row key column specified");
99      }
100     this.kvCreator = new CellCreator(conf);
101   }
102 
103   /**
104    * Handles common parameter initialization that a subclass might want to leverage.
105    * @param context
106    */
107   protected void doSetup(Context context) {
108     Configuration conf = context.getConfiguration();
109 
110     // If a custom separator has been used,
111     // decode it back from Base64 encoding.
112     separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
113     if (separator == null) {
114       separator = ImportTsv.DEFAULT_SEPARATOR;
115     } else {
116       separator = new String(Base64.decode(separator));
117     }
118     // Should never get 0 as we are setting this to a valid value in job
119     // configuration.
120     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
121 
122     skipBadLines = context.getConfiguration().getBoolean(
123         ImportTsv.SKIP_LINES_CONF_KEY, true);
124     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
125     hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
126   }
127 
128   /**
129    * Convert a line of TSV text into an HBase table row.
130    */
131   @Override
132   public void map(LongWritable offset, Text value,
133     Context context)
134   throws IOException {
135     byte[] lineBytes = value.getBytes();
136 
137     try {
138       ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
139           lineBytes, value.getLength());
140       ImmutableBytesWritable rowKey =
141         new ImmutableBytesWritable(lineBytes,
142             parsed.getRowKeyOffset(),
143             parsed.getRowKeyLength());
144       // Retrieve timestamp if exists
145       ts = parsed.getTimestamp(ts);
146       cellVisibilityExpr = parsed.getCellVisibility();
147 
148       Put put = new Put(rowKey.copyBytes());
149       for (int i = 0; i < parsed.getColumnCount(); i++) {
150         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
151             || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
152           continue;
153         }
154         populatePut(lineBytes, parsed, put, i);
155       }
156       context.write(rowKey, put);
157     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
158       if (skipBadLines) {
159         System.err.println(
160             "Bad line at offset: " + offset.get() + ":\n" +
161             badLine.getMessage());
162         incrementBadLineCount(1);
163         return;
164       } else {
165         throw new IOException(badLine);
166       }
167     } catch (IllegalArgumentException e) {
168       if (skipBadLines) {
169         System.err.println(
170             "Bad line at offset: " + offset.get() + ":\n" +
171             e.getMessage());
172         incrementBadLineCount(1);
173         return;
174       } else {
175         throw new IOException(e);
176       }
177     } catch (InterruptedException e) {
178       e.printStackTrace();
179     }
180   }
181 
182   protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
183       int i) throws BadTsvLineException, IOException {
184     Cell cell = null;
185     if (hfileOutPath == null) {
186       cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
187           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
188           parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
189           parsed.getColumnOffset(i), parsed.getColumnLength(i));
190       if (cellVisibilityExpr != null) {
191         // We won't be validating the expression here. The Visibility CP will do
192         // the validation
193         put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
194       }
195     } else {
196       // Creating the KV which needs to be directly written to HFiles. Using the Facade
197       // KVCreator for creation of kvs.
198       cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
199           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
200           parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
201           parsed.getColumnLength(i), cellVisibilityExpr);
202     }
203     put.add(cell);
204   }
205 }