Commit dab4c618 authored by BenZr's avatar BenZr
Browse files

ddboat driver v2 initial commit

parents
*~
__pycache__
*.pyc
DDBOAT python3 drivers version 2
IMU (MAG, ACCEL, GYRO) : imu9_driver_v2.py
GPS (serial line, GPGLL message) : gps_driver_v2.py
Encoders on propeller rotation (serial line) : encoders_driver_v2.py
Arduino motors command (serial line) : arduino_driver_v2.py
a small test program to test all the drivers
$ python3 tst_ddboat_v2.py
\ No newline at end of file
import serial
import time
import signal
import sys
class ArduinoIO():
def __init__(self):
self.arduino = None
self.cmdl = 0
self.cmdr = 0
self.dirl = ' '
self.dirr = ' '
try:
self.arduino = serial.Serial('/dev/ttyACM0',115200,timeout=10.0)
#arduino = serial.Serial('/dev/ttyACM1',115200,timeout=1.0)
data = self.arduino.readline()
print ("init status",len(data),data)
except:
print ("Cannot initialize Arduino driver")
signal.signal(signal.SIGINT, self.signal_handler)
def signal_handler(self,sig, frame):
print('You pressed Ctrl+C! set motors to 0!')
#serial_arduino = serial.Serial('/dev/ttyACM0',115200,timeout=1.0)
data = self.send_arduino_cmd_motor(0,0)
sys.exit(0)
def send_arduino_cmd_motor(self,cmdl0,cmdr0):
self.cmdl = cmdl0
self.cmdr = cmdr0
self.dirl = ' '
self.dirr = ' '
# no backward motion
#if cmdl < 0:
# self.dirl = '-'
#if cmdr < 0:
# self.dirr = '-'
self.cmdl = abs(self.cmdl)
self.cmdr = abs(self.cmdr)
strcmd = "M%c%3.3d%c%3.3d;"%(self.dirl,self.cmdl,self.dirr,self.cmdr)
#print (strcmd)
self.arduino.write(strcmd.encode())
def get_arduino_cmd_motor(self,timeout=1.0): # set by RC command ?
strcmd = "C;"
self.arduino.write(strcmd.encode())
t0 = time.time()
data="\n"
while True:
data = self.arduino.readline()
if data:
#print (data)
break
if (time.time()-t0) > timeout:
print ("get_arduino_cmd_motor timeout",timeout)
break
return data[0:-1]
# last motor command sent
def get_cmd_motor(self):
return self.cmdl,self.cmdr
def get_arduino_rc_chan(self,timeout=1.0):
strcmd = "R;"
#print (strcmd.encode())
self.arduino.write(strcmd.encode())
t0 = time.time()
data = "\n"
while True:
data = self.arduino.readline()
if data:
#print (data)
break
if (time.time()-t0) > timeout:
print ("get_arduino_rc_chan timeout",timeout)
break
return data
def get_arduino_status(self,timeout=1.0):
strcmd = "S;"
#print (strcmd.encode())
self.arduino.write(strcmd.encode())
t0 = time.time()
while True:
data = self.arduino.readline()
if data:
#print (data.decode())
break
if (time.time()-t0) > timeout:
print ("get_arduino_status timeout",timeout)
break
return data
if __name__ == "__main__":
ard = ArduinoIO()
ard.send_arduino_cmd_motor (20,20)
print ("Arduino status is",ard.get_arduino_status())
print ("cmd motors l,r are",ard.get_cmd_motor())
print ("RC channel is",ard.get_arduino_rc_chan())
print ("Hit Ctrl+C to get out!")
while True:
print ("arduino motors rc",ard.get_arduino_cmd_motor())
print ("RC channel is",ard.get_arduino_rc_chan())
time.sleep(1)
import serial
import os
import time
import struct
# encoder frame
# 0 : sync 0xFF
# 1 : sync 0x0D
# 2-5 : timer MSByte first
# 6 : direction 1 (left)
# 7 : direction 2 (right)
# 8-9 : encoder 1 (left)
# 10-11 : encoder 2 (right)
# 12-13 : Hall voltage 1 (left)
# 14-15 : Hall voltage 2 (right)
# 16 : sync 0xAA
# data = [timer,dirLeft,dirRight,encLeft,encRight,voltLeft,voltRight]
class EncoderIO():
def __init__(self):
self.baud_rate = 115200
self.init_line()
def init_line(self,timeout=1.0):
self.ser = serial.Serial('/dev/ttyUSB0',self.baud_rate,timeout=timeout)
# in principle , no need to use this function , baudrate is normally 115200 with
# the current software version
def set_baudrate(self,baudrate=115200):
self.baud_rate = baudrate
st = os.system ("stty -F /dev/ttyUSB0 %d"%(self.baud_rate))
print (st)
st = os.system ("stty -F /dev/ttyUSB0")
print (st)
def close_line(self):
self.ser.close()
# find the sync chars 0xFF and 0x0d at the beginning of the data
def get_sync(self):
while True:
c1 = self.ser.read(1)
if ord(c1) == 0xff:
c2 = self.ser.read(1)
if ord(c2) == 0x0d:
v = self.ser.read(15)
break
# read next packet, assume sync has been done before
# detect if sync is lost (sync false at return)
def read_packet(self,debug=False):
sync = True # assume sync has been done before
data = []
v=self.ser.read(17)
#print (type(v))
#st=""
#for i in range(len(v)):
# st += "%2.2x"%(ord(v[i]))
#print st
c1 = v[0]
c2 = v[1]
if (c1 != 0xff) or (c2 != 0x0d):
if debug:
print ("sync lost, exit")
sync = False
else:
timer = (v[2] << 32)
timer += (v[3] << 16)
timer += (v[4] << 8)
timer += v[5]
sensLeft = v[7]
sensRight= v[6]
posLeft = v[10] << 8
posLeft += v[11]
posRight = v[8] << 8
posRight += v[9]
voltLeft = v[14] << 8
voltLeft += v[15]
voltRight = v[12] << 8
voltRight += v[13]
c3 = v[16]
stc3 = "%2.2X"%(c3)
data.append(timer)
data.append(sensLeft)
data.append(sensRight)
data.append(posLeft)
data.append(posRight)
data.append(voltLeft)
data.append(voltRight)
if debug:
print (timer,sensLeft,sensRight,posLeft,posRight,voltLeft,voltRight,stc3)
return sync,data
# do everything (open, sync, read, close) once (mainly for debug purpose)
def read_single_packet(self,debug=True):
self.init_line()
self.get_sync(ser)
sync,data = self.read_packet(ser,debug=debug)
self.close_line(ser)
timeAcq = data[0]
sensLeft = data[1]
sensRight = data[2]
posLeft = data[3]
posRight = data[4]
return sync, timeAcq, sensLeft, sensRight, posLeft, posRight
if __name__ == "__main__":
encoddrv = EncoderIO()
while True:
encoddrv.get_sync()
while True:
sync,data_encoders = encoddrv.read_packet(debug=True)
if not sync:
break
import serial
import os
import time
# the GPS sensor gives informations using the NMEA standard
# https://www.nmea.org/content/STANDARDS/NMEA_0183_Standard
# https://en.wikipedia.org/wiki/NMEA_0183
class GpsIO:
def __init__(self):
# open serial line connected to the GPS sensor
self.init_line()
#time.sleep(1.0)
#print(ser)
def init_line(self,timeout=1.0):
self.ser = serial.Serial('/dev/ttyS0',timeout=timeout)
def close(self):
self.ser.close()
def read_next_message(self):
v=self.ser.readline().decode("utf-8")
#print (v)
return v
# read the position in the GPGLL message
# by default one GPGLL message is expected every 20 messages
def read_gll(self,n_try_max=20):
val=[0.,'N',0.,'W',0.]
for i in range(n_try_max):
v=self.ser.readline().decode("utf-8")
if str(v[0:6]) == "$GPGLL":
vv = v.split(",")
if len(vv[1]) > 0:
val[0] = float(vv[1])
if len(vv[2]) > 0:
val[1] = vv[2]
if len(vv[3]) > 0:
val[2] = float(vv[3])
if len(vv[4]) > 0:
val[3] = vv[4]
if len(vv[5]) > 0:
val[4] = float(vv[5])
break # GPGLL found ! exit !
return val
if __name__ == "__main__":
gps = GpsIO()
# display the 20 first messages
#for i in range(20):
# print (gps.read_next_message())
# display the 20 positions (GPGLL) messages
for i in range(20):
print (gps.read_gll())
gps.close()
import smbus
import time
class i2c():
def __init__(self,addr,bus_nb=2):
self.__bus_nb = bus_nb
self.__addr = addr
self.__bus = smbus.SMBus(self.__bus_nb)
print ("i2c device addr = 0x%x on bus %d"%(addr,bus_nb))
def read(self,offs,nbytes):
v = self.__bus.read_i2c_block_data(self.__addr,offs,nbytes)
time.sleep(0.001)
return v
def read_byte(self,offs):
v = self.__bus.read_byte_data(self.__addr,offs)
time.sleep(0.001)
return v
def write(self,cmd,vData):
self.__bus.write_i2c_block_data(self.__addr,cmd,vData)
time.sleep(0.001)
import struct
import time
import sys
import math
import i2creal as i2c # currently only real I2C on ddboats (no simulated I2C)
# LIS3DML 0x1e (mag sensor)
# LSM6 0x6b (accelero - gyro)
class Imu9IO():
def __init__(self):
self.__bus_nb = 1 # 1 on DDBoat, 2 on DartV2
self.__addr_mg = 0x1e # mag sensor
self.__addr_ag = 0x6b # accelero - gyro
self.__dev_i2c_mg=i2c.i2c(self.__addr_mg,self.__bus_nb)
self.__dev_i2c_ag=i2c.i2c(self.__addr_ag,self.__bus_nb)
# place your imu functions here
self.magx_min = 0.0
self.magx_max = 0.0
self.magy_min = 0.0
self.magy_max = 0.0
self.magx_offs = 0.0
self.magy_offs = 0.0
self.magx_scale = 0.0
self.magy_scale = 0.0
self.__mag_raw = [0.0,0.0,0.0]
self.__accel_raw = [0.0,0.0,0.0]
self.__gyro_raw = [0.0,0.0,0.0]
#
# configure mag sensor
# CTRL_REG1 (0x20) = 0b01110000
# OM = 11 (ultra-high-performance mode for X and Y);
# DO = 100 (10 Hz ODR)
self.__dev_i2c_mg.write(0x20,[0x70])
# CTRL_REG2 (0x21) = 0b00000000
# FS = 00 (+/- 4 gauss full scale)
self.__dev_i2c_mg.write(0x21,[0x00])
# CTRL_REG3 (0x22) = 0b00000000
# MD = 00 (continuous-conversion mode)
self.__dev_i2c_mg.write(0x22,[0x00])
# CTRL_REG4 (0x23) = 0b00001100
# OMZ = 11 (ultra-high-performance mode for Z)
self.__dev_i2c_mg.write(0x23,[0x0C])
#
# configure accelero + gyro
# LSM6DS33 gyro
# CTRL2_G (0x11) = 0b10001100
# ODR = 1000 (1.66 kHz (high performance))
# FS_G = 11 (2000 dps)
self.__dev_i2c_ag.write(0x11,[0x8C])
#
# CTRL7_G (0x16) = 0b00000000
# defaults
self.__dev_i2c_ag.write(0x16,[0x00])
#
# LSM6DS33 accelerometer
# CTRL1_XL (0x10) = 0b10001100
# ODR = 1000 (1.66 kHz (high performance))
# FS_XL = 11 (8 g full scale)
# BW_XL = 00 (400 Hz filter bandwidth)
#self.__dev_i2c_ag.write(0x10,[0x8C])
# more filtering BW_XL = 11 (50 Hz filter bandwidth)
self.__dev_i2c_ag.write(0x13,[0x00])
self.__dev_i2c_ag.write(0x10,[0x8C])
#
# common
# CTRL3_C (0x12) 0b00000100
# IF_INC = 1 (automatically increment address register)
self.__dev_i2c_ag.write(0x12,[0x04])
def setup_accel_filter (self,mode):
if mode == 0:
self.__dev_i2c_ag.write(0x17,[0x00])
self.__dev_i2c_ag.write(0x13,[0x00])
self.__dev_i2c_ag.write(0x10,[0x8C])
elif mode == 1:
self.__dev_i2c_ag.write(0x17,[0x00])
self.__dev_i2c_ag.write(0x13,[0x80])
self.__dev_i2c_ag.write(0x10,[0x8F])
elif mode == 2:
self.__dev_i2c_ag.write(0x17,[0x80])
self.__dev_i2c_ag.write(0x13,[0x80])
self.__dev_i2c_ag.write(0x10,[0x8F])
def read_mag_raw(self):
v = self.__dev_i2c_mg.read(0x28,6)
ix = self.cmpl2(v[0],v[1])
iy = self.cmpl2(v[2],v[3])
iz = self.cmpl2(v[4],v[5])
self.__mag_raw = [ix,iy,iz]
return self.__mag_raw
def read_gyro_raw(self):
# OUTX_L_G (0x22)
v = self.__dev_i2c_ag.read(0x22,6)
ix = self.cmpl2(v[0],v[1])
iy = self.cmpl2(v[2],v[3])
iz = self.cmpl2(v[4],v[5])
self.__gyro_raw = [ix,iy,iz]
return self.__gyro_raw
def read_accel_raw(self):
# OUTX_L_XL (0x28)
v = self.__dev_i2c_ag.read(0x28,6)
ix = self.cmpl2(v[0],v[1])
iy = self.cmpl2(v[2],v[3])
iz = self.cmpl2(v[4],v[5])
self.__accel_raw = [ix,iy,iz]
return self.__accel_raw
def cmpl2(self,lsByte,msByte):
i = lsByte + (msByte << 8)
if i >= (1<<15):
i = i - (1<<16)
return i
def heading_raw(self,magx,magy):
heading = math.atan2(magy,magx)
return heading
def heading_raw_deg(self,magx,magy):
heading = self.heading_raw(magx,magy)*180.0/math.pi
if heading < 0.0:
heading += 360.0
return heading
def fast_heading_calibration (self, magx_min, magx_max, magy_min, magy_max):
self.magx_min = magx_min
self.magx_max = magx_max
self.magy_min = magy_min
self.magy_max = magy_max
self.magx_offs = (magx_min+magx_max)/2.0
self.magy_offs = (magy_min+magy_max)/2.0
self.magx_scale = 2./(magx_max-magx_min)
self.magy_scale = 2./(magy_max-magy_min)
def heading(self,magx,magy):
magx_cal = (magx - self.magx_offs) * self.magx_scale
magy_cal = (magy - self.magy_offs) * self.magy_scale
#print (self.magx_min,magx,self.magx_max,self.magy_min,magy,self.magy_max,magx_cal,magy_cal)
magx_cal = 1.0 if magx_cal>1.0 else magx_cal
magx_cal = -1.0 if magx_cal<-1.0 else magx_cal
magy_cal = 1.0 if magy_cal>1.0 else magy_cal
magy_cal = -1.0 if magy_cal<-1.0 else magy_cal
heading = math.atan2(magy_cal,magx_cal)
return heading
def heading_deg(self,magx,magy):
heading = self.heading(magx,magy)*180.0/math.pi
if heading < 0.0:
heading += 360.0
return heading
if __name__ == "__main__":
imu = Imu9IO()
for i in range(200):
print (imu.read_mag_raw(),imu.read_accel_raw(),imu.read_gyro_raw())
time.sleep(0.01)
import time
import sys
import imu9_driver_v2 as imudrv
import gps_driver_v2 as gpsdrv
import arduino_driver_v2 as arddrv
import encoders_driver_v2 as encoddrv
imu = imudrv.Imu9IO()
gps = gpsdrv.GpsIO()
ard = arddrv. ArduinoIO()
encod = encoddrv.EncoderIO()
# init encoder stream (get synchro characters)
encod.get_sync()
# change motor cmd with time
vtcmd = [ 5, 10, 15, 20, 60] # time
vvcmd = [ 40, 80, 120, 0, 0] # cmd
icmd = 0
ncmd = len(vtcmd)
tmax = 25.0
t0 = time.time()
while True:
print ("---------------------------------------------------")
t = time.time()
if (t-t0) > tmax:
# end, tmax reached
break
#test Arduino (motors)
if icmd < ncmd:
if (t-t0) > vtcmd[icmd]:
cmdl = cmdr = vvcmd[icmd]
ard.send_arduino_cmd_motor(cmdl,cmdr)
icmd += 1
# test GPS
gps_data_string = gps.read_gll()
print ("GPS:",gps_data_string)
# test IMU9
magx,magy,magz = imu.read_mag_raw()
aclx,acly,aclz = imu.read_accel_raw()
gyrx,gyry,gyrz = imu.read_gyro_raw()
print ("Mag %5d %5d %5d"%(magx,magy,magz)," || Accel %5d %5d %5d"%(aclx,acly,aclz)," || Gyro %5d %5d %5d"%(gyrx,gyry,gyrz))
# test encoders
sync,data_encoders = encod.read_packet(debug=False)
if not sync: # if sync lost try to get it again ...
encod.get_sync()
print ("Encoder:",data_encoders)
time.sleep (0.2)
gps.close()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment