001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
053
054/**
055 * Helper methods to ease Region Server integration with the Write Ahead Log (WAL). Note that
056 * methods in this class specifically should not require access to anything other than the API found
057 * in {@link WAL}. For internal use only.
058 */
059@InterfaceAudience.Private
060public class WALUtil {
061  private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class);
062
063  public static final String WAL_BLOCK_SIZE = "hbase.regionserver.hlog.blocksize";
064
065  private WALUtil() {
066    // Shut down construction of this class.
067  }
068
069  /**
070   * Write the marker that a compaction has succeeded and is about to be committed. This provides
071   * info to the HMaster to allow it to recover the compaction if this regionserver dies in the
072   * middle. It also prevents the compaction from finishing if this regionserver has already lost
073   * its lease on the log.
074   * <p/>
075   * This write is for internal use only. Not for external client consumption.
076   * @param mvcc Used by WAL to get sequence Id for the waledit.
077   */
078  public static WALKeyImpl writeCompactionMarker(WAL wal,
079    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
080    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
081    WALKeyImpl walKey =
082      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null, sink);
083    if (LOG.isTraceEnabled()) {
084      LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
085    }
086    return walKey;
087  }
088
089  /**
090   * Write a flush marker indicating a start / abort or a complete of a region flush
091   * <p/>
092   * This write is for internal use only. Not for external client consumption.
093   */
094  public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
095    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc,
096    RegionReplicationSink sink) throws IOException {
097    WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
098      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync, sink);
099    if (LOG.isTraceEnabled()) {
100      LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
101    }
102    return walKey;
103  }
104
105  /**
106   * Write a region open marker indicating that the region is opened. This write is for internal use
107   * only. Not for external client consumption.
108   */
109  public static WALKeyImpl writeRegionEventMarker(WAL wal,
110    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
111    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
112    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
113      WALEdit.createRegionEventWALEdit(hri, r), mvcc, null, sink);
114    if (LOG.isTraceEnabled()) {
115      LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
116    }
117    return walKey;
118  }
119
120  /**
121   * Write a log marker that a bulk load has succeeded and is about to be committed. This write is
122   * for internal use only. Not for external client consumption.
123   * @param wal              The log to write into.
124   * @param replicationScope The replication scope of the families in the HRegion
125   * @param hri              A description of the region in the table that we are bulk loading into.
126   * @param desc             A protocol buffers based description of the client's bulk loading
127   *                         request
128   * @return walKey with sequenceid filled out for this bulk load marker
129   * @throws IOException We will throw an IOException if we can not append to the HLog.
130   */
131  public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
132    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
133    final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc,
134    final RegionReplicationSink sink) throws IOException {
135    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
136      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null, sink);
137    if (LOG.isTraceEnabled()) {
138      LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
139    }
140    return walKey;
141  }
142
143  private static WALKeyImpl writeMarker(final WAL wal,
144    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
145    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
146    final RegionReplicationSink sink) throws IOException {
147    // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
148    return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
149      true, sink);
150  }
151
152  /**
153   * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an
154   * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good
155   * for case of adding a single edit or marker to the WAL.
156   * <p/>
157   * This write is for internal use only. Not for external client consumption.
158   * @return WALKeyImpl that was added to the WAL.
159   */
160  private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
161    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
162    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
163    final boolean sync, final RegionReplicationSink sink) throws IOException {
164    // TODO: Pass in current time to use?
165    WALKeyImpl walKey = createWALKey(hri, mvcc, replicationScope, extendedAttributes);
166    long trx = MultiVersionConcurrencyControl.NONE;
167    try {
168      trx = wal.appendMarker(hri, walKey, edit);
169      WriteEntry writeEntry = walKey.getWriteEntry();
170      if (sink != null) {
171        writeEntry.attachCompletionAction(() -> sink.add(walKey, edit,
172          RpcServer.getCurrentServerCallWithCellScanner().orElse(null)));
173      }
174      if (sync) {
175        wal.sync(trx);
176      }
177      // Call complete only here because these are markers only. They are not for clients to read.
178      mvcc.complete(writeEntry);
179    } catch (IOException ioe) {
180      if (walKey.getWriteEntry() != null) {
181        mvcc.complete(walKey.getWriteEntry());
182      }
183      /**
184       * Here we do not abort the RegionServer for {@link WALSyncTimeoutIOException} as
185       * {@link HRegion#doWALAppend} does,because WAL Marker just records the internal state and
186       * seems it is no need to always abort the RegionServer when {@link WAL#sync} timeout,it is
187       * the internal state transition that determines whether RegionServer is aborted or not.
188       */
189      throw ioe;
190    }
191    return walKey;
192  }
193
194  public static WALKeyImpl createWALKey(final RegionInfo hri, MultiVersionConcurrencyControl mvcc,
195    final NavigableMap<byte[], Integer> replicationScope,
196    final Map<String, byte[]> extendedAttributes) {
197    return new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
198      EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
199  }
200
201  /**
202   * Blocksize returned here is 2x the default HDFS blocksize unless explicitly set in
203   * Configuration. Works in tandem with hbase.regionserver.logroll.multiplier. See comment in
204   * AbstractFSWAL in Constructor where we set blocksize and logrollsize for why.
205   * @return Blocksize to use writing WALs.
206   */
207  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir)
208    throws IOException {
209    return getWALBlockSize(conf, fs, dir, false);
210  }
211
212  /**
213   * Public because of FSHLog. Should be package-private
214   * @param isRecoverEdits the created writer is for recovered edits or WAL. For recovered edits, it
215   *                       is true and for WAL it is false.
216   */
217  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir,
218    boolean isRecoverEdits) throws IOException {
219    long defaultBlockSize = CommonFSUtils.getDefaultBlockSize(fs, dir) * 2;
220    if (isRecoverEdits) {
221      return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize);
222    }
223    return conf.getLong(WAL_BLOCK_SIZE, defaultBlockSize);
224  }
225
226  public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
227    ArrayList<Cell> cells = edit.getCells();
228    int size = cells.size();
229    int newSize = 0;
230    for (int i = 0; i < size; i++) {
231      Cell cell = mapper.apply(cells.get(i));
232      if (cell != null) {
233        cells.set(newSize, cell);
234        newSize++;
235      }
236    }
237    for (int i = size - 1; i >= newSize; i--) {
238      cells.remove(i);
239    }
240    if (newSize < size / 2) {
241      cells.trimToSize();
242    }
243  }
244
245  public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrencyControl mvcc,
246    RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException {
247    NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
248    replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL);
249    writeMarker(wal, replicationScope, regionInfo,
250      WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null);
251  }
252}