001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNotEquals;
024import static org.junit.Assert.assertTrue;
025
026import java.io.File;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.net.UnknownHostException;
030import java.util.ArrayList;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.concurrent.CountDownLatch;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicInteger;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataOutputStream;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.CellBuilderType;
043import org.apache.hadoop.hbase.ExtendedCellBuilder;
044import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtil;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.KeyValue;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.Admin;
051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
052import org.apache.hadoop.hbase.client.Connection;
053import org.apache.hadoop.hbase.client.ConnectionFactory;
054import org.apache.hadoop.hbase.client.Get;
055import org.apache.hadoop.hbase.client.Result;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.coprocessor.ObserverContext;
060import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
062import org.apache.hadoop.hbase.coprocessor.RegionObserver;
063import org.apache.hadoop.hbase.io.hfile.HFile;
064import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
065import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
066import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
067import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
068import org.apache.hadoop.hbase.replication.TestReplicationBase;
069import org.apache.hadoop.hbase.testclassification.MediumTests;
070import org.apache.hadoop.hbase.testclassification.ReplicationTests;
071import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.apache.hadoop.hbase.util.Pair;
074import org.apache.hadoop.hdfs.MiniDFSCluster;
075import org.junit.Before;
076import org.junit.BeforeClass;
077import org.junit.ClassRule;
078import org.junit.Rule;
079import org.junit.Test;
080import org.junit.experimental.categories.Category;
081import org.junit.rules.TemporaryFolder;
082import org.junit.rules.TestName;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
/**
 * Integration test for bulk load replication. Defines three clusters, with the following
 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between 2
 * and 3). For each of the defined test clusters, it performs a bulk load, asserting that the
 * values in the bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk
 * loads, with the given replication topology each of these bulk loads should get replicated only
 * once on each peer. To assert this, this test defines a postBulkLoadHFile coprocessor and adds it
 * to all test table regions, on each of the clusters. This CP counts the number of times a bulk
 * load actually gets invoked, certifying we are not entering the infinite loop condition addressed
 * by HBASE-22380.
 */
096@Category({ ReplicationTests.class, MediumTests.class })
097public class TestBulkLoadReplication extends TestReplicationBase {
098
099  @ClassRule
100  public static final HBaseClassTestRule CLASS_RULE =
101    HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
102
103  protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class);
104
105  private static final String PEER1_CLUSTER_ID = "peer1";
106  private static final String PEER2_CLUSTER_ID = "peer2";
107  private static final String PEER3_CLUSTER_ID = "peer3";
108
109  private static final String PEER_ID1 = "1";
110  private static final String PEER_ID3 = "3";
111
112  private static AtomicInteger BULK_LOADS_COUNT;
113  private static CountDownLatch BULK_LOAD_LATCH;
114
115  protected static final HBaseTestingUtil UTIL3 = new HBaseTestingUtil();
116  protected static final Configuration CONF3 = UTIL3.getConfiguration();
117
118  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
119
120  private static Table htable3;
121
122  @Rule
123  public TestName name = new TestName();
124
125  @ClassRule
126  public static TemporaryFolder testFolder = new TemporaryFolder();
127
128  private static ReplicationQueueStorage queueStorage;
129
130  private static boolean replicationPeersAdded = false;
131
132  @BeforeClass
133  public static void setUpBeforeClass() throws Exception {
134    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
135    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
136    setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
137    setupConfig(UTIL3, "/3");
138    TestReplicationBase.setUpBeforeClass();
139    startThirdCluster();
140    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(),
141      UTIL1.getConfiguration());
142  }
143
144  private static void startThirdCluster() throws Exception {
145    LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
146    UTIL3.setZkCluster(UTIL1.getZkCluster());
147    UTIL3.startMiniCluster(NUM_SLAVES1);
148
149    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
150      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMobEnabled(true)
151        .setMobThreshold(4000).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
152      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
153
154    Connection connection3 = ConnectionFactory.createConnection(CONF3);
155    try (Admin admin3 = connection3.getAdmin()) {
156      admin3.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
157    }
158    UTIL3.waitUntilAllRegionsAssigned(tableName);
159    htable3 = connection3.getTable(tableName);
160  }
161
162  @Before
163  @Override
164  public void setUpBase() throws Exception {
165    // removing the peer and adding again causing the previously completed bulk load jobs getting
166    // submitted again, adding a check to add the peers only once.
167    if (!replicationPeersAdded) {
168      // "super.setUpBase()" already sets replication from 1->2,
169      // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
170      // So we have following topology: "1 <-> 2 <->3"
171      super.setUpBase();
172      ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
173      ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
174      ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
175      // adds cluster1 as a remote peer on cluster2
176      UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
177      // adds cluster3 as a remote peer on cluster2
178      UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
179      // adds cluster2 as a remote peer on cluster3
180      UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
181      setupCoprocessor(UTIL1);
182      setupCoprocessor(UTIL2);
183      setupCoprocessor(UTIL3);
184      replicationPeersAdded = true;
185    }
186    BULK_LOADS_COUNT = new AtomicInteger(0);
187  }
188
189  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util)
190    throws UnknownHostException {
191    return ReplicationPeerConfig.newBuilder().setClusterKey(util.getRpcConnnectionURI())
192      .setSerial(isSerialPeer()).build();
193  }
194
195  private void setupCoprocessor(HBaseTestingUtil cluster) {
196    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
197      try {
198        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost()
199          .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
200        if (cp == null) {
201          r.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
202            cluster.getConfiguration());
203          cp = r.getCoprocessorHost()
204            .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
205          cp.clusterName = cluster.getRpcConnnectionURI();
206        }
207      } catch (Exception e) {
208        LOG.error(e.getMessage(), e);
209      }
210    });
211  }
212
213  protected static void setupBulkLoadConfigsForCluster(Configuration config,
214    String clusterReplicationId) throws Exception {
215    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
216    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
217    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
218    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml");
219    config.writeXml(new FileOutputStream(sourceConfigFile));
220    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
221  }
222
223  @Test
224  public void testBulkLoadReplicationActiveActive() throws Exception {
225    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
226    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
227    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
228    byte[] row = Bytes.toBytes("001");
229    byte[] value = Bytes.toBytes("v1");
230    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable, peer2TestTable,
231      peer3TestTable);
232    row = Bytes.toBytes("002");
233    value = Bytes.toBytes("v2");
234    assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable, peer2TestTable,
235      peer3TestTable);
236    row = Bytes.toBytes("003");
237    value = Bytes.toBytes("v3");
238    assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable, peer2TestTable,
239      peer3TestTable);
240    // Additional wait to make sure no extra bulk load happens
241    Thread.sleep(400);
242    // We have 3 bulk load events (1 initiated on each cluster).
243    // Each event gets 3 counts (the originator cluster, plus the two peers),
244    // so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
245    assertEquals(9, BULK_LOADS_COUNT.get());
246  }
247
248  protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
249    HBaseTestingUtil utility, Table... tables) throws Exception {
250    BULK_LOAD_LATCH = new CountDownLatch(3);
251    bulkLoadOnCluster(tableName, row, value, utility);
252    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
253    assertTableHasValue(tables[0], row, value);
254    assertTableHasValue(tables[1], row, value);
255    assertTableHasValue(tables[2], row, value);
256  }
257
258  protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
259    HBaseTestingUtil cluster) throws Exception {
260    String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
261    copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
262    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
263    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
264  }
265
266  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
267    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
268    cluster.getFileSystem().mkdirs(bulkLoadDir);
269    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
270  }
271
272  protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
273    Get get = new Get(row);
274    Result result = table.get(get);
275    assertTrue(result.advance());
276    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
277  }
278
279  protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
280    Get get = new Get(row);
281    Result result = table.get(get);
282    assertTrue(result.isEmpty());
283  }
284
285  private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig)
286    throws IOException {
287    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
288    cellBuilder.setRow(row).setFamily(TestReplicationBase.famName).setQualifier(Bytes.toBytes("1"))
289      .setValue(value).setType(Cell.Type.Put);
290
291    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
292    // TODO We need a way to do this without creating files
293    File hFileLocation = testFolder.newFile();
294    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
295    try {
296      hFileFactory.withOutputStream(out);
297      hFileFactory.withFileContext(new HFileContextBuilder().build());
298      HFile.Writer writer = hFileFactory.create();
299      try {
300        writer.append(new KeyValue(cellBuilder.build()));
301      } finally {
302        writer.close();
303      }
304    } finally {
305      out.close();
306    }
307    return hFileLocation.getAbsoluteFile().getAbsolutePath();
308  }
309
310  public static class BulkReplicationTestObserver implements RegionCoprocessor {
311
312    String clusterName;
313    AtomicInteger bulkLoadCounts = new AtomicInteger();
314
315    @Override
316    public Optional<RegionObserver> getRegionObserver() {
317      return Optional.of(new RegionObserver() {
318
319        @Override
320        public void postBulkLoadHFile(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
321          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
322          throws IOException {
323          BULK_LOAD_LATCH.countDown();
324          BULK_LOADS_COUNT.incrementAndGet();
325          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
326            bulkLoadCounts.addAndGet(1));
327        }
328      });
329    }
330  }
331
332  @Test
333  public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception {
334    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
335    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
336    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
337    byte[] row = Bytes.toBytes("004");
338    byte[] value = Bytes.toBytes("v4");
339    assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable,
340      peer3TestTable);
341    // additional wait to make sure no extra bulk load happens
342    Thread.sleep(400);
343    assertEquals(1, BULK_LOADS_COUNT.get());
344    assertEquals(0, queueStorage.getAllHFileRefs().size());
345  }
346
347  private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value,
348    HBaseTestingUtil utility, Table... tables) throws Exception {
349    BULK_LOAD_LATCH = new CountDownLatch(1);
350    bulkLoadOnClusterForNoRepFamily(row, value, utility);
351    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
352    assertTableHasValue(tables[0], row, value);
353    assertTableNotHasValue(tables[1], row, value);
354    assertTableNotHasValue(tables[2], row, value);
355  }
356
357  private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster)
358    throws Exception {
359    String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration());
360    Path bulkLoadFilePath = new Path(bulkloadFile);
361    copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster());
362    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
363    Map<byte[], List<Path>> family2Files = new HashMap<>();
364    List<Path> files = new ArrayList<>();
365    files.add(new Path(
366      BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName()));
367    family2Files.put(noRepfamName, files);
368    bulkLoadHFilesTool.bulkLoad(tableName, family2Files);
369  }
370
371  private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig)
372    throws IOException {
373    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
374    cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName)
375      .setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put);
376
377    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
378    // TODO We need a way to do this without creating files
379    File hFileLocation = testFolder.newFile();
380    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
381    try {
382      hFileFactory.withOutputStream(out);
383      hFileFactory.withFileContext(new HFileContextBuilder().build());
384      HFile.Writer writer = hFileFactory.create();
385      try {
386        writer.append(new KeyValue(cellBuilder.build()));
387      } finally {
388        writer.close();
389      }
390    } finally {
391      out.close();
392    }
393    return hFileLocation.getAbsoluteFile().getAbsolutePath();
394  }
395
396  private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster)
397    throws Exception {
398    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/");
399    cluster.getFileSystem().mkdirs(bulkLoadDir);
400    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
401  }
402
403  private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException {
404    Get get = new Get(row);
405    Result result = table.get(get);
406    assertNotEquals(Bytes.toString(value), Bytes.toString(result.value()));
407  }
408}