/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.test.util.CRC64;
import org.apache.hadoop.hbase.test.util.warc.WARCInputFormat;
import org.apache.hadoop.hbase.test.util.warc.WARCRecord;
import org.apache.hadoop.hbase.test.util.warc.WARCWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * This integration test loads successful resource retrieval records from the Common Crawl
 * (https://commoncrawl.org/) public dataset into an HBase table and writes records that can be
 * used later to verify the presence and integrity of those records.
 * <p>
 * Run like: <blockquote> ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl
 * \<br>
 * -Dfs.s3n.awsAccessKeyId=<AWS access key> \<br>
 * -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \<br>
 * /path/to/test-CC-MAIN-2021-10-warc.paths.gz \<br>
 * /path/to/tmp/warc-loader-output </blockquote>
 * <p>
 * Access to the Common Crawl dataset in S3 is made available to anyone by Amazon AWS, but
 * Hadoop's S3N filesystem still requires valid access credentials to initialize.
 * <p>
 * The input path can either specify a directory or a file. The file may optionally be compressed
 * with gzip. If a directory, the loader expects the directory to contain one or more WARC files
 * from the Common Crawl dataset. If a file, the loader expects a list of Hadoop S3N URIs which
 * point to S3 locations for one or more WARC files from the Common Crawl dataset, one URI per
 * line. Lines should be terminated with the UNIX line terminator.
 * <p>
 * Included in hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz is a list of all WARC
 * files comprising the Q1 2021 crawl archive. There are 64,000 WARC files in this data set, each
 * containing ~1GB of gzipped data. The WARC files contain several record types, such as metadata,
 * request, and response, but we only load the response record types. If the HBase table schema
 * does not specify compression (the default) there is roughly a 10x expansion. Loading the full
 * crawl archive results in a table approximately 640 TB in size.
 * <p>
 * The loader can optionally drive read load during ingest by incrementing counters for each URL
 * discovered in content. Add <tt>-DIntegrationTestLoadCommonCrawl.increments=true</tt> to the
 * command line to enable.
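 * <p>
 * Row keys are derived from each record's target URI: the hostname labels are reversed (for
 * example, "www.example.com" becomes "com.example.www"), followed by the port if present, a '|'
 * separator, and then the path, query, and fragment, so pages from the same site sort into
 * adjacent rows.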
 * <p>
 * You can also split the Loader and Verify stages:
 * <p>
 * Load with: <blockquote> ./bin/hbase
 * 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Loader' \<br>
 * -files /path/to/hadoop-aws.jar \<br>
 * -Dfs.s3n.awsAccessKeyId=<AWS access key> \<br>
 * -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \<br>
 * /path/to/test-CC-MAIN-2021-10-warc.paths.gz \<br>
 * /path/to/tmp/warc-loader-output </blockquote>
 * <p>
 * Note: The hadoop-aws jar will be needed at runtime to instantiate the S3N filesystem. Use the
 * <tt>-files</tt> ToolRunner argument to add it.
 * <p>
 * Verify with: <blockquote> ./bin/hbase
 * 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Verify' \<br>
 * /path/to/tmp/warc-loader-output </blockquote>
 * <p>
 */
public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class);

  static final String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
  static final String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";

  static final String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments";
  static final boolean DEFAULT_INCREMENTS = false;

  static final int MAX_INFLIGHT = 1000;
  static final int INFLIGHT_PAUSE_MS = 100;

  static final byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
  static final byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
  static final byte[] URL_FAMILY_NAME = Bytes.toBytes("u");
  static final byte[] SEP = Bytes.toBytes(":");
  static final byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
  static final byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
  static final byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
  static final byte[] CRC_QUALIFIER = Bytes.toBytes("c");
  static final byte[] DATE_QUALIFIER = Bytes.toBytes("d");
  static final byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
  static final byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
  static final byte[] REF_QUALIFIER = Bytes.toBytes("ref");

  public static enum Counts {
    REFERENCED,
    UNREFERENCED,
    CORRUPT,
    RPC_BYTES_WRITTEN,
    RPC_TIME_MS,
  }

  protected Path warcFileInputDir = null;
  protected Path outputDir = null;
  protected String[] args;

  protected int runLoader(final Path warcFileInputDir, final Path outputDir) throws Exception {
    Loader loader = new Loader();
    loader.setConf(conf);
    return loader.run(warcFileInputDir, outputDir);
  }

  protected int runVerify(final Path inputDir) throws Exception {
    Verify verify = new Verify();
    verify.setConf(conf);
    return verify.run(inputDir);
  }

  @Override
  public int run(String[] args) {
    if (args.length > 0) {
      warcFileInputDir = new Path(args[0]);
      if (args.length > 1) {
        outputDir = new Path(args[1]);
      }
    }
    try {
      if (warcFileInputDir == null) {
        throw new IllegalArgumentException("WARC input file or directory not specified");
      }
      if (outputDir == null) {
        throw new IllegalArgumentException("Output directory not specified");
      }
      int res = runLoader(warcFileInputDir, outputDir);
      if (res != 0) {
        LOG.error("Loader failed");
        return -1;
      }
      return runVerify(outputDir);
    } catch (Exception e) {
      LOG.error("Tool failed with exception", e);
      return -1;
    }
  }

  @Override
  protected void processOptions(final CommandLine cmd) {
    processBaseOptions(cmd);
    args = cmd.getArgs();
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : 3);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  static TableName getTablename(final Configuration c) {
    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  @Override
  public TableName getTablename() {
    return getTablename(getConf());
  }

  @Override
  protected Set<String> getColumnFamilies() {
    Set<String> families = new HashSet<>();
    families.add(Bytes.toString(CONTENT_FAMILY_NAME));
    families.add(Bytes.toString(INFO_FAMILY_NAME));
    families.add(Bytes.toString(URL_FAMILY_NAME));
    return families;
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    return ToolRunner.run(getConf(), this, args);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadCommonCrawl(), args);
    System.exit(ret);
  }

  public static class HBaseKeyWritable implements Writable {

    private byte[] row;
    private int rowOffset;
    private int rowLength;
    private byte[] family;
    private int familyOffset;
    private int familyLength;
    private byte[] qualifier;
    private int qualifierOffset;
    private int qualifierLength;
    private long ts;

    public HBaseKeyWritable() {
    }

    public HBaseKeyWritable(byte[] row, int rowOffset, int rowLength, byte[] family,
      int familyOffset, int familyLength, byte[] qualifier, int qualifierOffset,
      int qualifierLength, long ts) {
      this.row = row;
      this.rowOffset = rowOffset;
      this.rowLength = rowLength;
      this.family = family;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifier = qualifier;
      this.qualifierOffset = qualifierOffset;
      this.qualifierLength = qualifierLength;
      this.ts = ts;
    }

    public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier, long ts) {
      this(row, 0, row.length, family, 0, family.length, qualifier, 0,
        qualifier != null ? qualifier.length : 0, ts);
    }

    public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier) {
      this(row, family, qualifier, Long.MAX_VALUE);
    }

    public HBaseKeyWritable(byte[] row, byte[] family, long ts) {
      this(row, family, HConstants.EMPTY_BYTE_ARRAY, ts);
    }

    public HBaseKeyWritable(byte[] row, byte[] family) {
      this(row, family, Long.MAX_VALUE);
    }

    public HBaseKeyWritable(Cell cell) {
      this(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(),
        cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(),
        cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.row = Bytes.toBytes(in.readUTF());
      this.rowOffset = 0;
      this.rowLength = row.length;
      this.family = Bytes.toBytes(in.readUTF());
      this.familyOffset = 0;
      this.familyLength = family.length;
      this.qualifier = Bytes.toBytes(in.readUTF());
      this.qualifierOffset = 0;
      this.qualifierLength = qualifier.length;
      this.ts = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(new String(row, rowOffset, rowLength, StandardCharsets.UTF_8));
      out.writeUTF(new String(family, familyOffset, familyLength, StandardCharsets.UTF_8));
      if (qualifier != null) {
        out.writeUTF(
          new String(qualifier, qualifierOffset, qualifierLength, StandardCharsets.UTF_8));
      } else {
        out.writeUTF("");
      }
      out.writeLong(ts);
    }

    public byte[] getRowArray() {
      return row;
    }

    public void setRow(byte[] row) {
      this.row = row;
    }

    public int getRowOffset() {
      return rowOffset;
    }

    public void setRowOffset(int rowOffset) {
      this.rowOffset = rowOffset;
    }

    public int getRowLength() {
      return rowLength;
    }

    public void setRowLength(int rowLength) {
      this.rowLength = rowLength;
    }

    public byte[] getFamilyArray() {
      return family;
    }

    public void setFamily(byte[] family) {
      this.family = family;
    }

    public int getFamilyOffset() {
      return familyOffset;
    }

    public void setFamilyOffset(int familyOffset) {
      this.familyOffset = familyOffset;
    }

    public int getFamilyLength() {
      return familyLength;
    }

    public void setFamilyLength(int familyLength) {
      this.familyLength = familyLength;
    }

    public byte[] getQualifierArray() {
      return qualifier;
    }

    public void setQualifier(byte[] qualifier) {
      this.qualifier = qualifier;
    }

    public int getQualifierOffset() {
      return qualifierOffset;
    }

    public void setQualifierOffset(int qualifierOffset) {
      this.qualifierOffset = qualifierOffset;
    }

    public int getQualifierLength() {
      return qualifierLength;
    }

    public void setQualifierLength(int qualifierLength) {
      this.qualifierLength = qualifierLength;
    }

    public long getTimestamp() {
      return ts;
    }

    public void setTimestamp(long ts) {
      this.ts = ts;
    }
  }

  public static class Loader extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
    private static final String USAGE = "Loader <warcInputDir | warcFileList> <outputDir>";

    void createSchema(final TableName tableName) throws IOException {

      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        if (!admin.tableExists(tableName)) {

          ColumnFamilyDescriptorBuilder contentFamilyBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(CONTENT_FAMILY_NAME).setMaxVersions(1000)
              .setDataBlockEncoding(DataBlockEncoding.NONE).setBloomFilterType(BloomType.ROW);

          ColumnFamilyDescriptorBuilder infoFamilyBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(INFO_FAMILY_NAME).setMaxVersions(1000)
              .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1)
              .setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024);

          ColumnFamilyDescriptorBuilder urlFamilyBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(URL_FAMILY_NAME).setMaxVersions(1000)
              .setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1)
              .setBloomFilterType(BloomType.ROWCOL).setBlocksize(8 * 1024);

          Set<ColumnFamilyDescriptor> families = new HashSet<>();
          families.add(contentFamilyBuilder.build());
          families.add(infoFamilyBuilder.build());
          families.add(urlFamilyBuilder.build());

          TableDescriptor tableDescriptor =
            TableDescriptorBuilder.newBuilder(tableName).setColumnFamilies(families).build();

          if (
            getConf().getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
              HBaseTestingUtility.PRESPLIT_TEST_TABLE)
          ) {
            int numberOfServers = admin.getRegionServers().size();
            if (numberOfServers == 0) {
              throw new IllegalStateException("No live regionservers");
            }
            int regionsPerServer = getConf().getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
              HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
            int totalNumberOfRegions = numberOfServers * regionsPerServer;
            LOG.info("Creating test table: " + tableDescriptor);
            LOG.info("Number of live regionservers: " + numberOfServers + ", "
              + "pre-splitting table into " + totalNumberOfRegions + " regions "
              + "(default regions per server: " + regionsPerServer + ")");
            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
            admin.createTable(tableDescriptor, splits);
          } else {
            LOG.info("Creating test table: " + tableDescriptor);
            admin.createTable(tableDescriptor);
          }
        }
      } catch (MasterNotRunningException e) {
        LOG.error("Master not running", e);
        throw new IOException(e);
      }
    }

    int run(final Path warcFileInput, final Path outputDir)
      throws IOException, ClassNotFoundException, InterruptedException {

      createSchema(getTablename(getConf()));

      final Job job = Job.getInstance(getConf());
      job.setJobName(Loader.class.getName());
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());
      job.setMapperClass(LoaderMapper.class);
      job.setInputFormatClass(WARCInputFormat.class);
      final FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf());
      if (fs.getFileStatus(warcFileInput).isDirectory()) {
        LOG.info("Using directory as WARC input path: " + warcFileInput);
        FileInputFormat.setInputPaths(job, warcFileInput);
      } else if (warcFileInput.toUri().getScheme().equals("file")) {
        LOG.info("Getting WARC input paths from file: " + warcFileInput);
        final List<Path> paths = new ArrayList<Path>();
        try (FSDataInputStream is = fs.open(warcFileInput)) {
          InputStreamReader reader;
          if (warcFileInput.getName().toLowerCase().endsWith(".gz")) {
            reader = new InputStreamReader(new GZIPInputStream(is), StandardCharsets.UTF_8);
          } else {
            reader = new InputStreamReader(is, StandardCharsets.UTF_8);
          }
          try (BufferedReader br = new BufferedReader(reader)) {
            String line;
            while ((line = br.readLine()) != null) {
              paths.add(new Path(line));
            }
          }
        }
        LOG.info("Read " + paths.size() + " WARC input paths from " + warcFileInput);
        FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
      } else {
        FileInputFormat.setInputPaths(job, warcFileInput);
      }
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      SequenceFileOutputFormat.setOutputPath(job, outputDir);
      SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
      job.setOutputKeyClass(HBaseKeyWritable.class);
      job.setOutputValueClass(BytesWritable.class);
      TableMapReduceUtil.addDependencyJars(job);
      // Increase max attempts because S3 might throttle aggressively and ultimately fail a task
      job.getConfiguration().setInt("mapred.map.max.attempts", 100);
      job.getConfiguration().setInt("mapreduce.map.maxattempts", 100);

      boolean success = job.waitForCompletion(true);
      if (!success) {
        LOG.error("Failure during job " + job.getJobID());
      }

      final Counters counters = job.getCounters();
      for (Counts c : Counts.values()) {
        long value = counters.findCounter(c).getValue();
        if (value != 0) {
          LOG.info(c + ": " + value);
        }
      }

      return success ? 0 : 1;
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 2) {
        System.err.println(USAGE);
        return 1;
      }
      try {
        Path warcFileInput = new Path(args[0]);
        Path outputDir = new Path(args[1]);
        return run(warcFileInput, outputDir);
      } catch (NumberFormatException e) {
        System.err.println("Parsing loader arguments failed: " + e.getMessage());
        System.err.println(USAGE);
        return 1;
      }
    }

    public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(HBaseConfiguration.create(), new Loader(), args));
    }

    public static class LoaderMapper
      extends Mapper<LongWritable, WARCWritable, HBaseKeyWritable, BytesWritable> {

      protected AsyncConnection conn;
      protected AsyncTable<ScanResultConsumer> table;
      protected ExecutorService executor;
      protected AtomicLong inflight = new AtomicLong();
      protected boolean doIncrements;

      @Override
      protected void setup(final Context context) throws IOException, InterruptedException {
        executor = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
        Configuration conf = context.getConfiguration();
        doIncrements = conf.getBoolean(INCREMENTS_NAME_KEY, DEFAULT_INCREMENTS);
        try {
          conn = ConnectionFactory.createAsyncConnection(conf).get();
          table = conn.getTable(getTablename(conf), executor);
        } catch (ExecutionException e) {
          throw new IOException(e);
        }
      }

      @Override
      protected void cleanup(final Context context) throws IOException, InterruptedException {

        while (inflight.get() != 0) {
          LOG.info("Operations in flight, waiting");
          Thread.sleep(INFLIGHT_PAUSE_MS);
        }

        // Shut down the executor
        executor.shutdown();
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
          LOG.warn("Pool did not shut down cleanly");
        }
        // Close the connection
        try {
          conn.close();
        } catch (Exception e) {
          LOG.warn("Exception closing Connection", e);
        }
      }

      @Override
      protected void map(final LongWritable key, final WARCWritable value, final Context output)
        throws IOException, InterruptedException {
        final WARCRecord.Header warcHeader = value.getRecord().getHeader();
        final String recordID = warcHeader.getRecordID();
        final String targetURI = warcHeader.getTargetURI();
        if (warcHeader.getRecordType().equals("response") && targetURI != null) {
          final String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
          if (contentType != null) {
            // Make row key
            byte[] rowKey;
            try {
              rowKey = rowKeyFromTargetURI(targetURI);
            } catch (IllegalArgumentException e) {
              LOG.debug("Could not make a row key for record " + recordID + ", ignoring", e);
              return;
            } catch (URISyntaxException e) {
              LOG.warn(
                "Could not parse URI \"" + targetURI + "\" for record " + recordID + ", ignoring");
              return;
            }

            // Get the content and calculate the CRC64
            final byte[] content = value.getRecord().getContent();
            final CRC64 crc = new CRC64();
            crc.update(content);
            final long crc64 = crc.getValue();
            LOG.info("{}: content {} bytes, crc64={}", targetURI, content.length,
              Bytes.toHex(Bytes.toBytes(crc64)));

            // Store to HBase
            final long ts = getSequence();
            final Put put = new Put(rowKey);
            put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content);
            put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts,
              Bytes.toBytes(content.length));
            put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts, Bytes.toBytes(contentType));
            put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64));
            put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI));
            put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts,
              Bytes.toBytes(warcHeader.getDateString()));
            final String ipAddr = warcHeader.getField("WARC-IP-Address");
            if (ipAddr != null) {
              put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr));
            }
            long pending = inflight.incrementAndGet();
            while (pending > MAX_INFLIGHT) {
              LOG.info("Too many operations in flight, waiting");
              Thread.sleep(INFLIGHT_PAUSE_MS);
              pending = inflight.get();
            }
            final long putStartTime = System.currentTimeMillis();
            final CompletableFuture<Void> putFuture = table.put(put);
            addListener(putFuture, (r, e) -> {
              inflight.decrementAndGet();
              if (e == null) {
                output.getCounter(Counts.RPC_TIME_MS)
                  .increment(System.currentTimeMillis() - putStartTime);
                output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize());
              }
            });

            // Write records out for later verification, one per HBase field except for the
            // content record, which will be verified by CRC64.
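            // The same synthetic timestamp ts is reused for these verification records, so the
            // Verify job can issue point Gets at exactly the timestamp used for the Put above.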
            output.write(
              new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(content.length)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(contentType)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(crc64)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(targetURI)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(warcHeader.getDateString())));
            if (ipAddr != null) {
              output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts),
                new BytesWritable(Bytes.toBytes(ipAddr)));
            }

            if (doIncrements) {
              // The URLs cf is not tracked for correctness. For now it is used only to exercise
              // Increments, to drive some read load during ingest. They can be verified with a
              // reducer to sum increments per row and then compare the final count to the table
              // data. This is left as a future exercise.
              final byte[] refQual = Bytes.add(REF_QUALIFIER, SEP, rowKey);
              for (String refUri : extractUrls(content)) {
                try {
                  byte[] urlRowKey = rowKeyFromTargetURI(refUri);
                  LOG.debug(" -> {}", refUri);
                  final Increment increment = new Increment(urlRowKey);
                  increment.setTimestamp(ts);
                  increment.addColumn(URL_FAMILY_NAME, refQual, 1);
                  pending = inflight.incrementAndGet();
                  while (pending > MAX_INFLIGHT) {
                    LOG.info("Too many operations in flight, waiting");
                    Thread.sleep(INFLIGHT_PAUSE_MS);
                    pending = inflight.get();
                  }
                  final long incrStartTime = System.currentTimeMillis();
                  final CompletableFuture<Result> incrFuture = table.increment(increment);
                  addListener(incrFuture, (r, e) -> {
                    inflight.decrementAndGet();
                    if (e == null) {
                      output.getCounter(Counts.RPC_TIME_MS)
                        .increment(System.currentTimeMillis() - incrStartTime);
                      output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize());
                    }
                  });
                } catch (IllegalArgumentException | URISyntaxException e) {
                  LOG.debug("Could not make a row key for URI " + refUri + ", ignoring", e);
                }
              }
            }
          }
        }
      }
    }
  }

  public static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
    @Override
    protected boolean isSplitable(final JobContext context, final Path filename) {
      return false;
    }
  }

  public static class Verify extends Configured implements Tool {

    public static final Logger LOG = LoggerFactory.getLogger(Verify.class);
    public static final String USAGE = "Verify <inputDir>";

    int run(final Path inputDir) throws IOException, ClassNotFoundException, InterruptedException {
      Job job = Job.getInstance(getConf());
      job.setJobName(Verify.class.getName());
      job.setJarByClass(getClass());
      job.setMapperClass(VerifyMapper.class);
      job.setInputFormatClass(OneFilePerMapperSFIF.class);
      FileInputFormat.setInputPaths(job, inputDir);
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);
      TableMapReduceUtil.addDependencyJars(job);

      boolean success = job.waitForCompletion(true);
      if (!success) {
        LOG.error("Failure during job " + job.getJobID());
      }

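      // Dump non-zero counters, then fail the verification if any CORRUPT or UNREFERENCED
      // records were observed.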
      final Counters counters = job.getCounters();
      for (Counts c : Counts.values()) {
        long value = counters.findCounter(c).getValue();
        if (value != 0) {
          LOG.info(c + ": " + value);
        }
      }
      if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) {
        LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID());
        success = false;
      }
      if (counters.findCounter(Counts.CORRUPT).getValue() > 0) {
        LOG.error("Nonzero CORRUPT count from job " + job.getJobID());
        success = false;
      }

      return success ? 0 : 1;
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println(USAGE);
        return 1;
      }
      Path loaderOutput = new Path(args[0]);
      return run(loaderOutput);
    }

    public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(HBaseConfiguration.create(), new Verify(), args));
    }

    public static class VerifyMapper
      extends Mapper<HBaseKeyWritable, BytesWritable, NullWritable, NullWritable> {

      protected Connection conn;
      protected Table table;

      @Override
      protected void setup(final Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        conn = ConnectionFactory.createConnection(conf);
        table = conn.getTable(getTablename(conf));
      }

      @Override
      protected void cleanup(final Context context) throws IOException, InterruptedException {
        // Close the table
        try {
          table.close();
        } catch (Exception e) {
          LOG.warn("Exception closing table", e);
        }
        // Close the connection
        try {
          conn.close();
        } catch (Exception e) {
          LOG.warn("Exception closing Connection", e);
        }
      }

      @Override
      protected void map(final HBaseKeyWritable key, final BytesWritable value,
        final Context output) throws IOException, InterruptedException {
        final byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength());
        final byte[] family =
          Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(), key.getFamilyLength());
        final byte[] qualifier =
          Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), key.getQualifierLength());
        final long ts = key.getTimestamp();

        if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) {
          final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
          final Get get = new Get(row).setTimestamp(ts).addFamily(CONTENT_FAMILY_NAME)
            .addFamily(INFO_FAMILY_NAME);
          final long startTime = System.currentTimeMillis();
          Result r;
          try {
            r = table.get(get);
            output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime);
          } catch (Exception e) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e);
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          }
          final byte[] crcBytes = r.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
          if (crcBytes == null) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          }
          if (Bytes.toLong(crcBytes) != expectedCRC64) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
            output.getCounter(Counts.CORRUPT).increment(1);
            return;
          }
          // If we fell through to here all verification checks have succeeded for the info
          // record.
          output.getCounter(Counts.REFERENCED).increment(1);
          final byte[] content = r.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
          if (content == null) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          } else {
            final CRC64 crc = new CRC64();
            crc.update(content);
            if (crc.getValue() != expectedCRC64) {
              LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
              output.getCounter(Counts.CORRUPT).increment(1);
              return;
            }
          }
          // If we fell through to here all verification checks have succeeded for the content
          // record.
          output.getCounter(Counts.REFERENCED).increment(1);
        } else {
          final long startTime = System.currentTimeMillis();
          final Get get = new Get(row).setTimestamp(ts).addColumn(family, qualifier);
          Result r;
          try {
            r = table.get(get);
            output.getCounter(Counts.RPC_TIME_MS).increment(System.currentTimeMillis() - startTime);
          } catch (Exception e) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": exception", e);
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          }
          final byte[] bytes = r.getValue(family, qualifier);
          if (bytes == null) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": missing "
              + Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
            output.getCounter(Counts.UNREFERENCED).increment(1);
            return;
          }
          if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
            LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family) + ":"
              + Bytes.toStringBinary(qualifier) + " mismatch");
            output.getCounter(Counts.CORRUPT).increment(1);
            return;
          }
          // If we fell through to here all verification checks have succeeded for the info
          // record.
          output.getCounter(Counts.REFERENCED).increment(1);
        }
      }
    }
  }

  private static final AtomicLong counter = new AtomicLong();
  private static final int shift = 8;

  private static long getSequence() {
    long t = EnvironmentEdgeManager.currentTime();
    t <<= shift;
    t |= (counter.getAndIncrement() % (1 << shift));
    return t;
  }

  private static byte[] rowKeyFromTargetURI(final String targetUri)
    throws IOException, URISyntaxException, IllegalArgumentException {
    final URI uri = new URI(targetUri);
    // Ignore the scheme
    // Reverse the components of the hostname
    String reversedHost;
    if (uri.getHost() != null) {
      final String[] hostComponents =
        Splitter.on('.').splitToStream(uri.getHost()).toArray(String[]::new);
      final StringBuilder sb = new StringBuilder();
      for (int i = hostComponents.length - 1; i >= 0; i--) {
        sb.append(hostComponents[i]);
        if (i != 0) {
          sb.append('.');
        }
      }
      reversedHost = sb.toString();
    } else {
      throw new IllegalArgumentException("URI is missing host component");
    }
    final ByteArrayOutputStream os = new ByteArrayOutputStream();
    os.write(reversedHost.getBytes(StandardCharsets.UTF_8));
    if (uri.getPort() >= 0) {
      os.write(String.format(":%d", uri.getPort()).getBytes(StandardCharsets.UTF_8));
    }
    os.write((byte) '|');
    if (uri.getPath() != null) {
      os.write(uri.getPath().getBytes(StandardCharsets.UTF_8));
    }
    if (uri.getQuery() != null) {
      os.write(String.format("?%s", uri.getQuery()).getBytes(StandardCharsets.UTF_8));
    }
    if (uri.getFragment() != null) {
      os.write(String.format("#%s", uri.getFragment()).getBytes(StandardCharsets.UTF_8));
    }
    if (os.size() > HConstants.MAX_ROW_LENGTH) {
      throw new IllegalArgumentException("Key would be too large (length=" + os.size() + ", limit="
        + HConstants.MAX_ROW_LENGTH + ")");
    }
    return os.toByteArray();
  }

  static final Pattern URL_PATTERN = Pattern.compile(
    "\\b((https?|ftp|file)://|(www|ftp)\\.)"
      + "[\\-A-Z0-9+&@#/%?=~_|$!:,\\.;]*[A-Z0-9+&@#/%=~_|$]",
    Pattern.CASE_INSENSITIVE);

  private static Collection<String> extractUrls(byte[] content) {
    final Set<String> list = new HashSet<>(); // uniques
    final Matcher m = URL_PATTERN.matcher(new String(content, StandardCharsets.UTF_8));
    while (m.find()) {
      list.add(m.group());
    }
    return list;
  }
}