View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.snapshot;
19  
20  import java.util.List;
21  import java.util.concurrent.Callable;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  import org.apache.hadoop.hbase.errorhandling.ForeignException;
28  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
29  import org.apache.hadoop.hbase.procedure.ProcedureMember;
30  import org.apache.hadoop.hbase.procedure.Subprocedure;
31  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
32  import org.apache.hadoop.hbase.regionserver.HRegion;
33  import org.apache.hadoop.hbase.regionserver.Region;
34  import org.apache.hadoop.hbase.regionserver.Region.Operation;
35  import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
36  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
37  
38  /**
39   * This online snapshot implementation uses the distributed procedure framework to force a
40   * store flush and then records the hfiles.  Its enter stage does nothing.  Its leave stage then
41   * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
42   * copies .regioninfos into the snapshot working directory.  At the master side, there is an atomic
43   * rename of the working dir into the proper snapshot directory.
44   */
45  @InterfaceAudience.Private
46  @InterfaceStability.Unstable
47  public class FlushSnapshotSubprocedure extends Subprocedure {
48    private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class);
49  
50    private final List<Region> regions;
51    private final SnapshotDescription snapshot;
52    private final SnapshotSubprocedurePool taskManager;
53    private boolean snapshotSkipFlush = false;
54  
55    public FlushSnapshotSubprocedure(ProcedureMember member,
56        ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
57        List<Region> regions, SnapshotDescription snapshot,
58        SnapshotSubprocedurePool taskManager) {
59      super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
60      this.snapshot = snapshot;
61  
62      if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) {
63        snapshotSkipFlush = true;
64      }
65      this.regions = regions;
66      this.taskManager = taskManager;
67    }
68  
69    /**
70     * Callable for adding files to snapshot manifest working dir.  Ready for multithreading.
71     */
72    private class RegionSnapshotTask implements Callable<Void> {
73      Region region;
74      RegionSnapshotTask(Region region) {
75        this.region = region;
76      }
77  
78      @Override
79      public Void call() throws Exception {
80        // Taking the region read lock prevents the individual region from being closed while a
81        // snapshot is in progress.  This is helpful but not sufficient for preventing races with
82        // snapshots that involve multiple regions and regionservers.  It is still possible to have
83        // an interleaving such that globally regions are missing, so we still need the verification
84        // step.
85        LOG.debug("Starting region operation on " + region);
86        region.startRegionOperation();
87        try {
88          if (snapshotSkipFlush) {
89          /*
90           * This is to take an online-snapshot without force a coordinated flush to prevent pause
91           * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
92           * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
93           * turned on/off based on the flush type.
94           * To minimized the code change, class name is not changed.
95           */
96            LOG.debug("take snapshot without flush memstore first");
97          } else {
98            LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
99            region.flush(true);
100         }
101         ((HRegion)region).addRegionToSnapshot(snapshot, monitor);
102         if (snapshotSkipFlush) {
103           LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
104         } else {
105           LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
106         }
107       } finally {
108         LOG.debug("Closing region operation on " + region);
109         region.closeRegionOperation();
110       }
111       return null;
112     }
113   }
114 
115   private void flushSnapshot() throws ForeignException {
116     if (regions.isEmpty()) {
117       // No regions on this RS, we are basically done.
118       return;
119     }
120 
121     monitor.rethrowException();
122 
123     // assert that the taskManager is empty.
124     if (taskManager.hasTasks()) {
125       throw new IllegalStateException("Attempting to take snapshot "
126           + ClientSnapshotDescriptionUtils.toString(snapshot)
127           + " but we currently have outstanding tasks");
128     }
129 
130     // Add all hfiles already existing in region.
131     for (Region region : regions) {
132       // submit one task per region for parallelize by region.
133       taskManager.submitTask(new RegionSnapshotTask(region));
134       monitor.rethrowException();
135     }
136 
137     // wait for everything to complete.
138     LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
139     try {
140       taskManager.waitForOutstandingTasks();
141     } catch (InterruptedException e) {
142       LOG.error("got interrupted exception for " + getMemberName());
143       throw new ForeignException(getMemberName(), e);
144     }
145   }
146 
147   /**
148    * do nothing, core of snapshot is executed in {@link #insideBarrier} step.
149    */
150   @Override
151   public void acquireBarrier() throws ForeignException {
152     // NO OP
153   }
154 
155   /**
156    * do a flush snapshot of every region on this rs from the target table.
157    */
158   @Override
159   public byte[] insideBarrier() throws ForeignException {
160     flushSnapshot();
161     return new byte[0];
162   }
163 
164   /**
165    * Cancel threads if they haven't finished.
166    */
167   @Override
168   public void cleanup(Exception e) {
169     LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
170         + snapshot.getName() + "' due to error", e);
171     try {
172       taskManager.cancelTasks();
173     } catch (InterruptedException e1) {
174       Thread.currentThread().interrupt();
175     }
176   }
177 
178   /**
179    * Hooray!
180    */
181   public void releaseBarrier() {
182     // NO OP
183   }
184 
185 }