001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import java.io.File;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Date;
030import java.util.HashMap;
031import java.util.List;
032import java.util.Map;
033import java.util.Optional;
034import java.util.Random;
035import java.util.UUID;
036import java.util.concurrent.CountDownLatch;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.Executors;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicInteger;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FSDataOutputStream;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.CellBuilder;
046import org.apache.hadoop.hbase.CellBuilderFactory;
047import org.apache.hadoop.hbase.CellBuilderType;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseConfiguration;
050import org.apache.hadoop.hbase.HBaseTestingUtility;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.KeyValue;
053import org.apache.hadoop.hbase.client.Admin;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
056import org.apache.hadoop.hbase.client.Connection;
057import org.apache.hadoop.hbase.client.ConnectionFactory;
058import org.apache.hadoop.hbase.client.Get;
059import org.apache.hadoop.hbase.client.Result;
060import org.apache.hadoop.hbase.client.Table;
061import org.apache.hadoop.hbase.client.TableDescriptor;
062import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
063import org.apache.hadoop.hbase.coprocessor.ObserverContext;
064import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
065import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
066import org.apache.hadoop.hbase.coprocessor.RegionObserver;
067import org.apache.hadoop.hbase.io.hfile.CacheConfig;
068import org.apache.hadoop.hbase.io.hfile.HFile;
069import org.apache.hadoop.hbase.io.hfile.HFileContext;
070import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
071import org.apache.hadoop.hbase.mob.MobConstants;
072import org.apache.hadoop.hbase.mob.MobFileName;
073import org.apache.hadoop.hbase.mob.MobUtils;
074import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
075import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
076import org.apache.hadoop.hbase.replication.TestReplicationBase;
077import org.apache.hadoop.hbase.testclassification.MediumTests;
078import org.apache.hadoop.hbase.testclassification.ReplicationTests;
079import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
080import org.apache.hadoop.hbase.util.Bytes;
081import org.apache.hadoop.hbase.util.FSUtils;
082import org.apache.hadoop.hbase.util.Pair;
083import org.apache.hadoop.hdfs.MiniDFSCluster;
084import org.junit.After;
085import org.junit.Before;
086import org.junit.BeforeClass;
087import org.junit.ClassRule;
088import org.junit.Rule;
089import org.junit.Test;
090import org.junit.experimental.categories.Category;
091import org.junit.rules.TemporaryFolder;
092import org.junit.rules.TestName;
093import org.slf4j.Logger;
094import org.slf4j.LoggerFactory;
095
/**
 * Integration test for bulk load replication. Defines three clusters, with the following
 * replication topology: "1 <-> 4 <-> 3" (active-active between 1 and 4, and active-active between
 * 4 and 3).
 *
 * For each of the defined test clusters, it performs a bulk load, asserting that the values in the
 * bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads, with
 * the given replication topology all these bulk loads should get replicated only once on each
 * peer. To assert this, this test defines a postBulkLoadHFile coprocessor and adds it to all test
 * table regions, on each of the clusters. This CP counts the number of times bulk load actually
 * gets invoked, certifying we are not entering the infinite loop condition addressed by
 * HBASE-22380.
 */
108@Category({ ReplicationTests.class, MediumTests.class})
109public class TestBulkLoadReplication extends TestReplicationBase {
110
111  @ClassRule
112  public static final HBaseClassTestRule CLASS_RULE =
113    HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
114
115  protected static final Logger LOG =
116    LoggerFactory.getLogger(TestBulkLoadReplication.class);
117
118  private static final String PEER1_CLUSTER_ID = "peer1";
119  private static final String PEER4_CLUSTER_ID = "peer4";
120  private static final String PEER3_CLUSTER_ID = "peer3";
121
122  private static final String PEER_ID1 = "1";
123  private static final String PEER_ID3 = "3";
124  private static final String PEER_ID4 = "4";
125
126  private static AtomicInteger BULK_LOADS_COUNT;
127  private static CountDownLatch BULK_LOAD_LATCH;
128
129  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
130
131  private static HBaseTestingUtility utility3;
132  private static HBaseTestingUtility utility4;
133  private static Configuration conf3;
134  private static Configuration conf4;
135  private static Table htable3;
136  private static Table htable4;
137
138  @Rule
139  public TestName name = new TestName();
140
141  @ClassRule
142  public static TemporaryFolder testFolder = new TemporaryFolder();
143
144  @BeforeClass
145  public static void setUpBeforeClass() throws Exception {
146    setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
147    conf3 = HBaseConfiguration.create(conf1);
148    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
149    utility3 = new HBaseTestingUtility(conf3);
150    conf4 = HBaseConfiguration.create(conf1);
151    conf4.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/4");
152    utility3 = new HBaseTestingUtility(conf3);
153    utility4 = new HBaseTestingUtility(conf4);
154    TestReplicationBase.setUpBeforeClass();
155    setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
156    //utility4 is started within TestReplicationBase.setUpBeforeClass(), but we had not set
157    //bulkload replication configs yet, so setting a 4th utility.
158    setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
159    startCluster(utility3, conf3);
160    startCluster(utility4, conf4);
161  }
162
163  private static void startCluster(HBaseTestingUtility util, Configuration configuration)
164      throws Exception {
165    LOG.info("Setup Zk to same one from utility1 and utility4");
166    util.setZkCluster(utility1.getZkCluster());
167    util.startMiniCluster(2);
168
169    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
170      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
171        .setMobEnabled(true)
172        .setMobThreshold(4000)
173        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
174      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
175
176    Connection connection = ConnectionFactory.createConnection(configuration);
177    try (Admin admin = connection.getAdmin()) {
178      admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
179    }
180    util.waitUntilAllRegionsAssigned(tableName);
181  }
182
183  @Before
184  @Override
185  public void setUpBase() throws Exception {
186    super.setUpBase();
187    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(utility1);
188    ReplicationPeerConfig peer4Config = getPeerConfigForCluster(utility4);
189    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(utility3);
190    //adds cluster4 as a remote peer on cluster1
191    utility1.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
192    //adds cluster1 as a remote peer on cluster4
193    utility4.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
194    //adds cluster3 as a remote peer on cluster4
195    utility4.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
196    //adds cluster4 as a remote peer on cluster3
197    utility3.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
198    setupCoprocessor(utility1);
199    setupCoprocessor(utility4);
200    setupCoprocessor(utility3);
201    BULK_LOADS_COUNT = new AtomicInteger(0);
202  }
203
204  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
205    return ReplicationPeerConfig.newBuilder()
206      .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
207  }
208
209  private void setupCoprocessor(HBaseTestingUtility cluster){
210    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
211      try {
212        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
213          findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
214        if(cp == null) {
215          r.getCoprocessorHost().
216            load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
217              cluster.getConfiguration());
218          cp = r.getCoprocessorHost().
219            findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
220          cp.clusterName = cluster.getClusterKey();
221        }
222      } catch (Exception e){
223        LOG.error(e.getMessage(), e);
224      }
225    });
226  }
227
228  @After
229  @Override
230  public void tearDownBase() throws Exception {
231    super.tearDownBase();
232    utility4.getAdmin().removeReplicationPeer(PEER_ID1);
233    utility4.getAdmin().removeReplicationPeer(PEER_ID3);
234    utility3.getAdmin().removeReplicationPeer(PEER_ID4);
235    utility1.getAdmin().removeReplicationPeer(PEER_ID4);
236  }
237
238  private static void setupBulkLoadConfigsForCluster(Configuration config,
239    String clusterReplicationId) throws Exception {
240    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
241    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
242    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
243    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
244      + "/hbase-site.xml");
245    config.writeXml(new FileOutputStream(sourceConfigFile));
246    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
247  }
248
249  @Test
250  public void testBulkLoadReplicationActiveActive() throws Exception {
251    Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName);
252    Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName);
253    Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName);
254    byte[] row = Bytes.toBytes("001");
255    byte[] value = Bytes.toBytes("v1");
256    assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable);
257    row = Bytes.toBytes("002");
258    value = Bytes.toBytes("v2");
259    assertBulkLoadConditions(row, value, utility4, peer1TestTable, peer4TestTable, peer3TestTable);
260    row = Bytes.toBytes("003");
261    value = Bytes.toBytes("v3");
262    assertBulkLoadConditions(row, value, utility3, peer1TestTable, peer4TestTable, peer3TestTable);
263    //Additional wait to make sure no extra bulk load happens
264    Thread.sleep(400);
265    //We have 3 bulk load events (1 initiated on each cluster).
266    //Each event gets 3 counts (the originator cluster, plus the two peers),
267    //so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
268    assertEquals(9, BULK_LOADS_COUNT.get());
269  }
270
271  @Test
272  public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
273    Path path = createMobFiles(utility3);
274    ColumnFamilyDescriptor descriptor =
275      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
276    ExecutorService pool = null;
277    try {
278      pool = Executors.newFixedThreadPool(1);
279      PartitionedMobCompactor compactor =
280        new PartitionedMobCompactor(utility3.getConfiguration(), utility3.getTestFileSystem(),
281          tableName, descriptor, pool);
282      BULK_LOAD_LATCH = new CountDownLatch(1);
283      BULK_LOADS_COUNT.set(0);
284      compactor.compact(Arrays.asList(utility3.getTestFileSystem().listStatus(path)), true);
285      assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
286      Thread.sleep(400);
287      assertEquals(1, BULK_LOADS_COUNT.get());
288    } finally {
289      if(pool != null && !pool.isTerminated()) {
290        pool.shutdownNow();
291      }
292    }
293  }
294
295
296  private void assertBulkLoadConditions(byte[] row, byte[] value,
297      HBaseTestingUtility utility, Table...tables) throws Exception {
298    BULK_LOAD_LATCH = new CountDownLatch(3);
299    bulkLoadOnCluster(row, value, utility);
300    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
301    assertTableHasValue(tables[0], row, value);
302    assertTableHasValue(tables[1], row, value);
303    assertTableHasValue(tables[2], row, value);
304  }
305
306  private void bulkLoadOnCluster(byte[] row, byte[] value,
307      HBaseTestingUtility cluster) throws Exception {
308    String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration());
309    Path bulkLoadFilePath = new Path(bulkLoadFile);
310    copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
311    LoadIncrementalHFiles bulkLoadHFilesTool =
312      new LoadIncrementalHFiles(cluster.getConfiguration());
313    Map<byte[], List<Path>> family2Files = new HashMap<>();
314    List<Path> files = new ArrayList<>();
315    files.add(new Path(BULK_LOAD_BASE_DIR + "/f/" + bulkLoadFilePath.getName()));
316    family2Files.put(Bytes.toBytes("f"), files);
317    bulkLoadHFilesTool.run(family2Files, tableName);
318  }
319
320  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
321    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/f/");
322    cluster.getFileSystem().mkdirs(bulkLoadDir);
323    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
324  }
325
326  private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
327    Get get = new Get(row);
328    Result result = table.get(get);
329    assertTrue(result.advance());
330    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
331  }
332
333  private String createHFileForFamilies(byte[] row, byte[] value,
334      Configuration clusterConfig) throws IOException {
335    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
336    cellBuilder.setRow(row)
337      .setFamily(TestReplicationBase.famName)
338      .setQualifier(Bytes.toBytes("1"))
339      .setValue(value)
340      .setType(Cell.Type.Put);
341
342    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
343    // TODO We need a way to do this without creating files
344    File hFileLocation = testFolder.newFile();
345    FSDataOutputStream out =
346      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
347    try {
348      hFileFactory.withOutputStream(out);
349      hFileFactory.withFileContext(new HFileContext());
350      HFile.Writer writer = hFileFactory.create();
351      try {
352        writer.append(new KeyValue(cellBuilder.build()));
353      } finally {
354        writer.close();
355      }
356    } finally {
357      out.close();
358    }
359    return hFileLocation.getAbsoluteFile().getAbsolutePath();
360  }
361
362  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
363    Path testDir = FSUtils.getRootDir(util.getConfiguration());
364    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
365    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
366    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
367    MobFileName mobFileName = null;
368    byte[] mobFileStartRow = new byte[32];
369    for (byte rowKey : Bytes.toBytes("01234")) {
370      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
371        UUID.randomUUID().toString().replaceAll("-", ""));
372      StoreFileWriter mobFileWriter =
373        new StoreFileWriter.Builder(util.getConfiguration(),
374          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
375          .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
376      long now = System.currentTimeMillis();
377      try {
378        for (int i = 0; i < 10; i++) {
379          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
380          byte[] dummyData = new byte[5000];
381          new Random().nextBytes(dummyData);
382          mobFileWriter.append(
383            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
384        }
385      } finally {
386        mobFileWriter.close();
387      }
388    }
389    return basePath;
390  }
391
392  public static class BulkReplicationTestObserver implements RegionCoprocessor {
393
394    String clusterName;
395    AtomicInteger bulkLoadCounts = new AtomicInteger();
396
397    @Override
398    public Optional<RegionObserver> getRegionObserver() {
399      return Optional.of(new RegionObserver() {
400
401        @Override
402        public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
403          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
404            throws IOException {
405          BULK_LOAD_LATCH.countDown();
406          BULK_LOADS_COUNT.incrementAndGet();
407          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
408            bulkLoadCounts.addAndGet(1));
409        }
410      });
411    }
412  }
413}