001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.monitoring.MonitoredTask;
033import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
034import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
035import org.apache.hadoop.hbase.regionserver.HMobStore;
036import org.apache.hadoop.hbase.regionserver.HStore;
037import org.apache.hadoop.hbase.regionserver.InternalScanner;
038import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
039import org.apache.hadoop.hbase.regionserver.ScannerContext;
040import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
041import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
042import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.util.StringUtils;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher.
051 * If the store is not a mob store, the flusher flushes the MemStore the same with
052 * DefaultStoreFlusher,
053 * If the store is a mob store, the flusher flushes the MemStore into two places.
054 * One is the store files of HBase, the other is the mob files.
055 * <ol>
056 * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li>
057 * <li>If the size of a cell value is larger than a threshold, it'll be flushed
058 * to a mob file, another cell with the path of this file will be flushed to HBase.</li>
059 * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
060 * HBase directly.</li>
061 * </ol>
062 *
063 */
064@InterfaceAudience.Private
065public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
066
067  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreFlusher.class);
068  private final Object flushLock = new Object();
069  private long mobCellValueSizeThreshold = 0;
070  private Path targetPath;
071  private HMobStore mobStore;
072
073  public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOException {
074    super(conf, store);
075    if (!(store instanceof HMobStore)) {
076      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
077    }
078    mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
079    this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
080        store.getColumnFamilyName());
081    if (!this.store.getFileSystem().exists(targetPath)) {
082      this.store.getFileSystem().mkdirs(targetPath);
083    }
084    this.mobStore = (HMobStore) store;
085  }
086
087  /**
088   * Flushes the snapshot of the MemStore.
089   * If this store is not a mob store, flush the cells in the snapshot to store files of HBase.
090   * If the store is a mob one, the flusher flushes the MemStore into two places.
091   * One is the store files of HBase, the other is the mob files.
092   * <ol>
093   * <li>Cells that are not PUT type or have the delete mark will be directly flushed to
094   * HBase.</li>
095   * <li>If the size of a cell value is larger than a threshold, it'll be
096   * flushed to a mob file, another cell with the path of this file will be flushed to HBase.</li>
097   * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
098   * HBase directly.</li>
099   * </ol>
100   */
101  @Override
102  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
103      MonitoredTask status, ThroughputController throughputController,
104      FlushLifeCycleTracker tracker) throws IOException {
105    ArrayList<Path> result = new ArrayList<>();
106    long cellsCount = snapshot.getCellsCount();
107    if (cellsCount == 0) return result; // don't flush if there are no entries
108
109    // Use a store scanner to find which rows to flush.
110    InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
111    StoreFileWriter writer;
112    try {
113      // TODO: We can fail in the below block before we complete adding this flush to
114      // list of store files. Add cleanup of anything put on filesystem if we fail.
115      synchronized (flushLock) {
116        status.setStatus("Flushing " + store + ": creating writer");
117        // Write the map out to the disk
118        writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
119            false, true, true, false);
120        IOException e = null;
121        try {
122          // It's a mob store, flush the cells in a mob way. This is the difference of flushing
123          // between a normal and a mob store.
124          performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController);
125        } catch (IOException ioe) {
126          e = ioe;
127          // throw the exception out
128          throw ioe;
129        } finally {
130          if (e != null) {
131            writer.close();
132          } else {
133            finalizeWriter(writer, cacheFlushId, status);
134          }
135        }
136      }
137    } finally {
138      scanner.close();
139    }
140    LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
141        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) +
142        ", hasBloomFilter=" + writer.hasGeneralBloom() +
143        ", into tmp file " + writer.getPath());
144    result.add(writer.getPath());
145    return result;
146  }
147
148  /**
149   * Flushes the cells in the mob store.
150   * <ol>In the mob store, the cells with PUT type might have or have no mob tags.
151   * <li>If a cell does not have a mob tag, flushing the cell to different files depends
152   * on the value length. If the length is larger than a threshold, it's flushed to a
153   * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly
154   * flush the cell to a store file in HBase.</li>
155   * <li>If a cell have a mob tag, its value is a mob file name, directly flush it
156   * to a store file in HBase.</li>
157   * </ol>
158   * @param snapshot Memstore snapshot.
159   * @param cacheFlushId Log cache flush sequence number.
160   * @param scanner The scanner of memstore snapshot.
161   * @param writer The store file writer.
162   * @param status Task that represents the flush operation and may be updated with status.
163   * @param throughputController A controller to avoid flush too fast.
164   * @throws IOException
165   */
166  protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
167      InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
168      ThroughputController throughputController) throws IOException {
169    StoreFileWriter mobFileWriter = null;
170    int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
171        HConstants.COMPACTION_KV_MAX_DEFAULT);
172    long mobCount = 0;
173    long mobSize = 0;
174    long time = snapshot.getTimeRangeTracker().getMax();
175    mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
176        store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), false);
177    // the target path is {tableName}/.mob/{cfName}/mobFiles
178    // the relative path is mobFiles
179    byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
180    ScannerContext scannerContext =
181        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
182    List<Cell> cells = new ArrayList<>();
183    boolean hasMore;
184    String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
185    boolean control = throughputController != null && !store.getRegionInfo().getTable().isSystemTable();
186    if (control) {
187      throughputController.start(flushName);
188    }
189    IOException ioe = null;
190    try {
191      do {
192        hasMore = scanner.next(cells, scannerContext);
193        if (!cells.isEmpty()) {
194          for (Cell c : cells) {
195            // If we know that this KV is going to be included always, then let us
196            // set its memstoreTS to 0. This will help us save space when writing to
197            // disk.
198            if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c)
199                || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
200              writer.append(c);
201            } else {
202              // append the original keyValue in the mob file.
203              mobFileWriter.append(c);
204              mobSize += c.getValueLength();
205              mobCount++;
206
207              // append the tags to the KeyValue.
208              // The key is same, the value is the filename of the mob file
209              Cell reference = MobUtils.createMobRefCell(c, fileName,
210                  this.mobStore.getRefCellTags());
211              writer.append(reference);
212            }
213            if (control) {
214              throughputController.control(flushName, c.getSerializedSize());
215            }
216          }
217          cells.clear();
218        }
219      } while (hasMore);
220    } catch (InterruptedException e) {
221      ioe = new InterruptedIOException(
222          "Interrupted while control throughput of flushing " + flushName);
223      throw ioe;
224    } catch (IOException e) {
225      ioe = e;
226      throw e;
227    } finally {
228      if (control) {
229        throughputController.finish(flushName);
230      }
231      if (ioe != null) {
232        mobFileWriter.close();
233      }
234    }
235
236    if (mobCount > 0) {
237      // commit the mob file from temp folder to target folder.
238      // If the mob file is committed successfully but the store file is not,
239      // the committed mob file will be handled by the sweep tool as an unused
240      // file.
241      status.setStatus("Flushing mob file " + store + ": appending metadata");
242      mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
243      status.setStatus("Flushing mob file " + store + ": closing flushed file");
244      mobFileWriter.close();
245      mobStore.commitFile(mobFileWriter.getPath(), targetPath);
246      mobStore.updateMobFlushCount();
247      mobStore.updateMobFlushedCellsCount(mobCount);
248      mobStore.updateMobFlushedCellsSize(mobSize);
249    } else {
250      try {
251        status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file");
252        mobFileWriter.close();
253        // If the mob file is empty, delete it instead of committing.
254        store.getFileSystem().delete(mobFileWriter.getPath(), true);
255      } catch (IOException e) {
256        LOG.error("Failed to delete the temp mob file", e);
257      }
258    }
259  }
260}