/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

/**
 * Tool to import data from a TSV file.
 *
 * This tool is rather simplistic - it doesn't do any quoting or
 * escaping, but is useful for many data loads.
 *
 * @see ImportTsv#usage(String)
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ImportTsv extends Configured implements Tool {

  protected static final Log LOG = LogFactory.getLog(ImportTsv.class);

  final static String NAME = "importtsv";

  public final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
  public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
  public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
  public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
  // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
  // Move them out of the tool and let the mapper handle its own validation.
  public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run";
  // If true, bad lines are logged to stderr. Default: false.
  public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
  public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
  public final static String COLUMNS_CONF_KEY = "importtsv.columns";
  public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
  public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
  // This config is used to propagate credentials from parent MR jobs which launch
  // ImportTSV jobs. See IntegrationTestImportTsv.
  public final static String CREDENTIALS_LOCATION = "credentials_location";
  final static String DEFAULT_SEPARATOR = "\t";
  final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
  final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
  final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
  public final static String CREATE_TABLE_CONF_KEY = "create.table";
  public final static String NO_STRICT_COL_FAMILY = "no.strict";
  /**
   * If the table did not exist and was created in dry-run mode, this flag is
   * flipped so that the table is deleted when the MR job ends.
   */
  private static boolean dryRunTableCreated;

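  // A minimal sketch of setting these keys programmatically; the column spec
  // and separator values here are illustrative assumptions, not fixed values:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(COLUMNS_CONF_KEY, "HBASE_ROW_KEY,d:c1");
  //   conf.set(SEPARATOR_CONF_KEY, ",");
  //   conf.setBoolean(DRY_RUN_CONF_KEY, true);
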
  public static class TsvParser {
    /**
     * Column families and qualifiers mapped to the TSV columns
     */
    private final byte[][] families;
    private final byte[][] qualifiers;

    private final byte separatorByte;

    private int rowKeyColumnIndex;

    private int maxColumnCount;

    // Default value must be negative
    public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;

    private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;

    public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";

    public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";

    public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";

    public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";

    public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";

    private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;

    public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;

    public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;

    public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;

    private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;

    private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;

    /**
     * @param columnsSpecification the list of columns to parse out, comma separated.
     * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
     * @param separatorStr the single-byte field separator
     */
    public TsvParser(String columnsSpecification, String separatorStr) {
      // Configure separator
      byte[] separator = Bytes.toBytes(separatorStr);
      Preconditions.checkArgument(separator.length == 1,
        "TsvParser only supports single-byte separators");
      separatorByte = separator[0];

      // Configure columns
      ArrayList<String> columnStrings = Lists.newArrayList(
        Splitter.on(',').trimResults().split(columnsSpecification));

      maxColumnCount = columnStrings.size();
      families = new byte[maxColumnCount][];
      qualifiers = new byte[maxColumnCount][];

      for (int i = 0; i < columnStrings.size(); i++) {
        String str = columnStrings.get(i);
        if (ROWKEY_COLUMN_SPEC.equals(str)) {
          rowKeyColumnIndex = i;
          continue;
        }
        if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
          timestampKeyColumnIndex = i;
          continue;
        }
        if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
          attrKeyColumnIndex = i;
          continue;
        }
        if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
          cellVisibilityColumnIndex = i;
          continue;
        }
        if (CELL_TTL_COLUMN_SPEC.equals(str)) {
          cellTTLColumnIndex = i;
          continue;
        }
        String[] parts = str.split(":", 2);
        if (parts.length == 1) {
          families[i] = str.getBytes();
          qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
        } else {
          families[i] = parts[0].getBytes();
          qualifiers[i] = parts[1].getBytes();
        }
      }
    }
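
    // A minimal sketch of constructing a parser; the family/qualifier names
    // are illustrative assumptions:
    //
    //   TsvParser parser = new TsvParser("HBASE_ROW_KEY,d:c1,d:c2", "\t");
    //   parser.getRowKeyColumnIndex();           // 0
    //   Bytes.toString(parser.getFamily(1));     // "d"
    //   Bytes.toString(parser.getQualifier(2));  // "c2"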

    public boolean hasTimestamp() {
      return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
    }

    public int getTimestampKeyColumnIndex() {
      return timestampKeyColumnIndex;
    }

    public boolean hasAttributes() {
      return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
    }

    public boolean hasCellVisibility() {
      return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
    }

    public boolean hasCellTTL() {
      return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
    }

    public int getAttributesKeyColumnIndex() {
      return attrKeyColumnIndex;
    }

    public int getCellVisibilityColumnIndex() {
      return cellVisibilityColumnIndex;
    }

    public int getCellTTLColumnIndex() {
      return cellTTLColumnIndex;
    }

    public int getRowKeyColumnIndex() {
      return rowKeyColumnIndex;
    }

    public byte[] getFamily(int idx) {
      return families[idx];
    }
    public byte[] getQualifier(int idx) {
      return qualifiers[idx];
    }

    public ParsedLine parse(byte[] lineBytes, int length)
    throws BadTsvLineException {
      // Enumerate separator offsets
      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
      for (int i = 0; i < length; i++) {
        if (lineBytes[i] == separatorByte) {
          tabOffsets.add(i);
        }
      }
      if (tabOffsets.isEmpty()) {
        throw new BadTsvLineException("No delimiter");
      }

      tabOffsets.add(length);

      if (tabOffsets.size() > maxColumnCount) {
        throw new BadTsvLineException("Excessive columns");
      } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
        throw new BadTsvLineException("No row key");
      } else if (hasTimestamp()
          && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
        throw new BadTsvLineException("No timestamp");
      } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
        throw new BadTsvLineException("No attributes specified");
      } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
        throw new BadTsvLineException("No cell visibility specified");
      } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
        throw new BadTsvLineException("No cell TTL specified");
      }
      return new ParsedLine(tabOffsets, lineBytes);
    }
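
    // A minimal sketch of parsing one line with the parser constructed above;
    // the input bytes are an illustrative assumption:
    //
    //   byte[] line = Bytes.toBytes("row1\tv1\tv2");
    //   ParsedLine parsed = parser.parse(line, line.length);
    //   parsed.getColumnCount();  // 3
    //   Bytes.toString(line, parsed.getRowKeyOffset(), parsed.getRowKeyLength());  // "row1"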

    class ParsedLine {
      private final ArrayList<Integer> tabOffsets;
      private byte[] lineBytes;

      ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
        this.tabOffsets = tabOffsets;
        this.lineBytes = lineBytes;
      }

      public int getRowKeyOffset() {
        return getColumnOffset(rowKeyColumnIndex);
      }
      public int getRowKeyLength() {
        return getColumnLength(rowKeyColumnIndex);
      }

      public long getTimestamp(long ts) throws BadTsvLineException {
        // Return ts if HBASE_TS_KEY is not configured in column spec
        if (!hasTimestamp()) {
          return ts;
        }

        String timeStampStr = Bytes.toString(lineBytes,
            getColumnOffset(timestampKeyColumnIndex),
            getColumnLength(timestampKeyColumnIndex));
        try {
          return Long.parseLong(timeStampStr);
        } catch (NumberFormatException nfe) {
          // treat this record as bad record
          throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
        }
      }

      private String getAttributes() {
        if (!hasAttributes()) {
          return null;
        } else {
          return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex),
              getColumnLength(attrKeyColumnIndex));
        }
      }

      public String[] getIndividualAttributes() {
        String attributes = getAttributes();
        if (attributes != null) {
          return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR);
        } else {
          return null;
        }
      }

      public int getAttributeKeyOffset() {
        if (hasAttributes()) {
          return getColumnOffset(attrKeyColumnIndex);
        } else {
          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
        }
      }

      public int getAttributeKeyLength() {
        if (hasAttributes()) {
          return getColumnLength(attrKeyColumnIndex);
        } else {
          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
        }
      }

      public int getCellVisibilityColumnOffset() {
        if (hasCellVisibility()) {
          return getColumnOffset(cellVisibilityColumnIndex);
        } else {
          return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
        }
      }

      public int getCellVisibilityColumnLength() {
        if (hasCellVisibility()) {
          return getColumnLength(cellVisibilityColumnIndex);
        } else {
          return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
        }
      }

      public String getCellVisibility() {
        if (!hasCellVisibility()) {
          return null;
        } else {
          return Bytes.toString(lineBytes, getColumnOffset(cellVisibilityColumnIndex),
              getColumnLength(cellVisibilityColumnIndex));
        }
      }

      public int getCellTTLColumnOffset() {
        if (hasCellTTL()) {
          return getColumnOffset(cellTTLColumnIndex);
        } else {
          return DEFAULT_CELL_TTL_COLUMN_INDEX;
        }
      }

      public int getCellTTLColumnLength() {
        if (hasCellTTL()) {
          return getColumnLength(cellTTLColumnIndex);
        } else {
          return DEFAULT_CELL_TTL_COLUMN_INDEX;
        }
      }

      public long getCellTTL() {
        if (!hasCellTTL()) {
          return 0;
        } else {
          return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
              getColumnLength(cellTTLColumnIndex));
        }
      }

      public int getColumnOffset(int idx) {
        if (idx > 0)
          return tabOffsets.get(idx - 1) + 1;
        else
          return 0;
      }
      public int getColumnLength(int idx) {
        return tabOffsets.get(idx) - getColumnOffset(idx);
      }
      public int getColumnCount() {
        return tabOffsets.size();
      }
      public byte[] getLineBytes() {
        return lineBytes;
      }
    }
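
    // A worked example of the offset arithmetic above, for the line "r1\tv1"
    // with a tab separator: parse() records tabOffsets = [2, 5], so
    // getColumnOffset(0) = 0 and getColumnLength(0) = 2 ("r1"), while
    // getColumnOffset(1) = 2 + 1 = 3 and getColumnLength(1) = 5 - 3 = 2 ("v1").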

    public static class BadTsvLineException extends Exception {
      public BadTsvLineException(String err) {
        super(err);
      }
      private static final long serialVersionUID = 1L;
    }

    /**
     * Return starting position and length of row key from the specified line bytes.
     * @param lineBytes the line to scan
     * @param length the number of bytes of the line to consider
     * @return Pair of row key offset and length.
     * @throws BadTsvLineException if the row key column is missing or empty
     */
    public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length)
        throws BadTsvLineException {
      int rkColumnIndex = 0;
      int startPos = 0, endPos = 0;
      for (int i = 0; i <= length; i++) {
        if (i == length || lineBytes[i] == separatorByte) {
          endPos = i - 1;
          if (rkColumnIndex++ == getRowKeyColumnIndex()) {
            if ((endPos + 1) == startPos) {
              throw new BadTsvLineException("Empty value for ROW KEY.");
            }
            break;
          } else {
            startPos = endPos + 2;
          }
        }
        if (i == length) {
          throw new BadTsvLineException(
              "Row key does not exist as the number of columns in the line"
                  + " is less than the row key position.");
        }
      }
      return new Pair<Integer, Integer>(startPos, endPos - startPos + 1);
    }
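
    // A minimal sketch of extracting just the row key without a full parse;
    // the input bytes are an illustrative assumption:
    //
    //   byte[] line = Bytes.toBytes("row1\tv1");
    //   Pair<Integer, Integer> rk = parser.parseRowKey(line, line.length);
    //   Bytes.toString(line, rk.getFirst(), rk.getSecond());  // "row1"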
  }

  /**
   * Sets up the actual job.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   * @throws ClassNotFoundException When the configured mapper class cannot be loaded.
   */
  protected static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException, ClassNotFoundException {
    Job job = null;
    boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      try (Admin admin = connection.getAdmin()) {
        // Support separators containing characters not valid in XML
        // by re-encoding the passed separator as a Base64 string.
        String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
        if (actualSeparator != null) {
          conf.set(SEPARATOR_CONF_KEY,
              Base64.encodeBytes(actualSeparator.getBytes()));
        }

        // See if a non-default Mapper was set
        String mapperClassName = conf.get(MAPPER_CONF_KEY);
        Class mapperClass =
            mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;

        TableName tableName = TableName.valueOf(args[0]);
        Path inputDir = new Path(args[1]);
        String jobName = conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName.getNameAsString());
        job = Job.getInstance(conf, jobName);
        job.setJarByClass(mapperClass);
        FileInputFormat.setInputPaths(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(mapperClass);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
        String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
        if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
          String fileLoc = conf.get(CREDENTIALS_LOCATION);
          Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
          job.getCredentials().addAll(cred);
        }

        if (hfileOutPath != null) {
          if (!admin.tableExists(tableName)) {
            LOG.warn(format("Table '%s' does not exist.", tableName));
            if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
              // TODO: this is backwards. Instead of depending on the existence of a table,
              // create a sane splits file for HFileOutputFormat based on data sampling.
              createTable(admin, tableName, columns);
              if (isDryRun) {
                LOG.warn("Dry run: Table will be deleted at end of dry run.");
                dryRunTableCreated = true;
              }
            } else {
              String errorMsg =
                  format("Table '%s' does not exist and '%s' is set to no.", tableName,
                      CREATE_TABLE_CONF_KEY);
              LOG.error(errorMsg);
              throw new TableNotFoundException(errorMsg);
            }
          }
          try (Table table = connection.getTable(tableName);
              RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
            boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
            // If no.strict is false, check that the specified column families exist
            if (!noStrict) {
              ArrayList<String> unmatchedFamilies = new ArrayList<String>();
              Set<String> cfSet = getColumnFamilies(columns);
              HTableDescriptor tDesc = table.getTableDescriptor();
              for (String cf : cfSet) {
                if (tDesc.getFamily(Bytes.toBytes(cf)) == null) {
                  unmatchedFamilies.add(cf);
                }
              }
              if (unmatchedFamilies.size() > 0) {
                ArrayList<String> familyNames = new ArrayList<String>();
                for (HColumnDescriptor family : table.getTableDescriptor().getFamilies()) {
                  familyNames.add(family.getNameAsString());
                }
                String msg =
                    "Column families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
                    + " do not match any of the column families " + familyNames
                    + " of table " + tableName + ".\n"
                    + "To disable the column family check, use -D" + NO_STRICT_COL_FAMILY
                    + "=true.\n";
                usage(msg);
                System.exit(-1);
              }
            }
            if (mapperClass.equals(TsvImporterTextMapper.class)) {
              job.setMapOutputValueClass(Text.class);
              job.setReducerClass(TextSortReducer.class);
            } else {
              job.setMapOutputValueClass(Put.class);
              job.setCombinerClass(PutCombiner.class);
              job.setReducerClass(PutSortReducer.class);
            }
            if (!isDryRun) {
              Path outputDir = new Path(hfileOutPath);
              FileOutputFormat.setOutputPath(job, outputDir);
              HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
                  regionLocator);
            }
          }
        } else {
          if (!admin.tableExists(tableName)) {
            String errorMsg = format("Table '%s' does not exist.", tableName);
            LOG.error(errorMsg);
            throw new TableNotFoundException(errorMsg);
          }
          if (mapperClass.equals(TsvImporterTextMapper.class)) {
            usage(TsvImporterTextMapper.class.toString()
                + " should not be used for the non-bulkload case. Use "
                + TsvImporterMapper.class.toString()
                + " or a custom mapper whose value type is Put.");
            System.exit(-1);
          }
          if (!isDryRun) {
            // No reducers. Just write straight to table. Call initTableReducerJob
            // to set up the TableOutputFormat.
            TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
          }
          job.setNumReduceTasks(0);
        }
        if (isDryRun) {
          job.setOutputFormatClass(NullOutputFormat.class);
          job.getConfiguration().setStrings("io.serializations",
              job.getConfiguration().get("io.serializations"),
              MutationSerialization.class.getName(), ResultSerialization.class.getName(),
              KeyValueSerialization.class.getName());
        }
        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
            com.google.common.base.Function.class /* Guava used by TsvParser */);
      }
    }
    return job;
  }
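
  // A minimal driver sketch for this method, assuming TSV input under
  // /tmp/input and a target table 'mytable' (both illustrative):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(COLUMNS_CONF_KEY, "HBASE_ROW_KEY,d:c1");
  //   Job job = createSubmittableJob(conf, new String[] { "mytable", "/tmp/input" });
  //   boolean ok = job.waitForCompletion(true);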

  private static void createTable(Admin admin, TableName tableName, String[] columns)
      throws IOException {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    Set<String> cfSet = getColumnFamilies(columns);
    for (String cf : cfSet) {
      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
      htd.addFamily(hcd);
    }
    LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
      tableName, cfSet));
    admin.createTable(htd);
  }

  private static void deleteTable(Configuration conf, String[] args) {
    TableName tableName = TableName.valueOf(args[0]);
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      try {
        admin.disableTable(tableName);
      } catch (TableNotEnabledException e) {
        LOG.debug("Dry run: Table: " + tableName + " already disabled, so just deleting it.");
      }
      admin.deleteTable(tableName);
    } catch (IOException e) {
      LOG.error(format("***Dry run: Failed to delete table '%s'.***\n%s", tableName, e.toString()));
      return;
    }
    LOG.info(format("Dry run: Deleted table '%s'.", tableName));
  }

  private static Set<String> getColumnFamilies(String[] columns) {
    Set<String> cfSet = new HashSet<String>();
    for (String aColumn : columns) {
      if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
          || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
          || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
          || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
          || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
        continue;
      // we are only concerned with the first one (in case this is a cf:cq)
      cfSet.add(aColumn.split(":", 2)[0]);
    }
    return cfSet;
  }
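
  // For example (an illustrative input), getColumnFamilies(new String[] {
  // "HBASE_ROW_KEY", "d:c1", "d:c2", "f" }) returns the set {"d", "f"}:
  // special column specs are skipped and only the family part of cf:cq is kept.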

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    String usage =
      "Usage: " + NAME + " -D" + COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
      "\n" +
      "Imports the given input directory of TSV data into the specified table.\n" +
      "\n" +
      "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" +
      "option. This option takes the form of comma-separated column names, where each\n" +
      "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
      "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" +
      "as the row key for each imported record. You must specify exactly one column\n" +
      "to be the row key, and you must specify a column name for every column that exists in the\n" +
      "input data. Another special column " + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
      " designates that this column should be\n" +
      "used as the timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
      TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional.\n" +
      "You must specify at most one column as the timestamp key for each imported record.\n" +
      "Records with invalid timestamps (blank, non-numeric) will be treated as bad records.\n" +
      "Note: if you use this option, then the '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
      "\n" +
      TsvParser.ATTRIBUTES_COLUMN_SPEC + " can be used to specify Operation Attributes per record.\n" +
      " Should be specified as key=>value where " + DEFAULT_ATTRIBUTES_SEPERATOR + " is used \n" +
      " as the separator.  Note that more than one OperationAttribute can be specified.\n" +
      "By default importtsv will load data directly into HBase. To instead generate\n" +
      "HFiles of data to prepare for a bulk data load, pass the option:\n" +
      "  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
      "  Note: if you do not use this option, then the target table must already exist in HBase\n" +
      "\n" +
      "Other options that may be specified with -D include:\n" +
      "  -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" +
      " the table. If the table does not exist, it is created but deleted in the end.\n" +
      "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
      "  -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
      "  '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
      "  -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
      "  -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
      DEFAULT_MAPPER.getName() + "\n" +
      "  -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
      "  -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of the table by this tool\n" +
      "  Note: if you set this to 'no', then the target table must already exist in HBase\n" +
      "  -D" + NO_STRICT_COL_FAMILY + "=true - ignore the column family check in the hbase table. " +
      "Default is false\n\n" +
      "For performance consider the following options:\n" +
      "  -Dmapreduce.map.speculative=false\n" +
      "  -Dmapreduce.reduce.speculative=false";

    System.err.println(usage);
  }
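
  // A concrete invocation sketch of the usage above; the table name, column
  // family, and paths are illustrative assumptions:
  //
  //   hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
  //     -Dimporttsv.columns=HBASE_ROW_KEY,d:c1,d:c2 \
  //     -Dimporttsv.bulk.output=/tmp/hfiles \
  //     mytable /tmp/input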

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }

    // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so
    // perform validation on these additional args. When it's not null, the user has provided
    // their own mapper, so these validations are not relevant.
    // TODO: this validation is for TsvImporterMapper, not this tool. Move it elsewhere.
    if (null == getConf().get(MAPPER_CONF_KEY)) {
      // Make sure columns are specified
      String[] columns = getConf().getStrings(COLUMNS_CONF_KEY);
      if (columns == null) {
        usage("No columns specified. Please specify with -D" +
            COLUMNS_CONF_KEY + "=...");
        return -1;
      }

      // Make sure they specify exactly one column as the row key
      int rowkeysFound = 0;
      for (String col : columns) {
        if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
      }
      if (rowkeysFound != 1) {
        usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
        return -1;
      }

      // Make sure we have at most one column as the timestamp key
      int tskeysFound = 0;
      for (String col : columns) {
        if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
          tskeysFound++;
      }
      if (tskeysFound > 1) {
        usage("Must specify at most one column as "
            + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
        return -1;
      }

      int attrKeysFound = 0;
      for (String col : columns) {
        if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
          attrKeysFound++;
      }
      if (attrKeysFound > 1) {
        usage("Must specify at most one column as "
            + TsvParser.ATTRIBUTES_COLUMN_SPEC);
        return -1;
      }

      // Make sure one or more columns are specified excluding rowkey and
      // timestamp key
      if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
        usage("One or more columns in addition to the row key and timestamp (optional) are required");
        return -1;
      }
    }

    // If the timestamp option is not specified, use the current system time.
    long timestamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());

    // Set it back to replace an invalid timestamp (non-numeric) with the
    // current system time
    getConf().setLong(TIMESTAMP_CONF_KEY, timestamp);

    dryRunTableCreated = false;
    Job job = createSubmittableJob(getConf(), args);
    boolean success = job.waitForCompletion(true);
    if (dryRunTableCreated) {
      deleteTable(getConf(), args);
    }
    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int status = ToolRunner.run(HBaseConfiguration.create(), new ImportTsv(), args);
    System.exit(status);
  }
}