001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import java.io.IOException;
025import java.util.stream.Stream;
026import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
027import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
028import org.apache.hadoop.hbase.Waiter;
029import org.apache.hadoop.hbase.client.Get;
030import org.apache.hadoop.hbase.client.Put;
031import org.apache.hadoop.hbase.client.Result;
032import org.apache.hadoop.hbase.client.ResultScanner;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.testclassification.LargeTests;
035import org.apache.hadoop.hbase.testclassification.ReplicationTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.JVMClusterUtil;
038import org.junit.jupiter.api.BeforeEach;
039import org.junit.jupiter.api.Tag;
040import org.junit.jupiter.api.TestTemplate;
041import org.junit.jupiter.params.provider.Arguments;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * Test handling of changes to the number of a peer's regionservers.
047 */
048@Tag(ReplicationTests.TAG)
049@Tag(LargeTests.TAG)
050@HBaseParameterizedTestTemplate(name = "{index}: serialPeer={0}, syncPeer={1}")
051public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
052
053  private static final Logger LOG =
054    LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class);
055
056  private boolean serialPeer;
057
058  private boolean syncPeer;
059
060  public TestReplicationChangingPeerRegionservers(boolean serialPeer, boolean syncPeer) {
061    this.serialPeer = serialPeer;
062    this.syncPeer = syncPeer;
063  }
064
065  @Override
066  protected boolean isSerialPeer() {
067    return serialPeer;
068  }
069
070  @Override
071  protected boolean isSyncPeer() {
072    return syncPeer;
073  }
074
075  public static Stream<Arguments> parameters() {
076    return Stream.of(Arguments.of(false, false), Arguments.of(false, true),
077      Arguments.of(true, false), Arguments.of(true, true));
078  }
079
080  @BeforeEach
081  public void setUp() throws Exception {
082    // Starting and stopping replication can make us miss new logs,
083    // rolling like this makes sure the most recent one gets added to the queue
084    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
085      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
086    }
087    UTIL1.deleteTableData(tableName);
088    // truncating the table will send one Delete per row to the slave cluster
089    // in an async fashion, which is why we cannot just call deleteTableData on
090    // utility2 since late writes could make it to the slave in some way.
091    // Instead, we truncate the first table and wait for all the Deletes to
092    // make it to the slave.
093    Scan scan = new Scan();
094    int lastCount = 0;
095    for (int i = 0; i < NB_RETRIES; i++) {
096      if (i == NB_RETRIES - 1) {
097        fail("Waited too much time for truncate");
098      }
099      ResultScanner scanner = htable2.getScanner(scan);
100      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
101      scanner.close();
102      if (res.length != 0) {
103        if (res.length < lastCount) {
104          i--; // Don't increment timeout if we make progress
105        }
106        lastCount = res.length;
107        LOG.info("Still got " + res.length + " rows");
108        Thread.sleep(SLEEP_TIME);
109      } else {
110        break;
111      }
112    }
113  }
114
115  @TestTemplate
116  public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
117    LOG.info("testSimplePutDelete");
118    SingleProcessHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster();
119    // This test wants two RS's up. We only run one generally so add one.
120    peerCluster.startRegionServer();
121    Waiter.waitFor(peerCluster.getConfiguration(), 30000, new Waiter.Predicate<Exception>() {
122      @Override
123      public boolean evaluate() throws Exception {
124        return peerCluster.getLiveRegionServerThreads().size() > 1;
125      }
126    });
127    int numRS = peerCluster.getRegionServerThreads().size();
128
129    doPutTest(Bytes.toBytes(1));
130
131    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
132    peerCluster.stopRegionServer(rsToStop);
133    peerCluster.waitOnRegionServer(rsToStop);
134
135    // Sanity check
136    assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());
137
138    doPutTest(Bytes.toBytes(2));
139
140    peerCluster.startRegionServer();
141
142    // Sanity check
143    assertEquals(numRS, peerCluster.getRegionServerThreads().size());
144
145    doPutTest(Bytes.toBytes(3));
146  }
147
148  private void doPutTest(byte[] row) throws IOException, InterruptedException {
149    Put put = new Put(row);
150    put.addColumn(famName, row, row);
151
152    if (htable1 == null) {
153      htable1 = UTIL1.getConnection().getTable(tableName);
154    }
155
156    htable1.put(put);
157
158    Get get = new Get(row);
159    for (int i = 0; i < NB_RETRIES; i++) {
160      if (i == NB_RETRIES - 1) {
161        fail("Waited too much time for put replication");
162      }
163      Result res = htable2.get(get);
164      if (res.isEmpty()) {
165        LOG.info("Row not available");
166        Thread.sleep(SLEEP_TIME);
167      } else {
168        assertArrayEquals(res.value(), row);
169        break;
170      }
171    }
172  }
173}