001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.util.ArrayList;
027import java.util.List;
028import java.util.NavigableMap;
029import java.util.TreeMap;
030import java.util.stream.Stream;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Delete;
039import org.apache.hadoop.hbase.client.Get;
040import org.apache.hadoop.hbase.client.Put;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.client.Result;
043import org.apache.hadoop.hbase.client.ResultScanner;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
046import org.apache.hadoop.hbase.client.replication.TableCFs;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.testclassification.ReplicationTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.hadoop.hbase.wal.WAL;
054import org.apache.hadoop.hbase.wal.WALEdit;
055import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
056import org.apache.hadoop.hbase.wal.WALKeyImpl;
057import org.junit.jupiter.api.BeforeEach;
058import org.junit.jupiter.api.Tag;
059import org.junit.jupiter.api.TestTemplate;
060import org.junit.jupiter.params.provider.Arguments;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064@Tag(ReplicationTests.TAG)
065@Tag(LargeTests.TAG)
066@HBaseParameterizedTestTemplate(name = "{index}: serialPeer={0}")
067public class TestReplicationSmallTests extends TestReplicationBase {
068
069  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class);
070  private static final String PEER_ID = "2";
071
072  private boolean serialPeer;
073
  /**
   * Creates a test instance for one value of the {@code serialPeer} template parameter.
   * @param serialPeer value later returned by {@link #isSerialPeer()}
   */
  public TestReplicationSmallTests(boolean serialPeer) {
    this.serialPeer = serialPeer;
  }
077
  /**
   * Returns the {@code serialPeer} value this instance was constructed with.
   */
  @Override
  protected boolean isSerialPeer() {
    return serialPeer;
  }
082
083  public static Stream<Arguments> parameters() {
084    return Stream.of(Arguments.of(true), Arguments.of(false));
085  }
086
  /**
   * Runs {@code cleanUp()} before every test invocation.
   */
  @BeforeEach
  public void setUp() throws Exception {
    cleanUp();
  }
091
092  /**
093   * Verify that version and column delete marker types are replicated correctly.
094   */
095  @TestTemplate
096  public void testDeleteTypes() throws Exception {
097    LOG.info("testDeleteTypes");
098    final byte[] v1 = Bytes.toBytes("v1");
099    final byte[] v2 = Bytes.toBytes("v2");
100    final byte[] v3 = Bytes.toBytes("v3");
101    htable1 = UTIL1.getConnection().getTable(tableName);
102
103    long t = EnvironmentEdgeManager.currentTime();
104    // create three versions for "row"
105    Put put = new Put(row);
106    put.addColumn(famName, row, t, v1);
107    htable1.put(put);
108
109    put = new Put(row);
110    put.addColumn(famName, row, t + 1, v2);
111    htable1.put(put);
112
113    put = new Put(row);
114    put.addColumn(famName, row, t + 2, v3);
115    htable1.put(put);
116
117    Get get = new Get(row);
118    get.readAllVersions();
119    for (int i = 0; i < NB_RETRIES; i++) {
120      if (i == NB_RETRIES - 1) {
121        fail("Waited too much time for put replication");
122      }
123      Result res = htable2.get(get);
124      if (res.size() < 3) {
125        LOG.info("Rows not available");
126        Thread.sleep(SLEEP_TIME);
127      } else {
128        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
129        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
130        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
131        break;
132      }
133    }
134    // place a version delete marker (delete last version)
135    Delete d = new Delete(row);
136    d.addColumn(famName, row, t);
137    htable1.delete(d);
138
139    get = new Get(row);
140    get.readAllVersions();
141    for (int i = 0; i < NB_RETRIES; i++) {
142      if (i == NB_RETRIES - 1) {
143        fail("Waited too much time for put replication");
144      }
145      Result res = htable2.get(get);
146      if (res.size() > 2) {
147        LOG.info("Version not deleted");
148        Thread.sleep(SLEEP_TIME);
149      } else {
150        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
151        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
152        break;
153      }
154    }
155
156    // place a column delete marker
157    d = new Delete(row);
158    d.addColumns(famName, row, t + 2);
159    htable1.delete(d);
160
161    // now *both* of the remaining version should be deleted
162    // at the replica
163    get = new Get(row);
164    for (int i = 0; i < NB_RETRIES; i++) {
165      if (i == NB_RETRIES - 1) {
166        fail("Waited too much time for del replication");
167      }
168      Result res = htable2.get(get);
169      if (res.size() >= 1) {
170        LOG.info("Rows not deleted");
171        Thread.sleep(SLEEP_TIME);
172      } else {
173        break;
174      }
175    }
176  }
177
  /**
   * Add a row, check it's replicated, delete it, check it's gone. The actual work is delegated to
   * {@code runSimplePutDeleteTest()}.
   */
  @TestTemplate
  public void testSimplePutDelete() throws Exception {
    LOG.info("testSimplePutDelete");
    runSimplePutDeleteTest();
  }
186
  /**
   * Try a small batch upload using the write buffer, check it's replicated. The actual work is
   * delegated to {@code runSmallBatchTest()}.
   */
  @TestTemplate
  public void testSmallBatch() throws Exception {
    LOG.info("testSmallBatch");
    runSmallBatchTest();
  }
195
196  /**
197   * Test disable/enable replication, trying to insert, make sure nothing's replicated, enable it,
198   * the insert should be replicated
199   */
200  @TestTemplate
201  public void testDisableEnable() throws Exception {
202    // Test disabling replication
203    hbaseAdmin.disableReplicationPeer(PEER_ID);
204
205    byte[] rowkey = Bytes.toBytes("disable enable");
206    Put put = new Put(rowkey);
207    put.addColumn(famName, row, row);
208    htable1.put(put);
209
210    Get get = new Get(rowkey);
211    for (int i = 0; i < NB_RETRIES; i++) {
212      Result res = htable2.get(get);
213      if (res.size() >= 1) {
214        fail("Replication wasn't disabled");
215      } else {
216        LOG.info("Row not replicated, let's wait a bit more...");
217        Thread.sleep(SLEEP_TIME);
218      }
219    }
220
221    // Test enable replication
222    hbaseAdmin.enableReplicationPeer(PEER_ID);
223
224    for (int i = 0; i < NB_RETRIES; i++) {
225      Result res = htable2.get(get);
226      if (res.isEmpty()) {
227        LOG.info("Row not available");
228        Thread.sleep(SLEEP_TIME);
229      } else {
230        assertArrayEquals(row, res.value());
231        return;
232      }
233    }
234    fail("Waited too much time for put replication");
235  }
236
237  /**
238   * Removes and re-add a peer cluster
239   */
240  @TestTemplate
241  public void testAddAndRemoveClusters() throws Exception {
242    LOG.info("testAddAndRemoveClusters");
243    hbaseAdmin.removeReplicationPeer(PEER_ID);
244    Thread.sleep(SLEEP_TIME);
245    byte[] rowKey = Bytes.toBytes("Won't be replicated");
246    Put put = new Put(rowKey);
247    put.addColumn(famName, row, row);
248    htable1.put(put);
249
250    Get get = new Get(rowKey);
251    for (int i = 0; i < NB_RETRIES; i++) {
252      if (i == NB_RETRIES - 1) {
253        break;
254      }
255      Result res = htable2.get(get);
256      if (res.size() >= 1) {
257        fail("Not supposed to be replicated");
258      } else {
259        LOG.info("Row not replicated, let's wait a bit more...");
260        Thread.sleep(SLEEP_TIME);
261      }
262    }
263    ReplicationPeerConfig rpc =
264      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()).build();
265    hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
266    Thread.sleep(SLEEP_TIME);
267    rowKey = Bytes.toBytes("do rep");
268    put = new Put(rowKey);
269    put.addColumn(famName, row, row);
270    LOG.info("Adding new row");
271    htable1.put(put);
272
273    get = new Get(rowKey);
274    for (int i = 0; i < NB_RETRIES; i++) {
275      if (i == NB_RETRIES - 1) {
276        fail("Waited too much time for put replication");
277      }
278      Result res = htable2.get(get);
279      if (res.isEmpty()) {
280        LOG.info("Row not available");
281        Thread.sleep(SLEEP_TIME * i);
282      } else {
283        assertArrayEquals(row, res.value());
284        break;
285      }
286    }
287  }
288
289  /**
290   * Do a more intense version testSmallBatch, one that will trigger wal rolling and other
291   * non-trivial code paths
292   */
293  @TestTemplate
294  public void testLoading() throws Exception {
295    LOG.info("Writing out rows to table1 in testLoading");
296    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH);
297    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
298      Put put = new Put(Bytes.toBytes(i));
299      put.addColumn(famName, row, row);
300      puts.add(put);
301    }
302    // The puts will be iterated through and flushed only when the buffer
303    // size is reached.
304    htable1.put(puts);
305
306    Scan scan = new Scan();
307
308    ResultScanner scanner = htable1.getScanner(scan);
309    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
310    scanner.close();
311
312    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
313
314    LOG.info("Looking in table2 for replicated rows in testLoading");
315    long start = EnvironmentEdgeManager.currentTime();
316    // Retry more than NB_RETRIES. As it was, retries were done in 5 seconds and we'd fail
317    // sometimes.
318    final long retries = NB_RETRIES * 10;
319    for (int i = 0; i < retries; i++) {
320      scan = new Scan();
321      scanner = htable2.getScanner(scan);
322      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
323      scanner.close();
324      if (res.length != NB_ROWS_IN_BIG_BATCH) {
325        if (i == retries - 1) {
326          int lastRow = -1;
327          for (Result result : res) {
328            int currentRow = Bytes.toInt(result.getRow());
329            for (int row = lastRow + 1; row < currentRow; row++) {
330              LOG.error("Row missing: " + row);
331            }
332            lastRow = currentRow;
333          }
334          LOG.error("Last row: " + lastRow);
335          fail("Waited too much time for normal batch replication, " + res.length + " instead of "
336            + NB_ROWS_IN_BIG_BATCH + "; waited=" + (EnvironmentEdgeManager.currentTime() - start)
337            + "ms");
338        } else {
339          LOG.info("Only got " + res.length + " rows... retrying");
340          Thread.sleep(SLEEP_TIME);
341        }
342      } else {
343        break;
344      }
345    }
346  }
347
348  /**
349   * Test for HBASE-8663
350   * <p>
351   * Create two new Tables with colfamilies enabled for replication then run
352   * {@link Admin#listReplicatedTableCFs()}. Finally verify the table:colfamilies.
353   */
354  @TestTemplate
355  public void testVerifyListReplicatedTable() throws Exception {
356    LOG.info("testVerifyListReplicatedTable");
357
358    final String tName = "VerifyListReplicated_";
359    final String colFam = "cf1";
360    final int numOfTables = 3;
361
362    Admin hadmin = UTIL1.getAdmin();
363
364    // Create Tables
365    for (int i = 0; i < numOfTables; i++) {
366      hadmin.createTable(TableDescriptorBuilder
367        .newBuilder(TableName.valueOf(tName + i)).setColumnFamily(ColumnFamilyDescriptorBuilder
368          .newBuilder(Bytes.toBytes(colFam)).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
369        .build());
370    }
371
372    // verify the result
373    List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs();
374    int[] match = new int[numOfTables]; // array of 3 with init value of zero
375
376    for (int i = 0; i < replicationColFams.size(); i++) {
377      TableCFs replicationEntry = replicationColFams.get(i);
378      String tn = replicationEntry.getTable().getNameAsString();
379      if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) {
380        int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
381        match[m]++; // should only increase once
382      }
383    }
384
385    // check the matching result
386    for (int i = 0; i < match.length; i++) {
387      assertEquals(1, match[i], "listReplicated() does not match table " + i);
388    }
389
390    // drop tables
391    for (int i = 0; i < numOfTables; i++) {
392      TableName tableName = TableName.valueOf(tName + i);
393      hadmin.disableTable(tableName);
394      hadmin.deleteTable(tableName);
395    }
396
397    hadmin.close();
398  }
399
400  /**
401   * Test for HBase-15259 WALEdits under replay will also be replicated
402   */
403  @TestTemplate
404  public void testReplicationInReplay() throws Exception {
405    final TableName tableName = htable1.getName();
406
407    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0);
408    RegionInfo hri = region.getRegionInfo();
409    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
410    for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
411      scopes.put(fam, 1);
412    }
413    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
414    int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
415    WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
416    final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
417    final byte[] qualifier = Bytes.toBytes("q");
418    final byte[] value = Bytes.toBytes("v");
419    WALEdit edit = new WALEdit(true);
420    long now = EnvironmentEdgeManager.currentTime();
421    WALEditInternalHelper.addExtendedCell(edit,
422      new KeyValue(rowName, famName, qualifier, now, value));
423    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
424    wal.appendData(hri, walKey, edit);
425    wal.sync();
426
427    Get get = new Get(rowName);
428    for (int i = 0; i < NB_RETRIES; i++) {
429      if (i == NB_RETRIES - 1) {
430        break;
431      }
432      Result res = htable2.get(get);
433      if (res.size() >= 1) {
434        fail("Not supposed to be replicated for " + Bytes.toString(res.getRow()));
435      } else {
436        LOG.info("Row not replicated, let's wait a bit more...");
437        Thread.sleep(SLEEP_TIME);
438      }
439    }
440  }
441
442  /**
443   * Test for HBASE-27448 Add an admin method to get replication enabled state
444   */
445  @TestTemplate
446  public void testGetReplicationPeerState() throws Exception {
447
448    // Test disable replication peer
449    hbaseAdmin.disableReplicationPeer("2");
450    assertFalse(hbaseAdmin.isReplicationPeerEnabled("2"));
451
452    // Test enable replication peer
453    hbaseAdmin.enableReplicationPeer("2");
454    assertTrue(hbaseAdmin.isReplicationPeerEnabled("2"));
455  }
456}