001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertNull;
022import static org.junit.Assert.assertTrue;
023
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.NavigableMap;
029import java.util.TreeMap;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionInfo;
037import org.apache.hadoop.hbase.client.RegionInfoBuilder;
038import org.apache.hadoop.hbase.testclassification.ReplicationTests;
039import org.apache.hadoop.hbase.testclassification.SmallTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.wal.WALEdit;
042import org.apache.hadoop.hbase.wal.WALKeyImpl;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046
047import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
048
049import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
052
053@Category({ ReplicationTests.class, SmallTests.class })
054public class TestReplicationWALEdits {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestReplicationWALEdits.class);
059
060  private static final Configuration CONF = HBaseConfiguration.create();
061
062  private static final TableName TABLE_NAME = TableName.valueOf("test");
063
064  private static final byte[] F1 = Bytes.toBytes("f1");
065
066  private static final byte[] F2 = Bytes.toBytes("f2");
067
068  private static final RegionInfo RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
069
070  /**
071   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out the
072   * compaction WALEdit.
073   */
074  @Test
075  public void testCompactionWALEdits() throws Exception {
076    TableName tableName = TableName.valueOf("testCompactionWALEdits");
077    WALProtos.CompactionDescriptor compactionDescriptor =
078      WALProtos.CompactionDescriptor.getDefaultInstance();
079    RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
080      .setEndKey(HConstants.EMPTY_END_ROW).build();
081    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
082    ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, CONF);
083  }
084
085  private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
086    // 1. Create store files for the families
087    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
088    Map<String, Long> storeFilesSize = new HashMap<>(1);
089    List<Path> p = new ArrayList<>(1);
090    Path hfilePath1 = new Path(Bytes.toString(F1));
091    p.add(hfilePath1);
092    storeFilesSize.put(hfilePath1.getName(), 0L);
093    storeFiles.put(F1, p);
094    scope.put(F1, 1);
095    p = new ArrayList<>(1);
096    Path hfilePath2 = new Path(Bytes.toString(F2));
097    p.add(hfilePath2);
098    storeFilesSize.put(hfilePath2.getName(), 0L);
099    storeFiles.put(F2, p);
100    // 2. Create bulk load descriptor
101    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(RI.getTable(),
102      UnsafeByteOperations.unsafeWrap(RI.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
103
104    // 3. create bulk load wal edit event
105    WALEdit logEdit = WALEdit.createBulkLoadEvent(RI, desc);
106    return logEdit;
107  }
108
109  @Test
110  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
111    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
112    // 1. Get the bulk load wal edit event
113    WALEdit logEdit = getBulkLoadWALEdit(scope);
114    // 2. Create wal key
115    WALKeyImpl logKey = new WALKeyImpl(scope);
116
117    // 3. Get the scopes for the key
118    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, CONF);
119
120    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
121    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
122      logKey.getReplicationScopes());
123  }
124
125  @Test
126  public void testBulkLoadWALEdits() throws Exception {
127    // 1. Get the bulk load wal edit event
128    NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
129    WALEdit logEdit = getBulkLoadWALEdit(scope);
130    // 2. Create wal key
131    WALKeyImpl logKey = new WALKeyImpl(scope);
132    // 3. Enable bulk load hfile replication
133    Configuration bulkLoadConf = HBaseConfiguration.create(CONF);
134    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
135
136    // 4. Get the scopes for the key
137    ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
138
139    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
140    // Assert family with replication scope global is present in the key scopes
141    assertTrue("This family scope is set to global, should be part of replication key scopes.",
142      scopes.containsKey(F1));
143    // Assert family with replication scope local is not present in the key scopes
144    assertFalse("This family scope is set to local, should not be part of replication key scopes",
145      scopes.containsKey(F2));
146  }
147}