Examples

For all the examples here, it is assumed that the server is ran with

sudo python3.7 -m crh_botnet.server -p 80

and that it is addressable by choate-robotics-rpi-01.local. Please adjust as necessary.

Say Hi To Another Robot

This is probably the most straightforward example.

from crh_botnet import *

robot = Robot()
robot.network.set_server_address('choate-robotics-rpi-01.local')


def on_message(message: Message):
    print("Robot", message.sender, "said", message.content)
    
    if message.content == 'Hi':  # avoid infinite looping
        robot.network.send('Hi to you too', message.sender)

def setup():
    robot.network.broadcast('Hi')


robot.run(globals())

When this robot is connected, it will send a message to all other connected robots, who will send a simple “Hi to you too” back. Note that this program does not define a loop() function.

Simple LED Cycler

This is a simple implementation that two robots alternate turning on LEDs, 0.5 seconds each.

from crh_botnet import *
from gpiozero import LED

# This program works if and only if 2 robots are connected

robot = Robot()
robot.network.set_server_address('choate-robotics-rpi-01.local')

led = LED(17)


def setup():
    robot.network.broadcast("next")


async def on_message(message):
    led.on()
    await sleep(0.5)
    await robot.network.coro.broadcast('next')
    led.off()


robot.run(globals())

In order to achieve the delay, on_message() is defined to be async. Also, the coroutine version of broadcast is used, which will give a smoother blink on slow network (although pretty much no difference on fast network).

Complex LED Cycler

This implementation does the same as above, except that it supports multiple robots.

from crh_botnet import *
from gpiozero import LED

robot = Robot()
robot.network.set_server_address('choate-robotics-rpi-01.local')

led = LED(17)


def setup():
    robot.network.broadcast("Hi")


async def on_message(message):
    connected_robots = sorted(robot.network.connected_robots)
    
    if message.content == 'next':
        led.on()
        await sleep(0.5)
        connected_robots = sorted(robot.network.connected_robots)
        # re-fetch because connected robots may change over the 0.5 seconds
        
        loc = connected_robots.index(robot.id)
        if loc == len(connected_robots) - 1:
            recipient = connected_robots[0]
        else:
            recipient = connected_robots[loc + 1]
        await robot.network.coro.send('next', recipient)
        led.off()
    elif message.content == 'Hi':
        if len(connected_robots) == 2:
            robot.network.send('next', connected_robots[0])
            # It doesn't matter which robot gets the message, as long as one
            # of the robot starts the chain, so the first one on the list is used.


robot.run(globals())

The reason for the re-fetch at line 20 is that the robot will keep updating the list of connected robots during a sleep, and it may change during that 0.5 seconds if another robot joined the network.

Simple Remotely Controlled Robot

This example implements a simple control system with two Raspberry Pi’s. One RPi is mounted on a robot, connected to motors with an H-Bridge (the robot), while the other is connected to only a button (the controller).

The code on the controller is pretty simple

from crh_botnet import *
from gpiozero import Button
from functools import partial

robot = Robot()
robot.network.set_server_address("choate-robotics-rpi-01.local")


def setup():
    button = Button(15)
    button.when_pressed = partial(robot.network.send, 'on', 0)
    button.when_released = partial(robot.network.send, 'off', 0)


robot.run(globals())

There is nothing fancy in this code. The usage of partial() is to wrap it into a callable, as required by when_pressed and when_released attributes. It is the equivalent of

def send_on_wrapper():
    robot.network.send('on',0)

def send_off_wrapper():
    robot.network.send('off',0)

button.when_pressed = send_on_wrapper
button.when_released = send_off_wrapper

This code assumes the ID of the robot is 0. In the case that the ID of the robot is unknown, you should consider using broadcast().

The code on the robot side looks slightly more complicated, but the most of it is initializing the H-Bridge. The rest of them should be pretty self-explanatory.

from crh_botnet import *
import gpiozero

robot = Robot()
robot.network.set_server_address('choate-robotics-rpi-01.local')

motor_left = gpiozero.PWMOutputDevice(17)
motor_right = gpiozero.PWMOutputDevice(18)

H_left_1 = gpiozero.DigitalOutputDevice(22)
H_left_2 = gpiozero.DigitalOutputDevice(23)

H_right_1 = gpiozero.DigitalOutputDevice(26)
H_right_2 = gpiozero.DigitalOutputDevice(20)


def setup():
    H_left_1.on()
    H_right_1.on()


def on_message(message):
    if message == 'on':
        motor_left.value = 0.75
        motor_right.value = 0.75
    elif message == 'off':
        motor_left.value = 0
        motor_right.value = 0


robot.run(globals())

HBridge Drive

Added in version 0.2.0

A simple code for using the HBridgeDrive class. Offline is set to True here because this code does not involve the use of network and it’s not necessary to connect to the network. The reverse_left and reverse_right keyword arguments are purely dependent on your wiring only and you don’t have to set any of them.

from crh_botnet import *
from crh_botnet.drive import HBridgeDrive

robot = Robot()
driver = HBridgeDrive(20, 26, 18, 22, 23, 17, reverse_left=True)


def loop():
    driver.drive(1, 1)


robot.run(globals(), offline=True)

An alternative code (which makes it look a lot more elegant and slightly more confusing) is

from crh_botnet import *
from crh_botnet.drive import HBridgeDrive

robot = Robot()
robot.drive = HBridgeDrive(20, 26, 18, 22, 23, 17, reverse_left=True)


def loop():
    robot.drive(1, 1)


robot.run(globals(), offline=True)

It works because the __call__() function is just a convenience wrapper around the drive() function.