/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
043import org.junit.After; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.Rule; 047import org.junit.rules.TestName; 048 049/** 050 * Base class for testing serial replication. 051 */ 052public class SerialReplicationTestBase { 053 054 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 055 056 protected static String PEER_ID = "1"; 057 058 protected static byte[] CF = Bytes.toBytes("CF"); 059 060 protected static byte[] CQ = Bytes.toBytes("CQ"); 061 062 protected static FileSystem FS; 063 064 protected static Path LOG_DIR; 065 066 protected static WALProvider.Writer WRITER; 067 068 @Rule 069 public final TestName name = new TestName(); 070 071 protected Path logPath; 072 073 public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { 074 075 private static final UUID PEER_UUID = UTIL.getRandomUUID(); 076 077 @Override 078 public UUID getPeerUUID() { 079 return PEER_UUID; 080 } 081 082 @Override 083 public boolean replicate(ReplicateContext replicateContext) { 084 synchronized (WRITER) { 085 try { 086 for (Entry entry : replicateContext.getEntries()) { 087 WRITER.append(entry); 088 } 089 WRITER.sync(false); 090 } catch (IOException e) { 091 throw new UncheckedIOException(e); 092 } 093 } 094 return true; 095 } 096 097 @Override 098 public void start() { 099 startAsync(); 100 } 101 102 @Override 103 public void stop() { 104 stopAsync(); 105 } 106 107 @Override 108 protected void doStart() { 109 notifyStarted(); 110 } 111 112 @Override 113 protected void doStop() { 114 notifyStopped(); 115 } 116 } 117 118 @BeforeClass 119 public static void setUpBeforeClass() throws Exception { 120 UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); 121 UTIL.getConfiguration().setLong("replication.sleep.before.failover", 1000); 122 UTIL.getConfiguration().setLong("hbase.serial.replication.waiting.ms", 100); 123 UTIL.startMiniCluster(3); 124 // disable balancer 125 
UTIL.getAdmin().balancerSwitch(false, true); 126 LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); 127 FS = UTIL.getTestFileSystem(); 128 FS.mkdirs(LOG_DIR); 129 } 130 131 @AfterClass 132 public static void tearDownAfterClass() throws Exception { 133 UTIL.shutdownMiniCluster(); 134 } 135 136 @After 137 public void tearDown() throws Exception { 138 Admin admin = UTIL.getAdmin(); 139 for (ReplicationPeerDescription pd : admin.listReplicationPeers()) { 140 admin.removeReplicationPeer(pd.getPeerId()); 141 } 142 rollAllWALs(); 143 if (WRITER != null) { 144 WRITER.close(); 145 WRITER = null; 146 } 147 } 148 149 protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception { 150 UTIL.getAdmin().move(region.getEncodedNameAsBytes(), rs.getServerName()); 151 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 152 153 @Override 154 public boolean evaluate() throws Exception { 155 return rs.getRegion(region.getEncodedName()) != null; 156 } 157 158 @Override 159 public String explainFailure() throws Exception { 160 return region + " is still not on " + rs; 161 } 162 }); 163 } 164 165 protected static void rollAllWALs() throws Exception { 166 for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { 167 t.getRegionServer().getWalRoller().requestRollAll(); 168 } 169 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 170 171 @Override 172 public boolean evaluate() throws Exception { 173 return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() 174 .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished); 175 } 176 177 @Override 178 public String explainFailure() throws Exception { 179 return "Log roll has not finished yet"; 180 } 181 }); 182 } 183 184 protected final void setupWALWriter() throws IOException { 185 logPath = new Path(LOG_DIR, name.getMethodName()); 186 WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); 187 } 188 189 protected final void 
waitUntilReplicationDone(int expectedEntries) throws Exception { 190 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 191 192 @Override 193 public boolean evaluate() throws Exception { 194 try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { 195 int count = 0; 196 while (reader.next() != null) { 197 count++; 198 } 199 return count >= expectedEntries; 200 } catch (IOException e) { 201 return false; 202 } 203 } 204 205 @Override 206 public String explainFailure() throws Exception { 207 return "Not enough entries replicated"; 208 } 209 }); 210 } 211 212 protected final void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception { 213 UTIL.getAdmin().enableReplicationPeer(PEER_ID); 214 waitUntilReplicationDone(expectedEntries); 215 } 216 217 protected final void addPeer(boolean enabled) throws IOException { 218 UTIL.getAdmin().addReplicationPeer(PEER_ID, 219 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 220 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true) 221 .build(), 222 enabled); 223 } 224 225 protected final void checkOrder(int expectedEntries) throws IOException { 226 try (WAL.Reader reader = 227 WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { 228 long seqId = -1L; 229 int count = 0; 230 for (Entry entry;;) { 231 entry = reader.next(); 232 if (entry == null) { 233 break; 234 } 235 assertTrue( 236 "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(), 237 entry.getKey().getSequenceId() >= seqId); 238 seqId = entry.getKey().getSequenceId(); 239 count++; 240 } 241 assertEquals(expectedEntries, count); 242 } 243 } 244 245 protected final TableName createTable() throws IOException, InterruptedException { 246 TableName tableName = TableName.valueOf(name.getMethodName()); 247 UTIL.getAdmin().createTable( 248 
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder 249 .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); 250 UTIL.waitTableAvailable(tableName); 251 return tableName; 252 } 253}