001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Optional;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.PrivateCellUtil;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
038import org.apache.hadoop.hbase.regionserver.CellSink;
039import org.apache.hadoop.hbase.regionserver.HStore;
040import org.apache.hadoop.hbase.regionserver.InternalScanner;
041import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
042import org.apache.hadoop.hbase.regionserver.ScannerContext;
043import org.apache.hadoop.hbase.regionserver.ShipperListener;
044import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
045import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
046import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
047import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
049import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * This class is used for testing only. The main purpose is to emulate random failures during MOB
058 * compaction process. Example of usage:
059 *
060 * <pre>
061 * {
062 *   &#64;code
063 *   public class SomeTest {
064 *
065 *     public void initConfiguration(Configuration conf) {
066 *       conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
067 *         FaultyMobStoreCompactor.class.getName());
068 *       conf.setDouble("hbase.mob.compaction.fault.probability", 0.1);
069 *     }
070 *   }
071 * }
072 * </pre>
073 *
074 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class.
075 */
076@InterfaceAudience.Private
077public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
078
079  private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class);
080
081  public static AtomicLong mobCounter = new AtomicLong();
082  public static AtomicLong totalFailures = new AtomicLong();
083  public static AtomicLong totalCompactions = new AtomicLong();
084  public static AtomicLong totalMajorCompactions = new AtomicLong();
085
086  static double failureProb = 0.1d;
087
088  public FaultyMobStoreCompactor(Configuration conf, HStore store) {
089    super(conf, store);
090    failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1);
091  }
092
093  @Override
094  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
095    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
096    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
097
098    boolean major = request.isAllFiles();
099    totalCompactions.incrementAndGet();
100    if (major) {
101      totalMajorCompactions.incrementAndGet();
102    }
103    long bytesWrittenProgressForLog = 0;
104    long bytesWrittenProgressForShippedCall = 0;
105    // Clear old mob references
106    mobRefSet.get().clear();
107    boolean isUserRequest = userRequest.get();
108    boolean compactMOBs = major && isUserRequest;
109    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
110      MobConstants.DEFAULT_MOB_DISCARD_MISS);
111
112    boolean mustFail = false;
113    if (compactMOBs) {
114      mobCounter.incrementAndGet();
115      double dv = ThreadLocalRandom.current().nextDouble();
116      if (dv < failureProb) {
117        mustFail = true;
118        totalFailures.incrementAndGet();
119      }
120    }
121
122    FileSystem fs = store.getFileSystem();
123
124    // Since scanner.next() can return 'false' but still be delivering data,
125    // we have to use a do/while loop.
126    List<Cell> cells = new ArrayList<>();
127    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
128    long currentTime = EnvironmentEdgeManager.currentTime();
129    long lastMillis = 0;
130    if (LOG.isDebugEnabled()) {
131      lastMillis = currentTime;
132    }
133    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
134    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
135    long now = 0;
136    boolean hasMore;
137    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
138    byte[] fileName = null;
139    StoreFileWriter mobFileWriter = null;
140    long mobCells = 0;
141    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
142    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
143    boolean finished = false;
144
145    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
146      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
147        compactScannerSizeLimit)
148      .build();
149    throughputController.start(compactionName);
150    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
151    long shippedCallSizeLimit =
152      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
153
154    Cell mobCell = null;
155
156    long counter = 0;
157    long countFailAt = -1;
158    if (mustFail) {
159      countFailAt = ThreadLocalRandom.current().nextInt(100); // randomly fail fast
160    }
161
162    try {
163      try {
164        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
165          major ? majorCompactionCompression : minorCompactionCompression,
166          store.getRegionInfo().getStartKey(), true);
167        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
168      } catch (IOException e) {
169        // Bailing out
170        LOG.error("Failed to create mob writer, ", e);
171        throw e;
172      }
173      if (compactMOBs) {
174        // Add the only reference we get for compact MOB case
175        // because new store file will have only one MOB reference
176        // in this case - of newly compacted MOB file
177        mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
178      }
179      do {
180        hasMore = scanner.next(cells, scannerContext);
181        currentTime = EnvironmentEdgeManager.currentTime();
182        if (LOG.isDebugEnabled()) {
183          now = currentTime;
184        }
185        if (closeChecker.isTimeLimit(store, currentTime)) {
186          progress.cancel();
187          return false;
188        }
189        for (Cell c : cells) {
190          counter++;
191          if (compactMOBs) {
192            if (MobUtils.isMobReferenceCell(c)) {
193              if (counter == countFailAt) {
194                LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get());
195                throw new CorruptHFileException("injected fault");
196              }
197              String fName = MobUtils.getMobFileName(c);
198              // Added to support migration
199              try {
200                mobCell = mobStore.resolve(c, true, false).getCell();
201              } catch (DoNotRetryIOException e) {
202                if (
203                  discardMobMiss && e.getCause() != null
204                    && e.getCause() instanceof FileNotFoundException
205                ) {
206                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
207                  continue;
208                } else {
209                  throw e;
210                }
211              }
212
213              if (discardMobMiss && mobCell.getValueLength() == 0) {
214                LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell);
215                continue;
216              }
217
218              if (mobCell.getValueLength() > mobSizeThreshold) {
219                // put the mob data back to the store file
220                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
221                mobFileWriter.append(mobCell);
222                writer.append(
223                  MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
224                mobCells++;
225              } else {
226                // If MOB value is less than threshold, append it directly to a store file
227                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
228                writer.append(mobCell);
229                cellsCountCompactedFromMob++;
230                cellsSizeCompactedFromMob += mobCell.getValueLength();
231              }
232            } else {
233              // Not a MOB reference cell
234              int size = c.getValueLength();
235              if (size > mobSizeThreshold) {
236                mobFileWriter.append(c);
237                writer
238                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
239                mobCells++;
240                cellsCountCompactedToMob++;
241                cellsSizeCompactedToMob += c.getValueLength();
242              } else {
243                writer.append(c);
244              }
245            }
246          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
247            // Not a major compaction or major with MOB disabled
248            // If the kv type is not put, directly write the cell
249            // to the store file.
250            writer.append(c);
251          } else if (MobUtils.isMobReferenceCell(c)) {
252            // Not a major MOB compaction, Put MOB reference
253            if (MobUtils.hasValidMobRefCellValue(c)) {
254              int size = MobUtils.getMobValueLength(c);
255              if (size > mobSizeThreshold) {
256                // If the value size is larger than the threshold, it's regarded as a mob. Since
257                // its value is already in the mob file, directly write this cell to the store file
258                Optional<TableName> refTable = MobUtils.getTableName(c);
259                if (refTable.isPresent()) {
260                  mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
261                  writer.append(c);
262                } else {
263                  throw new IOException(String.format("MOB cell did not contain a tablename "
264                    + "tag. should not be possible. see ref guide on mob troubleshooting. "
265                    + "store=%s cell=%s", getStoreInfo(), c));
266                }
267              } else {
268                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
269                // the mob cell from the mob file, and write it back to the store file.
270                mobCell = mobStore.resolve(c, true, false).getCell();
271                if (mobCell.getValueLength() != 0) {
272                  // put the mob data back to the store file
273                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
274                  writer.append(mobCell);
275                  cellsCountCompactedFromMob++;
276                  cellsSizeCompactedFromMob += mobCell.getValueLength();
277                } else {
278                  // If the value of a file is empty, there might be issues when retrieving,
279                  // directly write the cell to the store file, and leave it to be handled by the
280                  // next compaction.
281                  LOG.error("Empty value for: " + c);
282                  Optional<TableName> refTable = MobUtils.getTableName(c);
283                  if (refTable.isPresent()) {
284                    mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
285                    writer.append(c);
286                  } else {
287                    throw new IOException(String.format("MOB cell did not contain a tablename "
288                      + "tag. should not be possible. see ref guide on mob troubleshooting. "
289                      + "store=%s cell=%s", getStoreInfo(), c));
290                  }
291                }
292              }
293            } else {
294              LOG.error("Corrupted MOB reference: {}", c);
295              writer.append(c);
296            }
297          } else if (c.getValueLength() <= mobSizeThreshold) {
298            // If the value size of a cell is not larger than the threshold, directly write it to
299            // the store file.
300            writer.append(c);
301          } else {
302            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
303            // write this cell to a mob file, and write the path to the store file.
304            mobCells++;
305            // append the original keyValue in the mob file.
306            mobFileWriter.append(c);
307            Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
308            // write the cell whose value is the path of a mob file to the store file.
309            writer.append(reference);
310            cellsCountCompactedToMob++;
311            cellsSizeCompactedToMob += c.getValueLength();
312            // Add ref we get for compact MOB case
313            mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
314          }
315
316          int len = c.getSerializedSize();
317          ++progress.currentCompactedKVs;
318          progress.totalCompactedSize += len;
319          bytesWrittenProgressForShippedCall += len;
320          if (LOG.isDebugEnabled()) {
321            bytesWrittenProgressForLog += len;
322          }
323          throughputController.control(compactionName, len);
324          if (closeChecker.isSizeLimit(store, len)) {
325            progress.cancel();
326            return false;
327          }
328          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
329            ((ShipperListener) writer).beforeShipped();
330            kvs.shipped();
331            bytesWrittenProgressForShippedCall = 0;
332          }
333        }
334        // Log the progress of long running compactions every minute if
335        // logging at DEBUG level
336        if (LOG.isDebugEnabled()) {
337          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
338            String rate = String.format("%.2f",
339              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
340            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
341              compactionName, progress, rate, throughputController);
342            lastMillis = now;
343            bytesWrittenProgressForLog = 0;
344          }
345        }
346        cells.clear();
347      } while (hasMore);
348      finished = true;
349    } catch (InterruptedException e) {
350      progress.cancel();
351      throw new InterruptedIOException(
352        "Interrupted while control throughput of compacting " + compactionName);
353    } catch (FileNotFoundException e) {
354      LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e);
355      System.exit(-1);
356    } catch (IOException t) {
357      LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName());
358      throw t;
359    } finally {
360      // Clone last cell in the final because writer will append last cell when committing. If
361      // don't clone here and once the scanner get closed, then the memory of last cell will be
362      // released. (HBASE-22582)
363      ((ShipperListener) writer).beforeShipped();
364      throughputController.finish(compactionName);
365      if (!finished && mobFileWriter != null) {
366        // Remove all MOB references because compaction failed
367        mobRefSet.get().clear();
368        // Abort writer
369        abortWriter(mobFileWriter);
370      }
371    }
372
373    if (mobFileWriter != null) {
374      if (mobCells > 0) {
375        // If the mob file is not empty, commit it.
376        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
377        mobFileWriter.close();
378        mobStore.commitFile(mobFileWriter.getPath(), path);
379      } else {
380        // If the mob file is empty, delete it instead of committing.
381        abortWriter(mobFileWriter);
382      }
383    }
384    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
385    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
386    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
387    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
388    progress.complete();
389    return true;
390
391  }
392
393}