001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.hadoop.hbase.PrivateCellUtil;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.regionserver.CellSink;
033import org.apache.hadoop.hbase.regionserver.HMobStore;
034import org.apache.hadoop.hbase.regionserver.HStore;
035import org.apache.hadoop.hbase.regionserver.InternalScanner;
036import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
037import org.apache.hadoop.hbase.regionserver.ScanInfo;
038import org.apache.hadoop.hbase.regionserver.ScanType;
039import org.apache.hadoop.hbase.regionserver.ScannerContext;
040import org.apache.hadoop.hbase.regionserver.ShipperListener;
041import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
042import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
043import org.apache.hadoop.hbase.regionserver.StoreScanner;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
045import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
046import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
048import org.apache.hadoop.hbase.security.User;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * Compact passed set of files in the mob-enabled column family.
057 */
058@InterfaceAudience.Private
059public class DefaultMobStoreCompactor extends DefaultCompactor {
060
061  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class);
062  private long mobSizeThreshold;
063  private HMobStore mobStore;
064
065  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
066
067    @Override
068    public ScanType getScanType(CompactionRequestImpl request) {
069      // retain the delete markers until they are expired.
070      return ScanType.COMPACT_RETAIN_DELETES;
071    }
072
073    @Override
074    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
075        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
076      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
077          fd.earliestPutTs);
078    }
079  };
080
081  private final CellSinkFactory<StoreFileWriter> writerFactory =
082      new CellSinkFactory<StoreFileWriter>() {
083        @Override
084        public StoreFileWriter createWriter(InternalScanner scanner,
085            org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
086            boolean shouldDropBehind) throws IOException {
087          // make this writer with tags always because of possible new cells with tags.
088          return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
089            shouldDropBehind);
090        }
091      };
092
093  public DefaultMobStoreCompactor(Configuration conf, HStore store) {
094    super(conf, store);
095    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
096    // During the compaction, the compactor reads the cells from the mob files and
097    // probably creates new mob files. All of these operations are included in HMobStore,
098    // so we need to cast the Store to HMobStore.
099    if (!(store instanceof HMobStore)) {
100      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
101    }
102    mobStore = (HMobStore) store;
103    mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
104  }
105
106  @Override
107  public List<Path> compact(CompactionRequestImpl request, ThroughputController throughputController,
108      User user) throws IOException {
109    return compact(request, scannerFactory, writerFactory, throughputController, user);
110  }
111
112  /**
113   * Performs compaction on a column family with the mob flag enabled.
114   * This is for when the mob threshold size has changed or if the mob
115   * column family mode has been toggled via an alter table statement.
116   * Compacts the files by the following rules.
117   * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
118   * <ol>
119   * <li>
120   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
121   * directly copy the (with mob tag) cell into the new store file.
122   * </li>
123   * <li>
124   * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into
125   * the new store file.
126   * </li>
127   * </ol>
128   * 2. If the Put cell doesn't have a reference tag.
129   * <ol>
130   * <li>
131   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
132   * write this cell to a mob file, and write the path of this mob file to the store file.
133   * </li>
134   * <li>
135   * Otherwise, directly write this cell into the store file.
136   * </li>
137   * </ol>
138   * 3. Decide how to write a Delete cell.
139   * <ol>
140   * <li>
141   * If a Delete cell does not have a mob reference tag which means this delete marker have not
142   * been written to the mob del file, write this cell to the mob del file, and write this cell
143   * with a ref tag to a store file.
144   * </li>
145   * <li>
146   * Otherwise, directly write it to a store file.
147   * </li>
148   * </ol>
149   * After the major compaction on the normal hfiles, we have a guarantee that we have purged all
150   * deleted or old version mob refs, and the delete markers are written to a del file with the
151   * suffix _del. Because of this, it is safe to use the del file in the mob compaction.
152   * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the
153   * mob files. When the small mob files are merged into bigger ones, the del file is added into
154   * the scanner to filter the deleted cells.
155   * @param fd File details
156   * @param scanner Where to read from.
157   * @param writer Where to write to.
158   * @param smallestReadPoint Smallest read point.
159   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
160   * @param throughputController The compaction throughput controller.
161   * @param major Is a major compaction.
162   * @param numofFilesToCompact the number of files to compact
163   * @return Whether compaction ended; false if it was interrupted for any reason.
164   */
165  @Override
166  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
167      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
168      boolean major, int numofFilesToCompact) throws IOException {
169    long bytesWrittenProgressForCloseCheck = 0;
170    long bytesWrittenProgressForLog = 0;
171    long bytesWrittenProgressForShippedCall = 0;
172    // Since scanner.next() can return 'false' but still be delivering data,
173    // we have to use a do/while loop.
174    List<Cell> cells = new ArrayList<>();
175    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
176    int closeCheckSizeLimit = HStore.getCloseCheckInterval();
177    long lastMillis = 0;
178    if (LOG.isDebugEnabled()) {
179      lastMillis = EnvironmentEdgeManager.currentTime();
180    }
181    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
182    long now = 0;
183    boolean hasMore;
184    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
185    byte[] fileName = null;
186    StoreFileWriter mobFileWriter = null, delFileWriter = null;
187    long mobCells = 0, deleteMarkersCount = 0;
188    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
189    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
190    boolean finished = false;
191    ScannerContext scannerContext =
192        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
193    throughputController.start(compactionName);
194    KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
195    long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
196    try {
197      try {
198        // If the mob file writer could not be created, directly write the cell to the store file.
199        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
200          compactionCompression, store.getRegionInfo().getStartKey(), true);
201        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
202      } catch (IOException e) {
203        LOG.warn("Failed to create mob writer, "
204               + "we will continue the compaction by writing MOB cells directly in store files", e);
205      }
206      if (major) {
207        try {
208          delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs),
209            fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey());
210        } catch (IOException e) {
211          LOG.warn(
212            "Failed to create del writer, "
213            + "we will continue the compaction by writing delete markers directly in store files",
214            e);
215        }
216      }
217      do {
218        hasMore = scanner.next(cells, scannerContext);
219        if (LOG.isDebugEnabled()) {
220          now = EnvironmentEdgeManager.currentTime();
221        }
222        for (Cell c : cells) {
223          if (major && CellUtil.isDelete(c)) {
224            if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) {
225              // Directly write it to a store file
226              writer.append(c);
227            } else {
228              // Add a ref tag to this cell and write it to a store file.
229              writer.append(MobUtils.createMobRefDeleteMarker(c));
230              // Write the cell to a del file
231              delFileWriter.append(c);
232              deleteMarkersCount++;
233            }
234          } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
235            // If the mob file writer is null or the kv type is not put, directly write the cell
236            // to the store file.
237            writer.append(c);
238          } else if (MobUtils.isMobReferenceCell(c)) {
239            if (MobUtils.hasValidMobRefCellValue(c)) {
240              int size = MobUtils.getMobValueLength(c);
241              if (size > mobSizeThreshold) {
242                // If the value size is larger than the threshold, it's regarded as a mob. Since
243                // its value is already in the mob file, directly write this cell to the store file
244                writer.append(c);
245              } else {
246                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
247                // the mob cell from the mob file, and write it back to the store file.
248                Cell mobCell = mobStore.resolve(c, false);
249                if (mobCell.getValueLength() != 0) {
250                  // put the mob data back to the store file
251                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
252                  writer.append(mobCell);
253                  cellsCountCompactedFromMob++;
254                  cellsSizeCompactedFromMob += mobCell.getValueLength();
255                } else {
256                  // If the value of a file is empty, there might be issues when retrieving,
257                  // directly write the cell to the store file, and leave it to be handled by the
258                  // next compaction.
259                  writer.append(c);
260                }
261              }
262            } else {
263              LOG.warn("The value format of the KeyValue " + c
264                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
265              writer.append(c);
266            }
267          } else if (c.getValueLength() <= mobSizeThreshold) {
268            //If value size of a cell is not larger than the threshold, directly write to store file
269            writer.append(c);
270          } else {
271            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
272            // write this cell to a mob file, and write the path to the store file.
273            mobCells++;
274            // append the original keyValue in the mob file.
275            mobFileWriter.append(c);
276            Cell reference = MobUtils.createMobRefCell(c, fileName,
277                this.mobStore.getRefCellTags());
278            // write the cell whose value is the path of a mob file to the store file.
279            writer.append(reference);
280            cellsCountCompactedToMob++;
281            cellsSizeCompactedToMob += c.getValueLength();
282          }
283          int len = c.getSerializedSize();
284          ++progress.currentCompactedKVs;
285          progress.totalCompactedSize += len;
286          bytesWrittenProgressForShippedCall += len;
287          if (LOG.isDebugEnabled()) {
288            bytesWrittenProgressForLog += len;
289          }
290          throughputController.control(compactionName, len);
291          // check periodically to see if a system stop is requested
292          if (closeCheckSizeLimit > 0) {
293            bytesWrittenProgressForCloseCheck += len;
294            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
295              bytesWrittenProgressForCloseCheck = 0;
296              if (!store.areWritesEnabled()) {
297                progress.cancel();
298                return false;
299              }
300            }
301          }
302          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
303            ((ShipperListener)writer).beforeShipped();
304            kvs.shipped();
305            bytesWrittenProgressForShippedCall = 0;
306          }
307        }
308        // Log the progress of long running compactions every minute if
309        // logging at DEBUG level
310        if (LOG.isDebugEnabled()) {
311          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
312            String rate = String.format("%.2f",
313              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
314            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
315              compactionName, progress, rate, throughputController);
316            lastMillis = now;
317            bytesWrittenProgressForLog = 0;
318          }
319        }
320        cells.clear();
321      } while (hasMore);
322      finished = true;
323    } catch (InterruptedException e) {
324      progress.cancel();
325      throw new InterruptedIOException(
326          "Interrupted while control throughput of compacting " + compactionName);
327    } finally {
328      // Clone last cell in the final because writer will append last cell when committing. If
329      // don't clone here and once the scanner get closed, then the memory of last cell will be
330      // released. (HBASE-22582)
331      ((ShipperListener) writer).beforeShipped();
332      throughputController.finish(compactionName);
333      if (!finished && mobFileWriter != null) {
334        abortWriter(mobFileWriter);
335      }
336      if (!finished && delFileWriter != null) {
337        abortWriter(delFileWriter);
338      }
339    }
340    if (delFileWriter != null) {
341      if (deleteMarkersCount > 0) {
342        // If the del file is not empty, commit it.
343        // If the commit fails, the compaction is re-performed again.
344        delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
345        delFileWriter.close();
346        mobStore.commitFile(delFileWriter.getPath(), path);
347      } else {
348        // If the del file is empty, delete it instead of committing.
349        abortWriter(delFileWriter);
350      }
351    }
352    if (mobFileWriter != null) {
353      if (mobCells > 0) {
354        // If the mob file is not empty, commit it.
355        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
356        mobFileWriter.close();
357        mobStore.commitFile(mobFileWriter.getPath(), path);
358      } else {
359        // If the mob file is empty, delete it instead of committing.
360        abortWriter(mobFileWriter);
361      }
362    }
363    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
364    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
365    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
366    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
367    progress.complete();
368    return true;
369  }
370}