001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.monitoring.MonitoredTask;
031import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
032import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
037
038/**
039 * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
040 * into separate striped files, avoiding L0.
041 */
042@InterfaceAudience.Private
043public class StripeStoreFlusher extends StoreFlusher {
044  private static final Logger LOG = LoggerFactory.getLogger(StripeStoreFlusher.class);
045  private final Object flushLock = new Object();
046  private final StripeCompactionPolicy policy;
047  private final StripeCompactionPolicy.StripeInformationProvider stripes;
048
049  public StripeStoreFlusher(Configuration conf, HStore store,
050      StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
051    super(conf, store);
052    this.policy = policy;
053    this.stripes = stripes;
054  }
055
056  @Override
057  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
058      MonitoredTask status, ThroughputController throughputController,
059      FlushLifeCycleTracker tracker) throws IOException {
060    List<Path> result = new ArrayList<>();
061    int cellsCount = snapshot.getCellsCount();
062    if (cellsCount == 0) return result; // don't flush if there are no entries
063
064    long smallestReadPoint = store.getSmallestReadPoint();
065    InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
066
067    // Let policy select flush method.
068    StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
069      cellsCount);
070
071    boolean success = false;
072    StripeMultiFileWriter mw = null;
073    try {
074      mw = req.createWriter(); // Writer according to the policy.
075      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount);
076      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
077      mw.init(storeScanner, factory);
078
079      synchronized (flushLock) {
080        performFlush(scanner, mw, smallestReadPoint, throughputController);
081        result = mw.commitWriters(cacheFlushSeqNum, false);
082        success = true;
083      }
084    } finally {
085      if (!success && (mw != null)) {
086        for (Path leftoverFile : mw.abortWriters()) {
087          try {
088            store.getFileSystem().delete(leftoverFile, false);
089          } catch (Exception e) {
090            LOG.error("Failed to delete a file after failed flush: " + e);
091          }
092        }
093      }
094      try {
095        scanner.close();
096      } catch (IOException ex) {
097        LOG.warn("Failed to close flush scanner, ignoring", ex);
098      }
099    }
100    return result;
101  }
102
103  private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) {
104    return new StripeMultiFileWriter.WriterFactory() {
105      @Override
106      public StoreFileWriter createWriter() throws IOException {
107        StoreFileWriter writer = store.createWriterInTmp(kvCount,
108            store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false);
109        return writer;
110      }
111    };
112  }
113
114  /** Stripe flush request wrapper that writes a non-striped file. */
115  public static class StripeFlushRequest {
116
117    protected final CellComparator comparator;
118
119    public StripeFlushRequest(CellComparator comparator) {
120      this.comparator = comparator;
121    }
122
123    @VisibleForTesting
124    public StripeMultiFileWriter createWriter() throws IOException {
125      StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
126          Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
127      writer.setNoStripeMetadata();
128      return writer;
129    }
130  }
131
132  /** Stripe flush request wrapper based on boundaries. */
133  public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
134    private final List<byte[]> targetBoundaries;
135
136    /** @param targetBoundaries New files should be written with these boundaries. */
137    public BoundaryStripeFlushRequest(CellComparator comparator, List<byte[]> targetBoundaries) {
138      super(comparator);
139      this.targetBoundaries = targetBoundaries;
140    }
141
142    @Override
143    public StripeMultiFileWriter createWriter() throws IOException {
144      return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null,
145          null);
146    }
147  }
148
149  /** Stripe flush request wrapper based on size. */
150  public static class SizeStripeFlushRequest extends StripeFlushRequest {
151    private final int targetCount;
152    private final long targetKvs;
153
154    /**
155     * @param targetCount The maximum number of stripes to flush into.
156     * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
157     *                  total number of kvs, all the overflow data goes into the last stripe.
158     */
159    public SizeStripeFlushRequest(CellComparator comparator, int targetCount, long targetKvs) {
160      super(comparator);
161      this.targetCount = targetCount;
162      this.targetKvs = targetKvs;
163    }
164
165    @Override
166    public StripeMultiFileWriter createWriter() throws IOException {
167      return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs,
168          OPEN_KEY, OPEN_KEY);
169    }
170  }
171}