001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
021import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
022import static org.mockito.Mockito.mock;
023import static org.mockito.Mockito.when;
024
025import java.io.IOException;
026import java.util.Collections;
027import java.util.Optional;
028import java.util.Queue;
029import java.util.concurrent.ConcurrentLinkedQueue;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.TimeUnit;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HRegionInfo;
038import org.apache.hadoop.hbase.HTableDescriptor;
039import org.apache.hadoop.hbase.RegionLocations;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.AsyncClusterConnection;
042import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionLocator;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
047import org.apache.hadoop.hbase.coprocessor.ObserverContext;
048import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
049import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
050import org.apache.hadoop.hbase.coprocessor.WALObserver;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.regionserver.Region;
053import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
054import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
055import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
056import org.apache.hadoop.hbase.security.User;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.testclassification.ReplicationTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
061import org.apache.hadoop.hbase.wal.WAL.Entry;
062import org.apache.hadoop.hbase.wal.WALEdit;
063import org.apache.hadoop.hbase.wal.WALKey;
064import org.apache.hadoop.hbase.wal.WALKeyImpl;
065import org.junit.After;
066import org.junit.AfterClass;
067import org.junit.Assert;
068import org.junit.Before;
069import org.junit.BeforeClass;
070import org.junit.ClassRule;
071import org.junit.Test;
072import org.junit.experimental.categories.Category;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
075
076/**
077 * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
078 * class contains lower level tests using callables.
079 */
080@Category({ReplicationTests.class, MediumTests.class})
081public class TestRegionReplicaReplicationEndpointNoMaster {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE =
085      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class);
086
087  private static final int NB_SERVERS = 2;
088  private static TableName tableName = TableName.valueOf(
089    TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
090  private static Table table;
091  private static final byte[] row = Bytes.toBytes("TestRegionReplicaReplicator");
092
093  private static HRegionServer rs0;
094  private static HRegionServer rs1;
095
096  private static RegionInfo hriPrimary;
097  private static HRegionInfo hriSecondary;
098
099  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
100  private static final byte[] f = HConstants.CATALOG_FAMILY;
101
102  @BeforeClass
103  public static void beforeClass() throws Exception {
104    Configuration conf = HTU.getConfiguration();
105    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
106    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
107
108    // install WALObserver coprocessor for tests
109    String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
110    if (walCoprocs == null) {
111      walCoprocs = WALEditCopro.class.getName();
112    } else {
113      walCoprocs += "," + WALEditCopro.class.getName();
114    }
115    HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
116      walCoprocs);
117    HTU.startMiniCluster(NB_SERVERS);
118
119    // Create table then get the single region for our new table.
120    HTableDescriptor htd = HTU.createTableDescriptor(tableName.getNameAsString());
121    table = HTU.createTable(htd, new byte[][]{f}, null);
122
123    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
124      hriPrimary = locator.getRegionLocation(row, false).getRegion();
125    }
126
127    // mock a secondary region info to open
128    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
129        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
130
131    // No master
132    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
133    rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
134    rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
135  }
136
137  @AfterClass
138  public static void afterClass() throws Exception {
139    table.close();
140    HTU.shutdownMiniCluster();
141  }
142
143  @Before
144  public void before() throws Exception{
145    entries.clear();
146  }
147
148  @After
149  public void after() throws Exception {
150  }
151
152  static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>();
153
154  public static class WALEditCopro implements WALCoprocessor, WALObserver {
155    public WALEditCopro() {
156      entries.clear();
157    }
158
159    @Override
160    public Optional<WALObserver> getWALObserver() {
161      return Optional.of(this);
162    }
163
164    @Override
165    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
166                             RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
167      // only keep primary region's edits
168      if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) {
169        // Presume type is a WALKeyImpl
170        entries.add(new Entry((WALKeyImpl)logKey, logEdit));
171      }
172    }
173  }
174
175  @Test
176  public void testReplayCallable() throws Exception {
177    // tests replaying the edits to a secondary region replica using the Callable directly
178    openRegion(HTU, rs0, hriSecondary);
179
180    // load some data to primary
181    HTU.loadNumericRows(table, f, 0, 1000);
182
183    Assert.assertEquals(1000, entries.size());
184    try (AsyncClusterConnection conn = ClusterConnectionFactory
185      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
186      // replay the edits to the secondary using replay callable
187      replicateUsingCallable(conn, entries);
188    }
189
190    Region region = rs0.getRegion(hriSecondary.getEncodedName());
191    HTU.verifyNumericRows(region, f, 0, 1000);
192
193    HTU.deleteNumericRows(table, f, 0, 1000);
194    closeRegion(HTU, rs0, hriSecondary);
195  }
196
197  private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
198      throws IOException, ExecutionException, InterruptedException {
199    Entry entry;
200    while ((entry = entries.poll()) != null) {
201      byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
202      RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
203      connection
204        .replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
205          Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
206        .get();
207    }
208  }
209
210  @Test
211  public void testReplayCallableWithRegionMove() throws Exception {
212    // tests replaying the edits to a secondary region replica using the Callable directly while
213    // the region is moved to another location.It tests handling of RME.
214    try (AsyncClusterConnection conn = ClusterConnectionFactory
215      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
216      openRegion(HTU, rs0, hriSecondary);
217      // load some data to primary
218      HTU.loadNumericRows(table, f, 0, 1000);
219
220      Assert.assertEquals(1000, entries.size());
221
222      // replay the edits to the secondary using replay callable
223      replicateUsingCallable(conn, entries);
224
225      Region region = rs0.getRegion(hriSecondary.getEncodedName());
226      HTU.verifyNumericRows(region, f, 0, 1000);
227
228      HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
229
230      // move the secondary region from RS0 to RS1
231      closeRegion(HTU, rs0, hriSecondary);
232      openRegion(HTU, rs1, hriSecondary);
233
234      // replicate the new data
235      replicateUsingCallable(conn, entries);
236
237      region = rs1.getRegion(hriSecondary.getEncodedName());
238      // verify the new data. old data may or may not be there
239      HTU.verifyNumericRows(region, f, 1000, 2000);
240
241      HTU.deleteNumericRows(table, f, 0, 2000);
242      closeRegion(HTU, rs1, hriSecondary);
243    }
244  }
245
246  @Test
247  public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
248    // tests replaying the edits to a secondary region replica using the RRRE.replicate()
249    openRegion(HTU, rs0, hriSecondary);
250    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
251
252    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
253    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
254    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
255    when(context.getServer()).thenReturn(rs0);
256    when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
257    replicator.init(context);
258    replicator.startAsync();
259
260    //load some data to primary
261    HTU.loadNumericRows(table, f, 0, 1000);
262
263    Assert.assertEquals(1000, entries.size());
264    // replay the edits to the secondary using replay callable
265    final String fakeWalGroupId = "fakeWALGroup";
266    replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
267        .setWalGroupId(fakeWalGroupId));
268    replicator.stop();
269    Region region = rs0.getRegion(hriSecondary.getEncodedName());
270    HTU.verifyNumericRows(region, f, 0, 1000);
271
272    HTU.deleteNumericRows(table, f, 0, 1000);
273    closeRegion(HTU, rs0, hriSecondary);
274  }
275}