001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Map;
023import java.util.NavigableMap;
024import java.util.function.Function;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
031import org.apache.hadoop.hbase.util.CommonFSUtils;
032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
033import org.apache.hadoop.hbase.wal.WAL;
034import org.apache.hadoop.hbase.wal.WALEdit;
035import org.apache.hadoop.hbase.wal.WALKeyImpl;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
041
042import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
046
047/**
048 * Helper methods to ease Region Server integration with the Write Ahead Log (WAL). Note that
049 * methods in this class specifically should not require access to anything other than the API found
050 * in {@link WAL}. For internal use only.
051 */
052@InterfaceAudience.Private
053public class WALUtil {
054  private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class);
055
056  public static final String WAL_BLOCK_SIZE = "hbase.regionserver.hlog.blocksize";
057
058  private WALUtil() {
059    // Shut down construction of this class.
060  }
061
062  /**
063   * Write the marker that a compaction has succeeded and is about to be committed. This provides
064   * info to the HMaster to allow it to recover the compaction if this regionserver dies in the
065   * middle. It also prevents the compaction from finishing if this regionserver has already lost
066   * its lease on the log.
067   * <p/>
068   * This write is for internal use only. Not for external client consumption.
069   * @param mvcc Used by WAL to get sequence Id for the waledit.
070   */
071  public static WALKeyImpl writeCompactionMarker(WAL wal,
072    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
073    MultiVersionConcurrencyControl mvcc) throws IOException {
074    WALKeyImpl walKey =
075      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
076    if (LOG.isTraceEnabled()) {
077      LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
078    }
079    return walKey;
080  }
081
082  /**
083   * Write a flush marker indicating a start / abort or a complete of a region flush
084   * <p/>
085   * This write is for internal use only. Not for external client consumption.
086   */
087  public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
088    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
089    throws IOException {
090    WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
091      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
092    if (LOG.isTraceEnabled()) {
093      LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
094    }
095    return walKey;
096  }
097
098  /**
099   * Write a region open marker indicating that the region is opened. This write is for internal use
100   * only. Not for external client consumption.
101   */
102  public static WALKeyImpl writeRegionEventMarker(WAL wal,
103    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
104    MultiVersionConcurrencyControl mvcc) throws IOException {
105    WALKeyImpl walKey =
106      writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
107    if (LOG.isTraceEnabled()) {
108      LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
109    }
110    return walKey;
111  }
112
113  /**
114   * Write a log marker that a bulk load has succeeded and is about to be committed. This write is
115   * for internal use only. Not for external client consumption.
116   * @param wal              The log to write into.
117   * @param replicationScope The replication scope of the families in the HRegion
118   * @param hri              A description of the region in the table that we are bulk loading into.
119   * @param desc             A protocol buffers based description of the client's bulk loading
120   *                         request
121   * @return walKey with sequenceid filled out for this bulk load marker
122   * @throws IOException We will throw an IOException if we can not append to the HLog.
123   */
124  public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
125    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
126    final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
127    throws IOException {
128    WALKeyImpl walKey =
129      writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
130    if (LOG.isTraceEnabled()) {
131      LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
132    }
133    return walKey;
134  }
135
136  private static WALKeyImpl writeMarker(final WAL wal,
137    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, WALEdit edit,
138    MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes)
139    throws IOException {
140    // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
141    return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
142      true);
143  }
144
145  /**
146   * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an
147   * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good
148   * for case of adding a single edit or marker to the WAL.
149   * <p/>
150   * This write is for internal use only. Not for external client consumption.
151   * @return WALKeyImpl that was added to the WAL.
152   */
153  private static WALKeyImpl doFullMarkerAppendTransaction(WAL wal,
154    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final WALEdit edit,
155    MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes, boolean sync)
156    throws IOException {
157    // TODO: Pass in current time to use?
158    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
159      EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
160    long trx = MultiVersionConcurrencyControl.NONE;
161    try {
162      trx = wal.appendMarker(hri, walKey, edit);
163      if (sync) {
164        wal.sync(trx);
165      }
166      // Call complete only here because these are markers only. They are not for clients to read.
167      mvcc.complete(walKey.getWriteEntry());
168    } catch (IOException ioe) {
169      if (walKey.getWriteEntry() != null) {
170        mvcc.complete(walKey.getWriteEntry());
171      }
172      throw ioe;
173    }
174    return walKey;
175  }
176
177  /**
178   * Blocksize returned here is 2x the default HDFS blocksize unless explicitly set in
179   * Configuration. Works in tandem with hbase.regionserver.logroll.multiplier. See comment in
180   * AbstractFSWAL in Constructor where we set blocksize and logrollsize for why.
181   * @return Blocksize to use writing WALs.
182   */
183  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir)
184    throws IOException {
185    return getWALBlockSize(conf, fs, dir, false);
186  }
187
188  /**
189   * Public because of FSHLog. Should be package-private
190   * @param isRecoverEdits the created writer is for recovered edits or WAL. For recovered edits, it
191   *                       is true and for WAL it is false.
192   */
193  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir,
194    boolean isRecoverEdits) throws IOException {
195    long defaultBlockSize = CommonFSUtils.getDefaultBlockSize(fs, dir) * 2;
196    if (isRecoverEdits) {
197      return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize);
198    }
199    return conf.getLong(WAL_BLOCK_SIZE, defaultBlockSize);
200  }
201
202  public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
203    ArrayList<Cell> cells = edit.getCells();
204    int size = cells.size();
205    int newSize = 0;
206    for (int i = 0; i < size; i++) {
207      Cell cell = mapper.apply(cells.get(i));
208      if (cell != null) {
209        cells.set(newSize, cell);
210        newSize++;
211      }
212    }
213    for (int i = size - 1; i >= newSize; i--) {
214      cells.remove(i);
215    }
216    if (newSize < size / 2) {
217      cells.trimToSize();
218    }
219  }
220}