001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.junit.Assert.assertEquals; 021 022import com.google.protobuf.ServiceException; 023import java.io.IOException; 024import java.util.List; 025import java.util.Random; 026import java.util.SortedMap; 027import java.util.TreeMap; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Delete; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.master.HMaster; 035import org.apache.hadoop.hbase.regionserver.HRegionServer; 036import org.apache.hadoop.hbase.regionserver.Region; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.testclassification.RegionServerTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.junit.After; 041import org.junit.Before; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045 046import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 047 048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; 051 052@Category({ RegionServerTests.class, MediumTests.class }) 053public class TestWALFiltering { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestWALFiltering.class); 058 059 private static final int NUM_RS = 4; 060 061 private static final TableName TABLE_NAME = TableName.valueOf("TestWALFiltering"); 062 private static final byte[] CF1 = Bytes.toBytes("MyCF1"); 063 private static final byte[] CF2 = Bytes.toBytes("MyCF2"); 064 private static final byte[][] FAMILIES = { CF1, CF2 }; 065 066 private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 067 068 @Before 069 public void setUp() throws Exception { 070 TEST_UTIL.startMiniCluster(NUM_RS); 071 fillTable(); 072 } 073 074 @After 075 public void tearDown() throws Exception { 076 TEST_UTIL.shutdownMiniCluster(); 077 } 078 079 private void fillTable() throws IOException, InterruptedException { 080 Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3, Bytes.toBytes("row0"), 081 Bytes.toBytes("row99"), NUM_RS); 082 Random rand = new Random(19387129L); 083 for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) { 084 for (int iRow = 0; iRow < 100; ++iRow) { 085 final byte[] row = Bytes.toBytes("row" + iRow); 086 Put put = new Put(row); 087 Delete del = new Delete(row); 088 for (int iCol = 0; iCol < 10; ++iCol) { 089 final byte[] cf = rand.nextBoolean() ? CF1 : CF2; 090 final long ts = Math.abs(rand.nextInt()); 091 final byte[] qual = Bytes.toBytes("col" + iCol); 092 if (rand.nextBoolean()) { 093 final byte[] value = 094 Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_" 095 + iCol + "_ts_" + ts + "_random_" + rand.nextLong()); 096 put.addColumn(cf, qual, ts, value); 097 } else if (rand.nextDouble() < 0.8) { 098 del.addColumn(cf, qual, ts); 099 } else { 100 del.addColumn(cf, qual, ts); 101 } 102 } 103 table.put(put); 104 table.delete(del); 105 } 106 } 107 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); 108 } 109 110 @Test 111 public void testFlushedSequenceIdsSentToHMaster() throws IOException, InterruptedException, 112 org.apache.hbase.thirdparty.com.google.protobuf.ServiceException, ServiceException { 113 SortedMap<byte[], Long> allFlushedSequenceIds = new TreeMap<>(Bytes.BYTES_COMPARATOR); 114 for (int i = 0; i < NUM_RS; ++i) { 115 flushAllRegions(i); 116 } 117 Thread.sleep(10000); 118 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 119 for (int i = 0; i < NUM_RS; ++i) { 120 for (byte[] regionName : getRegionsByServer(i)) { 121 if (allFlushedSequenceIds.containsKey(regionName)) { 122 GetLastFlushedSequenceIdRequest req = 123 RequestConverter.buildGetLastFlushedSequenceIdRequest(regionName); 124 125 assertEquals((long) allFlushedSequenceIds.get(regionName), master.getMasterRpcServices() 126 .getLastFlushedSequenceId(null, req).getLastFlushedSequenceId()); 127 } 128 } 129 } 130 } 131 132 private List<byte[]> getRegionsByServer(int rsId) throws IOException { 133 List<byte[]> regionNames = Lists.newArrayList(); 134 HRegionServer hrs = getRegionServer(rsId); 135 for (Region r : hrs.getRegions(TABLE_NAME)) { 136 regionNames.add(r.getRegionInfo().getRegionName()); 137 } 138 return regionNames; 139 } 140 141 private HRegionServer getRegionServer(int rsId) { 142 return TEST_UTIL.getMiniHBaseCluster().getRegionServer(rsId); 143 } 144 145 private void flushAllRegions(int rsId) throws ServiceException, 146 org.apache.hbase.thirdparty.com.google.protobuf.ServiceException, IOException { 147 HRegionServer hrs = getRegionServer(rsId); 148 for (byte[] regionName : getRegionsByServer(rsId)) { 149 FlushRegionRequest request = RequestConverter.buildFlushRegionRequest(regionName); 150 hrs.getRSRpcServices().flushRegion(null, request); 151 } 152 } 153 154}