001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.MiniHBaseCluster;
027import org.apache.hadoop.hbase.client.Get;
028import org.apache.hadoop.hbase.client.Put;
029import org.apache.hadoop.hbase.client.Result;
030import org.apache.hadoop.hbase.client.ResultScanner;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.testclassification.LargeTests;
033import org.apache.hadoop.hbase.testclassification.ReplicationTests;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.JVMClusterUtil;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Test handling of changes to the number of a peer's regionservers.
045 */
046@Category({ReplicationTests.class, LargeTests.class})
047public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051      HBaseClassTestRule.forClass(TestReplicationChangingPeerRegionservers.class);
052
053  private static final Logger LOG =
054      LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class);
055
056  /**
057   * @throws java.lang.Exception
058   */
059  @Before
060  public void setUp() throws Exception {
061    // Starting and stopping replication can make us miss new logs,
062    // rolling like this makes sure the most recent one gets added to the queue
063    for (JVMClusterUtil.RegionServerThread r :
064                          utility1.getHBaseCluster().getRegionServerThreads()) {
065      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
066    }
067    utility1.deleteTableData(tableName);
068    // truncating the table will send one Delete per row to the slave cluster
069    // in an async fashion, which is why we cannot just call deleteTableData on
070    // utility2 since late writes could make it to the slave in some way.
071    // Instead, we truncate the first table and wait for all the Deletes to
072    // make it to the slave.
073    Scan scan = new Scan();
074    int lastCount = 0;
075    for (int i = 0; i < NB_RETRIES; i++) {
076      if (i == NB_RETRIES - 1) {
077        fail("Waited too much time for truncate");
078      }
079      ResultScanner scanner = htable2.getScanner(scan);
080      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
081      scanner.close();
082      if (res.length != 0) {
083        if (res.length < lastCount) {
084          i--; // Don't increment timeout if we make progress
085        }
086        lastCount = res.length;
087        LOG.info("Still got " + res.length + " rows");
088        Thread.sleep(SLEEP_TIME);
089      } else {
090        break;
091      }
092    }
093  }
094
095  @Test
096  public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
097
098    LOG.info("testSimplePutDelete");
099    MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
100    int numRS = peerCluster.getRegionServerThreads().size();
101
102    doPutTest(Bytes.toBytes(1));
103
104    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
105    peerCluster.stopRegionServer(rsToStop);
106    peerCluster.waitOnRegionServer(rsToStop);
107
108    // Sanity check
109    assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());
110
111    doPutTest(Bytes.toBytes(2));
112
113    peerCluster.startRegionServer();
114
115    // Sanity check
116    assertEquals(numRS, peerCluster.getRegionServerThreads().size());
117
118    doPutTest(Bytes.toBytes(3));
119
120  }
121
122  private void doPutTest(byte[] row) throws IOException, InterruptedException {
123    Put put = new Put(row);
124    put.addColumn(famName, row, row);
125
126    if (htable1 == null) {
127      htable1 = utility1.getConnection().getTable(tableName);
128    }
129
130    htable1.put(put);
131
132    Get get = new Get(row);
133    for (int i = 0; i < NB_RETRIES; i++) {
134      if (i == NB_RETRIES - 1) {
135        fail("Waited too much time for put replication");
136      }
137      Result res = htable2.get(get);
138      if (res.isEmpty()) {
139        LOG.info("Row not available");
140        Thread.sleep(SLEEP_TIME);
141      } else {
142        assertArrayEquals(res.value(), row);
143        break;
144      }
145    }
146  }
147}