001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.mockito.Mockito.mock; 021 022import java.io.IOException; 023import java.util.NavigableMap; 024import java.util.TreeMap; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseTestingUtility; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.KeyValue; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.TableNameTestRule; 034import org.apache.hadoop.hbase.Waiter; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.client.RegionInfoBuilder; 037import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 038import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.wal.WAL; 042import org.apache.hadoop.hbase.wal.WAL.Entry; 043import org.apache.hadoop.hbase.wal.WALEdit; 044import org.apache.hadoop.hbase.wal.WALFactory; 045import org.apache.hadoop.hbase.wal.WALKeyImpl; 046import org.apache.hadoop.hdfs.MiniDFSCluster; 047import org.junit.After; 048import org.junit.AfterClass; 049import org.junit.Rule; 050import org.junit.rules.TestName; 051 052import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 053 054/** 055 * Base class for WALEntryStream tests. 056 */ 057public abstract class WALEntryStreamTestBase { 058 059 protected static final long TEST_TIMEOUT_MS = 5000; 060 protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();; 061 protected static Configuration CONF; 062 protected static FileSystem fs; 063 protected static MiniDFSCluster cluster; 064 protected static final TableName tableName = TableName.valueOf("tablename"); 065 protected static final byte[] family = Bytes.toBytes("column"); 066 protected static final byte[] qualifier = Bytes.toBytes("qualifier"); 067 protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName) 068 .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build(); 069 protected static final NavigableMap<byte[], Integer> scopes = getScopes(); 070 protected final String fakeWalGroupId = "fake-wal-group-id"; 071 072 /** 073 * Test helper that waits until a non-null entry is available in the stream next or times out. A 074 * {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream 075 * can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()} 076 * may need to retry multiple times before an entry appended to the WAL is visible to the stream 077 * consumers. One such cause of delay is the close() of writer writing these log files. While the 078 * closure is in progress, the stream does not switch to the next log in the queue and next() may 079 * return null entries. This utility wraps these retries into a single next call and that makes 080 * the test code simpler. 081 */ 082 protected static class WALEntryStreamWithRetries extends WALEntryStream { 083 // Class member to be able to set a non-final from within a lambda. 084 private Entry result; 085 086 public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf, 087 long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, 088 MetricsSource metrics, String walGroupId) throws IOException { 089 super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId); 090 } 091 092 @Override 093 public Entry next() { 094 Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> { 095 result = WALEntryStreamWithRetries.super.next(); 096 return result != null; 097 }); 098 return result; 099 } 100 } 101 102 private static NavigableMap<byte[], Integer> getScopes() { 103 NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); 104 scopes.put(family, 1); 105 return scopes; 106 } 107 108 class PathWatcher implements WALActionsListener { 109 110 Path currentPath; 111 112 @Override 113 public void preLogRoll(Path oldPath, Path newPath) { 114 logQueue.enqueueLog(newPath, fakeWalGroupId); 115 currentPath = newPath; 116 } 117 } 118 119 protected WAL log; 120 protected ReplicationSourceLogQueue logQueue; 121 protected PathWatcher pathWatcher; 122 123 @Rule 124 public TestName tn = new TestName(); 125 protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 126 127 protected static void startCluster() throws Exception { 128 TEST_UTIL = new HBaseTestingUtility(); 129 CONF = TEST_UTIL.getConfiguration(); 130 CONF.setLong("replication.source.sleepforretries", 10); 131 TEST_UTIL.startMiniDFSCluster(3); 132 133 cluster = TEST_UTIL.getDFSCluster(); 134 fs = cluster.getFileSystem(); 135 } 136 137 @AfterClass 138 public static void tearDownAfterClass() throws Exception { 139 TEST_UTIL.shutdownMiniCluster(); 140 } 141 142 protected void initWAL() throws IOException { 143 ReplicationSource source = mock(ReplicationSource.class); 144 MetricsSource metricsSource = new MetricsSource("2"); 145 // Source with the same id is shared and carries values from the last run 146 metricsSource.clear(); 147 logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source); 148 pathWatcher = new PathWatcher(); 149 final WALFactory wals = 150 new WALFactory(CONF, TableNameTestRule.cleanUpTestName(tn.getMethodName())); 151 wals.getWALProvider().addWALActionsListener(pathWatcher); 152 log = wals.getWAL(info); 153 } 154 155 @After 156 public void tearDown() throws Exception { 157 Closeables.close(log, true); 158 } 159 160 protected void appendToLogAndSync() throws IOException { 161 appendToLogAndSync(1); 162 } 163 164 protected void appendToLogAndSync(int count) throws IOException { 165 long txid = appendToLog(count); 166 log.sync(txid); 167 } 168 169 protected long appendToLog(int count) throws IOException { 170 return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 171 EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count)); 172 } 173 174 protected WALEdit getWALEdits(int count) { 175 WALEdit edit = new WALEdit(); 176 for (int i = 0; i < count; i++) { 177 edit.add(new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier, 178 EnvironmentEdgeManager.currentTime(), qualifier)); 179 } 180 return edit; 181 } 182}