001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Optional;
027import java.util.concurrent.ThreadLocalRandom;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.PrivateCellUtil;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
038import org.apache.hadoop.hbase.regionserver.CellSink;
039import org.apache.hadoop.hbase.regionserver.HStore;
040import org.apache.hadoop.hbase.regionserver.InternalScanner;
041import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
042import org.apache.hadoop.hbase.regionserver.ScannerContext;
043import org.apache.hadoop.hbase.regionserver.ShipperListener;
044import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
045import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
046import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
047import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
049import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * This class is used for testing only. The main purpose is to emulate random failures during MOB
058 * compaction process. Example of usage:
059 *
060 * <pre>
061 * {
062 *   &#64;code
063 *   public class SomeTest {
064 *
065 *     public void initConfiguration(Configuration conf) {
066 *       conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
067 *         FaultyMobStoreCompactor.class.getName());
068 *       conf.setDouble("hbase.mob.compaction.fault.probability", 0.1);
069 *     }
070 *   }
071 * }
072 * </pre>
073 *
074 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class.
075 */
076@InterfaceAudience.Private
077public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
078
079  private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class);
080
081  public static AtomicLong mobCounter = new AtomicLong();
082  public static AtomicLong totalFailures = new AtomicLong();
083  public static AtomicLong totalCompactions = new AtomicLong();
084  public static AtomicLong totalMajorCompactions = new AtomicLong();
085
086  static double failureProb = 0.1d;
087
088  public FaultyMobStoreCompactor(Configuration conf, HStore store) {
089    super(conf, store);
090    failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1);
091  }
092
093  @Override
094  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
095    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
096    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
097
098    boolean major = request.isAllFiles();
099    totalCompactions.incrementAndGet();
100    if (major) {
101      totalMajorCompactions.incrementAndGet();
102    }
103    long bytesWrittenProgressForLog = 0;
104    long bytesWrittenProgressForShippedCall = 0;
105    // Clear old mob references
106    mobRefSet.get().clear();
107    boolean isUserRequest = userRequest.get();
108    boolean compactMOBs = major && isUserRequest;
109    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
110      MobConstants.DEFAULT_MOB_DISCARD_MISS);
111
112    boolean mustFail = false;
113    if (compactMOBs) {
114      mobCounter.incrementAndGet();
115      double dv = ThreadLocalRandom.current().nextDouble();
116      if (dv < failureProb) {
117        mustFail = true;
118        totalFailures.incrementAndGet();
119      }
120    }
121
122    FileSystem fs = store.getFileSystem();
123
124    // Since scanner.next() can return 'false' but still be delivering data,
125    // we have to use a do/while loop.
126    List<Cell> cells = new ArrayList<>();
127    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
128    long currentTime = EnvironmentEdgeManager.currentTime();
129    long lastMillis = 0;
130    if (LOG.isDebugEnabled()) {
131      lastMillis = currentTime;
132    }
133    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
134    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
135    long now = 0;
136    boolean hasMore;
137    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
138    byte[] fileName = null;
139    StoreFileWriter mobFileWriter = null;
140    long mobCells = 0;
141    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
142    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
143    boolean finished = false;
144
145    ScannerContext scannerContext =
146      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
147    throughputController.start(compactionName);
148    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
149    long shippedCallSizeLimit =
150      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
151
152    Cell mobCell = null;
153
154    long counter = 0;
155    long countFailAt = -1;
156    if (mustFail) {
157      countFailAt = ThreadLocalRandom.current().nextInt(100); // randomly fail fast
158    }
159
160    try {
161      try {
162        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
163          major ? majorCompactionCompression : minorCompactionCompression,
164          store.getRegionInfo().getStartKey(), true);
165        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
166      } catch (IOException e) {
167        // Bailing out
168        LOG.error("Failed to create mob writer, ", e);
169        throw e;
170      }
171      if (compactMOBs) {
172        // Add the only reference we get for compact MOB case
173        // because new store file will have only one MOB reference
174        // in this case - of newly compacted MOB file
175        mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
176      }
177      do {
178        hasMore = scanner.next(cells, scannerContext);
179        currentTime = EnvironmentEdgeManager.currentTime();
180        if (LOG.isDebugEnabled()) {
181          now = currentTime;
182        }
183        if (closeChecker.isTimeLimit(store, currentTime)) {
184          progress.cancel();
185          return false;
186        }
187        for (Cell c : cells) {
188          counter++;
189          if (compactMOBs) {
190            if (MobUtils.isMobReferenceCell(c)) {
191              if (counter == countFailAt) {
192                LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get());
193                throw new CorruptHFileException("injected fault");
194              }
195              String fName = MobUtils.getMobFileName(c);
196              // Added to support migration
197              try {
198                mobCell = mobStore.resolve(c, true, false).getCell();
199              } catch (DoNotRetryIOException e) {
200                if (
201                  discardMobMiss && e.getCause() != null
202                    && e.getCause() instanceof FileNotFoundException
203                ) {
204                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
205                  continue;
206                } else {
207                  throw e;
208                }
209              }
210
211              if (discardMobMiss && mobCell.getValueLength() == 0) {
212                LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell);
213                continue;
214              }
215
216              if (mobCell.getValueLength() > mobSizeThreshold) {
217                // put the mob data back to the store file
218                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
219                mobFileWriter.append(mobCell);
220                writer.append(
221                  MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
222                mobCells++;
223              } else {
224                // If MOB value is less than threshold, append it directly to a store file
225                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
226                writer.append(mobCell);
227                cellsCountCompactedFromMob++;
228                cellsSizeCompactedFromMob += mobCell.getValueLength();
229              }
230            } else {
231              // Not a MOB reference cell
232              int size = c.getValueLength();
233              if (size > mobSizeThreshold) {
234                mobFileWriter.append(c);
235                writer
236                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
237                mobCells++;
238                cellsCountCompactedToMob++;
239                cellsSizeCompactedToMob += c.getValueLength();
240              } else {
241                writer.append(c);
242              }
243            }
244          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
245            // Not a major compaction or major with MOB disabled
246            // If the kv type is not put, directly write the cell
247            // to the store file.
248            writer.append(c);
249          } else if (MobUtils.isMobReferenceCell(c)) {
250            // Not a major MOB compaction, Put MOB reference
251            if (MobUtils.hasValidMobRefCellValue(c)) {
252              int size = MobUtils.getMobValueLength(c);
253              if (size > mobSizeThreshold) {
254                // If the value size is larger than the threshold, it's regarded as a mob. Since
255                // its value is already in the mob file, directly write this cell to the store file
256                Optional<TableName> refTable = MobUtils.getTableName(c);
257                if (refTable.isPresent()) {
258                  mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
259                  writer.append(c);
260                } else {
261                  throw new IOException(String.format("MOB cell did not contain a tablename "
262                    + "tag. should not be possible. see ref guide on mob troubleshooting. "
263                    + "store=%s cell=%s", getStoreInfo(), c));
264                }
265              } else {
266                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
267                // the mob cell from the mob file, and write it back to the store file.
268                mobCell = mobStore.resolve(c, true, false).getCell();
269                if (mobCell.getValueLength() != 0) {
270                  // put the mob data back to the store file
271                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
272                  writer.append(mobCell);
273                  cellsCountCompactedFromMob++;
274                  cellsSizeCompactedFromMob += mobCell.getValueLength();
275                } else {
276                  // If the value of a file is empty, there might be issues when retrieving,
277                  // directly write the cell to the store file, and leave it to be handled by the
278                  // next compaction.
279                  LOG.error("Empty value for: " + c);
280                  Optional<TableName> refTable = MobUtils.getTableName(c);
281                  if (refTable.isPresent()) {
282                    mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
283                    writer.append(c);
284                  } else {
285                    throw new IOException(String.format("MOB cell did not contain a tablename "
286                      + "tag. should not be possible. see ref guide on mob troubleshooting. "
287                      + "store=%s cell=%s", getStoreInfo(), c));
288                  }
289                }
290              }
291            } else {
292              LOG.error("Corrupted MOB reference: {}", c);
293              writer.append(c);
294            }
295          } else if (c.getValueLength() <= mobSizeThreshold) {
296            // If the value size of a cell is not larger than the threshold, directly write it to
297            // the store file.
298            writer.append(c);
299          } else {
300            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
301            // write this cell to a mob file, and write the path to the store file.
302            mobCells++;
303            // append the original keyValue in the mob file.
304            mobFileWriter.append(c);
305            Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
306            // write the cell whose value is the path of a mob file to the store file.
307            writer.append(reference);
308            cellsCountCompactedToMob++;
309            cellsSizeCompactedToMob += c.getValueLength();
310            // Add ref we get for compact MOB case
311            mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
312          }
313
314          int len = c.getSerializedSize();
315          ++progress.currentCompactedKVs;
316          progress.totalCompactedSize += len;
317          bytesWrittenProgressForShippedCall += len;
318          if (LOG.isDebugEnabled()) {
319            bytesWrittenProgressForLog += len;
320          }
321          throughputController.control(compactionName, len);
322          if (closeChecker.isSizeLimit(store, len)) {
323            progress.cancel();
324            return false;
325          }
326          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
327            ((ShipperListener) writer).beforeShipped();
328            kvs.shipped();
329            bytesWrittenProgressForShippedCall = 0;
330          }
331        }
332        // Log the progress of long running compactions every minute if
333        // logging at DEBUG level
334        if (LOG.isDebugEnabled()) {
335          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
336            String rate = String.format("%.2f",
337              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
338            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
339              compactionName, progress, rate, throughputController);
340            lastMillis = now;
341            bytesWrittenProgressForLog = 0;
342          }
343        }
344        cells.clear();
345      } while (hasMore);
346      finished = true;
347    } catch (InterruptedException e) {
348      progress.cancel();
349      throw new InterruptedIOException(
350        "Interrupted while control throughput of compacting " + compactionName);
351    } catch (FileNotFoundException e) {
352      LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e);
353      System.exit(-1);
354    } catch (IOException t) {
355      LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName());
356      throw t;
357    } finally {
358      // Clone last cell in the final because writer will append last cell when committing. If
359      // don't clone here and once the scanner get closed, then the memory of last cell will be
360      // released. (HBASE-22582)
361      ((ShipperListener) writer).beforeShipped();
362      throughputController.finish(compactionName);
363      if (!finished && mobFileWriter != null) {
364        // Remove all MOB references because compaction failed
365        mobRefSet.get().clear();
366        // Abort writer
367        abortWriter(mobFileWriter);
368      }
369    }
370
371    if (mobFileWriter != null) {
372      if (mobCells > 0) {
373        // If the mob file is not empty, commit it.
374        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
375        mobFileWriter.close();
376        mobStore.commitFile(mobFileWriter.getPath(), path);
377      } else {
378        // If the mob file is empty, delete it instead of committing.
379        abortWriter(mobFileWriter);
380      }
381    }
382    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
383    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
384    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
385    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
386    progress.complete();
387    return true;
388
389  }
390
391}