001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
021import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024
025import java.io.File;
026import java.io.FileOutputStream;
027import java.io.IOException;
028import java.util.Arrays;
029import java.util.Date;
030import java.util.List;
031import java.util.Map;
032import java.util.Optional;
033import java.util.Random;
034import java.util.UUID;
035import java.util.concurrent.CountDownLatch;
036import java.util.concurrent.ExecutorService;
037import java.util.concurrent.Executors;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicInteger;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataOutputStream;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.Cell;
044import org.apache.hadoop.hbase.CellBuilder;
045import org.apache.hadoop.hbase.CellBuilderFactory;
046import org.apache.hadoop.hbase.CellBuilderType;
047import org.apache.hadoop.hbase.HBaseClassTestRule;
048import org.apache.hadoop.hbase.HBaseTestingUtility;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.KeyValue;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.Admin;
053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
055import org.apache.hadoop.hbase.client.Connection;
056import org.apache.hadoop.hbase.client.ConnectionFactory;
057import org.apache.hadoop.hbase.client.Get;
058import org.apache.hadoop.hbase.client.Result;
059import org.apache.hadoop.hbase.client.Table;
060import org.apache.hadoop.hbase.client.TableDescriptor;
061import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
062import org.apache.hadoop.hbase.coprocessor.ObserverContext;
063import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
064import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
065import org.apache.hadoop.hbase.coprocessor.RegionObserver;
066import org.apache.hadoop.hbase.io.hfile.CacheConfig;
067import org.apache.hadoop.hbase.io.hfile.HFile;
068import org.apache.hadoop.hbase.io.hfile.HFileContext;
069import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
070import org.apache.hadoop.hbase.mob.MobConstants;
071import org.apache.hadoop.hbase.mob.MobFileName;
072import org.apache.hadoop.hbase.mob.MobUtils;
073import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
074import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
075import org.apache.hadoop.hbase.replication.TestReplicationBase;
076import org.apache.hadoop.hbase.testclassification.MediumTests;
077import org.apache.hadoop.hbase.testclassification.ReplicationTests;
078import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
079import org.apache.hadoop.hbase.util.Bytes;
080import org.apache.hadoop.hbase.util.CommonFSUtils;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hdfs.MiniDFSCluster;
083import org.junit.After;
084import org.junit.Before;
085import org.junit.BeforeClass;
086import org.junit.ClassRule;
087import org.junit.Rule;
088import org.junit.Test;
089import org.junit.experimental.categories.Category;
090import org.junit.rules.TemporaryFolder;
091import org.junit.rules.TestName;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
/**
 * Integration test for bulk load replication. Defines three clusters, with the following
 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
 * 2 and 3).
 *
 * For each of the defined test clusters, it performs a bulk load and asserts that the values in
 * the bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads,
 * with the given replication topology each of these bulk loads should be applied only once on
 * each peer. To assert this, the test defines a postBulkLoadHFile coprocessor and adds it to all
 * test table regions on each of the clusters. This CP counts the number of times a bulk load
 * actually gets invoked, certifying that we are not entering the infinite loop condition
 * addressed by HBASE-22380.
 */
107@Category({ ReplicationTests.class, MediumTests.class})
108public class TestBulkLoadReplication extends TestReplicationBase {
109
110  @ClassRule
111  public static final HBaseClassTestRule CLASS_RULE =
112    HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
113
114  protected static final Logger LOG =
115    LoggerFactory.getLogger(TestBulkLoadReplication.class);
116
117  private static final String PEER1_CLUSTER_ID = "peer1";
118  private static final String PEER2_CLUSTER_ID = "peer2";
119  private static final String PEER3_CLUSTER_ID = "peer3";
120
121  private static final String PEER_ID1 = "1";
122  private static final String PEER_ID3 = "3";
123
124  private static AtomicInteger BULK_LOADS_COUNT;
125  private static CountDownLatch BULK_LOAD_LATCH;
126
127  protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
128  protected static final Configuration CONF3 = UTIL3.getConfiguration();
129
130  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
131
132  private static Table htable3;
133
134  @Rule
135  public TestName name = new TestName();
136
137  @ClassRule
138  public static TemporaryFolder testFolder = new TemporaryFolder();
139
140  @BeforeClass
141  public static void setUpBeforeClass() throws Exception {
142    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
143    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
144    setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
145    setupConfig(UTIL3, "/3");
146    TestReplicationBase.setUpBeforeClass();
147    startThirdCluster();
148  }
149
150  private static void startThirdCluster() throws Exception {
151    LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
152    UTIL3.setZkCluster(UTIL1.getZkCluster());
153    UTIL3.startMiniCluster(NUM_SLAVES1);
154
155    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
156      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
157        .setMobEnabled(true)
158        .setMobThreshold(4000)
159        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
160      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
161
162    Connection connection3 = ConnectionFactory.createConnection(CONF3);
163    try (Admin admin3 = connection3.getAdmin()) {
164      admin3.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
165    }
166    UTIL3.waitUntilAllRegionsAssigned(tableName);
167    htable3 = connection3.getTable(tableName);
168  }
169
170  @Before
171  @Override
172  public void setUpBase() throws Exception {
173    //"super.setUpBase()" already sets replication from 1->2,
174    //then on the subsequent lines, sets 2->1, 2->3 and 3->2.
175    //So we have following topology: "1 <-> 2 <->3"
176    super.setUpBase();
177    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
178    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
179    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
180    //adds cluster1 as a remote peer on cluster2
181    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
182    //adds cluster3 as a remote peer on cluster2
183    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
184    //adds cluster2 as a remote peer on cluster3
185    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
186    setupCoprocessor(UTIL1, "cluster1");
187    setupCoprocessor(UTIL2, "cluster2");
188    setupCoprocessor(UTIL3, "cluster3");
189    BULK_LOADS_COUNT = new AtomicInteger(0);
190  }
191
192  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
193    return ReplicationPeerConfig.newBuilder()
194      .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
195  }
196
197  private void setupCoprocessor(HBaseTestingUtility cluster, String name){
198    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
199      try {
200        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
201          findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
202        if(cp == null) {
203          r.getCoprocessorHost().
204            load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
205              cluster.getConfiguration());
206          cp = r.getCoprocessorHost().
207            findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
208          cp.clusterName = cluster.getClusterKey();
209        }
210      } catch (Exception e){
211        LOG.error(e.getMessage(), e);
212      }
213    });
214  }
215
216  @After
217  @Override
218  public void tearDownBase() throws Exception {
219    super.tearDownBase();
220    UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
221    UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
222    UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
223  }
224
225  protected static void setupBulkLoadConfigsForCluster(Configuration config,
226    String clusterReplicationId) throws Exception {
227    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
228    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
229    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
230    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
231      + "/hbase-site.xml");
232    config.writeXml(new FileOutputStream(sourceConfigFile));
233    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
234  }
235
236  @Test
237  public void testBulkLoadReplicationActiveActive() throws Exception {
238    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
239    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
240    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
241    byte[] row = Bytes.toBytes("001");
242    byte[] value = Bytes.toBytes("v1");
243    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
244        peer2TestTable, peer3TestTable);
245    row = Bytes.toBytes("002");
246    value = Bytes.toBytes("v2");
247    assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable,
248        peer2TestTable, peer3TestTable);
249    row = Bytes.toBytes("003");
250    value = Bytes.toBytes("v3");
251    assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable,
252        peer2TestTable, peer3TestTable);
253    //Additional wait to make sure no extra bulk load happens
254    Thread.sleep(400);
255    //We have 3 bulk load events (1 initiated on each cluster).
256    //Each event gets 3 counts (the originator cluster, plus the two peers),
257    //so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
258    assertEquals(9, BULK_LOADS_COUNT.get());
259  }
260
261  @Test
262  public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
263    Path path = createMobFiles(UTIL3);
264    ColumnFamilyDescriptor descriptor =
265      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
266    ExecutorService pool = null;
267    try {
268      pool = Executors.newFixedThreadPool(1);
269      PartitionedMobCompactor compactor =
270        new PartitionedMobCompactor(UTIL3.getConfiguration(), UTIL3.getTestFileSystem(), tableName,
271          descriptor, pool);
272      BULK_LOAD_LATCH = new CountDownLatch(1);
273      BULK_LOADS_COUNT.set(0);
274      compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
275      assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
276      Thread.sleep(400);
277      assertEquals(1, BULK_LOADS_COUNT.get());
278    } finally {
279      if(pool != null && !pool.isTerminated()) {
280        pool.shutdownNow();
281      }
282    }
283  }
284
285
286  protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
287      HBaseTestingUtility utility, Table...tables) throws Exception {
288    BULK_LOAD_LATCH = new CountDownLatch(3);
289    bulkLoadOnCluster(tableName, row, value, utility);
290    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
291    assertTableHasValue(tables[0], row, value);
292    assertTableHasValue(tables[1], row, value);
293    assertTableHasValue(tables[2], row, value);
294  }
295
296  protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
297                                 HBaseTestingUtility cluster) throws Exception {
298    String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
299    copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
300    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
301    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
302  }
303
304  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
305    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
306    cluster.getFileSystem().mkdirs(bulkLoadDir);
307    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
308  }
309
310  protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
311    Get get = new Get(row);
312    Result result = table.get(get);
313    assertTrue(result.advance());
314    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
315  }
316
317  protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
318    Get get = new Get(row);
319    Result result = table.get(get);
320    assertTrue(result.isEmpty());
321  }
322
323  private String createHFileForFamilies(byte[] row, byte[] value,
324      Configuration clusterConfig) throws IOException {
325    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
326    cellBuilder.setRow(row)
327      .setFamily(TestReplicationBase.famName)
328      .setQualifier(Bytes.toBytes("1"))
329      .setValue(value)
330      .setType(Cell.Type.Put);
331
332    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
333    // TODO We need a way to do this without creating files
334    File hFileLocation = testFolder.newFile();
335    FSDataOutputStream out =
336      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
337    try {
338      hFileFactory.withOutputStream(out);
339      hFileFactory.withFileContext(new HFileContextBuilder().build());
340      HFile.Writer writer = hFileFactory.create();
341      try {
342        writer.append(new KeyValue(cellBuilder.build()));
343      } finally {
344        writer.close();
345      }
346    } finally {
347      out.close();
348    }
349    return hFileLocation.getAbsoluteFile().getAbsolutePath();
350  }
351
352  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
353    Path testDir = CommonFSUtils.getRootDir(util.getConfiguration());
354    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
355    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
356    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
357    MobFileName mobFileName = null;
358    byte[] mobFileStartRow = new byte[32];
359    for (byte rowKey : Bytes.toBytes("01234")) {
360      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
361        UUID.randomUUID().toString().replaceAll("-", ""));
362      StoreFileWriter mobFileWriter =
363        new StoreFileWriter.Builder(util.getConfiguration(),
364          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
365          .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
366      long now = System.currentTimeMillis();
367      try {
368        for (int i = 0; i < 10; i++) {
369          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
370          byte[] dummyData = new byte[5000];
371          new Random().nextBytes(dummyData);
372          mobFileWriter.append(
373            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
374        }
375      } finally {
376        mobFileWriter.close();
377      }
378    }
379    return basePath;
380  }
381
382  public static class BulkReplicationTestObserver implements RegionCoprocessor {
383
384    String clusterName;
385    AtomicInteger bulkLoadCounts = new AtomicInteger();
386
387    @Override
388    public Optional<RegionObserver> getRegionObserver() {
389      return Optional.of(new RegionObserver() {
390
391        @Override
392        public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
393          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
394            throws IOException {
395          BULK_LOAD_LATCH.countDown();
396          BULK_LOADS_COUNT.incrementAndGet();
397          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
398            bulkLoadCounts.addAndGet(1));
399        }
400      });
401    }
402  }
403}