001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion; 021import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion; 022import static org.mockito.Mockito.mock; 023import static org.mockito.Mockito.when; 024 025import java.io.IOException; 026import java.util.Optional; 027import java.util.Queue; 028import java.util.concurrent.ConcurrentLinkedQueue; 029import java.util.concurrent.atomic.AtomicLong; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.CellUtil; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.HRegionInfo; 036import org.apache.hadoop.hbase.HTableDescriptor; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.ClusterConnection; 040import org.apache.hadoop.hbase.client.ConnectionFactory; 041import org.apache.hadoop.hbase.client.RegionInfo; 042import org.apache.hadoop.hbase.client.RegionLocator; 043import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 046import org.apache.hadoop.hbase.coprocessor.ObserverContext; 047import org.apache.hadoop.hbase.coprocessor.WALCoprocessor; 048import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment; 049import org.apache.hadoop.hbase.coprocessor.WALObserver; 050import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 051import org.apache.hadoop.hbase.regionserver.HRegionServer; 052import org.apache.hadoop.hbase.regionserver.Region; 053import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; 054import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 055import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext; 056import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable; 057import org.apache.hadoop.hbase.testclassification.MediumTests; 058import org.apache.hadoop.hbase.testclassification.ReplicationTests; 059import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 060import org.apache.hadoop.hbase.wal.WAL.Entry; 061import org.apache.hadoop.hbase.wal.WALEdit; 062import org.apache.hadoop.hbase.wal.WALKey; 063import org.apache.hadoop.hbase.wal.WALKeyImpl; 064import org.junit.After; 065import org.junit.AfterClass; 066import org.junit.Assert; 067import org.junit.Before; 068import org.junit.BeforeClass; 069import org.junit.ClassRule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072 073import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 074 075import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 076 077/** 078 * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this 079 * class contains lower level tests using callables. 080 */ 081@Category({ReplicationTests.class, MediumTests.class}) 082public class TestRegionReplicaReplicationEndpointNoMaster { 083 084 @ClassRule 085 public static final HBaseClassTestRule CLASS_RULE = 086 HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class); 087 088 private static final int NB_SERVERS = 2; 089 private static TableName tableName = TableName.valueOf( 090 TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName()); 091 private static Table table; 092 private static final byte[] row = "TestRegionReplicaReplicator".getBytes(); 093 094 private static HRegionServer rs0; 095 private static HRegionServer rs1; 096 097 private static HRegionInfo hriPrimary; 098 private static HRegionInfo hriSecondary; 099 100 private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); 101 private static final byte[] f = HConstants.CATALOG_FAMILY; 102 103 @BeforeClass 104 public static void beforeClass() throws Exception { 105 Configuration conf = HTU.getConfiguration(); 106 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 107 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); 108 109 // install WALObserver coprocessor for tests 110 String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY); 111 if (walCoprocs == null) { 112 walCoprocs = WALEditCopro.class.getName(); 113 } else { 114 walCoprocs += "," + WALEditCopro.class.getName(); 115 } 116 HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, 117 walCoprocs); 118 HTU.startMiniCluster(NB_SERVERS); 119 120 // Create table then get the single region for our new table. 121 HTableDescriptor htd = HTU.createTableDescriptor(tableName.getNameAsString()); 122 table = HTU.createTable(htd, new byte[][]{f}, null); 123 124 try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) { 125 hriPrimary = locator.getRegionLocation(row, false).getRegionInfo(); 126 } 127 128 // mock a secondary region info to open 129 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), 130 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); 131 132 // No master 133 TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); 134 rs0 = HTU.getMiniHBaseCluster().getRegionServer(0); 135 rs1 = HTU.getMiniHBaseCluster().getRegionServer(1); 136 } 137 138 @AfterClass 139 public static void afterClass() throws Exception { 140 table.close(); 141 HTU.shutdownMiniCluster(); 142 } 143 144 @Before 145 public void before() throws Exception{ 146 entries.clear(); 147 } 148 149 @After 150 public void after() throws Exception { 151 } 152 153 static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>(); 154 155 public static class WALEditCopro implements WALCoprocessor, WALObserver { 156 public WALEditCopro() { 157 entries.clear(); 158 } 159 160 @Override 161 public Optional<WALObserver> getWALObserver() { 162 return Optional.of(this); 163 } 164 165 @Override 166 public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, 167 RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { 168 // only keep primary region's edits 169 if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) { 170 // Presume type is a WALKeyImpl 171 entries.add(new Entry((WALKeyImpl)logKey, logEdit)); 172 } 173 } 174 } 175 176 @Test 177 public void testReplayCallable() throws Exception { 178 // tests replaying the edits to a secondary region replica using the Callable directly 179 openRegion(HTU, rs0, hriSecondary); 180 ClusterConnection connection = 181 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); 182 183 //load some data to primary 184 HTU.loadNumericRows(table, f, 0, 1000); 185 186 Assert.assertEquals(1000, entries.size()); 187 // replay the edits to the secondary using replay callable 188 replicateUsingCallable(connection, entries); 189 190 Region region = rs0.getRegion(hriSecondary.getEncodedName()); 191 HTU.verifyNumericRows(region, f, 0, 1000); 192 193 HTU.deleteNumericRows(table, f, 0, 1000); 194 closeRegion(HTU, rs0, hriSecondary); 195 connection.close(); 196 } 197 198 private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries) 199 throws IOException, RuntimeException { 200 Entry entry; 201 while ((entry = entries.poll()) != null) { 202 byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0)); 203 RegionLocations locations = connection.locateRegion(tableName, row, true, true); 204 RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection, 205 RpcControllerFactory.instantiate(connection.getConfiguration()), 206 table.getName(), locations.getRegionLocation(1), 207 locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry), 208 new AtomicLong()); 209 210 RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate( 211 connection.getConfiguration()); 212 factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000); 213 } 214 } 215 216 @Test 217 public void testReplayCallableWithRegionMove() throws Exception { 218 // tests replaying the edits to a secondary region replica using the Callable directly while 219 // the region is moved to another location.It tests handling of RME. 220 openRegion(HTU, rs0, hriSecondary); 221 ClusterConnection connection = 222 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); 223 //load some data to primary 224 HTU.loadNumericRows(table, f, 0, 1000); 225 226 Assert.assertEquals(1000, entries.size()); 227 // replay the edits to the secondary using replay callable 228 replicateUsingCallable(connection, entries); 229 230 Region region = rs0.getRegion(hriSecondary.getEncodedName()); 231 HTU.verifyNumericRows(region, f, 0, 1000); 232 233 HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary 234 235 // move the secondary region from RS0 to RS1 236 closeRegion(HTU, rs0, hriSecondary); 237 openRegion(HTU, rs1, hriSecondary); 238 239 // replicate the new data 240 replicateUsingCallable(connection, entries); 241 242 region = rs1.getRegion(hriSecondary.getEncodedName()); 243 // verify the new data. old data may or may not be there 244 HTU.verifyNumericRows(region, f, 1000, 2000); 245 246 HTU.deleteNumericRows(table, f, 0, 2000); 247 closeRegion(HTU, rs1, hriSecondary); 248 connection.close(); 249 } 250 251 @Test 252 public void testRegionReplicaReplicationEndpointReplicate() throws Exception { 253 // tests replaying the edits to a secondary region replica using the RRRE.replicate() 254 openRegion(HTU, rs0, hriSecondary); 255 ClusterConnection connection = 256 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); 257 RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint(); 258 259 ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); 260 when(context.getConfiguration()).thenReturn(HTU.getConfiguration()); 261 when(context.getMetrics()).thenReturn(mock(MetricsSource.class)); 262 263 replicator.init(context); 264 replicator.startAsync(); 265 266 //load some data to primary 267 HTU.loadNumericRows(table, f, 0, 1000); 268 269 Assert.assertEquals(1000, entries.size()); 270 // replay the edits to the secondary using replay callable 271 final String fakeWalGroupId = "fakeWALGroup"; 272 replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)) 273 .setWalGroupId(fakeWalGroupId)); 274 275 Region region = rs0.getRegion(hriSecondary.getEncodedName()); 276 HTU.verifyNumericRows(region, f, 0, 1000); 277 278 HTU.deleteNumericRows(table, f, 0, 1000); 279 closeRegion(HTU, rs0, hriSecondary); 280 connection.close(); 281 } 282}