View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.snapshot;
19  
20  import java.io.IOException;
21  import java.util.List;
22  import java.util.concurrent.Callable;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.hbase.client.IsolationLevel;
29  import org.apache.hadoop.hbase.errorhandling.ForeignException;
30  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
31  import org.apache.hadoop.hbase.procedure.ProcedureMember;
32  import org.apache.hadoop.hbase.procedure.Subprocedure;
33  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
34  import org.apache.hadoop.hbase.regionserver.HRegion;
35  import org.apache.hadoop.hbase.regionserver.Region;
36  import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
37  import org.apache.hadoop.hbase.regionserver.Region.Operation;
38  import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
39  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
40  
41  /**
42   * This online snapshot implementation uses the distributed procedure framework to force a
43   * store flush and then records the hfiles.  Its enter stage does nothing.  Its leave stage then
44   * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
45   * copies .regioninfos into the snapshot working directory.  At the master side, there is an atomic
46   * rename of the working dir into the proper snapshot directory.
47   */
48  @InterfaceAudience.Private
49  @InterfaceStability.Unstable
50  public class FlushSnapshotSubprocedure extends Subprocedure {
51    private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class);
52  
53    private final List<Region> regions;
54    private final SnapshotDescription snapshot;
55    private final SnapshotSubprocedurePool taskManager;
56    private boolean snapshotSkipFlush = false;
57  
58    // the maximum number of attempts we flush
59    final static int MAX_RETRIES = 3;
60  
61    public FlushSnapshotSubprocedure(ProcedureMember member,
62        ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
63        List<Region> regions, SnapshotDescription snapshot,
64        SnapshotSubprocedurePool taskManager) {
65      super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
66      this.snapshot = snapshot;
67  
68      if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) {
69        snapshotSkipFlush = true;
70      }
71      this.regions = regions;
72      this.taskManager = taskManager;
73    }
74  
75    /**
76     * Callable for adding files to snapshot manifest working dir.  Ready for multithreading.
77     */
78    private class RegionSnapshotTask implements Callable<Void> {
79      Region region;
80      RegionSnapshotTask(Region region) {
81        this.region = region;
82      }
83  
84      @Override
85      public Void call() throws Exception {
86        // Taking the region read lock prevents the individual region from being closed while a
87        // snapshot is in progress.  This is helpful but not sufficient for preventing races with
88        // snapshots that involve multiple regions and regionservers.  It is still possible to have
89        // an interleaving such that globally regions are missing, so we still need the verification
90        // step.
91        LOG.debug("Starting region operation on " + region);
92        region.startRegionOperation();
93        try {
94          if (snapshotSkipFlush) {
95          /*
96           * This is to take an online-snapshot without force a coordinated flush to prevent pause
97           * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
98           * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
99           * turned on/off based on the flush type.
100          * To minimized the code change, class name is not changed.
101          */
102           LOG.debug("take snapshot without flush memstore first");
103         } else {
104           LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
105           boolean succeeded = false;
106           long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
107           for (int i = 0; i < MAX_RETRIES; i++) {
108             FlushResult res = region.flush(true);
109             if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
110               if (region instanceof HRegion) {
111                 HRegion hreg = (HRegion) region;
112                 // CANNOT_FLUSH may mean that a flush is already on-going
113                 // we need to wait for that flush to complete
114                 hreg.waitForFlushes();
115               }
116               if (region.getMaxFlushedSeqId() >= readPt) {
117                 // writes at the start of the snapshot have been persisted
118                 succeeded = true;
119                 break;
120               }
121             } else {
122               succeeded = true;
123               break;
124             }
125           }
126           if (!succeeded) {
127             throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
128           }
129         }
130         ((HRegion)region).addRegionToSnapshot(snapshot, monitor);
131         if (snapshotSkipFlush) {
132           LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
133         } else {
134           LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
135         }
136       } finally {
137         LOG.debug("Closing region operation on " + region);
138         region.closeRegionOperation();
139       }
140       return null;
141     }
142   }
143 
144   private void flushSnapshot() throws ForeignException {
145     if (regions.isEmpty()) {
146       // No regions on this RS, we are basically done.
147       return;
148     }
149 
150     monitor.rethrowException();
151 
152     // assert that the taskManager is empty.
153     if (taskManager.hasTasks()) {
154       throw new IllegalStateException("Attempting to take snapshot "
155           + ClientSnapshotDescriptionUtils.toString(snapshot)
156           + " but we currently have outstanding tasks");
157     }
158 
159     // Add all hfiles already existing in region.
160     for (Region region : regions) {
161       // submit one task per region for parallelize by region.
162       taskManager.submitTask(new RegionSnapshotTask(region));
163       monitor.rethrowException();
164     }
165 
166     // wait for everything to complete.
167     LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
168     try {
169       taskManager.waitForOutstandingTasks();
170     } catch (InterruptedException e) {
171       LOG.error("got interrupted exception for " + getMemberName());
172       throw new ForeignException(getMemberName(), e);
173     }
174   }
175 
176   /**
177    * do nothing, core of snapshot is executed in {@link #insideBarrier} step.
178    */
179   @Override
180   public void acquireBarrier() throws ForeignException {
181     // NO OP
182   }
183 
184   /**
185    * do a flush snapshot of every region on this rs from the target table.
186    */
187   @Override
188   public byte[] insideBarrier() throws ForeignException {
189     flushSnapshot();
190     return new byte[0];
191   }
192 
193   /**
194    * Cancel threads if they haven't finished.
195    */
196   @Override
197   public void cleanup(Exception e) {
198     LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
199         + snapshot.getName() + "' due to error", e);
200     try {
201       taskManager.cancelTasks();
202     } catch (InterruptedException e1) {
203       Thread.currentThread().interrupt();
204     }
205   }
206 
207   /**
208    * Hooray!
209    */
210   public void releaseBarrier() {
211     // NO OP
212   }
213 
214 }