001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mob;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Date;
025import java.util.List;
026import java.util.Optional;
027import java.util.Random;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.PrivateCellUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
036import org.apache.hadoop.hbase.regionserver.CellSink;
037import org.apache.hadoop.hbase.regionserver.HStore;
038import org.apache.hadoop.hbase.regionserver.InternalScanner;
039import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
040import org.apache.hadoop.hbase.regionserver.ScannerContext;
041import org.apache.hadoop.hbase.regionserver.ShipperListener;
042import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
043import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
044import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
045import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
046import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * This class is used for testing only. The main purpose is to emulate random failures during MOB
056 * compaction process. Example of usage:
057 *
058 * <pre>
059 * {
060 *   &#64;code
061 *   public class SomeTest {
062 *
063 *     public void initConfiguration(Configuration conf) {
064 *       conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
065 *         FaultyMobStoreCompactor.class.getName());
066 *       conf.setDouble("hbase.mob.compaction.fault.probability", 0.1);
067 *     }
068 *   }
069 * }
070 * </pre>
071 *
072 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure this class.
073 */
074@InterfaceAudience.Private
075public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
076
077  private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class);
078
079  public static AtomicLong mobCounter = new AtomicLong();
080  public static AtomicLong totalFailures = new AtomicLong();
081  public static AtomicLong totalCompactions = new AtomicLong();
082  public static AtomicLong totalMajorCompactions = new AtomicLong();
083
084  static double failureProb = 0.1d;
085  static Random rnd = new Random();
086
087  public FaultyMobStoreCompactor(Configuration conf, HStore store) {
088    super(conf, store);
089    failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1);
090  }
091
092  @Override
093  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
094    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
095    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
096
097    boolean major = request.isAllFiles();
098    totalCompactions.incrementAndGet();
099    if (major) {
100      totalMajorCompactions.incrementAndGet();
101    }
102    long bytesWrittenProgressForCloseCheck = 0;
103    long bytesWrittenProgressForLog = 0;
104    long bytesWrittenProgressForShippedCall = 0;
105    // Clear old mob references
106    mobRefSet.get().clear();
107    boolean isUserRequest = userRequest.get();
108    boolean compactMOBs = major && isUserRequest;
109    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
110      MobConstants.DEFAULT_MOB_DISCARD_MISS);
111
112    boolean mustFail = false;
113    if (compactMOBs) {
114      mobCounter.incrementAndGet();
115      double dv = rnd.nextDouble();
116      if (dv < failureProb) {
117        mustFail = true;
118        totalFailures.incrementAndGet();
119      }
120    }
121
122    // Since scanner.next() can return 'false' but still be delivering data,
123    // we have to use a do/while loop.
124    List<Cell> cells = new ArrayList<>();
125    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
126    int closeCheckSizeLimit =
127      conf.getInt(CloseChecker.SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
128    long lastMillis = 0;
129    if (LOG.isDebugEnabled()) {
130      lastMillis = EnvironmentEdgeManager.currentTime();
131    }
132    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
133    long now = 0;
134    boolean hasMore;
135    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
136    byte[] fileName = null;
137    StoreFileWriter mobFileWriter = null;
138    long mobCells = 0;
139    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
140    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
141    boolean finished = false;
142
143    ScannerContext scannerContext =
144      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
145    throughputController.start(compactionName);
146    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
147    long shippedCallSizeLimit =
148      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
149
150    Cell mobCell = null;
151
152    long counter = 0;
153    long countFailAt = -1;
154    if (mustFail) {
155      countFailAt = rnd.nextInt(100); // randomly fail fast
156    }
157
158    try {
159      try {
160        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
161          major ? majorCompactionCompression : minorCompactionCompression,
162          store.getRegionInfo().getStartKey(), true);
163        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
164      } catch (IOException e) {
165        // Bailing out
166        LOG.error("Failed to create mob writer, ", e);
167        throw e;
168      }
169      if (compactMOBs) {
170        // Add the only reference we get for compact MOB case
171        // because new store file will have only one MOB reference
172        // in this case - of newly compacted MOB file
173        mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
174      }
175      do {
176        hasMore = scanner.next(cells, scannerContext);
177        if (LOG.isDebugEnabled()) {
178          now = EnvironmentEdgeManager.currentTime();
179        }
180        for (Cell c : cells) {
181          counter++;
182          if (compactMOBs) {
183            if (MobUtils.isMobReferenceCell(c)) {
184              if (counter == countFailAt) {
185                LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get());
186                throw new CorruptHFileException("injected fault");
187              }
188              String fName = MobUtils.getMobFileName(c);
189              // Added to support migration
190              try {
191                mobCell = mobStore.resolve(c, true, false).getCell();
192              } catch (FileNotFoundException fnfe) {
193                if (discardMobMiss) {
194                  LOG.error("Missing MOB cell: file={} not found", fName);
195                  continue;
196                } else {
197                  throw fnfe;
198                }
199              }
200
201              if (discardMobMiss && mobCell.getValueLength() == 0) {
202                LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell);
203                continue;
204              }
205
206              if (mobCell.getValueLength() > mobSizeThreshold) {
207                // put the mob data back to the store file
208                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
209                mobFileWriter.append(mobCell);
210                writer.append(
211                  MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
212                mobCells++;
213              } else {
214                // If MOB value is less than threshold, append it directly to a store file
215                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
216                writer.append(mobCell);
217                cellsCountCompactedFromMob++;
218                cellsSizeCompactedFromMob += mobCell.getValueLength();
219              }
220            } else {
221              // Not a MOB reference cell
222              int size = c.getValueLength();
223              if (size > mobSizeThreshold) {
224                mobFileWriter.append(c);
225                writer
226                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
227                mobCells++;
228                cellsCountCompactedToMob++;
229                cellsSizeCompactedToMob += c.getValueLength();
230              } else {
231                writer.append(c);
232              }
233            }
234          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
235            // Not a major compaction or major with MOB disabled
236            // If the kv type is not put, directly write the cell
237            // to the store file.
238            writer.append(c);
239          } else if (MobUtils.isMobReferenceCell(c)) {
240            // Not a major MOB compaction, Put MOB reference
241            if (MobUtils.hasValidMobRefCellValue(c)) {
242              int size = MobUtils.getMobValueLength(c);
243              if (size > mobSizeThreshold) {
244                // If the value size is larger than the threshold, it's regarded as a mob. Since
245                // its value is already in the mob file, directly write this cell to the store file
246                Optional<TableName> refTable = MobUtils.getTableName(c);
247                if (refTable.isPresent()) {
248                  mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
249                  writer.append(c);
250                } else {
251                  throw new IOException(String.format("MOB cell did not contain a tablename "
252                    + "tag. should not be possible. see ref guide on mob troubleshooting. "
253                    + "store=%s cell=%s", getStoreInfo(), c));
254                }
255              } else {
256                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
257                // the mob cell from the mob file, and write it back to the store file.
258                mobCell = mobStore.resolve(c, true, false).getCell();
259                if (mobCell.getValueLength() != 0) {
260                  // put the mob data back to the store file
261                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
262                  writer.append(mobCell);
263                  cellsCountCompactedFromMob++;
264                  cellsSizeCompactedFromMob += mobCell.getValueLength();
265                } else {
266                  // If the value of a file is empty, there might be issues when retrieving,
267                  // directly write the cell to the store file, and leave it to be handled by the
268                  // next compaction.
269                  LOG.error("Empty value for: " + c);
270                  Optional<TableName> refTable = MobUtils.getTableName(c);
271                  if (refTable.isPresent()) {
272                    mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
273                    writer.append(c);
274                  } else {
275                    throw new IOException(String.format("MOB cell did not contain a tablename "
276                      + "tag. should not be possible. see ref guide on mob troubleshooting. "
277                      + "store=%s cell=%s", getStoreInfo(), c));
278                  }
279                }
280              }
281            } else {
282              LOG.error("Corrupted MOB reference: {}", c);
283              writer.append(c);
284            }
285          } else if (c.getValueLength() <= mobSizeThreshold) {
286            // If the value size of a cell is not larger than the threshold, directly write it to
287            // the store file.
288            writer.append(c);
289          } else {
290            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
291            // write this cell to a mob file, and write the path to the store file.
292            mobCells++;
293            // append the original keyValue in the mob file.
294            mobFileWriter.append(c);
295            Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
296            // write the cell whose value is the path of a mob file to the store file.
297            writer.append(reference);
298            cellsCountCompactedToMob++;
299            cellsSizeCompactedToMob += c.getValueLength();
300            // Add ref we get for compact MOB case
301            mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
302          }
303
304          int len = c.getSerializedSize();
305          ++progress.currentCompactedKVs;
306          progress.totalCompactedSize += len;
307          bytesWrittenProgressForShippedCall += len;
308          if (LOG.isDebugEnabled()) {
309            bytesWrittenProgressForLog += len;
310          }
311          throughputController.control(compactionName, len);
312          // check periodically to see if a system stop is requested
313          if (closeCheckSizeLimit > 0) {
314            bytesWrittenProgressForCloseCheck += len;
315            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
316              bytesWrittenProgressForCloseCheck = 0;
317              if (!store.areWritesEnabled()) {
318                progress.cancel();
319                return false;
320              }
321            }
322          }
323          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
324            ((ShipperListener) writer).beforeShipped();
325            kvs.shipped();
326            bytesWrittenProgressForShippedCall = 0;
327          }
328        }
329        // Log the progress of long running compactions every minute if
330        // logging at DEBUG level
331        if (LOG.isDebugEnabled()) {
332          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
333            String rate = String.format("%.2f",
334              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
335            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
336              compactionName, progress, rate, throughputController);
337            lastMillis = now;
338            bytesWrittenProgressForLog = 0;
339          }
340        }
341        cells.clear();
342      } while (hasMore);
343      finished = true;
344    } catch (InterruptedException e) {
345      progress.cancel();
346      throw new InterruptedIOException(
347        "Interrupted while control throughput of compacting " + compactionName);
348    } catch (FileNotFoundException e) {
349      LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e);
350      System.exit(-1);
351    } catch (IOException t) {
352      LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName());
353      throw t;
354    } finally {
355      // Clone last cell in the final because writer will append last cell when committing. If
356      // don't clone here and once the scanner get closed, then the memory of last cell will be
357      // released. (HBASE-22582)
358      ((ShipperListener) writer).beforeShipped();
359      throughputController.finish(compactionName);
360      if (!finished && mobFileWriter != null) {
361        // Remove all MOB references because compaction failed
362        mobRefSet.get().clear();
363        // Abort writer
364        abortWriter(mobFileWriter);
365      }
366    }
367
368    if (mobFileWriter != null) {
369      if (mobCells > 0) {
370        // If the mob file is not empty, commit it.
371        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
372        mobFileWriter.close();
373        mobStore.commitFile(mobFileWriter.getPath(), path);
374      } else {
375        // If the mob file is empty, delete it instead of committing.
376        abortWriter(mobFileWriter);
377      }
378    }
379    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
380    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
381    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
382    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
383    progress.complete();
384    return true;
385
386  }
387
388}