/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Testcase for HBASE-20624.
 * <p>
 * A three-node mini cluster is started with a replication peer whose endpoint
 * ({@link LocalReplicationEndpoint}) initially reports a {@code null} peer UUID. The test creates a
 * globally-scoped table, writes a single cell, then lets the endpoint report a real UUID and
 * verifies that the cell shows up in the WAL file the endpoint writes replicated entries to.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestRaceWhenCreatingReplicationSource {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRaceWhenCreatingReplicationSource.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final String PEER_ID = "1";

  private static final TableName TABLE_NAME = TableName.valueOf("race");

  private static final byte[] CF = Bytes.toBytes("CF");

  private static final byte[] CQ = Bytes.toBytes("CQ");

  // The fields below are assigned once in setUpBeforeClass(), before the peer is added.
  private static FileSystem FS;

  /** Location of the WAL file that {@link LocalReplicationEndpoint} appends replicated entries to. */
  private static Path LOG_PATH;

  private static WALProvider.Writer WRITER;

  /**
   * While {@code true}, {@link LocalReplicationEndpoint#getPeerUUID()} returns {@code null}.
   * Volatile because it is flipped by the test thread and read by the endpoint.
   */
  private static volatile boolean NULL_UUID = true;

  /**
   * Replication endpoint that, instead of shipping to a remote cluster, appends every entry it is
   * given to the shared {@link #WRITER}, and whose peer UUID can be withheld via
   * {@link #NULL_UUID}.
   */
  public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {

    private static final UUID PEER_UUID = UTIL.getRandomUUID();

    /**
     * @return {@code null} while {@link #NULL_UUID} is set, otherwise a fixed random UUID
     */
    @Override
    public UUID getPeerUUID() {
      return NULL_UUID ? null : PEER_UUID;
    }

    /**
     * Appends all entries of the batch to the shared writer and syncs it.
     * @return always {@code true}
     * @throws UncheckedIOException if appending or syncing fails
     */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // The single writer is shared by every endpoint instance in this JVM, so serialize access.
      synchronized (WRITER) {
        try {
          for (Entry entry : replicateContext.getEntries()) {
            WRITER.append(entry);
          }
          WRITER.sync(false);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
      return true;
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      notifyStarted();
    }

    @Override
    protected void doStop() {
      notifyStopped();
    }
  }

  /**
   * Starts a three-node mini cluster using the "multiwal" provider, creates the WAL writer used
   * by {@link LocalReplicationEndpoint}, and registers the replication peer.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "multiwal");
    // make sure that we will create a new group for the table
    UTIL.getConfiguration().setInt("hbase.wal.regiongrouping.numgroups", 8);
    UTIL.startMiniCluster(3);
    Path dir = UTIL.getDataTestDirOnTestFS();
    FS = UTIL.getTestFileSystem();
    LOG_PATH = new Path(dir, "replicated");
    // WRITER must exist before the peer is added, since the endpoint dereferences it.
    WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration());
    UTIL.getAdmin().addReplicationPeer(PEER_ID,
      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
      true);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  /**
   * Writes one cell while the endpoint is still reporting a {@code null} peer UUID, then enables
   * the UUID and checks that the cell is eventually written to {@link #LOG_PATH}.
   */
  @Test
  public void testRace() throws Exception {
    // Wait until every region server has a source for the peer with its endpoint set.
    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
          ReplicationSource source =
            (ReplicationSource) ((Replication) t.getRegionServer().getReplicationSourceService())
              .getReplicationManager().getSource(PEER_ID);
          if (source == null || source.getReplicationEndpoint() == null) {
            return false;
          }
        }
        return true;
      }

      @Override
      public String explainFailure() throws Exception {
        return "Replication source has not been initialized yet";
      }
    });
    UTIL.getAdmin().createTable(
      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
    UTIL.waitTableAvailable(TABLE_NAME);
    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
      table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1)));
    }
    // Only after the write has been issued does the endpoint start reporting a real peer UUID.
    NULL_UUID = false;
    // Poll the replicated WAL file until at least one entry can be read from it.
    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
          return reader.next() != null;
        } catch (IOException ignored) {
          // Deliberately best-effort: a failure to open or read the file is treated as
          // "nothing replicated yet" so that waitFor keeps polling until the timeout.
          return false;
        }
      }

      @Override
      public String explainFailure() throws Exception {
        return "Replication has not catched up";
      }
    });
    // The first replicated entry must carry exactly the cell written above.
    try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
      Cell cell = reader.next().getEdit().getCells().get(0);
      assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
      assertArrayEquals(CF, CellUtil.cloneFamily(cell));
      assertArrayEquals(CQ, CellUtil.cloneQualifier(cell));
      assertEquals(1,
        Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    }
  }
}