001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mob;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.ArrayList;
025import java.util.Date;
026import java.util.List;
027import java.util.Optional;
028import java.util.Random;
029import java.util.concurrent.atomic.AtomicLong;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.DoNotRetryIOException;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.PrivateCellUtil;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
040import org.apache.hadoop.hbase.regionserver.HStore;
041import org.apache.hadoop.hbase.regionserver.InternalScanner;
042import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
043import org.apache.hadoop.hbase.regionserver.ScannerContext;
044import org.apache.hadoop.hbase.regionserver.ShipperListener;
045import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
046import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
047import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
048import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054/**
055 * This class is used for testing only. The main purpose is to emulate
056 * random failures during MOB compaction process.
057 * Example of usage:
058 * <pre>{@code
059 * public class SomeTest {
060 *
061 *   public void initConfiguration(Configuration conf){
062 *     conf.set(MobStoreEngine.DEFAULT_MOB_COMPACTOR_CLASS_KEY,
063         FaultyMobStoreCompactor.class.getName());
064       conf.setDouble("hbase.mob.compaction.fault.probability", 0.1);
065 *   }
066 * }
067 * }</pre>
068 * @see org.apache.hadoop.hbase.mob.MobStressToolRunner on how to use and configure
069 *   this class.
070 *
071 */
072@InterfaceAudience.Private
073public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
074
075  private static final Logger LOG = LoggerFactory.getLogger(FaultyMobStoreCompactor.class);
076
077  public static AtomicLong mobCounter = new AtomicLong();
078  public static AtomicLong totalFailures = new AtomicLong();
079  public static AtomicLong totalCompactions = new AtomicLong();
080  public static AtomicLong totalMajorCompactions = new AtomicLong();
081
082  static double failureProb = 0.1d;
083  static Random rnd = new Random();
084
085  public FaultyMobStoreCompactor(Configuration conf, HStore store) {
086    super(conf, store);
087    failureProb = conf.getDouble("hbase.mob.compaction.fault.probability", 0.1);
088  }
089
090  @Override
091  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
092      long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
093      boolean major, int numofFilesToCompact) throws IOException {
094
095    totalCompactions.incrementAndGet();
096    if (major) {
097      totalMajorCompactions.incrementAndGet();
098    }
099    long bytesWrittenProgressForLog = 0;
100    long bytesWrittenProgressForShippedCall = 0;
101    // Clear old mob references
102    mobRefSet.get().clear();
103    boolean isUserRequest = userRequest.get();
104    boolean compactMOBs = major && isUserRequest;
105    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
106      MobConstants.DEFAULT_MOB_DISCARD_MISS);
107
108    boolean mustFail = false;
109    if (compactMOBs) {
110      mobCounter.incrementAndGet();
111      double dv = rnd.nextDouble();
112      if (dv < failureProb) {
113        mustFail = true;
114        totalFailures.incrementAndGet();
115      }
116    }
117
118    FileSystem fs = store.getFileSystem();
119
120    // Since scanner.next() can return 'false' but still be delivering data,
121    // we have to use a do/while loop.
122    List<Cell> cells = new ArrayList<>();
123    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
124    long currentTime = EnvironmentEdgeManager.currentTime();
125    long lastMillis = 0;
126    if (LOG.isDebugEnabled()) {
127      lastMillis = currentTime;
128    }
129    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
130    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
131    long now = 0;
132    boolean hasMore;
133    Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
134    byte[] fileName = null;
135    StoreFileWriter mobFileWriter = null;
136    long mobCells = 0;
137    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
138    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
139    boolean finished = false;
140
141    ScannerContext scannerContext =
142        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
143    throughputController.start(compactionName);
144    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
145    long shippedCallSizeLimit =
146        (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize();
147
148    Cell mobCell = null;
149
150    long counter = 0;
151    long countFailAt = -1;
152    if (mustFail) {
153      countFailAt = rnd.nextInt(100); // randomly fail fast
154    }
155
156    try {
157      try {
158        mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
159          major ? majorCompactionCompression : minorCompactionCompression,
160          store.getRegionInfo().getStartKey(), true);
161        fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
162      } catch (IOException e) {
163        // Bailing out
164        LOG.error("Failed to create mob writer, ", e);
165        throw e;
166      }
167      if (compactMOBs) {
168        // Add the only reference we get for compact MOB case
169        // because new store file will have only one MOB reference
170        // in this case - of newly compacted MOB file
171        mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
172      }
173      do {
174        hasMore = scanner.next(cells, scannerContext);
175        currentTime = EnvironmentEdgeManager.currentTime();
176        if (LOG.isDebugEnabled()) {
177          now = currentTime;
178        }
179        if (closeChecker.isTimeLimit(store, currentTime)) {
180          progress.cancel();
181          return false;
182        }
183        for (Cell c : cells) {
184          counter++;
185          if (compactMOBs) {
186            if (MobUtils.isMobReferenceCell(c)) {
187              if (counter == countFailAt) {
188                LOG.warn("INJECTED FAULT mobCounter={}", mobCounter.get());
189                throw new CorruptHFileException("injected fault");
190              }
191              String fName = MobUtils.getMobFileName(c);
192              // Added to support migration
193              try {
194                mobCell = mobStore.resolve(c, true, false).getCell();
195              } catch (DoNotRetryIOException e) {
196                if (discardMobMiss && e.getCause() != null
197                  && e.getCause() instanceof FileNotFoundException) {
198                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
199                  continue;
200                } else {
201                  throw e;
202                }
203              }
204
205              if (discardMobMiss && mobCell.getValueLength() == 0) {
206                LOG.error("Missing MOB cell value: file={} cell={}", fName, mobCell);
207                continue;
208              }
209
210              if (mobCell.getValueLength() > mobSizeThreshold) {
211                // put the mob data back to the store file
212                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
213                mobFileWriter.append(mobCell);
214                writer.append(
215                  MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
216                mobCells++;
217              } else {
218                // If MOB value is less than threshold, append it directly to a store file
219                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
220                writer.append(mobCell);
221                cellsCountCompactedFromMob++;
222                cellsSizeCompactedFromMob += mobCell.getValueLength();
223              }
224            } else {
225              // Not a MOB reference cell
226              int size = c.getValueLength();
227              if (size > mobSizeThreshold) {
228                mobFileWriter.append(c);
229                writer
230                    .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
231                mobCells++;
232                cellsCountCompactedToMob++;
233                cellsSizeCompactedToMob += c.getValueLength();
234              } else {
235                writer.append(c);
236              }
237            }
238          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
239            // Not a major compaction or major with MOB disabled
240            // If the kv type is not put, directly write the cell
241            // to the store file.
242            writer.append(c);
243          } else if (MobUtils.isMobReferenceCell(c)) {
244            // Not a major MOB compaction, Put MOB reference
245            if (MobUtils.hasValidMobRefCellValue(c)) {
246              int size = MobUtils.getMobValueLength(c);
247              if (size > mobSizeThreshold) {
248                // If the value size is larger than the threshold, it's regarded as a mob. Since
249                // its value is already in the mob file, directly write this cell to the store file
250                Optional<TableName> refTable = MobUtils.getTableName(c);
251                if (refTable.isPresent()) {
252                  mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
253                  writer.append(c);
254                } else {
255                  throw new IOException(String.format("MOB cell did not contain a tablename " +
256                      "tag. should not be possible. see ref guide on mob troubleshooting. " +
257                      "store=%s cell=%s", getStoreInfo(), c));
258                }
259              } else {
260                // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
261                // the mob cell from the mob file, and write it back to the store file.
262                mobCell = mobStore.resolve(c, true, false).getCell();
263                if (mobCell.getValueLength() != 0) {
264                  // put the mob data back to the store file
265                  PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
266                  writer.append(mobCell);
267                  cellsCountCompactedFromMob++;
268                  cellsSizeCompactedFromMob += mobCell.getValueLength();
269                } else {
270                  // If the value of a file is empty, there might be issues when retrieving,
271                  // directly write the cell to the store file, and leave it to be handled by the
272                  // next compaction.
273                  LOG.error("Empty value for: " + c);
274                  Optional<TableName> refTable = MobUtils.getTableName(c);
275                  if (refTable.isPresent()) {
276                    mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
277                    writer.append(c);
278                  } else {
279                    throw new IOException(String.format("MOB cell did not contain a tablename " +
280                        "tag. should not be possible. see ref guide on mob troubleshooting. " +
281                        "store=%s cell=%s", getStoreInfo(), c));
282                  }
283                }
284              }
285            } else {
286              LOG.error("Corrupted MOB reference: {}", c);
287              writer.append(c);
288            }
289          } else if (c.getValueLength() <= mobSizeThreshold) {
290            // If the value size of a cell is not larger than the threshold, directly write it to
291            // the store file.
292            writer.append(c);
293          } else {
294            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
295            // write this cell to a mob file, and write the path to the store file.
296            mobCells++;
297            // append the original keyValue in the mob file.
298            mobFileWriter.append(c);
299            Cell reference = MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
300            // write the cell whose value is the path of a mob file to the store file.
301            writer.append(reference);
302            cellsCountCompactedToMob++;
303            cellsSizeCompactedToMob += c.getValueLength();
304            // Add ref we get for compact MOB case
305            mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
306          }
307
308          int len = c.getSerializedSize();
309          ++progress.currentCompactedKVs;
310          progress.totalCompactedSize += len;
311          bytesWrittenProgressForShippedCall += len;
312          if (LOG.isDebugEnabled()) {
313            bytesWrittenProgressForLog += len;
314          }
315          throughputController.control(compactionName, len);
316          if (closeChecker.isSizeLimit(store, len)) {
317            progress.cancel();
318            return false;
319          }
320          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
321            ((ShipperListener) writer).beforeShipped();
322            kvs.shipped();
323            bytesWrittenProgressForShippedCall = 0;
324          }
325        }
326        // Log the progress of long running compactions every minute if
327        // logging at DEBUG level
328        if (LOG.isDebugEnabled()) {
329          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
330            String rate = String.format("%.2f",
331              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
332            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
333              compactionName, progress, rate, throughputController);
334            lastMillis = now;
335            bytesWrittenProgressForLog = 0;
336          }
337        }
338        cells.clear();
339      } while (hasMore);
340      finished = true;
341    } catch (InterruptedException e) {
342      progress.cancel();
343      throw new InterruptedIOException(
344          "Interrupted while control throughput of compacting " + compactionName);
345    } catch (FileNotFoundException e) {
346      LOG.error("MOB Stress Test FAILED, region: " + store.getRegionInfo().getEncodedName(), e);
347      System.exit(-1);
348    } catch (IOException t) {
349      LOG.error("Mob compaction failed for region: " + store.getRegionInfo().getEncodedName());
350      throw t;
351    } finally {
352      // Clone last cell in the final because writer will append last cell when committing. If
353      // don't clone here and once the scanner get closed, then the memory of last cell will be
354      // released. (HBASE-22582)
355      ((ShipperListener) writer).beforeShipped();
356      throughputController.finish(compactionName);
357      if (!finished && mobFileWriter != null) {
358        // Remove all MOB references because compaction failed
359        mobRefSet.get().clear();
360        // Abort writer
361        abortWriter(mobFileWriter);
362      }
363    }
364
365    if (mobFileWriter != null) {
366      if (mobCells > 0) {
367        // If the mob file is not empty, commit it.
368        mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
369        mobFileWriter.close();
370        mobStore.commitFile(mobFileWriter.getPath(), path);
371      } else {
372        // If the mob file is empty, delete it instead of committing.
373        abortWriter(mobFileWriter);
374      }
375    }
376    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
377    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
378    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
379    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
380    progress.complete();
381    return true;
382
383  }
384
385}