001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
021import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
022import static org.mockito.Mockito.mock;
023import static org.mockito.Mockito.when;
024
025import java.io.IOException;
026import java.util.Optional;
027import java.util.Queue;
028import java.util.concurrent.ConcurrentLinkedQueue;
029import java.util.concurrent.atomic.AtomicLong;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.HRegionInfo;
036import org.apache.hadoop.hbase.HTableDescriptor;
037import org.apache.hadoop.hbase.RegionLocations;
038import org.apache.hadoop.hbase.StartMiniClusterOption;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.ClusterConnection;
041import org.apache.hadoop.hbase.client.ConnectionFactory;
042import org.apache.hadoop.hbase.client.RegionInfo;
043import org.apache.hadoop.hbase.client.RegionLocator;
044import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
047import org.apache.hadoop.hbase.coprocessor.ObserverContext;
048import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
049import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
050import org.apache.hadoop.hbase.coprocessor.WALObserver;
051import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
052import org.apache.hadoop.hbase.regionserver.HRegionServer;
053import org.apache.hadoop.hbase.regionserver.Region;
054import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
055import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
056import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
057import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.testclassification.ReplicationTests;
060import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
061import org.apache.hadoop.hbase.wal.WAL.Entry;
062import org.apache.hadoop.hbase.wal.WALEdit;
063import org.apache.hadoop.hbase.wal.WALKey;
064import org.apache.hadoop.hbase.wal.WALKeyImpl;
065import org.junit.After;
066import org.junit.AfterClass;
067import org.junit.Assert;
068import org.junit.Before;
069import org.junit.BeforeClass;
070import org.junit.ClassRule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
075
076import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
077
078/**
079 * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
080 * class contains lower level tests using callables.
081 */
082@Category({ReplicationTests.class, MediumTests.class})
083public class TestRegionReplicaReplicationEndpointNoMaster {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class);
088
089  private static final int NB_SERVERS = 2;
090  private static TableName tableName = TableName.valueOf(
091    TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
092  private static Table table;
093  private static final byte[] row = "TestRegionReplicaReplicator".getBytes();
094
095  private static HRegionServer rs0;
096  private static HRegionServer rs1;
097
098  private static HRegionInfo hriPrimary;
099  private static HRegionInfo hriSecondary;
100
101  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
102  private static final byte[] f = HConstants.CATALOG_FAMILY;
103
104  @BeforeClass
105  public static void beforeClass() throws Exception {
106    Configuration conf = HTU.getConfiguration();
107    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
108    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
109
110    // install WALObserver coprocessor for tests
111    String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
112    if (walCoprocs == null) {
113      walCoprocs = WALEditCopro.class.getName();
114    } else {
115      walCoprocs += "," + WALEditCopro.class.getName();
116    }
117    HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
118      walCoprocs);
119    StartMiniClusterOption option = StartMiniClusterOption.builder().numAlwaysStandByMasters(1).
120        numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
121    HTU.startMiniCluster(option);
122
123    // Create table then get the single region for our new table.
124    HTableDescriptor htd = HTU.createTableDescriptor(tableName.getNameAsString());
125    table = HTU.createTable(htd, new byte[][]{f}, null);
126
127    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
128      hriPrimary = locator.getRegionLocation(row, false).getRegionInfo();
129    }
130
131    // mock a secondary region info to open
132    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
133        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
134
135    // No master
136    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
137    rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
138    rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
139  }
140
141  @AfterClass
142  public static void afterClass() throws Exception {
143    table.close();
144    HTU.shutdownMiniCluster();
145  }
146
147  @Before
148  public void before() throws Exception{
149    entries.clear();
150  }
151
152  @After
153  public void after() throws Exception {
154  }
155
156  static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>();
157
158  public static class WALEditCopro implements WALCoprocessor, WALObserver {
159    public WALEditCopro() {
160      entries.clear();
161    }
162
163    @Override
164    public Optional<WALObserver> getWALObserver() {
165      return Optional.of(this);
166    }
167
168    @Override
169    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
170                             RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
171      // only keep primary region's edits
172      if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) {
173        // Presume type is a WALKeyImpl
174        entries.add(new Entry((WALKeyImpl)logKey, logEdit));
175      }
176    }
177  }
178
179  @Test
180  public void testReplayCallable() throws Exception {
181    // tests replaying the edits to a secondary region replica using the Callable directly
182    openRegion(HTU, rs0, hriSecondary);
183    ClusterConnection connection =
184        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
185
186    //load some data to primary
187    HTU.loadNumericRows(table, f, 0, 1000);
188
189    Assert.assertEquals(1000, entries.size());
190    // replay the edits to the secondary using replay callable
191    replicateUsingCallable(connection, entries);
192
193    Region region = rs0.getRegion(hriSecondary.getEncodedName());
194    HTU.verifyNumericRows(region, f, 0, 1000);
195
196    HTU.deleteNumericRows(table, f, 0, 1000);
197    closeRegion(HTU, rs0, hriSecondary);
198    connection.close();
199  }
200
201  private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
202      throws IOException, RuntimeException {
203    Entry entry;
204    while ((entry = entries.poll()) != null) {
205      byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
206      RegionLocations locations = connection.locateRegion(tableName, row, true, true);
207      RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
208        RpcControllerFactory.instantiate(connection.getConfiguration()),
209        table.getName(), locations.getRegionLocation(1),
210        locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry),
211        new AtomicLong());
212
213      RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
214        connection.getConfiguration());
215      factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
216    }
217  }
218
219  @Test
220  public void testReplayCallableWithRegionMove() throws Exception {
221    // tests replaying the edits to a secondary region replica using the Callable directly while
222    // the region is moved to another location.It tests handling of RME.
223    openRegion(HTU, rs0, hriSecondary);
224    ClusterConnection connection =
225        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
226    //load some data to primary
227    HTU.loadNumericRows(table, f, 0, 1000);
228
229    Assert.assertEquals(1000, entries.size());
230    // replay the edits to the secondary using replay callable
231    replicateUsingCallable(connection, entries);
232
233    Region region = rs0.getRegion(hriSecondary.getEncodedName());
234    HTU.verifyNumericRows(region, f, 0, 1000);
235
236    HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
237
238    // move the secondary region from RS0 to RS1
239    closeRegion(HTU, rs0, hriSecondary);
240    openRegion(HTU, rs1, hriSecondary);
241
242    // replicate the new data
243    replicateUsingCallable(connection, entries);
244
245    region = rs1.getRegion(hriSecondary.getEncodedName());
246    // verify the new data. old data may or may not be there
247    HTU.verifyNumericRows(region, f, 1000, 2000);
248
249    HTU.deleteNumericRows(table, f, 0, 2000);
250    closeRegion(HTU, rs1, hriSecondary);
251    connection.close();
252  }
253
254  @Test
255  public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
256    // tests replaying the edits to a secondary region replica using the RRRE.replicate()
257    openRegion(HTU, rs0, hriSecondary);
258    ClusterConnection connection =
259        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
260    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
261
262    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
263    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
264    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
265
266    replicator.init(context);
267    replicator.startAsync();
268
269    //load some data to primary
270    HTU.loadNumericRows(table, f, 0, 1000);
271
272    Assert.assertEquals(1000, entries.size());
273    // replay the edits to the secondary using replay callable
274    final String fakeWalGroupId = "fakeWALGroup";
275    replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
276        .setWalGroupId(fakeWalGroupId));
277
278    Region region = rs0.getRegion(hriSecondary.getEncodedName());
279    HTU.verifyNumericRows(region, f, 0, 1000);
280
281    HTU.deleteNumericRows(table, f, 0, 1000);
282    closeRegion(HTU, rs0, hriSecondary);
283    connection.close();
284  }
285}