001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.hadoop.hbase.PrivateCellUtil;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.KeyValueUtil;
033import org.apache.hadoop.hbase.regionserver.CellSink;
034import org.apache.hadoop.hbase.regionserver.HMobStore;
035import org.apache.hadoop.hbase.regionserver.HStore;
036import org.apache.hadoop.hbase.regionserver.InternalScanner;
037import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
038import org.apache.hadoop.hbase.regionserver.ScanInfo;
039import org.apache.hadoop.hbase.regionserver.ScanType;
040import org.apache.hadoop.hbase.regionserver.ScannerContext;
041import org.apache.hadoop.hbase.regionserver.ShipperListener;
042import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
043import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
044import org.apache.hadoop.hbase.regionserver.StoreScanner;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
046import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
049import org.apache.hadoop.hbase.security.User;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Compact passed set of files in the mob-enabled column family.
058 */
059@InterfaceAudience.Private
060public class DefaultMobStoreCompactor extends DefaultCompactor {
061
062  private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class);
063  private long mobSizeThreshold;
064  private HMobStore mobStore;
065
066  private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
067
068    @Override
069    public ScanType getScanType(CompactionRequestImpl request) {
070      // retain the delete markers until they are expired.
071      return ScanType.COMPACT_RETAIN_DELETES;
072    }
073
074    @Override
075    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
076        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
077      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
078          fd.earliestPutTs);
079    }
080  };
081
082  private final CellSinkFactory<StoreFileWriter> writerFactory =
083      new CellSinkFactory<StoreFileWriter>() {
084        @Override
085        public StoreFileWriter createWriter(InternalScanner scanner,
086            org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
087            boolean shouldDropBehind) throws IOException {
088          // make this writer with tags always because of possible new cells with tags.
089          return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true,
090            shouldDropBehind);
091        }
092      };
093
094  public DefaultMobStoreCompactor(Configuration conf, HStore store) {
095    super(conf, store);
096    // The mob cells reside in the mob-enabled column family which is held by HMobStore.
097    // During the compaction, the compactor reads the cells from the mob files and
098    // probably creates new mob files. All of these operations are included in HMobStore,
099    // so we need to cast the Store to HMobStore.
100    if (!(store instanceof HMobStore)) {
101      throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
102    }
103    mobStore = (HMobStore) store;
104    mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold();
105  }
106
107  @Override
108  public List<Path> compact(CompactionRequestImpl request, ThroughputController throughputController,
109      User user) throws IOException {
110    return compact(request, scannerFactory, writerFactory, throughputController, user);
111  }
112
113  /**
114   * Performs compaction on a column family with the mob flag enabled.
115   * This is for when the mob threshold size has changed or if the mob
116   * column family mode has been toggled via an alter table statement.
117   * Compacts the files by the following rules.
118   * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
119   * <ol>
120   * <li>
121   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
122   * directly copy the (with mob tag) cell into the new store file.
123   * </li>
124   * <li>
125   * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into
126   * the new store file.
127   * </li>
128   * </ol>
129   * 2. If the Put cell doesn't have a reference tag.
130   * <ol>
131   * <li>
132   * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
133   * write this cell to a mob file, and write the path of this mob file to the store file.
134   * </li>
135   * <li>
136   * Otherwise, directly write this cell into the store file.
137   * </li>
138   * </ol>
139   * 3. Decide how to write a Delete cell.
140   * <ol>
141   * <li>
142   * If a Delete cell does not have a mob reference tag which means this delete marker have not
143   * been written to the mob del file, write this cell to the mob del file, and write this cell
144   * with a ref tag to a store file.
145   * </li>
146   * <li>
147   * Otherwise, directly write it to a store file.
148   * </li>
149   * </ol>
150   * After the major compaction on the normal hfiles, we have a guarantee that we have purged all
151   * deleted or old version mob refs, and the delete markers are written to a del file with the
152   * suffix _del. Because of this, it is safe to use the del file in the mob compaction.
153   * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the
154   * mob files. When the small mob files are merged into bigger ones, the del file is added into
155   * the scanner to filter the deleted cells.
156   * @param fd File details
157   * @param scanner Where to read from.
158   * @param writer Where to write to.
159   * @param smallestReadPoint Smallest read point.
160   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
161   * @param throughputController The compaction throughput controller.
162   * @param major Is a major compaction.
163   * @param numofFilesToCompact the number of files to compact
164   * @return Whether compaction ended; false if it was interrupted for any reason.
165   */
166  @Override
167  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
168      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
169      boolean major, int numofFilesToCompact) throws IOException {
170    long bytesWrittenProgressForCloseCheck = 0;
171    long bytesWrittenProgressForLog = 0;
172    long bytesWrittenProgressForShippedCall = 0;
173    // Since scanner.next() can return 'false' but still be delivering data,
174    // we have to use a do/while loop.
175    List<Cell> cells = new ArrayList<>();
176    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
177    int closeCheckSizeLimit = HStore.getCloseCheckInterval();
178    long lastMillis = 0;
179    if (LOG.isDebugEnabled()) {
180      lastMillis = EnvironmentEdgeManager.currentTime();
181    }
182    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
183    long now = 0;
184    boolean hasMore;
185    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
186    byte[] fileName = null;
187    StoreFileWriter mobFileWriter = null, delFileWriter = null;
188    long mobCells = 0, deleteMarkersCount = 0;
189    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
190    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
191    boolean finished = false;
192    ScannerContext scannerContext =
193        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
194    throughputController.start(compactionName);
195    KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
196    long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
197    try {
198      try {
199        // If the mob file writer could not be created, directly write the cell to the store file.
200        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
201          compactionCompression, store.getRegionInfo().getStartKey(), true);
202        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
203      } catch (IOException e) {
204        LOG.warn("Failed to create mob writer, "
205               + "we will continue the compaction by writing MOB cells directly in store files", e);
206      }
207      if (major) {
208        try {
209          delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs),
210            fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey());
211        } catch (IOException e) {
212          LOG.warn(
213            "Failed to create del writer, "
214            + "we will continue the compaction by writing delete markers directly in store files",
215            e);
216        }
217      }
218      do {
219        hasMore = scanner.next(cells, scannerContext);
220        if (LOG.isDebugEnabled()) {
221          now = EnvironmentEdgeManager.currentTime();
222        }
223        for (Cell c : cells) {
224          if (major && CellUtil.isDelete(c)) {
225            if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) {
226              // Directly write it to a store file
227              writer.append(c);
228            } else {
229              // Add a ref tag to this cell and write it to a store file.
230              writer.append(MobUtils.createMobRefDeleteMarker(c));
231              // Write the cell to a del file
232              delFileWriter.append(c);
233              deleteMarkersCount++;
234            }
235          } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
236            // If the mob file writer is null or the kv type is not put, directly write the cell
237            // to the store file.
238            writer.append(c);
239          } else if (MobUtils.isMobReferenceCell(c)) {
240            if (MobUtils.hasValidMobRefCellValue(c)) {
241              int size = MobUtils.getMobValueLength(c);
242              if (size > mobSizeThreshold) {
243                // If the value size is larger than the threshold, it's regarded as a mob. Since
244                // its value is already in the mob file, directly write this cell to the store file
245                writer.append(c);
246              } else {
247                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
248                // the mob cell from the mob file, and write it back to the store file.
249                Cell mobCell = mobStore.resolve(c, false);
250                if (mobCell.getValueLength() != 0) {
251                  // put the mob data back to the store file
252                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
253                  writer.append(mobCell);
254                  cellsCountCompactedFromMob++;
255                  cellsSizeCompactedFromMob += mobCell.getValueLength();
256                } else {
257                  // If the value of a file is empty, there might be issues when retrieving,
258                  // directly write the cell to the store file, and leave it to be handled by the
259                  // next compaction.
260                  writer.append(c);
261                }
262              }
263            } else {
264              LOG.warn("The value format of the KeyValue " + c
265                  + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
266              writer.append(c);
267            }
268          } else if (c.getValueLength() <= mobSizeThreshold) {
269            //If value size of a cell is not larger than the threshold, directly write to store file
270            writer.append(c);
271          } else {
272            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
273            // write this cell to a mob file, and write the path to the store file.
274            mobCells++;
275            // append the original keyValue in the mob file.
276            mobFileWriter.append(c);
277            Cell reference = MobUtils.createMobRefCell(c, fileName,
278                this.mobStore.getRefCellTags());
279            // write the cell whose value is the path of a mob file to the store file.
280            writer.append(reference);
281            cellsCountCompactedToMob++;
282            cellsSizeCompactedToMob += c.getValueLength();
283          }
284          int len = KeyValueUtil.length(c);
285          ++progress.currentCompactedKVs;
286          progress.totalCompactedSize += len;
287          bytesWrittenProgressForShippedCall += len;
288          if (LOG.isDebugEnabled()) {
289            bytesWrittenProgressForLog += len;
290          }
291          throughputController.control(compactionName, len);
292          // check periodically to see if a system stop is requested
293          if (closeCheckSizeLimit > 0) {
294            bytesWrittenProgressForCloseCheck += len;
295            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
296              bytesWrittenProgressForCloseCheck = 0;
297              if (!store.areWritesEnabled()) {
298                progress.cancel();
299                return false;
300              }
301            }
302          }
303          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
304            ((ShipperListener)writer).beforeShipped();
305            kvs.shipped();
306            bytesWrittenProgressForShippedCall = 0;
307          }
308        }
309        // Log the progress of long running compactions every minute if
310        // logging at DEBUG level
311        if (LOG.isDebugEnabled()) {
312          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
313            String rate = String.format("%.2f",
314              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
315            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
316              compactionName, progress, rate, throughputController);
317            lastMillis = now;
318            bytesWrittenProgressForLog = 0;
319          }
320        }
321        cells.clear();
322      } while (hasMore);
323      finished = true;
324    } catch (InterruptedException e) {
325      progress.cancel();
326      throw new InterruptedIOException(
327          "Interrupted while control throughput of compacting " + compactionName);
328    } finally {
329      // Clone last cell in the final because writer will append last cell when committing. If
330      // don't clone here and once the scanner get closed, then the memory of last cell will be
331      // released. (HBASE-22582)
332      ((ShipperListener) writer).beforeShipped();
333      throughputController.finish(compactionName);
334      if (!finished && mobFileWriter != null) {
335        abortWriter(mobFileWriter);
336      }
337      if (!finished && delFileWriter != null) {
338        abortWriter(delFileWriter);
339      }
340    }
341    if (delFileWriter != null) {
342      if (deleteMarkersCount > 0) {
343        // If the del file is not empty, commit it.
344        // If the commit fails, the compaction is re-performed again.
345        delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
346        delFileWriter.close();
347        mobStore.commitFile(delFileWriter.getPath(), path);
348      } else {
349        // If the del file is empty, delete it instead of committing.
350        abortWriter(delFileWriter);
351      }
352    }
353    if (mobFileWriter != null) {
354      if (mobCells > 0) {
355        // If the mob file is not empty, commit it.
356        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
357        mobFileWriter.close();
358        mobStore.commitFile(mobFileWriter.getPath(), path);
359      } else {
360        // If the mob file is empty, delete it instead of committing.
361        abortWriter(mobFileWriter);
362      }
363    }
364    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
365    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
366    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
367    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
368    progress.complete();
369    return true;
370  }
371}