/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

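/**
 * A {@link ReplicationQueueStorage} implementation which keeps all replication queue data (queue
 * offsets, last pushed sequence ids and hfile references) in memory, loading it up front by
 * scanning the region directories of the given replication queue table straight from the
 * filesystem. Since it never contacts a region server, it can be used while the cluster is
 * offline. All mutations only touch the in-memory maps and are never written back to the table.
 * <p/>
 * A minimal usage sketch; the table name below is just the typical default, pass whatever queue
 * table your deployment actually uses:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * ReplicationQueueStorage storage =
 *   new OfflineTableReplicationQueueStorage(conf, TableName.valueOf("hbase:replication"));
 * for (ReplicationQueueData queue : storage.listAllQueues()) {
 *   System.out.println(queue.getId() + " -> " + queue.getOffsets());
 * }
 * </pre>
 */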
@InterfaceAudience.Private
public class OfflineTableReplicationQueueStorage implements ReplicationQueueStorage {

  private final Map<ReplicationQueueId, Map<String, ReplicationGroupOffset>> offsets =
    new HashMap<>();

  private final Map<String, Map<String, Long>> lastSequenceIds = new HashMap<>();

  private final Map<String, Set<String>> hfileRefs = new HashMap<>();

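  /**
   * Load the {@link RegionInfo} under the given region directory and add it to the map keyed by
   * start key. Overlapping regions are resolved by region id: the newest region wins, so parent
   * regions left over from splits or merges are dropped.
   */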
  private void loadRegionInfo(FileSystem fs, Path regionDir,
    NavigableMap<byte[], RegionInfo> startKey2RegionInfo) throws IOException {
    RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
    // TODO: we assume there will not be too many regions for the hbase:replication table, so here
    // we just iterate over all the regions to find out the overlapped ones. Can be optimized
    // later.
    Iterator<Map.Entry<byte[], RegionInfo>> iter = startKey2RegionInfo.entrySet().iterator();
    while (iter.hasNext()) {
      Map.Entry<byte[], RegionInfo> entry = iter.next();
      if (hri.isOverlap(entry.getValue())) {
        if (hri.getRegionId() > entry.getValue().getRegionId()) {
          // we are newer, remove the old hri; we can not break here since if hri is a merged
          // region, we need to remove all its parent regions.
          iter.remove();
        } else {
          // we are older, just return and skip the add below
          return;
        }
      }
    }
    startKey2RegionInfo.put(hri.getStartKey(), hri);
  }

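  /**
   * Parse one row of the queue family into per-WAL-group {@link ReplicationGroupOffset}s; the row
   * key is the string form of a {@link ReplicationQueueId}.
   */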
  private void loadOffsets(Result result) {
    NavigableMap<byte[], byte[]> map =
      result.getFamilyMap(TableReplicationQueueStorage.QUEUE_FAMILY);
    if (map == null || map.isEmpty()) {
      return;
    }
    Map<String, ReplicationGroupOffset> offsetMap = new HashMap<>();
    map.forEach((k, v) -> {
      String walGroup = Bytes.toString(k);
      ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v));
      offsetMap.put(walGroup, offset);
    });
    ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
    offsets.put(queueId, offsetMap);
  }

  /**
   * Parse one row of the last sequence id family; the row key is the peer id and each column maps
   * an encoded region name to its last pushed sequence id.
   */
  private void loadLastSequenceIds(Result result) {
    NavigableMap<byte[], byte[]> map =
      result.getFamilyMap(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY);
    if (map == null || map.isEmpty()) {
      return;
    }
    Map<String, Long> lastSeqIdMap = new HashMap<>();
    map.forEach((k, v) -> {
      String encodedRegionName = Bytes.toString(k);
      long lastSeqId = Bytes.toLong(v);
      lastSeqIdMap.put(encodedRegionName, lastSeqId);
    });
    String peerId = Bytes.toString(result.getRow());
    lastSequenceIds.put(peerId, lastSeqIdMap);
  }

  /**
   * Parse one row of the hfile ref family; the row key is the peer id and each column qualifier
   * is the name of one referenced hfile.
   */
  private void loadHFileRefs(Result result) {
    NavigableMap<byte[], byte[]> map =
      result.getFamilyMap(TableReplicationQueueStorage.HFILE_REF_FAMILY);
    if (map == null || map.isEmpty()) {
      return;
    }
    Set<String> refs = new HashSet<>();
    map.keySet().forEach(ref -> refs.add(Bytes.toString(ref)));
    String peerId = Bytes.toString(result.getRow());
    hfileRefs.put(peerId, refs);
  }

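  /**
   * Scan all regions of the replication queue table directly on the filesystem with a
   * {@link ClientSideRegionScanner}, so no region server is needed, and populate the in-memory
   * offset, last sequence id and hfile reference maps from every row found.
   */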
  private void loadReplicationQueueData(Configuration conf, TableName tableName)
    throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
    FileSystem fs = tableDir.getFileSystem(conf);
    FileStatus[] regionDirs =
      CommonFSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
    if (regionDirs == null) {
      return;
    }
    NavigableMap<byte[], RegionInfo> startKey2RegionInfo = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (FileStatus regionDir : regionDirs) {
      loadRegionInfo(fs, regionDir.getPath(), startKey2RegionInfo);
    }
    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
    for (RegionInfo hri : startKey2RegionInfo.values()) {
      try (ClientSideRegionScanner scanner =
        new ClientSideRegionScanner(conf, fs, rootDir, td, hri, new Scan(), null)) {
        for (;;) {
          Result result = scanner.next();
          if (result == null) {
            break;
          }
          loadOffsets(result);
          loadLastSequenceIds(result);
          loadHFileRefs(result);
        }
      }
    }
  }

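  /**
   * Create the storage and eagerly load all queue data from the given replication queue table.
   */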
  public OfflineTableReplicationQueueStorage(Configuration conf, TableName tableName)
    throws IOException {
    loadReplicationQueueData(conf, tableName);
  }

  @Override
  public synchronized void setOffset(ReplicationQueueId queueId, String walGroup,
    ReplicationGroupOffset offset, Map<String, Long> lastSeqIds) throws ReplicationException {
    offsets.computeIfAbsent(queueId, k -> new HashMap<>()).put(walGroup, offset);
    Map<String, Long> lastSeqIdsMap =
      lastSequenceIds.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>());
    // last pushed sequence ids only ever move forward, so keep the maximum
    for (Map.Entry<String, Long> entry : lastSeqIds.entrySet()) {
      lastSeqIdsMap.merge(entry.getKey(), entry.getValue(), Math::max);
    }
  }

  @Override
  public synchronized Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
    throws ReplicationException {
    Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId);
    if (offsetMap == null) {
      return Collections.emptyMap();
    }
    return ImmutableMap.copyOf(offsetMap);
  }

  @Override
  public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId)
    throws ReplicationException {
    return offsets.keySet().stream().filter(rqi -> rqi.getPeerId().equals(peerId))
      .collect(Collectors.toList());
  }

  @Override
  public synchronized List<ReplicationQueueId> listAllQueueIds(ServerName serverName)
    throws ReplicationException {
    return offsets.keySet().stream().filter(rqi -> rqi.getServerName().equals(serverName))
      .collect(Collectors.toList());
  }

  @Override
  public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
    throws ReplicationException {
    return offsets.keySet().stream()
      .filter(rqi -> rqi.getPeerId().equals(peerId) && rqi.getServerName().equals(serverName))
      .collect(Collectors.toList());
  }

  @Override
  public synchronized List<ReplicationQueueData> listAllQueues() throws ReplicationException {
    return offsets.entrySet().stream()
      .map(e -> new ReplicationQueueData(e.getKey(), ImmutableMap.copyOf(e.getValue())))
      .collect(Collectors.toList());
  }

  @Override
  public synchronized List<ServerName> listAllReplicators() throws ReplicationException {
    return offsets.keySet().stream().map(ReplicationQueueId::getServerName).distinct()
      .collect(Collectors.toList());
  }

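  /**
   * Reassign the queue to the target server in the in-memory map and return the claimed offsets,
   * or an empty map if there is no such queue.
   */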
  @Override
  public synchronized Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
    ServerName targetServerName) throws ReplicationException {
    Map<String, ReplicationGroupOffset> offsetMap = offsets.remove(queueId);
    if (offsetMap == null) {
      return Collections.emptyMap();
    }
    offsets.put(queueId.claim(targetServerName), offsetMap);
    return ImmutableMap.copyOf(offsetMap);
  }

  @Override
  public synchronized void removeQueue(ReplicationQueueId queueId) throws ReplicationException {
    offsets.remove(queueId);
  }

  @Override
  public synchronized void removeAllQueues(String peerId) throws ReplicationException {
    offsets.keySet().removeIf(queueId -> queueId.getPeerId().equals(peerId));
  }

  @Override
  public synchronized long getLastSequenceId(String encodedRegionName, String peerId)
    throws ReplicationException {
    Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
    if (lastSeqIdMap == null) {
      return HConstants.NO_SEQNUM;
    }
    Long lastSeqId = lastSeqIdMap.get(encodedRegionName);
    return lastSeqId != null ? lastSeqId.longValue() : HConstants.NO_SEQNUM;
  }

  @Override
  public synchronized void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
    throws ReplicationException {
    lastSequenceIds.computeIfAbsent(peerId, k -> new HashMap<>()).putAll(lastSeqIds);
  }

  @Override
  public synchronized void removeLastSequenceIds(String peerId) throws ReplicationException {
    lastSequenceIds.remove(peerId);
  }

  @Override
  public synchronized void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
    throws ReplicationException {
    Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
    if (lastSeqIdMap == null) {
      return;
    }
    for (String encodedRegionName : encodedRegionNames) {
      lastSeqIdMap.remove(encodedRegionName);
    }
  }

  @Override
  public synchronized void removePeerFromHFileRefs(String peerId) throws ReplicationException {
    hfileRefs.remove(peerId);
  }

  @Override
  public synchronized void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    Set<String> refs = hfileRefs.computeIfAbsent(peerId, k -> new HashSet<>());
    for (Pair<Path, Path> pair : pairs) {
      // only the file name is tracked, matching what loadHFileRefs reads back from the table
      refs.add(pair.getSecond().getName());
    }
  }

  @Override
  public synchronized void removeHFileRefs(String peerId, List<String> files)
    throws ReplicationException {
    Set<String> refs = hfileRefs.get(peerId);
    if (refs == null) {
      return;
    }
    refs.removeAll(files);
  }

  @Override
  public synchronized List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
    return ImmutableList.copyOf(hfileRefs.keySet());
  }

  @Override
  public synchronized List<String> getReplicableHFiles(String peerId) throws ReplicationException {
    Set<String> refs = hfileRefs.get(peerId);
    if (refs == null) {
      return Collections.emptyList();
    }
    return ImmutableList.copyOf(refs);
  }

  @Override
  public synchronized Set<String> getAllHFileRefs() throws ReplicationException {
    return hfileRefs.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
  }

  @Override
  public boolean hasData() throws ReplicationException {
    // all data was loaded in the constructor, so this storage always has data to serve
    return true;
  }
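
  // The batch update methods below appear to be needed only for migrating replication queue data
  // from ZooKeeper to table based storage (note the ZKReplicationQueueStorageForMigration
  // parameter), so they are left unsupported here.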
  @Override
  public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
    throws ReplicationException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void batchUpdateLastSequenceIds(
    List<ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId> lastPushedSeqIds)
    throws ReplicationException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
    throws ReplicationException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
    throw new UnsupportedOperationException();
  }
}