Commit d85f5525 authored by Pierre NARVOR's avatar Pierre NARVOR
Browse files

[driver] Synchronous wait for command result

parent 886e83a8
#ifndef _DEF_SEATRAC_DRIVER_SEATRAC_DRIVER_H_
#define _DEF_SEATRAC_DRIVER_SEATRAC_DRIVER_H_
#include <memory>
#include <functional>
#include <list>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <seatrac_driver/SeatracEnums.h>
#include <seatrac_driver/SeatracTypes.h>
#include <seatrac_driver/SeatracSerial.h>
#include <seatrac_driver/messages/Messages.h>
namespace narval { namespace seatrac {
class SeatracDriver : public SeatracSerial
{
public:
......@@ -15,17 +25,51 @@ class SeatracDriver : public SeatracSerial
using IoServicePtr = std::shared_ptr<boost::asio::io_service>;
using SerialPort = boost::asio::serial_port;
using ReadBuffer = boost::asio::streambuf;
class Waiter
{
public:
//using Ptr = std::unique_ptr<Waiter>;
using Ptr = std::shared_ptr<Waiter>;
protected:
CID_E msgId_;
std::vector<uint8_t>* dataDestination_;
std::mutex mutex_;
std::condition_variable cv_;
bool wasCalled_;
public:
Waiter(CID_E msgId, std::vector<uint8_t>* dataDestination);
CID_E msg_id() const;
bool wait_for_data(int64_t timeout = 5000);
void set_data(const std::vector<uint8_t>& data);
};
using Waiters = std::list<Waiter::Ptr>;
protected:
std::vector<uint8_t> data_;
Waiters waiters_;
std::mutex waitersMutex_;
virtual void on_receive(const std::vector<uint8_t>& data);
public:
SeatracDriver(const IoServicePtr& ioService,
const std::string& port = "/dev/narval_usbl");
bool send_request(unsigned int cmdSize, const uint8_t* cmdData,
std::vector<uint8_t>& respData,
int64_t timeout = 5000);
bool wait_for_message(CID_E msgId, std::vector<uint8_t>& data,
int64_t timeout = 5000);
};
}; //namespace seatrac
......
......@@ -104,14 +104,14 @@ class SeatracNode : public SeatracSerial
SeatracNode(const IoServicePtr& ioService,
const std::string& port = "/dev/narval_usbl") :
SeatracSerial(ioService, port),
node_("seatrac_usbl"),
node_("seatrac_usbl_station"),
publisher_(node_.advertise<geometry_msgs::PoseWithCovarianceStamped>("pose", 100))
{}
};
int main(int argc, char** argv)
{
ros::init(argc, argv, "seatrac_usbl");
ros::init(argc, argv, "seatrac_usbl_station");
AsyncService service;
SeatracNode serial(service.io_service(), "/dev/ttyUSB0");
......
......@@ -3,55 +3,151 @@
namespace narval { namespace seatrac {
SeatracDriver::SeatracDriver(const IoServicePtr& ioService,
const std::string& port) :
SeatracSerial(ioService, port)
SeatracDriver::Waiter::Waiter(CID_E msgId, std::vector<uint8_t>* dataDestination) :
msgId_(msgId),
dataDestination_(dataDestination),
wasCalled_(false)
{}
void SeatracDriver::on_receive(const std::vector<uint8_t>& data)
CID_E SeatracDriver::Waiter::msg_id() const
{
//std::cout << "Received : " << data.size() << " bytes." << std::endl;
print_message(std::cout, data);
return msgId_;
}
//return;
messages::Status header = *reinterpret_cast<const messages::Status*>(data.data());
if(header.cmdId != CID_STATUS) {
return;
}
if(header.expected_size() != data.size()) {
std::cerr << "Got status but wrong number of bytes (expected "
<< header.expected_size() << ", got " << data.size() << ")\n";
return;
}
bool SeatracDriver::Waiter::wait_for_data(int64_t timeout)
{
std::unique_lock<std::mutex> lock(mutex_);
const uint8_t* p = data.data() + sizeof(messages::Status);
if(header.contentType & ENVIRONMENT) {
std::cout << *reinterpret_cast<const messages::StatusEnvironment*>(p) << std::endl;
p += sizeof(messages::StatusEnvironment);
if(wasCalled_)
return true;
if(timeout < 0) {
cv_.wait(lock, [&]{ return wasCalled_; });
}
if(header.contentType & ATTITUDE) {
std::cout << *reinterpret_cast<const messages::StatusAttitude*>(p) << std::endl;
p += sizeof(messages::StatusAttitude);
else {
if(!cv_.wait_for(lock, std::chrono::milliseconds(timeout),
[&]{ return wasCalled_; })) {
// Timeout reached, data was not set
return false;
}
}
if(header.contentType & MAG_CAL) {
std::cout << *reinterpret_cast<const messages::StatusMagCalibration*>(p) << std::endl;
p += sizeof(messages::StatusMagCalibration);
return true;
}
void SeatracDriver::Waiter::set_data(const std::vector<uint8_t>& data)
{
{
std::unique_lock<std::mutex> lock(mutex_);
*dataDestination_ = data;
wasCalled_ = true;
}
if(header.contentType & ACC_CAL) {
std::cout << *reinterpret_cast<const messages::StatusAccCalibration*>(p) << std::endl;
p += sizeof(messages::StatusAccCalibration);
cv_.notify_all();
}
SeatracDriver::SeatracDriver(const IoServicePtr& ioService,
const std::string& port) :
SeatracSerial(ioService, port)
{}
/**
* Sends a command and waits for an answer (synchronous call)
*/
bool SeatracDriver::send_request(unsigned int cmdSize, const uint8_t* cmdData,
std::vector<uint8_t>& respData,
int64_t timeout)
{
CID_E msgId = (CID_E)cmdData[0]; // TODO check validity
auto waiter = std::make_shared<Waiter>(msgId, &respData);
{
std::unique_lock<std::mutex> lock(waitersMutex_);
waiters_.push_back(waiter);
}
if(header.contentType & AHRS_RAW_DATA) {
std::cout << *reinterpret_cast<const messages::StatusRawAHRS*>(p) << std::endl;
p += sizeof(messages::StatusRawAHRS);
// sending data when waiter is set, not before (to avoid missing the
// response)
this->send(cmdSize, cmdData);
return waiter->wait_for_data(timeout);
}
/**
* Waits for a specific message identified by its CID_E (with a timeout).
*/
bool SeatracDriver::wait_for_message(CID_E msgId, std::vector<uint8_t>& data,
int64_t timeout)
{
auto waiter = std::make_shared<Waiter>(msgId, &data);
{
std::unique_lock<std::mutex> lock(waitersMutex_);
waiters_.push_back(waiter);
}
if(header.contentType & AHRS_COMP_DATA) {
std::cout << *reinterpret_cast<const messages::StatusCompensatedAHRS*>(p) << std::endl;
p += sizeof(messages::StatusCompensatedAHRS);
// TODO delete waiter if timeout reached
return waiter->wait_for_data(timeout);
}
void SeatracDriver::on_receive(const std::vector<uint8_t>& data)
{
// main dispatch function
CID_E msgId = (CID_E)data[0]; // TODO check validity
{
// Iterating on waiters and setting data when msgId match.
// If there is a match, waiter is to be deleted.
std::unique_lock<std::mutex> lock(waitersMutex_);
auto it = waiters_.begin();
while(it != waiters_.end()) {
if((*it)->msg_id() == msgId) {
(*it)->set_data(data);
waiters_.erase(it++);
}
else {
it++;
}
}
}
std::cout << "expectedSize : " << header.expected_size()
<< ", got : " << data.size() << std::endl;
std::cout << "Received : " << data.size() << " bytes." << std::endl;
// print_message(std::cout, data);
// return;
//
// messages::Status header = *reinterpret_cast<const messages::Status*>(data.data());
// if(header.cmdId != CID_STATUS) {
// return;
// }
// if(header.expected_size() != data.size()) {
// std::cerr << "Got status but wrong number of bytes (expected "
// << header.expected_size() << ", got " << data.size() << ")\n";
// return;
// }
// const uint8_t* p = data.data() + sizeof(messages::Status);
// if(header.contentType & ENVIRONMENT) {
// std::cout << *reinterpret_cast<const messages::StatusEnvironment*>(p) << std::endl;
// p += sizeof(messages::StatusEnvironment);
// }
// if(header.contentType & ATTITUDE) {
// std::cout << *reinterpret_cast<const messages::StatusAttitude*>(p) << std::endl;
// p += sizeof(messages::StatusAttitude);
// }
// if(header.contentType & MAG_CAL) {
// std::cout << *reinterpret_cast<const messages::StatusMagCalibration*>(p) << std::endl;
// p += sizeof(messages::StatusMagCalibration);
// }
// if(header.contentType & ACC_CAL) {
// std::cout << *reinterpret_cast<const messages::StatusAccCalibration*>(p) << std::endl;
// p += sizeof(messages::StatusAccCalibration);
// }
// if(header.contentType & AHRS_RAW_DATA) {
// std::cout << *reinterpret_cast<const messages::StatusRawAHRS*>(p) << std::endl;
// p += sizeof(messages::StatusRawAHRS);
// }
// if(header.contentType & AHRS_COMP_DATA) {
// std::cout << *reinterpret_cast<const messages::StatusCompensatedAHRS*>(p) << std::endl;
// p += sizeof(messages::StatusCompensatedAHRS);
// }
// std::cout << "expectedSize : " << header.expected_size()
// << ", got : " << data.size() << std::endl;
}
}; //namespace seatrac
......
#include <iostream>
using namespace std;
#include <seatrac_driver/AsyncService.h>
#include <seatrac_driver/SeatracDriver.h>
using namespace narval::seatrac;
void request_sys_info(boost::asio::steady_timer* timer,
SeatracSerial* serial,
SeatracDriver* seatrac,
const boost::system::error_code& err)
{
if(err) {
......@@ -15,22 +16,43 @@ void request_sys_info(boost::asio::steady_timer* timer,
}
uint8_t cid = 0x02;
serial->send(1, &cid);
std::vector<uint8_t> respData;
if(!seatrac->send_request(1, &cid, respData, 5000)) {
cout << "Timeout reached !" << endl;
}
else {
cout << "Response ok" << endl;
}
//seatrac->send(1, &cid);
timer->expires_from_now(boost::asio::chrono::seconds(2));
timer->async_wait(boost::bind(&request_sys_info, timer, serial, _1));
timer->expires_from_now(boost::asio::chrono::seconds(1));
timer->async_wait(boost::bind(&request_sys_info, timer, seatrac, _1));
}
int main()
{
auto service = std::make_shared<SeatracSerial::IoService>();
SeatracDriver serial(service);
serial.enable_io_dump();
AsyncService service;
SeatracDriver seatrac(service.io_service(), "/dev/ttyUSB0");
seatrac.enable_io_dump();
// boost::asio::steady_timer timer(*service.io_service().get(), boost::asio::chrono::milliseconds(200));
// timer.async_wait(boost::bind(&request_sys_info, &timer, &seatrac, _1));
boost::asio::steady_timer timer(*serial.io_service().get(), boost::asio::chrono::seconds(2));
timer.async_wait(boost::bind(&request_sys_info, &timer, &serial, _1));
service.start();
for(int i = 0; i < 10; i++) {
uint8_t cid = 0x02;
std::vector<uint8_t> respData;
if(!seatrac.send_request(1, &cid, respData, 1000)) {
cout << "Timeout reached !" << endl;
}
else {
cout << "Response ok" << endl;
}
getchar();
}
service->run();
service.stop();
return 0;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment