001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Set;
027import java.util.function.Consumer;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.monitoring.MonitoredTask;
035import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
036import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
037import org.apache.hadoop.hbase.regionserver.HMobStore;
038import org.apache.hadoop.hbase.regionserver.HStore;
039import org.apache.hadoop.hbase.regionserver.InternalScanner;
040import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
041import org.apache.hadoop.hbase.regionserver.ScannerContext;
042import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
043import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
044import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.util.StringUtils;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
052
053/**
054 * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. If the store is not a
055 * mob store, the flusher flushes the MemStore the same with DefaultStoreFlusher, If the store is a
056 * mob store, the flusher flushes the MemStore into two places. One is the store files of HBase, the
057 * other is the mob files.
058 * <ol>
059 * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li>
060 * <li>If the size of a cell value is larger than a threshold, it'll be flushed to a mob file,
061 * another cell with the path of this file will be flushed to HBase.</li>
062 * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
063 * HBase directly.</li>
064 * </ol>
065 */
066@InterfaceAudience.Private
067public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
068
069  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreFlusher.class);
070  private final Object flushLock = new Object();
071  private long mobCellValueSizeThreshold = 0;
072  private Path targetPath;
073  private HMobStore mobStore;
074  // MOB file reference set
075  static ThreadLocal<Set<String>> mobRefSet = new ThreadLocal<Set<String>>() {
076    @Override
077    protected Set<String> initialValue() {
078      return new HashSet<String>();
079    }
080  };
081
082  public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOException {
083    super(conf, store);
084    if (!(store instanceof HMobStore)) {
085      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
086    }
087    mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
088    this.targetPath =
089      MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
090    if (!this.store.getFileSystem().exists(targetPath)) {
091      this.store.getFileSystem().mkdirs(targetPath);
092    }
093    this.mobStore = (HMobStore) store;
094  }
095
096  /**
097   * Flushes the snapshot of the MemStore. If this store is not a mob store, flush the cells in the
098   * snapshot to store files of HBase. If the store is a mob one, the flusher flushes the MemStore
099   * into two places. One is the store files of HBase, the other is the mob files.
100   * <ol>
101   * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li>
102   * <li>If the size of a cell value is larger than a threshold, it'll be flushed to a mob file,
103   * another cell with the path of this file will be flushed to HBase.</li>
104   * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
105   * HBase directly.</li>
106   * </ol>
107   */
108  @Override
109  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
110    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
111    Consumer<Path> writerCreationTracker) throws IOException {
112    ArrayList<Path> result = new ArrayList<>();
113    long cellsCount = snapshot.getCellsCount();
114    if (cellsCount == 0) return result; // don't flush if there are no entries
115
116    // Use a store scanner to find which rows to flush.
117    InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
118    StoreFileWriter writer;
119    try {
120      // TODO: We can fail in the below block before we complete adding this flush to
121      // list of store files. Add cleanup of anything put on filesystem if we fail.
122      synchronized (flushLock) {
123        status.setStatus("Flushing " + store + ": creating writer");
124        // Write the map out to the disk
125        writer = createWriter(snapshot, true, writerCreationTracker);
126        IOException e = null;
127        try {
128          // It's a mob store, flush the cells in a mob way. This is the difference of flushing
129          // between a normal and a mob store.
130          performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController,
131            writerCreationTracker);
132        } catch (IOException ioe) {
133          e = ioe;
134          // throw the exception out
135          throw ioe;
136        } finally {
137          if (e != null) {
138            writer.close();
139          } else {
140            finalizeWriter(writer, cacheFlushId, status);
141          }
142        }
143      }
144    } finally {
145      scanner.close();
146    }
147    LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
148      + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1)
149      + ", hasBloomFilter=" + writer.hasGeneralBloom() + ", into tmp file " + writer.getPath());
150    result.add(writer.getPath());
151    return result;
152  }
153
154  /**
155   * Flushes the cells in the mob store.
156   * <ol>
157   * In the mob store, the cells with PUT type might have or have no mob tags.
158   * <li>If a cell does not have a mob tag, flushing the cell to different files depends on the
159   * value length. If the length is larger than a threshold, it's flushed to a mob file and the mob
160   * file is flushed to a store file in HBase. Otherwise, directly flush the cell to a store file in
161   * HBase.</li>
162   * <li>If a cell have a mob tag, its value is a mob file name, directly flush it to a store file
163   * in HBase.</li>
164   * </ol>
165   * @param snapshot             Memstore snapshot.
166   * @param cacheFlushId         Log cache flush sequence number.
167   * @param scanner              The scanner of memstore snapshot.
168   * @param writer               The store file writer.
169   * @param status               Task that represents the flush operation and may be updated with
170   *                             status.
171   * @param throughputController A controller to avoid flush too fast. n
172   */
173  protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
174    InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
175    ThroughputController throughputController, Consumer<Path> writerCreationTracker)
176    throws IOException {
177    StoreFileWriter mobFileWriter = null;
178    int compactionKVMax =
179      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
180    long mobCount = 0;
181    long mobSize = 0;
182    long time = snapshot.getTimeRangeTracker().getMax();
183    mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
184      ? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
185        store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
186        false)
187      : mobStore.createWriter(new Date(time), snapshot.getCellsCount(),
188        store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(),
189        false, writerCreationTracker);
190    // the target path is {tableName}/.mob/{cfName}/mobFiles
191    // the relative path is mobFiles
192    byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
193    ScannerContext scannerContext =
194      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
195    List<Cell> cells = new ArrayList<>();
196    boolean hasMore;
197    String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
198    boolean control =
199      throughputController != null && !store.getRegionInfo().getTable().isSystemTable();
200    if (control) {
201      throughputController.start(flushName);
202    }
203    IOException ioe = null;
204    // Clear all past MOB references
205    mobRefSet.get().clear();
206    try {
207      do {
208        hasMore = scanner.next(cells, scannerContext);
209        if (!cells.isEmpty()) {
210          for (Cell c : cells) {
211            // If we know that this KV is going to be included always, then let us
212            // set its memstoreTS to 0. This will help us save space when writing to
213            // disk.
214            if (
215              c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c)
216                || c.getTypeByte() != KeyValue.Type.Put.getCode()
217            ) {
218              writer.append(c);
219            } else {
220              // append the original keyValue in the mob file.
221              mobFileWriter.append(c);
222              mobSize += c.getValueLength();
223              mobCount++;
224              // append the tags to the KeyValue.
225              // The key is same, the value is the filename of the mob file
226              Cell reference =
227                MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
228              writer.append(reference);
229            }
230            if (control) {
231              throughputController.control(flushName, c.getSerializedSize());
232            }
233          }
234          cells.clear();
235        }
236      } while (hasMore);
237    } catch (InterruptedException e) {
238      ioe =
239        new InterruptedIOException("Interrupted while control throughput of flushing " + flushName);
240      throw ioe;
241    } catch (IOException e) {
242      ioe = e;
243      throw e;
244    } finally {
245      if (control) {
246        throughputController.finish(flushName);
247      }
248      if (ioe != null) {
249        mobFileWriter.close();
250      }
251    }
252
253    if (mobCount > 0) {
254      // commit the mob file from temp folder to target folder.
255      // If the mob file is committed successfully but the store file is not,
256      // the committed mob file will be handled by the sweep tool as an unused
257      // file.
258      status.setStatus("Flushing mob file " + store + ": appending metadata");
259      mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
260      status.setStatus("Flushing mob file " + store + ": closing flushed file");
261      mobFileWriter.close();
262      mobStore.commitFile(mobFileWriter.getPath(), targetPath);
263      LOG.debug("Flush store file: {}, store: {}", writer.getPath(), getStoreInfo());
264      mobStore.updateMobFlushCount();
265      mobStore.updateMobFlushedCellsCount(mobCount);
266      mobStore.updateMobFlushedCellsSize(mobSize);
267      // Add mob reference to store file metadata
268      mobRefSet.get().add(mobFileWriter.getPath().getName());
269    } else {
270      try {
271        status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file");
272        mobFileWriter.close();
273        // If the mob file is empty, delete it instead of committing.
274        store.getFileSystem().delete(mobFileWriter.getPath(), true);
275      } catch (IOException e) {
276        LOG.error("Failed to delete the temp mob file", e);
277      }
278    }
279  }
280
281  @Override
282  protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, MonitoredTask status)
283    throws IOException {
284    // Write out the log sequence number that corresponds to this output
285    // hfile. Also write current time in metadata as minFlushTime.
286    // The hfile is current up to and including cacheFlushSeqNum.
287    status.setStatus("Flushing " + store + ": appending metadata");
288    writer.appendMetadata(cacheFlushSeqNum, false);
289    writer.appendMobMetadata(ImmutableSetMultimap.<TableName, String> builder()
290      .putAll(store.getTableName(), mobRefSet.get()).build());
291    status.setStatus("Flushing " + store + ": closing flushed file");
292    writer.close();
293  }
294
295  private String getStoreInfo() {
296    return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
297      store.getColumnFamilyName(), store.getRegionInfo().getEncodedName());
298  }
299}