autotest: add a test for dataflash log message rates

This commit is contained in:
Peter Barker 2021-12-01 12:40:05 +11:00 committed by Peter Barker
parent d72d0578a3
commit 8e3265d1f6
2 changed files with 99 additions and 2 deletions

View File

@ -7617,6 +7617,45 @@ class AutoTestCopter(AutoTest):
ret.extend(self.tests1e())
return ret
def ATTITUDE_FAST(self):
'''ensure that when ATTITDE_FAST is set we get many messages'''
self.context_push()
ex = None
try:
old = self.get_parameter('LOG_BITMASK')
new = int(old) | (1 << 0) # see defines.h
self.set_parameters({
"LOG_BITMASK": new,
"LOG_DISARMED": 1,
})
path = self.generate_rate_sample_log()
except Exception as e:
self.print_exception_caught(e)
ex = e
self.context_pop()
self.reboot_sitl()
if ex is not None:
raise ex
self.delay_sim_time(10) # NFI why this is required
self.check_dflog_message_rates(path, {
'ATT': 400,
})
def BaseLoggingRates(self):
'''ensure messages come out at specific rates'''
path = self.generate_rate_sample_log()
self.delay_sim_time(10) # NFI why this is required
self.check_dflog_message_rates(path, {
"ATT": 10,
"IMU": 25,
})
def FETtecESC_flight(self):
'''fly with servo outputs from FETtec ESC'''
self.start_subtest("FETtec ESC flight")
@ -8000,6 +8039,14 @@ class AutoTestCopter(AutoTest):
"Fly Vision Position",
self.fly_vision_position), # 24s
("ATTITUDE_FAST",
"Ensure ATTITUTDE_FAST logging works",
self.ATTITUDE_FAST),
("BaseLoggingRates",
"Ensure base logging rates as expected",
self.BaseLoggingRates),
("BodyFrameOdom",
"Fly Body Frame Odometry Code",
self.fly_body_frame_odom), # 24s

View File

@ -3868,6 +3868,53 @@ class AutoTest(ABC):
mavutil.mavlink.MAV_MISSION_TYPE_MISSION,
strict=strict)
def check_dflog_message_rates(self, log_filepath, message_rates):
reader = self.dfreader_for_path(log_filepath)
counts = {}
first = None
while True:
m = reader.recv_match()
if m is None:
break
if (m.fmt.instance_field is not None and
getattr(m, m.fmt.instance_field) != 0):
continue
t = m.get_type()
# print("t=%s" % str(t))
if t not in counts:
counts[t] = 0
counts[t] += 1
if hasattr(m, 'TimeUS'):
if first is None:
first = m
last = m
if first is None:
raise NotAchievedException("Did not get any messages")
delta_time_us = last.TimeUS - first.TimeUS
for (t, want_rate) in message_rates.items():
if t not in counts:
raise NotAchievedException("Wanted %s but got none" % t)
self.progress("Got (%u)" % counts[t])
got_rate = counts[t] / delta_time_us * 1000000
if abs(want_rate - got_rate) > 5:
raise NotAchievedException("Not getting %s data at wanted rate want=%f got=%f" %
(t, want_rate, got_rate))
def generate_rate_sample_log(self):
self.reboot_sitl()
self.wait_ready_to_arm()
self.delay_sim_time(20)
path = self.current_onboard_log_filepath()
self.progress("Rate sample log (%s)" % path)
self.reboot_sitl()
return path
def rc_defaults(self):
return {
1: 1500,
@ -9405,10 +9452,13 @@ switch value'''
latest = logs[-1]
return latest
def dfreader_for_current_onboard_log(self):
return DFReader.DFReader_binary(self.current_onboard_log_filepath(),
def dfreader_for_path(self, path):
return DFReader.DFReader_binary(path,
zero_time_base=True)
def dfreader_for_current_onboard_log(self):
return self.dfreader_for_path(self.current_onboard_log_filepath())
def current_onboard_log_contains_message(self, messagetype):
self.progress("Checking (%s) for (%s)" %
(self.current_onboard_log_filepath(), messagetype))