/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example.row.stats;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Starts a single-node mini cluster, creates a table with
 * {@link TestableRowStatisticsCompactionObserver} attached, and checks the statistics that the
 * observer hands to {@link #RECORDER} after a minor and then a major compaction.
 */
@Tag(MediumTests.TAG)
public class TestRowStatisticsCompactionObserver {

  private static final Logger LOG =
    LoggerFactory.getLogger(TestRowStatisticsCompactionObserver.class);

  public static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  /** Shared recorder; the coprocessor under test is constructed with this instance. */
  public static final TestableRowStatisticsRecorder RECORDER = new TestableRowStatisticsRecorder();
  private static final TableName TABLE_NAME = TableName.valueOf("test-table");
  private static final byte[] FAMILY = Bytes.toBytes("0");
  private static SingleProcessHBaseCluster cluster;
  private static Connection connection;
  private static Table table;

  /** Boots the mini cluster and creates the test table with the observer coprocessor. */
  @BeforeAll
  public static void setUpClass() throws Exception {
    cluster = TEST_UTIL.startMiniCluster(1);
    connection = ConnectionFactory.createConnection(cluster.getConf());
    table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
      HConstants.DEFAULT_BLOCKSIZE, TestableRowStatisticsCompactionObserver.class.getName());
  }

  /**
   * Releases client resources and then stops the cluster. The table and connection are closed
   * while the cluster is still running; the cluster shutdown is in {@code finally} blocks so it
   * happens even if a client-side close fails.
   */
  @AfterAll
  public static void afterClass() throws Exception {
    try {
      try {
        if (table != null) {
          table.close();
        }
      } finally {
        if (connection != null) {
          connection.close();
        }
      }
    } finally {
      try {
        if (cluster != null) {
          cluster.close();
        }
      } finally {
        TEST_UTIL.shutdownMiniCluster();
      }
    }
  }

  /** Resets the shared recorder so each test starts with no recorded statistics. */
  @BeforeEach
  public void setUp() throws Exception {
    RECORDER.clear();
  }

  /**
   * Writes {@code numRows} rows of random width (flushing after each one), deletes column 0 of
   * every row, then verifies the recorded statistics after a minor and a major compaction.
   */
  @Test
  public void itRecordsStats() throws IOException, InterruptedException {
    int numRows = 10;
    int largestRowNum = -1;
    int largestRowSize = 0;

    int largestCellRowNum = -1;
    int largestCellColNum = -1;
    long largestCellSize = 0;

    // A single Admin is used for the whole test and closed at the end.
    try (Admin admin = connection.getAdmin()) {
      for (int i = 0; i < numRows; i++) {
        int cells = ThreadLocalRandom.current().nextInt(1000) + 10;

        Put p = new Put(Bytes.toBytes(i));
        for (int j = 0; j < cells; j++) {
          byte[] val = new byte[ThreadLocalRandom.current().nextInt(100) + 1];
          p.addColumn(FAMILY, Bytes.toBytes(j), val);
        }

        // Walk the cells of the Put to compute the expected largest row and largest cell.
        // NOTE(review): these expectations are computed before the column-0 deletes issued
        // below; if the largest cell happens to be in column 0 the post-major-compaction
        // assertions may not hold -- confirm whether that case needs to be excluded.
        int rowSize = 0;
        CellScanner cellScanner = p.cellScanner();
        int j = 0;
        while (cellScanner.advance()) {
          Cell current = cellScanner.current();
          int serializedSize = current.getSerializedSize();
          if (serializedSize > largestCellSize) {
            largestCellSize = serializedSize;
            largestCellRowNum = i;
            largestCellColNum = j;
          }
          rowSize += serializedSize;
          j++;
        }

        if (rowSize > largestRowSize) {
          largestRowNum = i;
          largestRowSize = rowSize;
        }

        table.put(p);
        // Flush after every row so that there are several store files to compact.
        admin.flush(table.getName());
      }

      for (int i = 0; i < numRows; i++) {
        Delete d = new Delete(Bytes.toBytes(i));
        d.addColumn(FAMILY, Bytes.toBytes(0));
        table.delete(d);
      }

      LOG.info("Final flush");
      await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> admin.flush(table.getName()));

      LOG.info("Compacting");
      admin.compact(table.getName());
      await().during(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(10))
        .until(() -> RECORDER.getLastStats() != null);
      assertFalse(RECORDER.getLastIsMajor());
      assertEquals(numRows, RECORDER.getLastStats().getTotalDeletesCount());
      assertEquals(numRows, RECORDER.getLastStats().getTotalRowsCount());

      RECORDER.clear();
      admin.majorCompact(table.getName());

      // Must wait for async majorCompaction to complete
      await().during(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(10))
        .until(() -> RECORDER.getLastStats() != null);
      assertTrue(RECORDER.getLastIsMajor());
      // no deletes after major compact
      RowStatisticsImpl lastStats = RECORDER.getLastStats();
      assertEquals(0, lastStats.getTotalDeletesCount());
      assertEquals(numRows, lastStats.getTotalRowsCount());
      // can only check largest values after major compact, since the above minor compact might
      // not contain all storefiles
      assertEquals(largestRowNum, Bytes.toInt(lastStats.getLargestRow()));
      Cell largestCell = lastStats.getLargestCell();
      assertEquals(largestCellRowNum, Bytes.toInt(largestCell.getRowArray(),
        largestCell.getRowOffset(), largestCell.getRowLength()));
      assertEquals(largestCellColNum, Bytes.toInt(largestCell.getQualifierArray(),
        largestCell.getQualifierOffset(), largestCell.getQualifierLength()));
    }
  }

  /** Observer variant that reports into the test's shared {@link #RECORDER}. */
  public static class TestableRowStatisticsCompactionObserver
    extends RowStatisticsCompactionObserver {

    public TestableRowStatisticsCompactionObserver() {
      super(TestRowStatisticsCompactionObserver.RECORDER);
    }
  }

  /**
   * Recorder that remembers only the most recent statistics passed to
   * {@link #record(RowStatisticsImpl, Optional)}. Both fields are {@code volatile} because they
   * are written by {@code record} and polled from the test method.
   */
  public static class TestableRowStatisticsRecorder implements RowStatisticsRecorder {

    private volatile RowStatisticsImpl lastStats = null;
    private volatile Boolean lastIsMajor = null;

    @Override
    public void record(RowStatisticsImpl stats, Optional<byte[]> fullRegionName) {
      LOG.info("Record called with isMajor={}, stats={}, fullRegionName={}", stats.isMajor(), stats,
        fullRegionName);
      // Write lastIsMajor first: the test polls lastStats, so once it observes a non-null
      // lastStats the matching lastIsMajor value has already been stored.
      lastIsMajor = stats.isMajor();
      lastStats = stats;
    }

    /** Forgets the last recorded statistics. */
    public void clear() {
      lastStats = null;
      lastIsMajor = null;
    }

    /** Returns the most recently recorded statistics, or {@code null} if none since clear. */
    public RowStatisticsImpl getLastStats() {
      return lastStats;
    }

    /** Returns the major flag of the last recorded statistics, or {@code null} if none. */
    public Boolean getLastIsMajor() {
      return lastIsMajor;
    }
  }
}