001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Base64; 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.ArrayBackedTag; 026import org.apache.hadoop.hbase.Cell; 027import org.apache.hadoop.hbase.KeyValue; 028import org.apache.hadoop.hbase.Tag; 029import org.apache.hadoop.hbase.TagType; 030import org.apache.hadoop.hbase.client.Put; 031import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 032import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; 033import org.apache.hadoop.hbase.security.visibility.CellVisibility; 034import org.apache.hadoop.hbase.security.visibility.InvalidLabelException; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.io.LongWritable; 037import org.apache.hadoop.io.Text; 038import org.apache.hadoop.mapreduce.Counter; 039import org.apache.hadoop.mapreduce.Mapper; 040import org.apache.yetus.audience.InterfaceAudience; 041 042/** 043 * Write table content out to files in hdfs. 044 */ 045@InterfaceAudience.Public 046public class TsvImporterMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { 047 048 /** Timestamp for all inserted rows */ 049 protected long ts; 050 051 /** Column seperator */ 052 private String separator; 053 054 /** Should skip bad lines */ 055 private boolean skipBadLines; 056 /** Should skip empty columns */ 057 private boolean skipEmptyColumns; 058 private Counter badLineCount; 059 private boolean logBadLines; 060 061 protected ImportTsv.TsvParser parser; 062 063 protected Configuration conf; 064 065 protected String cellVisibilityExpr; 066 067 protected long ttl; 068 069 protected CellCreator kvCreator; 070 071 private String hfileOutPath; 072 073 /** List of cell tags */ 074 private List<Tag> tags; 075 076 public long getTs() { 077 return ts; 078 } 079 080 public boolean getSkipBadLines() { 081 return skipBadLines; 082 } 083 084 public Counter getBadLineCount() { 085 return badLineCount; 086 } 087 088 public void incrementBadLineCount(int count) { 089 this.badLineCount.increment(count); 090 } 091 092 /** 093 * Handles initializing this class with objects specific to it (i.e., the parser). Common 094 * initialization that might be leveraged by a subsclass is done in <code>doSetup</code>. Hence a 095 * subclass may choose to override this method and call <code>doSetup</code> as well before 096 * handling it's own custom params. n 097 */ 098 @Override 099 protected void setup(Context context) { 100 doSetup(context); 101 102 conf = context.getConfiguration(); 103 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator); 104 if (parser.getRowKeyColumnIndex() == -1) { 105 throw new RuntimeException("No row key column specified"); 106 } 107 this.kvCreator = new CellCreator(conf); 108 tags = new ArrayList<>(); 109 } 110 111 /** 112 * Handles common parameter initialization that a subclass might want to leverage. n 113 */ 114 protected void doSetup(Context context) { 115 Configuration conf = context.getConfiguration(); 116 117 // If a custom separator has been used, 118 // decode it back from Base64 encoding. 119 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); 120 if (separator == null) { 121 separator = ImportTsv.DEFAULT_SEPARATOR; 122 } else { 123 separator = new String(Base64.getDecoder().decode(separator)); 124 } 125 // Should never get 0 as we are setting this to a valid value in job 126 // configuration. 127 ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0); 128 129 skipEmptyColumns = context.getConfiguration().getBoolean(ImportTsv.SKIP_EMPTY_COLUMNS, false); 130 skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); 131 badLineCount = context.getCounter("ImportTsv", "Bad Lines"); 132 logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false); 133 hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY); 134 } 135 136 /** 137 * Convert a line of TSV text into an HBase table row. 138 */ 139 @Override 140 public void map(LongWritable offset, Text value, Context context) throws IOException { 141 byte[] lineBytes = value.getBytes(); 142 143 try { 144 ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, value.getLength()); 145 ImmutableBytesWritable rowKey = 146 new ImmutableBytesWritable(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength()); 147 // Retrieve timestamp if exists 148 ts = parsed.getTimestamp(ts); 149 cellVisibilityExpr = parsed.getCellVisibility(); 150 ttl = parsed.getCellTTL(); 151 152 // create tags for the parsed line 153 if (hfileOutPath != null) { 154 tags.clear(); 155 if (cellVisibilityExpr != null) { 156 tags.addAll(kvCreator.getVisibilityExpressionResolver() 157 .createVisibilityExpTags(cellVisibilityExpr)); 158 } 159 // Add TTL directly to the KV so we can vary them when packing more than one KV 160 // into puts 161 if (ttl > 0) { 162 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); 163 } 164 } 165 Put put = new Put(rowKey.copyBytes()); 166 for (int i = 0; i < parsed.getColumnCount(); i++) { 167 if ( 168 i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() 169 || i == parser.getAttributesKeyColumnIndex() 170 || i == parser.getCellVisibilityColumnIndex() || i == parser.getCellTTLColumnIndex() 171 || (skipEmptyColumns && parsed.getColumnLength(i) == 0) 172 ) { 173 continue; 174 } 175 populatePut(lineBytes, parsed, put, i); 176 } 177 context.write(rowKey, put); 178 } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException 179 | InvalidLabelException badLine) { 180 if (logBadLines) { 181 System.err.println(value); 182 } 183 System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage()); 184 if (skipBadLines) { 185 incrementBadLineCount(1); 186 return; 187 } 188 throw new IOException(badLine); 189 } catch (InterruptedException e) { 190 e.printStackTrace(); 191 } 192 } 193 194 protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put, 195 int i) throws BadTsvLineException, IOException { 196 Cell cell = null; 197 if (hfileOutPath == null) { 198 cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), 199 parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, 200 parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i), 201 parsed.getColumnLength(i)); 202 if (cellVisibilityExpr != null) { 203 // We won't be validating the expression here. The Visibility CP will do 204 // the validation 205 put.setCellVisibility(new CellVisibility(cellVisibilityExpr)); 206 } 207 if (ttl > 0) { 208 put.setTTL(ttl); 209 } 210 } else { 211 // Creating the KV which needs to be directly written to HFiles. Using the Facade 212 // KVCreator for creation of kvs. 213 cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), 214 parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, 215 parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i), 216 parsed.getColumnLength(i), tags); 217 } 218 put.add(cell); 219 } 220}