/*
 * Software License Agreement (BSD License)
 *
 *  Copyright (c) 2011, Willow Garage, Inc.
 *  All rights reserved.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 *  are met:
 *
 *   * Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above
 *     copyright notice, this list of conditions and the following
 *     disclaimer in the documentation and/or other materials provided
 *     with the distribution.
 *   * Neither the name of the copyright holder(s) nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 *  POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Nico Blodow (blodow@in.tum.de), Suat Gedikli (gedikli@willowgarage.com)
 */

#pragma once

#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <utility>

namespace pcl
{
  /** \brief Synchronizes two data streams of different types.
    *
    * Data is added with add0 () and add1 (), each of which also expects a timestamp of type unsigned long.
    * When two matching data objects are found, every registered callback is invoked with both objects
    * and both timestamps.
    * The only assumption about the timestamps is that they use the same unit, are linear and are
    * strictly monotonically increasing.
    * If filtering is desired, e.g. thresholding of time differences, the user can do that in the callback.
    *
    * This class is thread safe. Callbacks are invoked while the internal mutexes are held, so a callback
    * must not call add0 (), add1 (), addCallback () or removeCallback () on the same Synchronizer.
    * \ingroup common
    */
  template <typename T1, typename T2>
  class Synchronizer
  {
    using T1Stamped = std::pair<unsigned long, T1>;
    using T2Stamped = std::pair<unsigned long, T2>;

    std::mutex mutex1_;         // guards queueT1
    std::mutex mutex2_;         // guards queueT2
    std::mutex publish_mutex_;  // guards cb_ / callback_counter and serializes publish ()
    std::deque<T1Stamped> queueT1;
    std::deque<T2Stamped> queueT2;

    /** \brief Callback signature: (data of stream 0, data of stream 1, stamp of stream 0, stamp of stream 1). */
    using CallbackFunction = std::function<void (T1, T2, unsigned long, unsigned long)>;

    std::map<int, CallbackFunction> cb_;
    int callback_counter = 0;

  public:
    /** \brief Register a callback that is invoked for every matched pair.
      * \param[in] callback the function to invoke; empty functions are stored but skipped when publishing
      * \return the id of the callback, to be passed to removeCallback ()
      */
    int
    addCallback (const CallbackFunction& callback)
    {
      std::lock_guard<std::mutex> publish_lock (publish_mutex_);
      cb_[callback_counter] = callback;
      return callback_counter++;
    }

    /** \brief Unregister a callback; unknown ids are ignored.
      * \param[in] i the id returned by addCallback ()
      */
    void
    removeCallback (int i)
    {
      std::lock_guard<std::mutex> publish_lock (publish_mutex_);
      cb_.erase (i);
    }

    /** \brief Append an element to the first stream and try to publish a matching pair.
      * \param[in] t the data object
      * \param[in] time the timestamp of \a t
      */
    void
    add0 (const T1& t, unsigned long time)
    {
      {
        // Scoped guard: the mutex is released even if the insertion throws.
        std::lock_guard<std::mutex> lock (mutex1_);
        queueT1.emplace_back (time, t);
      }
      publish ();
    }

    /** \brief Append an element to the second stream and try to publish a matching pair.
      * \param[in] t the data object
      * \param[in] time the timestamp of \a t
      */
    void
    add1 (const T2& t, unsigned long time)
    {
      {
        // Scoped guard: the mutex is released even if the insertion throws.
        std::lock_guard<std::mutex> lock (mutex2_);
        queueT2.emplace_back (time, t);
      }
      publish ();
    }

  private:
    /** \brief Advance \a queue so that its front is the element closest in time to \a reference.
      *
      * The caller must hold the mutex guarding \a queue and must guarantee that the queue is not
      * empty and that the front timestamp is <= \a reference.
      * \param[in,out] queue the stamped queue to trim
      * \param[in] reference the timestamp of the other stream's front element
      * \return true if the front could be decided, i.e. the queue holds one element at or before
      *         \a reference and one after it; false if more data is needed
      */
    template <typename Queue> static bool
    alignQueue (Queue& queue, unsigned long reference)
    {
      // Drop every element that has a successor which is still not newer than the reference.
      while (queue.size () > 1 && queue[1].first <= reference)
        queue.pop_front ();

      if (queue.size () < 2)
        return (false);

      // Here queue[0].first <= reference < queue[1].first: one sample in the past, one in the future.
      // Compare the two distances directly; this is equivalent to 2 * reference > stamp0 + stamp1
      // but cannot wrap around for large timestamps.
      if (reference - queue[0].first > queue[1].first - reference)
        queue.pop_front ();
      return (true);
    }

    /** \brief Invoke all callbacks with the front elements of both queues, then remove those elements.
      *
      * Called from publish () with publish_mutex_ held; both queues must be non-empty.
      */
    void
    publishData ()
    {
      std::lock_guard<std::mutex> lock1 (mutex1_);
      std::lock_guard<std::mutex> lock2 (mutex2_);

      const T1Stamped& first = queueT1.front ();
      const T2Stamped& second = queueT2.front ();
      for (const auto& entry : cb_)
      {
        if (entry.second)
          entry.second (first.second, second.second, first.first, second.first);
      }

      queueT1.pop_front ();
      queueT2.pop_front ();
    }

    /** \brief Match the fronts of both queues and publish one pair if a decision is possible. */
    void
    publish ()
    {
      // only one publish call at once allowed
      std::lock_guard<std::mutex> publish_lock (publish_mutex_);

      // Only the timestamps are needed for matching, so the payloads are not copied.
      unsigned long stamp1 = 0;
      {
        std::lock_guard<std::mutex> lock1 (mutex1_);
        if (queueT1.empty ())
          return;
        stamp1 = queueT1.front ().first;
      }

      unsigned long stamp2 = 0;
      {
        std::lock_guard<std::mutex> lock2 (mutex2_);
        if (queueT2.empty ())
          return;
        stamp2 = queueT2.front ().first;
      }

      // Elements are only removed while publish_mutex_ is held, so both fronts are still the
      // elements whose timestamps were read above.
      bool do_publish = false;
      if (stamp1 <= stamp2)
      {
        // iterate over queue1
        std::lock_guard<std::mutex> lock1 (mutex1_);
        do_publish = alignQueue (queueT1, stamp2);
      }
      else
      {
        // iterate over queue2
        std::lock_guard<std::mutex> lock2 (mutex2_);
        do_publish = alignQueue (queueT2, stamp1);
      }

      if (do_publish)
        publishData ();
    }
  };
} // namespace pcl