001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
021import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
022import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.RS_COLUMN;
023import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.TIMESTAMP_COLUMN;
024import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME;
025import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_LENGTH_COLUMN;
026import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_NAME_COLUMN;
027import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_STATE_COLUMN;
028import static org.junit.jupiter.api.Assertions.assertEquals;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellUtil;
039import org.apache.hadoop.hbase.HBaseConfiguration;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.Waiter;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.ResultScanner;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.client.Table;
047import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
048import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.testclassification.RegionServerTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.wal.WAL;
053import org.junit.jupiter.api.AfterAll;
054import org.junit.jupiter.api.BeforeAll;
055import org.junit.jupiter.api.BeforeEach;
056import org.junit.jupiter.api.Tag;
057import org.junit.jupiter.api.Test;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061@Tag(MediumTests.TAG)
062@Tag(RegionServerTests.TAG)
063public class TestWALEventTracker {
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
066  private static HBaseTestingUtil TEST_UTIL;
067  public static Configuration CONF;
068
069  @BeforeAll
070  public static void setup() throws Exception {
071    CONF = HBaseConfiguration.create();
072    CONF.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true);
073    // Set the chore for less than a second.
074    CONF.setInt(NAMED_QUEUE_CHORE_DURATION_KEY, 900);
075    CONF.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100);
076    TEST_UTIL = new HBaseTestingUtil(CONF);
077    TEST_UTIL.startMiniCluster();
078  }
079
080  @AfterAll
081  public static void teardown() throws Exception {
082    LOG.info("Calling teardown");
083    TEST_UTIL.shutdownMiniHBaseCluster();
084  }
085
086  @BeforeEach
087  public void waitForWalEventTrackerTableCreation() {
088    Waiter.waitFor(CONF, 10000,
089      (Waiter.Predicate) () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
090  }
091
092  @Test
093  public void testWALRolling() throws Exception {
094    Connection connection = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getConnection();
095    waitForWALEventTrackerTable(connection);
096    List<WAL> wals = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWALs();
097    assertEquals(1, wals.size());
098    AbstractFSWAL wal = (AbstractFSWAL) wals.get(0);
099    Path wal1Path = wal.getOldPath();
100    wal.rollWriter(true);
101
102    FileSystem fs = TEST_UTIL.getTestFileSystem();
103    long wal1Length = fs.getFileStatus(wal1Path).getLen();
104    Path wal2Path = wal.getOldPath();
105    String hostName =
106      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName().getHostname();
107
108    TEST_UTIL.waitFor(5000, () -> getTableCount(connection) >= 3);
109    List<WALEventTrackerPayload> walEventsList = getRows(hostName, connection);
110
111    // There should be atleast 2 events for wal1Name, with ROLLING and ROLLED state. Most of the
112    // time we will lose ACTIVE event for the first wal creates since hmaster will take some time
113    // to create hbase:waleventtracker table and by that time RS will already create the first wal
114    // and will try to persist it.
115    compareEvents(hostName, wal1Path.getName(), walEventsList,
116      new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ROLLING.name(),
117        WALEventTrackerListener.WalState.ROLLED.name())),
118      false);
119
120    // There should be only 1 event for wal2Name which is current wal, with ACTIVE state
121    compareEvents(hostName, wal2Path.getName(), walEventsList,
122      new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ACTIVE.name())), true);
123
124    // Check that event with wal1Path and state ROLLED has the wal length set.
125    checkWALRolledEventHasSize(walEventsList, wal1Path.getName(), wal1Length);
126  }
127
128  private void checkWALRolledEventHasSize(List<WALEventTrackerPayload> walEvents, String walName,
129    long actualSize) {
130    List<WALEventTrackerPayload> eventsFilteredByNameState = new ArrayList<>();
131    // Filter the list by walName and wal state.
132    for (WALEventTrackerPayload event : walEvents) {
133      if (
134        walName.equals(event.getWalName())
135          && WALEventTrackerListener.WalState.ROLLED.name().equals(event.getState())
136      ) {
137        eventsFilteredByNameState.add(event);
138      }
139    }
140
141    assertEquals(1, eventsFilteredByNameState.size());
142    // We are not comparing the size of the WAL in the tracker table with actual size.
143    // For AsyncWAL implementation, since the WAL file is closed in an async fashion, the WAL length
144    // will always be incorrect.
145    // For FSHLog implementation, we close the WAL in an executor thread. So there will always be
146    // a difference of trailer size bytes.
147    // assertEquals(actualSize, eventsFilteredByNameState.get(0).getWalLength());
148  }
149
150  /**
151   * Compare the events from @{@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME}
152   * @param hostName       hostname
153   * @param walName        walname
154   * @param walEvents      event from table
155   * @param expectedStates expected states for the hostname and wal name
156   * @param strict         whether to check strictly or not. Sometimes we lose the ACTIVE state
157   *                       event for the first wal since it takes some time for hmaster to create
158   *                       the table and by that time RS already creates the first WAL and will try
159   *                       to persist ACTIVE event to waleventtracker table.
160   */
161  private void compareEvents(String hostName, String walName,
162    List<WALEventTrackerPayload> walEvents, List<String> expectedStates, boolean strict) {
163    List<WALEventTrackerPayload> eventsFilteredByWalName = new ArrayList<>();
164
165    // Assert that all the events have the same host name i.e they came from the same RS.
166    for (WALEventTrackerPayload event : walEvents) {
167      assertEquals(hostName, event.getRsName());
168    }
169
170    // Filter the list by walName.
171    for (WALEventTrackerPayload event : walEvents) {
172      if (walName.equals(event.getWalName())) {
173        eventsFilteredByWalName.add(event);
174      }
175    }
176
177    // Assert that the list of events after filtering by walName should be same as expected states.
178    if (strict) {
179      assertEquals(expectedStates.size(), eventsFilteredByWalName.size());
180    }
181
182    for (WALEventTrackerPayload event : eventsFilteredByWalName) {
183      expectedStates.remove(event.getState());
184    }
185    assertEquals(0, expectedStates.size());
186  }
187
188  private void waitForWALEventTrackerTable(Connection connection) throws IOException {
189    TEST_UTIL.waitFor(5000, () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
190  }
191
192  private List<WALEventTrackerPayload> getRows(String rowKeyPrefix, Connection connection)
193    throws IOException {
194    List<WALEventTrackerPayload> list = new ArrayList<>();
195    Scan scan = new Scan();
196    scan.withStartRow(Bytes.toBytes(rowKeyPrefix));
197    Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
198    ResultScanner scanner = table.getScanner(scan);
199
200    Result r;
201    while ((r = scanner.next()) != null) {
202      List<Cell> cells = r.listCells();
203      list.add(getPayload(cells));
204    }
205    return list;
206  }
207
208  private WALEventTrackerPayload getPayload(List<Cell> cells) {
209    String rsName = null, walName = null, walState = null;
210    long timestamp = 0L, walLength = 0L;
211    for (Cell cell : cells) {
212      byte[] qualifier = CellUtil.cloneQualifier(cell);
213      byte[] value = CellUtil.cloneValue(cell);
214      String qualifierStr = Bytes.toString(qualifier);
215
216      if (RS_COLUMN.equals(qualifierStr)) {
217        rsName = Bytes.toString(value);
218      } else if (WAL_NAME_COLUMN.equals(qualifierStr)) {
219        walName = Bytes.toString(value);
220      } else if (WAL_STATE_COLUMN.equals(qualifierStr)) {
221        walState = Bytes.toString(value);
222      } else if (TIMESTAMP_COLUMN.equals(qualifierStr)) {
223        timestamp = Bytes.toLong(value);
224      } else if (WAL_LENGTH_COLUMN.equals(qualifierStr)) {
225        walLength = Bytes.toLong(value);
226      }
227    }
228    return new WALEventTrackerPayload(rsName, walName, timestamp, walState, walLength);
229  }
230
231  private int getTableCount(Connection connection) throws Exception {
232    Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
233    ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
234    int count = 0;
235    while (resultScanner.next() != null) {
236      count++;
237    }
238    LOG.info("Table count: " + count);
239    return count;
240  }
241}