/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Import data written by {@link Export}.
 */
@InterfaceAudience.Public
public class Import extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(Import.class);
  final static String NAME = "import";
  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
  public final static String TABLE_NAME = "import.table.name";
  public final static String WAL_DURABILITY = "import.wal.durability";
  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public static class CellWritableComparablePartitioner
      extends Partitioner<CellWritableComparable, Cell> {
    private static CellWritableComparable[] START_KEYS = null;

    @Override
    public int getPartition(CellWritableComparable key, Cell value, int numPartitions) {
      for (int i = 0; i < START_KEYS.length; ++i) {
        if (key.compareTo(START_KEYS[i]) <= 0) {
          return i;
        }
      }
      return START_KEYS.length;
    }

  }

  /**
   * @deprecated Use {@link CellWritableComparablePartitioner}. Will be removed
   *             from 3.0 onwards
   */
  @Deprecated
  public static class KeyValueWritableComparablePartitioner
      extends Partitioner<KeyValueWritableComparable, KeyValue> {
    private static KeyValueWritableComparable[] START_KEYS = null;

    @Override
    public int getPartition(KeyValueWritableComparable key, KeyValue value, int numPartitions) {
      for (int i = 0; i < START_KEYS.length; ++i) {
        if (key.compareTo(START_KEYS[i]) <= 0) {
          return i;
        }
      }
      return START_KEYS.length;
    }
  }

  public static class KeyValueWritableComparable
      implements WritableComparable<KeyValueWritableComparable> {

    private KeyValue kv = null;

    static {
      // register this comparator
      WritableComparator.define(KeyValueWritableComparable.class, new KeyValueWritableComparator());
    }

    public KeyValueWritableComparable() {
    }

    public KeyValueWritableComparable(KeyValue kv) {
      this.kv = kv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      KeyValue.write(kv, out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      kv = KeyValue.create(in);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
    public int compareTo(KeyValueWritableComparable o) {
      return CellComparator.getInstance().compare(this.kv, o.kv);
    }

    public static class KeyValueWritableComparator extends WritableComparator {

      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
          KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
          return compare(kv1, kv2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

    }

  }

  public static class CellWritableComparable
      implements WritableComparable<CellWritableComparable> {

    private Cell kv = null;

    static {
      // register this comparator
      WritableComparator.define(CellWritableComparable.class,
          new CellWritableComparator());
    }

    public CellWritableComparable() {
    }

    public CellWritableComparable(Cell kv) {
      this.kv = kv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv));
      out.writeInt(0);
      PrivateCellUtil.writeFlatKey(kv, out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      kv = KeyValue.create(in);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
    public int compareTo(CellWritableComparable o) {
      return CellComparator.getInstance().compare(this.kv, o.kv);
    }

    public static class CellWritableComparator extends WritableComparator {

      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          CellWritableComparable kv1 = new CellWritableComparable();
          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
          CellWritableComparable kv2 = new CellWritableComparable();
          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
          return compare(kv1, kv2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

    }

  }

  /**
   * @deprecated Use {@link CellReducer}. Will be removed from 3.0 onwards
   */
  @Deprecated
  public static class KeyValueReducer
      extends Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
    protected void reduce(KeyValueWritableComparable row, Iterable<KeyValue> kvs,
        Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
        throws java.io.IOException, InterruptedException {
      int index = 0;
      for (KeyValue kv : kvs) {
        context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
        if (++index % 100 == 0) context.setStatus("Wrote " + index + " KeyValues, "
            + "and the row key currently being written is " + Bytes.toString(kv.getRowArray()));
      }
    }
  }

  public static class CellReducer
      extends
      Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
    protected void reduce(
        CellWritableComparable row,
        Iterable<Cell> kvs,
        Reducer<CellWritableComparable,
          Cell, ImmutableBytesWritable, Cell>.Context context)
        throws java.io.IOException, InterruptedException {
      int index = 0;
      for (Cell kv : kvs) {
        context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
            new MapReduceExtendedCell(kv));
        if (++index % 100 == 0)
          context.setStatus("Wrote " + index + " KeyValues, "
              + "and the row key currently being written is " + Bytes.toString(kv.getRowArray()));
      }
    }
  }

  /**
   * @deprecated Use {@link CellSortImporter}. Will be removed from 3.0 onwards
   */
  @Deprecated
  public static class KeyValueSortImporter
      extends TableMapper<KeyValueWritableComparable, KeyValue> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(KeyValueSortImporter.class);

    /**
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row." + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (filter == null || !filter.filterRowKey(
          PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            // TODO get rid of ensureKeyValue
            KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
            context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
      int reduceNum = context.getNumReduceTasks();
      Configuration conf = context.getConfiguration();
      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
      try (Connection conn = ConnectionFactory.createConnection(conf);
          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        byte[][] startKeys = regionLocator.getStartKeys();
        if (startKeys.length != reduceNum) {
          throw new IOException("Region split after job initialization");
        }
        KeyValueWritableComparable[] startKeyWraps =
            new KeyValueWritableComparable[startKeys.length - 1];
        for (int i = 1; i < startKeys.length; ++i) {
          startKeyWraps[i - 1] =
              new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
        }
        KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
      }
    }
  }

  /**
   * A mapper that just writes out KeyValues.
   * @deprecated Use {@link CellImporter}. Will be removed from 3.0 onwards
   */
  @Deprecated
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification = "Writables are going away and this has been this way forever")
  public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(KeyValueImporter.class);

    /**
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Considering the row."
              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (filter == null || !filter.filterRowKey(
          PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            // TODO get rid of ensureKeyValue
            context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }

  public static class CellSortImporter
      extends TableMapper<CellWritableComparable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellImporter.class);

    /**
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value,
        Context context)
        throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Considering the row."
              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (filter == null
            || !filter.filterRowKey(PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(),
                (short) row.getLength()))) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            Cell ret = convertKv(kv, cfRenameMap);
            context.write(new CellWritableComparable(ret), ret);
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
      int reduceNum = context.getNumReduceTasks();
      Configuration conf = context.getConfiguration();
      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
      try (Connection conn = ConnectionFactory.createConnection(conf);
          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        byte[][] startKeys = regionLocator.getStartKeys();
        if (startKeys.length != reduceNum) {
          throw new IOException("Region split after job initialization");
        }
        CellWritableComparable[] startKeyWraps =
            new CellWritableComparable[startKeys.length - 1];
        for (int i = 1; i < startKeys.length; ++i) {
          startKeyWraps[i - 1] =
              new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
        }
        CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
      }
    }
  }

  /**
   * A mapper that just writes out KeyValues.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification="Writables are going away and this has been this way forever")
  public static class CellImporter extends TableMapper<ImmutableBytesWritable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellImporter.class);

    /**
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value,
        Context context)
        throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Considering the row."
              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (filter == null
            || !filter.filterRowKey(PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(),
                (short) row.getLength()))) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            context.write(row, new MapReduceExtendedCell(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }

  /**
   * Write table content out to files in hdfs.
   */
  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
    private Map<byte[], byte[]> cfRenameMap;
    private List<UUID> clusterIds;
    private Filter filter;
    private Durability durability;

    /**
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value,
        Context context)
        throws IOException {
      try {
        writeResult(row, value, context);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
        throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      if (LOG.isTraceEnabled()) {
        LOG.trace("Considering the row."
            + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
      }
      if (filter == null
          || !filter.filterRowKey(PrivateCellUtil.createFirstOnRow(key.get(), key.getOffset(),
              (short) key.getLength()))) {
        processKV(key, result, context, put, delete);
      }
    }

    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
        Delete delete) throws IOException, InterruptedException {
      for (Cell kv : result.rawCells()) {
        kv = filterKv(filter, kv);
        // skip if we filter it out
        if (kv == null) continue;

        kv = convertKv(kv, cfRenameMap);
        // Deletes and Puts are gathered and written when finished
        /*
         * If an Export contains a sequence of mutations and tombstones, Import should restore the
         * same sequence as it is. If we combined all Delete tombstones into a single request, some
         * DeleteFamily tombstones could be dropped, because when multiple DeleteFamily tombstones
         * are submitted in a single Delete request only the newest one is kept in the hbase table
         * and the others are ignored. Check - HBASE-12065
         */
        if (PrivateCellUtil.isDeleteFamily(kv)) {
          Delete deleteFamily = new Delete(key.get());
          deleteFamily.add(kv);
          if (durability != null) {
            deleteFamily.setDurability(durability);
          }
          deleteFamily.setClusterIds(clusterIds);
          context.write(key, deleteFamily);
        } else if (CellUtil.isDelete(kv)) {
          if (delete == null) {
            delete = new Delete(key.get());
          }
          delete.add(kv);
        } else {
          if (put == null) {
            put = new Put(key.get());
          }
          addPutToKv(put, kv);
        }
      }
      if (put != null) {
        if (durability != null) {
          put.setDurability(durability);
        }
        put.setClusterIds(clusterIds);
        context.write(key, put);
      }
      if (delete != null) {
        if (durability != null) {
          delete.setDurability(durability);
        }
        delete.setClusterIds(clusterIds);
        context.write(key, delete);
      }
    }

    protected void addPutToKv(Put put, Cell kv) throws IOException {
      put.add(kv);
    }

    @Override
    public void setup(Context context) {
      LOG.info("Setting up " + getClass() + " mapper.");
      Configuration conf = context.getConfiguration();
      cfRenameMap = createCfRenameMap(conf);
      filter = instantiateFilter(conf);
      String durabilityStr = conf.get(WAL_DURABILITY);
      if (durabilityStr != null) {
        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
        LOG.info("setting WAL durability to " + durability);
      } else {
        LOG.info("setting WAL durability to default.");
      }
      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
      ZKWatcher zkw = null;
      Exception ex = null;
      try {
        zkw = new ZKWatcher(conf, context.getTaskAttemptID().toString(), null);
        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
      } catch (ZooKeeperConnectionException e) {
        ex = e;
        LOG.error("Problem connecting to ZooKeeper during task setup", e);
      } catch (KeeperException e) {
        ex = e;
        LOG.error("Problem reading ZooKeeper data during task setup", e);
      } catch (IOException e) {
        ex = e;
        LOG.error("Problem setting up task", e);
      } finally {
        if (zkw != null) zkw.close();
      }
      if (clusterIds == null) {
        // exit early if setup fails
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
   * optionally exclude from the job output.
   * @param conf {@link Configuration} from which to load the filter
   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
   * @throws IllegalArgumentException if the filter is misconfigured
   */
  public static Filter instantiateFilter(Configuration conf) {
    // get the filter, if it was configured
    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
    if (filterClass == null) {
      LOG.debug("No configured filter class, accepting all keyvalues.");
      return null;
    }
    LOG.debug("Attempting to create filter:" + filterClass);
    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
    try {
      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
      return (Filter) m.invoke(null, quotedArgs);
    } catch (IllegalAccessException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    } catch (SecurityException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    } catch (NoSuchMethodException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    } catch (IllegalArgumentException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    } catch (InvocationTargetException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    }
  }

  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
    ArrayList<byte[]> quotedArgs = new ArrayList<>();
    for (String stringArg : stringArgs) {
      // all the filters' instantiation methods expected quoted args since they are coming from
      // the shell, so add them here, though it shouldn't really be needed :-/
      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
    }
    return quotedArgs;
  }

  /**
   * Attempt to filter out the keyvalue
   * @param filter the {@link Filter} to apply, may be <tt>null</tt>
   * @param c {@link Cell} on which to apply the filter
   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
   *         {@link Cell}
   */
  public static Cell filterKv(Filter filter, Cell c) throws IOException {
    // apply the filter and skip this kv if the filter doesn't apply
    if (filter != null) {
      Filter.ReturnCode code = filter.filterCell(c);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Filter returned:" + code + " for the cell:" + c);
      }
      // if its not an accept type, then skip this kv
      if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
          .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
        return null;
      }
    }
    return c;
  }

  // helper: create a new KeyValue based on CF rename map
  private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
    if (cfRenameMap != null) {
      // If there's a rename mapping for this CF, create a new KeyValue
      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
      if (newCfName != null) {
        kv = new KeyValue(kv.getRowArray(),   // row buffer
            kv.getRowOffset(),                // row offset
            kv.getRowLength(),                // row length
            newCfName,                        // CF buffer
            0,                                // CF offset
            newCfName.length,                 // CF length
            kv.getQualifierArray(),           // qualifier buffer
            kv.getQualifierOffset(),          // qualifier offset
            kv.getQualifierLength(),          // qualifier length
            kv.getTimestamp(),                // timestamp
            KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
            kv.getValueArray(),               // value buffer
            kv.getValueOffset(),              // value offset
            kv.getValueLength());             // value length
      }
    }
    return kv;
  }

  // helper: make a map from sourceCfName to destCfName by parsing a config key
  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
    Map<byte[], byte[]> cfRenameMap = null;
    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
    if (allMappingsPropVal != null) {
      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
      String[] allMappings = allMappingsPropVal.split(",");
      for (String mapping : allMappings) {
        if (cfRenameMap == null) {
          cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        String[] srcAndDest = mapping.split(":");
        if (srcAndDest.length != 2) {
          continue;
        }
        cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
      }
    }
    return cfRenameMap;
  }

  /**
   * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
   * the mapper how to rename column families.
   *
   * <p>Alternately, instead of calling this function, you could set the configuration key
   * {@link #CF_RENAME_PROP} yourself. The value should look like
   * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
   * the mapper behavior.
   *
   * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
   *          set
   * @param renameMap a mapping from source CF names to destination CF names
   */
  static public void configureCfRenaming(Configuration conf,
          Map<String, String> renameMap) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : renameMap.entrySet()) {
      String sourceCf = entry.getKey();
      String destCf = entry.getValue();

      if (sourceCf.contains(":") || sourceCf.contains(",") ||
          destCf.contains(":") || destCf.contains(",")) {
        throw new IllegalArgumentException("Illegal character in CF names: "
              + sourceCf + ", " + destCf);
      }

      if (sb.length() != 0) {
        sb.append(",");
      }
      sb.append(sourceCf + ":" + destCf);
    }
    conf.set(CF_RENAME_PROP, sb.toString());
  }

  /**
   * Add a Filter to be instantiated on import
   * @param conf Configuration to update (will be passed to the job)
   * @param clazz {@link Filter} subclass to instantiate on the server.
   * @param filterArgs List of arguments to pass to the filter on instantiation
   */
  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
      List<String> filterArgs) throws IOException {
    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
  }
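
  /*
   * Illustrative sketch (not invoked anywhere in this class): a driver program could combine
   * configureCfRenaming(...) and addFilterAndArguments(...) before building the job, e.g. to
   * rename family "cf_old" to "cf_new" and only import rows whose key starts with "row1". The
   * family names, row prefix, table name, input path, and the choice of PrefixFilter below are
   * assumptions for the example only.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Map<String, String> cfRename = new HashMap<>();
   *   cfRename.put("cf_old", "cf_new");        // serialized as "cf_old:cf_new" in CF_RENAME_PROP
   *   Import.configureCfRenaming(conf, cfRename);
   *   Import.addFilterAndArguments(conf, org.apache.hadoop.hbase.filter.PrefixFilter.class,
   *       Collections.singletonList("row1"));  // filter is applied before the CF rename
   *   Job job = Import.createSubmittableJob(conf, new String[] { "myTable", "/export/myTable" });
   */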

  /**
   * Sets up the actual job.
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    TableName tableName = TableName.valueOf(args[0]);
    conf.set(TABLE_NAME, tableName.getNameAsString());
    Path inputDir = new Path(args[1]);
    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

    // make sure we get the filter in the jars
    try {
      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
      if (filter != null) {
        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
      }
    } catch (Exception e) {
      throw new IOException(e);
    }

    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
      LOG.info("Use Large Result!!");
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Table table = conn.getTable(tableName);
          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        job.setMapperClass(CellSortImporter.class);
        job.setReducerClass(CellReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(CellWritableComparable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
            CellWritableComparable.CellWritableComparator.class,
            RawComparator.class);
        Path partitionsPath =
            new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
        FileSystem fs = FileSystem.get(job.getConfiguration());
        fs.deleteOnExit(partitionsPath);
        job.setPartitionerClass(CellWritableComparablePartitioner.class);
        job.setNumReduceTasks(regionLocator.getStartKeys().length);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
            org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else if (hfileOutPath != null) {
      LOG.info("writing to hfiles for bulk load.");
      job.setMapperClass(CellImporter.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Table table = conn.getTable(tableName);
          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        job.setReducerClass(CellSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
            org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else {
      LOG.info("writing directly to table from Mapper.");
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      job.setMapperClass(Importer.class);
      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
      job.setNumReduceTasks(0);
    }
    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import [options] <tablename> <inputdir>");
    System.err.println("By default Import will load data directly into HBase. To instead generate");
To instead generate"); 886 System.err.println("HFiles of data to prepare for a bulk data load, pass the option:"); 887 System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); 888 System.err.println("If there is a large result that includes too much Cell " 889 + "whitch can occur OOME caused by the memery sort in reducer, pass the option:"); 890 System.err.println(" -D" + HAS_LARGE_RESULT + "=true"); 891 System.err 892 .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use"); 893 System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>"); 894 System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter"); 895 System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the " 896 + CF_RENAME_PROP + " property. Futher, filters will only use the" 897 + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify " 898 + " whether the current row needs to be ignored completely for processing and " 899 + " Filter#filterCell(Cell) method to determine if the Cell should be added;" 900 + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including" 901 + " the Cell."); 902 System.err.println("To import data exported from HBase 0.94, use"); 903 System.err.println(" -Dhbase.import.version=0.94"); 904 System.err.println(" -D " + JOB_NAME_CONF_KEY 905 + "=jobName - use the specified mapreduce job name for the import"); 906 System.err.println("For performance consider the following options:\n" 907 + " -Dmapreduce.map.speculative=false\n" 908 + " -Dmapreduce.reduce.speculative=false\n" 909 + " -D" + WAL_DURABILITY + "=<Used while writing data to hbase." 910 +" Allowed values are the supported durability values" 911 +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>"); 912 } 913 914 /** 915 * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we 916 * need to flush all the regions of the table as the data is held in memory and is also not 917 * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the 918 * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL} 919 */ 920 public static void flushRegionsIfNecessary(Configuration conf) throws IOException, 921 InterruptedException { 922 String tableName = conf.get(TABLE_NAME); 923 Admin hAdmin = null; 924 Connection connection = null; 925 String durability = conf.get(WAL_DURABILITY); 926 // Need to flush if the data is written to hbase and skip wal is enabled. 

  /**
   * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
   * need to flush all the regions of the table as the data is held in memory and is also not
   * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the
   * regions of the table when data was imported to hbase with {@link Durability#SKIP_WAL}.
   */
  public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
      InterruptedException {
    String tableName = conf.get(TABLE_NAME);
    Admin hAdmin = null;
    Connection connection = null;
    String durability = conf.get(WAL_DURABILITY);
    // Need to flush if the data is written to hbase and skip wal is enabled.
    if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
      LOG.info("Flushing all data that skipped the WAL.");
      try {
        connection = ConnectionFactory.createConnection(conf);
        hAdmin = connection.getAdmin();
        hAdmin.flush(TableName.valueOf(tableName));
      } finally {
        if (hAdmin != null) {
          hAdmin.close();
        }
        if (connection != null) {
          connection.close();
        }
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
    if (inputVersionString != null) {
      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
    }
    Job job = createSubmittableJob(getConf(), args);
    boolean isJobSuccessful = job.waitForCompletion(true);
    if (isJobSuccessful) {
      // Flush all the regions of the table
      flushRegionsIfNecessary(getConf());
    }
    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    if (outputRecords < inputRecords) {
      System.err.println("Warning, not all records were imported (maybe filtered out).");
      if (outputRecords == 0) {
        System.err.println("If the data was exported from HBase 0.94 "
            + "consider using -Dhbase.import.version=0.94.");
      }
    }

    return (isJobSuccessful ? 0 : 1);
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
    System.exit(errCode);
  }

}