/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
 * Test handling of changes to the number of a peer's regionservers: replication
 * must keep working when a peer-cluster regionserver is stopped and when a new
 * one is started. Parameterized over serial and non-serial replication peers.
 */
@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationChangingPeerRegionservers.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class);

  // Injected by the Parameterized runner; see parameters().
  @Parameter
  public boolean serialPeer;

  @Override
  protected boolean isSerialPeer() {
    return serialPeer;
  }

  @Parameters(name = "{index}: serialPeer={0}")
  public static List<Boolean> parameters() {
    return ImmutableList.of(true, false);
  }

  /**
   * Clears both clusters before each test: rolls WALs on the source cluster so
   * recent edits are queued, truncates the source table, then waits until the
   * resulting Deletes have been replicated and the peer table is empty.
   * @throws Exception if rolling, truncation, or the replication wait fails
   */
  @Before
  public void setUp() throws Exception {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
        .getRegionServerThreads()) {
      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    utility1.deleteTableData(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Verifies replication continues to work while the peer cluster loses a
   * regionserver (avoiding the one hosting meta) and then regains one.
   */
  @Test
  public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
    // Fixed copy-paste error: previously logged "testSimplePutDelete".
    LOG.info("testChangingNumberOfPeerRegionServers");
    MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
    int numRS = peerCluster.getRegionServerThreads().size();

    doPutTest(Bytes.toBytes(1));

    // Stop a peer regionserver that is not serving meta.
    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
    peerCluster.stopRegionServer(rsToStop);
    peerCluster.waitOnRegionServer(rsToStop);

    // Sanity check
    assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(2));

    peerCluster.startRegionServer();

    // Sanity check
    assertEquals(numRS, peerCluster.getRegionServerThreads().size());

    doPutTest(Bytes.toBytes(3));
  }

  /**
   * Writes a single cell keyed by {@code row} to the source table and polls the
   * peer table until the row is replicated, failing after NB_RETRIES attempts.
   * @param row row key (also used as qualifier and value)
   * @throws IOException if a put or get fails
   * @throws InterruptedException if interrupted while waiting for replication
   */
  private void doPutTest(byte[] row) throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    if (htable1 == null) {
      htable1 = utility1.getConnection().getTable(tableName);
    }

    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        // Fixed argument order: JUnit expects (expected, actual) so failure
        // messages report the values correctly.
        assertArrayEquals(row, res.value());
        break;
      }
    }
  }
}