Gnutella today is a pretty complex protocol if you want to write a really full-featured program. But if you want a simple servent that can just connect and search, you can actually write it quite quickly.
Here I’ll document my own implementation to guide you through creating a Gnutella servent yourself.
I use Python 3 for implementing it, because it’s really nice to use, and this makes the servent future proof.
The first part for that is basic design.
A full featured Gnutella servent has to perform a number of different actions. We will only do the ones written in bold.
We want our simple servent to be easy to use for Python programmers, so its API should be similar to
contacts = gnutella.Contacts()
tagnet = gnutella.TagNet()
tagnet.connect()
search = tagnet.search("search string")
# search = {results: [], status: []} — or so
But let’s start with the basic internal design.
Contacts contains all the code for maintaining the other servents we connect to. It doesn’t do anything with these except checking whether we have enough of them and making sure that, when we add one too many, the best ones are kept.
If we have too few contacts (=0), it can try to get new ones, for example via UDPHostCatchers and GnutellaWebCaches.
The most important feature is the sorted list of contacts. To make that efficient, every contact gets a value with which we can let Python do the sorting.
contact_list.append((value, address))
…
contact_list.sort()
contact_list = contact_list[-max_len:]  # keep the best (highest-valued) ones, which sort last
The value gets added when the contact gets added to the list. When we need contacts for connecting, we pop them from the list.
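The sorted list could be sketched like this. I assume here that a higher value means a better contact, so the best one ends up last and can be popped. The names `ContactList`, `max_len` and `pop_best` as well as the example addresses are placeholders for illustration, not the final API.

```python
class ContactList:
    """Keep at most max_len contacts, sorted so the best one is last."""

    def __init__(self, max_len=3):
        self.max_len = max_len
        self.items = []  # list of (value, address) tuples

    def add(self, address, value):
        self.items.append((value, address))
        self.items.sort()
        # drop the lowest-valued entries when the list is too long
        self.items = self.items[-self.max_len:]

    def pop_best(self):
        """Remove and return the address of the best contact."""
        return self.items.pop()[1]


contacts = ContactList(max_len=2)
contacts.add(("127.0.0.1", 9846), value=5)
contacts.add(("127.0.0.1", 9847), value=1)
contacts.add(("127.0.0.1", 9848), value=9)
best = contacts.pop_best()
print(best)
```

Sorting tuples compares the value first, so the address only matters as a tie breaker.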
Contacts are saved on quit. If the servent dies, we don’t want to lose the active contacts, so we also need to save them from time to time and add them to the contacts on startup.
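Saving and restoring could look roughly like the following sketch. The JSON format, the file name and the function names `save_contacts` and `load_contacts` are my assumptions for this example; any simple serialization would do.

```python
import json
import os
import tempfile


def save_contacts(path, contact_list):
    """Write (value, (host, port)) tuples to a JSON file."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(contact_list, f)


def load_contacts(path):
    """Read the contacts back; return an empty list if nothing was saved yet."""
    if not os.path.exists(path):
        return []
    with open(path, encoding="utf-8") as f:
        # JSON turns tuples into lists, so we convert them back
        return [(value, (host, port)) for value, (host, port) in json.load(f)]


# a temporary directory, so the example doesn't leave files behind in your project
path = os.path.join(tempfile.mkdtemp(), "contacts.json")
save_contacts(path, [(0, ("127.0.0.1", 9846))])
restored = load_contacts(path)
print(restored)
```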
With this we have the design of the basic contact management, so we move on to the leaf connection to the TagNet.
As a leaf we don’t forward queries. As a first step we only connect to a few ultrapeers and keep the connections open, so we can send queries and read the results.
We need to be able to connect to an Ultrapeer via TCP and start reading and writing from/to the TCP stream.
Every connection should run in its own process (via multiprocessing).
That process needs to be able to keep the connection, parse the received data, hand out the parsed data and send data it is given. In the long run it will also have to check the quality of the connection to handle disconnecting.
We also need to manage all the connections and aggregate the results from them. When we have enough results for a specific search, we need to tell all connections to stop searching for that information. So searching (understanding the data returned by the connections) has to be done in a central place.
So we get
Also the methods for tagnet
and
TagNet can also extract query results from connection data (via the search ID) and get new contacts from returned data.
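To make the central bookkeeping a bit more concrete, here is a small sketch of how results from several connections could be collected by search ID. The class `SearchResults` and its `enough` threshold are hypothetical names I use only for this example.

```python
class SearchResults:
    """Collect query hits from several connections, keyed by search ID."""

    def __init__(self, enough=2):
        self.enough = enough   # hypothetical threshold of wanted results
        self.results = {}      # search ID -> list of results

    def start(self, search_id):
        self.results[search_id] = []

    def add(self, search_id, result):
        """Store a result; return True once the search has enough results."""
        if search_id not in self.results:
            return False  # not one of our searches, so we ignore it
        self.results[search_id].append(result)
        return len(self.results[search_id]) >= self.enough


searches = SearchResults(enough=2)
searches.start(b"id-1")
first = searches.add(b"id-1", "hit from connection A")
second = searches.add(b"id-1", "hit from connection B")
unknown = searches.add(b"id-2", "stray hit")
print(first, second, unknown)
```

When `add` returns True, the managing code would tell the connections to stop searching for that ID.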
And that’s it. We have the very basic design. Now we’ll start with actually implementing our servent.
The first part in implementing our servent is setting up our test environment. For that we locally start an existing Gnutella servent like Phex or gtk-gnutella. We use these two, because both offer good configurability and don’t shield their users from the gory details of Gnutella.
We have to set up the servent to run in Ultrapeer mode, so we can connect as a leaf.
With this setup getting the contacts for connecting to is very simple: We just return one single static source.
contacts.list = [(0, ("127.0.0.1", 9846))]
con = contacts.list.pop()
In the first and easiest version, the TagNet just gets one contact and starts a connection with it.
In the first version the TagConnection just connects to a contact and then streams out the compressed datastream until the connection gets closed (because we don’t answer pings).
The first part here is handshaking.
→ http://wiki.limewire.org/index.php?title=Handshaking
We might have to lie a bit to get a connection, since we don’t yet actually support all features which are required in current Gnutella 0.6. See the mandatory parts in http://wiki.limewire.org/index.php?title=Known_Gnutella_Connection_Headers
LimeWire requires the X-Query-Routing header, gtk-gnutella requires Accept-Encoding.
The following first part of the handshake works with Phex.
The \r just adds a \<cr> before the linebreak, so we get the \<cr>\<lf> which Gnutella requires.
from socket import socket, AF_INET, SOCK_STREAM
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(("127.0.0.1", 9846))
sock.send(b"""GNUTELLA CONNECT/0.6\r
User-Agent: PyGNet/0.0\r
X-Ultrapeer: False\r
\r
""")
reply = sock.recv(10000)
for line in reply.splitlines():
    print(line)
The reply we get contains the headers the other side uses. The following is the reply Phex gives:
b'GNUTELLA/0.6 200 OK'
b'User-Agent: Phex 3.4.2.116'
b'GGEP: 0.5'
b'Vendor-Message: 0.1'
b'X-Requeries: false'
b'Pong-Caching: 0.1'
b'Listen-IP: 109.192.87.31:9846'
b'Remote-IP: 127.0.0.1'
b'Accept-Encoding: deflate'
b'X-Ultrapeer: true'
b'X-Query-Routing: 0.1'
b'X-Ultrapeer-Query-Routing: 0.1'
b'X-Dynamic-Querying: 0.1'
b'X-Degree: 99'
b'X-Max-TTL: 4'
b''
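If you want to work with these headers instead of just printing them, a reply like this can be split into a status line and a dictionary. This is only a sketch: the function name `parse_handshake` is made up, and I lowercase the header names purely for easier lookup.

```python
def parse_handshake(reply):
    """Split a handshake reply into its status line and a header dict."""
    head = reply.split(b"\r\n\r\n", 1)[0]  # everything before the empty line
    lines = head.split(b"\r\n")
    status = lines[0].decode("ascii")
    headers = {}
    for line in lines[1:]:
        # split at the first colon only, values like IP:port contain more
        name, _, value = line.decode("ascii", "replace").partition(":")
        headers[name.strip().lower()] = value.strip()
    return status, headers


example = (b"GNUTELLA/0.6 200 OK\r\n"
           b"User-Agent: Phex 3.4.2.116\r\n"
           b"Listen-IP: 109.192.87.31:9846\r\n"
           b"X-Ultrapeer: true\r\n"
           b"\r\n")
status, headers = parse_handshake(example)
print(status)
print(headers["x-ultrapeer"])
```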
So now we only need to send a confirmation that the server’s headers are OK with us.
if reply.startswith(b"GNUTELLA/0.6 200 OK"):
    sock.send(b"""GNUTELLA/0.6 200 OK\r
\r
""")
With this, we are connected, and the other side will start sending us messages. Since we won’t answer quite yet, we’ll quickly be disconnected. But let’s get the data it sends us first.
print(sock.recv(10000))
It will be binary cruft, since Gnutella is a binary protocol. But it shows us that we’re connected now. The other side considers us an equal, right up until it realizes that we don’t know how to answer :)
An example of that cruft is
b'\xd1\xdf\x15\xdeM\xd0/\xe8\xff\x0b\x19\xedf\xbb4\x00\x00\x07\x00\x00\x00\x00\x00'
I encapsulated this stuff in the TagConnection class and its handshake method.
class TagConnection():
    """Connect to one single contact in the tag net and manage that connection. Send and receive structured data, and send queries."""
    def __init__(self, contact):
        from socket import socket, AF_INET, SOCK_STREAM
        self.socket = socket(AF_INET, SOCK_STREAM)
        self.socket.connect(contact)
        self.handshake()
    def handshake(self):
        """Connect to the contact (handshake)"""
        self.socket.send(b"""GNUTELLA CONNECT/0.6\r\nUser-Agent: PyGNet/0.0\r\nX-Ultrapeer: False\r\n\r\n""")
        reply = self.socket.recv(10000)
        for line in reply.splitlines():
            print(line)
        if reply.startswith(b"GNUTELLA/0.6 200 OK"):
            self.socket.send(b"""GNUTELLA/0.6 200 OK\r\n\r\n""")
            print(self.socket.recv(10000))
            from time import sleep
            sleep(10)
What we can do now is start a data exchange with another servent and get some information.
So the next logical step is parsing that data to actually understand what we are being told.
For this the standard message architecture is required reading: http://wiki.limewire.org/index.php?title=Standard_Message_Architecture
It’s an optimized binary format, and once our parser can speak it, we can start to actually work with Gnutella.
The first step is to just parse the data we get.
The first 23 bytes of the first data we get have to be a message header. So we just save the data we got as unprocessed and split the first 23 bytes into the different fields.
Let’s assume we got this data (grabbed from a live session with Phex):
unprocessed = b'\xbe,\xcd\xdeZ\xb0\xe1\xa9\xff&\x13|r,\xee\x00\x00\x07\x00\x00\x00\x00\x00'
Now we only need to split the header:
header = unprocessed[:23] # the first 23 bytes
data = {
    "msgid": header[:16],
    "payload_type": header[16],
    "ttl": header[17],
    "hops": header[18],
    # the payload length is a 4-byte little-endian integer (bytes 19 to 22)
    "payload_length": int.from_bytes(header[19:23], "little"),
    #"payload": unprocessed[23:]
}
print(data)
What we get is something like this:
{
'hops': 0,
'msgid': b'\xbe,\xcd\xdeZ\xb0\xe1\xa9\xff&\x13|r,\xee\x00',
'payload_length': 0,
'payload_type': 0,
'ttl': 7
}
What this tells us is: payload type 0 (0x00) is a ping, so there is no payload (except maybe a GGEP block). The payload length of 0 supports that assumption.
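The same split can also be written with the struct module from the standard library, which handles the little-endian length field for us. This is just an alternative sketch of the step above, using the bytes from the Phex session.

```python
import struct

# the 23 bytes from the Phex session above
unprocessed = (b'\xbe,\xcd\xdeZ\xb0\xe1\xa9\xff&\x13|r,\xee\x00'
               b'\x00\x07\x00\x00\x00\x00\x00')

# "<" = little endian without padding, 16s = 16 raw bytes,
# B = one unsigned byte, I = unsigned 4-byte integer
msgid, payload_type, ttl, hops, payload_length = struct.unpack(
    "<16sBBBI", unprocessed[:23])
print(payload_type, ttl, hops, payload_length)
```

The format string describes exactly 23 bytes, so `struct.unpack` raises an error if we hand it a header of the wrong size.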
The msgid is a random string with 0xff (=255) in byte 8 and 0x00 (=0) in byte 15.
>>> print (data["msgid"][8])
255
>>> print (data["msgid"][15])
0
>>> new_style_msgid = data["msgid"][8] == 255 and data["msgid"][15] == 0
It fits.
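Going the other way, we can create such an ID ourselves. The following is a sketch of how a message ID with these two marker bytes could be generated. It uses `os.urandom` for the random part, which is my choice for the example and not necessarily what the full code does.

```python
import os


def generate_msgid():
    """Return 16 random bytes marked as a new-style message ID."""
    raw = bytearray(os.urandom(16))
    raw[8] = 0xFF   # byte 8 is all ones
    raw[15] = 0x00  # byte 15 is zero
    return bytes(raw)


msgid = generate_msgid()
print(len(msgid), msgid[8], msgid[15])
```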
So now we are connected, and we can start asking questions.
But first off, let’s be nice to the other client and answer its ping with a pong.
Our problem now is that a Pong requires the address where others can reach us, but our servent doesn’t have a server part yet.
But instead of leaving this part completely unfinished, we’ll just fake it and take it up later.
TODO: leave the “hey we just connect to someone and ask” stage and actually turn our client into a real servent. We need to provide some way for others to reach us, and every TagConnection has to know that way, so it can construct messages.
So we need a message constructor which creates a pong message for us.
The structure of a pong message is described in the standard message architecture: http://wiki.limewire.org/index.php?title=Standard_Message_Architecture#Pong_.280x01.29
Sadly, in addition to the address the pong also needs the number of shared files (a leftover from the times when we actually trusted others in p2p networks). So we create a shared data object which is independent of any individual connection and call it shared_data.
For a pong it needs the attributes my_address, shared_files and shared_kilobytes, so we start with these.
We also generally need to turn numbers into a given number of bytes. For that we just use a simple function:
def number_to_bytes(number, amount_of_bytes):
    """Turn a number into a given amount of bytes (little endian)."""
    ints = []
    for byte_index in range(amount_of_bytes):
        divisor = 2**(8*byte_index)
        # shift the wanted byte to the lowest position, then cut off everything above it
        i = (number // divisor) % 256
        ints.append(i)
    return bytes(ints)
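As a side note, Python 3 integers can already do this conversion themselves via `int.to_bytes`, with `int.from_bytes` for the way back. A short sketch with our port number and a larger value:

```python
port = 9846
port_bytes = port.to_bytes(2, "little")        # lowest byte first
size_bytes = (70000).to_bytes(4, "little")     # a value that needs 3 bytes
print(list(port_bytes), list(size_bytes))
print(int.from_bytes(port_bytes, "little"))
```

Writing the function by hand mostly helps to see what “little endian” actually means.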
And now, we go on to constructing the pong.
It has a standard header (by now you’ll likely have thought that we should add a general header method. We use one, but describing it here would be redundant).
A Pong is simply our (still faked) port number and our IP (also still faked: 127.0.0.1, so no one tries to connect), followed by the number of shared files and the amount of shared data. So creating our pong is as simple as this:
pong_payload = number_to_bytes(address[1], 2)
# then the IP
IP = [int(i) for i in address[0].split(".")]
pong_payload += bytes(IP)
# and the shared files and data
pong_payload += number_to_bytes(shared_files, 4)
pong_payload += number_to_bytes(shared_kilobytes, 4)
# we use no GGEP extension, so we’re finished with building the pong payload.
# now we have the payload length, so we can get it and use it to create the header.
pong_payload_length = len(pong_payload)
# a pong carries the message ID of the ping it answers
pong = self.construct_header(msgid, "pong", ttl, hops, pong_payload_length)
pong += pong_payload
That’s it. Our pong is created. We just have to send it via
self.socket.send(pong)
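For comparison, here is a self-contained sketch of the pong payload built with the standard library modules struct and socket. The function name `build_pong_payload` is made up for this example, and the address is the same faked one as above.

```python
import socket
import struct


def build_pong_payload(address, shared_files, shared_kilobytes):
    """Return the 14-byte pong payload (without GGEP extensions)."""
    host, port = address
    return (struct.pack("<H", port)       # port, 2 bytes, little endian
            + socket.inet_aton(host)      # IPv4 address, 4 bytes
            + struct.pack("<II", shared_files, shared_kilobytes))


payload = build_pong_payload(("127.0.0.1", 9846), 0, 0)
print(len(payload), payload)
```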
Up until now this guide only showed you small parts of the bigger picture and talked about the design. The reasons are that I am still working on the structure of the Python program I implement while writing this guide, and that Python code is expressive enough that the gory implementation details aren’t really big enough to warrant writing much about them.
The skeleton of the code is a different beast, though. So here’s the basic layout of the code I wrote for this, stripping out all implementation details (not yet threaded):
payload_byte_to_string = {
    …
}
payload_int_to_string = {
    0: "ping",
    1: "pong",
    2: "bye",
    64: "push",
    128: "query",
    129: "query hit"
}
payload_string_to_byte = {
    …
}
#: Data needed in many places.
shared_data = {
    …
}
def number_to_bytes(number, amount_of_bytes):
    """Turn a number into a given amount of bytes."""
    …
def generate_msgid(seed = None):
    """Generate a random message ID according to http://wiki.limewire.org/index.php?title=Standard_Message_Architecture#Message_ID"""
    …
class Contacts():
    """Manage the contacts for a Gnutella servent."""
    def __init__(self):
        self.list = …
        …
    def add(self, contact, value):
        """Add a contact to the contact list and sorts it according to the value, best one _last_ (so we can pop it)."""
        …
class TagConnection():
    """Connect to one single contact in the tag net and manage that connection. Send and receive structured data, and send queries."""
    def __init__(self, contact):
        …
    def handshake(self):
        """Connect to the contact (handshake)"""
        …
    def process_new_data(self):
        """Process data to split it into messages"""
        …
    def process_message_header(self, header):
        """Turn the header bytes into a data-dict."""
        …
    def construct_header(self, msgid, payload_type, ttl, hops, payload_length):
        """construct a header bytestring from the header data."""
        …
    def send_pong(self, msgid, ttl, hops, address, shared_files, shared_kilobytes):
        """Send a pong to our contact."""
        …
def _test():
    from doctest import testmod
    testmod()
if __name__ == "__main__":
    _test()
    contacts = Contacts()
    contact = contacts.list.pop()[1]
    conn = TagConnection(contact)
Looks quite simple when you look at it this way, doesn’t it?
(the full code is at gnutella.py)
Sure, we’ll get all that threading and/or multiprocessing into the mix for managing multiple contacts, downloads and uploads and all the interaction between these, but at the core, the stuff we did until now gives you the basic steps to implement the pure technical networking layer of the Gnutella TagNetwork[1].
Now we get to the actually useful part: Searching the network, or more exactly (as our first step): Sending queries to a single local servent and understanding the replies.
A query is a rather simple construct, because it only contains special flags (for starters we fake these), a search criterion (delimited by a null byte) and then a few extensions (which we will ignore for now; see why it’s easy? :) ).
Just like with the pong construction, we’ll first build the payload, then the header.
The query string should be UTF-8 encoded.
So:
def query(self, query_string, ttl=7):
    """Send a query to the contact.
    @return: The ID of the query, so we can see which replies fit the query."""
    flags = number_to_bytes(256*256 -1, 2) # all on.
    enc_query = query_string.encode("UTF-8")
    payload = flags + enc_query + b"\x00"
    payload_length = len(payload)
    msgid = generate_msgid()
    # pass the ttl argument on instead of a hard-coded 7; a fresh query has made 0 hops
    header = self._construct_header(msgid, "query", ttl, 0, payload_length)
    print("# sending query:", query_string, "data:", header + payload)
    self.send(header + payload)
    return msgid
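To see the complete bytes of such a query without a running connection, here is a standalone sketch which builds header and payload in one go. `build_query` is a hypothetical helper and not part of the TagConnection class. It fakes the flags the same way (all bits set) and ignores the extensions.

```python
import os
import struct

QUERY = 0x80  # payload type 128, as in payload_int_to_string


def build_query(query_string, ttl=7, msgid=None):
    """Return (msgid, message bytes) for a query without extensions."""
    if msgid is None:
        raw = bytearray(os.urandom(16))
        raw[8], raw[15] = 0xFF, 0x00  # new-style message ID markers
        msgid = bytes(raw)
    # two flag bytes (all bits set), the search string,
    # and the terminating null byte
    payload = b"\xff\xff" + query_string.encode("utf-8") + b"\x00"
    # header: message ID, payload type, TTL, hops, payload length
    header = struct.pack("<16sBBBI", msgid, QUERY, ttl, 0, len(payload))
    return msgid, header + payload


msgid, message = build_query("free music", ttl=4)
print(len(message), message[16], message[17])
```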
Now we can watch the returned messages for query hits. Since Ultrapeers in Gnutella have quite some intelligence, as a leaf (our current status: an endpoint) we only get query hits for queries we sent ourselves.
A query hit has the same message ID as the query to which it is a reply.
To read all new incoming messages, we use a simple while loop (I left out automatic ping handling to make the loop easier to read):
# as long as there are at least 23 unprocessed bytes
# (=at least a header)
# we check if there are complete packets.
while self.unprocessed_data[22:]:
    header = self.unprocessed_data[:23]
    # parse the headers
    data = self._process_message_header(header)
    # if at least one packet has been transmitted completely,
    # we grab it from the stream
    # (a packet which ends exactly at the end of the data counts as complete).
    if len(self.unprocessed_data) >= 23 + data["payload_length"]:
        payload = self.unprocessed_data[23:23+data["payload_length"]]
        self.new_packets.append((data,payload))
        # remove the packet from the data.
        self.unprocessed_data = self.unprocessed_data[23+data["payload_length"]:]
    else:
        # we got no complete packet,
        # so we jump out of the loop.
        break
That’s the whole new packet grabber. We now have a lot of packets in self.new_packets, and some of them are query hits (packet[0]["payload_type"] == "query hit").
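If you want to try the packet grabbing without a socket, the same idea works as a plain function on a bytes buffer. This is a sketch under my own naming (`split_packets`, `HEADER_LENGTH`). It keeps the payload type as a number instead of translating it to a string.

```python
import struct

HEADER_LENGTH = 23


def split_packets(buffer):
    """Return (list of (header_dict, payload), remaining bytes)."""
    packets = []
    while len(buffer) >= HEADER_LENGTH:
        msgid, payload_type, ttl, hops, length = struct.unpack(
            "<16sBBBI", buffer[:HEADER_LENGTH])
        end = HEADER_LENGTH + length
        if len(buffer) < end:
            break  # the payload has not arrived completely yet
        header = {"msgid": msgid, "payload_type": payload_type,
                  "ttl": ttl, "hops": hops, "payload_length": length}
        packets.append((header, buffer[HEADER_LENGTH:end]))
        buffer = buffer[end:]
    return packets, buffer


ping = bytes(16) + bytes([0, 7, 0]) + (0).to_bytes(4, "little")
pong = bytes(16) + bytes([1, 7, 0]) + (14).to_bytes(4, "little") + bytes(14)
# two complete packets followed by the first 5 bytes of a third one
packets, rest = split_packets(ping + pong + ping[:5])
print(len(packets), len(rest))
```

The leftover bytes stay in the buffer until the next chunk of data arrives.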
So we can now search and read the query hits. And here we leave the TagConnection for a while, since it doesn’t really have to know what exactly it found and how to parse the content of packets. A connection only has to split out the packets and disassemble the metadata, so the TagNetwork can easily work with them.
And before you scratch your head and say “wouldn’t it be more efficient to just save the received messages and their headers as bytes and avoid part of the disassemble-reassemble step we’ll have to do when we forward messages?”, I’ll give you the answer directly: Sure it would be more efficient, but it would also be much less transparent, and the real benefit of that only shows once we implement ultrapeer capabilities which require forwarding messages.
If you write your own efficient servent, make sure you build it much more efficiently than this one. Until then, bear with my attempt to make Gnutella easy to understand. I spent years bedazzled by some of the details I write about here (and never dared to actually tackle them, because they looked so complicated), but they really aren’t as hard as they looked to me, as long as you don’t try to understand Gnutella through implementations tuned for maximum efficiency (try reading the host comparator in Phex for an example of heavily optimized code).
Now we get to the part which turns our simple single-threaded single-contact-asker into an efficient information gatherer by getting data from several contacts, understanding what they tell us and coordinating their actions.
Here we need to do some reading again, and then fix some fake flags we used for the search.
But first off we now get to the TagNetwork class. The TagNetwork has to manage threads (or processes) for the different connections.
And even before that, we turn each connection into a thread:
from threading import Thread
…
class TagConnection(Thread):
    …
    def __init__(self):
        …
        super().__init__()
    def run(self):
        …actions of the thread (loop)…
Now we need an explicit kill option, else we can’t get rid of the threads anymore (they don’t get the keyboard interrupt).
We get it via a watcher process (see Recipe 496735: Workaround for missed SIGINT in multithreaded programs (Python)).
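A different way to get an explicit kill option, which is not what the recipe does, is to let every thread check a `threading.Event` in its loop. The following sketch assumes a hypothetical `StoppableWorker` class. In the servent the loop body would read from the socket instead of counting rounds.

```python
import threading
import time


class StoppableWorker(threading.Thread):
    """Thread whose loop ends when stop() is called."""

    def __init__(self):
        super().__init__(daemon=True)  # daemon threads never block interpreter exit
        self._stop_requested = threading.Event()
        self.rounds = 0

    def run(self):
        while not self._stop_requested.is_set():
            self.rounds += 1                  # placeholder for the real work
            self._stop_requested.wait(0.01)   # sleep, but wake up early on stop()

    def stop(self):
        self._stop_requested.set()


worker = StoppableWorker()
worker.start()
time.sleep(0.05)
worker.stop()
worker.join(timeout=1)
print(worker.is_alive())
```

The main thread can then catch the keyboard interrupt itself and call `stop()` on every connection.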
And now we need more contacts.
We get them via the phex.hosts file (just perusing the local host catcher of Phex).
[1] The network I call TagNetwork is the basic networking layer which historically formed the whole networking layer of Gnutella. It is massively optimized for fuzzy string-based searching. What it can’t do well is searching for stuff identified by hashes. For these kinds of searches Gnutella recently added a DHT, so I call the original network “TagNetwork” because that’s where it is best: Searching for files which match certain tags.
-- 2012-12-14 21:36:45 --