/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;

/**
 * Helper methods to ease Region Server integration with the Write Ahead Log (WAL). Note that
 * methods in this class specifically should not require access to anything other than the API found
 * in {@link WAL}. For internal use only.
 */
@InterfaceAudience.Private
public class WALUtil {
  private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class);

  /** Configuration key holding the block size to use when writing WALs. */
  public static final String WAL_BLOCK_SIZE = "hbase.regionserver.hlog.blocksize";

  private WALUtil() {
    // Shut down construction of this class.
  }

  /**
   * Write the marker that a compaction has succeeded and is about to be committed. This provides
   * info to the HMaster to allow it to recover the compaction if this regionserver dies in the
   * middle. It also prevents the compaction from finishing if this regionserver has already lost
   * its lease on the log.
   * <p/>
   * This write is for internal use only. Not for external client consumption. The marker is always
   * synced before this method returns normally.
   * @param wal              The log to write into.
   * @param replicationScope The replication scope handed to the created WAL key.
   * @param hri              The region the compaction belongs to.
   * @param c                Description of the compaction.
   * @param mvcc             Used by WAL to get sequence Id for the waledit.
   * @param sink             If not null, the marker is also passed to this sink through a
   *                         completion action attached to the marker's mvcc write entry.
   * @return the WAL key that was appended for this marker
   * @throws IOException if the append or the sync fails
   */
  public static WALKeyImpl writeCompactionMarker(WAL wal,
    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
    WALKeyImpl walKey =
      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null, sink);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
    }
    return walKey;
  }

  /**
   * Write a flush marker indicating a start / abort or a complete of a region flush
   * <p/>
   * This write is for internal use only. Not for external client consumption.
   * @param wal              The log to write into.
   * @param replicationScope The replication scope handed to the created WAL key.
   * @param hri              The region being flushed.
   * @param f                Description of the flush.
   * @param sync             Whether to sync the WAL after appending the marker.
   * @param mvcc             Used by WAL to get sequence Id for the waledit.
   * @param sink             If not null, the marker is also passed to this sink through a
   *                         completion action attached to the marker's mvcc write entry.
   * @return the WAL key that was appended for this marker
   * @throws IOException if the append or the (optional) sync fails
   */
  public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc,
    RegionReplicationSink sink) throws IOException {
    WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync, sink);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
    }
    return walKey;
  }

  /**
   * Write a region open marker indicating that the region is opened. This write is for internal use
   * only. Not for external client consumption. The marker is always synced before this method
   * returns normally.
   * @param wal              The log to write into.
   * @param replicationScope The replication scope handed to the created WAL key.
   * @param hri              The region the event refers to.
   * @param r                Description of the region event.
   * @param mvcc             Used by WAL to get sequence Id for the waledit.
   * @param sink             If not null, the marker is also passed to this sink through a
   *                         completion action attached to the marker's mvcc write entry.
   * @return the WAL key that was appended for this marker
   * @throws IOException if the append or the sync fails
   */
  public static WALKeyImpl writeRegionEventMarker(WAL wal,
    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
      WALEdit.createRegionEventWALEdit(hri, r), mvcc, null, sink);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
    }
    return walKey;
  }

  /**
   * Write a log marker that a bulk load has succeeded and is about to be committed. This write is
   * for internal use only. Not for external client consumption.
   * @param wal              The log to write into.
   * @param replicationScope The replication scope of the families in the HRegion
   * @param hri              A description of the region in the table that we are bulk loading into.
   * @param desc             A protocol buffers based description of the client's bulk loading
   *                         request
   * @param mvcc             Used by WAL to get sequence Id for the waledit.
   * @param sink             If not null, the marker is also passed to this sink through a
   *                         completion action attached to the marker's mvcc write entry.
   * @return walKey with sequenceid filled out for this bulk load marker
   * @throws IOException We will throw an IOException if we can not append to the HLog.
   */
  public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
    final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc,
    final RegionReplicationSink sink) throws IOException {
    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null, sink);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
    }
    return walKey;
  }

  /**
   * Appends the given marker edit and always syncs it; a thin wrapper over
   * {@link #doFullMarkerAppendTransaction} with {@code sync} fixed to {@code true}.
   */
  private static WALKeyImpl writeMarker(final WAL wal,
    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
    final RegionReplicationSink sink) throws IOException {
    // Markers written through this helper are always synced, hence sync == true below.
    return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
      true, sink);
  }

  /**
   * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an
   * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good
   * for case of adding a single edit or marker to the WAL.
   * <p/>
   * This write is for internal use only. Not for external client consumption.
   * @param extendedAttributes Extra attributes handed to the created WAL key; may be null.
   * @param sync               Whether to sync the WAL after the append.
   * @param sink               If not null, a completion action that calls {@code sink.add} with
   *                           the key, the edit and the current server call (or null when there is
   *                           none) is attached to the write entry. NOTE(review): presumably it
   *                           runs when the write entry is completed; confirm against
   *                           {@link MultiVersionConcurrencyControl}.
   * @return WALKeyImpl that was added to the WAL.
   * @throws IOException if the append or the sync fails; the mvcc write entry, if one was created,
   *                     is completed before the exception is rethrown.
   */
  private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
    final boolean sync, final RegionReplicationSink sink) throws IOException {
    // TODO: Pass in current time to use?
    WALKeyImpl walKey = createWALKey(hri, mvcc, replicationScope, extendedAttributes);
    long trx = MultiVersionConcurrencyControl.NONE;
    try {
      trx = wal.appendMarker(hri, walKey, edit);
      WriteEntry writeEntry = walKey.getWriteEntry();
      if (sink != null) {
        writeEntry.attachCompletionAction(() -> sink.add(walKey, edit,
          RpcServer.getCurrentServerCallWithCellScanner().orElse(null)));
      }
      if (sync) {
        wal.sync(trx);
      }
      // Call complete only here because these are markers only. They are not for clients to read.
      mvcc.complete(writeEntry);
    } catch (IOException ioe) {
      // Do not leave the mvcc transaction dangling when the append or sync failed.
      if (walKey.getWriteEntry() != null) {
        mvcc.complete(walKey.getWriteEntry());
      }
      /*
       * Here we do not abort the RegionServer for WALSyncTimeoutIOException as
       * HRegion#doWALAppend does, because WAL Marker just records the internal state and seems it
       * is no need to always abort the RegionServer when WAL#sync timeout, it is the internal state
       * transition that determines whether RegionServer is aborted or not.
       */
      throw ioe;
    }
    return walKey;
  }

  /**
   * Creates a WAL key for the given region, stamped with the current time as reported by
   * {@link EnvironmentEdgeManager#currentTime()}.
   * @param hri                Region the key is for; supplies the encoded region name and table.
   * @param mvcc               Used by WAL to get sequence Id for the waledit.
   * @param replicationScope   The replication scope handed to the key.
   * @param extendedAttributes Extra attributes handed to the key; callers in this class pass null.
   * @return the newly created key
   */
  public static WALKeyImpl createWALKey(final RegionInfo hri, MultiVersionConcurrencyControl mvcc,
    final NavigableMap<byte[], Integer> replicationScope,
    final Map<String, byte[]> extendedAttributes) {
    return new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
      EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
  }

  /**
   * Blocksize returned here is 2x the default HDFS blocksize unless explicitly set in
   * Configuration. Works in tandem with hbase.regionserver.logroll.multiplier. See comment in
   * AbstractFSWAL in Constructor where we set blocksize and logrollsize for why.
   * @return Blocksize to use writing WALs.
   * @throws IOException if the default block size can not be obtained from the filesystem
   */
  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir)
    throws IOException {
    return getWALBlockSize(conf, fs, dir, false);
  }

  /**
   * Public because of FSHLog. Should be package-private
   * @param isRecoverEdits the created writer is for recovered edits or WAL. For recovered edits, it
   *                       is true and for WAL it is false.
   * @return the configured block size for the requested kind of writer, or twice the filesystem's
   *         default block size for {@code dir} when the corresponding key is not set
   * @throws IOException if the default block size can not be obtained from the filesystem
   */
  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir,
    boolean isRecoverEdits) throws IOException {
    long defaultBlockSize = CommonFSUtils.getDefaultBlockSize(fs, dir) * 2;
    if (isRecoverEdits) {
      return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize);
    }
    return conf.getLong(WAL_BLOCK_SIZE, defaultBlockSize);
  }

  /**
   * Rewrites the cells of the given edit in place. The mapper is applied to every cell in order; a
   * cell for which it returns null is dropped, any other result replaces the original cell. The
   * relative order of the surviving cells is preserved.
   * @param edit   the edit whose cell list is modified
   * @param mapper maps a cell to its replacement, or to null to drop it
   */
  public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
    ArrayList<Cell> cells = edit.getCells();
    int size = cells.size();
    int newSize = 0;
    // Compact the surviving cells towards the front of the list.
    for (int i = 0; i < size; i++) {
      Cell cell = mapper.apply(cells.get(i));
      if (cell != null) {
        cells.set(newSize, cell);
        newSize++;
      }
    }
    // Drop the now-stale tail in a single range removal.
    if (newSize < size) {
      cells.subList(newSize, size).clear();
    }
    // Only give memory back when more than half of the cells were dropped.
    if (newSize < size / 2) {
      cells.trimToSize();
    }
  }

  /**
   * Writes a replication marker edit built from the given row key and timestamp and syncs it. The
   * marker is written with {@link WALEdit#METAFAMILY} scoped to
   * {@code REPLICATION_SCOPE_GLOBAL}, without extended attributes and without a region replication
   * sink.
   * @param wal        The log to write into.
   * @param mvcc       Used by WAL to get sequence Id for the waledit.
   * @param regionInfo The region the marker is written for.
   * @param rowKey     Row key passed to {@link WALEdit#createReplicationMarkerEdit}.
   * @param timestamp  Timestamp passed to {@link WALEdit#createReplicationMarkerEdit}.
   * @throws IOException if the append or the sync fails
   */
  public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrencyControl mvcc,
    RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException {
    NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL);
    writeMarker(wal, replicationScope, regionInfo,
      WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null);
  }
}