001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.hadoop.hbase.PrivateCellUtil;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.regionserver.CellSink;
033import org.apache.hadoop.hbase.regionserver.HMobStore;
034import org.apache.hadoop.hbase.regionserver.HStore;
035import org.apache.hadoop.hbase.regionserver.InternalScanner;
036import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
037import org.apache.hadoop.hbase.regionserver.ScanInfo;
038import org.apache.hadoop.hbase.regionserver.ScanType;
039import org.apache.hadoop.hbase.regionserver.ScannerContext;
040import org.apache.hadoop.hbase.regionserver.ShipperListener;
041import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
042import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
043import org.apache.hadoop.hbase.regionserver.StoreScanner;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
045import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
046import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
048import org.apache.hadoop.hbase.security.User;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * Compact passed set of files in the mob-enabled column family.
057 */
058@InterfaceAudience.Private
059public class DefaultMobStoreCompactor extends DefaultCompactor {
060
061  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class);
062  private long mobSizeThreshold;
063  private HMobStore mobStore;
064
065  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
066
067    @Override
068    public ScanType getScanType(CompactionRequestImpl request) {
069      // retain the delete markers until they are expired.
070      return ScanType.COMPACT_RETAIN_DELETES;
071    }
072
073    @Override
074    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
075        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
076      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
077          fd.earliestPutTs);
078    }
079  };
080
081  private final CellSinkFactory<StoreFileWriter> writerFactory =
082      new CellSinkFactory<StoreFileWriter>() {
083        @Override
084        public StoreFileWriter createWriter(InternalScanner scanner,
085            org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
086            boolean shouldDropBehind) throws IOException {
087          // make this writer with tags always because of possible new cells with tags.
088          return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
089            shouldDropBehind);
090        }
091      };
092
093  public DefaultMobStoreCompactor(Configuration conf, HStore store) {
094    super(conf, store);
095    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
096    // During the compaction, the compactor reads the cells from the mob files and
097    // probably creates new mob files. All of these operations are included in HMobStore,
098    // so we need to cast the Store to HMobStore.
099    if (!(store instanceof HMobStore)) {
100      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
101    }
102    mobStore = (HMobStore) store;
103    mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
104  }
105
106  @Override
107  public List<Path> compact(CompactionRequestImpl request, ThroughputController throughputController,
108      User user) throws IOException {
109    return compact(request, scannerFactory, writerFactory, throughputController, user);
110  }
111
112  /**
113   * Performs compaction on a column family with the mob flag enabled.
114   * This is for when the mob threshold size has changed or if the mob
115   * column family mode has been toggled via an alter table statement.
116   * Compacts the files by the following rules.
117   * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
118   * <ol>
119   * <li>
120   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
121   * directly copy the (with mob tag) cell into the new store file.
122   * </li>
123   * <li>
124   * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into
125   * the new store file.
126   * </li>
127   * </ol>
128   * 2. If the Put cell doesn't have a reference tag.
129   * <ol>
130   * <li>
131   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
132   * write this cell to a mob file, and write the path of this mob file to the store file.
133   * </li>
134   * <li>
135   * Otherwise, directly write this cell into the store file.
136   * </li>
137   * </ol>
138   * 3. Decide how to write a Delete cell.
139   * <ol>
140   * <li>
141   * If a Delete cell does not have a mob reference tag which means this delete marker have not
142   * been written to the mob del file, write this cell to the mob del file, and write this cell
143   * with a ref tag to a store file.
144   * </li>
145   * <li>
146   * Otherwise, directly write it to a store file.
147   * </li>
148   * </ol>
149   * After the major compaction on the normal hfiles, we have a guarantee that we have purged all
150   * deleted or old version mob refs, and the delete markers are written to a del file with the
151   * suffix _del. Because of this, it is safe to use the del file in the mob compaction.
152   * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the
153   * mob files. When the small mob files are merged into bigger ones, the del file is added into
154   * the scanner to filter the deleted cells.
155   * @param fd File details
156   * @param scanner Where to read from.
157   * @param writer Where to write to.
158   * @param smallestReadPoint Smallest read point.
159   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
160   * @param throughputController The compaction throughput controller.
161   * @param major Is a major compaction.
162   * @param numofFilesToCompact the number of files to compact
163   * @return Whether compaction ended; false if it was interrupted for any reason.
164   */
165  @Override
166  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
167      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
168      boolean major, int numofFilesToCompact) throws IOException {
169    long bytesWrittenProgressForCloseCheck = 0;
170    long bytesWrittenProgressForLog = 0;
171    long bytesWrittenProgressForShippedCall = 0;
172    // Since scanner.next() can return 'false' but still be delivering data,
173    // we have to use a do/while loop.
174    List<Cell> cells = new ArrayList<>();
175    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
176    int closeCheckSizeLimit = HStore.getCloseCheckInterval();
177    long lastMillis = 0;
178    if (LOG.isDebugEnabled()) {
179      lastMillis = EnvironmentEdgeManager.currentTime();
180    }
181    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
182    long now = 0;
183    boolean hasMore;
184    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
185    byte[] fileName = null;
186    StoreFileWriter mobFileWriter = null, delFileWriter = null;
187    long mobCells = 0, deleteMarkersCount = 0;
188    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
189    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
190    boolean finished = false;
191    ScannerContext scannerContext =
192        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
193    throughputController.start(compactionName);
194    KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
195    long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
196    try {
197      try {
198        // If the mob file writer could not be created, directly write the cell to the store file.
199        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
200          compactionCompression, store.getRegionInfo().getStartKey(), true);
201        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
202      } catch (IOException e) {
203        LOG.warn("Failed to create mob writer, "
204               + "we will continue the compaction by writing MOB cells directly in store files", e);
205      }
206      if (major) {
207        try {
208          delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs),
209            fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey());
210        } catch (IOException e) {
211          LOG.warn(
212            "Failed to create del writer, "
213            + "we will continue the compaction by writing delete markers directly in store files",
214            e);
215        }
216      }
217      do {
218        hasMore = scanner.next(cells, scannerContext);
219        if (LOG.isDebugEnabled()) {
220          now = EnvironmentEdgeManager.currentTime();
221        }
222        for (Cell c : cells) {
223          if (major && CellUtil.isDelete(c)) {
224            if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) {
225              // Directly write it to a store file
226              writer.append(c);
227            } else {
228              // Add a ref tag to this cell and write it to a store file.
229              writer.append(MobUtils.createMobRefDeleteMarker(c));
230              // Write the cell to a del file
231              delFileWriter.append(c);
232              deleteMarkersCount++;
233            }
234          } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
235            // If the mob file writer is null or the kv type is not put, directly write the cell
236            // to the store file.
237            writer.append(c);
238          } else if (MobUtils.isMobReferenceCell(c)) {
239            if (MobUtils.hasValidMobRefCellValue(c)) {
240              int size = MobUtils.getMobValueLength(c);
241              if (size > mobSizeThreshold) {
242                // If the value size is larger than the threshold, it's regarded as a mob. Since
243                // its value is already in the mob file, directly write this cell to the store file
244                writer.append(c);
245              } else {
246                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
247                // the mob cell from the mob file, and write it back to the store file. Must
248                // close the mob scanner once the life cycle finished.
249                try (MobCell mobCell = mobStore.resolve(c, false)) {
250                  if (mobCell.getCell().getValueLength() != 0) {
251                    // put the mob data back to the store file
252                    PrivateCellUtil.setSequenceId(mobCell.getCell(), c.getSequenceId());
253                    writer.append(mobCell.getCell());
254                    cellsCountCompactedFromMob++;
255                    cellsSizeCompactedFromMob += mobCell.getCell().getValueLength();
256                  } else {
257                    // If the value of a file is empty, there might be issues when retrieving,
258                    // directly write the cell to the store file, and leave it to be handled by the
259                    // next compaction.
260                    writer.append(c);
261                  }
262                }
263              }
264            } else {
265              LOG.warn("The value format of the KeyValue " + c
266                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
267              writer.append(c);
268            }
269          } else if (c.getValueLength() <= mobSizeThreshold) {
270            //If value size of a cell is not larger than the threshold, directly write to store file
271            writer.append(c);
272          } else {
273            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
274            // write this cell to a mob file, and write the path to the store file.
275            mobCells++;
276            // append the original keyValue in the mob file.
277            mobFileWriter.append(c);
278            Cell reference = MobUtils.createMobRefCell(c, fileName,
279                this.mobStore.getRefCellTags());
280            // write the cell whose value is the path of a mob file to the store file.
281            writer.append(reference);
282            cellsCountCompactedToMob++;
283            cellsSizeCompactedToMob += c.getValueLength();
284          }
285          int len = c.getSerializedSize();
286          ++progress.currentCompactedKVs;
287          progress.totalCompactedSize += len;
288          bytesWrittenProgressForShippedCall += len;
289          if (LOG.isDebugEnabled()) {
290            bytesWrittenProgressForLog += len;
291          }
292          throughputController.control(compactionName, len);
293          // check periodically to see if a system stop is requested
294          if (closeCheckSizeLimit > 0) {
295            bytesWrittenProgressForCloseCheck += len;
296            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
297              bytesWrittenProgressForCloseCheck = 0;
298              if (!store.areWritesEnabled()) {
299                progress.cancel();
300                return false;
301              }
302            }
303          }
304          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
305            ((ShipperListener)writer).beforeShipped();
306            kvs.shipped();
307            bytesWrittenProgressForShippedCall = 0;
308          }
309        }
310        // Log the progress of long running compactions every minute if
311        // logging at DEBUG level
312        if (LOG.isDebugEnabled()) {
313          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
314            String rate = String.format("%.2f",
315              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
316            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
317              compactionName, progress, rate, throughputController);
318            lastMillis = now;
319            bytesWrittenProgressForLog = 0;
320          }
321        }
322        cells.clear();
323      } while (hasMore);
324      finished = true;
325    } catch (InterruptedException e) {
326      progress.cancel();
327      throw new InterruptedIOException(
328          "Interrupted while control throughput of compacting " + compactionName);
329    } finally {
330      // Clone last cell in the final because writer will append last cell when committing. If
331      // don't clone here and once the scanner get closed, then the memory of last cell will be
332      // released. (HBASE-22582)
333      ((ShipperListener) writer).beforeShipped();
334      throughputController.finish(compactionName);
335      if (!finished && mobFileWriter != null) {
336        abortWriter(mobFileWriter);
337      }
338      if (!finished && delFileWriter != null) {
339        abortWriter(delFileWriter);
340      }
341    }
342    if (delFileWriter != null) {
343      if (deleteMarkersCount > 0) {
344        // If the del file is not empty, commit it.
345        // If the commit fails, the compaction is re-performed again.
346        delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
347        delFileWriter.close();
348        mobStore.commitFile(delFileWriter.getPath(), path);
349      } else {
350        // If the del file is empty, delete it instead of committing.
351        abortWriter(delFileWriter);
352      }
353    }
354    if (mobFileWriter != null) {
355      if (mobCells > 0) {
356        // If the mob file is not empty, commit it.
357        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
358        mobFileWriter.close();
359        mobStore.commitFile(mobFileWriter.getPath(), path);
360      } else {
361        // If the mob file is empty, delete it instead of committing.
362        abortWriter(mobFileWriter);
363      }
364    }
365    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
366    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
367    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
368    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
369    progress.complete();
370    return true;
371  }
372}