/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

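/**
 * An in-memory {@link ReplicationQueueStorage} implementation that bootstraps itself by reading
 * the replication queue table's region files directly from the filesystem with a
 * {@link ClientSideRegionScanner}, so queue data can be inspected and manipulated while the
 * cluster, and thus the table itself, is offline. All loaded state lives in the maps below;
 * mutations affect only this in-memory copy and are never written back to the table.
 */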
@InterfaceAudience.Private
public class OfflineTableReplicationQueueStorage implements ReplicationQueueStorage {

  private final Map<ReplicationQueueId, Map<String, ReplicationGroupOffset>> offsets =
    new HashMap<>();

  private final Map<String, Map<String, Long>> lastSequenceIds = new HashMap<>();

  private final Map<String, Set<String>> hfileRefs = new HashMap<>();

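  /**
   * Load the RegionInfo for a single region directory and merge it into the start-key sorted map,
   * resolving overlaps by regionId: a region with a larger regionId is newer (for example, the
   * result of a merge) and replaces any older regions it overlaps.
   */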
  private void loadRegionInfo(FileSystem fs, Path regionDir,
    NavigableMap<byte[], RegionInfo> startKey2RegionInfo) throws IOException {
    RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
    // TODO: we assume there will not be too many regions for the hbase:replication table, so here
    // we just iterate over all the regions to find the overlapping ones. Can be optimized later.
    Iterator<Map.Entry<byte[], RegionInfo>> iter = startKey2RegionInfo.entrySet().iterator();
    while (iter.hasNext()) {
      Map.Entry<byte[], RegionInfo> entry = iter.next();
      if (hri.isOverlap(entry.getValue())) {
        if (hri.getRegionId() > entry.getValue().getRegionId()) {
          // we are newer, so remove the old hri; we can not break here because if hri is a merged
          // region, we need to remove all of its parent regions.
          iter.remove();
        } else {
          // we are older, just return and skip the add below
          return;
        }
      }
    }
    startKey2RegionInfo.put(hri.getStartKey(), hri);
  }

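  /**
   * Parse one row of the queue family: the row key is the serialized
   * {@link ReplicationQueueId} and each qualifier/value pair maps a WAL group name to its
   * {@link ReplicationGroupOffset}.
   */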
  private void loadOffsets(Result result) {
    NavigableMap<byte[], byte[]> map =
      result.getFamilyMap(TableReplicationQueueStorage.QUEUE_FAMILY);
    if (map == null || map.isEmpty()) {
      return;
    }
    Map<String, ReplicationGroupOffset> offsetMap = new HashMap<>();
    map.forEach((k, v) -> {
      String walGroup = Bytes.toString(k);
      ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v));
      offsetMap.put(walGroup, offset);
    });
    ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
    offsets.put(queueId, offsetMap);
  }

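  /**
   * Parse one row of the last-sequence-id family: the row key is the peer id and each
   * qualifier/value pair maps an encoded region name to the last pushed sequence id.
   */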
  private void loadLastSequenceIds(Result result) {
    NavigableMap<byte[], byte[]> map =
      result.getFamilyMap(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY);
    if (map == null || map.isEmpty()) {
      return;
    }
    Map<String, Long> lastSeqIdMap = new HashMap<>();
    map.forEach((k, v) -> {
      String encodedRegionName = Bytes.toString(k);
      long lastSeqId = Bytes.toLong(v);
      lastSeqIdMap.put(encodedRegionName, lastSeqId);
    });
    String peerId = Bytes.toString(result.getRow());
    lastSequenceIds.put(peerId, lastSeqIdMap);
  }

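  /**
   * Parse one row of the hfile-ref family: the row key is the peer id and each qualifier is the
   * name of an HFile still pending replication; only the qualifiers are read, the values are
   * unused.
   */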
  private void loadHFileRefs(Result result) {
    NavigableMap<byte[], byte[]> map =
      result.getFamilyMap(TableReplicationQueueStorage.HFILE_REF_FAMILY);
    if (map == null || map.isEmpty()) {
      return;
    }
    Set<String> refs = new HashSet<>();
    map.keySet().forEach(ref -> refs.add(Bytes.toString(ref)));
    String peerId = Bytes.toString(result.getRow());
    hfileRefs.put(peerId, refs);
  }

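  /**
   * Scan every region of the given replication queue table straight from its directory under the
   * HBase root dir. Since the cluster may be offline, each region is opened with a
   * {@link ClientSideRegionScanner} instead of going through a region server, and every row is
   * fed through the three family-specific loaders above.
   */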
  private void loadReplicationQueueData(Configuration conf, TableName tableName)
    throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
    FileSystem fs = tableDir.getFileSystem(conf);
    FileStatus[] regionDirs =
      CommonFSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
    if (regionDirs == null) {
      return;
    }
    NavigableMap<byte[], RegionInfo> startKey2RegionInfo = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (FileStatus regionDir : regionDirs) {
      loadRegionInfo(fs, regionDir.getPath(), startKey2RegionInfo);
    }
    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
    for (RegionInfo hri : startKey2RegionInfo.values()) {
      try (ClientSideRegionScanner scanner =
        new ClientSideRegionScanner(conf, fs, rootDir, td, hri, new Scan(), null)) {
        for (;;) {
          Result result = scanner.next();
          if (result == null) {
            break;
          }
          loadOffsets(result);
          loadLastSequenceIds(result);
          loadHFileRefs(result);
        }
      }
    }
  }

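  /**
   * Eagerly load all queue offsets, last pushed sequence ids and hfile references from the given
   * table's files on the filesystem. A minimal usage sketch, assuming the queue data lives in the
   * default "hbase:replication" table (the table name is configurable, so this is an assumption):
   *
   * <pre>
   * ReplicationQueueStorage storage = new OfflineTableReplicationQueueStorage(conf,
   *   TableName.valueOf("hbase:replication"));
   * for (ReplicationQueueData queue : storage.listAllQueues()) {
   *   // inspect queue.getId() and queue.getOffsets() while the cluster is down
   * }
   * </pre>
   */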
  public OfflineTableReplicationQueueStorage(Configuration conf, TableName tableName)
    throws IOException {
    loadReplicationQueueData(conf, tableName);
  }

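  /**
   * Record the new offset for the WAL group and merge the given last pushed sequence ids, keeping
   * the larger value when a region already has one, so sequence ids never move backwards.
   */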
  @Override
  public synchronized void setOffset(ReplicationQueueId queueId, String walGroup,
    ReplicationGroupOffset offset, Map<String, Long> lastSeqIds) throws ReplicationException {
    Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId);
    if (offsetMap == null) {
      offsetMap = new HashMap<>();
      offsets.put(queueId, offsetMap);
    }
    offsetMap.put(walGroup, offset);
    Map<String, Long> lastSeqIdsMap = lastSequenceIds.get(queueId.getPeerId());
    if (lastSeqIdsMap == null) {
      lastSeqIdsMap = new HashMap<>();
      lastSequenceIds.put(queueId.getPeerId(), lastSeqIdsMap);
    }
    for (Map.Entry<String, Long> entry : lastSeqIds.entrySet()) {
      Long oldSeqId = lastSeqIdsMap.get(entry.getKey());
      if (oldSeqId == null || oldSeqId < entry.getValue()) {
        lastSeqIdsMap.put(entry.getKey(), entry.getValue());
      }
    }
  }

  @Override
  public synchronized Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
    throws ReplicationException {
    Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId);
    if (offsetMap == null) {
      return Collections.emptyMap();
    }
    return ImmutableMap.copyOf(offsetMap);
  }

  @Override
  public synchronized List<String> listAllPeerIds() throws ReplicationException {
    return offsets.keySet().stream().map(ReplicationQueueId::getPeerId).distinct()
      .collect(Collectors.toList());
  }

  @Override
  public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId)
    throws ReplicationException {
    return offsets.keySet().stream().filter(rqi -> rqi.getPeerId().equals(peerId))
      .collect(Collectors.toList());
  }

  @Override
  public synchronized List<ReplicationQueueId> listAllQueueIds(ServerName serverName)
    throws ReplicationException {
    return offsets.keySet().stream().filter(rqi -> rqi.getServerName().equals(serverName))
      .collect(Collectors.toList());
  }

  @Override
  public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
    throws ReplicationException {
    return offsets.keySet().stream()
      .filter(rqi -> rqi.getPeerId().equals(peerId) && rqi.getServerName().equals(serverName))
      .collect(Collectors.toList());
  }

  @Override
  public synchronized List<ReplicationQueueData> listAllQueues() throws ReplicationException {
    return offsets.entrySet().stream()
      .map(e -> new ReplicationQueueData(e.getKey(), ImmutableMap.copyOf(e.getValue())))
      .collect(Collectors.toList());
  }

  @Override
  public synchronized List<ServerName> listAllReplicators() throws ReplicationException {
    return offsets.keySet().stream().map(ReplicationQueueId::getServerName).distinct()
      .collect(Collectors.toList());
  }

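  /**
   * Re-key the queue under the claiming server: the queue's offsets are removed from the old
   * {@link ReplicationQueueId}, re-inserted under the id produced by
   * {@link ReplicationQueueId#claim(ServerName)}, and returned to the caller.
   */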
  @Override
  public synchronized Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
    ServerName targetServerName) throws ReplicationException {
    Map<String, ReplicationGroupOffset> offsetMap = offsets.remove(queueId);
    if (offsetMap == null) {
      return Collections.emptyMap();
    }
    offsets.put(queueId.claim(targetServerName), offsetMap);
    return ImmutableMap.copyOf(offsetMap);
  }

  @Override
  public synchronized void removeQueue(ReplicationQueueId queueId) throws ReplicationException {
    offsets.remove(queueId);
  }

  @Override
  public synchronized void removeAllQueues(String peerId) throws ReplicationException {
    Iterator<ReplicationQueueId> iter = offsets.keySet().iterator();
    while (iter.hasNext()) {
      if (iter.next().getPeerId().equals(peerId)) {
        iter.remove();
      }
    }
  }

  @Override
  public synchronized long getLastSequenceId(String encodedRegionName, String peerId)
    throws ReplicationException {
    Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
    if (lastSeqIdMap == null) {
      return HConstants.NO_SEQNUM;
    }
    Long lastSeqId = lastSeqIdMap.get(encodedRegionName);
    return lastSeqId != null ? lastSeqId.longValue() : HConstants.NO_SEQNUM;
  }

  @Override
  public synchronized void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
    throws ReplicationException {
    Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
    if (lastSeqIdMap == null) {
      lastSeqIdMap = new HashMap<>();
      lastSequenceIds.put(peerId, lastSeqIdMap);
    }
    lastSeqIdMap.putAll(lastSeqIds);
  }

  @Override
  public synchronized void removeLastSequenceIds(String peerId) throws ReplicationException {
    lastSequenceIds.remove(peerId);
  }

  @Override
  public synchronized void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
    throws ReplicationException {
    Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
    if (lastSeqIdMap == null) {
      return;
    }
    for (String encodedRegionName : encodedRegionNames) {
      lastSeqIdMap.remove(encodedRegionName);
    }
  }

  @Override
  public synchronized void removePeerFromHFileRefs(String peerId) throws ReplicationException {
    hfileRefs.remove(peerId);
  }

  @Override
  public synchronized void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    Set<String> refs = hfileRefs.get(peerId);
    if (refs == null) {
      refs = new HashSet<>();
      hfileRefs.put(peerId, refs);
    }
    for (Pair<Path, Path> pair : pairs) {
      refs.add(pair.getSecond().getName());
    }
  }

  @Override
  public synchronized void removeHFileRefs(String peerId, List<String> files)
    throws ReplicationException {
    Set<String> refs = hfileRefs.get(peerId);
    if (refs == null) {
      return;
    }
    refs.removeAll(files);
  }

  @Override
  public synchronized List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
    return ImmutableList.copyOf(hfileRefs.keySet());
  }

  @Override
  public synchronized List<String> getReplicableHFiles(String peerId) throws ReplicationException {
    Set<String> refs = hfileRefs.get(peerId);
    if (refs == null) {
      return Collections.emptyList();
    }
    return ImmutableList.copyOf(refs);
  }

  @Override
  public synchronized Set<String> getAllHFileRefs() throws ReplicationException {
    return hfileRefs.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
  }

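  /**
   * Always true: whatever data exists was already loaded into memory by the constructor, so there
   * is nothing further to check.
   */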
  @Override
  public boolean hasData() throws ReplicationException {
    return true;
  }

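  // The batch update methods below exist for migrating queue data out of zookeeper (note the
  // ZKReplicationQueueStorageForMigration parameter type) and are not supported by this offline,
  // in-memory implementation.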
  @Override
  public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
    throws ReplicationException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void batchUpdateLastSequenceIds(
    List<ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId> lastPushedSeqIds)
    throws ReplicationException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
    throws ReplicationException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
    throw new UnsupportedOperationException();
  }
}