/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Import data written by {@link Export}.
 */
@InterfaceAudience.Public
public class Import extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(Import.class);
  final static String NAME = "import";
  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
  public final static String TABLE_NAME = "import.table.name";
  public final static String WAL_DURABILITY = "import.wal.durability";
  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public static class CellWritableComparablePartitioner
    extends Partitioner<CellWritableComparable, Cell> {
    private static CellWritableComparable[] START_KEYS = null;

    @Override
    public int getPartition(CellWritableComparable key, Cell value, int numPartitions) {
      for (int i = 0; i < START_KEYS.length; ++i) {
        if (key.compareTo(START_KEYS[i]) <= 0) {
          return i;
        }
      }
      return START_KEYS.length;
    }

  }

  public static class CellWritableComparable implements WritableComparable<CellWritableComparable> {

    private ExtendedCell kv = null;

    static {
      // register this comparator
      WritableComparator.define(CellWritableComparable.class, new CellWritableComparator());
    }

    public CellWritableComparable() {
    }

    public CellWritableComparable(Cell kv) {
      this.kv = (ExtendedCell) kv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
      int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
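      // The writes below follow KeyValue's length-prefixed serialized layout: the overall length
      // first, then the key length, the value length and the flat key bytes. readFields() reads
      // this back by handing the stream to KeyValue.create(in).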
      out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
      out.writeInt(keyLen);
      out.writeInt(valueLen);
      PrivateCellUtil.writeFlatKey(kv, out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      kv = KeyValue.create(in);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
    public int compareTo(CellWritableComparable o) {
      return CellComparator.getInstance().compare(this.kv, o.kv);
    }

    public static class CellWritableComparator extends WritableComparator {

      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          CellWritableComparable kv1 = new CellWritableComparable();
          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
          CellWritableComparable kv2 = new CellWritableComparable();
          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
          return compare(kv1, kv2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

    }

  }

  public static class CellReducer
    extends Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
    protected void reduce(CellWritableComparable row, Iterable<Cell> kvs,
      Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {
      int index = 0;
      for (Cell kv : kvs) {
        context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
          new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(kv)));
        if (++index % 100 == 0) {
          context.setStatus("Wrote " + index + " KeyValues, "
            + "and the current row key is " + Bytes.toString(CellUtil.cloneRow(kv)));
        }
      }
    }
  }

  public static class CellSortImporter extends TableMapper<CellWritableComparable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellSortImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(value)) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) {
              continue;
            }
            Cell ret = convertKv(kv, cfRenameMap);
            context.write(new CellWritableComparable(ret),
              new MapReduceExtendedCell((ExtendedCell) ret));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
      int reduceNum = context.getNumReduceTasks();
      Configuration conf = context.getConfiguration();
      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
      try (Connection conn = ConnectionFactory.createConnection(conf);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        byte[][] startKeys = regionLocator.getStartKeys();
        if (startKeys.length != reduceNum) {
          throw new IOException("Region split after job initialization");
        }
        CellWritableComparable[] startKeyWraps = new CellWritableComparable[startKeys.length - 1];
        for (int i = 1; i < startKeys.length; ++i) {
          startKeyWraps[i - 1] =
            new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
        }
        CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
      }
    }
  }

  /**
   * A mapper that just writes out KeyValues.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification = "Writables are going away and this has been this way forever")
  public static class CellImporter extends TableMapper<ImmutableBytesWritable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(value)) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) {
              continue;
            }
            context.write(row, new MapReduceExtendedCell(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }

  /**
   * A mapper that writes out {@link Mutation}s (Puts and Deletes) to be applied to the target
   * table.
   */
  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
    private Map<byte[], byte[]> cfRenameMap;
    private List<UUID> clusterIds;
    private Filter filter;
    private Durability durability;

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        writeResult(row, value, context);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing result", e);
        Thread.currentThread().interrupt();
      }
    }

    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
      throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      if (LOG.isTraceEnabled()) {
        LOG.trace(
          "Considering the row: " + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
      }
      if (
        filter == null || !filter.filterRowKey(
          PrivateCellUtil.createFirstOnRow(key.get(), key.getOffset(), (short) key.getLength()))
      ) {
        processKV(key, result, context, put, delete);
      }
    }

    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
      Delete delete) throws IOException, InterruptedException {
      for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(result)) {
        kv = filterKv(filter, kv);
        // skip if we filtered it out
        if (kv == null) {
          continue;
        }

        kv = convertKv(kv, cfRenameMap);
        // Deletes and Puts are gathered and written when finished
        /*
         * If the Export contains a sequence of mutations and tombstones, the Import should restore
         * that same sequence. If we combined all Delete tombstones into a single request, some
         * DeleteFamily tombstones could be lost: when multiple DeleteFamily tombstones are
         * submitted in one Delete request, only the newest one is kept in the table and the others
         * are ignored.
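         * For example (illustrative timestamps only): if the exported row carries DeleteFamily
         * markers at t=5 and t=2 for the same family, folding both into one combined Delete would
         * retain only the t=5 marker, so the restored raw sequence would no longer match the
         * export. That is why each DeleteFamily marker is written immediately as its own Delete
         * below.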
         * See HBASE-12065.
         */
        if (PrivateCellUtil.isDeleteFamily(kv)) {
          Delete deleteFamily = new Delete(key.get());
          deleteFamily.add(kv);
          if (durability != null) {
            deleteFamily.setDurability(durability);
          }
          deleteFamily.setClusterIds(clusterIds);
          context.write(key, deleteFamily);
        } else if (CellUtil.isDelete(kv)) {
          if (delete == null) {
            delete = new Delete(key.get());
          }
          delete.add(kv);
        } else {
          if (put == null) {
            put = new Put(key.get());
          }
          addPutToKv(put, kv);
        }
      }
      if (put != null) {
        if (durability != null) {
          put.setDurability(durability);
        }
        put.setClusterIds(clusterIds);
        context.write(key, put);
      }
      if (delete != null) {
        if (durability != null) {
          delete.setDurability(durability);
        }
        delete.setClusterIds(clusterIds);
        context.write(key, delete);
      }
    }

    protected void addPutToKv(Put put, Cell kv) throws IOException {
      put.add(kv);
    }

    @Override
    public void setup(Context context) {
      LOG.info("Setting up " + getClass() + " mapper.");
      Configuration conf = context.getConfiguration();
      cfRenameMap = createCfRenameMap(conf);
      filter = instantiateFilter(conf);
      String durabilityStr = conf.get(WAL_DURABILITY);
      if (durabilityStr != null) {
        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
        LOG.info("setting WAL durability to " + durability);
      } else {
        LOG.info("setting WAL durability to default.");
      }
      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
      ZKWatcher zkw = null;
      Exception ex = null;
      try {
        zkw = new ZKWatcher(conf, context.getTaskAttemptID().toString(), null);
        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
      } catch (ZooKeeperConnectionException e) {
        ex = e;
        LOG.error("Problem connecting to ZooKeeper during task setup", e);
      } catch (KeeperException e) {
        ex = e;
        LOG.error("Problem reading ZooKeeper data during task setup", e);
      } catch (IOException e) {
        ex = e;
        LOG.error("Problem setting up task", e);
      } finally {
        if (zkw != null) {
          zkw.close();
        }
      }
      if (clusterIds == null) {
        // exit early if setup fails
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so that
   * some can optionally be excluded from the job output.
   * @param conf {@link Configuration} from which to load the filter
   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
   * @throws IllegalArgumentException if the filter is misconfigured
   */
  public static Filter instantiateFilter(Configuration conf) {
    // get the filter, if it was configured
    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
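    // The reflection below expects the configured Filter subclass to expose a public static
    // createFilterFromArguments(ArrayList<byte[]>) factory, the same hook the HBase shell uses to
    // build filters from string arguments (hence the quoting in toQuotedByteArrays below).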
    if (filterClass == null) {
      LOG.debug("No configured filter class, accepting all keyvalues.");
      return null;
    }
    LOG.debug("Attempting to create filter:" + filterClass);
    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
    try {
      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
      return (Filter) m.invoke(null, quotedArgs);
    } catch (IllegalAccessException | SecurityException | NoSuchMethodException
      | IllegalArgumentException | InvocationTargetException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    }
  }

  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
    ArrayList<byte[]> quotedArgs = new ArrayList<>();
    for (String stringArg : stringArgs) {
      // all the filters' instantiation methods expect quoted args since they are coming from
      // the shell, so add them here, though it shouldn't really be needed :-/
      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
    }
    return quotedArgs;
  }

  /**
   * Attempt to filter out the keyvalue
   * @param filter the filter to apply, may be <tt>null</tt>
   * @param c      {@link Cell} on which to apply the filter
   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
   *         {@link Cell}
   */
  public static ExtendedCell filterKv(Filter filter, ExtendedCell c) throws IOException {
    // apply the filter and skip this kv if the filter doesn't apply
    if (filter != null) {
      Filter.ReturnCode code = filter.filterCell(c);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Filter returned:" + code + " for the cell:" + c);
      }
      // if it's not an accept type, then skip this kv
      if (
        !(code.equals(Filter.ReturnCode.INCLUDE)
          || code.equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))
      ) {
        return null;
      }
    }
    return c;
  }

  // helper: create a new KeyValue based on CF rename map
  private static ExtendedCell convertKv(ExtendedCell kv, Map<byte[], byte[]> cfRenameMap) {
    if (cfRenameMap != null) {
      // If there's a rename mapping for this CF, create a new KeyValue
      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
      if (newCfName != null) {
        List<Tag> tags = PrivateCellUtil.getTags(kv);
        kv = new KeyValue(kv.getRowArray(), // row buffer
          kv.getRowOffset(), // row offset
          kv.getRowLength(), // row length
          newCfName, // CF buffer
          0, // CF offset
          newCfName.length, // CF length
          kv.getQualifierArray(), // qualifier buffer
          kv.getQualifierOffset(), // qualifier offset
          kv.getQualifierLength(), // qualifier length
          kv.getTimestamp(), // timestamp
          KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
          kv.getValueArray(), // value buffer
          kv.getValueOffset(), // value offset
          kv.getValueLength(), // value length
          tags.size() == 0 ? null : tags);
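        // Note: only the family is replaced above; row, qualifier, timestamp, type, value and any
        // tags are copied over from the original cell unchanged.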
      }
    }
    return kv;
  }

  // helper: make a map from sourceCfName to destCfName by parsing a config key
  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
    Map<byte[], byte[]> cfRenameMap = null;
    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
    if (allMappingsPropVal != null) {
      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
      String[] allMappings = allMappingsPropVal.split(",");
      for (String mapping : allMappings) {
        if (cfRenameMap == null) {
          cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        String[] srcAndDest = mapping.split(":");
        if (srcAndDest.length != 2) {
          continue;
        }
        cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
      }
    }
    return cfRenameMap;
  }

  /**
   * <p>
   * Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells the mapper
   * how to rename column families.
   * <p>
   * Alternately, instead of calling this function, you could set the configuration key
   * {@link #CF_RENAME_PROP} yourself. The value should look like
   *
   * <pre>
   * srcCf1:destCf1,srcCf2:destCf2,....
   * </pre>
   *
   * This would have the same effect on the mapper behavior.
   * @param conf      the Configuration in which the {@link #CF_RENAME_PROP} key will be set
   * @param renameMap a mapping from source CF names to destination CF names
   */
  public static void configureCfRenaming(Configuration conf, Map<String, String> renameMap) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : renameMap.entrySet()) {
      String sourceCf = entry.getKey();
      String destCf = entry.getValue();

      if (
        sourceCf.contains(":") || sourceCf.contains(",") || destCf.contains(":")
          || destCf.contains(",")
      ) {
        throw new IllegalArgumentException(
          "Illegal character in CF names: " + sourceCf + ", " + destCf);
      }

      if (sb.length() != 0) {
        sb.append(",");
      }
      sb.append(sourceCf + ":" + destCf);
    }
    conf.set(CF_RENAME_PROP, sb.toString());
  }

  /**
   * Add a Filter to be instantiated on import
   * @param conf       Configuration to update (will be passed to the job)
   * @param clazz      {@link Filter} subclass to instantiate on the server.
   * @param filterArgs List of arguments to pass to the filter on instantiation
   */
  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
    List<String> filterArgs) throws IOException {
    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
  }
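  // Illustrative only: a minimal, hypothetical driver sketch showing how the helpers above can
  // prepare a Configuration before building the job. PrefixFilter, the table name and the paths
  // are assumptions for the example, not part of this class:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Import.addFilterAndArguments(conf, org.apache.hadoop.hbase.filter.PrefixFilter.class,
  //     java.util.Arrays.asList("row-prefix"));
  //   Import.configureCfRenaming(conf, java.util.Collections.singletonMap("oldCf", "newCf"));
  //   Job job = Import.createSubmittableJob(conf, new String[] { "myTable", "/export/myTable" });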
  /**
   * Sets up the actual job.
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
    TableName tableName = TableName.valueOf(args[0]);
    conf.set(TABLE_NAME, tableName.getNameAsString());
    Path inputDir = new Path(args[1]);
    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

    // make sure we get the filter in the jars
    try {
      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
      if (filter != null) {
        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
      }
    } catch (Exception e) {
      throw new IOException(e);
    }

    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
      LOG.info("Using large-result mode.");
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        job.setMapperClass(CellSortImporter.class);
        job.setReducerClass(CellReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(CellWritableComparable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
          CellWritableComparable.CellWritableComparator.class, RawComparator.class);
        Path partitionsPath =
          new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
        FileSystem fs = FileSystem.get(job.getConfiguration());
        fs.deleteOnExit(partitionsPath);
        job.setPartitionerClass(CellWritableComparablePartitioner.class);
        job.setNumReduceTasks(regionLocator.getStartKeys().length);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else if (hfileOutPath != null) {
      LOG.info("writing to hfiles for bulk load.");
      job.setMapperClass(CellImporter.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        job.setReducerClass(CellSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else {
      LOG.info("writing directly to table from Mapper.");
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      job.setMapperClass(Importer.class);
      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
      job.setNumReduceTasks(0);
    }
    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import [options] <tablename> <inputdir>");
    System.err.println("By default Import will load data directly into HBase. To instead generate");
    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("If rows in the exported data contain very many Cells, the in-memory sort"
      + " in the reducer can cause an OutOfMemoryError; in that case pass the option:");
    System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
    System.err
      .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
      + CF_RENAME_PROP + " property. Further, filters will only use the"
      + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
      + " whether the current row needs to be ignored completely for processing and "
      + " Filter#filterCell(Cell) method to determine if the Cell should be added;"
      + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
      + " the Cell.");
    System.err.println("To import data exported from HBase 0.94, use");
    System.err.println("  -Dhbase.import.version=0.94");
    System.err.println("  -D " + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the import");
    System.err.println("For performance consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false\n"
      + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
      + " Allowed values are the supported durability values"
      + " like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
  }
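  // Illustrative command lines only (the table name and paths are placeholders); they correspond
  // to the options documented in usage() above:
  //
  //   hbase org.apache.hadoop.hbase.mapreduce.Import myTable /export/myTable
  //   hbase org.apache.hadoop.hbase.mapreduce.Import \
  //     -Dimport.bulk.output=/tmp/myTable-hfiles myTable /export/myTable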
  /**
   * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
   * need to flush all the regions of the table as the data is only held in memory and is not
   * present in the Write Ahead Log to replay in case of a crash. This method flushes all the
   * regions of the table when data has been imported with {@link Durability#SKIP_WAL}.
   */
  public static void flushRegionsIfNecessary(Configuration conf)
    throws IOException, InterruptedException {
    String tableName = conf.get(TABLE_NAME);
    Admin hAdmin = null;
    Connection connection = null;
    String durability = conf.get(WAL_DURABILITY);
    // Need to flush if the data is written to hbase and skip wal is enabled.
    if (
      conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)
    ) {
      LOG.info("Flushing all data that skipped the WAL.");
      try {
        connection = ConnectionFactory.createConnection(conf);
        hAdmin = connection.getAdmin();
        hAdmin.flush(TableName.valueOf(tableName));
      } finally {
        if (hAdmin != null) {
          hAdmin.close();
        }
        if (connection != null) {
          connection.close();
        }
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
    if (inputVersionString != null) {
      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
    }
    Job job = createSubmittableJob(getConf(), args);
    boolean isJobSuccessful = job.waitForCompletion(true);
    if (isJobSuccessful) {
      // Flush all the regions of the table
      flushRegionsIfNecessary(getConf());
    }
    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    if (outputRecords < inputRecords) {
      System.err.println("Warning, not all records were imported (maybe filtered out).");
      if (outputRecords == 0) {
        System.err.println("If the data was exported from HBase 0.94 "
          + "consider using -Dhbase.import.version=0.94.");
      }
    }

    return (isJobSuccessful ? 0 : 1);
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
    System.exit(errCode);
  }
}