/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSmallTests extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSmallTests.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSmallTests.class);
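  // Id of the replication peer (pointing at the second mini cluster) set up by TestReplicationBase.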
  private static final String PEER_ID = "2";

  @Before
  public void setUp() throws Exception {
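    // cleanUp() is inherited from TestReplicationBase and resets the shared test state
    // between test methods.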
    cleanUp();
  }

  /**
   * Verify that version and column delete marker types are replicated correctly.
   */
  @Test
  public void testDeleteTypes() throws Exception {
    LOG.info("testDeleteTypes");
    final byte[] v1 = Bytes.toBytes("v1");
    final byte[] v2 = Bytes.toBytes("v2");
    final byte[] v3 = Bytes.toBytes("v3");
    htable1 = utility1.getConnection().getTable(tableName);

    long t = EnvironmentEdgeManager.currentTime();
    // create three versions for "row"
    Put put = new Put(row);
    put.addColumn(famName, row, t, v1);
    htable1.put(put);

    put = new Put(row);
    put.addColumn(famName, row, t + 1, v2);
    htable1.put(put);

    put = new Put(row);
    put.addColumn(famName, row, t + 2, v3);
    htable1.put(put);

    Get get = new Get(row);
    get.readAllVersions();
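    // Replication is asynchronous: poll the peer cluster until all three versions
    // show up, failing once NB_RETRIES attempts are exhausted.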
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() < 3) {
        LOG.info("Rows not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(v3, CellUtil.cloneValue(res.rawCells()[0]));
        assertArrayEquals(v2, CellUtil.cloneValue(res.rawCells()[1]));
        assertArrayEquals(v1, CellUtil.cloneValue(res.rawCells()[2]));
        break;
      }
    }
    // place a version delete marker (delete last version)
    Delete d = new Delete(row);
    d.addColumn(famName, row, t);
    htable1.delete(d);
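    // The delete marker itself is replicated, so the peer should also drop the
    // oldest version (v1 at timestamp t).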

    get = new Get(row);
    get.readAllVersions();
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() > 2) {
        LOG.info("Version not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(v3, CellUtil.cloneValue(res.rawCells()[0]));
        assertArrayEquals(v2, CellUtil.cloneValue(res.rawCells()[1]));
        break;
      }
    }

    // place a column delete marker
    d = new Delete(row);
    d.addColumns(famName, row, t + 2);
    htable1.delete(d);

    // now *both* of the remaining versions should be deleted
    // at the replica
    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Rows not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Add a row, check it's replicated, delete it, check it's gone
   */
  @Test
  public void testSimplePutDelete() throws Exception {
    LOG.info("testSimplePutDelete");
    runSimplePutDeleteTest();
  }

  /**
   * Try a small batch upload using the write buffer, check it's replicated
   */
  @Test
  public void testSmallBatch() throws Exception {
    LOG.info("testSmallBatch");
    runSmallBatchTest();
  }

  /**
   * Test disabling and enabling replication: insert a row with the peer disabled and make sure
   * nothing is replicated, then re-enable the peer and verify that the insert is replicated
   */
  @Test
  public void testDisableEnable() throws Exception {
    // Test disabling replication
    hbaseAdmin.disableReplicationPeer(PEER_ID);

    byte[] rowkey = Bytes.toBytes("disable enable");
    Put put = new Put(rowkey);
    put.addColumn(famName, row, row);
    htable1.put(put);

    Get get = new Get(rowkey);
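    // With the peer disabled, the row must never appear on the second cluster; run
    // through all the retries to give a buggy shipment time to surface.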
    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Replication wasn't disabled");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    // Test enabling replication
    hbaseAdmin.enableReplicationPeer(PEER_ID);

    for (int i = 0; i < NB_RETRIES; i++) {
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        return;
      }
    }
    fail("Waited too much time for put replication");
  }

  /**
   * Integration test for TestReplicationAdmin: removes and re-adds a peer cluster
   */
  @Test
  public void testAddAndRemoveClusters() throws Exception {
    LOG.info("testAddAndRemoveClusters");
    hbaseAdmin.removeReplicationPeer(PEER_ID);
    Thread.sleep(SLEEP_TIME);
    byte[] rowKey = Bytes.toBytes("Won't be replicated");
    Put put = new Put(rowKey);
    put.addColumn(famName, row, row);
    htable1.put(put);

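    // With the peer removed entirely, this row should never be shipped across.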
    Get get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Not supposed to be replicated");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
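    // Re-register the same peer id and verify that edits written afterwards flow again.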
    ReplicationPeerConfig rpc =
        ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
    hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
    Thread.sleep(SLEEP_TIME);
    rowKey = Bytes.toBytes("do rep");
    put = new Put(rowKey);
    put.addColumn(famName, row, row);
    LOG.info("Adding new row");
    htable1.put(put);

    get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME * i);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }
  }

  /**
   * Do a more intense version of testSmallBatch, one that will trigger WAL rolling and other
   * non-trivial code paths
   */
  @Test
  public void testLoading() throws Exception {
    LOG.info("Writing out rows to table1 in testLoading");
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(i));
      put.addColumn(famName, row, row);
      puts.add(put);
    }
    // Submit the whole batch in one call; the client groups the puts by region
    // server internally.
    htable1.put(puts);

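    // First make sure all NB_ROWS_IN_BIG_BATCH rows actually landed on the source cluster.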
    Scan scan = new Scan();

    ResultScanner scanner = htable1.getScanner(scan);
    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
    scanner.close();

    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);

    LOG.info("Looking in table2 for replicated rows in testLoading");
    long start = EnvironmentEdgeManager.currentTime();
    // Retry more than NB_RETRIES: with only NB_RETRIES the whole wait was about 5 seconds
    // and we'd sometimes fail.
    final long retries = NB_RETRIES * 10;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      scanner = htable2.getScanner(scan);
      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BIG_BATCH) {
        if (i == retries - 1) {
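          // Out of retries: log exactly which row keys never made it across before failing.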
          int lastRow = -1;
          for (Result result : res) {
            int currentRow = Bytes.toInt(result.getRow());
            for (int missingRow = lastRow + 1; missingRow < currentRow; missingRow++) {
              LOG.error("Row missing: " + missingRow);
            }
            lastRow = currentRow;
          }
          LOG.error("Last row: " + lastRow);
          fail("Waited too much time for normal batch replication, " + res.length + " instead of " +
            NB_ROWS_IN_BIG_BATCH + "; waited=" + (EnvironmentEdgeManager.currentTime() - start) + "ms");
        } else {
          LOG.info("Only got " + res.length + " rows... retrying");
          Thread.sleep(SLEEP_TIME);
        }
      } else {
        break;
      }
    }
  }

  /**
   * Test for HBASE-8663
   * <p>
   * Create three new tables with column families enabled for replication, then call
   * Admin.listReplicatedTableCFs(). Finally verify the table:colfamily pairs. Note:
   * TestReplicationAdmin is a better place for this testing but it would need mocks.
   */
  @Test
  public void testVerifyListReplicatedTable() throws Exception {
    LOG.info("testVerifyListReplicatedTable");

    final String tName = "VerifyListReplicated_";
    final String colFam = "cf1";
    final int numOfTables = 3;

    Admin hadmin = utility1.getAdmin();

    // Create Tables
    for (int i = 0; i < numOfTables; i++) {
      hadmin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(tName + i))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(colFam))
              .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
          .build());
    }

    // verify the result
    List<TableCFs> replicationColFams = hbaseAdmin.listReplicatedTableCFs();
    int[] match = new int[numOfTables]; // one counter per table, all initialized to zero

    for (int i = 0; i < replicationColFams.size(); i++) {
      TableCFs replicationEntry = replicationColFams.get(i);
      String tn = replicationEntry.getTable().getNameAsString();
      if (tn.startsWith(tName) && replicationEntry.getColumnFamilyMap().containsKey(colFam)) {
        int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
        match[m]++; // should only increase once
      }
    }

    // check the matching result
    for (int i = 0; i < match.length; i++) {
      assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
    }

    // drop tables
    for (int i = 0; i < numOfTables; i++) {
      TableName tableName = TableName.valueOf(tName + i);
      hadmin.disableTable(tableName);
      hadmin.deleteTable(tableName);
    }

    hadmin.close();
  }

  /**
   * Test for HBASE-15259: WALEdits under replay should not be replicated
   */
  @Test
  public void testReplicationInReplay() throws Exception {
    final TableName tableName = htable1.getName();

    HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
    RegionInfo hri = region.getRegionInfo();
    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
      scopes.put(fam, HConstants.REPLICATION_SCOPE_GLOBAL);
    }
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
    WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
    final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
    final byte[] qualifier = Bytes.toBytes("q");
    final byte[] value = Bytes.toBytes("v");
    WALEdit edit = new WALEdit(true);
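    // new WALEdit(true) flags the edit as a replay edit; per HBASE-15259, replay edits
    // appended straight to the WAL must not be shipped to replication peers.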
    long now = EnvironmentEdgeManager.currentTime();
    edit.add(new KeyValue(rowName, famName, qualifier, now, value));
    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
    wal.append(hri, walKey, edit, true);
    wal.sync();

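    // Poll the peer through all the retries; the row must never show up there.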
    Get get = new Get(rowName);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Not supposed to be replicated for " + Bytes.toString(res.getRow()));
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}