#!/usr/bin/env python3
example rover for JSON backend using pybullet
based on racecar example from pybullet
import os, inspect, sys
import socket
import struct
import json
import math
from pyrobolearn.simulators import Bullet
import pybullet_data
# use pymavlink for ArduPilot convention transformations
from pymavlink.rotmat import Vector3, Matrix3
from pymavlink.quaternion import Quaternion
from pyrobolearn.utils.transformation import get_rpy_from_quaternion
import pyrobolearn as prl
import time
import argparse
from math import degrees, radians
parser = argparse.ArgumentParser(description="pybullet robot")
parser.add_argument("--vehicle", required=True, choices=['quad', 'racecar', 'iris', 'opendog', 'all'], default='iris', help="vehicle type")
parser.add_argument("--fps", type=float, default=1000.0, help="physics frame rate")
parser.add_argument("--stadium", default=False, action='store_true', help="use stadium for world")
args = parser.parse_args()
RATE_HZ = args.fps
GRAVITY_MSS = 9.80665
# Create simulator
sim = Bullet()
# create world
from pyrobolearn.worlds import BasicWorld
world = BasicWorld(sim)
if args.stadium:
world.floor_id = world.sim.load_sdf(os.path.join(pybullet_data.getDataPath(), "stadium.sdf"), position=[0,0,0])
# setup keyboard interface
interface =
def control_quad(pwm):
'''control quadcopter'''
motor_dir = [ 1, 1, -1, -1 ]
motor_order = [ 0, 1, 2, 3 ]
motors = pwm[0:4]
motor_speed = [ 0 ] * 4
for m in range(len(motors)):
motor_speed[motor_order[m]] = constrain(motors[m] - 1000.0, 0, 1000) * motor_dir[motor_order[m]]
def control_racecar(pwm):
'''control racecar'''
steer_max = 45.0
throttle_max = 200.0
steering = constrain((pwm[0] - 1500.0)/500.0, -1, 1) * math.radians(steer_max) * -1
throttle = constrain((pwm[2] - 1500.0)/500.0, -1, 1) * throttle_max
last_angles = [0.0] * 12
def control_joints(pwm):
'''control a joint based bot'''
global last_angles
max_angle = radians(90)
joint_speed = radians(30)
pwm = pwm[0:len(robot.joints)]
angles = [ constrain((v-1500.0)/500.0, -1, 1) * max_angle for v in pwm ]
current = last_angles
max_change = joint_speed * TIME_STEP
for i in range(len(angles)):
angles[i] = constrain(angles[i], current[i]-max_change, current[i]+max_change)
robot.set_joint_positions(angles, robot.joints)
last_angles = angles
if args.vehicle == 'iris':
from pyrobolearn.robots import Quadcopter
robot = Quadcopter(sim, urdf="models/iris/iris.urdf")
control_pwm = control_quad
elif args.vehicle == 'racecar':
from pyrobolearn.robots import F10Racecar
robot = F10Racecar(sim)
control_pwm = control_racecar
elif args.vehicle == 'opendog':
from pyrobolearn.robots import OpenDog
robot = OpenDog(sim, urdf="models/opendog/opendog.urdf")
control_pwm = control_joints
elif args.vehicle == 'all':
from pyrobolearn.robots import OpenDog, Aibo, Ant, ANYmal, HyQ, HyQ2Max, Laikago, LittleDog, Minitaur, Pleurobot, Crab, Morphex, Rhex, SEAHexapod
bots = [Crab, Morphex, Rhex, SEAHexapod, Aibo, Ant, ANYmal, HyQ, HyQ2Max, Laikago, LittleDog, Minitaur, Pleurobot ]
for i in range(len(bots)):
r = bots[i](sim)
r.position = [0, i*2, 2]
control_pwm = control_joints
robot = OpenDog(sim, urdf="models/opendog/opendog.urdf")
raise Exception("Bad vehicle")
time_now = 0
last_velocity = None
def quaternion_to_AP(quaternion):
'''convert pybullet quaternion to ArduPilot quaternion'''
return Quaternion([quaternion[3], quaternion[0], -quaternion[1], -quaternion[2]])
def vector_to_AP(vec):
'''convert pybullet vector tuple to ArduPilot Vector3'''
return Vector3(vec[0], -vec[1], -vec[2])
def quaternion_from_AP(q):
'''convert ArduPilot quaternion to pybullet quaternion'''
return [q.q[1], -q.q[2], -q.q[3], q.q[0]]
def to_tuple(vec3):
'''convert a Vector3 to a tuple'''
return (vec3.x, vec3.y, vec3.z)
def init():
global time_now
time_now = 0
robot.position = [0,0,0]
robot.orientation = [0,0,0,1]
def constrain(v,min_v,max_v):
'''constrain a value'''
if v < min_v:
v = min_v
if v > max_v:
v = max_v
return v
#robot.position = [ 0, 0, 2]
#robot.orientation = quaternion_from_AP(Quaternion([math.radians(0), math.radians(0), math.radians(50)]))
def physics_step(pwm_in):
global time_now
time_now += TIME_STEP
# get the position orientation and velocity
quaternion = quaternion_to_AP(robot.orientation)
roll, pitch, yaw = quaternion.euler
velocity = vector_to_AP(robot.linear_velocity)
position = vector_to_AP(robot.position)
# get ArduPilot DCM matrix (rotation matrix)
dcm = quaternion.dcm
# get gyro vector in body frame
gyro = dcm.transposed() * vector_to_AP(robot.angular_velocity)
# calculate acceleration
global last_velocity
if last_velocity is None:
last_velocity = velocity
accel = (velocity - last_velocity) * (1.0 / TIME_STEP)
last_velocity = velocity
# add in gravity in earth frame
accel.z -= GRAVITY_MSS
# convert accel to body frame
accel = dcm.transposed() * accel
# convert to tuples
accel = to_tuple(accel)
gyro = to_tuple(gyro)
position = to_tuple(position)
velocity = to_tuple(velocity)
euler = (roll, pitch, yaw)
return time_now,gyro,accel,position,euler,velocity
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', 9002))
last_SITL_frame = -1
connected = False
frame_count = 0
frame_time = time.time()
print_frame_count = 1000
move_accel = 0.0
last_move = time.time()
def move_view(keys):
'''move camera target based on arrow keys'''
global last_move, move_accel
now = time.time()
if now - last_move < 0.1:
last_move = now
KEY_LEFT = 65295
KEY_RIGHT = 65296
KEY_UP = 65297
KEY_DOWN = 65298
global move_accel
angle = None
if KEY_LEFT in keys:
angle = 90
elif KEY_RIGHT in keys:
angle = -90
elif KEY_UP in keys:
angle = 180
elif KEY_DOWN in keys:
angle = 0
move_accel = 0
caminfo = list(sim.sim.getDebugVisualizerCamera())
target = caminfo[-1]
dist = caminfo[-2]
pitch = caminfo[-3]
yaw = caminfo[-4]
look = 90.0-yaw
step = 0.3 + move_accel
move_accel += 0.1
move_accel = min(move_accel, 5)
target = (target[0] + step*math.cos(radians(look+angle)),
target[1] - step*math.sin(radians(look+angle)),
sim.reset_debug_visualizer(dist, radians(yaw), radians(pitch), target)
while True:
py_time = time.time()
data,address = sock.recvfrom(100)
except Exception as ex:
parse_format = 'HHI16H'
magic = 18458
if len(data) != struct.calcsize(parse_format):
print("got packet of len %u, expected %u" % (len(data), struct.calcsize(parse_format)))
decoded = struct.unpack(parse_format,data)
if magic != decoded[0]:
print("Incorrect protocol magic %u should be %u" % (decoded[0], magic))
frame_rate_hz = decoded[1]
frame_count = decoded[2]
pwm = decoded[3:]
if frame_rate_hz != RATE_HZ:
print("New frame rate %u" % frame_rate_hz)
RATE_HZ = frame_rate_hz
# Check if the fame is in expected order
if frame_count < last_SITL_frame:
# Controller has reset, reset physics also
print('Controller reset')
elif frame_count == last_SITL_frame:
# duplicate frame, skip
print('Duplicate input frame')
elif frame_count != last_SITL_frame + 1 and connected:
print('Missed {0} input frames'.format(frame_count - last_SITL_frame - 1))
last_SITL_frame = frame_count
if not connected:
connected = True
print('Connected to {0}'.format(str(address)))
frame_count += 1
# physics time step
phys_time,gyro,accel,pos,euler,velo = physics_step(pwm)
# build JSON format
IMU_fmt = {
"gyro" : gyro,
"accel_body" : accel
JSON_fmt = {
"timestamp" : phys_time,
"imu" : IMU_fmt,
"position" : pos,
"attitude" : euler,
"velocity" : velo
JSON_string = "\n" + json.dumps(JSON_fmt,separators=(',', ':')) + "\n"
# Send to AP
sock.sendto(bytes(JSON_string,"ascii"), address)
# Track frame rate
if frame_count % print_frame_count == 0:
now = time.time()
total_time = now - frame_time
print("%.2f fps T=%.3f dt=%.3f" % (print_frame_count/total_time, phys_time, total_time))
frame_time = now