/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
043import org.junit.After; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.Rule; 047import org.junit.rules.TestName; 048 049/** 050 * Base class for testing serial replication. 051 */ 052public class SerialReplicationTestBase { 053 054 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 055 056 protected static String PEER_ID = "1"; 057 058 protected static byte[] CF = Bytes.toBytes("CF"); 059 060 protected static byte[] CQ = Bytes.toBytes("CQ"); 061 062 protected static FileSystem FS; 063 064 protected static Path LOG_DIR; 065 066 protected static WALProvider.Writer WRITER; 067 068 @Rule 069 public final TestName name = new TestName(); 070 071 protected Path logPath; 072 073 public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { 074 075 private static final UUID PEER_UUID = UTIL.getRandomUUID(); 076 077 @Override 078 public UUID getPeerUUID() { 079 return PEER_UUID; 080 } 081 082 @Override 083 public boolean replicate(ReplicateContext replicateContext) { 084 synchronized (WRITER) { 085 try { 086 for (Entry entry : replicateContext.getEntries()) { 087 WRITER.append(entry); 088 } 089 WRITER.sync(false); 090 } catch (IOException e) { 091 throw new UncheckedIOException(e); 092 } 093 } 094 return true; 095 } 096 097 @Override 098 public void start() { 099 startAsync(); 100 } 101 102 @Override 103 public void stop() { 104 stopAsync(); 105 } 106 107 @Override 108 protected void doStart() { 109 notifyStarted(); 110 } 111 112 @Override 113 protected void doStop() { 114 notifyStopped(); 115 } 116 117 @Override 118 public boolean canReplicateToSameCluster() { 119 return true; 120 } 121 } 122 123 @BeforeClass 124 public static void setUpBeforeClass() throws Exception { 125 UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); 126 UTIL.getConfiguration().setLong("replication.sleep.before.failover", 1000); 127 
UTIL.getConfiguration().setLong("hbase.serial.replication.waiting.ms", 100); 128 UTIL.startMiniCluster(3); 129 // disable balancer 130 UTIL.getAdmin().balancerSwitch(false, true); 131 LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); 132 FS = UTIL.getTestFileSystem(); 133 FS.mkdirs(LOG_DIR); 134 } 135 136 @AfterClass 137 public static void tearDownAfterClass() throws Exception { 138 UTIL.shutdownMiniCluster(); 139 } 140 141 @After 142 public void tearDown() throws Exception { 143 Admin admin = UTIL.getAdmin(); 144 for (ReplicationPeerDescription pd : admin.listReplicationPeers()) { 145 admin.removeReplicationPeer(pd.getPeerId()); 146 } 147 rollAllWALs(); 148 if (WRITER != null) { 149 WRITER.close(); 150 WRITER = null; 151 } 152 } 153 154 protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception { 155 UTIL.getAdmin().move(region.getEncodedNameAsBytes(), rs.getServerName()); 156 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 157 158 @Override 159 public boolean evaluate() throws Exception { 160 return rs.getRegion(region.getEncodedName()) != null; 161 } 162 163 @Override 164 public String explainFailure() throws Exception { 165 return region + " is still not on " + rs; 166 } 167 }); 168 } 169 170 protected static void rollAllWALs() throws Exception { 171 for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { 172 t.getRegionServer().getWalRoller().requestRollAll(); 173 } 174 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 175 176 @Override 177 public boolean evaluate() throws Exception { 178 return UTIL.getMiniHBaseCluster() 179 .getLiveRegionServerThreads() 180 .stream() 181 .map(RegionServerThread::getRegionServer) 182 .allMatch(HRegionServer::walRollRequestFinished); 183 } 184 185 @Override 186 public String explainFailure() throws Exception { 187 return "Log roll has not finished yet"; 188 } 189 }); 190 } 191 192 protected final void setupWALWriter() throws IOException { 193 
logPath = new Path(LOG_DIR, name.getMethodName()); 194 WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); 195 } 196 197 protected final void waitUntilReplicationDone(int expectedEntries) throws Exception { 198 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 199 200 @Override 201 public boolean evaluate() throws Exception { 202 try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { 203 int count = 0; 204 while (reader.next() != null) { 205 count++; 206 } 207 return count >= expectedEntries; 208 } catch (IOException e) { 209 return false; 210 } 211 } 212 213 @Override 214 public String explainFailure() throws Exception { 215 return "Not enough entries replicated"; 216 } 217 }); 218 } 219 220 protected final void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception { 221 UTIL.getAdmin().enableReplicationPeer(PEER_ID); 222 waitUntilReplicationDone(expectedEntries); 223 } 224 225 protected final void addPeer(boolean enabled) throws IOException { 226 UTIL.getAdmin().addReplicationPeer(PEER_ID, 227 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 228 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true) 229 .build(), 230 enabled); 231 } 232 233 protected final void checkOrder(int expectedEntries) throws IOException { 234 try (WAL.Reader reader = 235 WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { 236 long seqId = -1L; 237 int count = 0; 238 for (Entry entry;;) { 239 entry = reader.next(); 240 if (entry == null) { 241 break; 242 } 243 assertTrue( 244 "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(), 245 entry.getKey().getSequenceId() >= seqId); 246 seqId = entry.getKey().getSequenceId(); 247 count++; 248 } 249 assertEquals(expectedEntries, count); 250 } 251 } 252 253 protected final TableName createTable() throws IOException, InterruptedException { 254 
TableName tableName = TableName.valueOf(name.getMethodName()); 255 UTIL.getAdmin().createTable( 256 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder 257 .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); 258 UTIL.waitTableAvailable(tableName); 259 return tableName; 260 } 261}