001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example.row.stats;
019
020import static org.awaitility.Awaitility.await;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.time.Duration;
027import java.util.Optional;
028import java.util.concurrent.ThreadLocalRandom;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.ConnectionFactory;
037import org.apache.hadoop.hbase.client.Delete;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.junit.jupiter.api.AfterAll;
044import org.junit.jupiter.api.BeforeAll;
045import org.junit.jupiter.api.BeforeEach;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051@Tag(MediumTests.TAG)
052public class TestRowStatisticsCompactionObserver {
053
054  private static final Logger LOG =
055    LoggerFactory.getLogger(TestRowStatisticsCompactionObserver.class);
056
057  public static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
058  public static final TestableRowStatisticsRecorder RECORDER = new TestableRowStatisticsRecorder();
059  private static final TableName TABLE_NAME = TableName.valueOf("test-table");
060  private static final byte[] FAMILY = Bytes.toBytes("0");
061  private static SingleProcessHBaseCluster cluster;
062  private static Connection connection;
063  private static Table table;
064
065  @BeforeAll
066  public static void setUpClass() throws Exception {
067    cluster = TEST_UTIL.startMiniCluster(1);
068    connection = ConnectionFactory.createConnection(cluster.getConf());
069    table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
070      HConstants.DEFAULT_BLOCKSIZE, TestableRowStatisticsCompactionObserver.class.getName());
071  }
072
073  @AfterAll
074  public static void afterClass() throws Exception {
075    cluster.close();
076    TEST_UTIL.shutdownMiniCluster();
077    table.close();
078    connection.close();
079  }
080
081  @BeforeEach
082  public void setUp() throws Exception {
083    RECORDER.clear();
084  }
085
086  @Test
087  public void itRecordsStats() throws IOException, InterruptedException {
088    int numRows = 10;
089    int largestRowNum = -1;
090    int largestRowSize = 0;
091
092    int largestCellRowNum = -1;
093    int largestCellColNum = -1;
094    long largestCellSize = 0;
095
096    for (int i = 0; i < numRows; i++) {
097      int cells = ThreadLocalRandom.current().nextInt(1000) + 10;
098
099      Put p = new Put(Bytes.toBytes(i));
100      for (int j = 0; j < cells; j++) {
101        byte[] val = new byte[ThreadLocalRandom.current().nextInt(100) + 1];
102        p.addColumn(FAMILY, Bytes.toBytes(j), val);
103      }
104
105      int rowSize = 0;
106      CellScanner cellScanner = p.cellScanner();
107      int j = 0;
108      while (cellScanner.advance()) {
109        Cell current = cellScanner.current();
110        int serializedSize = current.getSerializedSize();
111        if (serializedSize > largestCellSize) {
112          largestCellSize = serializedSize;
113          largestCellRowNum = i;
114          largestCellColNum = j;
115        }
116        rowSize += serializedSize;
117        j++;
118      }
119
120      if (rowSize > largestRowSize) {
121        largestRowNum = i;
122        largestRowSize = rowSize;
123      }
124
125      table.put(p);
126      connection.getAdmin().flush(table.getName());
127    }
128
129    for (int i = 0; i < numRows; i++) {
130      Delete d = new Delete(Bytes.toBytes(i));
131      d.addColumn(FAMILY, Bytes.toBytes(0));
132      table.delete(d);
133    }
134
135    LOG.info("Final flush");
136    await().atMost(Duration.ofSeconds(10))
137      .untilAsserted(() -> connection.getAdmin().flush(table.getName()));
138
139    LOG.info("Compacting");
140    connection.getAdmin().compact(table.getName());
141    await().during(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(10))
142      .until(() -> RECORDER.getLastStats() != null);
143    assertFalse(RECORDER.getLastIsMajor());
144    assertEquals(10, RECORDER.getLastStats().getTotalDeletesCount());
145    assertEquals(10, RECORDER.getLastStats().getTotalRowsCount());
146
147    RECORDER.clear();
148    connection.getAdmin().majorCompact(table.getName());
149
150    // Must wait for async majorCompaction to complete
151    await().during(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(10))
152      .until(() -> RECORDER.getLastStats() != null);
153    assertTrue(RECORDER.getLastIsMajor());
154    // no deletes after major compact
155    RowStatisticsImpl lastStats = RECORDER.getLastStats();
156    assertEquals(0, lastStats.getTotalDeletesCount());
157    assertEquals(10, lastStats.getTotalRowsCount());
158    // can only check largest values after major compact, since the above minor compact might not
159    // contain all storefiles
160    assertEquals(Bytes.toInt(lastStats.getLargestRow()), largestRowNum);
161    assertEquals(
162      Bytes.toInt(lastStats.getLargestCell().getRowArray(),
163        lastStats.getLargestCell().getRowOffset(), lastStats.getLargestCell().getRowLength()),
164      largestCellRowNum);
165    assertEquals(Bytes.toInt(lastStats.getLargestCell().getQualifierArray(),
166      lastStats.getLargestCell().getQualifierOffset(),
167      lastStats.getLargestCell().getQualifierLength()), largestCellColNum);
168  }
169
170  public static class TestableRowStatisticsCompactionObserver
171    extends RowStatisticsCompactionObserver {
172
173    public TestableRowStatisticsCompactionObserver() {
174      super(TestRowStatisticsCompactionObserver.RECORDER);
175    }
176  }
177
178  public static class TestableRowStatisticsRecorder implements RowStatisticsRecorder {
179
180    private volatile RowStatisticsImpl lastStats = null;
181    private volatile Boolean lastIsMajor = null;
182
183    @Override
184    public void record(RowStatisticsImpl stats, Optional<byte[]> fullRegionName) {
185      LOG.info("Record called with isMajor={}, stats={}, fullRegionName={}", stats.isMajor(), stats,
186        fullRegionName);
187      lastStats = stats;
188      lastIsMajor = stats.isMajor();
189    }
190
191    public void clear() {
192      lastStats = null;
193      lastIsMajor = null;
194    }
195
196    public RowStatisticsImpl getLastStats() {
197      return lastStats;
198    }
199
200    public Boolean getLastIsMajor() {
201      return lastIsMajor;
202    }
203  }
204}