001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
031import org.apache.hadoop.hbase.exceptions.DeserializationException;
032import org.apache.hadoop.hbase.util.CommonFSUtils;
033import org.apache.hadoop.hbase.util.Pair;
034import org.apache.hadoop.hbase.util.RotateFile;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * A filesystem based replication peer storage. The implementation does not require atomic rename so
041 * you can use it on cloud OSS.
042 * <p/>
043 * FileSystem layout:
044 *
045 * <pre>
046 * hbase
047 *   |
048 *   --peers
049 *       |
050 *       --&lt;peer_id&gt;
051 *           |
052 *           --peer_config
053 *           |
054 *           --disabled
055 *           |
056 *           --sync-rep-state
057 * </pre>
058 *
059 * Notice that, if the peer is enabled, we will not have a disabled file.
060 * <p/>
061 * And for other files, to avoid depending on atomic rename, we will use two files for storing the
062 * content. When loading, we will try to read both the files and load the newer one. And when
063 * writing, we will write to the older file.
064 */
065@InterfaceAudience.Private
066public class FSReplicationPeerStorage implements ReplicationPeerStorage {
067
068  private static final Logger LOG = LoggerFactory.getLogger(FSReplicationPeerStorage.class);
069
070  public static final String PEERS_DIR = "hbase.replication.peers.directory";
071
072  public static final String PEERS_DIR_DEFAULT = "peers";
073
074  static final String PEER_CONFIG_FILE = "peer_config";
075
076  static final String DISABLED_FILE = "disabled";
077
078  static final String SYNC_REPLICATION_STATE_FILE = "sync-rep-state";
079
080  static final byte[] NONE_STATE_BYTES =
081    SyncReplicationState.toByteArray(SyncReplicationState.NONE);
082
083  private final FileSystem fs;
084
085  private final Path dir;
086
087  public FSReplicationPeerStorage(FileSystem fs, Configuration conf) throws IOException {
088    this.fs = fs;
089    this.dir = new Path(CommonFSUtils.getRootDir(conf), conf.get(PEERS_DIR, PEERS_DIR_DEFAULT));
090  }
091
092  @RestrictedApi(explanation = "Should only be called in tests", link = "",
093      allowedOnPath = ".*/FSReplicationPeerStorage.java|.*/src/test/.*")
094  Path getPeerDir(String peerId) {
095    return new Path(dir, peerId);
096  }
097
098  @Override
099  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
100    SyncReplicationState syncReplicationState) throws ReplicationException {
101    Path peerDir = getPeerDir(peerId);
102    try {
103      if (fs.exists(peerDir)) {
104        // check whether this is a valid peer, if so we should fail the add peer operation
105        if (read(fs, peerDir, PEER_CONFIG_FILE) != null) {
106          throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfig=>"
107            + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED")
108            + ", syncReplicationState=" + syncReplicationState + ", peer already exists");
109        }
110      }
111      if (!enabled) {
112        fs.createNewFile(new Path(peerDir, DISABLED_FILE));
113      }
114      write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
115        SyncReplicationState.toByteArray(syncReplicationState, SyncReplicationState.NONE));
116      // write the peer config data at last, so when loading, if we can not load the peer_config, we
117      // know that this is not a valid peer
118      write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig));
119    } catch (IOException e) {
120      throw new ReplicationException(
121        "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state="
122          + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
123        e);
124    }
125  }
126
127  @Override
128  public void removePeer(String peerId) throws ReplicationException {
129    // delete the peer config first, and then delete the directory
130    // we will consider this is not a valid peer by reading the peer config file
131    Path peerDir = getPeerDir(peerId);
132    try {
133      delete(fs, peerDir, PEER_CONFIG_FILE);
134      if (!fs.delete(peerDir, true)) {
135        throw new IOException("Can not delete " + peerDir);
136      }
137    } catch (IOException e) {
138      throw new ReplicationException("Could not remove peer with id=" + peerId, e);
139    }
140  }
141
142  @Override
143  public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
144    Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE);
145    try {
146      if (enabled) {
147        if (fs.exists(disabledFile) && !fs.delete(disabledFile, false)) {
148          throw new IOException("Can not delete " + disabledFile);
149        }
150      } else {
151        if (!fs.exists(disabledFile) && !fs.createNewFile(disabledFile)) {
152          throw new IOException("Can not touch " + disabledFile);
153        }
154      }
155    } catch (IOException e) {
156      throw new ReplicationException(
157        "Unable to change state of the peer with id=" + peerId + " to " + enabled, e);
158    }
159  }
160
161  @Override
162  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
163    throws ReplicationException {
164    Path peerDir = getPeerDir(peerId);
165    try {
166      write(fs, peerDir, PEER_CONFIG_FILE, ReplicationPeerConfigUtil.toByteArray(peerConfig));
167    } catch (IOException e) {
168      throw new ReplicationException(
169        "There was a problem trying to save changes to the " + "replication peer " + peerId, e);
170    }
171  }
172
173  @Override
174  public List<String> listPeerIds() throws ReplicationException {
175    try {
176      FileStatus[] statuses = fs.listStatus(dir);
177      if (statuses == null || statuses.length == 0) {
178        return Collections.emptyList();
179      }
180      List<String> peerIds = new ArrayList<>();
181      for (FileStatus status : statuses) {
182        String peerId = status.getPath().getName();
183        Path peerDir = getPeerDir(peerId);
184        // confirm that this is a valid peer
185        byte[] peerConfigData = read(fs, peerDir, PEER_CONFIG_FILE);
186        if (peerConfigData != null) {
187          peerIds.add(peerId);
188        }
189      }
190      return Collections.unmodifiableList(peerIds);
191    } catch (FileNotFoundException e) {
192      LOG.debug("Peer directory does not exist yet", e);
193      return Collections.emptyList();
194    } catch (IOException e) {
195      throw new ReplicationException("Cannot get the list of peers", e);
196    }
197  }
198
199  @Override
200  public boolean isPeerEnabled(String peerId) throws ReplicationException {
201    Path disabledFile = new Path(getPeerDir(peerId), DISABLED_FILE);
202    try {
203      return !fs.exists(disabledFile);
204    } catch (IOException e) {
205      throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
206    }
207  }
208
209  @Override
210  public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
211    Path peerDir = getPeerDir(peerId);
212    byte[] data;
213    try {
214      data = read(fs, peerDir, PEER_CONFIG_FILE);
215    } catch (IOException e) {
216      throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
217    }
218    if (data == null || data.length == 0) {
219      throw new ReplicationException(
220        "Replication peer config data shouldn't be empty, peerId=" + peerId);
221    }
222    try {
223      return ReplicationPeerConfigUtil.parsePeerFrom(data);
224    } catch (DeserializationException e) {
225      throw new ReplicationException(
226        "Failed to parse replication peer config for peer with id=" + peerId, e);
227    }
228  }
229
230  private Pair<SyncReplicationState, SyncReplicationState> getStateAndNewState(String peerId)
231    throws IOException {
232    Path peerDir = getPeerDir(peerId);
233    if (!fs.exists(peerDir)) {
234      throw new IOException("peer does not exists");
235    }
236    byte[] data = read(fs, peerDir, SYNC_REPLICATION_STATE_FILE);
237    if (data == null) {
238      // should be a peer from previous version, set the sync replication state for it.
239      write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
240        SyncReplicationState.toByteArray(SyncReplicationState.NONE, SyncReplicationState.NONE));
241      return Pair.newPair(SyncReplicationState.NONE, SyncReplicationState.NONE);
242    } else {
243      return SyncReplicationState.parseStateAndNewStateFrom(data);
244    }
245  }
246
247  @Override
248  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState newState)
249    throws ReplicationException {
250    Path peerDir = getPeerDir(peerId);
251    try {
252      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
253        getStateAndNewState(peerId);
254      write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
255        SyncReplicationState.toByteArray(stateAndNewState.getFirst(), newState));
256    } catch (IOException e) {
257      throw new ReplicationException(
258        "Unable to set the new sync replication state for peer with id=" + peerId + ", newState="
259          + newState,
260        e);
261    }
262  }
263
264  @Override
265  public void transitPeerSyncReplicationState(String peerId) throws ReplicationException {
266    Path peerDir = getPeerDir(peerId);
267    try {
268      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
269        getStateAndNewState(peerId);
270      write(fs, peerDir, SYNC_REPLICATION_STATE_FILE,
271        SyncReplicationState.toByteArray(stateAndNewState.getSecond(), SyncReplicationState.NONE));
272    } catch (IOException e) {
273      throw new ReplicationException(
274        "Error transiting sync replication state for peer with id=" + peerId, e);
275    }
276  }
277
278  @Override
279  public SyncReplicationState getPeerSyncReplicationState(String peerId)
280    throws ReplicationException {
281    try {
282      return getStateAndNewState(peerId).getFirst();
283    } catch (IOException e) {
284      throw new ReplicationException(
285        "Error getting sync replication state for peer with id=" + peerId, e);
286    }
287  }
288
289  @Override
290  public SyncReplicationState getPeerNewSyncReplicationState(String peerId)
291    throws ReplicationException {
292    try {
293      return getStateAndNewState(peerId).getSecond();
294    } catch (IOException e) {
295      throw new ReplicationException(
296        "Error getting new sync replication state for peer with id=" + peerId, e);
297    }
298  }
299
300  // 16 MB is big enough for our usage here
301  private static final long MAX_FILE_SIZE = 16 * 1024 * 1024;
302
303  private static byte[] read(FileSystem fs, Path dir, String name) throws IOException {
304    RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
305    return file.read();
306  }
307
308  private static void write(FileSystem fs, Path dir, String name, byte[] data) throws IOException {
309    RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
310    // to initialize the nextFile index
311    file.read();
312    file.write(data);
313  }
314
315  private static void delete(FileSystem fs, Path dir, String name) throws IOException {
316    RotateFile file = new RotateFile(fs, dir, name, MAX_FILE_SIZE);
317    // to initialize the nextFile index
318    file.read();
319    file.delete();
320  }
321}