1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import static java.lang.String.format;
22
23 import com.google.common.base.Preconditions;
24 import com.google.common.base.Splitter;
25 import com.google.common.collect.Lists;
26
27 import org.apache.commons.lang.StringUtils;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.conf.Configured;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hbase.HBaseConfiguration;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.TableNotFoundException;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.classification.InterfaceStability;
41 import org.apache.hadoop.hbase.client.Admin;
42 import org.apache.hadoop.hbase.client.Connection;
43 import org.apache.hadoop.hbase.client.ConnectionFactory;
44 import org.apache.hadoop.hbase.client.Put;
45 import org.apache.hadoop.hbase.client.RegionLocator;
46 import org.apache.hadoop.hbase.client.Table;
47 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
48 import org.apache.hadoop.hbase.util.Base64;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.util.Pair;
51 import org.apache.hadoop.io.Text;
52 import org.apache.hadoop.mapreduce.Job;
53 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
54 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
55 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
56 import org.apache.hadoop.security.Credentials;
57 import org.apache.hadoop.util.GenericOptionsParser;
58 import org.apache.hadoop.util.Tool;
59 import org.apache.hadoop.util.ToolRunner;
60
61 import java.io.File;
62 import java.io.IOException;
63 import java.util.ArrayList;
64 import java.util.HashSet;
65 import java.util.Set;
66
67
68
69
70
71
72
73
74
75 @InterfaceAudience.Public
76 @InterfaceStability.Stable
77 public class ImportTsv extends Configured implements Tool {
78
protected static final Log LOG = LogFactory.getLog(ImportTsv.class);

// Tool name; also used as the prefix of the generated MapReduce job name.
final static String NAME = "importtsv";

// Fully-qualified class name of a custom Mapper to use instead of DEFAULT_MAPPER.
public final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
// Output directory for HFiles; when set, the job prepares a bulk load instead of writing Puts.
public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
// Fixed timestamp applied to imported cells (ignored when an HBASE_TS_KEY column is declared).
public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
// Standard MapReduce property; overrides the generated job name.
public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

// NOTE(review): the following keys appear to be consumed by the mapper side
// (TsvImporterMapper) rather than by this driver — confirm before relocating.
public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
// Comma-separated column spec, e.g. "HBASE_ROW_KEY,f1:q1,f2".
public final static String COLUMNS_CONF_KEY = "importtsv.columns";
// Single-byte field separator of the input; defaults to DEFAULT_SEPARATOR.
public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
// Separator between attribute key and value inside the HBASE_ATTRIBUTES_KEY column.
public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";

// Path to a Hadoop token-storage file whose credentials are merged into the job.
public final static String CREDENTIALS_LOCATION = "credentials_location";
final static String DEFAULT_SEPARATOR = "\t";
// Default key=>value separator within the attributes column.
final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
// Default separator between multiple attributes within the attributes column.
final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
// "yes" (default) auto-creates a missing target table in bulk-output mode; "no" fails instead.
public final static String CREATE_TABLE_CONF_KEY = "create.table";
// Boolean; when true, skips the check that spec'd column families exist in the table.
public final static String NO_STRICT_COL_FAMILY = "no.strict";
102
103 public static class TsvParser {
104
105
106
107 private final byte[][] families;
108 private final byte[][] qualifiers;
109
110 private final byte separatorByte;
111
112 private int rowKeyColumnIndex;
113
114 private int maxColumnCount;
115
116
117 public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
118
119 private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
120
121 public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
122
123 public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
124
125 public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";
126
127 public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
128
129 public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
130
131 private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
132
133 public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
134
135 public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
136
137 public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
138
139 private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
140
141 private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
142
143
144
145
146
147
148 public TsvParser(String columnsSpecification, String separatorStr) {
149
150 byte[] separator = Bytes.toBytes(separatorStr);
151 Preconditions.checkArgument(separator.length == 1,
152 "TsvParser only supports single-byte separators");
153 separatorByte = separator[0];
154
155
156 ArrayList<String> columnStrings = Lists.newArrayList(
157 Splitter.on(',').trimResults().split(columnsSpecification));
158
159 maxColumnCount = columnStrings.size();
160 families = new byte[maxColumnCount][];
161 qualifiers = new byte[maxColumnCount][];
162
163 for (int i = 0; i < columnStrings.size(); i++) {
164 String str = columnStrings.get(i);
165 if (ROWKEY_COLUMN_SPEC.equals(str)) {
166 rowKeyColumnIndex = i;
167 continue;
168 }
169 if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
170 timestampKeyColumnIndex = i;
171 continue;
172 }
173 if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
174 attrKeyColumnIndex = i;
175 continue;
176 }
177 if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
178 cellVisibilityColumnIndex = i;
179 continue;
180 }
181 if (CELL_TTL_COLUMN_SPEC.equals(str)) {
182 cellTTLColumnIndex = i;
183 continue;
184 }
185 String[] parts = str.split(":", 2);
186 if (parts.length == 1) {
187 families[i] = str.getBytes();
188 qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
189 } else {
190 families[i] = parts[0].getBytes();
191 qualifiers[i] = parts[1].getBytes();
192 }
193 }
194 }
195
196 public boolean hasTimestamp() {
197 return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
198 }
199
200 public int getTimestampKeyColumnIndex() {
201 return timestampKeyColumnIndex;
202 }
203
204 public boolean hasAttributes() {
205 return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
206 }
207
208 public boolean hasCellVisibility() {
209 return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
210 }
211
212 public boolean hasCellTTL() {
213 return cellTTLColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
214 }
215
216 public int getAttributesKeyColumnIndex() {
217 return attrKeyColumnIndex;
218 }
219
220 public int getCellVisibilityColumnIndex() {
221 return cellVisibilityColumnIndex;
222 }
223
224 public int getCellTTLColumnIndex() {
225 return cellTTLColumnIndex;
226 }
227
228 public int getRowKeyColumnIndex() {
229 return rowKeyColumnIndex;
230 }
231
232 public byte[] getFamily(int idx) {
233 return families[idx];
234 }
235 public byte[] getQualifier(int idx) {
236 return qualifiers[idx];
237 }
238
239 public ParsedLine parse(byte[] lineBytes, int length)
240 throws BadTsvLineException {
241
242 ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
243 for (int i = 0; i < length; i++) {
244 if (lineBytes[i] == separatorByte) {
245 tabOffsets.add(i);
246 }
247 }
248 if (tabOffsets.isEmpty()) {
249 throw new BadTsvLineException("No delimiter");
250 }
251
252 tabOffsets.add(length);
253
254 if (tabOffsets.size() > maxColumnCount) {
255 throw new BadTsvLineException("Excessive columns");
256 } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
257 throw new BadTsvLineException("No row key");
258 } else if (hasTimestamp()
259 && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
260 throw new BadTsvLineException("No timestamp");
261 } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
262 throw new BadTsvLineException("No attributes specified");
263 } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
264 throw new BadTsvLineException("No cell visibility specified");
265 } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
266 throw new BadTsvLineException("No cell TTL specified");
267 }
268 return new ParsedLine(tabOffsets, lineBytes);
269 }
270
271 class ParsedLine {
272 private final ArrayList<Integer> tabOffsets;
273 private byte[] lineBytes;
274
275 ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
276 this.tabOffsets = tabOffsets;
277 this.lineBytes = lineBytes;
278 }
279
280 public int getRowKeyOffset() {
281 return getColumnOffset(rowKeyColumnIndex);
282 }
283 public int getRowKeyLength() {
284 return getColumnLength(rowKeyColumnIndex);
285 }
286
287 public long getTimestamp(long ts) throws BadTsvLineException {
288
289 if (!hasTimestamp()) {
290 return ts;
291 }
292
293 String timeStampStr = Bytes.toString(lineBytes,
294 getColumnOffset(timestampKeyColumnIndex),
295 getColumnLength(timestampKeyColumnIndex));
296 try {
297 return Long.parseLong(timeStampStr);
298 } catch (NumberFormatException nfe) {
299
300 throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
301 }
302 }
303
304 private String getAttributes() {
305 if (!hasAttributes()) {
306 return null;
307 } else {
308 return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex),
309 getColumnLength(attrKeyColumnIndex));
310 }
311 }
312
313 public String[] getIndividualAttributes() {
314 String attributes = getAttributes();
315 if (attributes != null) {
316 return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR);
317 } else {
318 return null;
319 }
320 }
321
322 public int getAttributeKeyOffset() {
323 if (hasAttributes()) {
324 return getColumnOffset(attrKeyColumnIndex);
325 } else {
326 return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
327 }
328 }
329
330 public int getAttributeKeyLength() {
331 if (hasAttributes()) {
332 return getColumnLength(attrKeyColumnIndex);
333 } else {
334 return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
335 }
336 }
337
338 public int getCellVisibilityColumnOffset() {
339 if (hasCellVisibility()) {
340 return getColumnOffset(cellVisibilityColumnIndex);
341 } else {
342 return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
343 }
344 }
345
346 public int getCellVisibilityColumnLength() {
347 if (hasCellVisibility()) {
348 return getColumnLength(cellVisibilityColumnIndex);
349 } else {
350 return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
351 }
352 }
353
354 public String getCellVisibility() {
355 if (!hasCellVisibility()) {
356 return null;
357 } else {
358 return Bytes.toString(lineBytes, getColumnOffset(cellVisibilityColumnIndex),
359 getColumnLength(cellVisibilityColumnIndex));
360 }
361 }
362
363 public int getCellTTLColumnOffset() {
364 if (hasCellTTL()) {
365 return getColumnOffset(cellTTLColumnIndex);
366 } else {
367 return DEFAULT_CELL_TTL_COLUMN_INDEX;
368 }
369 }
370
371 public int getCellTTLColumnLength() {
372 if (hasCellTTL()) {
373 return getColumnLength(cellTTLColumnIndex);
374 } else {
375 return DEFAULT_CELL_TTL_COLUMN_INDEX;
376 }
377 }
378
379 public long getCellTTL() {
380 if (!hasCellTTL()) {
381 return 0;
382 } else {
383 return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
384 getColumnLength(cellTTLColumnIndex));
385 }
386 }
387
388 public int getColumnOffset(int idx) {
389 if (idx > 0)
390 return tabOffsets.get(idx - 1) + 1;
391 else
392 return 0;
393 }
394 public int getColumnLength(int idx) {
395 return tabOffsets.get(idx) - getColumnOffset(idx);
396 }
397 public int getColumnCount() {
398 return tabOffsets.size();
399 }
400 public byte[] getLineBytes() {
401 return lineBytes;
402 }
403 }
404
405 public static class BadTsvLineException extends Exception {
406 public BadTsvLineException(String err) {
407 super(err);
408 }
409 private static final long serialVersionUID = 1L;
410 }
411
412
413
414
415
416
417
418
419 public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length)
420 throws BadTsvLineException {
421 int rkColumnIndex = 0;
422 int startPos = 0, endPos = 0;
423 for (int i = 0; i <= length; i++) {
424 if (i == length || lineBytes[i] == separatorByte) {
425 endPos = i - 1;
426 if (rkColumnIndex++ == getRowKeyColumnIndex()) {
427 if ((endPos + 1) == startPos) {
428 throw new BadTsvLineException("Empty value for ROW KEY.");
429 }
430 break;
431 } else {
432 startPos = endPos + 2;
433 }
434 }
435 if (i == length) {
436 throw new BadTsvLineException(
437 "Row key does not exist as number of columns in the line"
438 + " are less than row key position.");
439 }
440 }
441 return new Pair<Integer, Integer>(startPos, endPos - startPos + 1);
442 }
443 }
444
445
446
447
448
449
450
451
452
453 public static Job createSubmittableJob(Configuration conf, String[] args)
454 throws IOException, ClassNotFoundException {
455 Job job = null;
456 try (Connection connection = ConnectionFactory.createConnection(conf)) {
457 try (Admin admin = connection.getAdmin()) {
458
459
460 String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
461 if (actualSeparator != null) {
462 conf.set(SEPARATOR_CONF_KEY,
463 Base64.encodeBytes(actualSeparator.getBytes()));
464 }
465
466
467 String mapperClassName = conf.get(MAPPER_CONF_KEY);
468 Class mapperClass =
469 mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;
470
471 TableName tableName = TableName.valueOf(args[0]);
472 Path inputDir = new Path(args[1]);
473 String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
474 job = Job.getInstance(conf, jobName);
475 job.setJarByClass(mapperClass);
476 FileInputFormat.setInputPaths(job, inputDir);
477 job.setInputFormatClass(TextInputFormat.class);
478 job.setMapperClass(mapperClass);
479 String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
480 String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
481 if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
482 String fileLoc = conf.get(CREDENTIALS_LOCATION);
483 Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
484 job.getCredentials().addAll(cred);
485 }
486
487 if (hfileOutPath != null) {
488 if (!admin.tableExists(tableName)) {
489 String errorMsg = format("Table '%s' does not exist.", tableName);
490 if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
491 LOG.warn(errorMsg);
492
493
494 createTable(admin, tableName, columns);
495 } else {
496 LOG.error(errorMsg);
497 throw new TableNotFoundException(errorMsg);
498 }
499 }
500 try (Table table = connection.getTable(tableName);
501 RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
502 boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
503
504 if(!noStrict) {
505 ArrayList<String> unmatchedFamilies = new ArrayList<String>();
506 Set<String> cfSet = getColumnFamilies(columns);
507 HTableDescriptor tDesc = table.getTableDescriptor();
508 for (String cf : cfSet) {
509 if(tDesc.getFamily(Bytes.toBytes(cf)) == null) {
510 unmatchedFamilies.add(cf);
511 }
512 }
513 if(unmatchedFamilies.size() > 0) {
514 ArrayList<String> familyNames = new ArrayList<String>();
515 for (HColumnDescriptor family : table.getTableDescriptor().getFamilies()) {
516 familyNames.add(family.getNameAsString());
517 }
518 String msg =
519 "Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
520 + " does not match with any of the table " + tableName
521 + " column families " + familyNames + ".\n"
522 + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY
523 + "=true.\n";
524 usage(msg);
525 System.exit(-1);
526 }
527 }
528 job.setReducerClass(PutSortReducer.class);
529 Path outputDir = new Path(hfileOutPath);
530 FileOutputFormat.setOutputPath(job, outputDir);
531 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
532 if (mapperClass.equals(TsvImporterTextMapper.class)) {
533 job.setMapOutputValueClass(Text.class);
534 job.setReducerClass(TextSortReducer.class);
535 } else {
536 job.setMapOutputValueClass(Put.class);
537 job.setCombinerClass(PutCombiner.class);
538 }
539 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
540 regionLocator);
541 }
542 } else {
543 if (!admin.tableExists(tableName)) {
544 String errorMsg = format("Table '%s' does not exist.", tableName);
545 LOG.error(errorMsg);
546 throw new TableNotFoundException(errorMsg);
547 }
548 if (mapperClass.equals(TsvImporterTextMapper.class)) {
549 usage(TsvImporterTextMapper.class.toString()
550 + " should not be used for non bulkloading case. use "
551 + TsvImporterMapper.class.toString()
552 + " or custom mapper whose value type is Put.");
553 System.exit(-1);
554 }
555
556
557 TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null,
558 job);
559 job.setNumReduceTasks(0);
560 }
561
562 TableMapReduceUtil.addDependencyJars(job);
563 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
564 com.google.common.base.Function.class
565 }
566 }
567 return job;
568 }
569
570 private static void createTable(Admin admin, TableName tableName, String[] columns)
571 throws IOException {
572 HTableDescriptor htd = new HTableDescriptor(tableName);
573 Set<String> cfSet = getColumnFamilies(columns);
574 for (String cf : cfSet) {
575 HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
576 htd.addFamily(hcd);
577 }
578 LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
579 tableName, cfSet));
580 admin.createTable(htd);
581 }
582
583 private static Set<String> getColumnFamilies(String[] columns) {
584 Set<String> cfSet = new HashSet<String>();
585 for (String aColumn : columns) {
586 if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
587 || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
588 || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
589 || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
590 || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
591 continue;
592
593 cfSet.add(aColumn.split(":", 2)[0]);
594 }
595 return cfSet;
596 }
597
598
599
600
601 private static void usage(final String errorMsg) {
602 if (errorMsg != null && errorMsg.length() > 0) {
603 System.err.println("ERROR: " + errorMsg);
604 }
605 String usage =
606 "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
607 "\n" +
608 "Imports the given input directory of TSV data into the specified table.\n" +
609 "\n" +
610 "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" +
611 "option. This option takes the form of comma-separated column names, where each\n" +
612 "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
613 "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" +
614 "as the row key for each imported record. You must specify exactly one column\n" +
615 "to be the row key, and you must specify a column name for every column that exists in the\n" +
616 "input data. Another special column" + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
617 " designates that this column should be\n" +
618 "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
619 TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional.\n" +
620 "You must specify at most one column as timestamp key for each imported record.\n" +
621 "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
622 "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
623 "\n" +
624 "Other special columns that can be specified are " + TsvParser.CELL_TTL_COLUMN_SPEC +
625 " and " + TsvParser.CELL_VISIBILITY_COLUMN_SPEC + ".\n" +
626 TsvParser.CELL_TTL_COLUMN_SPEC + " designates that this column will be used " +
627 "as a Cell's Time To Live (TTL) attribute.\n" +
628 TsvParser.CELL_VISIBILITY_COLUMN_SPEC + " designates that this column contains the " +
629 "visibility label expression.\n" +
630 "\n" +
631 TsvParser.ATTRIBUTES_COLUMN_SPEC+" can be used to specify Operation Attributes per record.\n"+
632 " Should be specified as key=>value where "+TsvParser.DEFAULT_ATTRIBUTES_COLUMN_INDEX+ " is used \n"+
633 " as the seperator. Note that more than one OperationAttributes can be specified.\n"+
634 "By default importtsv will load data directly into HBase. To instead generate\n" +
635 "HFiles of data to prepare for a bulk data load, pass the option:\n" +
636 " -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
637 " Note: if you do not use this option, then the target table must already exist in HBase\n" +
638 "\n" +
639 "Other options that may be specified with -D include:\n" +
640 " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
641 " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
642 " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
643 " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
644 DEFAULT_MAPPER.getName() + "\n" +
645 " -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
646 " -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
647 " Note: if you set this to 'no', then the target table must already exist in HBase\n" +
648 " -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. " +
649 "Default is false\n\n" +
650 "For performance consider the following options:\n" +
651 " -Dmapreduce.map.speculative=false\n" +
652 " -Dmapreduce.reduce.speculative=false";
653
654 System.err.println(usage);
655 }
656
657 @Override
658 public int run(String[] args) throws Exception {
659 setConf(HBaseConfiguration.create(getConf()));
660 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
661 if (otherArgs.length < 2) {
662 usage("Wrong number of arguments: " + otherArgs.length);
663 return -1;
664 }
665
666
667
668
669
670 if (null == getConf().get(MAPPER_CONF_KEY)) {
671
672 String columns[] = getConf().getStrings(COLUMNS_CONF_KEY);
673 if (columns == null) {
674 usage("No columns specified. Please specify with -D" +
675 COLUMNS_CONF_KEY+"=...");
676 return -1;
677 }
678
679
680 int rowkeysFound = 0;
681 for (String col : columns) {
682 if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
683 }
684 if (rowkeysFound != 1) {
685 usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
686 return -1;
687 }
688
689
690 int tskeysFound = 0;
691 for (String col : columns) {
692 if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
693 tskeysFound++;
694 }
695 if (tskeysFound > 1) {
696 usage("Must specify at most one column as "
697 + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
698 return -1;
699 }
700
701 int attrKeysFound = 0;
702 for (String col : columns) {
703 if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
704 attrKeysFound++;
705 }
706 if (attrKeysFound > 1) {
707 usage("Must specify at most one column as "
708 + TsvParser.ATTRIBUTES_COLUMN_SPEC);
709 return -1;
710 }
711
712
713
714 if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
715 usage("One or more columns in addition to the row key and timestamp(optional) are required");
716 return -1;
717 }
718 }
719
720
721 long timstamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
722
723
724
725 getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
726
727 Job job = createSubmittableJob(getConf(), otherArgs);
728 return job.waitForCompletion(true) ? 0 : 1;
729 }
730
731 public static void main(String[] args) throws Exception {
732 int status = ToolRunner.run(new ImportTsv(), args);
733 System.exit(status);
734 }
735 }