001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.MiniHBaseCluster; 027import org.apache.hadoop.hbase.client.Get; 028import org.apache.hadoop.hbase.client.Put; 029import org.apache.hadoop.hbase.client.Result; 030import org.apache.hadoop.hbase.client.ResultScanner; 031import org.apache.hadoop.hbase.client.Scan; 032import org.apache.hadoop.hbase.testclassification.LargeTests; 033import org.apache.hadoop.hbase.testclassification.ReplicationTests; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.JVMClusterUtil; 036import org.junit.Before; 037import org.junit.ClassRule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * Test handling of changes to the number of a peer's regionservers. 045 */ 046@Category({ReplicationTests.class, LargeTests.class}) 047public class TestReplicationChangingPeerRegionservers extends TestReplicationBase { 048 049 @ClassRule 050 public static final HBaseClassTestRule CLASS_RULE = 051 HBaseClassTestRule.forClass(TestReplicationChangingPeerRegionservers.class); 052 053 private static final Logger LOG = 054 LoggerFactory.getLogger(TestReplicationChangingPeerRegionservers.class); 055 056 /** 057 * @throws java.lang.Exception 058 */ 059 @Before 060 public void setUp() throws Exception { 061 // Starting and stopping replication can make us miss new logs, 062 // rolling like this makes sure the most recent one gets added to the queue 063 for (JVMClusterUtil.RegionServerThread r : 064 utility1.getHBaseCluster().getRegionServerThreads()) { 065 utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); 066 } 067 utility1.deleteTableData(tableName); 068 // truncating the table will send one Delete per row to the slave cluster 069 // in an async fashion, which is why we cannot just call deleteTableData on 070 // utility2 since late writes could make it to the slave in some way. 071 // Instead, we truncate the first table and wait for all the Deletes to 072 // make it to the slave. 073 Scan scan = new Scan(); 074 int lastCount = 0; 075 for (int i = 0; i < NB_RETRIES; i++) { 076 if (i == NB_RETRIES - 1) { 077 fail("Waited too much time for truncate"); 078 } 079 ResultScanner scanner = htable2.getScanner(scan); 080 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); 081 scanner.close(); 082 if (res.length != 0) { 083 if (res.length < lastCount) { 084 i--; // Don't increment timeout if we make progress 085 } 086 lastCount = res.length; 087 LOG.info("Still got " + res.length + " rows"); 088 Thread.sleep(SLEEP_TIME); 089 } else { 090 break; 091 } 092 } 093 } 094 095 @Test 096 public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException { 097 098 LOG.info("testSimplePutDelete"); 099 MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster(); 100 int numRS = peerCluster.getRegionServerThreads().size(); 101 102 doPutTest(Bytes.toBytes(1)); 103 104 int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0; 105 peerCluster.stopRegionServer(rsToStop); 106 peerCluster.waitOnRegionServer(rsToStop); 107 108 // Sanity check 109 assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size()); 110 111 doPutTest(Bytes.toBytes(2)); 112 113 peerCluster.startRegionServer(); 114 115 // Sanity check 116 assertEquals(numRS, peerCluster.getRegionServerThreads().size()); 117 118 doPutTest(Bytes.toBytes(3)); 119 120 } 121 122 private void doPutTest(byte[] row) throws IOException, InterruptedException { 123 Put put = new Put(row); 124 put.addColumn(famName, row, row); 125 126 if (htable1 == null) { 127 htable1 = utility1.getConnection().getTable(tableName); 128 } 129 130 htable1.put(put); 131 132 Get get = new Get(row); 133 for (int i = 0; i < NB_RETRIES; i++) { 134 if (i == NB_RETRIES - 1) { 135 fail("Waited too much time for put replication"); 136 } 137 Result res = htable2.get(get); 138 if (res.isEmpty()) { 139 LOG.info("Row not available"); 140 Thread.sleep(SLEEP_TIME); 141 } else { 142 assertArrayEquals(res.value(), row); 143 break; 144 } 145 } 146 } 147}