/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;

@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSink.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class);
  private static final int BATCH_SIZE = 10;

  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  protected static ReplicationSink SINK;

  protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
  protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");

  protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
  protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");

  protected static Table table1;
  protected static Stoppable STOPPABLE = new Stoppable() {
    final AtomicBoolean stop = new AtomicBoolean(false);

    @Override
    public boolean isStopped() {
      return this.stop.get();
    }

    @Override
    public void stop(String why) {
      LOG.info("STOPPING BECAUSE: " + why);
      this.stop.set(true);
    }
  };

  protected static Table table2;
  protected static String baseNamespaceDir;
  protected static String hfileArchiveDir;
  protected static String replicationClusterId;

  /**
   * Starts the mini cluster, creates both test tables, and constructs the shared
   * ReplicationSink under test.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    TEST_UTIL.startMiniCluster(3);
    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
    table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
    table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
    Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
    replicationClusterId = "12345";
  }

  /**
   * Stops the Stoppable and shuts down the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    STOPPABLE.stop("Shutting down");
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Clears all data from both test tables before each test.
   */
  @Before
  public void setUp() throws Exception {
    table1 = TEST_UTIL.deleteTableData(TABLE_NAME1);
    table2 = TEST_UTIL.deleteTableData(TABLE_NAME2);
  }

  /**
   * Insert a whole batch of entries.
   */
  @Test
  public void testBatchSink() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert a mix of puts and deletes.
   */
  @Test
  public void testMixedPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE / 2);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE / 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);

    entries = new ArrayList<>(BATCH_SIZE);
    cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i,
          i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert a large batch of puts, then a large mixed batch of puts and deletes.
   */
  @Test
  public void testLargeEditsPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<>();
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 5510; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);

    ResultScanner resultScanner = table1.getScanner(new Scan());
    int totalRows = 0;
    while (resultScanner.next() != null) {
      totalRows++;
    }
    assertEquals(5510, totalRows);

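    // In the mixed batch below, odd-numbered rows get Puts and even-numbered rows get
    // DeleteColumns, so only the 5500 odd-numbered rows should survive replication.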
    entries = new ArrayList<>();
    cells = new ArrayList<>();
    for (int i = 0; i < 11000; i++) {
      entries.add(
        createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
          cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);
    resultScanner = table1.getScanner(new Scan());
    totalRows = 0;
    while (resultScanner.next() != null) {
      totalRows++;
    }
    assertEquals(5500, totalRows);
  }

  /**
   * Insert into two different tables.
   */
  @Test
  public void testMixedPutTables() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
          i, KeyValue.Type.Put, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table2.getScanner(scan);
    for (Result res : scanRes) {
      assertEquals(0, Bytes.toInt(res.getRow()) % 2);
    }
    scanRes = table1.getScanner(scan);
    for (Result res : scanRes) {
      assertEquals(1, Bytes.toInt(res.getRow()) % 2);
    }
  }

  /**
   * Insert rows, then apply different types of deletes.
   */
  @Test
  public void testMixedDeletes() throws Exception {
    List<WALEntry> entries = new ArrayList<>(3);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    entries = new ArrayList<>(3);
    cells = new ArrayList<>();
    entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);

    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(0, scanRes.next(3).length);
  }

  /**
   * Puts are buffered inside the sink while deletes are not, so a delete can be applied
   * before the buffered Put it targets; verify that the row still ends up deleted.
   */
  @Test
  public void testApplyDeleteBeforePut() throws Exception {
    List<WALEntry> entries = new ArrayList<>(5);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    for (int i = 3; i < 5; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Get get = new Get(Bytes.toBytes(1));
    Result res = table1.get(get);
    assertEquals(0, res.size());
  }

  @Test
  public void testRethrowRetriesExhaustedWithDetailsException() throws Exception {
    TableName notExistTable = TableName.valueOf("notExistTable");
    List<WALEntry> entries = new ArrayList<>();
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
    }
    try {
      SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw TableNotFoundException.");
    } catch (TableNotFoundException e) {
      // expected
    }
    entries.clear();
    cells.clear();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
      try (Admin admin = conn.getAdmin()) {
        admin.disableTable(TABLE_NAME1);
        try {
          SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
            replicationClusterId, baseNamespaceDir, hfileArchiveDir);
          Assert.fail("Should re-throw RetriesExhaustedWithDetailsException.");
        } catch (RetriesExhaustedWithDetailsException e) {
          // expected
        } finally {
          admin.enableTable(TABLE_NAME1);
        }
      }
    }
  }

  /**
   * Test replicateEntries with a bulk load entry for 25 HFiles.
   */
  @Test
  public void testReplicateEntriesForHFiles() throws Exception {
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
    Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
    int numRows = 10;
    List<Path> p = new ArrayList<>(25);
    final String hfilePrefix = "hfile-";

    // 1. Generate 25 hfile key ranges (50 unique, sorted boundaries)
    Random rng = new SecureRandom();
    Set<Integer> numbers = new HashSet<>();
    while (numbers.size() < 50) {
      numbers.add(rng.nextInt(1000));
    }
    List<Integer> numberList = new ArrayList<>(numbers);
    Collections.sort(numberList);
    Map<String, Long> storeFilesSize = new HashMap<>(25);

    // 2. Create 25 hfiles
    Configuration conf = TEST_UTIL.getConfiguration();
    FileSystem fs = dir.getFileSystem(conf);
    Iterator<Integer> numbersItr = numberList.iterator();
    for (int i = 0; i < 25; i++) {
      Path hfilePath = new Path(familyDir, hfilePrefix + i);
      HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
        Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
      p.add(hfilePath);
      storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
    }

    // 3. Create a BulkLoadDescriptor and a WALEdit
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    storeFiles.put(FAM_NAME1, p);
    WALEdit edit = null;
    WALProtos.BulkLoadDescriptor loadDescriptor = null;

    try (Connection c = ConnectionFactory.createConnection(conf);
        RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
      HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
      loadDescriptor =
          ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
              UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()),
              storeFiles, storeFilesSize, 1);
      edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
    }
    List<WALEntry> entries = new ArrayList<>(1);

    // 4. Create a WALEntryBuilder
    WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);

    // 5. Copy each hfile to the path where it would live under the namespace dir
    for (int i = 0; i < 25; i++) {
      String pathToHfileFromNS =
          new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
              .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
              .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
              .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
              .append(hfilePrefix + i).toString();
      String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
      Path dstPath = new Path(dst);
      // Copy the i-th source file to the i-th destination (the original copied p.get(0)
      // for every iteration, which duplicated hfile-0 across all 25 destinations)
      FileUtil.copy(fs, p.get(i), fs, dstPath, false, conf);
    }

    entries.add(builder.build());
    try (ResultScanner scanner = table1.getScanner(new Scan())) {
      // 6. Assert no existing data in table
      assertEquals(0, scanner.next(numRows).length);
    }
    // 7. Replicate the bulk loaded entry
    SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    try (ResultScanner scanner = table1.getScanner(new Scan())) {
      // 8. Assert data is replicated
      assertEquals(numRows, scanner.next(numRows).length);
    }
    // 9. Clean up the created hfiles, or they will interfere with subsequent tests
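    // A minimal cleanup sketch rather than part of the original test: the bulk load should
    // have moved the copied files out of the namespace dir, so removing the staging dir
    // created in step 2 is assumed to be enough here.
    fs.delete(dir, true);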
  }

  private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
    byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
    byte[] rowBytes = Bytes.toBytes(row);
    // Just make sure we don't get the same ts for two consecutive rows with
    // the same key
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      LOG.info("Was interrupted while sleeping", e);
    }
    final long now = System.currentTimeMillis();
    KeyValue kv = null;
    if (type.getCode() == KeyValue.Type.Put.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row));
    } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn);
    } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
      kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily);
    }
    WALEntry.Builder builder = createWALEntryBuilder(table);
    cells.add(kv);

    return builder.build();
  }

  private WALEntry.Builder createWALEntryBuilder(TableName table) {
    WALEntry.Builder builder = WALEntry.newBuilder();
    builder.setAssociatedCellCount(1);
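    // Each entry built by these tests is paired with exactly one Cell in the accompanying
    // CellScanner (a single KeyValue per createEntry call, or the single bulk load
    // descriptor cell), hence the fixed associated cell count of 1.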
    WALKey.Builder keyBuilder = WALKey.newBuilder();
    UUID.Builder uuidBuilder = UUID.newBuilder();
    uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
    keyBuilder.setClusterId(uuidBuilder.build());
    keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(table.getName()));
    keyBuilder.setWriteTime(System.currentTimeMillis());
    keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY));
    keyBuilder.setLogSequenceNumber(-1);
    builder.setKey(keyBuilder.build());
    return builder;
  }
}