001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.monitoring.MonitoredTask;
033import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
034import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
035import org.apache.hadoop.hbase.regionserver.HMobStore;
036import org.apache.hadoop.hbase.regionserver.HStore;
037import org.apache.hadoop.hbase.regionserver.InternalScanner;
038import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
039import org.apache.hadoop.hbase.regionserver.ScannerContext;
040import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
041import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
042import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.util.StringUtils;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher.
051 * If the store is not a mob store, the flusher flushes the MemStore the same with
052 * DefaultStoreFlusher,
053 * If the store is a mob store, the flusher flushes the MemStore into two places.
054 * One is the store files of HBase, the other is the mob files.
055 * <ol>
056 * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li>
057 * <li>If the size of a cell value is larger than a threshold, it'll be flushed
058 * to a mob file, another cell with the path of this file will be flushed to HBase.</li>
059 * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
060 * HBase directly.</li>
061 * </ol>
062 *
063 */
064@InterfaceAudience.Private
065public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
066
067  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreFlusher.class);
068  private final Object flushLock = new Object();
069  private long mobCellValueSizeThreshold = 0;
070  private Path targetPath;
071  private HMobStore mobStore;
072
073  public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOException {
074    super(conf, store);
075    if (!(store instanceof HMobStore)) {
076      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
077    }
078    mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
079    this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
080        store.getColumnFamilyName());
081    if (!this.store.getFileSystem().exists(targetPath)) {
082      this.store.getFileSystem().mkdirs(targetPath);
083    }
084    this.mobStore = (HMobStore) store;
085  }
086
087  /**
088   * Flushes the snapshot of the MemStore.
089   * If this store is not a mob store, flush the cells in the snapshot to store files of HBase.
090   * If the store is a mob one, the flusher flushes the MemStore into two places.
091   * One is the store files of HBase, the other is the mob files.
092   * <ol>
093   * <li>Cells that are not PUT type or have the delete mark will be directly flushed to
094   * HBase.</li>
095   * <li>If the size of a cell value is larger than a threshold, it'll be
096   * flushed to a mob file, another cell with the path of this file will be flushed to HBase.</li>
097   * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
098   * HBase directly.</li>
099   * </ol>
100   */
101  @Override
102  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
103      MonitoredTask status, ThroughputController throughputController,
104      FlushLifeCycleTracker tracker) throws IOException {
105    ArrayList<Path> result = new ArrayList<>();
106    long cellsCount = snapshot.getCellsCount();
107    if (cellsCount == 0) return result; // don't flush if there are no entries
108
109    // Use a store scanner to find which rows to flush.
110    long smallestReadPoint = store.getSmallestReadPoint();
111    InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
112    StoreFileWriter writer;
113    try {
114      // TODO: We can fail in the below block before we complete adding this flush to
115      // list of store files. Add cleanup of anything put on filesystem if we fail.
116      synchronized (flushLock) {
117        status.setStatus("Flushing " + store + ": creating writer");
118        // Write the map out to the disk
119        writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
120            false, true, true, false);
121        IOException e = null;
122        try {
123          // It's a mob store, flush the cells in a mob way. This is the difference of flushing
124          // between a normal and a mob store.
125          performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController);
126        } catch (IOException ioe) {
127          e = ioe;
128          // throw the exception out
129          throw ioe;
130        } finally {
131          if (e != null) {
132            writer.close();
133          } else {
134            finalizeWriter(writer, cacheFlushId, status);
135          }
136        }
137      }
138    } finally {
139      scanner.close();
140    }
141    LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
142        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) +
143        ", hasBloomFilter=" + writer.hasGeneralBloom() +
144        ", into tmp file " + writer.getPath());
145    result.add(writer.getPath());
146    return result;
147  }
148
149  /**
150   * Flushes the cells in the mob store.
151   * <ol>In the mob store, the cells with PUT type might have or have no mob tags.
152   * <li>If a cell does not have a mob tag, flushing the cell to different files depends
153   * on the value length. If the length is larger than a threshold, it's flushed to a
154   * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly
155   * flush the cell to a store file in HBase.</li>
156   * <li>If a cell have a mob tag, its value is a mob file name, directly flush it
157   * to a store file in HBase.</li>
158   * </ol>
159   * @param snapshot Memstore snapshot.
160   * @param cacheFlushId Log cache flush sequence number.
161   * @param scanner The scanner of memstore snapshot.
162   * @param writer The store file writer.
163   * @param status Task that represents the flush operation and may be updated with status.
164   * @param throughputController A controller to avoid flush too fast.
165   * @throws IOException
166   */
167  protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
168      InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
169      ThroughputController throughputController) throws IOException {
170    StoreFileWriter mobFileWriter = null;
171    int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
172        HConstants.COMPACTION_KV_MAX_DEFAULT);
173    long mobCount = 0;
174    long mobSize = 0;
175    long time = snapshot.getTimeRangeTracker().getMax();
176    mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
177        store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), false);
178    // the target path is {tableName}/.mob/{cfName}/mobFiles
179    // the relative path is mobFiles
180    byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
181    ScannerContext scannerContext =
182        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
183    List<Cell> cells = new ArrayList<>();
184    boolean hasMore;
185    String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
186    boolean control = throughputController != null && !store.getRegionInfo().getTable().isSystemTable();
187    if (control) {
188      throughputController.start(flushName);
189    }
190    IOException ioe = null;
191    try {
192      do {
193        hasMore = scanner.next(cells, scannerContext);
194        if (!cells.isEmpty()) {
195          for (Cell c : cells) {
196            // If we know that this KV is going to be included always, then let us
197            // set its memstoreTS to 0. This will help us save space when writing to
198            // disk.
199            if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c)
200                || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
201              writer.append(c);
202            } else {
203              // append the original keyValue in the mob file.
204              mobFileWriter.append(c);
205              mobSize += c.getValueLength();
206              mobCount++;
207
208              // append the tags to the KeyValue.
209              // The key is same, the value is the filename of the mob file
210              Cell reference = MobUtils.createMobRefCell(c, fileName,
211                  this.mobStore.getRefCellTags());
212              writer.append(reference);
213            }
214            if (control) {
215              throughputController.control(flushName, c.getSerializedSize());
216            }
217          }
218          cells.clear();
219        }
220      } while (hasMore);
221    } catch (InterruptedException e) {
222      ioe = new InterruptedIOException(
223          "Interrupted while control throughput of flushing " + flushName);
224      throw ioe;
225    } catch (IOException e) {
226      ioe = e;
227      throw e;
228    } finally {
229      if (control) {
230        throughputController.finish(flushName);
231      }
232      if (ioe != null) {
233        mobFileWriter.close();
234      }
235    }
236
237    if (mobCount > 0) {
238      // commit the mob file from temp folder to target folder.
239      // If the mob file is committed successfully but the store file is not,
240      // the committed mob file will be handled by the sweep tool as an unused
241      // file.
242      status.setStatus("Flushing mob file " + store + ": appending metadata");
243      mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
244      status.setStatus("Flushing mob file " + store + ": closing flushed file");
245      mobFileWriter.close();
246      mobStore.commitFile(mobFileWriter.getPath(), targetPath);
247      mobStore.updateMobFlushCount();
248      mobStore.updateMobFlushedCellsCount(mobCount);
249      mobStore.updateMobFlushedCellsSize(mobSize);
250    } else {
251      try {
252        status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file");
253        mobFileWriter.close();
254        // If the mob file is empty, delete it instead of committing.
255        store.getFileSystem().delete(mobFileWriter.getPath(), true);
256      } catch (IOException e) {
257        LOG.error("Failed to delete the temp mob file", e);
258      }
259    }
260  }
261}