001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce.replication;
019
020import java.io.IOException;
021import org.apache.hadoop.hbase.client.Get;
022import org.apache.hadoop.hbase.client.Result;
023import org.apache.hadoop.hbase.client.Scan;
024import org.apache.hadoop.hbase.client.Table;
025import org.apache.hadoop.hbase.util.Bytes;
026import org.apache.hadoop.mapreduce.Mapper;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031@InterfaceAudience.Private
032public class VerifyReplicationRecompareRunnable implements Runnable {
033
034  private static final Logger LOG =
035    LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class);
036
037  private final Mapper.Context context;
038  private final VerifyReplication.Verifier.Counters originalCounter;
039  private final String delimiter;
040  private final byte[] row;
041  private final Scan tableScan;
042  private final Table sourceTable;
043  private final Table replicatedTable;
044
045  private final int reCompareTries;
046  private final int sleepMsBeforeReCompare;
047  private final int reCompareBackoffExponent;
048  private final boolean verbose;
049
050  private Result sourceResult;
051  private Result replicatedResult;
052
053  public VerifyReplicationRecompareRunnable(Mapper.Context context, Result sourceResult,
054    Result replicatedResult, VerifyReplication.Verifier.Counters originalCounter, String delimiter,
055    Scan tableScan, Table sourceTable, Table replicatedTable, int reCompareTries,
056    int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) {
057    this.context = context;
058    this.sourceResult = sourceResult;
059    this.replicatedResult = replicatedResult;
060    this.originalCounter = originalCounter;
061    this.delimiter = delimiter;
062    this.tableScan = tableScan;
063    this.sourceTable = sourceTable;
064    this.replicatedTable = replicatedTable;
065    this.reCompareTries = reCompareTries;
066    this.sleepMsBeforeReCompare = sleepMsBeforeReCompare;
067    this.reCompareBackoffExponent = reCompareBackoffExponent;
068    this.verbose = verbose;
069    this.row = VerifyReplication.getRow(sourceResult, replicatedResult);
070  }
071
072  @Override
073  public void run() {
074    Get get = new Get(row);
075    get.setCacheBlocks(tableScan.getCacheBlocks());
076    get.setFilter(tableScan.getFilter());
077
078    int sleepMs = sleepMsBeforeReCompare;
079    int tries = 0;
080
081    while (++tries <= reCompareTries) {
082      context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).increment(1);
083
084      try {
085        Thread.sleep(sleepMs);
086      } catch (InterruptedException e) {
087        LOG.warn("Sleeping interrupted, incrementing bad rows and aborting");
088        incrementOriginalAndBadCounter();
089        context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
090        Thread.currentThread().interrupt();
091        return;
092      }
093
094      try {
095        if (fetchLatestRows(get) && matches(sourceResult, replicatedResult, null)) {
096          if (verbose) {
097            LOG.info("Good row key (with recompare): {}{}{}", delimiter, Bytes.toStringBinary(row),
098              delimiter);
099          }
100          context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).increment(1);
101          return;
102        } else {
103          context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
104        }
105      } catch (IOException e) {
106        context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
107        if (verbose) {
108          LOG.info("Got an exception during recompare for rowkey={}", Bytes.toStringBinary(row), e);
109        }
110      }
111
112      sleepMs = sleepMs * (2 ^ reCompareBackoffExponent);
113    }
114
115    LOG.error("{}, rowkey={}{}{}", originalCounter, delimiter, Bytes.toStringBinary(row),
116      delimiter);
117    incrementOriginalAndBadCounter();
118  }
119
120  public void fail() {
121    if (LOG.isDebugEnabled()) {
122      LOG.debug("Called fail on row={}", Bytes.toStringBinary(row));
123    }
124    incrementOriginalAndBadCounter();
125    context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1);
126  }
127
128  private boolean fetchLatestRows(Get get) throws IOException {
129    Result sourceResult = sourceTable.get(get);
130    Result replicatedResult = replicatedTable.get(get);
131
132    boolean sourceMatches = matches(sourceResult, this.sourceResult,
133      VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED);
134    boolean replicatedMatches = matches(replicatedResult, this.replicatedResult,
135      VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED);
136
137    this.sourceResult = sourceResult;
138    this.replicatedResult = replicatedResult;
139    return sourceMatches && replicatedMatches;
140  }
141
142  private boolean matches(Result original, Result updated,
143    VerifyReplication.Verifier.Counters failCounter) {
144    try {
145      Result.compareResults(original, updated);
146      return true;
147    } catch (Exception e) {
148      if (failCounter != null) {
149        context.getCounter(failCounter).increment(1);
150        if (LOG.isDebugEnabled()) {
151          LOG.debug("{} for rowkey={}", failCounter, Bytes.toStringBinary(row));
152        }
153      }
154      return false;
155    }
156  }
157
158  private void incrementOriginalAndBadCounter() {
159    context.getCounter(originalCounter).increment(1);
160    context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).increment(1);
161  }
162}