/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Import data written by {@link Export}.
 */
@InterfaceAudience.Public
public class Import extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(Import.class);
  final static String NAME = "import";
  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
  public final static String TABLE_NAME = "import.table.name";
  public final static String WAL_DURABILITY = "import.wal.durability";
  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public static class CellWritableComparablePartitioner
    extends Partitioner<CellWritableComparable, Cell> {
    private static CellWritableComparable[] START_KEYS = null;

    @Override
    public int getPartition(CellWritableComparable key, Cell value, int numPartitions) {
      for (int i = 0; i < START_KEYS.length; ++i) {
        if (key.compareTo(START_KEYS[i]) <= 0) {
          return i;
        }
      }
      return START_KEYS.length;
    }

  }

  public static class CellWritableComparable
    implements WritableComparable<CellWritableComparable> {

    private Cell kv = null;

    static {
      // register this comparator
      WritableComparator.define(CellWritableComparable.class, new CellWritableComparator());
    }

    public CellWritableComparable() {
    }

    public CellWritableComparable(Cell kv) {
      this.kv = kv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv));
      out.writeInt(0);
      PrivateCellUtil.writeFlatKey(kv, out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      kv = KeyValue.create(in);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
    public int compareTo(CellWritableComparable o) {
      return CellComparator.getInstance().compare(this.kv, o.kv);
    }

    public static class CellWritableComparator extends WritableComparator {

      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          CellWritableComparable kv1 = new CellWritableComparable();
          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
          CellWritableComparable kv2 = new CellWritableComparable();
          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
          return compare(kv1, kv2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

    }

  }

  public static class CellReducer
    extends Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
    protected void reduce(CellWritableComparable row, Iterable<Cell> kvs,
      Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {
      int index = 0;
      for (Cell kv : kvs) {
        context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
          new MapReduceExtendedCell(kv));
        if (++index % 100 == 0) {
          context.setStatus("Wrote " + index + " KeyValues, current row key: "
            + Bytes.toString(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
        }
      }
    }
  }

  public static class CellSortImporter extends TableMapper<CellWritableComparable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellSortImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row." + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            Cell ret = convertKv(kv, cfRenameMap);
            context.write(new CellWritableComparable(ret), ret);
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
      int reduceNum = context.getNumReduceTasks();
      Configuration conf = context.getConfiguration();
      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
      try (Connection conn = ConnectionFactory.createConnection(conf);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        byte[][] startKeys = regionLocator.getStartKeys();
        if (startKeys.length != reduceNum) {
          throw new IOException("Region split after job initialization");
        }
        CellWritableComparable[] startKeyWraps = new CellWritableComparable[startKeys.length - 1];
        for (int i = 1; i < startKeys.length; ++i) {
          startKeyWraps[i - 1] =
            new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
        }
        CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
      }
    }
  }

  /**
   * A mapper that just writes out KeyValues.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification = "Writables are going away and this has been this way forever")
  public static class CellImporter extends TableMapper<ImmutableBytesWritable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Considering the row."
            + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            context.write(row, new MapReduceExtendedCell(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }

  /**
   * A mapper that writes the exported {@link Result}s back out as {@link Put}s and
   * {@link Delete}s, for loading directly into a table.
   */
  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
    private Map<byte[], byte[]> cfRenameMap;
    private List<UUID> clusterIds;
    private Filter filter;
    private Durability durability;

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        writeResult(row, value, context);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing result", e);
        Thread.currentThread().interrupt();
      }
    }

    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
      throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      if (LOG.isTraceEnabled()) {
        LOG.trace(
          "Considering the row." + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
      }
      if (
        filter == null || !filter.filterRowKey(
          PrivateCellUtil.createFirstOnRow(key.get(), key.getOffset(), (short) key.getLength()))
      ) {
        processKV(key, result, context, put, delete);
      }
    }

    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
      Delete delete) throws IOException, InterruptedException {
      for (Cell kv : result.rawCells()) {
        kv = filterKv(filter, kv);
        // skip if we filter it out
        if (kv == null) continue;

        kv = convertKv(kv, cfRenameMap);
        // Deletes and Puts are gathered and written when finished
        /*
         * If an Export contains a sequence of mutations and tombstones, the Import should restore
         * that sequence exactly as it was. If all Delete tombstones were combined into a single
         * Delete request, some DeleteFamily tombstones could be lost: when multiple DeleteFamily
         * tombstones are submitted in one Delete request, only the newest is kept in the hbase
         * table and the others are ignored. See HBASE-12065.
         */
        if (PrivateCellUtil.isDeleteFamily(kv)) {
          Delete deleteFamily = new Delete(key.get());
          deleteFamily.add(kv);
          if (durability != null) {
            deleteFamily.setDurability(durability);
          }
          deleteFamily.setClusterIds(clusterIds);
          context.write(key, deleteFamily);
        } else if (CellUtil.isDelete(kv)) {
          if (delete == null) {
            delete = new Delete(key.get());
          }
          delete.add(kv);
        } else {
          if (put == null) {
            put = new Put(key.get());
          }
          addPutToKv(put, kv);
        }
      }
      if (put != null) {
        if (durability != null) {
          put.setDurability(durability);
        }
        put.setClusterIds(clusterIds);
        context.write(key, put);
      }
      if (delete != null) {
        if (durability != null) {
          delete.setDurability(durability);
        }
        delete.setClusterIds(clusterIds);
        context.write(key, delete);
      }
    }

    protected void addPutToKv(Put put, Cell kv) throws IOException {
      put.add(kv);
    }

    @Override
    public void setup(Context context) {
      LOG.info("Setting up " + getClass() + " mapper.");
      Configuration conf = context.getConfiguration();
      cfRenameMap = createCfRenameMap(conf);
      filter = instantiateFilter(conf);
      String durabilityStr = conf.get(WAL_DURABILITY);
      if (durabilityStr != null) {
        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
        LOG.info("setting WAL durability to " + durability);
      } else {
        LOG.info("setting WAL durability to default.");
      }
      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
      ZKWatcher zkw = null;
      Exception ex = null;
      try {
        zkw = new ZKWatcher(conf, context.getTaskAttemptID().toString(), null);
        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
      } catch (ZooKeeperConnectionException e) {
        ex = e;
        LOG.error("Problem connecting to ZooKeeper during task setup", e);
      } catch (KeeperException e) {
        ex = e;
        LOG.error("Problem reading ZooKeeper data during task setup", e);
      } catch (IOException e) {
        ex = e;
        LOG.error("Problem setting up task", e);
      } finally {
        if (zkw != null) zkw.close();
      }
      if (clusterIds == null) {
        // exit early if setup fails
        throw new RuntimeException(ex);
      }
    }
  }
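  // Editorial note (illustrative, not part of the original file): instantiateFilter() below loads
  // the configured Filter class reflectively and invokes the same static factory that the HBase
  // shell uses. A custom filter passed via -D import.filter.class is therefore expected to expose
  // a static method of roughly this shape (sketch only):
  //
  //   public static Filter createFilterFromArguments(ArrayList<byte[]> filterArguments)
  //
  // Built-in filters such as org.apache.hadoop.hbase.filter.PrefixFilter already provide it.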
  /**
   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
   * optionally exclude them from the job output.
   * @param conf {@link Configuration} from which to load the filter
   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
   * @throws IllegalArgumentException if the filter is misconfigured
   */
  public static Filter instantiateFilter(Configuration conf) {
    // get the filter, if it was configured
    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
    if (filterClass == null) {
      LOG.debug("No configured filter class, accepting all keyvalues.");
      return null;
    }
    LOG.debug("Attempting to create filter:" + filterClass);
    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
    try {
      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
      return (Filter) m.invoke(null, quotedArgs);
    } catch (IllegalAccessException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    } catch (SecurityException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    } catch (NoSuchMethodException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    } catch (IllegalArgumentException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    } catch (InvocationTargetException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    }
  }

  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
    ArrayList<byte[]> quotedArgs = new ArrayList<>();
    for (String stringArg : stringArgs) {
      // all the filters' instantiation methods expected quoted args since they are coming from
      // the shell, so add them here, though it shouldn't really be needed :-/
      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
    }
    return quotedArgs;
  }

  /**
   * Attempt to filter out the keyvalue
   * @param filter the {@link Filter} to apply, may be <tt>null</tt>
   * @param c      {@link Cell} on which to apply the filter
   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
   *         {@link Cell}
   */
  public static Cell filterKv(Filter filter, Cell c) throws IOException {
    // apply the filter and skip this kv if the filter doesn't apply
    if (filter != null) {
      Filter.ReturnCode code = filter.filterCell(c);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Filter returned:" + code + " for the cell:" + c);
      }
      // if its not an accept type, then skip this kv
      if (
        !(code.equals(Filter.ReturnCode.INCLUDE)
          || code.equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))
      ) {
        return null;
      }
    }
    return c;
  }

  // helper: create a new KeyValue based on CF rename map
  private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
    if (cfRenameMap != null) {
      // If there's a rename mapping for this CF, create a new KeyValue
      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
      if (newCfName != null) {
        List<Tag> tags = PrivateCellUtil.getTags(kv);
        kv = new KeyValue(kv.getRowArray(), // row buffer
          kv.getRowOffset(), // row offset
          kv.getRowLength(), // row length
          newCfName, // CF buffer
          0, // CF offset
          newCfName.length, // CF length
          kv.getQualifierArray(), // qualifier buffer
          kv.getQualifierOffset(), // qualifier offset
          kv.getQualifierLength(), // qualifier length
          kv.getTimestamp(), // timestamp
          KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
          kv.getValueArray(), // value buffer
          kv.getValueOffset(), // value offset
          kv.getValueLength(), // value length
          tags.size() == 0 ? null : tags);
      }
    }
    return kv;
  }
  // helper: make a map from sourceCfName to destCfName by parsing a config key
  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
    Map<byte[], byte[]> cfRenameMap = null;
    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
    if (allMappingsPropVal != null) {
      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
      String[] allMappings = allMappingsPropVal.split(",");
      for (String mapping : allMappings) {
        if (cfRenameMap == null) {
          cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        String[] srcAndDest = mapping.split(":");
        if (srcAndDest.length != 2) {
          continue;
        }
        cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
      }
    }
    return cfRenameMap;
  }

  /**
   * <p>
   * Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells the mapper
   * how to rename column families.
   * <p>
   * Alternately, instead of calling this function, you could set the configuration key
   * {@link #CF_RENAME_PROP} yourself. The value should look like
   *
   * <pre>
   * srcCf1:destCf1,srcCf2:destCf2,....
   * </pre>
   *
   * This would have the same effect on the mapper behavior.
   * @param conf      the Configuration in which the {@link #CF_RENAME_PROP} key will be set
   * @param renameMap a mapping from source CF names to destination CF names
   */
  static public void configureCfRenaming(Configuration conf, Map<String, String> renameMap) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : renameMap.entrySet()) {
      String sourceCf = entry.getKey();
      String destCf = entry.getValue();

      if (
        sourceCf.contains(":") || sourceCf.contains(",") || destCf.contains(":")
          || destCf.contains(",")
      ) {
        throw new IllegalArgumentException(
          "Illegal character in CF names: " + sourceCf + ", " + destCf);
      }

      if (sb.length() != 0) {
        sb.append(",");
      }
      sb.append(sourceCf + ":" + destCf);
    }
    conf.set(CF_RENAME_PROP, sb.toString());
  }

  /**
   * Add a Filter to be instantiated on import
   * @param conf       Configuration to update (will be passed to the job)
   * @param clazz      {@link Filter} subclass to instantiate on the server.
   * @param filterArgs List of arguments to pass to the filter on instantiation
   */
  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
    List<String> filterArgs) throws IOException {
    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
    conf.setStrings(Import.FILTER_ARGS_CONF_KEY,
      filterArgs.toArray(new String[filterArgs.size()]));
  }
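  // Editorial sketch (not part of the original class): a driver might combine the helpers above
  // to configure a filter and a column-family rename before building the job. The filter class,
  // table name, and paths used here are examples only.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Import.addFilterAndArguments(conf, org.apache.hadoop.hbase.filter.PrefixFilter.class,
  //     Collections.singletonList("row-prefix"));
  //   Import.configureCfRenaming(conf, Collections.singletonMap("oldCf", "newCf"));
  //   Job job = Import.createSubmittableJob(conf, new String[] { "myTable", "/export/output" });
  //   System.exit(job.waitForCompletion(true) ? 0 : 1);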
  /**
   * Sets up the actual job.
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
    TableName tableName = TableName.valueOf(args[0]);
    conf.set(TABLE_NAME, tableName.getNameAsString());
    Path inputDir = new Path(args[1]);
    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

    // make sure we get the filter in the jars
    try {
      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
      if (filter != null) {
        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
      }
    } catch (Exception e) {
      throw new IOException(e);
    }

    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
      LOG.info("Use Large Result!!");
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        job.setMapperClass(CellSortImporter.class);
        job.setReducerClass(CellReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(CellWritableComparable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
          CellWritableComparable.CellWritableComparator.class, RawComparator.class);
        Path partitionsPath =
          new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
        FileSystem fs = FileSystem.get(job.getConfiguration());
        fs.deleteOnExit(partitionsPath);
        job.setPartitionerClass(CellWritableComparablePartitioner.class);
        job.setNumReduceTasks(regionLocator.getStartKeys().length);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else if (hfileOutPath != null) {
      LOG.info("writing to hfiles for bulk load.");
      job.setMapperClass(CellImporter.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        job.setReducerClass(CellSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else {
      LOG.info("writing directly to table from Mapper.");
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      job.setMapperClass(Importer.class);
      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
      job.setNumReduceTasks(0);
    }
    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import [options] <tablename> <inputdir>");
    System.err.println("By default Import will load data directly into HBase. To instead generate");
    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("If the input contains rows with very many Cells, which can cause an OOME"
      + " during the in-memory sort in the reducer, pass the option:");
    System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
    System.err
      .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
      + CF_RENAME_PROP + " property. Further, filters will only use the"
      + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
      + " whether the current row needs to be ignored completely for processing and "
      + " Filter#filterCell(Cell) method to determine if the Cell should be added;"
      + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
      + " the Cell.");
    System.err.println("To import data exported from HBase 0.94, use");
    System.err.println("  -Dhbase.import.version=0.94");
    System.err.println("  -D " + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the import");
    System.err.println("For performance consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false\n"
      + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
      + " Allowed values are the supported durability values"
      + " like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
  }

  /**
   * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
   * need to flush all the regions of the table as the data is held in memory and is also not
   * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all
   * regions of the table when data has been imported with {@link Durability#SKIP_WAL}.
   */
  public static void flushRegionsIfNecessary(Configuration conf)
    throws IOException, InterruptedException {
    String tableName = conf.get(TABLE_NAME);
    Admin hAdmin = null;
    Connection connection = null;
    String durability = conf.get(WAL_DURABILITY);
    // Need to flush if the data is written to hbase and skip wal is enabled.
    if (
      conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)
    ) {
      LOG.info("Flushing all data that skipped the WAL.");
      try {
        connection = ConnectionFactory.createConnection(conf);
        hAdmin = connection.getAdmin();
        hAdmin.flush(TableName.valueOf(tableName));
      } finally {
        if (hAdmin != null) {
          hAdmin.close();
        }
        if (connection != null) {
          connection.close();
        }
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
    if (inputVersionString != null) {
      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
    }
    Job job = createSubmittableJob(getConf(), args);
    boolean isJobSuccessful = job.waitForCompletion(true);
    if (isJobSuccessful) {
      // Flush all the regions of the table
      flushRegionsIfNecessary(getConf());
    }
    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    if (outputRecords < inputRecords) {
      System.err.println("Warning, not all records were imported (maybe filtered out).");
      if (outputRecords == 0) {
        System.err.println("If the data was exported from HBase 0.94 "
          + "consider using -Dhbase.import.version=0.94.");
      }
    }

    return (isJobSuccessful ? 0 : 1);
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
    System.exit(errCode);
  }
}