/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Test handling of changes to the number of a peer's regionservers.
 * <p>
 * The test is parameterized over {@link #serialPeer} and therefore runs twice, once with
 * {@code true} and once with {@code false}. The clusters, tables and retry constants used below
 * ({@code UTIL1}, {@code UTIL2}, {@code htable1}, {@code htable2}, {@code tableName},
 * {@code famName}, {@code NB_RETRIES}, {@code SLEEP_TIME}, {@code NB_ROWS_IN_BIG_BATCH}) are not
 * declared in this file; presumably they come from {@link TestReplicationBase}.
 */
@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationChangingPeerRegionservers.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class);

  /** Value returned by {@link #isSerialPeer()}; injected by the parameterized runner. */
  @SuppressWarnings("checkstyle:VisibilityModifier")
  @Parameter
  public boolean serialPeer;

  /**
   * @return the {@link #serialPeer} parameter of the current run
   */
  @Override
  protected boolean isSerialPeer() {
    return serialPeer;
  }

  /**
   * @return the values fed to {@link #serialPeer}: {@code true}, then {@code false}
   */
  @Parameters(name = "{index}: serialPeer={0}")
  public static List<Boolean> parameters() {
    return ImmutableList.of(true, false);
  }

  /**
   * Rolls the WAL writer of every region server thread of the first cluster, deletes the data of
   * the test table there, and then polls {@code htable2} until a scan returns no rows.
   *
   * @throws Exception if any cluster operation fails or the polling sleep is interrupted
   */
  @Before
  public void setUp() throws Exception {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
      .getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    UTIL1.deleteTableData(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      Result[] res;
      // try-with-resources so the scanner is released even when next() throws
      try (ResultScanner scanner = htable2.getScanner(scan)) {
        res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      }
      if (res.length == 0) {
        // Peer table is empty: every Delete has been replicated.
        break;
      }
      if (res.length < lastCount) {
        i--; // Don't increment timeout if we make progress
      }
      lastCount = res.length;
      LOG.info("Still got {} rows", res.length);
      Thread.sleep(SLEEP_TIME);
    }
  }

  /**
   * Checks that a put is still replicated after the number of region servers in the peer
   * cluster changes: first with an additional server started, then with one non-meta server
   * stopped, and finally with a server started again.
   *
   * @throws IOException if a table or cluster operation fails
   * @throws InterruptedException if a wait is interrupted
   */
  @Test
  public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
    LOG.info("testChangingNumberOfPeerRegionServers");
    MiniHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster();
    // This test wants two RS's up. We only run one generally so add one.
    peerCluster.startRegionServer();
    Waiter.waitFor(peerCluster.getConfiguration(), 30000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return peerCluster.getLiveRegionServerThreads().size() > 1;
      }
    });
    int numRS = peerCluster.getRegionServerThreads().size();

    doPutTest(Bytes.toBytes(1));

    // Stop a server other than the one reported by getServerWithMeta().
    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
    peerCluster.stopRegionServer(rsToStop);
    peerCluster.waitOnRegionServer(rsToStop);

    // Sanity check
    assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(2));

    peerCluster.startRegionServer();

    // Sanity check
    assertEquals(numRS, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(3));
  }

  /**
   * Writes a single cell to {@code htable1}, using {@code row} as row key, qualifier and value,
   * and polls {@code htable2} until the row shows up with that value. Fails the test if the row
   * has not appeared by the last retry.
   *
   * @param row bytes used as row key, column qualifier and cell value
   * @throws IOException if a table operation fails
   * @throws InterruptedException if the polling sleep is interrupted
   */
  private void doPutTest(byte[] row) throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    if (htable1 == null) {
      htable1 = UTIL1.getConnection().getTable(tableName);
    }

    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        // JUnit order is (expected, actual): the written bytes are the expectation.
        assertArrayEquals(row, res.value());
        break;
      }
    }
  }
}