/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;

/**
 * Base class for testing serial replication.
 * <p>
 * Starts a shared mini cluster once per test class and provides a {@link LocalReplicationEndpoint}
 * that appends every replicated entry to a WAL file ({@link #logPath}) instead of shipping it to a
 * remote cluster. Subclasses then read that file back with {@link #waitUntilReplicationDone(int)}
 * and {@link #checkOrder(int)} to verify what was replicated and in which order.
 */
public class SerialReplicationTestBase {

  /** Testing utility that owns the mini cluster shared by all tests of the class. */
  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  /** Id of the replication peer managed by {@link #addPeer(boolean)}. */
  protected static String PEER_ID = "1";

  /** Column family of the tables created by {@link #createTable()}. */
  protected static byte[] CF = Bytes.toBytes("CF");

  /** Not referenced in this class; presumably the column qualifier used by subclasses. */
  protected static byte[] CQ = Bytes.toBytes("CQ");

  /** Test file system, initialized in {@link #setUpBeforeClass()}. */
  protected static FileSystem FS;

  /** Directory on {@link #FS} holding the per-test files written by the local endpoint. */
  protected static Path LOG_DIR;

  /**
   * Writer the {@link LocalReplicationEndpoint} appends to. Created by {@link #setupWALWriter()}
   * and closed and reset to {@code null} by {@link #tearDown()}.
   */
  protected static WALProvider.Writer WRITER;

  @Rule
  public final TestName name = new TestName();

  /** File under {@link #LOG_DIR} for the current test; set by {@link #setupWALWriter()}. */
  protected Path logPath;

  /**
   * Replication endpoint that writes the entries it receives to {@link #WRITER} rather than to
   * another cluster.
   */
  public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {

    /** Generated once when this class is initialized and returned for every instance. */
    private static final UUID PEER_UUID = UTIL.getRandomUUID();

    @Override
    public UUID getPeerUUID() {
      return PEER_UUID;
    }

    /**
     * Appends all entries of the given context to {@link #WRITER} and syncs the writer.
     * @param replicateContext the batch of entries to replicate
     * @return always {@code true}
     * @throws UncheckedIOException if appending or syncing fails with an {@link IOException}
     */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // NOTE(review): the monitor is the current WRITER instance, which setupWALWriter() replaces
      // and tearDown() resets to null; this relies on a writer being set up before any entry is
      // replicated.
      synchronized (WRITER) {
        try {
          for (Entry entry : replicateContext.getEntries()) {
            WRITER.append(entry);
          }
          WRITER.sync(false);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
      return true;
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      // Nothing to initialize, report the started state right away.
      notifyStarted();
    }

    @Override
    protected void doStop() {
      // Nothing to release, report the stopped state right away.
      notifyStopped();
    }
  }

  /**
   * Configures replication related settings, starts the mini cluster, switches the balancer off
   * and creates {@link #LOG_DIR} on the test file system.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
    UTIL.getConfiguration().setLong("replication.sleep.before.failover", 1000);
    UTIL.getConfiguration().setLong("hbase.serial.replication.waiting.ms", 100);
    UTIL.startMiniCluster(3);
    // disable balancer
    UTIL.getAdmin().balancerSwitch(false, true);
    LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
    FS = UTIL.getTestFileSystem();
    FS.mkdirs(LOG_DIR);
  }

  /** Shuts down the mini cluster started in {@link #setUpBeforeClass()}. */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Removes every replication peer, rolls the WALs of all live region servers and finally closes
   * the current {@link #WRITER}, if any.
   */
  @After
  public void tearDown() throws Exception {
    try {
      Admin admin = UTIL.getAdmin();
      for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
        admin.removeReplicationPeer(pd.getPeerId());
      }
      rollAllWALs();
    } finally {
      // Release the writer even if removing the peers or rolling the WALs failed, so a failing
      // tear down does not leave an open writer behind in the static field.
      if (WRITER != null) {
        WRITER.close();
        WRITER = null;
      }
    }
  }

  /**
   * Asks the cluster to move the given region to the given region server and waits until that
   * server reports the region.
   * @param region the region to move
   * @param rs the destination region server
   * @throws Exception if the move request fails or the wait does not succeed
   */
  protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
    UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
      Bytes.toBytes(rs.getServerName().getServerName()));
    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return rs.getRegion(region.getEncodedName()) != null;
      }

      @Override
      public String explainFailure() throws Exception {
        return region + " is still not on " + rs;
      }
    });
  }

  /**
   * Requests a roll of all WALs on every live region server and waits until each of them reports
   * the roll request as finished.
   * @throws Exception if the wait does not succeed
   */
  protected static void rollAllWALs() throws Exception {
    for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
      t.getRegionServer().getWalRoller().requestRollAll();
    }
    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
          .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
      }

      @Override
      public String explainFailure() throws Exception {
        return "Log roll has not finished yet";
      }
    });
  }

  /**
   * Points {@link #logPath} at a file named after the current test method under {@link #LOG_DIR}
   * and creates the {@link #WRITER} for it.
   */
  protected final void setupWALWriter() throws IOException {
    logPath = new Path(LOG_DIR, name.getMethodName());
    WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
  }

  /**
   * Waits until at least {@code expectedEntries} entries can be read from {@link #logPath}.
   * @param expectedEntries the minimum number of entries to wait for
   * @throws Exception if the wait does not succeed
   */
  protected final void waitUntilReplicationDone(int expectedEntries) throws Exception {
    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
          int count = 0;
          while (reader.next() != null) {
            count++;
          }
          return count >= expectedEntries;
        } catch (IOException e) {
          // Deliberately treated as "not done yet" so that the predicate is evaluated again.
          return false;
        }
      }

      @Override
      public String explainFailure() throws Exception {
        return "Not enough entries replicated";
      }
    });
  }

  /**
   * Enables the peer {@link #PEER_ID} and then waits as in
   * {@link #waitUntilReplicationDone(int)}.
   * @param expectedEntries the minimum number of entries to wait for
   */
  protected final void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception {
    UTIL.getAdmin().enableReplicationPeer(PEER_ID);
    waitUntilReplicationDone(expectedEntries);
  }

  /**
   * Adds a serial replication peer with id {@link #PEER_ID} that uses
   * {@link LocalReplicationEndpoint} as its endpoint implementation.
   * @param enabled the enabled flag passed on when adding the peer
   */
  protected final void addPeer(boolean enabled) throws IOException {
    UTIL.getAdmin().addReplicationPeer(PEER_ID,
      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
        .build(),
      enabled);
  }

  /**
   * Reads {@link #logPath} and asserts that the sequence ids of its entries never decrease and
   * that it contains exactly {@code expectedEntries} entries.
   * @param expectedEntries the exact number of entries expected in the file
   */
  protected final void checkOrder(int expectedEntries) throws IOException {
    // Read through FS, the same file system the file was written and polled with.
    try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
      long seqId = -1L;
      int count = 0;
      for (Entry entry = reader.next(); entry != null; entry = reader.next()) {
        long entrySeqId = entry.getKey().getSequenceId();
        assertTrue("Sequence id go backwards from " + seqId + " to " + entrySeqId,
          entrySeqId >= seqId);
        seqId = entrySeqId;
        count++;
      }
      assertEquals(expectedEntries, count);
    }
  }

  /**
   * Creates a table named after the current test method whose single column family {@link #CF}
   * has the global replication scope, and waits until the table is available.
   * @return the name of the created table
   */
  protected final TableName createTable() throws IOException, InterruptedException {
    TableName tableName = TableName.valueOf(name.getMethodName());
    UTIL.getAdmin().createTable(
      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
    UTIL.waitTableAvailable(tableName);
    return tableName;
  }
}