/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.zookeeper.KeeperException;
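/**
 * MapReduce tool that loads data previously written out by the Export tool back into an HBase
 * table, either by writing {@link Put}s and {@link Delete}s directly to a live table or, when
 * {@value #BULK_OUTPUT_CONF_KEY} is set, by generating HFiles for bulk loading.
 */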
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Import {
  private static final Log LOG = LogFactory.getLog(Import.class);
  final static String NAME = "import";
  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
  public final static String TABLE_NAME = "import.table.name";
  public final static String WAL_DURABILITY = "import.wal.durability";
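  /**
   * A mapper that writes each input {@link Result} back out as {@link KeyValue}s, keyed by row,
   * for use with {@link HFileOutputFormat2} when bulk output is enabled.
   */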
  public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
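    /**
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */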
    @Override
    public void map(ImmutableBytesWritable row, Result value,
        Context context)
        throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Considering the row: "
              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);

            if (kv == null) continue;

            context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        // map() cannot rethrow InterruptedException; log it and restore the interrupt status.
        LOG.error("Interrupted while emitting KeyValues", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }
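  /**
   * A mapper that converts each input {@link Result} into the corresponding {@link Put}s and
   * {@link Delete}s and writes them directly to the output table.
   */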
  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
    private Map<byte[], byte[]> cfRenameMap;
    private List<UUID> clusterIds;
    private Filter filter;
    private Durability durability;
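    /**
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */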
    @Override
    public void map(ImmutableBytesWritable row, Result value,
        Context context)
        throws IOException {
      try {
        writeResult(row, value, context);
      } catch (InterruptedException e) {
        // map() cannot rethrow InterruptedException; log it and restore the interrupt status.
        LOG.error("Interrupted while writing result", e);
        Thread.currentThread().interrupt();
      }
    }

    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
        throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      if (LOG.isTraceEnabled()) {
        LOG.trace("Considering the row: "
            + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
      }
      if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
        processKV(key, result, context, put, delete);
      }
    }

    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
        Delete delete) throws IOException, InterruptedException {
      for (Cell kv : result.rawCells()) {
        kv = filterKv(filter, kv);

        if (kv == null) continue;

        kv = convertKv(kv, cfRenameMap);
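        // A Result can contain a mix of delete markers and normal cells. Family delete markers
        // are written out immediately as their own Delete; other delete markers are gathered into
        // a shared Delete, and everything else into a shared Put, both written after the loop.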
        if (CellUtil.isDeleteFamily(kv)) {
          Delete deleteFamily = new Delete(key.get());
          deleteFamily.addDeleteMarker(kv);
          if (durability != null) {
            deleteFamily.setDurability(durability);
          }
          deleteFamily.setClusterIds(clusterIds);
          context.write(key, deleteFamily);
        } else if (CellUtil.isDelete(kv)) {
          if (delete == null) {
            delete = new Delete(key.get());
          }
          delete.addDeleteMarker(kv);
        } else {
          if (put == null) {
            put = new Put(key.get());
          }
          addPutToKv(put, kv);
        }
      }
      if (put != null) {
        if (durability != null) {
          put.setDurability(durability);
        }
        put.setClusterIds(clusterIds);
        context.write(key, put);
      }
      if (delete != null) {
        if (durability != null) {
          delete.setDurability(durability);
        }
        delete.setClusterIds(clusterIds);
        context.write(key, delete);
      }
    }

    protected void addPutToKv(Put put, Cell kv) throws IOException {
      put.add(kv);
    }

    @Override
    public void setup(Context context) {
      Configuration conf = context.getConfiguration();
      cfRenameMap = createCfRenameMap(conf);
      filter = instantiateFilter(conf);
      String durabilityStr = conf.get(WAL_DURABILITY);
      if (durabilityStr != null) {
        durability = Durability.valueOf(durabilityStr.toUpperCase());
      }

      ZooKeeperWatcher zkw = null;
      Exception ex = null;
      try {
        zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
      } catch (ZooKeeperConnectionException e) {
        ex = e;
        LOG.error("Problem connecting to ZooKeeper during task setup", e);
      } catch (KeeperException e) {
        ex = e;
        LOG.error("Problem reading ZooKeeper data during task setup", e);
      } catch (IOException e) {
        ex = e;
        LOG.error("Problem setting up task", e);
      } finally {
        if (zkw != null) zkw.close();
      }
      if (clusterIds == null) {
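        // The source cluster id is required for the mutations written by this mapper
        // (setClusterIds); fail the task setup if it could not be read from ZooKeeper.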
        throw new RuntimeException(ex);
      }
    }
  }
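  /**
   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
   * optionally exclude them from the job output.
   * @param conf {@link Configuration} from which to load the filter class and arguments
   * @return the filter to use for the task, or <tt>null</tt> if no filter is configured
   * @throws RuntimeException if the configured filter cannot be instantiated
   */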
  public static Filter instantiateFilter(Configuration conf) {
    // get the filter, if it was configured
    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
    if (filterClass == null) {
      LOG.debug("No configured filter class, accepting all keyvalues.");
      return null;
    }
    LOG.debug("Attempting to create filter: " + filterClass);
    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
    try {
      // filters expose a static createFilterFromArguments(ArrayList) factory method
      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
      return (Filter) m.invoke(null, quotedArgs);
    } catch (IllegalAccessException | SecurityException | NoSuchMethodException
        | IllegalArgumentException | InvocationTargetException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    }
  }

  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
    ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
    for (String stringArg : stringArgs) {
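      // Wrap each argument in single quotes before converting to bytes; the filter
      // createFilterFromArguments factories expect shell-style quoted arguments.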
      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
    }
    return quotedArgs;
  }
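  /**
   * Run the supplied {@link Filter} against the given cell.
   * @param filter filter to apply, may be <tt>null</tt>
   * @param kv {@link Cell} on which to apply the filter
   * @return <tt>null</tt> if the cell should not be written, otherwise the original {@link Cell}
   * @throws IOException if the filter fails while evaluating the cell
   */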
  public static Cell filterKv(Filter filter, Cell kv) throws IOException {
    // apply the filter and skip this kv if the filter doesn't apply
    if (filter != null) {
      Filter.ReturnCode code = filter.filterKeyValue(kv);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Filter returned: " + code + " for the key value: " + kv);
      }
      // if it's not an accept type, then skip this kv
      if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
          .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
        return null;
      }
    }
    return kv;
  }
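  // Renames the cell's column family according to cfRenameMap (if a mapping exists); otherwise
  // returns the cell unchanged.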
  private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
    if (cfRenameMap != null) {
      // If there's a rename mapping for this CF, build a new KeyValue with the new family name.
      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
      if (newCfName != null) {
        kv = new KeyValue(kv.getRowArray(),             // row buffer
            kv.getRowOffset(),                          // row offset
            kv.getRowLength(),                          // row length
            newCfName,                                  // CF buffer
            0,                                          // CF offset
            newCfName.length,                           // CF length
            kv.getQualifierArray(),                     // qualifier buffer
            kv.getQualifierOffset(),                    // qualifier offset
            kv.getQualifierLength(),                    // qualifier length
            kv.getTimestamp(),                          // timestamp
            KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
            kv.getValueArray(),                         // value buffer
            kv.getValueOffset(),                        // value offset
            kv.getValueLength());                       // value length
      }
    }
    return kv;
  }
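  // Parses the CF_RENAME_PROP value ("srcCf1:destCf1,srcCf2:destCf2,...") into a map keyed by
  // source family bytes; returns null when no renames are configured.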
  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
    Map<byte[], byte[]> cfRenameMap = null;
    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
    if (allMappingsPropVal != null) {

      String[] allMappings = allMappingsPropVal.split(",");
      for (String mapping : allMappings) {
        if (cfRenameMap == null) {
          cfRenameMap = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
        }
        String[] srcAndDest = mapping.split(":");
        if (srcAndDest.length != 2) {
          continue;
        }
        cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
      }
    }
    return cfRenameMap;
  }
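  /**
   * Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells the mapper
   * how to rename column families.
   *
   * <p>Alternately, instead of calling this function, you can set the configuration key
   * {@link #CF_RENAME_PROP} yourself. The value should look like
   * <pre>srcCf1:destCf1,srcCf2:destCf2,...</pre>
   * This has the same effect on the mapper behavior.
   *
   * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be set
   * @param renameMap a mapping from source CF names to destination CF names
   */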
  static public void configureCfRenaming(Configuration conf,
      Map<String, String> renameMap) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : renameMap.entrySet()) {
      String sourceCf = entry.getKey();
      String destCf = entry.getValue();

      if (sourceCf.contains(":") || sourceCf.contains(",") ||
          destCf.contains(":") || destCf.contains(",")) {
        throw new IllegalArgumentException("Illegal character in CF names: "
            + sourceCf + ", " + destCf);
      }

      if (sb.length() != 0) {
        sb.append(",");
      }
      sb.append(sourceCf + ":" + destCf);
    }
    conf.set(CF_RENAME_PROP, sb.toString());
  }
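  /**
   * Add a Filter to be instantiated on import.
   * @param conf Configuration to update (will be passed to the job)
   * @param clazz {@link Filter} subclass to instantiate on the server
   * @param filterArgs List of arguments to pass to the filter on instantiation
   */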
  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
      List<String> filterArgs) throws IOException {
    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
  }
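  /**
   * Sets up the actual job.
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */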
  public static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    TableName tableName = TableName.valueOf(args[0]);
    conf.set(TABLE_NAME, tableName.getNameAsString());
    Path inputDir = new Path(args[1]);
    Job job = Job.getInstance(conf, NAME + "_" + tableName);
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

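    // If a filter is configured, ship its jar with the job so it can be instantiated on the
    // cluster side.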
    try {
      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
      if (filter != null) {
        TableMapReduceUtil.addDependencyJars(conf, filter);
      }
    } catch (Exception e) {
      throw new IOException(e);
    }

    if (hfileOutPath != null) {
      job.setMapperClass(KeyValueImporter.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Table table = conn.getTable(tableName);
          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        job.setReducerClass(KeyValueSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
            com.google.common.base.Preconditions.class);
      }
    } else {
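      // No bulk output: no reducers, just write Puts/Deletes straight to the live table.
      // initTableReducerJob is used to set up TableOutputFormat even though no reducer runs.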
      job.setMapperClass(Importer.class);
      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
      job.setNumReduceTasks(0);
    }
    return job;
  }
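  /**
   * Prints usage information for the tool.
   * @param errorMsg Error message. Can be null.
   */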
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import [options] <tablename> <inputdir>");
    System.err.println("By default Import will load data directly into HBase. To instead generate");
    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
    System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err
        .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
    System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
    System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
        + CF_RENAME_PROP + " property. Further, filters will only use the"
        + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
        + " whether the current row needs to be ignored completely for processing and "
        + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
        + " the KeyValue.");
    System.err.println("To import data exported from HBase 0.94, use");
    System.err.println(" -Dhbase.import.version=0.94");
    System.err.println("For performance consider the following options:\n"
        + " -Dmapreduce.map.speculative=false\n"
        + " -Dmapreduce.reduce.speculative=false\n"
        + " -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
        + " Allowed values are the supported durability values"
        + " like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
  }
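  /**
   * If the durability is set to {@link Durability#SKIP_WAL} and the data was imported into a live
   * table (no bulk output), flush all the regions of the table: the imported data is only held in
   * memory and is not present in the WAL to recover from in case of a region server failure.
   * @param conf the configuration of the completed import job
   * @throws IOException if the flush fails
   * @throws InterruptedException if the flush is interrupted
   */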
  public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
      InterruptedException {
    String tableName = conf.get(TABLE_NAME);
    HBaseAdmin hAdmin = null;
    String durability = conf.get(WAL_DURABILITY);

    if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
      try {
        hAdmin = new HBaseAdmin(conf);
        hAdmin.flush(tableName);
      } finally {
        if (hAdmin != null) {
          hAdmin.close();
        }
      }
    }
  }
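  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */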
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      usage("Wrong number of arguments: " + otherArgs.length);
      System.exit(-1);
    }
    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
    if (inputVersionString != null) {
      conf.set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    boolean isJobSuccessful = job.waitForCompletion(true);
    if (isJobSuccessful) {
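      // Flush the regions of the table if SKIP_WAL durability was used, so imported data is
      // persisted and not lost on region server failure.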
      flushRegionsIfNecessary(conf);
    }
    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    if (outputRecords < inputRecords) {
      System.err.println("Warning, not all records were imported (maybe filtered out).");
      if (outputRecords == 0) {
        System.err.println("If the data was exported from HBase 0.94 "
            + "consider using -Dhbase.import.version=0.94.");
      }
    }

    System.exit(isJobSuccessful ? 0 : 1);
  }
}