Skip to content

Commit b7c6a7b

Browse files
committed
Improve handling of DHT messages
1 parent 7bc2a87 commit b7c6a7b

6 files changed

Lines changed: 443 additions & 127 deletions

File tree

Lines changed: 87 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package com.lbry.globe.thread;
22

3-
import com.dampcake.bencode.Bencode;
4-
import com.dampcake.bencode.Type;
53
import com.lbry.globe.api.API;
64
import com.lbry.globe.object.Node;
75
import com.lbry.globe.object.Service;
6+
import com.lbry.globe.util.DHT;
87
import com.lbry.globe.util.GeoIP;
8+
import com.lbry.globe.util.UDP;
99

1010
import java.io.IOException;
1111
import java.net.*;
@@ -17,8 +17,6 @@
1717

1818
public class DHTNodeFinderThread implements Runnable{
1919

20-
private static final Bencode BENCODE = new Bencode();
21-
2220
public static final String[] BOOTSTRAP = {
2321
"dht.lbry.grin.io:4444", // Grin
2422
"dht.lbry.madiator.com:4444", // Madiator
@@ -43,7 +41,7 @@ public class DHTNodeFinderThread implements Runnable{
4341

4442
private final Map<InetSocketAddress,Boolean> pingableDHTs = new ConcurrentHashMap<>();
4543

46-
private final Queue<DatagramPacket> incoming = new ConcurrentLinkedQueue<>();
44+
private final Queue<UDP.Packet> incoming = new ConcurrentLinkedQueue<>();
4745

4846
@Override
4947
public void run(){
@@ -52,15 +50,22 @@ public void run(){
5250
this.pingableDHTs.put(new InetSocketAddress(uri.getHost(),uri.getPort()),true);
5351
}
5452

55-
// Ping Sender
53+
this.startSender();
54+
this.startReceiver();
55+
this.handleIncomingMessages();
56+
}
57+
58+
private void startSender(){
5659
new Thread(() -> {
5760
while(true){
61+
System.out.println("[BULK PING]");
5862
for(InetSocketAddress socketAddress : DHTNodeFinderThread.this.pingableDHTs.keySet()){
5963
String hostname = socketAddress.getHostName();
6064
int port = socketAddress.getPort();
6165
try{
6266
for(InetAddress ip : InetAddress.getAllByName(hostname)){
63-
DHTNodeFinderThread.ping(ip,port);
67+
InetSocketAddress destination = new InetSocketAddress(ip,port);
68+
this.doPing(destination);
6469
}
6570
}catch(Exception e){
6671
e.printStackTrace();
@@ -71,147 +76,102 @@ public void run(){
7176
} catch (InterruptedException e) {
7277
throw new RuntimeException(e);
7378
}
79+
API.saveNodes();
7480
}
7581
}).start();
82+
}
7683

77-
// Receiver
78-
new Thread(() -> {
79-
while(true) {
80-
try {
81-
byte[] buffer = new byte[1024];
82-
DatagramPacket receiverPacket = new DatagramPacket(buffer, buffer.length);
83-
DHTNodeFinderThread.SOCKET.receive(receiverPacket);
84-
DHTNodeFinderThread.this.incoming.add(receiverPacket);
85-
} catch (IOException e) {
86-
e.printStackTrace();
87-
}
88-
}
89-
}).start();
90-
91-
while(true){
92-
93-
//TODO: MARKS AS DELETED
94-
95-
while(this.incoming.peek()!=null){
96-
DatagramPacket receiverPacket = this.incoming.poll();
97-
byte[] receivingBytes = receiverPacket.getData();
98-
Map<String, Object> receivingDictionary = DHTNodeFinderThread.decodePacket(receivingBytes);
99-
if(receivingDictionary.get("0").equals(1L)){
100-
if(receivingDictionary.get("3").equals("pong")){
101-
try{
102-
DHTNodeFinderThread.findNode(receiverPacket.getAddress(),receiverPacket.getPort());
103-
}catch(Exception e){
104-
e.printStackTrace();
105-
}
84+
private void doPing(InetSocketAddress destination) throws IOException{
85+
DHT.ping(DHTNodeFinderThread.SOCKET,destination).thenAccept((UDP.Packet packet) -> {
86+
byte[] receivingBytes = packet.getData();
87+
DHT.Message<?> message = DHT.Message.fromBencode(receivingBytes);
88+
System.out.println(" - [Ping Response] "+message);
10689

107-
//TODO Improve updating pinged nodes.
108-
System.out.println("PONG: "+receiverPacket.getSocketAddress());
90+
try{
91+
this.doFindNode(packet.getAddress());
92+
}catch(Exception e){
93+
e.printStackTrace();
94+
}
10995

110-
Node existingNode = API.NODES.get(receiverPacket.getAddress());
111-
if(existingNode==null){
112-
JSONObject geoData = GeoIP.getCachedGeoIPInformation(receiverPacket.getAddress());
113-
Double[] coords = GeoIP.getCoordinateFromLocation((geoData!=null && geoData.has("loc"))?geoData.getString("loc"):null);
114-
existingNode = new Node(receiverPacket.getAddress(),coords[0],coords[1]);
115-
API.NODES.put(receiverPacket.getAddress(),existingNode);
116-
}
117-
Service dhtService = null;
118-
for(Service s : existingNode.getServices()){
119-
if(s.getPort()==receiverPacket.getPort() && "dht".equals(s.getType())){
120-
dhtService = s;
121-
break;
122-
}
123-
}
96+
//TODO Improve updating pinged nodes.
12497

125-
if(dhtService==null){
126-
existingNode.getServices().add(new Service(UUID.randomUUID(),receiverPacket.getPort(),"dht"));
127-
}else{
128-
dhtService.updateLastSeen();
129-
}
130-
}else{
131-
//TODO Save connections too
132-
List<List<Object>> nodes = (List<List<Object>>) receivingDictionary.get("3");
133-
for(List<Object> n : nodes){
134-
String hostname = (String) n.get(1);
135-
int port = (int) ((long) n.get(2));
136-
InetSocketAddress existingSocketAddr = null;
137-
for(InetSocketAddress addr : this.pingableDHTs.keySet()){
138-
if(addr.getHostName().equals(hostname) && addr.getPort()==port){
139-
existingSocketAddr = addr;
140-
}
141-
}
142-
if(existingSocketAddr==null){
143-
this.pingableDHTs.put(new InetSocketAddress(hostname,port),false);
144-
}
145-
}
146-
}
98+
Node existingNode = API.NODES.get(packet.getAddress().getAddress());
99+
if(existingNode==null){
100+
JSONObject geoData = GeoIP.getCachedGeoIPInformation(packet.getDatagramPacket().getAddress());
101+
Double[] coords = GeoIP.getCoordinateFromLocation((geoData!=null && geoData.has("loc"))?geoData.getString("loc"):null);
102+
existingNode = new Node(packet.getDatagramPacket().getAddress(),coords[0],coords[1]);
103+
API.NODES.put(packet.getDatagramPacket().getAddress(),existingNode);
104+
}
105+
Service dhtService = null;
106+
for(Service s : existingNode.getServices()){
107+
if(s.getPort()==packet.getDatagramPacket().getPort() && "dht".equals(s.getType())){
108+
dhtService = s;
109+
break;
147110
}
148111
}
149112

150-
API.saveNodes();
151-
//TODO: REMOVE MARKED AS DELETED
152-
153-
System.out.println("----");
154-
try {
155-
Thread.sleep(1_000);
156-
} catch (InterruptedException e) {
157-
throw new RuntimeException(e);
113+
if(dhtService==null){
114+
existingNode.getServices().add(new Service(UUID.randomUUID(),packet.getDatagramPacket().getPort(),"dht"));
115+
}else{
116+
dhtService.updateLastSeen();
158117
}
159-
}
118+
}).exceptionally((Throwable e) -> null);
160119
}
161120

162-
private static void ping(InetAddress ip,int port) throws IOException{
163-
byte[] rpcID = new byte[20];
164-
new Random().nextBytes(rpcID);
165-
166-
Map<String,Object> ping = new HashMap<>();
167-
ping.put("0",0);
168-
ping.put("1",rpcID);
169-
ping.put("2",new byte[48]);
170-
ping.put("3","ping");
171-
ping.put("4",Collections.singletonList(Collections.singletonMap("protocolVersion",1)));
172-
byte[] pingBytes = DHTNodeFinderThread.encodePacket(ping);
173-
174-
DatagramPacket sendingDiagram = new DatagramPacket(pingBytes,pingBytes.length,ip,port);
175-
DHTNodeFinderThread.SOCKET.send(sendingDiagram);
121+
private void doFindNode(InetSocketAddress destination) throws IOException{
122+
DHT.findNode(DHTNodeFinderThread.SOCKET,destination).thenAccept((UDP.Packet packet) -> {
123+
byte[] receivingBytes = packet.getData();
124+
DHT.Message<?> message = DHT.Message.fromBencode(receivingBytes);
125+
System.out.println(" - [FindNode Response] "+message);
126+
127+
List<List<Object>> nodes = (List<List<Object>>) message.getPayload();
128+
for(List<Object> n : nodes){
129+
String hostname = (String) n.get(1);
130+
int port = (int) ((long) n.get(2));
131+
InetSocketAddress existingSocketAddr = null;
132+
for(InetSocketAddress addr : this.pingableDHTs.keySet()){
133+
if(addr.getHostName().equals(hostname) && addr.getPort()==port){
134+
existingSocketAddr = addr;
135+
}
136+
}
137+
if(existingSocketAddr==null){
138+
this.pingableDHTs.put(new InetSocketAddress(hostname,port),false);
139+
}
140+
}
141+
}).exceptionally((Throwable e) -> null);
176142
}
177143

178-
private static void findNode(InetAddress ip,int port) throws IOException{
179-
byte[] rpcID = new byte[20];
180-
new Random().nextBytes(rpcID);
144+
private void startReceiver(){
145+
new Thread(() -> {
146+
while(true) {
147+
try {
148+
UDP.Packet receiverPacket = UDP.receive(DHTNodeFinderThread.SOCKET);
149+
DHTNodeFinderThread.this.incoming.add(receiverPacket);
181150

182-
Map<String,Object> findNode = new HashMap<>();
183-
findNode.put("0",0);
184-
findNode.put("1",rpcID);
185-
findNode.put("2",new byte[48]);
186-
findNode.put("3","findNode");
187-
findNode.put("4",Arrays.asList(new byte[48],Collections.singletonMap("protocolVersion",1)));
188-
byte[] findNodeBytes = DHTNodeFinderThread.encodePacket(findNode);
151+
byte[] receivingBytes = receiverPacket.getData();
189152

190-
DatagramPacket sendingDiagram = new DatagramPacket(findNodeBytes,findNodeBytes.length,ip,port);
191-
DHTNodeFinderThread.SOCKET.send(sendingDiagram);
153+
DHT.Message<?> message = DHT.Message.fromBencode(receivingBytes);
154+
DHT.RPCID rpcid = new DHT.RPCID(message);
155+
DHT.getFutureManager().finishFuture(rpcid,receiverPacket);
156+
} catch (IOException e) {
157+
e.printStackTrace();
158+
}
159+
}
160+
}).start();
192161
}
193162

194-
private static byte[] encodePacket(Map<String,Object> map){
195-
return DHTNodeFinderThread.BENCODE.encode(map);
196-
}
163+
private void handleIncomingMessages(){
164+
while(DHTNodeFinderThread.SOCKET.isBound()){
165+
while(this.incoming.peek()!=null){
166+
UDP.Packet receiverPacket = this.incoming.poll();
167+
byte[] receivingBytes = receiverPacket.getData();
197168

198-
private static Map<String,Object> decodePacket(byte[] bytes){
199-
// Fix invalid B-encoding
200-
if(bytes[0]=='d'){
201-
bytes[0] = 'l';
202-
}
203-
List<Object> list = DHTNodeFinderThread.BENCODE.decode(bytes,Type.LIST);
204-
for(int i=0;i<list.size();i++){
205-
if(i%2==0){
206-
list.set(i,String.valueOf(list.get(i)));
169+
DHT.Message<?> message = DHT.Message.fromBencode(receivingBytes);
170+
if(message.getType()==DHT.Message.TYPE_REQUEST){
171+
System.out.println("Incoming request");
172+
}
207173
}
208174
}
209-
bytes = DHTNodeFinderThread.BENCODE.encode(list);
210-
if(bytes[0]=='l'){
211-
bytes[0] = 'd';
212-
}
213-
// Normal B-decoding
214-
return DHTNodeFinderThread.BENCODE.decode(bytes,Type.DICTIONARY);
215175
}
216176

217177
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package com.lbry.globe.util;
2+
3+
import com.dampcake.bencode.Bencode;
4+
import com.dampcake.bencode.Type;
5+
6+
import java.nio.ByteBuffer;
7+
import java.util.List;
8+
import java.util.Map;
9+
10+
public class BencodeConverter{
11+
12+
private static final Bencode BENCODE = new Bencode(true);
13+
14+
public static byte[] encode(Map<String,?> map){
15+
return BencodeConverter.BENCODE.encode(map);
16+
}
17+
18+
public static Map<String,?> decode(byte[] bytes){
19+
// Fix invalid B-encoding
20+
if(bytes[0]=='d'){
21+
bytes[0] = 'l';
22+
}
23+
List<Object> list = BencodeConverter.BENCODE.decode(bytes,Type.LIST);
24+
for(int i=0;i<list.size();i++){
25+
if(i%2==0){
26+
list.set(i,String.valueOf(list.get(i)));
27+
}
28+
}
29+
bytes = BencodeConverter.BENCODE.encode(list);
30+
if(bytes[0]=='l'){
31+
bytes[0] = 'd';
32+
}
33+
34+
// Normal B-decoding
35+
return BencodeConverter.BENCODE.decode(bytes,Type.DICTIONARY);
36+
}
37+
38+
public static <V> V walkAndConvertByteBufferToByteArrayOrString(Object value){
39+
if(value instanceof ByteBuffer){
40+
ByteBuffer bb = (ByteBuffer) value;
41+
byte[] ba = bb.array();
42+
boolean hasControlOrNonASCII = false;
43+
for(byte b : ba){
44+
int bv = b & 0xFF;
45+
if(bv<0x20 || bv>=0x7F){
46+
hasControlOrNonASCII = true;
47+
break;
48+
}
49+
}
50+
if(hasControlOrNonASCII){
51+
return (V) ba;
52+
}
53+
return (V) new String(ba);
54+
}
55+
if(value instanceof List){
56+
List<Object> l = (List<Object>) value;
57+
l.replaceAll(BencodeConverter::walkAndConvertByteBufferToByteArrayOrString);
58+
}
59+
if(value instanceof Map){
60+
Map<Object,Object> m = (Map<Object,Object>) value;
61+
m.replaceAll((k,v) -> BencodeConverter.walkAndConvertByteBufferToByteArrayOrString(v));
62+
}
63+
return (V) value;
64+
}
65+
66+
}

0 commit comments

Comments
 (0)