001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.fail;
023import java.io.IOException;
024import java.util.List;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.MiniHBaseCluster;
027import org.apache.hadoop.hbase.Waiter;
028import org.apache.hadoop.hbase.client.Get;
029import org.apache.hadoop.hbase.client.Put;
030import org.apache.hadoop.hbase.client.Result;
031import org.apache.hadoop.hbase.client.ResultScanner;
032import org.apache.hadoop.hbase.client.Scan;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.testclassification.ReplicationTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.JVMClusterUtil;
037import org.junit.Before;
038import org.junit.ClassRule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.junit.runner.RunWith;
042import org.junit.runners.Parameterized;
043import org.junit.runners.Parameterized.Parameter;
044import org.junit.runners.Parameterized.Parameters;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
048
049/**
050 * Test handling of changes to the number of a peer's regionservers.
051 */
052@RunWith(Parameterized.class)
053@Category({ ReplicationTests.class, LargeTests.class })
054public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058      HBaseClassTestRule.forClass(TestReplicationChangingPeerRegionservers.class);
059
060  private static final Logger LOG =
061      LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class);
062
063  @SuppressWarnings("checkstyle:VisibilityModifier") @Parameter(0)
064  public boolean serialPeer;
065
066  @Parameter(1)
067  public boolean syncPeer;
068
069  @Override
070  protected boolean isSerialPeer() {
071    return serialPeer;
072  }
073
074  @Override
075  protected boolean isSyncPeer() {
076    return syncPeer;
077  }
078
079  @Parameters(name = "{index}: serialPeer={0}, syncPeer={1}")
080  public static List<Object[]> parameters() {
081    return ImmutableList.of(new Object[] { false, false }, new Object[] { false, true },
082      new Object[] { true, false }, new Object[] { true, true });
083  }
084
085  @Before
086  public void setUp() throws Exception {
087    // Starting and stopping replication can make us miss new logs,
088    // rolling like this makes sure the most recent one gets added to the queue
089    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
090        .getRegionServerThreads()) {
091      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
092    }
093    UTIL1.deleteTableData(tableName);
094    // truncating the table will send one Delete per row to the slave cluster
095    // in an async fashion, which is why we cannot just call deleteTableData on
096    // utility2 since late writes could make it to the slave in some way.
097    // Instead, we truncate the first table and wait for all the Deletes to
098    // make it to the slave.
099    Scan scan = new Scan();
100    int lastCount = 0;
101    for (int i = 0; i < NB_RETRIES; i++) {
102      if (i == NB_RETRIES - 1) {
103        fail("Waited too much time for truncate");
104      }
105      ResultScanner scanner = htable2.getScanner(scan);
106      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
107      scanner.close();
108      if (res.length != 0) {
109        if (res.length < lastCount) {
110          i--; // Don't increment timeout if we make progress
111        }
112        lastCount = res.length;
113        LOG.info("Still got " + res.length + " rows");
114        Thread.sleep(SLEEP_TIME);
115      } else {
116        break;
117      }
118    }
119  }
120
121  @Test
122  public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
123    LOG.info("testSimplePutDelete");
124    MiniHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster();
125    // This test wants two RS's up. We only run one generally so add one.
126    peerCluster.startRegionServer();
127    Waiter.waitFor(peerCluster.getConfiguration(), 30000, new Waiter.Predicate<Exception>() {
128      @Override public boolean evaluate() throws Exception {
129        return peerCluster.getLiveRegionServerThreads().size() > 1;
130      }
131    });
132    int numRS = peerCluster.getRegionServerThreads().size();
133
134    doPutTest(Bytes.toBytes(1));
135
136    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
137    peerCluster.stopRegionServer(rsToStop);
138    peerCluster.waitOnRegionServer(rsToStop);
139
140    // Sanity check
141    assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());
142
143    doPutTest(Bytes.toBytes(2));
144
145    peerCluster.startRegionServer();
146
147    // Sanity check
148    assertEquals(numRS, peerCluster.getRegionServerThreads().size());
149
150    doPutTest(Bytes.toBytes(3));
151  }
152
153  private void doPutTest(byte[] row) throws IOException, InterruptedException {
154    Put put = new Put(row);
155    put.addColumn(famName, row, row);
156
157    if (htable1 == null) {
158      htable1 = UTIL1.getConnection().getTable(tableName);
159    }
160
161    htable1.put(put);
162
163    Get get = new Get(row);
164    for (int i = 0; i < NB_RETRIES; i++) {
165      if (i == NB_RETRIES - 1) {
166        fail("Waited too much time for put replication");
167      }
168      Result res = htable2.get(get);
169      if (res.isEmpty()) {
170        LOG.info("Row not available");
171        Thread.sleep(SLEEP_TIME);
172      } else {
173        assertArrayEquals(res.value(), row);
174        break;
175      }
176    }
177  }
178}