View Javadoc

1   /**
2     * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.zookeeper;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.io.UnsupportedEncodingException;
23  import java.net.URLDecoder;
24  import java.net.URLEncoder;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.fs.FileSystem;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.exceptions.DeserializationException;
34  import org.apache.hadoop.hbase.master.SplitLogManager;
35  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
36  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
37  import org.apache.zookeeper.KeeperException;
38  
39  /**
40   * Common methods and attributes used by {@link SplitLogManager} and {@link SplitLogWorker}
41   * running distributed splitting of WAL logs.
42   */
43  @InterfaceAudience.Private
44  public class ZKSplitLog {
45    private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
46  
47    /**
48     * Gets the full path node name for the log file being split.
49     * This method will url encode the filename.
50     * @param zkw zk reference
51     * @param filename log file name (only the basename)
52     */
53    public static String getEncodedNodeName(ZooKeeperWatcher zkw, String filename) {
54      return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename));
55    }
56  
57    public static String getFileName(String node) {
58      String basename = node.substring(node.lastIndexOf('/') + 1);
59      return decode(basename);
60    }
61  
62    static String encode(String s) {
63      try {
64        return URLEncoder.encode(s, "UTF-8");
65      } catch (UnsupportedEncodingException e) {
66        throw new RuntimeException("URLENCODER doesn't support UTF-8");
67      }
68    }
69  
70    static String decode(String s) {
71      try {
72        return URLDecoder.decode(s, "UTF-8");
73      } catch (UnsupportedEncodingException e) {
74        throw new RuntimeException("URLDecoder doesn't support UTF-8");
75      }
76    }
77  
78    public static String getRescanNode(ZooKeeperWatcher zkw) {
79      return ZKUtil.joinZNode(zkw.splitLogZNode, "RESCAN");
80    }
81  
82    public static boolean isRescanNode(ZooKeeperWatcher zkw, String path) {
83      String prefix = getRescanNode(zkw);
84      if (path.length() <= prefix.length()) {
85        return false;
86      }
87      for (int i = 0; i < prefix.length(); i++) {
88        if (prefix.charAt(i) != path.charAt(i)) {
89          return false;
90        }
91      }
92      return true;
93    }
94  
95    public static boolean isTaskPath(ZooKeeperWatcher zkw, String path) {
96      String dirname = path.substring(0, path.lastIndexOf('/'));
97      return dirname.equals(zkw.splitLogZNode);
98    }
99  
100   public static Path getSplitLogDir(Path rootdir, String tmpname) {
101     return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
102   }
103 
104 
105   public static String getSplitLogDirTmpComponent(final String worker, String file) {
106     return worker + "_" + ZKSplitLog.encode(file);
107   }
108 
109   public static void markCorrupted(Path rootdir, String logFileName,
110       FileSystem fs) {
111     Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
112     try {
113       fs.createNewFile(file);
114     } catch (IOException e) {
115       LOG.warn("Could not flag a log file as corrupted. Failed to create " +
116           file, e);
117     }
118   }
119 
120   public static boolean isCorrupted(Path rootdir, String logFileName,
121       FileSystem fs) throws IOException {
122     Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
123     boolean isCorrupt;
124     isCorrupt = fs.exists(file);
125     return isCorrupt;
126   }
127 
128   /*
129    * Following methods come from SplitLogManager
130    */
131 
132   /**
133    * check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists
134    * and set watcher as well.
135    * @param zkw
136    * @param regionEncodedName region encode name
137    * @return true when /hbase/recovering-regions/<current region encoded name> exists
138    * @throws KeeperException
139    */
140   public static boolean
141       isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
142           throws KeeperException {
143     boolean result = false;
144     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
145 
146     byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
147     if (node != null) {
148       result = true;
149     }
150     return result;
151   }
152 
153   /**
154    * @param bytes - Content of a failed region server or recovering region znode.
155    * @return long - The last flushed sequence Id for the region server
156    */
157   public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
158     long lastRecordedFlushedSequenceId = -1l;
159     try {
160       lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
161     } catch (DeserializationException e) {
162       lastRecordedFlushedSequenceId = -1l;
163       LOG.warn("Can't parse last flushed sequence Id", e);
164     }
165     return lastRecordedFlushedSequenceId;
166   }
167 
168   public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
169     try {
170       if (regions == null) {
171         // remove all children under /home/recovering-regions
172         LOG.debug("Garbage collecting all recovering region znodes");
173         ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
174       } else {
175         for (String curRegion : regions) {
176           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
177           ZKUtil.deleteNodeRecursively(watcher, nodePath);
178         }
179       }
180     } catch (KeeperException e) {
181       LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
182     }
183   }
184 
185   /**
186    * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
187    * @param zkw
188    * @param serverName
189    * @param encodedRegionName
190    * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName<code>
191    * @throws IOException
192    */
193 
194   public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
195       String serverName, String encodedRegionName) throws IOException {
196     // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
197     // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
198     // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
199     // sequence Id name space (sequence Id only valid for a particular RS instance), changes
200     // when different newly assigned RS flushes the region.
201     // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
202     // last flushed sequence Id for each failed RS instance.
203     RegionStoreSequenceIds result = null;
204     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
205     nodePath = ZKUtil.joinZNode(nodePath, serverName);
206     try {
207       byte[] data;
208       try {
209         data = ZKUtil.getData(zkw, nodePath);
210       } catch (InterruptedException e) {
211         throw new InterruptedIOException();
212       }
213       if (data != null) {
214         result = ZKUtil.parseRegionStoreSequenceIds(data);
215       }
216     } catch (KeeperException e) {
217       throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
218           + serverName + "; region=" + encodedRegionName, e);
219     } catch (DeserializationException e) {
220       LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
221     }
222     return result;
223   }
224 }