/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
043import org.junit.After; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.Rule; 047import org.junit.rules.TestName; 048 049/** 050 * Base class for testing serial replication. 051 */ 052public class SerialReplicationTestBase { 053 054 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 055 056 protected static String PEER_ID = "1"; 057 058 protected static byte[] CF = Bytes.toBytes("CF"); 059 060 protected static byte[] CQ = Bytes.toBytes("CQ"); 061 062 protected static FileSystem FS; 063 064 protected static Path LOG_DIR; 065 066 protected static WALProvider.Writer WRITER; 067 068 @Rule 069 public final TestName name = new TestName(); 070 071 protected Path logPath; 072 073 public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { 074 075 private static final UUID PEER_UUID = UTIL.getRandomUUID(); 076 077 @Override 078 public UUID getPeerUUID() { 079 return PEER_UUID; 080 } 081 082 @Override 083 public boolean replicate(ReplicateContext replicateContext) { 084 synchronized (WRITER) { 085 try { 086 for (Entry entry : replicateContext.getEntries()) { 087 WRITER.append(entry); 088 } 089 WRITER.sync(false); 090 } catch (IOException e) { 091 throw new UncheckedIOException(e); 092 } 093 } 094 return true; 095 } 096 097 @Override 098 public void start() { 099 startAsync(); 100 } 101 102 @Override 103 public void stop() { 104 stopAsync(); 105 } 106 107 @Override 108 protected void doStart() { 109 notifyStarted(); 110 } 111 112 @Override 113 protected void doStop() { 114 notifyStopped(); 115 } 116 117 @Override 118 public boolean canReplicateToSameCluster() { 119 return true; 120 } 121 } 122 123 @BeforeClass 124 public static void setUpBeforeClass() throws Exception { 125 UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); 126 UTIL.getConfiguration().setLong("replication.sleep.before.failover", 1000); 127 
UTIL.getConfiguration().setLong("hbase.serial.replication.waiting.ms", 100); 128 UTIL.startMiniCluster(3); 129 // disable balancer 130 UTIL.getAdmin().balancerSwitch(false, true); 131 LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); 132 FS = UTIL.getTestFileSystem(); 133 FS.mkdirs(LOG_DIR); 134 } 135 136 @AfterClass 137 public static void tearDownAfterClass() throws Exception { 138 UTIL.shutdownMiniCluster(); 139 } 140 141 @After 142 public void tearDown() throws Exception { 143 Admin admin = UTIL.getAdmin(); 144 for (ReplicationPeerDescription pd : admin.listReplicationPeers()) { 145 admin.removeReplicationPeer(pd.getPeerId()); 146 } 147 rollAllWALs(); 148 if (WRITER != null) { 149 WRITER.close(); 150 WRITER = null; 151 } 152 } 153 154 protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception { 155 UTIL.getAdmin().move(region.getEncodedNameAsBytes(), rs.getServerName()); 156 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 157 158 @Override 159 public boolean evaluate() throws Exception { 160 return rs.getRegion(region.getEncodedName()) != null; 161 } 162 163 @Override 164 public String explainFailure() throws Exception { 165 return region + " is still not on " + rs; 166 } 167 }); 168 } 169 170 protected static void rollAllWALs() throws Exception { 171 for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { 172 t.getRegionServer().getWalRoller().requestRollAll(); 173 } 174 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 175 176 @Override 177 public boolean evaluate() throws Exception { 178 return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() 179 .map(RegionServerThread::getRegionServer).allMatch(HRegionServer::walRollRequestFinished); 180 } 181 182 @Override 183 public String explainFailure() throws Exception { 184 return "Log roll has not finished yet"; 185 } 186 }); 187 } 188 189 protected final void setupWALWriter() throws IOException { 190 logPath = new 
Path(LOG_DIR, name.getMethodName()); 191 WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); 192 } 193 194 protected final void waitUntilReplicationDone(int expectedEntries) throws Exception { 195 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 196 197 @Override 198 public boolean evaluate() throws Exception { 199 try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { 200 int count = 0; 201 while (reader.next() != null) { 202 count++; 203 } 204 return count >= expectedEntries; 205 } catch (IOException e) { 206 return false; 207 } 208 } 209 210 @Override 211 public String explainFailure() throws Exception { 212 return "Not enough entries replicated"; 213 } 214 }); 215 } 216 217 protected final void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception { 218 UTIL.getAdmin().enableReplicationPeer(PEER_ID); 219 waitUntilReplicationDone(expectedEntries); 220 } 221 222 protected final void addPeer(boolean enabled) throws IOException { 223 UTIL.getAdmin().addReplicationPeer(PEER_ID, 224 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 225 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true) 226 .build(), 227 enabled); 228 } 229 230 protected final void checkOrder(int expectedEntries) throws IOException { 231 try (WAL.Reader reader = 232 WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { 233 long seqId = -1L; 234 int count = 0; 235 for (Entry entry;;) { 236 entry = reader.next(); 237 if (entry == null) { 238 break; 239 } 240 assertTrue( 241 "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(), 242 entry.getKey().getSequenceId() >= seqId); 243 seqId = entry.getKey().getSequenceId(); 244 count++; 245 } 246 assertEquals(expectedEntries, count); 247 } 248 } 249 250 protected final TableName createTable() throws IOException, InterruptedException { 251 TableName 
tableName = TableName.valueOf(name.getMethodName()); 252 UTIL.getAdmin().createTable( 253 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder 254 .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); 255 UTIL.waitTableAvailable(tableName); 256 return tableName; 257 } 258}