forked from cesar.alejandro/oscillation_ctrl
Upload files to 'src'
This commit is contained in:
parent
4975235080
commit
823bea7a52
|
@ -0,0 +1,425 @@
|
|||
#!/usr/bin/env python2.7
|
||||
|
||||
### Cesar Rodriguez July 2021
|
||||
### Based off of Klausen 2017 - Smooth trajectory generation based on desired waypoints
|
||||
|
||||
import rospy, tf
|
||||
import numpy as np
|
||||
import time
|
||||
import math
|
||||
|
||||
from scipy import signal
|
||||
from scipy.integrate import odeint
|
||||
|
||||
from offboard_ex.msg import tethered_status, RefSignal
|
||||
|
||||
from controller_msgs.msg import FlatTarget
|
||||
|
||||
from geometry_msgs.msg import Pose, Vector3, PoseStamped
|
||||
|
||||
from geometry_msgs.msg import TwistStamped
|
||||
from sensor_msgs.msg import Imu
|
||||
|
||||
from mavros_msgs.msg import State
|
||||
|
||||
from pymavlink import mavutil
|
||||
|
||||
class Main:
|
||||
|
||||
def __init__(self):
|
||||
|
||||
# rate(s)
|
||||
rate = 30 # rate for the publisher method, specified in Hz -- 10 Hz
|
||||
|
||||
# initialize variables
|
||||
self.tstart = rospy.get_time() # Keep track of the start time
|
||||
while self.tstart == 0.0: # Need to make sure get_rostime works
|
||||
self.tstart = rospy.get_time()
|
||||
|
||||
self.dt = 1.0/rate
|
||||
# self.dt = 0.5
|
||||
# self.tmax = 100
|
||||
self.tmax = self.dt
|
||||
self.n = self.tmax/self.dt + 1
|
||||
self.t = np.linspace(0, self.tmax, self.n) # Time array
|
||||
|
||||
# Message generation/ collection
|
||||
self.state = State()
|
||||
self.mode = ''
|
||||
self.vel_data = TwistStamped() # This is needed to get drone vel from gps
|
||||
self.imu_data = Imu() # Needed for to get drone acc from IMU
|
||||
self.ref_sig = FlatTarget() # Smooth Signal
|
||||
self.ref_sig.position.z = 5.0 # This does not need to be determined
|
||||
|
||||
self.phi = 0.0 # Payload angle of deflection from x-axis
|
||||
self.phidot = 0.0
|
||||
self.theta = 0.0 # Payload angle of deflection from y-axis
|
||||
self.thetadot = 0.0
|
||||
self.has_run = 0 # Bool to keep track of first run instance
|
||||
self.drone_id = ''
|
||||
self.pload_id = ''
|
||||
self.pl_pos = Pose()
|
||||
self.dr_pos = Pose()
|
||||
self.dr_vel = self.vel_data.twist.linear
|
||||
self.dr_acc = self.imu_data.linear_acceleration
|
||||
self.dr_acc_p = self.imu_data.linear_acceleration
|
||||
|
||||
# Get tether length
|
||||
self.param_exists = False
|
||||
while self.param_exists == False:
|
||||
if rospy.has_param('sim/tether_length'):
|
||||
self.tetherL = rospy.get_param('sim/tether_length') # Tether length
|
||||
self.param_exists = True
|
||||
|
||||
# Smooth path variables
|
||||
self.EPS_F = np.zeros(9) # Final Epsilon/ signal
|
||||
self.EPS_I = np.zeros(9) # Epsilon shapefilter
|
||||
|
||||
# Constants for smooth path generation
|
||||
self.w_tune = 3.0 # 3.13 works well? #########################################################################
|
||||
self.epsilon = 0.7 # Damping ratio
|
||||
self.wn = math.sqrt(9.81/self.tetherL)
|
||||
self.wd = self.wn*math.sqrt(1 - self.epsilon**2)
|
||||
self.k4 = 4*self.epsilon*self.w_tune
|
||||
self.k3 = ((2 + 4*self.epsilon**2)*self.w_tune**2)/self.k4
|
||||
self.k2 = (4*self.epsilon*self.w_tune**3)/(self.k4*self.k3)
|
||||
self.k1 = (self.w_tune**4)/(self.k2*self.k3*self.k4)
|
||||
|
||||
# For saturation:
|
||||
self.jmax = [3, 3, 1]
|
||||
self.amax = [1.5, 1.5, 1]
|
||||
self.vmax = [3, 3, 1]
|
||||
self.max = [0, 3, 1.5, 3] #[0, 3, 1.5, 3]
|
||||
# create the array: [vmax; amax; jmax]
|
||||
self.max_array = np.array([[self.vmax],[self.amax],[self.jmax]]).T
|
||||
|
||||
# Desired position array
|
||||
self.xd = np.array([[0],[0],[5.0]])
|
||||
self.des_waypoints = True # True = changing waypoints
|
||||
|
||||
# Initial conditions: [pos, vel, acc, jerk]
|
||||
self.x0 = [0, 0, 0, 0]
|
||||
self.y0 = [0, 0, 0, 0]
|
||||
self.z0 = [0, 0, 0, 0]
|
||||
|
||||
# --------------------------------------------------------------------------------#
|
||||
# FEEDBACK AND INPUT SHAPING
|
||||
# --------------------------------------------------------------------------------#
|
||||
|
||||
# Constants for feedback
|
||||
self.Gd = 0.325
|
||||
self.td = 2*self.Gd*math.pi/self.wd
|
||||
|
||||
# Constants for shape filter/ Input shaping
|
||||
self.t1 = 0
|
||||
self.t2 = math.pi/self.wd
|
||||
self.K = math.exp(-self.epsilon*math.pi/math.sqrt(1 - self.epsilon**2))
|
||||
self.A1 = 1/(1 + self.K)
|
||||
self.A2 = self.A1*self.K
|
||||
|
||||
# Need to determine how large of any array needs to be stored to use delayed functions
|
||||
self.SF_delay_x = np.zeros([4,int(round(self.t2/self.dt))]) # Shapefilter delay; 4 - p,v,a,j
|
||||
self.SF_delay_y = np.zeros([4,int(round(self.t2/self.dt))])
|
||||
|
||||
self.phi_fb = np.zeros(int(round(self.td/self.dt))) # Feedback delay
|
||||
self.phi_vel_fb = np.zeros(int(round(self.td/self.dt)))
|
||||
self.phi_acc_fb = np.zeros(int(round(self.td/self.dt)))
|
||||
|
||||
self.theta_fb = np.zeros(int(round(self.td/self.dt)))
|
||||
self.theta_vel_fb = np.zeros(int(round(self.td/self.dt)))
|
||||
self.theta_acc_fb = np.zeros(int(round(self.td/self.dt)))
|
||||
|
||||
# Need index to keep track of path array of ^^^^ length
|
||||
self.SF_idx = 0
|
||||
self.FB_idx = 0
|
||||
|
||||
# Constants for sigmoid
|
||||
self.at = 3 # acceleration theshold
|
||||
self.pc = 0.7 # From Klausen 2017
|
||||
self.sk = len(self.SF_delay_x[0]) # from Klausen 2017
|
||||
self.ak = np.zeros(len(self.SF_delay_x[0]))
|
||||
self.s_gain = 0.0 # Gain found from sigmoid
|
||||
|
||||
# --------------------------------------------------------------------------------#
|
||||
# SUBSCRIBERS #
|
||||
# --------------------------------------------------------------------------------#
|
||||
# Topic, msg type, and class callback method
|
||||
rospy.Subscriber('/status/twoBody_status', tethered_status, self.linkState_cb)
|
||||
rospy.Subscriber('/mavros/local_position/velocity_body', TwistStamped, self.droneVel_cb)
|
||||
rospy.Subscriber('/mavros/imu/data', Imu, self.droneAcc_cb)
|
||||
rospy.Subscriber('/mavros/state', State, self.state_cb)
|
||||
rospy.Subscriber('/reference/waypoints',PoseStamped, self.waypoints_cb)
|
||||
|
||||
# --------------------------------------------------------------------------------#
|
||||
# PUBLISHERS
|
||||
# --------------------------------------------------------------------------------#
|
||||
# Publish desired path to compute attitudes
|
||||
self.pub_path = rospy.Publisher('/reference/path',FlatTarget,queue_size = 10)
|
||||
# Needed for geometric controller to compute thrust
|
||||
self.pub_ref = rospy.Publisher('/reference/flatsetpoint',FlatTarget,queue_size = 10)
|
||||
|
||||
# timer(s), used to control method loop freq(s) as defined by the rate(s)
|
||||
self.pub_tim = rospy.Timer(rospy.Duration(1.0/rate), self.publisher)
|
||||
|
||||
# ------------------------------------ _init_ ends ------------------------------ #
|
||||
|
||||
# --------------------------------------------------------------------------------#
|
||||
# CALLBACK FUNCTIONS
|
||||
# --------------------------------------------------------------------------------#
|
||||
def state_cb(self, data):
|
||||
if self.state.armed != data.armed:
|
||||
rospy.loginfo("armed state changed from {0} to {1}".format(
|
||||
self.state.armed, data.armed))
|
||||
|
||||
if self.state.connected != data.connected:
|
||||
rospy.loginfo("connected changed from {0} to {1}".format(
|
||||
self.state.connected, data.connected))
|
||||
|
||||
if self.state.mode != data.mode:
|
||||
rospy.loginfo("mode changed from {0} to {1}".format(
|
||||
self.state.mode, data.mode))
|
||||
|
||||
if self.state.system_status != data.system_status:
|
||||
rospy.loginfo("system_status changed from {0} to {1}".format(
|
||||
mavutil.mavlink.enums['MAV_STATE'][
|
||||
self.state.system_status].name, mavutil.mavlink.enums[
|
||||
'MAV_STATE'][data.system_status].name))
|
||||
|
||||
self.state = data
|
||||
# mavros publishes a disconnected state message on init
|
||||
|
||||
# Callback to get link names, states, and pload deflection angles
|
||||
def linkState_cb(self,msg):
|
||||
try:
|
||||
self.drone_id = msg.drone_id
|
||||
self.pload_id = msg.pload_id
|
||||
self.dr_pos = msg.drone_pos
|
||||
self.pl_pos = msg.pload_pos
|
||||
self.phi = -msg.phi
|
||||
self.phidot = -msg.phidot
|
||||
self.theta = msg.theta
|
||||
self.thetadot = msg.thetadot
|
||||
# self.tetherL = msg.length
|
||||
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Callback for drone velocity
|
||||
def droneVel_cb(self,msg):
|
||||
try:
|
||||
self.dr_vel = msg.twist.linear
|
||||
|
||||
except ValueError or TypeError:
|
||||
pass
|
||||
|
||||
# Callback for drone accel from IMU data
|
||||
def droneAcc_cb(self,msg):
|
||||
try:
|
||||
self.dr_acc = msg.linear_acceleration
|
||||
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
def waypoints_cb(self,msg):
|
||||
try:
|
||||
if self.des_waypoints == True:
|
||||
self.xd[0] = msg.pose.position.x
|
||||
self.xd[1] = msg.pose.position.y
|
||||
#self.xd[2] = msg.pose.position.z
|
||||
|
||||
except ValueError:
|
||||
pass
|
||||
# --------------------------------------------------------------------------------#
|
||||
# ALGORITHM
|
||||
# --------------------------------------------------------------------------------#
|
||||
# --------------------------------------------------------------------------------#
|
||||
# TRAJECTORY GENERATION BASED ON WAYPONTS xd
|
||||
# --------------------------------------------------------------------------------#
|
||||
def statespace(self,y,t,xd):
|
||||
# Update initial conditions #
|
||||
|
||||
# Need the statespace array:
|
||||
pos,vel,acc,jer = y
|
||||
|
||||
# Derivative of statesape array:
|
||||
dydt = [vel, acc, jer,
|
||||
self.k4*(self.k3*(self.k2*(self.k1*(xd - pos) - vel) - acc) - jer)]
|
||||
return dydt
|
||||
|
||||
# Sigmoid
|
||||
def sigmoid(self):
|
||||
for i in range(self.sk):
|
||||
if math.sqrt(self.SF_delay_x[2,i]**2 + self.SF_delay_y[2,i]**2) < self.at:
|
||||
self.ak[i] = 1
|
||||
else:
|
||||
self.ak[i] = 0
|
||||
|
||||
pk = sum(self.ak)/self.sk
|
||||
self.s_gain = 1/(1 + math.exp(-self.sk*(pk-self.pc)))
|
||||
|
||||
def delay(self,j,cmd):
|
||||
'''
|
||||
Function to keep track of values in past. Needed for
|
||||
Shape filtering as well as feedback
|
||||
|
||||
j -> tells if we are doing pos, vel, or acc
|
||||
cmd -> determines SF or FB
|
||||
'''
|
||||
if cmd == 1: # Shape filter
|
||||
if self.SF_idx < len(self.SF_delay_x[0]):
|
||||
# First, fill up the delay array
|
||||
self.SF_delay_x[j,self.SF_idx] = self.x[1,j]
|
||||
self.SF_delay_y[j,self.SF_idx] = self.y[1,j]
|
||||
|
||||
else:
|
||||
# once array is filled, we start using the first value every time
|
||||
# x
|
||||
self.SF_delay_x[j,:] = np.roll(self.SF_delay_x[j,:],-1) # makes the first value last
|
||||
self.SF_delay_x[j,len(self.SF_delay_x[0])-1] = self.x[1,j] # updates last value to current x
|
||||
|
||||
# y
|
||||
self.SF_delay_y[j,:] = np.roll(self.SF_delay_y[j,:],-1)
|
||||
self.SF_delay_y[j,len(self.SF_delay_y[0])-1] = self.y[1,j]
|
||||
|
||||
elif cmd == 2: # Feedback
|
||||
if self.FB_idx < len(self.theta_fb):
|
||||
# First, fill up the delay array
|
||||
self.theta_fb[self.FB_idx] = self.theta
|
||||
self.theta_vel_fb[self.FB_idx] = self.thetadot
|
||||
self.theta_acc_fb[self.FB_idx] = self.thetadot - self.theta_vel_fb[self.FB_idx-1]
|
||||
|
||||
self.phi_fb[self.FB_idx] = self.phi
|
||||
self.phi_vel_fb[self.FB_idx] = self.phidot
|
||||
self.phi_acc_fb[self.FB_idx] = self.phidot - self.phi_vel_fb[self.FB_idx-1]
|
||||
|
||||
else:
|
||||
# once array is filled, need to shift values w/ latest value at end
|
||||
self.theta_fb[:] = np.roll(self.theta_fb[:],-1)
|
||||
self.theta_vel_fb[:] = np.roll(self.theta_vel_fb[:],-1)
|
||||
self.theta_acc_fb[:] = np.roll(self.theta_acc_fb[:],-1)
|
||||
|
||||
self.phi_fb[:] = np.roll(self.phi_fb[:],-1)
|
||||
self.phi_vel_fb[:] = np.roll(self.phi_vel_fb[:],-1)
|
||||
self.phi_acc_fb[:] = np.roll(self.phi_acc_fb[:],-1)
|
||||
|
||||
self.theta_fb[len(self.theta_fb)-1] = self.theta # change final value
|
||||
self.theta_vel_fb[len(self.theta_fb)-1] = self.thetadot
|
||||
self.theta_acc_fb[len(self.theta_fb)-1] = self.thetadot - self.theta_vel_fb[len(self.theta_fb)-1]
|
||||
|
||||
self.phi_fb[len(self.phi_fb)-1] = self.phi
|
||||
self.phi_vel_fb[len(self.theta_fb)-1] = self.phidot
|
||||
self.phi_acc_fb[len(self.theta_fb)-1] = self.phidot - self.phi_vel_fb[len(self.theta_fb)-1]
|
||||
|
||||
else:
|
||||
print('No delay')
|
||||
|
||||
# Convolution and input shape filter, and feedback
|
||||
def convo(self):
|
||||
|
||||
# needed for delay function
|
||||
# 1 = determine shapefilter array
|
||||
# 2 = determine theta/phi_fb
|
||||
shapeFil = 1
|
||||
feedBack = 2
|
||||
|
||||
# SOLVE ODE (get ref pos, vel, accel)
|
||||
self.x = odeint(self.statespace,self.x0,self.t,args=(self.xd[0],))
|
||||
self.y = odeint(self.statespace,self.y0,self.t,args=(self.xd[1],))
|
||||
self.z = odeint(self.statespace,self.z0,self.t,args=(self.xd[2],))
|
||||
|
||||
for i in range(1,len(self.y0)):
|
||||
self.x[:,i] = np.clip(self.x[:,i], a_min = -self.max[i], a_max = self.max[i])
|
||||
self.y[:,i] = np.clip(self.y[:,i], a_min = -self.max[i], a_max = self.max[i])
|
||||
self.z[:,i] = np.clip(self.z[:,i], a_min = -self.max[i], a_max = self.max[i])
|
||||
|
||||
for j in range(3): # 3 is due to pos, vel, acc. NOT due x, y, z
|
||||
|
||||
self.delay(j,shapeFil) # Determine the delay array
|
||||
|
||||
if self.SF_idx < len(self.SF_delay_x[0]):
|
||||
self.EPS_I[3*j] = self.x[1,j]
|
||||
self.EPS_I[3*j+1] = self.y[1,j]
|
||||
self.EPS_I[3*j+2] = self.z[1,j]
|
||||
else:
|
||||
self.EPS_I[3*j] = self.A1*self.x[1,j] + self.A2*self.SF_delay_x[j,0] # Determine convolution (x)
|
||||
self.EPS_I[3*j+1] = self.A1*self.y[1,j] + self.A2*self.SF_delay_y[j,0] # Determine convolution (y)
|
||||
self.EPS_I[3*j+2] = self.z[1,j] # No need to convolute z-dim
|
||||
|
||||
self.delay(1,feedBack) # Detemine feedback array
|
||||
|
||||
self.sigmoid() # Determine sigmoid gain
|
||||
EPS_D = self.fback() # Feedback Epsilon
|
||||
|
||||
for i in range(9):
|
||||
# Populate EPS_F buffer with desired change based on feedback
|
||||
self.EPS_F[i] = self.EPS_I[i] + EPS_D[i] #+ EPS_D[i]
|
||||
|
||||
# self.x0 = [self.dr_pos.position.x, self.dr_vel.x, self.dr_acc.x, self.x[1,3]]
|
||||
# self.y0 = [self.dr_pos.position.y, self.dr_vel.y, self.dr_acc.y, self.y[1,3]]
|
||||
# self.z0 = [self.dr_pos.position.z, self.dr_vel.z, self.dr_acc.z, self.z[1,3]]
|
||||
|
||||
self.x0 = [self.dr_pos.position.x, self.x[1,1], self.x[1,2], self.x[1,3]]
|
||||
self.y0 = [self.dr_pos.position.y, self.y[1,1], self.y[1,2], self.y[1,3]]
|
||||
self.z0 = [self.dr_pos.position.z, self.z[1,1], self.z[1,2], self.z[1,3]]
|
||||
|
||||
self.SF_idx += 1
|
||||
self.FB_idx += 1
|
||||
|
||||
|
||||
def fback(self):
|
||||
|
||||
xr = self.Gd*self.tetherL*math.sin(self.theta_fb[0])
|
||||
xdotr = self.Gd*self.tetherL*math.cos(self.theta_fb[0])*self.theta_vel_fb[0]
|
||||
xddotr = -self.Gd*self.tetherL*math.sin(self.theta_fb[0])*(self.theta_vel_fb[0]**2) + math.cos(self.theta_fb[0])*self.theta_acc_fb[0]
|
||||
|
||||
yr = -self.Gd*self.tetherL*math.sin(self.phi_fb[0])
|
||||
ydotr = -self.Gd*self.tetherL*math.cos(self.phi_fb[0])*self.phi_vel_fb[0]
|
||||
yddotr = self.Gd*self.tetherL*math.sin(self.phi_fb[0])*self.phi_vel_fb[0]**2 + math.cos(self.phi_fb[0])*self.phi_acc_fb[0]
|
||||
|
||||
EPS_D = np.array([xr,yr,0,xdotr,ydotr,0,xddotr,yddotr,0])
|
||||
|
||||
return EPS_D
|
||||
|
||||
def publisher(self,pub_tim):
|
||||
|
||||
# Determine final ref signal
|
||||
self.convo()
|
||||
|
||||
# Populate msg with epsilon_final
|
||||
self.ref_sig.position.x = self.EPS_F[0]
|
||||
self.ref_sig.position.y = self.EPS_F[1]
|
||||
|
||||
self.ref_sig.velocity.x = self.EPS_F[3]
|
||||
self.ref_sig.velocity.y = self.EPS_F[4]
|
||||
self.ref_sig.velocity.z = self.EPS_F[5]
|
||||
|
||||
self.ref_sig.acceleration.x = self.EPS_F[6]
|
||||
self.ref_sig.acceleration.y = self.EPS_F[7]
|
||||
self.ref_sig.acceleration.z = self.EPS_F[8]
|
||||
|
||||
# Need typemask = 2 to use correct attitude controller - Jaeyoung Lin
|
||||
self.ref_sig.type_mask = 2
|
||||
|
||||
# Publish command
|
||||
self.ref_sig.header.stamp = rospy.Time.now()
|
||||
self.pub_path.publish(self.ref_sig)
|
||||
self.pub_ref.publish(self.ref_sig)
|
||||
|
||||
# Feedback to user
|
||||
rospy.loginfo(' Var | x | y | z ')
|
||||
rospy.loginfo('Pos: %.2f %.2f %.2f',self.EPS_F[0],self.EPS_F[1],self.EPS_F[2])
|
||||
rospy.loginfo('Vel: %.2f %.2f %.2f',self.EPS_F[3],self.EPS_F[4],self.EPS_F[5])
|
||||
rospy.loginfo('Acc: %.2f %.2f %.2f',self.EPS_F[6],self.EPS_F[7],self.EPS_F[8])
|
||||
rospy.loginfo('_______________________')
|
||||
|
||||
if __name__=="__main__":
|
||||
|
||||
# Initiate ROS node
|
||||
rospy.init_node('desPath_node',anonymous=True)
|
||||
try:
|
||||
Main() # create class object
|
||||
rospy.spin() # loop until shutdown signal
|
||||
|
||||
except rospy.ROSInterruptException:
|
||||
pass
|
||||
|
|
@ -0,0 +1,89 @@
|
|||
#!/usr/bin/env python2.7
|
||||
|
||||
|
||||
### Cesar Rodriguez June 2021
|
||||
### Script to generate set points which form a square with 2m side lengths
|
||||
import rospy, tf
|
||||
import time
|
||||
from geometry_msgs.msg import PoseStamped
|
||||
from std_msgs.msg import Bool
|
||||
|
||||
class Main:
|
||||
# class method used to initialize variables once when the program starts
|
||||
def __init__(self):
|
||||
|
||||
# variable(s)
|
||||
self.Point = PoseStamped()
|
||||
# Init x, y, & z coordinates
|
||||
self.Point.pose.position.x = 0
|
||||
self.Point.pose.position.y = 0
|
||||
self.Point.pose.position.z = 3.5
|
||||
|
||||
self.xarray = [1,2,2,2,1,0,0]
|
||||
self.yarray = [0,0,1,2,2,2,1]
|
||||
self.i = 0
|
||||
self.j = 0
|
||||
self.buffer = 10
|
||||
self.bool = False
|
||||
|
||||
# subscriber(s), with specified topic, message type, and class callback method
|
||||
rospy.Subscriber('/status/path_follow',Bool, self.wait_cb)
|
||||
|
||||
# publisher(s), with specified topic, message type, and queue_size
|
||||
self.pub_square = rospy.Publisher('/reference/waypoints', PoseStamped, queue_size = 5)
|
||||
|
||||
# rate(s)
|
||||
pub_rate = 1 # rate for the publisher method, specified in Hz
|
||||
|
||||
# timer(s), used to control method loop frequencies as defined by the rate(s)
|
||||
self.pub_timer = rospy.Timer(rospy.Duration(1.0/pub_rate), self.pub)
|
||||
|
||||
def wait_cb(self,data):
|
||||
self.bool = data
|
||||
|
||||
|
||||
# Publish messages
|
||||
def pub(self,pub_timer):
|
||||
if self.bool == False:
|
||||
rospy.loginfo('Waiting...')
|
||||
else:
|
||||
# Check if i is too large. loop back to first point
|
||||
if self.i >= len(self.xarray):
|
||||
self.Point.pose.position.x = 0
|
||||
self.Point.pose.position.y = 0
|
||||
|
||||
else:
|
||||
self.Point.pose.position.x = self.xarray[self.i]
|
||||
self.Point.pose.position.y = self.yarray[self.i]
|
||||
|
||||
rospy.loginfo("Sending [Point x] %d [Point y] %d",
|
||||
self.Point.pose.position.x, self.Point.pose.position.y)
|
||||
|
||||
# Published desired msgs
|
||||
self.Point.header.stamp = rospy.Time.now()
|
||||
self.pub_square.publish(self.Point)
|
||||
self.j += 1
|
||||
self.i = self.j // self.buffer
|
||||
|
||||
# self.Point.header.stamp = rospy.Time.now()
|
||||
# self.Point.pose.position.x = self.xarray[self.i]
|
||||
# self.Point.pose.position.y = self.yarray[self.i]
|
||||
|
||||
# rospy.loginfo("Sending [Point x] %d [Point y] %d",
|
||||
# self.Point.pose.position.x, self.Point.pose.position.y)
|
||||
|
||||
# Published desired msgs
|
||||
# self.pub_square.publish(self.Point)
|
||||
# self.i += 1
|
||||
|
||||
if __name__=="__main__":
|
||||
|
||||
# Initiate ROS node
|
||||
rospy.init_node('square',anonymous=True)
|
||||
try:
|
||||
Main() # create class object
|
||||
rospy.spin() # loop until shutdown signal
|
||||
|
||||
except rospy.ROSInterruptException:
|
||||
pass
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
#!/usr/bin/env python2.7
|
||||
|
||||
### Cesar Rodriguez Dec 2021
|
||||
### Generate step input in x or y direction
|
||||
import rospy, tf
|
||||
import time
|
||||
from geometry_msgs.msg import PoseStamped
|
||||
from std_msgs.msg import Bool
|
||||
|
||||
class Main:
|
||||
# class method used to initialize variables once when the program starts
|
||||
def __init__(self):
|
||||
|
||||
# variable(s)
|
||||
self.Point = PoseStamped()
|
||||
# Init x, y, & z coordinates
|
||||
self.Point.pose.position.x = 1
|
||||
self.Point.pose.position.y = 0
|
||||
self.Point.pose.position.z = 4.0
|
||||
|
||||
self.bool = False
|
||||
|
||||
# subscriber(s)
|
||||
rospy.Subscriber('/status/path_follow',Bool, self.wait_cb)
|
||||
|
||||
# publisher(s), with specified topic, message type, and queue_size
|
||||
self.pub_step = rospy.Publisher('/reference/waypoints', PoseStamped, queue_size = 5)
|
||||
|
||||
# rate(s)
|
||||
pub_rate = 1 # rate for the publisher method, specified in Hz
|
||||
|
||||
# timer(s), used to control method loop frequencies as defined by the rate(s)
|
||||
self.pub_timer = rospy.Timer(rospy.Duration(1.0/pub_rate), self.pub)
|
||||
|
||||
# Callbacks
|
||||
def wait_cb(self,data):
|
||||
self.bool = data
|
||||
|
||||
# Publish messages
|
||||
def pub(self,pub_timer):
|
||||
if self.bool == False:
|
||||
rospy.loginfo('Waiting...')
|
||||
else:
|
||||
self.Point.header.stamp = rospy.Time.now()
|
||||
|
||||
# Published desired msgs
|
||||
self.pub_step.publish(self.Point)
|
||||
|
||||
rospy.loginfo("Sending [Point x] %d [Point y] %d",
|
||||
self.Point.pose.position.x, self.Point.pose.position.y)
|
||||
|
||||
if __name__=="__main__":
|
||||
|
||||
# Initiate ROS node
|
||||
rospy.init_node('step',anonymous=True)
|
||||
try:
|
||||
Main() # create class object
|
||||
rospy.spin() # loop until shutdown signal
|
||||
|
||||
except rospy.ROSInterruptException:
|
||||
pass
|
||||
|
Loading…
Reference in New Issue