1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
23
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.Cell;
26 import org.apache.hadoop.hbase.KeyValue;
27 import org.apache.hadoop.hbase.Tag;
28 import org.apache.hadoop.hbase.TagType;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.hbase.client.Put;
32 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
34 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
35 import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
36 import org.apache.hadoop.hbase.util.Base64;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.io.LongWritable;
39 import org.apache.hadoop.io.Text;
40 import org.apache.hadoop.mapreduce.Counter;
41 import org.apache.hadoop.mapreduce.Mapper;
42
43
44
45
46 @InterfaceAudience.Public
47 @InterfaceStability.Stable
48 public class TsvImporterMapper
49 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
50 {
51
52
53 protected long ts;
54
55
56 private String separator;
57
58
59 private boolean skipBadLines;
60 private Counter badLineCount;
61
62 protected ImportTsv.TsvParser parser;
63
64 protected Configuration conf;
65
66 protected String cellVisibilityExpr;
67
68 protected long ttl;
69
70 protected CellCreator kvCreator;
71
72 private String hfileOutPath;
73
74 public long getTs() {
75 return ts;
76 }
77
78 public boolean getSkipBadLines() {
79 return skipBadLines;
80 }
81
82 public Counter getBadLineCount() {
83 return badLineCount;
84 }
85
86 public void incrementBadLineCount(int count) {
87 this.badLineCount.increment(count);
88 }
89
90
91
92
93
94
95
96
97
98 @Override
99 protected void setup(Context context) {
100 doSetup(context);
101
102 conf = context.getConfiguration();
103 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
104 separator);
105 if (parser.getRowKeyColumnIndex() == -1) {
106 throw new RuntimeException("No row key column specified");
107 }
108 this.kvCreator = new CellCreator(conf);
109 }
110
111
112
113
114
115 protected void doSetup(Context context) {
116 Configuration conf = context.getConfiguration();
117
118
119
120 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
121 if (separator == null) {
122 separator = ImportTsv.DEFAULT_SEPARATOR;
123 } else {
124 separator = new String(Base64.decode(separator));
125 }
126
127
128 ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
129
130 skipBadLines = context.getConfiguration().getBoolean(
131 ImportTsv.SKIP_LINES_CONF_KEY, true);
132 badLineCount = context.getCounter("ImportTsv", "Bad Lines");
133 hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
134 }
135
136
137
138
139 @Override
140 public void map(LongWritable offset, Text value,
141 Context context)
142 throws IOException {
143 byte[] lineBytes = value.getBytes();
144
145 try {
146 ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
147 lineBytes, value.getLength());
148 ImmutableBytesWritable rowKey =
149 new ImmutableBytesWritable(lineBytes,
150 parsed.getRowKeyOffset(),
151 parsed.getRowKeyLength());
152
153 ts = parsed.getTimestamp(ts);
154 cellVisibilityExpr = parsed.getCellVisibility();
155 ttl = parsed.getCellTTL();
156
157 Put put = new Put(rowKey.copyBytes());
158 for (int i = 0; i < parsed.getColumnCount(); i++) {
159 if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
160 || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
161 || i == parser.getCellTTLColumnIndex()) {
162 continue;
163 }
164 populatePut(lineBytes, parsed, put, i);
165 }
166 context.write(rowKey, put);
167 } catch (InvalidLabelException badLine) {
168 if (skipBadLines) {
169 System.err.println(
170 "Bad line at offset: " + offset.get() + ":\n" +
171 badLine.getMessage());
172 incrementBadLineCount(1);
173 return;
174 } else {
175 throw new IOException(badLine);
176 }
177 } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
178 if (skipBadLines) {
179 System.err.println(
180 "Bad line at offset: " + offset.get() + ":\n" +
181 badLine.getMessage());
182 incrementBadLineCount(1);
183 return;
184 } else {
185 throw new IOException(badLine);
186 }
187 } catch (IllegalArgumentException e) {
188 if (skipBadLines) {
189 System.err.println(
190 "Bad line at offset: " + offset.get() + ":\n" +
191 e.getMessage());
192 incrementBadLineCount(1);
193 return;
194 } else {
195 throw new IOException(e);
196 }
197 } catch (InterruptedException e) {
198 e.printStackTrace();
199 }
200 }
201
202 protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
203 int i) throws BadTsvLineException, IOException {
204 Cell cell = null;
205 if (hfileOutPath == null) {
206 cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
207 parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
208 parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
209 parsed.getColumnOffset(i), parsed.getColumnLength(i));
210 if (cellVisibilityExpr != null) {
211
212
213 put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
214 }
215 if (ttl > 0) {
216 put.setTTL(ttl);
217 }
218 } else {
219
220
221 List<Tag> tags = new ArrayList<Tag>();
222 if (cellVisibilityExpr != null) {
223 tags.addAll(kvCreator.getVisibilityExpressionResolver()
224 .createVisibilityExpTags(cellVisibilityExpr));
225 }
226
227
228 if (ttl > 0) {
229 tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
230 }
231 cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
232 parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
233 parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
234 parsed.getColumnLength(i), tags);
235 }
236 put.add(cell);
237 }
238 }