View Javadoc

1   /**
2     * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.zookeeper;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.io.UnsupportedEncodingException;
23  import java.net.URLDecoder;
24  import java.net.URLEncoder;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.fs.FileSystem;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.exceptions.DeserializationException;
34  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
35  import org.apache.zookeeper.KeeperException;
36  
37  /**
38   * Common methods and attributes used by {@link org.apache.hadoop.hbase.master.SplitLogManager} 
39   * and {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
40   * running distributed splitting of WAL logs.
41   */
42  @InterfaceAudience.Private
43  public class ZKSplitLog {
44    private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
45  
46    /**
47     * Gets the full path node name for the log file being split.
48     * This method will url encode the filename.
49     * @param zkw zk reference
50     * @param filename log file name (only the basename)
51     */
52    public static String getEncodedNodeName(ZooKeeperWatcher zkw, String filename) {
53      return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename));
54    }
55  
56    public static String getFileName(String node) {
57      String basename = node.substring(node.lastIndexOf('/') + 1);
58      return decode(basename);
59    }
60  
61    static String encode(String s) {
62      try {
63        return URLEncoder.encode(s, "UTF-8");
64      } catch (UnsupportedEncodingException e) {
65        throw new RuntimeException("URLENCODER doesn't support UTF-8");
66      }
67    }
68  
69    static String decode(String s) {
70      try {
71        return URLDecoder.decode(s, "UTF-8");
72      } catch (UnsupportedEncodingException e) {
73        throw new RuntimeException("URLDecoder doesn't support UTF-8");
74      }
75    }
76  
77    public static String getRescanNode(ZooKeeperWatcher zkw) {
78      return ZKUtil.joinZNode(zkw.splitLogZNode, "RESCAN");
79    }
80  
81    public static boolean isRescanNode(ZooKeeperWatcher zkw, String path) {
82      String prefix = getRescanNode(zkw);
83      if (path.length() <= prefix.length()) {
84        return false;
85      }
86      for (int i = 0; i < prefix.length(); i++) {
87        if (prefix.charAt(i) != path.charAt(i)) {
88          return false;
89        }
90      }
91      return true;
92    }
93  
94    public static boolean isTaskPath(ZooKeeperWatcher zkw, String path) {
95      String dirname = path.substring(0, path.lastIndexOf('/'));
96      return dirname.equals(zkw.splitLogZNode);
97    }
98  
99    public static Path getSplitLogDir(Path rootdir, String tmpname) {
100     return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
101   }
102 
103 
104   public static String getSplitLogDirTmpComponent(final String worker, String file) {
105     return worker + "_" + ZKSplitLog.encode(file);
106   }
107 
108   public static void markCorrupted(Path rootdir, String logFileName,
109       FileSystem fs) {
110     Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
111     try {
112       fs.createNewFile(file);
113     } catch (IOException e) {
114       LOG.warn("Could not flag a log file as corrupted. Failed to create " +
115           file, e);
116     }
117   }
118 
119   public static boolean isCorrupted(Path rootdir, String logFileName,
120       FileSystem fs) throws IOException {
121     Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
122     boolean isCorrupt;
123     isCorrupt = fs.exists(file);
124     return isCorrupt;
125   }
126 
127   /*
128    * Following methods come from SplitLogManager
129    */
130 
131   /**
132    * check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists
133    * and set watcher as well.
134    * @param zkw
135    * @param regionEncodedName region encode name
136    * @return true when /hbase/recovering-regions/<current region encoded name> exists
137    * @throws KeeperException
138    */
139   public static boolean
140       isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
141           throws KeeperException {
142     boolean result = false;
143     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
144 
145     byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
146     if (node != null) {
147       result = true;
148     }
149     return result;
150   }
151 
152   /**
153    * @param bytes - Content of a failed region server or recovering region znode.
154    * @return long - The last flushed sequence Id for the region server
155    */
156   public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
157     long lastRecordedFlushedSequenceId = -1l;
158     try {
159       lastRecordedFlushedSequenceId = ZKUtil.parseWALPositionFrom(bytes);
160     } catch (DeserializationException e) {
161       lastRecordedFlushedSequenceId = -1l;
162       LOG.warn("Can't parse last flushed sequence Id", e);
163     }
164     return lastRecordedFlushedSequenceId;
165   }
166 
167   public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
168     try {
169       if (regions == null) {
170         // remove all children under /home/recovering-regions
171         LOG.debug("Garbage collecting all recovering region znodes");
172         ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
173       } else {
174         for (String curRegion : regions) {
175           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
176           ZKUtil.deleteNodeRecursively(watcher, nodePath);
177         }
178       }
179     } catch (KeeperException e) {
180       LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
181     }
182   }
183 
184   /**
185    * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
186    * @param zkw
187    * @param serverName
188    * @param encodedRegionName
189    * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName<code>
190    * @throws IOException
191    */
192 
193   public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
194       String serverName, String encodedRegionName) throws IOException {
195     // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
196     // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
197     // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
198     // sequence Id name space (sequence Id only valid for a particular RS instance), changes
199     // when different newly assigned RS flushes the region.
200     // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
201     // last flushed sequence Id for each failed RS instance.
202     RegionStoreSequenceIds result = null;
203     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
204     nodePath = ZKUtil.joinZNode(nodePath, serverName);
205     try {
206       byte[] data;
207       try {
208         data = ZKUtil.getData(zkw, nodePath);
209       } catch (InterruptedException e) {
210         throw new InterruptedIOException();
211       }
212       if (data != null) {
213         result = ZKUtil.parseRegionStoreSequenceIds(data);
214       }
215     } catch (KeeperException e) {
216       throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
217           + serverName + "; region=" + encodedRegionName, e);
218     } catch (DeserializationException e) {
219       LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
220     }
221     return result;
222   }
223 }