1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.TreeSet;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.KeyValueUtil;
31 import org.apache.hadoop.hbase.Tag;
32 import org.apache.hadoop.hbase.TagType;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.hbase.classification.InterfaceStability;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
37 import org.apache.hadoop.hbase.util.Base64;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.io.Text;
40 import org.apache.hadoop.mapreduce.Counter;
41 import org.apache.hadoop.mapreduce.Reducer;
42 import org.apache.hadoop.util.StringUtils;
43
44
45
46
47
48
49
50 @InterfaceAudience.Public
51 @InterfaceStability.Evolving
52 public class TextSortReducer extends
53 Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
54
55
56 private long ts;
57
58
59 private String separator;
60
61
62 private boolean skipBadLines;
63
64 private Counter badLineCount;
65
66 private ImportTsv.TsvParser parser;
67
68
69 private String cellVisibilityExpr;
70
71
72 private long ttl;
73
74 private CellCreator kvCreator;
75
76 public long getTs() {
77 return ts;
78 }
79
80 public boolean getSkipBadLines() {
81 return skipBadLines;
82 }
83
84 public Counter getBadLineCount() {
85 return badLineCount;
86 }
87
88 public void incrementBadLineCount(int count) {
89 this.badLineCount.increment(count);
90 }
91
92
93
94
95
96
97
98
99
100 @Override
101 protected void setup(Context context) {
102 doSetup(context);
103
104 Configuration conf = context.getConfiguration();
105
106 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
107 if (parser.getRowKeyColumnIndex() == -1) {
108 throw new RuntimeException("No row key column specified");
109 }
110 this.kvCreator = new CellCreator(conf);
111 }
112
113
114
115
116
117 protected void doSetup(Context context) {
118 Configuration conf = context.getConfiguration();
119
120
121
122 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
123 if (separator == null) {
124 separator = ImportTsv.DEFAULT_SEPARATOR;
125 } else {
126 separator = new String(Base64.decode(separator));
127 }
128
129
130 ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
131
132 skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
133 badLineCount = context.getCounter("ImportTsv", "Bad Lines");
134 }
135
136 @Override
137 protected void reduce(
138 ImmutableBytesWritable rowKey,
139 java.lang.Iterable<Text> lines,
140 Reducer<ImmutableBytesWritable, Text,
141 ImmutableBytesWritable, KeyValue>.Context context)
142 throws java.io.IOException, InterruptedException
143 {
144
145 long threshold = context.getConfiguration().getLong(
146 "reducer.row.threshold", 1L * (1<<30));
147 Iterator<Text> iter = lines.iterator();
148 while (iter.hasNext()) {
149 Set<KeyValue> kvs = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
150 long curSize = 0;
151
152 while (iter.hasNext() && curSize < threshold) {
153 Text line = iter.next();
154 byte[] lineBytes = line.getBytes();
155 try {
156 ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
157
158 ts = parsed.getTimestamp(ts);
159 cellVisibilityExpr = parsed.getCellVisibility();
160 ttl = parsed.getCellTTL();
161
162 for (int i = 0; i < parsed.getColumnCount(); i++) {
163 if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
164 || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
165 || i == parser.getCellTTLColumnIndex()) {
166 continue;
167 }
168
169
170 List<Tag> tags = new ArrayList<Tag>();
171 if (cellVisibilityExpr != null) {
172 tags.addAll(kvCreator.getVisibilityExpressionResolver()
173 .createVisibilityExpTags(cellVisibilityExpr));
174 }
175
176
177 if (ttl > 0) {
178 tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
179 }
180 Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
181 parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
182 parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
183 parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
184 KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
185 kvs.add(kv);
186 curSize += kv.heapSize();
187 }
188 } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
189 | InvalidLabelException badLine) {
190 if (skipBadLines) {
191 System.err.println("Bad line." + badLine.getMessage());
192 incrementBadLineCount(1);
193 continue;
194 }
195 throw new IOException(badLine);
196 }
197 }
198 context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass()
199 + "(" + StringUtils.humanReadableInt(curSize) + ")");
200 int index = 0;
201 for (KeyValue kv : kvs) {
202 context.write(rowKey, kv);
203 if (++index > 0 && index % 100 == 0)
204 context.setStatus("Wrote " + index + " key values.");
205 }
206
207
208 if (iter.hasNext()) {
209
210 context.write(null, null);
211 }
212 }
213 }
214 }