/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.test.util.CRC64;
import org.apache.hadoop.hbase.test.util.warc.WARCInputFormat;
import org.apache.hadoop.hbase.test.util.warc.WARCRecord;
import org.apache.hadoop.hbase.test.util.warc.WARCWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * This integration test loads successful resource retrieval records from the Common Crawl
 * (https://commoncrawl.org/) public dataset into an HBase table and writes records that can be
 * used to later verify the presence and integrity of those records.
 * <p>
 * Run like: <blockquote> ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl
 * \<br>
 * -Dfs.s3n.awsAccessKeyId=<AWS access key> \<br>
 * -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \<br>
 * /path/to/test-CC-MAIN-2021-10-warc.paths.gz \<br>
 * /path/to/tmp/warc-loader-output </blockquote>
 * <p>
 * Access to the Common Crawl dataset in S3 is made available to anyone by Amazon AWS, but
 * Hadoop's S3N filesystem still requires valid access credentials to initialize.
 * <p>
 * The input path can either specify a directory or a file. The file may optionally be compressed
 * with gzip. If a directory, the loader expects the directory to contain one or more WARC files
 * from the Common Crawl dataset. If a file, the loader expects a list of Hadoop S3N URIs which
 * point to S3 locations for one or more WARC files from the Common Crawl dataset, one URI per
 * line. Lines should be terminated with the UNIX line terminator.
 * <p>
 * Included in hbase-it/src/test/resources/CC-MAIN-2021-10-warc.paths.gz is a list of all WARC
 * files comprising the Q1 2021 crawl archive. There are 64,000 WARC files in this data set, each
 * containing ~1GB of gzipped data. The WARC files contain several record types, such as metadata,
 * request, and response, but we only load the response record types. If the HBase table schema
 * does not specify compression (by default) there is roughly a 10x expansion. Loading the full
 * crawl archive results in a table approximately 640 TB in size.
 * <p>
 * You can also split the Loader and Verify stages:
 * <p>
 * Load with: <blockquote> ./bin/hbase
 * 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Loader' \<br>
 * -files /path/to/hadoop-aws.jar \<br>
 * -Dfs.s3n.awsAccessKeyId=<AWS access key> \<br>
 * -Dfs.s3n.awsSecretAccessKey=<AWS secret key> \<br>
 * /path/to/test-CC-MAIN-2021-10-warc.paths.gz \<br>
 * /path/to/tmp/warc-loader-output </blockquote>
 * <p>
 * Note: The hadoop-aws jar will be needed at runtime to instantiate the S3N filesystem. Use the
 * <tt>-files</tt> ToolRunner argument to add it.
 * <p>
 * Verify with: <blockquote> ./bin/hbase
 * 'org.apache.hadoop.hbase.test.IntegrationTestLoadCommonCrawl$Verify' \<br>
 * /path/to/tmp/warc-loader-output </blockquote>
 * <p>
 */
public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class);

  protected static String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
  protected static String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";

  protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
  protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
  protected static byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
  protected static byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
  protected static byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
  protected static byte[] CRC_QUALIFIER = Bytes.toBytes("c");
  protected static byte[] DATE_QUALIFIER = Bytes.toBytes("d");
  protected static byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
  protected static byte[] RECORD_ID_QUALIFIER = Bytes.toBytes("r");
  protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");

  private static final int VERIFICATION_READ_RETRIES = 10;

  public static enum Counts {
    REFERENCED,
    UNREFERENCED,
    CORRUPT
  }

  protected Path warcFileInputDir = null;
  protected Path outputDir = null;
  protected String[] args;

  protected int runLoader(final Path warcFileInputDir, final Path outputDir) throws Exception {
    Loader loader = new Loader();
    loader.setConf(conf);
    return loader.run(warcFileInputDir, outputDir);
  }

  protected int runVerify(final Path inputDir) throws Exception {
    Verify verify = new Verify();
    verify.setConf(conf);
    return verify.run(inputDir);
  }

  @Override
  public int run(String[] args) {
    if (args.length > 0) {
      warcFileInputDir = new Path(args[0]);
      if (args.length > 1) {
        outputDir = new Path(args[1]);
      }
    }
    try {
      if (warcFileInputDir == null) {
        throw new IllegalArgumentException("WARC input file or directory not specified");
      }
      if (outputDir == null) {
        throw new IllegalArgumentException("Output directory not specified");
      }
      int res = runLoader(warcFileInputDir, outputDir);
      if (res != 0) {
        LOG.error("Loader failed");
        return -1;
      }
      res = runVerify(outputDir);
    } catch (Exception e) {
      LOG.error("Tool failed with exception", e);
      return -1;
    }
    return 0;
  }

  @Override
  protected void processOptions(final CommandLine cmd) {
    processBaseOptions(cmd);
    args = cmd.getArgs();
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
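    // Initialize with 1 node when running against an already-deployed distributed cluster,
    // otherwise stand up a 3-node mini cluster and (below) a mini MapReduce cluster.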
    util.initializeCluster(isDistributed ? 1 : 3);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  static TableName getTablename(final Configuration c) {
    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  @Override
  public TableName getTablename() {
    return getTablename(getConf());
  }

  @Override
  protected Set<String> getColumnFamilies() {
    Set<String> families = new HashSet<>();
    families.add(Bytes.toString(CONTENT_FAMILY_NAME));
    families.add(Bytes.toString(INFO_FAMILY_NAME));
    return families;
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    return ToolRunner.run(getConf(), this, args);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadCommonCrawl(), args);
    System.exit(ret);
  }

  public static class HBaseKeyWritable implements Writable {

    private byte[] row;
    private int rowOffset;
    private int rowLength;
    private byte[] family;
    private int familyOffset;
    private int familyLength;
    private byte[] qualifier;
    private int qualifierOffset;
    private int qualifierLength;
    private long ts;

    public HBaseKeyWritable() {
    }

    public HBaseKeyWritable(byte[] row, int rowOffset, int rowLength, byte[] family,
      int familyOffset, int familyLength, byte[] qualifier, int qualifierOffset,
      int qualifierLength, long ts) {
      this.row = row;
      this.rowOffset = rowOffset;
      this.rowLength = rowLength;
      this.family = family;
      this.familyOffset = familyOffset;
      this.familyLength = familyLength;
      this.qualifier = qualifier;
      this.qualifierOffset = qualifierOffset;
      this.qualifierLength = qualifierLength;
      this.ts = ts;
    }

    public HBaseKeyWritable(byte[] row, byte[] family, byte[] qualifier, long ts) {
      this(row, 0, row.length, family, 0, family.length, qualifier, 0,
        qualifier != null ? qualifier.length : 0, ts);
    }

    public HBaseKeyWritable(byte[] row, byte[] family, long ts) {
      this(row, family, HConstants.EMPTY_BYTE_ARRAY, ts);
    }

    public HBaseKeyWritable(Cell cell) {
      this(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(),
        cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(),
        cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.row = Bytes.toBytes(in.readUTF());
      this.rowOffset = 0;
      this.rowLength = row.length;
      this.family = Bytes.toBytes(in.readUTF());
      this.familyOffset = 0;
      this.familyLength = family.length;
      this.qualifier = Bytes.toBytes(in.readUTF());
      this.qualifierOffset = 0;
      this.qualifierLength = qualifier.length;
      this.ts = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(new String(row, rowOffset, rowLength, StandardCharsets.UTF_8));
      out.writeUTF(new String(family, familyOffset, familyLength, StandardCharsets.UTF_8));
      if (qualifier != null) {
        out.writeUTF(
          new String(qualifier, qualifierOffset, qualifierLength, StandardCharsets.UTF_8));
      } else {
        out.writeUTF("");
      }
      out.writeLong(ts);
    }

    public byte[] getRowArray() {
      return row;
    }

    public void setRow(byte[] row) {
      this.row = row;
    }

    public int getRowOffset() {
      return rowOffset;
    }

    public void setRowOffset(int rowOffset) {
      this.rowOffset = rowOffset;
    }

    public int getRowLength() {
      return rowLength;
    }

    public void setRowLength(int rowLength) {
      this.rowLength = rowLength;
    }

    public byte[] getFamilyArray() {
      return family;
    }

    public void setFamily(byte[] family) {
      this.family = family;
    }

    public int getFamilyOffset() {
      return familyOffset;
    }

    public void setFamilyOffset(int familyOffset) {
      this.familyOffset = familyOffset;
    }

    public int getFamilyLength() {
      return familyLength;
    }

    public void setFamilyLength(int familyLength) {
      this.familyLength = familyLength;
    }

    public byte[] getQualifierArray() {
      return qualifier;
    }

    public void setQualifier(byte[] qualifier) {
      this.qualifier = qualifier;
    }

    public int getQualifierOffset() {
      return qualifierOffset;
    }

    public void setQualifierOffset(int qualifierOffset) {
      this.qualifierOffset = qualifierOffset;
    }

    public int getQualifierLength() {
      return qualifierLength;
    }

    public void setQualifierLength(int qualifierLength) {
      this.qualifierLength = qualifierLength;
    }

    public long getTimestamp() {
      return ts;
    }

    public void setTimestamp(long ts) {
      this.ts = ts;
    }
  }

  public static class Loader extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
    private static final String USAGE = "Loader <warcInputDir | warcFileList> <outputDir>";

    void createSchema(final TableName tableName) throws IOException {

      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        if (!admin.tableExists(tableName)) {

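          // Content family 'c' stores the (potentially large) raw payload: no block encoding,
          // a ROW bloom filter, and 256 KB blocks. Info family 'i' stores the small metadata
          // cells: a ROWCOL bloom filter and 8 KB blocks. Both keep up to 1000 versions.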
          ColumnFamilyDescriptorBuilder contentFamilyBuilder = ColumnFamilyDescriptorBuilder
            .newBuilder(CONTENT_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE)
            .setBloomFilterType(BloomType.ROW).setMaxVersions(1000).setBlocksize(256 * 1024);

          ColumnFamilyDescriptorBuilder infoFamilyBuilder = ColumnFamilyDescriptorBuilder
            .newBuilder(INFO_FAMILY_NAME).setDataBlockEncoding(DataBlockEncoding.NONE)
            .setBloomFilterType(BloomType.ROWCOL).setMaxVersions(1000).setBlocksize(8 * 1024);

          Set<ColumnFamilyDescriptor> families = new HashSet<>();
          families.add(contentFamilyBuilder.build());
          families.add(infoFamilyBuilder.build());

          TableDescriptor tableDescriptor =
            TableDescriptorBuilder.newBuilder(tableName).setColumnFamilies(families).build();

          if (
            getConf().getBoolean(HBaseTestingUtil.PRESPLIT_TEST_TABLE_KEY,
              HBaseTestingUtil.PRESPLIT_TEST_TABLE)
          ) {
            int numberOfServers = admin.getRegionServers().size();
            if (numberOfServers == 0) {
              throw new IllegalStateException("No live regionservers");
            }
            int regionsPerServer = getConf().getInt(HBaseTestingUtil.REGIONS_PER_SERVER_KEY,
              HBaseTestingUtil.DEFAULT_REGIONS_PER_SERVER);
            int totalNumberOfRegions = numberOfServers * regionsPerServer;
            LOG.info("Creating test table: " + tableDescriptor);
            LOG.info("Number of live regionservers: " + numberOfServers + ", "
              + "pre-splitting table into " + totalNumberOfRegions + " regions "
              + "(default regions per server: " + regionsPerServer + ")");
            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
            admin.createTable(tableDescriptor, splits);
          } else {
            LOG.info("Creating test table: " + tableDescriptor);
            admin.createTable(tableDescriptor);
          }
        }
      } catch (MasterNotRunningException e) {
        LOG.error("Master not running", e);
        throw new IOException(e);
      }
    }

    int run(final Path warcFileInput, final Path outputDir)
      throws IOException, ClassNotFoundException, InterruptedException {

      createSchema(getTablename(getConf()));

      final Job job = Job.getInstance(getConf());
      job.setJobName(Loader.class.getName());
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());
      job.setMapperClass(LoaderMapper.class);
      job.setInputFormatClass(WARCInputFormat.class);
      final FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf());
      if (fs.getFileStatus(warcFileInput).isDirectory()) {
        LOG.info("Using directory as WARC input path: " + warcFileInput);
        FileInputFormat.setInputPaths(job, warcFileInput);
      } else if (warcFileInput.toUri().getScheme().equals("file")) {
        LOG.info("Getting WARC input paths from file: " + warcFileInput);
        final List<Path> paths = new ArrayList<Path>();
        try (FSDataInputStream is = fs.open(warcFileInput)) {
          InputStreamReader reader;
          if (warcFileInput.getName().toLowerCase().endsWith(".gz")) {
            reader = new InputStreamReader(new GZIPInputStream(is));
          } else {
            reader = new InputStreamReader(is);
          }
          try (BufferedReader br = new BufferedReader(reader)) {
            String line;
            while ((line = br.readLine()) != null) {
              paths.add(new Path(line));
            }
          }
        }
        LOG.info("Read " + paths.size() + " WARC input paths from " + warcFileInput);
        FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
      } else {
        FileInputFormat.setInputPaths(job, warcFileInput);
      }
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      SequenceFileOutputFormat.setOutputPath(job, outputDir);
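      // The loader's job output is a block-compressed SequenceFile of (HBaseKeyWritable,
      // BytesWritable) pairs recording what was written, which the Verify job later replays
      // against the table.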
      SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
      job.setOutputKeyClass(HBaseKeyWritable.class);
      job.setOutputValueClass(BytesWritable.class);
      TableMapReduceUtil.addDependencyJars(job);

      boolean success = job.waitForCompletion(true);
      if (!success) {
        LOG.error("Failure during job " + job.getJobID());
      }
      return success ? 0 : 1;
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 2) {
        System.err.println(USAGE);
        return 1;
      }
      try {
        Path warcFileInput = new Path(args[0]);
        Path outputDir = new Path(args[1]);
        return run(warcFileInput, outputDir);
      } catch (NumberFormatException e) {
        System.err.println("Parsing loader arguments failed: " + e.getMessage());
        System.err.println(USAGE);
        return 1;
      }
    }

    public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(HBaseConfiguration.create(), new Loader(), args));
    }

    public static class LoaderMapper
      extends Mapper<LongWritable, WARCWritable, HBaseKeyWritable, BytesWritable> {

      protected Configuration conf;
      protected Connection conn;
      protected BufferedMutator mutator;

      @Override
      protected void setup(final Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();
        conn = ConnectionFactory.createConnection(conf);
        mutator = conn.getBufferedMutator(getTablename(conf));
      }

      @Override
      protected void cleanup(final Context context) throws IOException, InterruptedException {
        try {
          mutator.close();
        } catch (Exception e) {
          LOG.warn("Exception closing Table", e);
        }
        try {
          conn.close();
        } catch (Exception e) {
          LOG.warn("Exception closing Connection", e);
        }
      }

      @Override
      protected void map(final LongWritable key, final WARCWritable value, final Context output)
        throws IOException, InterruptedException {
        final WARCRecord.Header warcHeader = value.getRecord().getHeader();
        final String recordID = warcHeader.getRecordID();
        final String targetURI = warcHeader.getTargetURI();
        if (warcHeader.getRecordType().equals("response") && targetURI != null) {
          final String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
          if (contentType != null) {
            LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID);

            // Make row key

            byte[] rowKey;
            try {
              rowKey = rowKeyFromTargetURI(targetURI);
            } catch (IllegalArgumentException e) {
              LOG.debug("Could not make a row key for record " + recordID + ", ignoring", e);
              return;
            } catch (URISyntaxException e) {
              LOG.warn(
                "Could not parse URI \"" + targetURI + "\" for record " + recordID + ", ignoring");
              return;
            }

            // Get the content and calculate the CRC64

            final byte[] content = value.getRecord().getContent();
            final CRC64 crc = new CRC64();
            crc.update(content);
            final long crc64 = crc.getValue();

            // Store to HBase

            final long ts = getCurrentTime();
            final Put put = new Put(rowKey);
            put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content);
            put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts,
              Bytes.toBytes(content.length));
            put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts, Bytes.toBytes(contentType));
            put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64));
            put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts, Bytes.toBytes(recordID));
            put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI));
            put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts,
              Bytes.toBytes(warcHeader.getDateString()));
            final String ipAddr = warcHeader.getField("WARC-IP-Address");
            if (ipAddr != null) {
              put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr));
            }
            mutator.mutate(put);

            // Write records out for later verification, one per HBase field except for the
            // content record, which will be verified by CRC64.

            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(crc64)));
            output.write(
              new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(content.length)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(contentType)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(recordID)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(targetURI)));
            output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts),
              new BytesWritable(Bytes.toBytes(warcHeader.getDateString())));
            if (ipAddr != null) {
              output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts),
                new BytesWritable(Bytes.toBytes(ipAddr)));
            }
          }
        }
      }

      private byte[] rowKeyFromTargetURI(final String targetUri)
        throws URISyntaxException, IllegalArgumentException {
        final URI uri = new URI(targetUri);
        // Ignore the scheme
        // Reverse the components of the hostname
        String reversedHost;
        if (uri.getHost() != null) {
          final StringBuilder sb = new StringBuilder();
          final String[] hostComponents = uri.getHost().split("\\.");
          for (int i = hostComponents.length - 1; i >= 0; i--) {
            sb.append(hostComponents[i]);
            if (i != 0) {
              sb.append('.');
            }
          }
          reversedHost = sb.toString();
        } else {
          throw new IllegalArgumentException("URI is missing host component");
        }
        final StringBuilder sb = new StringBuilder();
        sb.append(reversedHost);
        if (uri.getPort() >= 0) {
          sb.append(':');
          sb.append(uri.getPort());
        }
        if (uri.getPath() != null) {
          sb.append('/');
          sb.append(uri.getPath());
        }
        if (uri.getQuery() != null) {
          sb.append('?');
          sb.append(uri.getQuery());
        }
        if (uri.getFragment() != null) {
          sb.append('#');
          sb.append(uri.getFragment());
        }
        if (sb.length() > HConstants.MAX_ROW_LENGTH) {
          throw new IllegalArgumentException("Key would be too large (length=" + sb.length()
            + ", limit=" + HConstants.MAX_ROW_LENGTH + ")");
        }
        return Bytes.toBytes(sb.toString());
      }

    }
  }

  public static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
    @Override
    protected boolean isSplitable(final JobContext context, final Path filename) {
      return false;
    }
  }

  public static class Verify extends Configured implements Tool {

    public static final Logger LOG = LoggerFactory.getLogger(Verify.class);
    public static final String USAGE = "Verify <inputDir>";

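    // Runs a map-only job over the loader's output. Each recorded cell is read back from HBase
    // and checked; the job fails verification if the UNREFERENCED or CORRUPT counters are
    // nonzero.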
    int run(final Path inputDir)
      throws IOException, ClassNotFoundException, InterruptedException {
      Job job = Job.getInstance(getConf());
      job.setJobName(Verify.class.getName());
      job.setJarByClass(getClass());
      job.setMapperClass(VerifyMapper.class);
      job.setInputFormatClass(OneFilePerMapperSFIF.class);
      FileInputFormat.setInputPaths(job, inputDir);
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);
      TableMapReduceUtil.addDependencyJars(job);
      boolean success = job.waitForCompletion(true);
      if (!success) {
        LOG.error("Failure during job " + job.getJobID());
      }
      final Counters counters = job.getCounters();
      for (Counts c : Counts.values()) {
        LOG.info(c + ": " + counters.findCounter(c).getValue());
      }
      if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) {
        LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID());
        success = false;
      }
      if (counters.findCounter(Counts.CORRUPT).getValue() > 0) {
        LOG.error("Nonzero CORRUPT count from job " + job.getJobID());
        success = false;
      }
      return success ? 0 : 1;
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println(USAGE);
        return 1;
      }
      Path loaderOutput = new Path(args[0]);
      return run(loaderOutput);
    }

    public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(HBaseConfiguration.create(), new Verify(), args));
    }

    public static class VerifyMapper
      extends Mapper<HBaseKeyWritable, BytesWritable, NullWritable, NullWritable> {

      private Connection conn;
      private Table table;

      @Override
      protected void setup(final Context context) throws IOException, InterruptedException {
        conn = ConnectionFactory.createConnection(context.getConfiguration());
        table = conn.getTable(getTablename(conn.getConfiguration()));
      }

      @Override
      protected void cleanup(final Context context) throws IOException, InterruptedException {
        try {
          table.close();
        } catch (Exception e) {
          LOG.warn("Exception closing Table", e);
        }
        try {
          conn.close();
        } catch (Exception e) {
          LOG.warn("Exception closing Connection", e);
        }
      }

      @Override
      protected void map(final HBaseKeyWritable key, final BytesWritable value,
        final Context output) throws IOException, InterruptedException {
        final byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength());
        final byte[] family =
          Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(), key.getFamilyLength());
        final byte[] qualifier =
          Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), key.getQualifierLength());
        final long ts = key.getTimestamp();

        int retries = VERIFICATION_READ_RETRIES;
        while (true) {

          if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) {

            final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength());
            final Result result =
              table.get(new Get(row).addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER)
                .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER).setTimestamp(ts));
            final byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER);
            if (content == null) {
              if (retries-- > 0) {
                continue;
              }
              LOG.error("Row " + Bytes.toStringBinary(row) + ": missing content");
              output.getCounter(Counts.UNREFERENCED).increment(1);
              return;
            } else {
              final CRC64 crc = new CRC64();
              crc.update(content);
              if (crc.getValue() != expectedCRC64) {
                LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content");
                output.getCounter(Counts.CORRUPT).increment(1);
                return;
              }
            }
            final byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER);
            if (crc == null) {
              if (retries-- > 0) {
                continue;
              }
              LOG.error("Row " + Bytes.toStringBinary(row) + ": missing i:c");
              output.getCounter(Counts.UNREFERENCED).increment(1);
              return;
            }
            if (Bytes.toLong(crc) != expectedCRC64) {
              if (retries-- > 0) {
                continue;
              }
              LOG.error("Row " + Bytes.toStringBinary(row) + ": i:c mismatch");
              output.getCounter(Counts.CORRUPT).increment(1);
              return;
            }

          } else {

            final Result result =
              table.get(new Get(row).addColumn(family, qualifier).setTimestamp(ts));
            final byte[] bytes = result.getValue(family, qualifier);
            if (bytes == null) {
              if (retries-- > 0) {
                continue;
              }
              LOG.error("Row " + Bytes.toStringBinary(row) + ": missing "
                + Bytes.toStringBinary(family) + ":" + Bytes.toStringBinary(qualifier));
              output.getCounter(Counts.UNREFERENCED).increment(1);
              return;
            }
            if (!Bytes.equals(bytes, 0, bytes.length, value.getBytes(), 0, value.getLength())) {
              if (retries-- > 0) {
                continue;
              }
              LOG.error("Row " + Bytes.toStringBinary(row) + ": " + Bytes.toStringBinary(family)
                + ":" + Bytes.toStringBinary(qualifier) + " mismatch");
              output.getCounter(Counts.CORRUPT).increment(1);
              return;
            }

          }

          // If we fell through to here all verification checks have succeeded, potentially after
          // retries, and we must exit the while loop.
          output.getCounter(Counts.REFERENCED).increment(1);
          break;

        }
      }
    }
  }

  private static final AtomicLong counter = new AtomicLong();

  private static long getCurrentTime() {
    // Typical hybrid logical clock scheme.
    // Take the current time, shift by 16 bits and zero those bits, and replace those bits
    // with the low 16 bits of the atomic counter. Mask off the high bit too because timestamps
    // cannot be negative.
    return ((EnvironmentEdgeManager.currentTime() << 16) & 0x7fff_ffff_ffff_0000L)
      | (counter.getAndIncrement() & 0xffffL);
  }

}