Skip to content

Commit

Permalink
Tolerate multiaddrs we can't parse in handling kademlia responses.
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous committed Sep 30, 2024
1 parent 1652693 commit eb35b5a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 16 deletions.
9 changes: 8 additions & 1 deletion src/main/java/org/peergos/PeerAddresses.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import java.util.stream.Stream;

public class PeerAddresses {
public final Multihash peerId;
Expand Down Expand Up @@ -57,7 +58,13 @@ public static PeerAddresses fromProtobuf(Dht.Message.Peer peer) {
Multihash peerId = Multihash.deserialize(peer.getId().toByteArray());
List<Multiaddr> addrs = peer.getAddrsList()
.stream()
.map(b -> Multiaddr.deserialize(b.toByteArray()))
.flatMap(b -> {
try {
return Stream.of(Multiaddr.deserialize(b.toByteArray()));
} catch (Exception e) {
return Stream.empty();
}
})
.collect(Collectors.toList());
return new PeerAddresses(peerId, addrs);
}
Expand Down
34 changes: 19 additions & 15 deletions src/main/java/org/peergos/protocol/dht/Kademlia.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,24 @@ public void bootstrap(Host us) {
List<Connection> allConns = us.getNetwork().getConnections();
Set<Connection> activeConns = us.getStreams().stream().map(s -> s.getConnection()).collect(Collectors.toSet());
List<Connection> toClose = allConns.stream().filter(c -> !activeConns.contains(c)).collect(Collectors.toList());
LOG.info("Closing " + toClose.size() + " / " + allConns.size() + " connections...");
for (Connection conn : toClose) {
conn.close();
}
}

private static <V> CompletableFuture<V> closeAfter(CompletableFuture<io.libp2p.core.Stream> sf, Supplier<CompletableFuture<V>> query) {
CompletableFuture<V> res = new CompletableFuture<>();
query.get().thenAccept(v -> {
sf.thenAccept(s -> s.close());
res.complete(v);
}).exceptionally(t -> {
sf.thenAccept(s -> s.close());
res.completeExceptionally(t);
return null;
});
return res;
}

static class RoutingEntry {
public final Id key;
public final PeerAddresses addresses;
Expand Down Expand Up @@ -287,11 +299,10 @@ public CompletableFuture<List<PeerAddresses>> findProviders(Multihash block, Hos
}

private CompletableFuture<List<PeerAddresses>> getCloserPeers(byte[] key, PeerAddresses target, Host us) {
StreamPromise<? extends KademliaController> conn = null;
try {
conn = dialPeer(target, us);
StreamPromise<? extends KademliaController> conn = dialPeer(target, us);
KademliaController contr = conn.getController().orTimeout(2, TimeUnit.SECONDS).join();
return contr.closerPeers(key);
return closeAfter(conn.getStream(), () -> contr.closerPeers(key));
} catch (Exception e) {
// we can't dial quic only nodes until it's implemented
if (target.addresses.stream().allMatch(a -> a.toString().contains("quic")))
Expand All @@ -303,9 +314,6 @@ private CompletableFuture<List<PeerAddresses>> getCloserPeers(byte[] key, PeerAd
else if (e.getCause() instanceof ConnectionClosedException) {}
else
e.printStackTrace();
} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
}
return CompletableFuture.completedFuture(Collections.emptyList());
}
Expand Down Expand Up @@ -365,15 +373,11 @@ private CompletableFuture<Boolean> putValue(Multihash publisher,
byte[] signedRecord,
PeerAddresses peer,
Host us) {
StreamPromise<? extends KademliaController> conn = null;
try {
conn = dialPeer(peer, us);
return conn.getController().join()
.putValue(publisher, signedRecord);
} catch (Exception e) {} finally {
if (conn != null)
conn.getStream().thenApply(s -> s.close());
}
StreamPromise<? extends KademliaController> conn = dialPeer(peer, us);
return closeAfter(conn.getStream(), () -> conn.getController().join()
.putValue(publisher, signedRecord));
} catch (Exception e) {}
return CompletableFuture.completedFuture(false);
}

Expand Down

0 comments on commit eb35b5a

Please sign in to comment.