1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import org.apache.hadoop.io.LongWritable;
21 import org.apache.hadoop.io.Text;
22 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23 import org.apache.hadoop.hbase.util.Base64;
24 import org.apache.hadoop.hbase.util.Pair;
25 import org.apache.hadoop.mapreduce.Mapper;
26 import org.apache.hadoop.mapreduce.Counter;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.classification.InterfaceStability;
29 import org.apache.hadoop.conf.Configuration;
30
31 import java.io.IOException;
32
33
34
35
36 @InterfaceAudience.Public
37 @InterfaceStability.Evolving
38 public class TsvImporterTextMapper
39 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
40 {
41
42
43 private String separator;
44
45
46 private boolean skipBadLines;
47 private Counter badLineCount;
48
49 private ImportTsv.TsvParser parser;
50
51 public boolean getSkipBadLines() {
52 return skipBadLines;
53 }
54
55 public Counter getBadLineCount() {
56 return badLineCount;
57 }
58
59 public void incrementBadLineCount(int count) {
60 this.badLineCount.increment(count);
61 }
62
63
64
65
66
67
68
69
70
71 @Override
72 protected void setup(Context context) {
73 doSetup(context);
74
75 Configuration conf = context.getConfiguration();
76
77 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
78 if (parser.getRowKeyColumnIndex() == -1) {
79 throw new RuntimeException("No row key column specified");
80 }
81 }
82
83
84
85
86
87 protected void doSetup(Context context) {
88 Configuration conf = context.getConfiguration();
89
90
91
92 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
93 if (separator == null) {
94 separator = ImportTsv.DEFAULT_SEPARATOR;
95 } else {
96 separator = new String(Base64.decode(separator));
97 }
98
99 skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
100 badLineCount = context.getCounter("ImportTsv", "Bad Lines");
101 }
102
103
104
105
106 @Override
107 public void map(LongWritable offset, Text value, Context context) throws IOException {
108 try {
109 Pair<Integer,Integer> rowKeyOffests = parser.parseRowKey(value.getBytes(), value.getLength());
110 ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
111 value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond());
112 context.write(rowKey, value);
113 } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
114 if (skipBadLines) {
115 System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
116 incrementBadLineCount(1);
117 return;
118 }
119 throw new IOException(badLine);
120 } catch (IllegalArgumentException e) {
121 if (skipBadLines) {
122 System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
123 incrementBadLineCount(1);
124 return;
125 } else {
126 throw new IOException(e);
127 }
128 } catch (InterruptedException e) {
129 e.printStackTrace();
130 Thread.currentThread().interrupt();
131 }
132 }
133 }