/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.namequeues;

import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.RS_COLUMN;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_LENGTH_COLUMN;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_NAME_COLUMN;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_STATE_COLUMN;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Starts a mini cluster with the WAL event tracker enabled, rolls the WAL of the first region
 * server and verifies the rows that end up in the table named by
 * {@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME}.
 */
@Tag(MediumTests.TAG)
@Tag(RegionServerTests.TAG)
public class TestWALEventTracker {

  private static final Logger LOG = LoggerFactory.getLogger(TestWALEventTracker.class);
  private static HBaseTestingUtil TEST_UTIL;
  public static Configuration CONF;

  /**
   * Builds the configuration (tracker enabled, short chore period and sleep interval) and starts
   * the mini cluster shared by all tests of this class.
   */
  @BeforeAll
  public static void setup() throws Exception {
    CONF = HBaseConfiguration.create();
    CONF.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true);
    // Set the chore for less than a second.
    CONF.setInt(NAMED_QUEUE_CHORE_DURATION_KEY, 900);
    CONF.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100);
    TEST_UTIL = new HBaseTestingUtil(CONF);
    TEST_UTIL.startMiniCluster();
  }

  /**
   * Shuts the mini cluster down. Uses {@code shutdownMiniCluster()} as the counterpart of the
   * {@code startMiniCluster()} call in {@link #setup()} rather than stopping only the HBase part.
   */
  @AfterAll
  public static void teardown() throws Exception {
    LOG.info("Calling teardown");
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Blocks (for at most 10 seconds) until the WAL event tracker table exists, so that every test
   * can read from it.
   */
  @BeforeEach
  public void waitForWalEventTrackerTableCreation() {
    // The explicit cast gives the lambda a target type whose evaluate() may throw the checked
    // IOException raised by tableExists().
    Waiter.waitFor(CONF, 10000, (Waiter.Predicate<Exception>) () -> TEST_UTIL.getAdmin()
      .tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
  }

  /**
   * Rolls the only WAL of region server 0 and checks the events recorded for the WAL that was
   * rolled and for the WAL that became current.
   */
  @Test
  public void testWALRolling() throws Exception {
    // The tracker table is already guaranteed to exist by waitForWalEventTrackerTableCreation().
    Connection connection = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getConnection();
    List<WAL> wals = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWALs();
    assertEquals(1, wals.size());
    AbstractFSWAL<?> wal = (AbstractFSWAL<?>) wals.get(0);
    Path wal1Path = wal.getOldPath();
    wal.rollWriter(true);

    FileSystem fs = TEST_UTIL.getTestFileSystem();
    long wal1Length = fs.getFileStatus(wal1Path).getLen();
    Path wal2Path = wal.getOldPath();
    String hostName =
      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName().getHostname();

    TEST_UTIL.waitFor(5000, () -> getTableCount(connection) >= 3);
    List<WALEventTrackerPayload> walEventsList = getRows(hostName, connection);

    // There should be at least 2 events for wal1Name, with ROLLING and ROLLED state. Most of the
    // time we will lose the ACTIVE event for the first wal created, since hmaster will take some
    // time to create the hbase:waleventtracker table and by that time RS will already have
    // created the first wal and will try to persist it.
    compareEvents(hostName, wal1Path.getName(), walEventsList,
      new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ROLLING.name(),
        WALEventTrackerListener.WalState.ROLLED.name())),
      false);

    // There should be only 1 event for wal2Name which is current wal, with ACTIVE state
    compareEvents(hostName, wal2Path.getName(), walEventsList,
      new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ACTIVE.name())), true);

    // Check that there is exactly one event with wal1Path and state ROLLED.
    checkWALRolledEventHasSize(walEventsList, wal1Path.getName(), wal1Length);
  }

  /**
   * Asserts that exactly one ROLLED event exists for the given WAL.
   * @param walEvents  events read from the tracker table
   * @param walName    name of the WAL whose ROLLED event is looked up
   * @param actualSize length of the WAL file on the file system; currently not compared, see the
   *                   comment in the method body
   */
  private void checkWALRolledEventHasSize(List<WALEventTrackerPayload> walEvents, String walName,
    long actualSize) {
    String rolledState = WALEventTrackerListener.WalState.ROLLED.name();
    int rolledEventCount = 0;
    // Count the events matching both the wal name and the ROLLED state.
    for (WALEventTrackerPayload event : walEvents) {
      if (walName.equals(event.getWalName()) && rolledState.equals(event.getState())) {
        rolledEventCount++;
      }
    }

    assertEquals(1, rolledEventCount);
    // We are not comparing the size of the WAL in the tracker table with actualSize.
    // For AsyncWAL implementation, since the WAL file is closed in an async fashion, the WAL length
    // will always be incorrect.
    // For FSHLog implementation, we close the WAL in an executor thread. So there will always be
    // a difference of trailer size bytes.
  }

  /**
   * Compare the events from {@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME}
   * with the states expected for one WAL. The passed {@code expectedStates} list is not modified.
   * @param hostName       hostname every event is expected to carry
   * @param walName        wal name used to filter the events
   * @param walEvents      events read from the table
   * @param expectedStates expected states for the hostname and wal name
   * @param strict         whether the number of events for the wal must equal the number of
   *                       expected states. Sometimes we lose the ACTIVE state event for the first
   *                       wal since it takes some time for hmaster to create the table and by
   *                       that time RS already creates the first WAL and will try to persist the
   *                       ACTIVE event to waleventtracker table.
   */
  private void compareEvents(String hostName, String walName,
    List<WALEventTrackerPayload> walEvents, List<String> expectedStates, boolean strict) {
    // Assert that all the events have the same host name i.e they came from the same RS.
    for (WALEventTrackerPayload event : walEvents) {
      assertEquals(hostName, event.getRsName());
    }

    // Filter the list by walName.
    List<WALEventTrackerPayload> eventsFilteredByWalName = new ArrayList<>();
    for (WALEventTrackerPayload event : walEvents) {
      if (walName.equals(event.getWalName())) {
        eventsFilteredByWalName.add(event);
      }
    }

    // In strict mode there must be exactly one event per expected state.
    if (strict) {
      assertEquals(expectedStates.size(), eventsFilteredByWalName.size());
    }

    // Work on a copy so the caller's list is left untouched. Each observed state removes one
    // matching entry; whatever is left over was expected but never seen.
    List<String> missingStates = new ArrayList<>(expectedStates);
    for (WALEventTrackerPayload event : eventsFilteredByWalName) {
      missingStates.remove(event.getState());
    }
    assertEquals(0, missingStates.size(),
      "Missing states for wal " + walName + ": " + missingStates);
  }

  /**
   * Scans the tracker table starting at the given row key and converts every row to a payload.
   * Only a start row is set on the scan (no stop row), so every row sorting at or after
   * {@code rowKeyPrefix} is returned.
   * @param rowKeyPrefix row key the scan starts from
   * @param connection   connection used to open the table
   * @return one payload per row returned by the scan
   */
  private List<WALEventTrackerPayload> getRows(String rowKeyPrefix, Connection connection)
    throws IOException {
    List<WALEventTrackerPayload> list = new ArrayList<>();
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes(rowKeyPrefix));
    // Close both the table and the scanner once the rows have been read.
    try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
      ResultScanner scanner = table.getScanner(scan)) {
      Result r;
      while ((r = scanner.next()) != null) {
        list.add(getPayload(r.listCells()));
      }
    }
    return list;
  }

  /**
   * Builds a payload from the cells of one row by matching each cell's qualifier against the
   * tracker table column names. Columns that are absent keep their defaults ({@code null} for
   * strings, {@code 0} for the timestamp and the wal length).
   * @param cells cells of a single row
   * @return payload assembled from the recognised columns
   */
  private WALEventTrackerPayload getPayload(List<Cell> cells) {
    String rsName = null;
    String walName = null;
    String walState = null;
    long timestamp = 0L;
    long walLength = 0L;
    for (Cell cell : cells) {
      String qualifierStr = Bytes.toString(CellUtil.cloneQualifier(cell));
      byte[] value = CellUtil.cloneValue(cell);

      if (RS_COLUMN.equals(qualifierStr)) {
        rsName = Bytes.toString(value);
      } else if (WAL_NAME_COLUMN.equals(qualifierStr)) {
        walName = Bytes.toString(value);
      } else if (WAL_STATE_COLUMN.equals(qualifierStr)) {
        walState = Bytes.toString(value);
      } else if (TIMESTAMP_COLUMN.equals(qualifierStr)) {
        timestamp = Bytes.toLong(value);
      } else if (WAL_LENGTH_COLUMN.equals(qualifierStr)) {
        walLength = Bytes.toLong(value);
      }
    }
    return new WALEventTrackerPayload(rsName, walName, timestamp, walState, walLength);
  }

  /**
   * Counts the rows currently in the tracker table. This is called repeatedly while polling, so
   * the table and the scanner are closed on every call.
   * @param connection connection used to open the table
   * @return number of rows returned by a full scan
   */
  private int getTableCount(Connection connection) throws Exception {
    int count = 0;
    try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
      ResultScanner resultScanner =
        table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM))) {
      while (resultScanner.next() != null) {
        count++;
      }
    }
    LOG.info("Table count: {}", count);
    return count;
  }
}