px4-firmware/Tools/sdlog2/geotagging.py

238 lines
8.2 KiB
Python

#!/usr/bin/env python
#
# __geotagging__
# Tag the images recorded during a flight with geo location extracted from
# a PX4 log file.
#
# This file accepts *.jpg format images and reads position information
# from a *.px4log file
#
# Example Syntax:
# python geotagging.py --logfile=log001.px4log --input=images/
# --output=imagesWithTag/
#
# Author: Andreas Bircher, Wingtra, http://wingtra.com, in 2016
#
import glob
import os
import pyexiv2
from shutil import copyfile
from optparse import OptionParser
import csv
from datetime import datetime, timedelta
class TriggerList(object):
def __init__(self):
self.CAMT_seq = []
self.CAMT_timestamp = []
self.GPOS_Lat = []
self.GPOS_Lon = []
self.GPOS_Alt = []
self.GPS_GPSTime = []
class ImageList(object):
def __init__(self):
self.jpg = []
self.raw = []
def to_degree(value, loc):
if value < 0:
loc_value = loc[0]
elif value > 0:
loc_value = loc[1]
else:
loc_value = ""
absolute_value = abs(value)
deg = int(absolute_value)
t1 = (absolute_value - deg) * 60
min = int(t1)
sec = round((t1 - min) * 60, 5)
return (deg, min, sec, loc_value)
def SetGpsLocation(file_name, gps_datetime, lat, lng, alt):
"""
Adding GPS tag
"""
lat_deg = to_degree(lat, ["S", "N"])
lng_deg = to_degree(lng, ["W", "E"])
exiv_lat = (pyexiv2.Rational(lat_deg[0] * 60 + lat_deg[1], 60), pyexiv2.Rational(
lat_deg[2] * 100, 6000), pyexiv2.Rational(0, 1))
exiv_lng = (pyexiv2.Rational(lng_deg[0] * 60 + lng_deg[1], 60), pyexiv2.Rational(
lng_deg[2] * 100, 6000), pyexiv2.Rational(0, 1))
exiv_image = pyexiv2.ImageMetadata(file_name)
exiv_image.read()
date_tag = exiv_image['Exif.Image.DateTime']
date_max = max(date_tag.value, gps_datetime)
date_min = min(date_tag.value, gps_datetime)
time_diff = date_max - date_min
if (time_diff > timedelta(seconds=5)):
print(
"WARNING, camera trigger and photo time different by {}".format(time_diff))
print(" Photo tag time: {}".format(date_tag.value))
print(" Camera trigger time: {}".format(gps_datetime))
exiv_image["Exif.GPSInfo.GPSLatitude"] = exiv_lat
exiv_image["Exif.GPSInfo.GPSLatitudeRef"] = lat_deg[3]
exiv_image["Exif.GPSInfo.GPSLongitude"] = exiv_lng
exiv_image["Exif.GPSInfo.GPSLongitudeRef"] = lng_deg[3]
exiv_image["Exif.GPSInfo.GPSAltitude"] = pyexiv2.Rational(alt, 1)
exiv_image["Exif.GPSInfo.GPSAltitudeRef"] = '0'
exiv_image["Exif.Image.GPSTag"] = 654
exiv_image["Exif.GPSInfo.GPSMapDatum"] = "WGS-84"
exiv_image["Exif.GPSInfo.GPSVersionID"] = '2 0 0 0'
exiv_image.write(True)
def LoadPX4log(px4_log_file):
"""
load px4 log file and extract trigger locations
"""
os.system('python sdlog2_dump.py ' + px4_log_file +
' -t time -m TIME -m CAMT -m GPOS -m GPS -f log.csv')
f = open('log.csv', 'rb')
reader = csv.reader(f)
headers = reader.next()
line = {}
for h in headers:
line[h] = []
for row in reader:
for h, v in zip(headers, row):
line[h].append(v)
trigger_list = TriggerList()
for seq in range(0, len(line['CAMT_seq']) - 1):
if line['CAMT_seq'][seq] != line['CAMT_seq'][seq + 1]:
trigger_list.CAMT_seq.append(line['CAMT_seq'][seq + 1])
trigger_list.CAMT_timestamp.append(line['CAMT_timestamp'][seq + 1])
trigger_list.GPOS_Lat.append(line['GPOS_Lat'][seq + 1])
trigger_list.GPOS_Lon.append(line['GPOS_Lon'][seq + 1])
trigger_list.GPOS_Alt.append(line['GPOS_Alt'][seq + 1])
trigger_list.GPS_GPSTime.append(line['GPS_GPSTime'][seq + 1])
return trigger_list
def LoadImageList(input_folder):
"""
load the image list
"""
image_list = ImageList()
for jpg_image in glob.glob(input_folder + "/*.jpg"):
image_list.jpg.append(jpg_image)
for jpg_image in glob.glob(input_folder + "/*.JPG"):
image_list.jpg.append(jpg_image)
for raw_image in glob.glob(input_folder + "/*.RC"):
image_list.raw.append(raw_image)
if len(image_list.jpg) != len(image_list.raw) and len(image_list.jpg) * len(image_list.raw) != 0:
print("Unequal number of jpg and raw images")
if len(image_list.jpg) == 0 and len(image_list.raw) == 0:
print("No images found")
image_list.jpg = sorted(image_list.jpg)
image_list.raw = sorted(image_list.raw)
return image_list
def FilterTrigger(trigger_list, image_list):
"""
filter triggers to allow exact matching with recorded images
"""
# filter trigger list to match the number of pics
if len(image_list.jpg) != len(trigger_list.CAMT_seq):
print('WARNING! differ number of jpg images ({}) and camera triggers ({})'.format(
len(image_list.jpg), len(trigger_list.CAMT_seq)))
n_overlap = min(len(image_list.jpg), len(trigger_list.CAMT_seq))
del image_list.jpg[n_overlap:]
if len(image_list.raw) != len(trigger_list.CAMT_seq):
print('WARNING! differ number of raw images ({}) and camera triggers ({})'.format(
len(image_list.raw), len(trigger_list.CAMT_seq)))
n_overlap = min(len(image_list.raw), len(trigger_list.CAMT_seq))
del image_list.raw[n_overlap:]
return trigger_list
def TagImages(trigger_list, image_list, output_folder):
"""
load px4 log file and extract trigger locations
"""
for image in range(len(image_list.jpg)):
print("############################################################")
print('Photo {}: {}'.format(image, image_list.jpg[image]))
cam_time = int(trigger_list.GPS_GPSTime[image]) / 1000000
gps_datetime = datetime.fromtimestamp(cam_time)
base_path, filename = os.path.split(image_list.jpg[image])
copyfile(image_list.jpg[image], output_folder + "/" + filename)
SetGpsLocation(output_folder + "/" + filename, gps_datetime, float(
trigger_list.GPOS_Lat[image]), float(trigger_list.GPOS_Lon[image]), float(trigger_list.GPOS_Alt[image]))
for image in range(len(image_list.raw)):
print("############################################################")
print('Photo {}: {}'.format(image, image_list.raw[image]))
cam_time = int(trigger_list.GPS_GPSTime[image]) / 1000000
gps_datetime = datetime.fromtimestamp(cam_time)
base_path, filename = os.path.split(image_list.raw[image])
copyfile(image_list.raw[image], output_folder + "/" + filename)
SetGpsLocation(output_folder + "/" + filename, gps_datetime, float(
trigger_list.GPOS_Lat[image]), float(trigger_list.GPOS_Lon[image]), float(trigger_list.GPOS_Alt[image]))
def main():
"""
Main method
"""
parser = OptionParser()
parser.add_option("-l", "--logfile", dest="LogFile",
help="PX4 log file containing recorded positions",
metavar="string")
parser.add_option("-i", "--input", dest="InputFolder",
help="Input folder containing untagged images in alphabetical order",
type="string")
parser.add_option("-o", "--output", dest="OutputFolder",
help="Output folder to contain tagged images",
type="string")
(options, args) = parser.parse_args()
if not options.LogFile:
print "please type python " \
"geotagging.py --logfile=[filename] --intput=[folder] [--output=[folder]]"
elif not options.InputFolder:
print "please type python " \
"geotagging.py --logfile=[filename] --intput=[folder] [--output=[folder]]s"
else:
trigger_list = LoadPX4log(options.LogFile)
image_list = LoadImageList(options.InputFolder)
if not options.OutputFolder:
options.OutputFolder = "imagesWithTag"
if not os.path.exists(options.OutputFolder):
os.makedirs(options.OutputFolder)
trigger_list = FilterTrigger(trigger_list, image_list)
TagImages(trigger_list, image_list, options.OutputFolder)
if __name__ == "__main__":
main()