001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022 023import java.io.IOException; 024import java.io.UncheckedIOException; 025import java.util.UUID; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 038import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; 039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 040import org.apache.hadoop.hbase.testclassification.MediumTests; 041import org.apache.hadoop.hbase.testclassification.ReplicationTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 044import org.apache.hadoop.hbase.wal.WAL.Entry; 045import org.apache.hadoop.hbase.wal.WALFactory; 046import org.apache.hadoop.hbase.wal.WALProvider; 047import org.apache.hadoop.hbase.wal.WALStreamReader; 048import org.junit.jupiter.api.AfterAll; 049import org.junit.jupiter.api.BeforeAll; 050import org.junit.jupiter.api.Tag; 051import org.junit.jupiter.api.Test; 052 053/** 054 * Testcase for HBASE-20624. 055 */ 056@Tag(ReplicationTests.TAG) 057@Tag(MediumTests.TAG) 058public class TestRaceWhenCreatingReplicationSource { 059 060 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 061 062 private static String PEER_ID = "1"; 063 064 private static TableName TABLE_NAME = TableName.valueOf("race"); 065 066 private static byte[] CF = Bytes.toBytes("CF"); 067 068 private static byte[] CQ = Bytes.toBytes("CQ"); 069 070 private static FileSystem FS; 071 072 private static Path LOG_PATH; 073 074 private static WALProvider.Writer WRITER; 075 076 private static volatile boolean NULL_UUID = true; 077 078 public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { 079 080 private static final UUID PEER_UUID = HBaseTestingUtil.getRandomUUID(); 081 082 @Override 083 public UUID getPeerUUID() { 084 if (NULL_UUID) { 085 return null; 086 } else { 087 return PEER_UUID; 088 } 089 } 090 091 @Override 092 public boolean replicate(ReplicateContext replicateContext) { 093 synchronized (WRITER) { 094 try { 095 for (Entry entry : replicateContext.getEntries()) { 096 WRITER.append(entry); 097 } 098 WRITER.sync(false); 099 } catch (IOException e) { 100 throw new UncheckedIOException(e); 101 } 102 } 103 return true; 104 } 105 106 @Override 107 public void start() { 108 startAsync(); 109 } 110 111 @Override 112 public void stop() { 113 stopAsync(); 114 } 115 116 @Override 117 protected void doStart() { 118 notifyStarted(); 119 } 120 121 @Override 122 protected void doStop() { 123 notifyStopped(); 124 } 125 126 @Override 127 public boolean canReplicateToSameCluster() { 128 return true; 129 } 130 } 131 132 @BeforeAll 133 public static void setUpBeforeClass() throws Exception { 134 UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "multiwal"); 135 // make sure that we will create a new group for the table 136 UTIL.getConfiguration().setInt("hbase.wal.regiongrouping.numgroups", 8); 137 UTIL.startMiniCluster(3); 138 Path dir = UTIL.getDataTestDirOnTestFS(); 139 FS = UTIL.getTestFileSystem(); 140 LOG_PATH = new Path(dir, "replicated"); 141 WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration()); 142 UTIL.getAdmin().addReplicationPeer(PEER_ID, 143 ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 144 .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(), 145 true); 146 } 147 148 @AfterAll 149 public static void tearDownAfterClass() throws Exception { 150 UTIL.shutdownMiniCluster(); 151 } 152 153 @Test 154 public void testRace() throws Exception { 155 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 156 157 @Override 158 public boolean evaluate() throws Exception { 159 for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) { 160 ReplicationSource source = 161 (ReplicationSource) ((Replication) t.getRegionServer().getReplicationSourceService()) 162 .getReplicationManager().getSource(PEER_ID); 163 if (source == null || source.getReplicationEndpoint() == null) { 164 return false; 165 } 166 } 167 return true; 168 } 169 170 @Override 171 public String explainFailure() throws Exception { 172 return "Replication source has not been initialized yet"; 173 } 174 }); 175 UTIL.getAdmin().createTable( 176 TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder 177 .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build()); 178 UTIL.waitTableAvailable(TABLE_NAME); 179 try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { 180 table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1))); 181 } 182 NULL_UUID = false; 183 UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 184 185 @Override 186 public boolean evaluate() throws Exception { 187 try (WALStreamReader reader = 188 WALFactory.createStreamReader(FS, LOG_PATH, UTIL.getConfiguration())) { 189 return reader.next() != null; 190 } catch (IOException e) { 191 return false; 192 } 193 } 194 195 @Override 196 public String explainFailure() throws Exception { 197 return "Replication has not catched up"; 198 } 199 }); 200 try (WALStreamReader reader = 201 WALFactory.createStreamReader(FS, LOG_PATH, UTIL.getConfiguration())) { 202 Cell cell = reader.next().getEdit().getCells().get(0); 203 assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); 204 assertArrayEquals(CF, CellUtil.cloneFamily(cell)); 205 assertArrayEquals(CQ, CellUtil.cloneQualifier(cell)); 206 assertEquals(1, 207 Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); 208 } 209 } 210}