001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce.replication; 019 020import java.io.IOException; 021import org.apache.hadoop.hbase.client.Get; 022import org.apache.hadoop.hbase.client.Result; 023import org.apache.hadoop.hbase.client.Scan; 024import org.apache.hadoop.hbase.client.Table; 025import org.apache.hadoop.hbase.util.Bytes; 026import org.apache.hadoop.mapreduce.Mapper; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031@InterfaceAudience.Private 032public class VerifyReplicationRecompareRunnable implements Runnable { 033 034 private static final Logger LOG = 035 LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class); 036 037 private final Mapper.Context context; 038 private final VerifyReplication.Verifier.Counters originalCounter; 039 private final String delimiter; 040 private final byte[] row; 041 private final Scan tableScan; 042 private final Table sourceTable; 043 private final Table replicatedTable; 044 045 private final int reCompareTries; 046 private final int sleepMsBeforeReCompare; 047 private final int reCompareBackoffExponent; 048 private final boolean verbose; 049 050 private Result sourceResult; 051 private Result replicatedResult; 052 053 public VerifyReplicationRecompareRunnable(Mapper.Context context, Result sourceResult, 054 Result replicatedResult, VerifyReplication.Verifier.Counters originalCounter, String delimiter, 055 Scan tableScan, Table sourceTable, Table replicatedTable, int reCompareTries, 056 int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) { 057 this.context = context; 058 this.sourceResult = sourceResult; 059 this.replicatedResult = replicatedResult; 060 this.originalCounter = originalCounter; 061 this.delimiter = delimiter; 062 this.tableScan = tableScan; 063 this.sourceTable = sourceTable; 064 this.replicatedTable = replicatedTable; 065 this.reCompareTries = reCompareTries; 066 this.sleepMsBeforeReCompare = sleepMsBeforeReCompare; 067 this.reCompareBackoffExponent = reCompareBackoffExponent; 068 this.verbose = verbose; 069 this.row = VerifyReplication.getRow(sourceResult, replicatedResult); 070 } 071 072 @Override 073 public void run() { 074 Get get = new Get(row); 075 get.setCacheBlocks(tableScan.getCacheBlocks()); 076 get.setFilter(tableScan.getFilter()); 077 078 int sleepMs = sleepMsBeforeReCompare; 079 int tries = 0; 080 081 while (++tries <= reCompareTries) { 082 context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).increment(1); 083 084 try { 085 Thread.sleep(sleepMs); 086 } catch (InterruptedException e) { 087 LOG.warn("Sleeping interrupted, incrementing bad rows and aborting"); 088 incrementOriginalAndBadCounter(); 089 context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); 090 Thread.currentThread().interrupt(); 091 return; 092 } 093 094 try { 095 if (fetchLatestRows(get) && matches(sourceResult, replicatedResult, null)) { 096 if (verbose) { 097 LOG.info("Good row key (with recompare): {}{}{}", delimiter, Bytes.toStringBinary(row), 098 delimiter); 099 } 100 context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).increment(1); 101 return; 102 } else { 103 context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); 104 } 105 } catch (IOException e) { 106 context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); 107 if (verbose) { 108 LOG.info("Got an exception during recompare for rowkey={}", Bytes.toStringBinary(row), e); 109 } 110 } 111 112 sleepMs = sleepMs * (2 ^ reCompareBackoffExponent); 113 } 114 115 LOG.error("{}, rowkey={}{}{}", originalCounter, delimiter, Bytes.toStringBinary(row), 116 delimiter); 117 incrementOriginalAndBadCounter(); 118 } 119 120 public void fail() { 121 if (LOG.isDebugEnabled()) { 122 LOG.debug("Called fail on row={}", Bytes.toStringBinary(row)); 123 } 124 incrementOriginalAndBadCounter(); 125 context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); 126 } 127 128 private boolean fetchLatestRows(Get get) throws IOException { 129 Result sourceResult = sourceTable.get(get); 130 Result replicatedResult = replicatedTable.get(get); 131 132 boolean sourceMatches = matches(sourceResult, this.sourceResult, 133 VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED); 134 boolean replicatedMatches = matches(replicatedResult, this.replicatedResult, 135 VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED); 136 137 this.sourceResult = sourceResult; 138 this.replicatedResult = replicatedResult; 139 return sourceMatches && replicatedMatches; 140 } 141 142 private boolean matches(Result original, Result updated, 143 VerifyReplication.Verifier.Counters failCounter) { 144 try { 145 Result.compareResults(original, updated); 146 return true; 147 } catch (Exception e) { 148 if (failCounter != null) { 149 context.getCounter(failCounter).increment(1); 150 if (LOG.isDebugEnabled()) { 151 LOG.debug("{} for rowkey={}", failCounter, Bytes.toStringBinary(row)); 152 } 153 } 154 return false; 155 } 156 } 157 158 private void incrementOriginalAndBadCounter() { 159 context.getCounter(originalCounter).increment(1); 160 context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).increment(1); 161 } 162}