001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCellBuilder;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
066
067@Tag(ReplicationTests.TAG)
068@Tag(LargeTests.TAG)
069public class TestBulkLoadReplicationHFileRefs extends TestReplicationBaseNoBeforeAll {
070
071  private static final String PEER1_CLUSTER_ID = "peer1";
072  private static final String PEER2_CLUSTER_ID = "peer2";
073
074  private static final String REPLICATE_NAMESPACE = "replicate_ns";
075  private static final String NO_REPLICATE_NAMESPACE = "no_replicate_ns";
076  private static final TableName REPLICATE_TABLE =
077    TableName.valueOf(REPLICATE_NAMESPACE, "replicate_table");
078  private static final TableName NO_REPLICATE_TABLE =
079    TableName.valueOf(NO_REPLICATE_NAMESPACE, "no_replicate_table");
080  private static final byte[] CF_A = Bytes.toBytes("cfa");
081  private static final byte[] CF_B = Bytes.toBytes("cfb");
082
083  private byte[] row = Bytes.toBytes("r1");
084  private byte[] qualifier = Bytes.toBytes("q1");
085  private byte[] value = Bytes.toBytes("v1");
086
087  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
088
089  private static Admin admin1;
090  private static Admin admin2;
091
092  private static ReplicationQueueStorage queueStorage;
093
094  private static File sourceDir;
095
096  @BeforeAll
097  public static void setUpBeforeClass() throws Exception {
098    configureClusters(UTIL1, UTIL2);
099    sourceDir = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile();
100    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
101    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
102    startClusters();
103    admin1 = UTIL1.getConnection().getAdmin();
104    admin2 = UTIL2.getConnection().getAdmin();
105
106    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(),
107      UTIL1.getConfiguration());
108
109    admin1.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
110    admin2.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
111    admin1.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
112    admin2.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
113  }
114
115  protected static void setupBulkLoadConfigsForCluster(Configuration config,
116    String clusterReplicationId) throws Exception {
117    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
118    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
119    File sourceConfigFolder = new File(sourceDir, clusterReplicationId);
120    sourceConfigFolder.mkdirs();
121    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath(), "hbase-site.xml");
122    config.writeXml(new FileOutputStream(sourceConfigFile));
123    config.set(REPLICATION_CONF_DIR, sourceDir.getAbsolutePath());
124  }
125
126  @BeforeEach
127  public void setUp() throws Exception {
128    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
129      admin1.removeReplicationPeer(peer.getPeerId());
130    }
131  }
132
133  @AfterEach
134  public void teardown() throws Exception {
135    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
136      admin1.removeReplicationPeer(peer.getPeerId());
137    }
138    for (TableName tableName : admin1.listTableNames()) {
139      UTIL1.deleteTable(tableName);
140    }
141    for (TableName tableName : admin2.listTableNames()) {
142      UTIL2.deleteTable(tableName);
143    }
144  }
145
146  @Test
147  public void testWhenExcludeCF() throws Exception {
148    // Create table in source and remote clusters.
149    createTableOnClusters(REPLICATE_TABLE, CF_A, CF_B);
150    // Add peer, setReplicateAllUserTables true, but exclude CF_B.
151    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
152    excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B)));
153    ReplicationPeerConfig peerConfig =
154      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
155        .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build();
156    admin1.addReplicationPeer(PEER_ID2, peerConfig);
157    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
158    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
159    assertFalse(peerConfig.needToReplicate(REPLICATE_TABLE, CF_B));
160
161    assertEquals(0, queueStorage.getAllHFileRefs().size());
162
163    // Bulk load data into the CF that is not replicated.
164    bulkLoadOnCluster(REPLICATE_TABLE, CF_B);
165    Threads.sleep(1000);
166
167    // Cannot get data from remote cluster
168    Table table2 = UTIL2.getConnection().getTable(REPLICATE_TABLE);
169    Result result = table2.get(new Get(row));
170    assertTrue(Bytes.equals(null, result.getValue(CF_B, qualifier)));
171    // The extra HFile is never added to the HFileRefs
172    assertEquals(0, queueStorage.getAllHFileRefs().size());
173  }
174
175  @Test
176  public void testWhenExcludeTable() throws Exception {
177    // Create 2 tables in source and remote clusters.
178    createTableOnClusters(REPLICATE_TABLE, CF_A);
179    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
180
181    // Add peer, setReplicateAllUserTables true, but exclude one table.
182    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
183    excludeTableCFs.put(NO_REPLICATE_TABLE, null);
184    ReplicationPeerConfig peerConfig =
185      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
186        .setReplicateAllUserTables(true).setExcludeTableCFsMap(excludeTableCFs).build();
187    admin1.addReplicationPeer(PEER_ID2, peerConfig);
188    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
189    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
190    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
191    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
192
193    assertEquals(0, queueStorage.getAllHFileRefs().size());
194
195    // Bulk load data into the table that is not replicated.
196    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
197    Threads.sleep(1000);
198
199    // Cannot get data from remote cluster
200    Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
201    Result result = table2.get(new Get(row));
202    assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
203
204    // The extra HFile is never added to the HFileRefs
205    assertEquals(0, queueStorage.getAllHFileRefs().size());
206  }
207
208  @Test
209  public void testWhenExcludeNamespace() throws Exception {
210    // Create 2 tables in source and remote clusters.
211    createTableOnClusters(REPLICATE_TABLE, CF_A);
212    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
213
214    // Add peer, setReplicateAllUserTables true, but exclude one namespace.
215    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
216      .setClusterKey(UTIL2.getRpcConnnectionURI()).setReplicateAllUserTables(true)
217      .setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE)).build();
218    admin1.addReplicationPeer(PEER_ID2, peerConfig);
219    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
220    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
221    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
222    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
223
224    assertEquals(0, queueStorage.getAllHFileRefs().size());
225
226    // Bulk load data into the table of the namespace that is not replicated.
227    byte[] row = Bytes.toBytes("001");
228    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
229    Threads.sleep(1000);
230
231    // Cannot get data from remote cluster
232    Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
233    Result result = table2.get(new Get(row));
234    assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
235
236    // The extra HFile is never added to the HFileRefs
237    assertEquals(0, queueStorage.getAllHFileRefs().size());
238  }
239
240  protected void bulkLoadOnCluster(TableName tableName, byte[] family) throws Exception {
241    String bulkLoadFilePath = createHFileForFamilies(family);
242    copyToHdfs(family, bulkLoadFilePath, UTIL1.getDFSCluster());
243    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(UTIL1.getConfiguration());
244    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
245  }
246
247  private String createHFileForFamilies(byte[] family) throws IOException {
248    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
249    cellBuilder.setRow(row).setFamily(family).setQualifier(qualifier).setValue(value)
250      .setType(Cell.Type.Put);
251
252    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(UTIL1.getConfiguration());
253    File randomDir = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile();
254    randomDir.mkdirs();
255    File hFileLocation = new File(randomDir, "hfile");
256    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
257    try {
258      hFileFactory.withOutputStream(out);
259      hFileFactory.withFileContext(new HFileContextBuilder().build());
260      HFile.Writer writer = hFileFactory.create();
261      try {
262        writer.append(new KeyValue(cellBuilder.build()));
263      } finally {
264        writer.close();
265      }
266    } finally {
267      out.close();
268    }
269    return hFileLocation.getAbsoluteFile().getAbsolutePath();
270  }
271
272  private void copyToHdfs(byte[] family, String bulkLoadFilePath, MiniDFSCluster cluster)
273    throws Exception {
274    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, Bytes.toString(family));
275    cluster.getFileSystem().mkdirs(bulkLoadDir);
276    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
277  }
278
279  private void createTableOnClusters(TableName tableName, byte[]... cfs) throws IOException {
280    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
281    for (byte[] cf : cfs) {
282      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
283        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
284    }
285    TableDescriptor td = builder.build();
286    admin1.createTable(td);
287    admin2.createTable(td);
288  }
289}