Gossip Peer Sampling Service

Introduction

A Gossip Peer Sampling Service (GPSS) is a powerful tool for enforcing a random, dynamic overlay network and returning peers within that topology for communication. Such a service has applications in a variety of large-scale distributed tasks, from decentralized data bases to distributed training of machine learning models.

This multi-part series will guide you through the process of building a GPSS using gRPC, C++, and Python. By the end of this series, you’ll not only understand the intricacies of the service but also be able to leverage the accompanying open-source library to simulate results and integrate it into your own projects.

If you discover something that could be improved or have a cool idea to share, contributions are highly encouraged!

If you want to skip this and check out the code, head to Gossip Sampling.

Why Build My Own?

While researching existing GPSS implementations, I found plenty of blog posts and projects. However, in a quick search, I couldn’t find a general-purpose GPSS library that fully met my needs. This project provided me with an opportunity to design a library that not only supports my specific requirements but also allows me to test some additional algorithms I’ve been meaning to explore.

Moreover, building this library gave me hands-on experience with gRPC, a technology I plan to use in a larger upcoming project. It’s a chance to lay the groundwork for future endeavors while also creating something reusable and potentially useful for others in the community.

The Theory

The implementation we’ll work toward is inspired by the paper “Gossip-based Peer Sampling”. I highly recommend reading the paper for a deeper understanding, but don’t worry—this series will break down the key concepts into manageable steps.

In our GPSS there are several core concepts:

  • Node Descriptor – Describes a node in the network.
  • View – A list of Node Descriptors that we know.
  • Peer Selector – Returns a peer in the overlay network to a subscriber.
  • Gossip Protocol: Maintains the overlay network which we are sampling peers from.
  • Peer Selection Service – Aggregates all the above. Handles entering and exiting the overlay, running the Gossip Protocol, providing access to the View, and hands out Peer Selectors to the services subscribers.

In a GPSS we periodically update our View using a Gossip Protocol. This protocol is designed in such a way to be resilient to churn in the network, provide fair load incursion on each node, and provide accurate and current peers in the network to subscribers of the GPSS.

With these goals in mind lets take a look at some sudo code for the Gossip Protocol and the hyper-parameters that dictate trade-offs between our goals.

Client Thread:
def client_thread():
    while(True):
        wait(wait_time) # We block our thread for a preset amount of time
        peer = view.select_peer() # This returns a live node from the view
        if push:
            send(peer, view.send_peers()) # We send peers from our view to the slected peer
        if pull:
            new_peers = receive(peer)
            view.receive_peers(new_peers) # We incorporate the recieved peers into our view
        view.increment_age() # Increment the age of all nodes in our view
  • We must push and then pull to make sure that we do not cross contaminate information from the selected peer.
Server Thread:
def server_thread():
    while(True):
        wait(new_peers = receive(peer)) # Wait until we recieve peers from a peer 
        if pull:
            send(peer, view.send_peers()) # We send peers from our view to the selected peer
        view.receive_peers(new_peers) # We incorporate the recieved peers into our view
        view.increment_age() # Increment the age of all nodes in our view
  • For the same reason as above, once we receive new peers, we first send our views peers before processing the new peers.
view.send_peers():
def send_peers():
    peers = []
    peers.append(NodeDescriptor(Self, 0)) # Add ourselves with age 0
    view.permute() # Shuffle the view
    view.move_oldest_to_back(healing) # move the oldest 'healing' nodes to the back, 
    num_send = (size / 2) - 1
    peers.append(view.head(num_send)) # Add (size / 2) - 1 first nodes in view
    return peers
  • The above method sends a random sample of the peers in our view ignoring the oldest healing elements. If there are not enough peers in the view than some of the oldest will also be sent.
view.receive_peers(peers):
def receive_peers(peers):
    view.append(peers) # Add new peers to back of view
    view.remove_duplicates() # remove duplicates, keeping youngest age
    # Prune view with hyper-parameters, but never below the size hyper-parameter
    view.remove_oldest(min(healing, view.size - size)) # Get rid of ones that may have left network
    view.remove_head(min(swap, view.size - size)) # Get rid of ones we just talked to
    view.remove_random(view.size - size) # Just get down to size
  • This method incorporates new peers into our current view, ensuring that the number of nodes in our view never decreases below size.
  • Because new peers are added to the back of the view, swap is correlated with the number of new nodes that will be kept. The higher swap the more old nodes will (located at the front) will be removed. This intern increases the likelihood that new peers will be kept in the remove_random() step.
Hyper-Parameters and Design Space:

Size:

  • This is the maximum number of Node Descriptors allowed in the view at the end of a recieve_peers() operation.
  • The average in-degree which is representative of average load on a node in the network is always equal to the size.

Healing:

  • All values of healing > size / 2 are equivalent to healing = size / 2
  • A higher healing value leads to better robustness against failures and churn in the network.
  • A high healing and low swap causes the load on a single node to change randomly

Swap:

  • All values of swap > (size / 2) - healing are equivalent to swap = (size / 2) - healing
  • swap is given its name because the receive_peers() is run on both client and server and is correlated with how many new nodes are kept on both. ie. how many nodes they will swap with each other.
  • A higher swap value leads to more equal load incursion across the network.
  • A high swap and low heal can cause the load on a single node to change slowly. This can be bad if a single node randomly incurs a large load.

Push:

  • If we will push our own data unprompted to another peer in the network
  • Push only protocols cause poor load distribution in scenarios where the network grows overtime. Causing very high load that does not dissipate at the entry server.

Pull:

  • If we will pull other peers data
  • Pull only protocol never insert there local address into the network and thus will only gain information through our own active thread.

PushPull:

  • PushPull protocols quickly distribute load throughout the network in scenarios where the network grows overtime.

Peer Selection:

  • Head
    • This does not work as it only talks to peers that we recently talked to, leading to little diversity in updates.
  • Tail
    • This Selects the peer with the highest age.
      • This helps us have more diverse information sharing as the node has not talked to this peer in a long time compared to other nodes in its’ view.
  • Random
    • Uniform Random With vs With Out Replacement:
      • For more diverse range of peers, some method of ensuring that once a peer is selected it is not chosen again can be added. (ie. selection without replacement)
  • It is worth noting that there is design space outside of these options. This become obvious in application specific scenarios.

The Service

Now with an understanding of a Gossip Protocol, the data structures it encapsulates, and the general design space around it, lets put it all together in a usable service.

We will construct our Sampling Service with five basic methods:

  1. enter() – A start up procedure that could be different from the normal Gossip Protocol that initializes the state of our view.
  2. exit() – The opposite of enter(), notifies the network that we are leaving. This is not strictly needed and sometimes actively not wanted, but useful if you want some sort of tracking of who is in the network.
  3. start() – Starts the client and server threads for the Gossip Protocol.
  4. stop() – Stops the client and server threads for the Gossip Protocol.
  5. subscribe() – Returns a Peer Selector object of the chosen type. This peer selector object can use a variety of selection methods talked about above. Additionally, it allows for multiple separate instances to sample peers from the Gossip Network simultaneously without be constrained to the same sampling method or sampling pool.

Additionally, depending on our bootstrap servers (entry servers) many different types of enter() and exit() may be wanted. So lets make a base class that encapsulates this logic called PeerSamplingService.

PeerSamplingService(bool push, bool pull, unsigned int wait_time, unsigned int timeout,
                    std::vector<std::string> entry_points, std::shared_ptr<View> view);

The view initialization is separated because it makes everything easier to code at this point. Some additional real-world parameters are added as well such as a communication timeout, and blocking time for the client thread after each interaction.

Finally, lets make a Manager class so that all lifetimes and memory allocations are kept together making the final interface easier to use.

Python:

class PSSManager:
    def __init__(self, address: str, push: bool, pull: bool,
                 wait_time: int, timeout: int,
                 entry_points: list[str],
                 pss_type: _gossip.PeerSamplingService,
                 view_type: _gossip.View, 
                 selector_type: _gossip.SelectorType, **view_kargs):
        self.view = view_type(address=address, **view_kargs)
        self.view.init_selector(type=selector_type)
        self.pss = pss_type(push, pull, wait_time, timeout, entry_points, self.view)

    def subscribe(self, type: _gossip.SelectorType, log: _gossip.TSLog=None) -> _gossip.PeerSelector:
        return self.view.create_subscriber(type, log)
    
    def enter(self) -> bool:
        return self.pss.enter()
    
    def exit(self) -> bool:
        return self.pss.exit()
    
    def start(self) -> None:
        self.pss.start()

    def stop(self) -> None:
        self.pss.stop()

    def entered(self) -> bool:
        return self.pss.entered()
    
    def push(self) -> bool:
        return self.pss.push()
    
    def pull(self) -> bool:
        return self.pss.pull()
    
    def wait_time(self) -> int:
        return self.pss.wait_time()
    
    def timeout(self) -> int:
        return self.pss.timeout()
    
    def view(self) -> _gossip.View:
        return self.view
    
    def service(self) -> _gossip.PeerSamplingService:
        return self.pss

C++:

template <typename PssType, typename ViewType, typename... ViewArgs>
class PSSManager {
    public:
        PSSManager(std::string address, bool push, bool pull, 
                   unsigned int wait_time, unsigned int timeout,
                   std::vector<std::string> entry_points,
                   SelectorType type, ViewArgs... view_args) 
                   : _view(std::make_shared<ViewType>(address, std::forward<ViewArgs>(view_args)...)),
                   _service(std::make_shared<PssType>(push, pull, wait_time, timeout, entry_points, _view)) {
            _view->init_selector(type);
        }

        std::shared_ptr<View::PeerSelector> subscribe(SelectorType type, std::shared_ptr<TSLog> log=nullptr) { return _view->create_subscriber(type, log); }

        bool enter() { return _service->enter(); }
        bool exit() { return _service->exit(); }

        void start() { _service->start(); }
        void stop() { _service->stop(); }

        friend std::ostream& operator<<(std::ostream& os, const PSSManager<PssType, ViewType, ViewArgs...>& obj) {
            os << *obj._service;
            return os;
        }

        bool entered() const { return _service->entered(); }
        bool push() const { return _service->push(); }
        bool pull() const { return _service->pull(); }
        unsigned int wait_time() const { return _service->wait_time(); }
        unsigned int timeout() const { return _service->timeout(); }
        std::shared_ptr<View> view() { return _view; } // Don't shoot yourself in the foot
        std::shared_ptr<PeerSamplingService> service() { return _service; }


    private:
        std::shared_ptr<View> _view;
        std::shared_ptr<PeerSamplingService> _service;
};

Here is a real code example from both the Python and C++ front-ends that put it all together:

Python:

# Create Peer Sampling Service Manager Object
service = gossip.PSSManager(address=address, push=push, pull=pull, 
                            wait_time=client_thread_sleep_time, timeout=request_timeout,
                            entry_points=entry_points,
                            pss_type=gossip.PeerSamplingService,
                            view_type=gossip.URView,
                            selector_type=gossip.SelectorType.TAIL,
                            size=view_size, healing=view_healing, swap=view_swap)

# Enter the Overlay Network
service.enter()

# Start the Server and Client Threads for the gossip protocol
service.start()

# Create subscriptions to be able to sample peers from the overlay network
subscriber = service.subscribe(gossip.SelectorType.TAIL)

# Do work with the new peer selection subscription
while(True):
    peer = subscriber.select_peer()
    if peer:
        print(f'Client subscriber ({address}): {peer}')
    else:
        print(f'Client subscriber ({address}): View is empty')

    time.sleep(client_thread_sleep_time/2)

C++:

using namespace gossip;

// Create Peer Sampling Service Manager Object
PSSManager<PeerSamplingService, URView, int, int, int> service(address, push, pull,                  client_thread_sleep_time, request_timeout, entry_points, SelectorType::TAIL, view_size, view_healing, view_swap);

// Enter the Overlay Network
service.enter();

// Start the Server and Client Threads for the gossip protocol
service.start();

// Create subscriptions to be able to sample peers from the overlay network
auto subscriber = service.subscribe(SelectorType::UNIFORM_RANDOM_NO_REPLACEMENT);

// Do work with the new peer selection subscription
while (true) {
  auto peer = subscriber->select_peer();
  if (peer) {
      std::cout << "Client subscriber (" << address << "): " << *peer << std::endl;
  }
  else {
      std::cout << "Client subscriber (" << address << "): View is empty" << std::endl;
  }
  std::this_thread::sleep_for(std::chrono::seconds(client_thread_sleep_time/2));
}

In the next posts we will go into the details of this project. Things like implementing an async gRPC service using their callback api, building the view logic in C++, porting and packaging the back-end C++ into a Python library, and adding simulation code in the Python package.

Leave a Reply