001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertNotEquals;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.File;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.net.UnknownHostException;
030import java.util.ArrayList;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.concurrent.CountDownLatch;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicInteger;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataOutputStream;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellBuilderType;
043import org.apache.hadoop.hbase.ExtendedCellBuilder;
044import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.Admin;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Connection;
052import org.apache.hadoop.hbase.client.ConnectionFactory;
053import org.apache.hadoop.hbase.client.Get;
054import org.apache.hadoop.hbase.client.Result;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
058import org.apache.hadoop.hbase.coprocessor.ObserverContext;
059import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
061import org.apache.hadoop.hbase.coprocessor.RegionObserver;
062import org.apache.hadoop.hbase.io.hfile.HFile;
063import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
064import org.apache.hadoop.hbase.regionserver.HRegion;
065import org.apache.hadoop.hbase.testclassification.LargeTests;
066import org.apache.hadoop.hbase.testclassification.ReplicationTests;
067import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.Pair;
070import org.apache.hadoop.hdfs.MiniDFSCluster;
071import org.junit.jupiter.api.AfterAll;
072import org.junit.jupiter.api.AfterEach;
073import org.junit.jupiter.api.BeforeAll;
074import org.junit.jupiter.api.BeforeEach;
075import org.junit.jupiter.api.Tag;
076import org.junit.jupiter.api.Test;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080/**
081 * Integration test for bulk load replication. Defines three clusters, with the following
082 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 2
083 * and 3). For each of defined test clusters, it performs a bulk load, asserting values on bulk
084 * loaded file gets replicated to other two peers. Since we are doing 3 bulk loads, with the given
085 * replication topology all these bulk loads should get replicated only once on each peer. To assert
086 * this, this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each
087 * of the clusters. This CP counts the amount of times bulk load actually gets invoked, certifying
088 * we are not entering the infinite loop condition addressed by HBASE-22380.
089 */
090@Tag(ReplicationTests.TAG)
091@Tag(LargeTests.TAG)
092public class TestBulkLoadReplication extends TestReplicationBaseNoBeforeAll {
093
094  protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class);
095
096  private static final String PEER1_CLUSTER_ID = "peer1";
097  private static final String PEER2_CLUSTER_ID = "peer2";
098  private static final String PEER3_CLUSTER_ID = "peer3";
099
100  private static final String PEER_ID1 = "1";
101  private static final String PEER_ID3 = "3";
102
103  private static AtomicInteger BULK_LOADS_COUNT;
104  private static CountDownLatch BULK_LOAD_LATCH;
105
106  protected static final HBaseTestingUtil UTIL3 = new HBaseTestingUtil();
107  protected static final Configuration CONF3 = UTIL3.getConfiguration();
108
109  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
110
111  private static File SOURCE_DIR;
112
113  private static ReplicationQueueStorage queueStorage;
114
115  @BeforeAll
116  public static void setUpBeforeAll() throws Exception {
117    setupConfig(UTIL3, "/3");
118    configureClusters(UTIL1, UTIL2);
119    SOURCE_DIR = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile();
120    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
121    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
122    setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
123    startClusters();
124    startThirdCluster();
125    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(),
126      UTIL1.getConfiguration());
127    setupCoprocessor(UTIL1);
128    setupCoprocessor(UTIL2);
129    setupCoprocessor(UTIL3);
130
131    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
132    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
133    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
134    // Set up following topology: "1 <-> 2 <-> 3"
135    UTIL1.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
136    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
137    // adds cluster3 as a remote peer on cluster2
138    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
139    // adds cluster2 as a remote peer on cluster3
140    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
141  }
142
143  @AfterAll
144  public static void tearDownAfterAll() throws Exception {
145    UTIL3.shutdownMiniCluster();
146  }
147
148  private static void startThirdCluster() throws Exception {
149    LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
150    UTIL3.setZkCluster(UTIL1.getZkCluster());
151    UTIL3.startMiniCluster(NUM_SLAVES1);
152
153    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
154      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMobEnabled(true)
155        .setMobThreshold(4000).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
156      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
157
158    Connection connection3 = ConnectionFactory.createConnection(CONF3);
159    try (Admin admin3 = connection3.getAdmin()) {
160      admin3.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
161    }
162    UTIL3.waitUntilAllRegionsAssigned(tableName);
163  }
164
165  @BeforeEach
166  @Override
167  public void setUpBase() throws Exception {
168    // Removing the peer and adding it back causes previously completed bulk-load jobs to be
169    // resubmitted. Override setUpBase/tearDownBase so we do not add/remove peers between tests;
170    // peers are added once in @BeforeAll.
171    BULK_LOADS_COUNT = new AtomicInteger(0);
172  }
173
174  @AfterEach
175  @Override
176  public void tearDownBase() throws Exception {
177    // do not remove PEER_ID2
178  }
179
180  private static ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util)
181    throws UnknownHostException {
182    return ReplicationPeerConfig.newBuilder().setClusterKey(util.getRpcConnnectionURI())
183      .setSerial(false).build();
184  }
185
186  private static void setupCoprocessor(HBaseTestingUtil cluster) throws IOException {
187    for (HRegion region : cluster.getHBaseCluster().getRegions(tableName)) {
188      TestBulkLoadReplication.BulkReplicationTestObserver cp = region.getCoprocessorHost()
189        .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
190      if (cp == null) {
191        region.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class,
192          0, cluster.getConfiguration());
193        cp = region.getCoprocessorHost()
194          .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
195        cp.clusterName = cluster.getRpcConnnectionURI();
196      }
197    }
198  }
199
200  protected static void setupBulkLoadConfigsForCluster(Configuration config,
201    String clusterReplicationId) throws Exception {
202    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
203    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
204    File sourceConfigFolder = new File(SOURCE_DIR, clusterReplicationId);
205    sourceConfigFolder.mkdirs();
206    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath(), "hbase-site.xml");
207    try (FileOutputStream out = new FileOutputStream(sourceConfigFile)) {
208      config.writeXml(out);
209    }
210    config.set(REPLICATION_CONF_DIR, SOURCE_DIR.getAbsolutePath());
211  }
212
213  @Test
214  public void testBulkLoadReplicationActiveActive() throws Exception {
215    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
216    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
217    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
218    byte[] row = Bytes.toBytes("001");
219    byte[] value = Bytes.toBytes("v1");
220    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable,
221      peer3TestTable);
222    row = Bytes.toBytes("002");
223    value = Bytes.toBytes("v2");
224    assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable, peer2TestTable,
225      peer3TestTable);
226    row = Bytes.toBytes("003");
227    value = Bytes.toBytes("v3");
228    assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable, peer2TestTable,
229      peer3TestTable);
230    // Additional wait to make sure no extra bulk load happens
231    Thread.sleep(400);
232    // We have 3 bulk load events (1 initiated on each cluster).
233    // Each event gets 3 counts (the originator cluster, plus the two peers),
234    // so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
235    assertEquals(9, BULK_LOADS_COUNT.get());
236  }
237
238  protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
239    HBaseTestingUtil utility, Table... tables) throws Exception {
240    BULK_LOAD_LATCH = new CountDownLatch(3);
241    bulkLoadOnCluster(tableName, row, value, utility);
242    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
243    assertTableHasValue(tables[0], row, value);
244    assertTableHasValue(tables[1], row, value);
245    assertTableHasValue(tables[2], row, value);
246  }
247
248  protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
249    HBaseTestingUtil cluster) throws Exception {
250    String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
251    copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
252    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
253    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
254  }
255
256  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
257    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
258    cluster.getFileSystem().mkdirs(bulkLoadDir);
259    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
260  }
261
262  protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
263    Get get = new Get(row);
264    Result result = table.get(get);
265    assertTrue(result.advance());
266    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
267  }
268
269  protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
270    Get get = new Get(row);
271    Result result = table.get(get);
272    assertTrue(result.isEmpty());
273  }
274
275  private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig)
276    throws IOException {
277    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
278    cellBuilder.setRow(row).setFamily(TestReplicationBase.famName).setQualifier(Bytes.toBytes("1"))
279      .setValue(value).setType(Cell.Type.Put);
280
281    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
282    // TODO We need a way to do this without creating files
283    File randomDir = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile();
284    randomDir.mkdirs();
285    File hFileLocation = new File(randomDir, "hfile");
286    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
287    try {
288      hFileFactory.withOutputStream(out);
289      hFileFactory.withFileContext(new HFileContextBuilder().build());
290      HFile.Writer writer = hFileFactory.create();
291      try {
292        writer.append(new KeyValue(cellBuilder.build()));
293      } finally {
294        writer.close();
295      }
296    } finally {
297      out.close();
298    }
299    return hFileLocation.getAbsoluteFile().getAbsolutePath();
300  }
301
302  public static class BulkReplicationTestObserver implements RegionCoprocessor {
303
304    String clusterName;
305    AtomicInteger bulkLoadCounts = new AtomicInteger();
306
307    @Override
308    public Optional<RegionObserver> getRegionObserver() {
309      return Optional.of(new RegionObserver() {
310
311        @Override
312        public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
313          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
314          throws IOException {
315          BULK_LOAD_LATCH.countDown();
316          BULK_LOADS_COUNT.incrementAndGet();
317          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
318            bulkLoadCounts.addAndGet(1));
319        }
320      });
321    }
322  }
323
324  @Test
325  public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception {
326    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
327    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
328    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
329    byte[] row = Bytes.toBytes("004");
330    byte[] value = Bytes.toBytes("v4");
331    assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable,
332      peer3TestTable);
333    // additional wait to make sure no extra bulk load happens
334    Thread.sleep(400);
335    assertEquals(1, BULK_LOADS_COUNT.get());
336    assertEquals(0, queueStorage.getAllHFileRefs().size());
337  }
338
339  private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value,
340    HBaseTestingUtil utility, Table... tables) throws Exception {
341    BULK_LOAD_LATCH = new CountDownLatch(1);
342    bulkLoadOnClusterForNoRepFamily(row, value, utility);
343    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
344    assertTableHasValue(tables[0], row, value);
345    assertTableNotHasValue(tables[1], row, value);
346    assertTableNotHasValue(tables[2], row, value);
347  }
348
349  private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster)
350    throws Exception {
351    String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration());
352    Path bulkLoadFilePath = new Path(bulkloadFile);
353    copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster());
354    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
355    Map<byte[], List<Path>> family2Files = new HashMap<>();
356    List<Path> files = new ArrayList<>();
357    files.add(new Path(
358      BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName()));
359    family2Files.put(noRepfamName, files);
360    bulkLoadHFilesTool.bulkLoad(tableName, family2Files);
361  }
362
363  private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig)
364    throws IOException {
365    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
366    cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName)
367      .setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put);
368
369    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
370    // TODO We need a way to do this without creating files
371    File randomDir = new File(UTIL1.getRandomDir().toString()).getAbsoluteFile();
372    randomDir.mkdirs();
373    File hFileLocation = new File(randomDir, "hfile");
374    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
375    try {
376      hFileFactory.withOutputStream(out);
377      hFileFactory.withFileContext(new HFileContextBuilder().build());
378      HFile.Writer writer = hFileFactory.create();
379      try {
380        writer.append(new KeyValue(cellBuilder.build()));
381      } finally {
382        writer.close();
383      }
384    } finally {
385      out.close();
386    }
387    return hFileLocation.getAbsoluteFile().getAbsolutePath();
388  }
389
390  private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster)
391    throws Exception {
392    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/");
393    cluster.getFileSystem().mkdirs(bulkLoadDir);
394    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
395  }
396
397  private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException {
398    Get get = new Get(row);
399    Result result = table.get(get);
400    assertNotEquals(Bytes.toString(value), Bytes.toString(result.value()));
401  }
402}