001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.mockito.ArgumentMatchers.any;
022import static org.mockito.Mockito.lenient;
023import static org.mockito.Mockito.when;
024
025import java.io.IOException;
026import java.util.concurrent.ThreadLocalRandom;
027import org.apache.hadoop.hbase.KeyValue;
028import org.apache.hadoop.hbase.client.Get;
029import org.apache.hadoop.hbase.client.Result;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
033import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable;
034import org.apache.hadoop.hbase.testclassification.ReplicationTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.mapreduce.Counter;
038import org.apache.hadoop.mapreduce.Mapper;
039import org.apache.hadoop.mapreduce.counters.GenericCounter;
040import org.junit.jupiter.api.BeforeEach;
041import org.junit.jupiter.api.Tag;
042import org.junit.jupiter.api.Test;
043import org.junit.jupiter.api.extension.ExtendWith;
044import org.mockito.Mock;
045import org.mockito.junit.jupiter.MockitoExtension;
046
047@Tag(ReplicationTests.TAG)
048@Tag(SmallTests.TAG)
049@ExtendWith(MockitoExtension.class)
050public class TestVerifyReplicationRecompareRunnable {
051
052  @Mock
053  private Table sourceTable;
054
055  @Mock
056  private Table replicatedTable;
057
058  @Mock
059  private Mapper.Context context;
060
061  static Result genResult(int cols) {
062    KeyValue[] kvs = new KeyValue[cols];
063
064    for (int i = 0; i < cols; ++i) {
065      kvs[i] =
066        new KeyValue(genBytes(), genBytes(), genBytes(), System.currentTimeMillis(), genBytes());
067    }
068
069    return Result.create(kvs);
070  }
071
072  static byte[] genBytes() {
073    return Bytes.toBytes(ThreadLocalRandom.current().nextInt());
074  }
075
076  @BeforeEach
077  public void setUp() {
078    for (VerifyReplication.Verifier.Counters counter : VerifyReplication.Verifier.Counters
079      .values()) {
080      Counter emptyCounter = new GenericCounter(counter.name(), counter.name());
081      lenient().when(context.getCounter(counter)).thenReturn(emptyCounter);
082    }
083  }
084
085  @Test
086  public void itRecomparesGoodRow() throws IOException {
087    Result result = genResult(2);
088
089    when(sourceTable.get(any(Get.class))).thenReturn(result);
090    when(replicatedTable.get(any(Get.class))).thenReturn(result);
091
092    VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
093      genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
094      new Scan(), sourceTable, replicatedTable, 3, 1, 0, true);
095
096    runnable.run();
097
098    assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
099    assertEquals(0,
100      context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
101    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
102    assertEquals(1,
103      context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
104    assertEquals(1,
105      context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
106    assertEquals(2, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
107  }
108
109  @Test
110  public void itRecomparesBadRow() throws IOException {
111    Result replicatedResult = genResult(1);
112    when(sourceTable.get(any(Get.class))).thenReturn(genResult(5));
113    when(replicatedTable.get(any(Get.class))).thenReturn(replicatedResult);
114
115    VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
116      genResult(5), replicatedResult, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS,
117      "", new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
118
119    runnable.run();
120
121    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
122    assertEquals(1,
123      context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
124    assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
125    assertEquals(1,
126      context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue());
127    assertEquals(0,
128      context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue());
129    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
130  }
131
132  @Test
133  public void itHandlesExceptionOnRecompare() throws IOException {
134    when(sourceTable.get(any(Get.class))).thenThrow(new IOException("Error!"));
135    lenient().when(replicatedTable.get(any(Get.class))).thenReturn(genResult(5));
136
137    VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
138      genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "",
139      new Scan(), sourceTable, replicatedTable, 1, 1, 0, true);
140
141    runnable.run();
142
143    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
144    assertEquals(1,
145      context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
146    assertEquals(1,
147      context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue());
148    assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue());
149  }
150}