001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.fail; 022 023import java.io.IOException; 024import org.apache.hadoop.hbase.HConstants; 025import org.apache.hadoop.hbase.client.Get; 026import org.apache.hadoop.hbase.client.Put; 027import org.apache.hadoop.hbase.client.Result; 028import org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll; 029import org.apache.hadoop.hbase.testclassification.MediumTests; 030import org.apache.hadoop.hbase.testclassification.ReplicationTests; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.junit.jupiter.api.BeforeAll; 033import org.junit.jupiter.api.Tag; 034import org.junit.jupiter.api.Test; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038@Tag(ReplicationTests.TAG) 039@Tag(MediumTests.TAG) 040public class TestReplicationCompressedWAL extends TestReplicationBaseNoBeforeAll { 041 042 static final Logger LOG = LoggerFactory.getLogger(TestReplicationCompressedWAL.class); 043 static final int NUM_BATCHES = 20; 044 static final int NUM_ROWS_PER_BATCH = 100; 045 046 @BeforeAll 047 public static void setUpBeforeClass() throws Exception { 048 configureClusters(UTIL1, UTIL2); 049 CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); 050 startClusters(); 051 } 052 053 @Test 054 public void testMultiplePuts() throws Exception { 055 runMultiplePutTest(); 056 } 057 058 protected static void runMultiplePutTest() throws IOException, InterruptedException { 059 for (int i = 0; i < NUM_BATCHES; i++) { 060 putBatch(i); 061 getBatch(i); 062 } 063 } 064 065 protected static void getBatch(int batch) throws IOException, InterruptedException { 066 for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) { 067 byte[] row = getRowKey(batch, i); 068 Get get = new Get(row); 069 for (int j = 0; j < NB_RETRIES; j++) { 070 if (j == NB_RETRIES - 1) { 071 fail("Waited too much time for replication"); 072 } 073 Result res = htable2.get(get); 074 if (res.isEmpty()) { 075 LOG.info("Row not available"); 076 Thread.sleep(SLEEP_TIME); 077 } else { 078 assertArrayEquals(row, res.value()); 079 break; 080 } 081 } 082 } 083 } 084 085 protected static byte[] getRowKey(int batch, int count) { 086 return Bytes.toBytes("row" + ((batch * NUM_ROWS_PER_BATCH) + count)); 087 } 088 089 protected static void putBatch(int batch) throws IOException { 090 for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) { 091 byte[] row = getRowKey(batch, i); 092 Put put = new Put(row); 093 put.addColumn(famName, row, row); 094 htable1.put(put); 095 } 096 } 097 098}