001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.KeyValueUtil;
033import org.apache.hadoop.hbase.monitoring.MonitoredTask;
034import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
035import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
036import org.apache.hadoop.hbase.regionserver.HMobStore;
037import org.apache.hadoop.hbase.regionserver.HStore;
038import org.apache.hadoop.hbase.regionserver.InternalScanner;
039import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
040import org.apache.hadoop.hbase.regionserver.ScannerContext;
041import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
042import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
043import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.util.StringUtils;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050/**
051 * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher.
052 * If the store is not a mob store, the flusher flushes the MemStore the same with
053 * DefaultStoreFlusher,
054 * If the store is a mob store, the flusher flushes the MemStore into two places.
055 * One is the store files of HBase, the other is the mob files.
056 * <ol>
057 * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li>
058 * <li>If the size of a cell value is larger than a threshold, it'll be flushed
059 * to a mob file, another cell with the path of this file will be flushed to HBase.</li>
060 * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
061 * HBase directly.</li>
062 * </ol>
063 *
064 */
065@InterfaceAudience.Private
066public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
067
068  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreFlusher.class);
069  private final Object flushLock = new Object();
070  private long mobCellValueSizeThreshold = 0;
071  private Path targetPath;
072  private HMobStore mobStore;
073
074  public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOException {
075    super(conf, store);
076    if (!(store instanceof HMobStore)) {
077      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
078    }
079    mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
080    this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
081        store.getColumnFamilyName());
082    if (!this.store.getFileSystem().exists(targetPath)) {
083      this.store.getFileSystem().mkdirs(targetPath);
084    }
085    this.mobStore = (HMobStore) store;
086  }
087
088  /**
089   * Flushes the snapshot of the MemStore.
090   * If this store is not a mob store, flush the cells in the snapshot to store files of HBase.
091   * If the store is a mob one, the flusher flushes the MemStore into two places.
092   * One is the store files of HBase, the other is the mob files.
093   * <ol>
094   * <li>Cells that are not PUT type or have the delete mark will be directly flushed to
095   * HBase.</li>
096   * <li>If the size of a cell value is larger than a threshold, it'll be
097   * flushed to a mob file, another cell with the path of this file will be flushed to HBase.</li>
098   * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
099   * HBase directly.</li>
100   * </ol>
101   */
102  @Override
103  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
104      MonitoredTask status, ThroughputController throughputController,
105      FlushLifeCycleTracker tracker) throws IOException {
106    ArrayList<Path> result = new ArrayList<>();
107    long cellsCount = snapshot.getCellsCount();
108    if (cellsCount == 0) return result; // don't flush if there are no entries
109
110    // Use a store scanner to find which rows to flush.
111    long smallestReadPoint = store.getSmallestReadPoint();
112    InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
113    StoreFileWriter writer;
114    try {
115      // TODO: We can fail in the below block before we complete adding this flush to
116      // list of store files. Add cleanup of anything put on filesystem if we fail.
117      synchronized (flushLock) {
118        status.setStatus("Flushing " + store + ": creating writer");
119        // Write the map out to the disk
120        writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
121            false, true, true, false);
122        IOException e = null;
123        try {
124          // It's a mob store, flush the cells in a mob way. This is the difference of flushing
125          // between a normal and a mob store.
126          performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController);
127        } catch (IOException ioe) {
128          e = ioe;
129          // throw the exception out
130          throw ioe;
131        } finally {
132          if (e != null) {
133            writer.close();
134          } else {
135            finalizeWriter(writer, cacheFlushId, status);
136          }
137        }
138      }
139    } finally {
140      scanner.close();
141    }
142    LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
143        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) +
144        ", hasBloomFilter=" + writer.hasGeneralBloom() +
145        ", into tmp file " + writer.getPath());
146    result.add(writer.getPath());
147    return result;
148  }
149
150  /**
151   * Flushes the cells in the mob store.
152   * <ol>In the mob store, the cells with PUT type might have or have no mob tags.
153   * <li>If a cell does not have a mob tag, flushing the cell to different files depends
154   * on the value length. If the length is larger than a threshold, it's flushed to a
155   * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly
156   * flush the cell to a store file in HBase.</li>
157   * <li>If a cell have a mob tag, its value is a mob file name, directly flush it
158   * to a store file in HBase.</li>
159   * </ol>
160   * @param snapshot Memstore snapshot.
161   * @param cacheFlushId Log cache flush sequence number.
162   * @param scanner The scanner of memstore snapshot.
163   * @param writer The store file writer.
164   * @param status Task that represents the flush operation and may be updated with status.
165   * @param throughputController A controller to avoid flush too fast.
166   * @throws IOException
167   */
168  protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
169      InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
170      ThroughputController throughputController) throws IOException {
171    StoreFileWriter mobFileWriter = null;
172    int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
173        HConstants.COMPACTION_KV_MAX_DEFAULT);
174    long mobCount = 0;
175    long mobSize = 0;
176    long time = snapshot.getTimeRangeTracker().getMax();
177    mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
178        store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), false);
179    // the target path is {tableName}/.mob/{cfName}/mobFiles
180    // the relative path is mobFiles
181    byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
182    ScannerContext scannerContext =
183        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
184    List<Cell> cells = new ArrayList<>();
185    boolean hasMore;
186    String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
187    boolean control = throughputController != null && !store.getRegionInfo().getTable().isSystemTable();
188    if (control) {
189      throughputController.start(flushName);
190    }
191    IOException ioe = null;
192    try {
193      do {
194        hasMore = scanner.next(cells, scannerContext);
195        if (!cells.isEmpty()) {
196          for (Cell c : cells) {
197            // If we know that this KV is going to be included always, then let us
198            // set its memstoreTS to 0. This will help us save space when writing to
199            // disk.
200            if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c)
201                || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
202              writer.append(c);
203            } else {
204              // append the original keyValue in the mob file.
205              mobFileWriter.append(c);
206              mobSize += c.getValueLength();
207              mobCount++;
208
209              // append the tags to the KeyValue.
210              // The key is same, the value is the filename of the mob file
211              Cell reference = MobUtils.createMobRefCell(c, fileName,
212                  this.mobStore.getRefCellTags());
213              writer.append(reference);
214            }
215            int len = KeyValueUtil.length(c);
216            if (control) {
217              throughputController.control(flushName, len);
218            }
219          }
220          cells.clear();
221        }
222      } while (hasMore);
223    } catch (InterruptedException e) {
224      ioe = new InterruptedIOException(
225          "Interrupted while control throughput of flushing " + flushName);
226      throw ioe;
227    } catch (IOException e) {
228      ioe = e;
229      throw e;
230    } finally {
231      if (control) {
232        throughputController.finish(flushName);
233      }
234      if (ioe != null) {
235        mobFileWriter.close();
236      }
237    }
238
239    if (mobCount > 0) {
240      // commit the mob file from temp folder to target folder.
241      // If the mob file is committed successfully but the store file is not,
242      // the committed mob file will be handled by the sweep tool as an unused
243      // file.
244      status.setStatus("Flushing mob file " + store + ": appending metadata");
245      mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
246      status.setStatus("Flushing mob file " + store + ": closing flushed file");
247      mobFileWriter.close();
248      mobStore.commitFile(mobFileWriter.getPath(), targetPath);
249      mobStore.updateMobFlushCount();
250      mobStore.updateMobFlushedCellsCount(mobCount);
251      mobStore.updateMobFlushedCellsSize(mobSize);
252    } else {
253      try {
254        status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file");
255        mobFileWriter.close();
256        // If the mob file is empty, delete it instead of committing.
257        store.getFileSystem().delete(mobFileWriter.getPath(), true);
258      } catch (IOException e) {
259        LOG.error("Failed to delete the temp mob file", e);
260      }
261    }
262  }
263}