diff --git a/src/main/java/org/peergos/protocol/dht/Kademlia.java b/src/main/java/org/peergos/protocol/dht/Kademlia.java index e4bc969..5dac139 100644 --- a/src/main/java/org/peergos/protocol/dht/Kademlia.java +++ b/src/main/java/org/peergos/protocol/dht/Kademlia.java @@ -95,7 +95,13 @@ public void stopBootstrapThread() { private boolean connectTo(Host us, PeerAddresses peer) { try { - new Identify().dial(us, PeerId.fromBase58(peer.peerId.toBase58()), getPublic(peer)).getController().join().id().join(); + PeerId ourPeerId = PeerId.fromBase58(peer.peerId.toBase58()); + StreamPromise conn = new Identify().dial(us, ourPeerId, getPublic(peer)); + try { + conn.getController().join().id().join(); + } finally { + conn.getStream().thenApply(s -> s.getConnection().close()); + } return true; } catch (Exception e) { if (e.getCause() instanceof NothingToCompleteException || e.getCause() instanceof NonCompleteException) @@ -242,7 +248,7 @@ public CompletableFuture> findProviders(Multihash block, Hos return null; } finally { if (conn != null) - conn.getStream().thenApply(s -> s.close()); + conn.getStream().thenApply(s -> s.getConnection().close()); } }).filter(prov -> prov != null) .collect(Collectors.toList()); @@ -291,7 +297,7 @@ else if (e.getCause() instanceof ConnectionClosedException) {} e.printStackTrace(); } finally { if (conn != null) - conn.getStream().thenApply(s -> s.close()); + conn.getStream().thenApply(s -> s.getConnection().close()); } return CompletableFuture.completedFuture(Collections.emptyList()); } @@ -319,14 +325,14 @@ public CompletableFuture provideBlock(Multihash block, Host us, PeerAddres return conn.getController() .thenCompose(contr -> contr.provide(block, ourAddrs)) .thenApply(res -> { - conn.getStream().thenApply(s -> s.close()); + conn.getStream().thenApply(s -> s.getConnection().close()); return res; }) .exceptionally(t -> { if (t.getCause() instanceof NonCompleteException) return true; LOG.log(Level.FINE, t, t::getMessage); - conn.getStream().thenApply(s -> s.close()); + conn.getStream().thenApply(s -> s.getConnection().close()); return true; }); }) @@ -358,7 +364,7 @@ private CompletableFuture putValue(Multihash publisher, .putValue(publisher, signedRecord); } catch (Exception e) {} finally { if (conn != null) - conn.getStream().thenApply(s -> s.close()); + conn.getStream().thenApply(s -> s.getConnection().close()); } return CompletableFuture.completedFuture(false); } @@ -483,7 +489,7 @@ private CompletableFuture> getValueFromPeer(PeerAddresses pe return CompletableFuture.completedFuture(Optional.empty()); } finally { if (conn != null) - conn.getStream().thenApply(s -> s.close()); + conn.getStream().thenApply(s -> s.getConnection().close()); } } public List resolveValue(Multihash publisher, int minResults, Host us) {