001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver.wal;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Map;
024import java.util.NavigableMap;
025import java.util.function.Function;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
033import org.apache.hadoop.hbase.util.CommonFSUtils;
034import org.apache.hadoop.hbase.wal.WAL;
035import org.apache.hadoop.hbase.wal.WALEdit;
036import org.apache.hadoop.hbase.wal.WALKeyImpl;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
045
046/**
047 * Helper methods to ease Region Server integration with the Write Ahead Log (WAL).
048 * Note that methods in this class specifically should not require access to anything
049 * other than the API found in {@link WAL}. For internal use only.
050 */
051@InterfaceAudience.Private
052public class WALUtil {
053  private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class);
054
055  public static final String WAL_BLOCK_SIZE = "hbase.regionserver.hlog.blocksize";
056
057  private WALUtil() {
058    // Shut down construction of this class.
059  }
060
061  /**
062   * Write the marker that a compaction has succeeded and is about to be committed. This provides
063   * info to the HMaster to allow it to recover the compaction if this regionserver dies in the
064   * middle. It also prevents the compaction from finishing if this regionserver has already lost
065   * its lease on the log.
066   * <p/>
067   * This write is for internal use only. Not for external client consumption.
068   * @param mvcc Used by WAL to get sequence Id for the waledit.
069   */
070  public static WALKeyImpl writeCompactionMarker(WAL wal,
071    NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
072    MultiVersionConcurrencyControl mvcc) throws IOException {
073    WALKeyImpl walKey =
074      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
075    if (LOG.isTraceEnabled()) {
076      LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
077    }
078    return walKey;
079  }
080
081  /**
082   * Write a flush marker indicating a start / abort or a complete of a region flush
083   * <p/>
084   * This write is for internal use only. Not for external client consumption.
085   */
086  public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
087    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
088    throws IOException {
089    WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
090      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
091    if (LOG.isTraceEnabled()) {
092      LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
093    }
094    return walKey;
095  }
096
097  /**
098   * Write a region open marker indicating that the region is opened. This write is for internal use
099   * only. Not for external client consumption.
100   */
101  public static WALKeyImpl writeRegionEventMarker(WAL wal,
102      NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
103      MultiVersionConcurrencyControl mvcc)
104    throws IOException {
105    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
106        WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
107    if (LOG.isTraceEnabled()) {
108      LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
109    }
110    return walKey;
111  }
112
113  /**
114   * Write a log marker that a bulk load has succeeded and is about to be committed.
115   * This write is for internal use only. Not for external client consumption.
116   * @param wal The log to write into.
117   * @param replicationScope The replication scope of the families in the HRegion
118   * @param hri A description of the region in the table that we are bulk loading into.
119   * @param desc A protocol buffers based description of the client's bulk loading request
120   * @return walKey with sequenceid filled out for this bulk load marker
121   * @throws IOException We will throw an IOException if we can not append to the HLog.
122   */
123  public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
124      final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
125      final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
126    throws IOException {
127    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
128      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
129    if (LOG.isTraceEnabled()) {
130      LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
131    }
132    return walKey;
133  }
134
135  private static WALKeyImpl writeMarker(final WAL wal,
136      NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, WALEdit edit, MultiVersionConcurrencyControl mvcc,
137      Map<String, byte[]> extendedAttributes)
138    throws IOException {
139    // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
140    return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc,
141      extendedAttributes, true);
142  }
143
144  /**
145   * A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an
146   * optional sync, and then a call to complete the mvcc transaction. This method does it all. Good
147   * for case of adding a single edit or marker to the WAL.
148   * <p/>
149   * This write is for internal use only. Not for external client consumption.
150   * @return WALKeyImpl that was added to the WAL.
151   */
152  private static WALKeyImpl doFullMarkerAppendTransaction(WAL wal,
153      NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final WALEdit edit,
154      MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes, boolean sync)
155    throws IOException {
156    // TODO: Pass in current time to use?
157    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
158      System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
159    long trx = MultiVersionConcurrencyControl.NONE;
160    try {
161      trx = wal.appendMarker(hri, walKey, edit);
162      if (sync) {
163        wal.sync(trx);
164      }
165      // Call complete only here because these are markers only. They are not for clients to read.
166      mvcc.complete(walKey.getWriteEntry());
167    } catch (IOException ioe) {
168      if (walKey.getWriteEntry() != null) {
169        mvcc.complete(walKey.getWriteEntry());
170      }
171      throw ioe;
172    }
173    return walKey;
174  }
175
176  /**
177   * Blocksize returned here is 2x the default HDFS blocksize unless explicitly set in
178   * Configuration. Works in tandem with hbase.regionserver.logroll.multiplier. See comment in
179   * AbstractFSWAL in Constructor where we set blocksize and logrollsize for why.
180   * @return Blocksize to use writing WALs.
181   */
182  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir)
183      throws IOException {
184    return getWALBlockSize(conf, fs, dir, false);
185  }
186
187  /**
188   * Public because of FSHLog. Should be package-private
189   * @param isRecoverEdits the created writer is for recovered edits or WAL. For recovered edits, it
190   *          is true and for WAL it is false.
191   */
192  public static long getWALBlockSize(Configuration conf, FileSystem fs, Path dir,
193      boolean isRecoverEdits) throws IOException {
194    long defaultBlockSize = CommonFSUtils.getDefaultBlockSize(fs, dir) * 2;
195    if (isRecoverEdits) {
196      return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize);
197    }
198    return conf.getLong(WAL_BLOCK_SIZE, defaultBlockSize);
199  }
200
201  public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
202    ArrayList<Cell> cells = edit.getCells();
203    int size = cells.size();
204    int newSize = 0;
205    for (int i = 0; i < size; i++) {
206      Cell cell = mapper.apply(cells.get(i));
207      if (cell != null) {
208        cells.set(newSize, cell);
209        newSize++;
210      }
211    }
212    for (int i = size - 1; i >= newSize; i--) {
213      cells.remove(i);
214    }
215    if (newSize < size / 2) {
216      cells.trimToSize();
217    }
218  }
219}