/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;

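/**
 * Tests {@link ReplicationSink}: applies batches of replicated WAL entries (puts, deletes, and
 * bulk-load events) against a mini cluster and verifies the resulting table state.
 */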
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationSink {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSink.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class);
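  // Number of WAL entries used by the batch-oriented tests below.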
  private static final int BATCH_SIZE = 10;

  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  protected static ReplicationSink SINK;

  protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
  protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");

  protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
  protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");

  protected static Table table1;
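  // Minimal Stoppable handed to the sink under test; stop() just flips a flag and logs why.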
  protected static Stoppable STOPPABLE = new Stoppable() {
    final AtomicBoolean stop = new AtomicBoolean(false);

    @Override
    public boolean isStopped() {
      return this.stop.get();
    }

    @Override
    public void stop(String why) {
      LOG.info("STOPPING BECAUSE: " + why);
      this.stop.set(true);
    }
  };

  protected static Table table2;
  protected static String baseNamespaceDir;
  protected static String hfileArchiveDir;
  protected static String replicationClusterId;

  /**
   * Starts a three-node mini cluster, creates the sink under test and the two target tables.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());

    TEST_UTIL.startMiniCluster(3);
    SINK =
      new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
    table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
    table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
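    // Arbitrary id standing in for the source cluster on every replicateEntries call.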
    replicationClusterId = "12345";
  }

  /**
   * Stops the sink and shuts down the mini cluster.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    STOPPABLE.stop("Shutting down");
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Clears both target tables before each test.
   */
  @Before
  public void setUp() throws Exception {
    table1 = TEST_UTIL.deleteTableData(TABLE_NAME1);
    table2 = TEST_UTIL.deleteTableData(TABLE_NAME2);
  }

  /**
   * Insert a whole batch of entries.
   */
  @Test
  public void testBatchSink() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert a mix of puts and deletes.
   */
  @Test
  public void testMixedPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE / 2);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE / 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);

    entries = new ArrayList<>(BATCH_SIZE);
    cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i,
          i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
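    // Even-numbered rows were re-sent as deletes, so only half the batch should remain.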
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert to two different tables.
   */
  @Test
  public void testMixedPutTables() throws Exception {
    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
          i, KeyValue.Type.Put, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table2.getScanner(scan);
    for (Result res : scanRes) {
      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
    }
  }

  /**
   * Insert then do different types of deletes.
   */
  @Test
  public void testMixedDeletes() throws Exception {
    List<WALEntry> entries = new ArrayList<>(3);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    entries = new ArrayList<>(3);
    cells = new ArrayList<>();
    entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);

    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(0, scanRes.next(3).length);
  }

  /**
   * Puts are buffered, but deletes are not; this tests that a delete is still applied in order
   * relative to the Put of the same row within one batch.
   */
  @Test
  public void testApplyDeleteBeforePut() throws Exception {
    List<WALEntry> entries = new ArrayList<>(5);
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    for (int i = 3; i < 5; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
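    // Row 1 must come back empty: its DeleteFamily was sent after its Put in the same batch.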
    Get get = new Get(Bytes.toBytes(1));
    Result res = table1.get(get);
    assertEquals(0, res.size());
  }

  @Test
  public void testRethrowRetriesExhaustedWithDetailsException() throws Exception {
    TableName notExistTable = TableName.valueOf("notExistTable");
    List<WALEntry> entries = new ArrayList<>();
    List<Cell> cells = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
    }
    try {
      SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw TableNotFoundException.");
    } catch (TableNotFoundException e) {
      // expected
    }
    entries.clear();
    cells.clear();
    for (int i = 0; i < 10; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
      try (Admin admin = conn.getAdmin()) {
        admin.disableTable(TABLE_NAME1);
        try {
          SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
            replicationClusterId, baseNamespaceDir, hfileArchiveDir);
          Assert.fail("Should re-throw RetriesExhaustedWithDetailsException.");
        } catch (RetriesExhaustedWithDetailsException e) {
          // expected
        } finally {
          admin.enableTable(TABLE_NAME1);
        }
      }
    }
  }

  /**
   * Test replicateEntries with a bulk load entry for 25 HFiles.
   */
  @Test
  public void testReplicateEntriesForHFiles() throws Exception {
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
    Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
    int numRows = 10;
    List<Path> p = new ArrayList<>(25);
    final String hfilePrefix = "hfile-";

    // 1. Generate 25 hfile key ranges: two distinct boundaries per file, 50 numbers in total
    Random rng = new SecureRandom();
    Set<Integer> numbers = new HashSet<>();
    while (numbers.size() < 50) {
      numbers.add(rng.nextInt(1000));
    }
    List<Integer> numberList = new ArrayList<>(numbers);
    Collections.sort(numberList);
    Map<String, Long> storeFilesSize = new HashMap<>(25);

    // 2. Create 25 hfiles
    Configuration conf = TEST_UTIL.getConfiguration();
    FileSystem fs = dir.getFileSystem(conf);
    Iterator<Integer> numbersItr = numberList.iterator();
    for (int i = 0; i < 25; i++) {
      Path hfilePath = new Path(familyDir, hfilePrefix + i);
      HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
        Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
      p.add(hfilePath);
      storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
    }

    // 3. Create a BulkLoadDescriptor and a WALEdit
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    storeFiles.put(FAM_NAME1, p);
    org.apache.hadoop.hbase.wal.WALEdit edit = null;
    WALProtos.BulkLoadDescriptor loadDescriptor = null;

    try (Connection c = ConnectionFactory.createConnection(conf);
        RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
      HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
      loadDescriptor =
          ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
              UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()),
              storeFiles, storeFilesSize, 1);
      edit = org.apache.hadoop.hbase.wal.WALEdit.createBulkLoadEvent(regionInfo,
        loadDescriptor);
    }
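    // The WALEdit now carries a single bulk-load marker cell that the sink turns into an actual
    // HFile bulk load on the destination.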
    List<WALEntry> entries = new ArrayList<>(1);

    // 4. Create a WALEntryBuilder
    WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);

    // 5. Copy each hfile under the namespace path where the sink will look for it
    for (int i = 0; i < 25; i++) {
      String pathToHfileFromNS =
          new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
              .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
              .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
              .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
              .append(hfilePrefix + i).toString();
      String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
      Path dstPath = new Path(dst);
      // Copy the i-th hfile (not always the first) so each copy matches its recorded size
      FileUtil.copy(fs, p.get(i), fs, dstPath, false, conf);
    }

    entries.add(builder.build());
    try (ResultScanner scanner = table1.getScanner(new Scan())) {
      // 6. Assert no existing data in table
      assertEquals(0, scanner.next(numRows).length);
    }
    // 7. Replicate the bulk loaded entry
    SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    try (ResultScanner scanner = table1.getScanner(new Scan())) {
      // 8. Assert data is replicated
      assertEquals(numRows, scanner.next(numRows).length);
    }
    // 9. Clean up the source hfiles so leftovers do not interfere with subsequent tests
    fs.delete(dir, true);
  }

  private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
    byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
    byte[] rowBytes = Bytes.toBytes(row);
    // Sleep a millisecond so two consecutive entries for the same key never share a timestamp
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      LOG.info("Interrupted while sleeping", e);
    }
    final long now = System.currentTimeMillis();
    KeyValue kv = null;
    if (type.getCode() == KeyValue.Type.Put.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now,
          KeyValue.Type.Put, Bytes.toBytes(row));
    } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam,
          now, KeyValue.Type.DeleteColumn);
    } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
      kv = new KeyValue(rowBytes, fam, null,
          now, KeyValue.Type.DeleteFamily);
    }
    WALEntry.Builder builder = createWALEntryBuilder(table);
    cells.add(kv);

    return builder.build();
  }

  private WALEntry.Builder createWALEntryBuilder(TableName table) {
    WALEntry.Builder builder = WALEntry.newBuilder();
    builder.setAssociatedCellCount(1);
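    // Synthesize a minimal WALKey: the default cluster id, an empty region name, and an unset
    // (-1) sequence number are sufficient for the sink.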
    WALKey.Builder keyBuilder = WALKey.newBuilder();
    UUID.Builder uuidBuilder = UUID.newBuilder();
    uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
    keyBuilder.setClusterId(uuidBuilder.build());
    keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(table.getName()));
    keyBuilder.setWriteTime(System.currentTimeMillis());
    keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY));
    keyBuilder.setLogSequenceNumber(-1);
    builder.setKey(keyBuilder.build());
    return builder;
  }
}