Introduction
If you have read my previous posts, you will have noticed that I explored some of the nice things Boost Asio gives you, and how I implemented my own versions.
I covered what I called callstack markers, which have more uses than is apparent on the surface. I also covered strands, and why they are such a nice concept.
This time around, I’m going to focus on the equivalent of boost::asio::deadline_timer. The implementation shown here is custom-made, portable C++11, and self-contained.
If I had to roughly describe what deadline_timer is for, while at the same time hinting at the implementation, I would say: “deadline_timer allows you to group together a bunch of handlers to be called when a timer expires”.
I think the “deadline” in deadline_timer is a bit misleading. Given that “deadline” means “the latest time or date by which something should be completed”, the first thing that crosses my mind is that it is used to cancel some operation that did not complete by the specified time/date, such as a connection attempt. It can certainly do that, but it can be used for a lot more.
For simplicity, I’ll be calling it “timer(s)”, and not “deadline timer(s)”.
A bit of brainstorming
So, how do we go about implementing timers?
At its core, you need a queue for the timers, and a worker thread that dequeues and executes timers according to their expiry times. Let’s call it a “Timer Queue”. The Win32 API uses the same name for such a thing: Timer Queues.
What I explore in this post is a Timer Queue implementation. It supports one-shot timers only. Higher-level features, such as grouping handlers in one timer, or timers that can be reused (like Boost Asio’s own implementation), can be built on top of it.
The trickiest thing to solve is how to correctly notify the worker thread that something happened that might change the current wait time.
For example:
- Worker thread is waiting for timers to process
- Main thread adds a timer A with an expiry time of 10 seconds.
- Worker thread detects there is a timer in the queue, and starts a wait based on that timer’s expiry time.
- Main thread adds a timer B with an expiry time of 1 second.
- Worker thread needs to recalculate its wait time, since timer B should be executed before A.
Now, if you throw in a couple more threads adding or cancelling timers, things might get confusing. Personally, I think it’s a bad sign if you have to think too hard to verify the correctness of your multithreaded code. Sure, performance is good, but first get it right. If performance is a problem, profile it, then think as hard as you need to get it both right and fast.
My first pass at this was convoluted, to say the least. There was a queue for the timers, and communication with the worker thread was done through another queue where I would pass commands such as “recalculate wait time”, “execute this timer right now”, or “shutdown”.
To add insult to injury, those two queues were locked independently.
Sure enough, as I started to write this post to share the code, I spotted yet another bug. Ever had that feeling of “Surely there must be a cleaner way to do this”?
To be honest, in hindsight that first pass was quite bad. Maybe it just got that bad in small increments as I was focusing on the wrong problems.
What was sabotaging my efforts was that I was in the wrong mindset. I kept thinking of the thread-to-thread communication as “work to be done”, and thus the obvious choice there was to have a command queue.
After a moment of clarity, I changed my mindset from “work to be done” to “something has changed“, and a cleaner solution formed:
The thread wakes up when something has changed (e.g. a timer was added, cancelled, or expired), and checks for work (e.g. executing handlers for expired timers). Extraneous notifications have no side effects, since the thread just wakes up, checks for work, recalculates the new wait time and goes back to waiting.
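To make the “something has changed” idea concrete, here is a minimal sketch. It is not the TimerQueue shown below: the class name WakeOnChange, its add/fired methods and the integer tags are hypothetical, and it uses a std::condition_variable directly instead of a semaphore. Other threads only update the shared deadlines and notify; the worker re-reads the state, handles whatever is due, and recomputes how long to wait, so a redundant notification only costs one extra loop iteration.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <set>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, stripped-down worker illustrating "something has changed":
// other threads never send commands, they only update shared state and wake
// the worker, which then re-reads that state and recomputes its wait time.
class WakeOnChange {
public:
    using Clock = std::chrono::steady_clock;

    WakeOnChange() : m_th([this] { run(); }) {}

    ~WakeOnChange() {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_stop = true;
        }
        m_cv.notify_one();  // "something changed": we want to stop
        m_th.join();
    }

    // Registers a deadline identified by 'tag', then wakes the worker.
    void add(int tag, std::chrono::milliseconds delay) {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_deadlines.insert(std::make_pair(Clock::now() + delay, tag));
        }
        m_cv.notify_one();  // harmless if the wake-up turns out to be unnecessary
    }

    // Tags in the order the worker found them expired.
    std::vector<int> fired() {
        std::lock_guard<std::mutex> lk(m_mtx);
        return m_fired;
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(m_mtx);
        while (!m_stop) {
            // Check for work: everything whose deadline has passed.
            auto now = Clock::now();
            while (!m_deadlines.empty() && m_deadlines.begin()->first <= now) {
                m_fired.push_back(m_deadlines.begin()->second);
                m_deadlines.erase(m_deadlines.begin());
            }
            // Recompute the wait from the current state, then sleep until the
            // earliest deadline, or until someone says something changed.
            if (m_deadlines.empty())
                m_cv.wait(lk);
            else
                m_cv.wait_until(lk, m_deadlines.begin()->first);
        }
    }

    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::set<std::pair<Clock::time_point, int>> m_deadlines;
    std::vector<int> m_fired;
    bool m_stop = false;
    std::thread m_th;  // declared last, so the members above exist before run() starts
};
```

Since the state is always checked under the mutex before waiting, and wait/wait_until release it atomically, a notification cannot be lost between the check and the wait. Adding a 50 ms deadline while the worker waits on a 400 ms one simply wakes it up, and it re-arms on the earlier deadline.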
Implementation
The only helper class needed is a semaphore, which can be implemented in portable C++11 with a mutex, a condition_variable and a counter.
```cpp
#pragma once
#include <chrono>
#include <condition_variable>
#include <mutex>

class Semaphore {
public:
    Semaphore(unsigned int count = 0) : m_count(count) {}

    // Increments the counter and wakes up one waiting thread, if any
    void notify() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_count++;
        m_cv.notify_one();
    }

    // Blocks until the counter is positive, then decrements it
    void wait() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this]() { return m_count > 0; });
        m_count--;
    }

    // Like wait(), but gives up at the specified time point.
    // Returns true if the counter was decremented, false on timeout
    template <class Clock, class Duration>
    bool waitUntil(const std::chrono::time_point<Clock, Duration>& point) {
        std::unique_lock<std::mutex> lock(m_mtx);
        if (!m_cv.wait_until(lock, point, [this]() { return m_count > 0; }))
            return false;
        m_count--;
        return true;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    unsigned int m_count;
};
```
A few more useful methods could be added to Semaphore, but were omitted to keep it short.
- bool waitFor(duration): waits up to the specified duration before timing out.
- bool tryWait(): checks and decrements the semaphore without waiting.
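A sketch of those two helpers, assuming they mirror the existing waitUntil; the template parameters of waitFor are my assumption, since only the names are given above. The Semaphore is repeated so the block compiles on its own, with waitUntil left out for brevity.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Same Semaphore as above (minus waitUntil), plus the two omitted helpers.
class Semaphore {
public:
    Semaphore(unsigned int count = 0) : m_count(count) {}

    void notify() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_count++;
        m_cv.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this]() { return m_count > 0; });
        m_count--;
    }

    // Waits at most 'timeout'. Returns true if the counter was decremented,
    // false if it timed out.
    template <class Rep, class Period>
    bool waitFor(const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(m_mtx);
        if (!m_cv.wait_for(lock, timeout, [this]() { return m_count > 0; }))
            return false;
        m_count--;
        return true;
    }

    // Never waits for a notification (it only takes the mutex): decrements
    // and returns true if the counter is positive, otherwise returns false.
    bool tryWait() {
        std::lock_guard<std::mutex> lock(m_mtx);
        if (m_count == 0)
            return false;
        m_count--;
        return true;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    unsigned int m_count;
};
```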
And here is the TimerQueue implementation, heavily commented to further explain how it works…
```cpp
#pragma once
#include "Semaphore.h"
#include <assert.h>
#include <chrono>
#include <cstdint>
#include <functional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Timer Queue
//
// Allows execution of handlers at a specified time in the future
// Guarantees:
//  - All handlers are executed ONCE, even if canceled (aborted parameter will
//    be set to true)
//  - If TimerQueue is destroyed, it will cancel all handlers.
//  - Handlers are ALWAYS executed in the Timer Queue worker thread.
//  - Handlers execution order is NOT guaranteed
//
class TimerQueue {
public:
    TimerQueue() {
        m_th = std::thread([this] { run(); });
    }

    ~TimerQueue() {
        cancelAll();
        // Abusing the timer queue to trigger the shutdown.
        add(0, [this](bool) { m_finish = true; });
        m_th.join();
    }

    //! Adds a new timer
    // return
    //  Returns the ID of the new timer. You can use this ID to cancel the
    //  timer
    uint64_t add(int64_t milliseconds, std::function<void(bool)> handler) {
        WorkItem item;
        item.end = Clock::now() + std::chrono::milliseconds(milliseconds);
        item.handler = std::move(handler);

        std::unique_lock<std::mutex> lk(m_mtx);
        uint64_t id = ++m_idcounter;
        item.id = id;
        m_items.push(std::move(item));
        lk.unlock();

        // Something changed, so wake up timer thread
        m_checkWork.notify();
        return id;
    }

    //! Cancels the specified timer
    // return
    //  1 if the timer was cancelled.
    //  0 if you were too late to cancel (or the timer ID was never valid to
    //  start with)
    size_t cancel(uint64_t id) {
        // Instead of removing the item from the container (thus breaking the
        // heap integrity), we set the item as having no handler, and put
        // that handler on a new item at the top for immediate execution.
        // The timer thread will then ignore the original item, since it has no
        // handler.
        std::unique_lock<std::mutex> lk(m_mtx);
        for (auto&& item : m_items.getContainer()) {
            if (item.id == id && item.handler) {
                WorkItem newItem;
                // Zero time, so it stays at the top for immediate execution
                newItem.end = Clock::time_point();
                newItem.id = 0; // Means it is a canceled item
                // Move the handler from item to newItem.
                // Also, we need to manually set the handler to nullptr, since
                // the standard does not guarantee moving an std::function will
                // empty it. Some STL implementations will empty it, others will
                // not.
                newItem.handler = std::move(item.handler);
                item.handler = nullptr;
                m_items.push(std::move(newItem));

                lk.unlock();
                // Something changed, so wake up timer thread
                m_checkWork.notify();
                return 1;
            }
        }
        return 0;
    }

    //! Cancels all timers
    // return
    //  The number of timers cancelled
    size_t cancelAll() {
        // Setting all "end" to 0 (for immediate execution) is ok,
        // since it maintains the heap integrity
        std::unique_lock<std::mutex> lk(m_mtx);
        size_t count = 0;
        for (auto&& item : m_items.getContainer()) {
            if (item.id) {
                // Items already emptied by cancel() still have an id. They are
                // flushed here too, but not counted a second time.
                if (item.handler)
                    ++count;
                item.end = Clock::time_point();
                item.id = 0;
            }
        }
        lk.unlock();

        m_checkWork.notify();
        return count;
    }

private:
    using Clock = std::chrono::steady_clock;
    TimerQueue(const TimerQueue&) = delete;
    TimerQueue& operator=(const TimerQueue&) = delete;

    void run() {
        while (!m_finish) {
            auto end = calcWaitTime();
            if (end.first) {
                // Timers found, so wait until it expires (or something else
                // changes)
                m_checkWork.waitUntil(end.second);
            } else {
                // No timers exist, so wait forever until something changes
                m_checkWork.wait();
            }

            // Check and execute as much work as possible, such as all expired
            // timers
            checkWork();
        }

        // If we are shutting down, we should not have any items left,
        // since the shutdown cancels all items
        assert(m_items.size() == 0);
    }

    std::pair<bool, Clock::time_point> calcWaitTime() {
        std::lock_guard<std::mutex> lk(m_mtx);
        while (m_items.size()) {
            if (m_items.top().handler) {
                // Item present, so return the new wait time
                return std::make_pair(true, m_items.top().end);
            } else {
                // Discard empty handlers (they were cancelled)
                m_items.pop();
            }
        }

        // No items found, so return no wait time (causes the thread to wait
        // indefinitely)
        return std::make_pair(false, Clock::time_point());
    }

    void checkWork() {
        std::unique_lock<std::mutex> lk(m_mtx);
        while (m_items.size() && m_items.top().end <= Clock::now()) {
            // top() returns a const reference, so this is a copy, not a move
            WorkItem item(m_items.top());
            m_items.pop();

            // Never call user code while holding the lock
            lk.unlock();
            if (item.handler)
                item.handler(item.id == 0);
            lk.lock();
        }
    }

    Semaphore m_checkWork;
    std::thread m_th;
    bool m_finish = false;
    uint64_t m_idcounter = 0;

    struct WorkItem {
        Clock::time_point end;
        uint64_t id; // id==0 means it was cancelled
        std::function<void(bool)> handler;
        bool operator>(const WorkItem& other) const {
            return end > other.end;
        }
    };

    std::mutex m_mtx;
    // Inheriting from priority_queue, so we can access the internal container
    class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
                                             std::greater<WorkItem>> {
    public:
        std::vector<WorkItem>& getContainer() { return this->c; }
    } m_items;
};
```
And a small example just adding and cancelling timers…
```cpp
#include "TimerQueue.h"
#include <cassert>
#include <chrono>
#include <cstdio>
#include <functional>
#include <thread>

namespace Timing {

using Clock = std::chrono::high_resolution_clock;

// Milliseconds elapsed since the first call to now()
double now() {
    static auto start = Clock::now();
    return std::chrono::duration<double, std::milli>(Clock::now() - start)
        .count();
}

void sleep(unsigned ms) {
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}

} // namespace Timing

// Creates a handler that prints whether it was aborted, and how long it took
// to be called since it was created
std::function<void(bool)> makeHandler(int id) {
    double start = Timing::now();
    return [id, start](bool aborted) {
        printf("ID %d: aborted=%s, Elapsed %4.2fms\n", id,
               aborted ? "true " : "false", Timing::now() - start);
    };
}

int main() {
    TimerQueue q;

    // Create timer with ID 1
    q.add(10000, makeHandler(1));
    // Create timer with ID 2
    q.add(10001, makeHandler(2));
    // Should cancel timers with ID 1 and 2
    q.cancelAll();

    // Create timer with ID 3
    q.add(1000, makeHandler(3));
    // Create timer with ID 4
    auto id = q.add(2000, makeHandler(4));
    // Cancel timer with ID 4
    auto ret = q.cancel(id);
    assert(ret == 1);
    (void)ret; // Silence the unused-variable warning when NDEBUG is defined

    // Give just enough time to execute timer with ID 3 before destroying the
    // TimerQueue
    Timing::sleep(1500);

    // Timer 4 was already cancelled explicitly above, so no live timers are
    // left. Any timer still pending here would be cancelled implicitly by the
    // destructor.
    return 0;
}
```
The output will be similar to this (the “Elapsed” values will vary)…
```text
ID 1: aborted=true , Elapsed 0.01ms
ID 2: aborted=true , Elapsed 0.03ms
ID 4: aborted=true , Elapsed 0.03ms
ID 3: aborted=false, Elapsed 1000.92ms
```
Things to improve
The main objective was a small, self-contained and portable implementation. Therefore, a few things can be improved, especially if performance is a problem:
- Use some kind of lock-free queue.
- Cancelling a timer requires a linear search through the timers, which means visiting all of them in the worst case.
- Add checks in TimerQueue::add to detect whether we are shutting down.
- Try to get rid of unnecessary wake-ups.
- For example, for simplicity, adding or cancelling a timer makes no effort to detect whether it’s really necessary to wake up the timer thread.
- Using a uint64_t to identify timers works fine, but it’s error-prone, since any integer can be passed to cancel.
- A fresh pair of eyes looking at the code, since this was a quick implementation with some basic tests.
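Two of the points above (the linear search in cancel, and unnecessary wake-ups) could be addressed along these lines. This is a hypothetical, single-threaded sketch: TimerBook, runDue and the return value of add are illustrative names, the mutex and the worker thread are left out, and IDs are assumed to be unique (as they are with the m_idcounter counter). The heap only orders (expiry, id) pairs while the handlers live in a std::unordered_map, so cancelling becomes an average O(1) lookup, and add reports whether the new timer became the earliest one, which is the only case where a worker already waiting on a deadline must be woken up.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <queue>
#include <unordered_map>
#include <utility>
#include <vector>

// Single-threaded bookkeeping sketch (no locking, no worker thread).
// IDs are assumed unique, e.g. generated by an incrementing counter.
class TimerBook {
public:
    using Clock = std::chrono::steady_clock;
    using Handler = std::function<void(bool)>;

    // Returns true if the new timer is now the earliest pending one, i.e. a
    // worker waiting on the previous earliest deadline should be woken up.
    bool add(uint64_t id, Clock::time_point end, Handler h) {
        dropCancelledTop();
        bool earliest = m_heap.empty() || end < m_heap.top().first;
        m_handlers[id] = std::move(h);
        m_heap.push(std::make_pair(end, id));
        return earliest;
    }

    // Average O(1): a hash map lookup instead of scanning the heap. The stale
    // heap entry stays behind and is skipped later.
    bool cancel(uint64_t id) {
        auto it = m_handlers.find(id);
        if (it == m_handlers.end())
            return false;
        m_aborted.push_back(std::move(it->second));
        m_handlers.erase(it);
        return true;
    }

    // What the worker thread would do after waking up at time 'now'.
    void runDue(Clock::time_point now) {
        std::vector<Handler> aborted;
        aborted.swap(m_aborted);
        for (auto& h : aborted)
            h(true);
        while (!m_heap.empty() && m_heap.top().first <= now) {
            uint64_t id = m_heap.top().second;
            m_heap.pop();
            auto it = m_handlers.find(id);
            if (it == m_handlers.end())
                continue;  // cancelled earlier: stale entry
            Handler h = std::move(it->second);
            m_handlers.erase(it);
            h(false);
        }
    }

private:
    // Pops stale entries so top() reflects the earliest live timer.
    void dropCancelledTop() {
        while (!m_heap.empty() && !m_handlers.count(m_heap.top().second))
            m_heap.pop();
    }

    using Entry = std::pair<Clock::time_point, uint64_t>;
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> m_heap;
    std::unordered_map<uint64_t, Handler> m_handlers;
    std::vector<Handler> m_aborted;
};
```

Cancelled entries are left in the heap and skipped lazily, similar to how TimerQueue::cancel leaves items with an empty handler behind. A successful cancel would still need a wake-up in a threaded version, since its aborted handler should run right away.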
Conclusion
Although TimerQueue is useful as-is, if you are familiar with Boost Asio’s deadline_timer, you will spot a couple of differences:
- TimerQueue is not in any way associated with something akin to Boost Asio’s io_service.
- Handlers are executed in the TimerQueue’s own thread.
- Timers are one-shot.
- Boost Asio’s deadline_timer, on the other hand, is reusable.
A close match to Boost Asio’s deadline_timer can be implemented on top of TimerQueue by having a Timer class that aggregates handlers, and puts a one-off timer on TimerQueue whenever we set an expiry time. That one-off timer will run any handlers the Timer instance owns. Moreover, instead of executing the handlers in the TimerQueue thread, Timer can forward execution of the handlers to another object, thus giving the application control over when and where the handlers are executed.
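A rough sketch of that idea. Timer, OneShotQueue, expiresFromNow and asyncWait are hypothetical names (loosely inspired by Asio’s naming, not its API), and OneShotQueue is a stand-in for TimerQueue’s add/cancel so the code compiles on its own. Each expiry gets its own group of handlers; setting a new expiry cancels the previous one-shot, whose handlers then run with aborted set to true. Forwarding handlers to another executor is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for TimerQueue::add / TimerQueue::cancel, so this
// sketch compiles on its own. In real code both would forward to a TimerQueue.
struct OneShotQueue {
    std::function<uint64_t(int64_t, std::function<void(bool)>)> add;
    std::function<size_t(uint64_t)> cancel;
};

// Reusable timer on top of one-shot timers: handlers are grouped per expiry,
// and each new expiry cancels the previous one-shot (aborting its handlers).
// Not meant to be used concurrently from several threads.
class Timer {
public:
    explicit Timer(OneShotQueue q) : m_q(std::move(q)) {}
    ~Timer() { cancel(); }

    // Starts a new expiry; handlers waiting on the previous one are aborted.
    void expiresFromNow(int64_t ms) {
        cancel();
        auto group = std::make_shared<Group>();
        // The one-shot owns the group, so it stays valid even if the Timer
        // is destroyed first.
        m_id = m_q.add(ms, [group](bool aborted) {
            std::vector<std::function<void(bool)>> handlers;
            {
                std::lock_guard<std::mutex> lk(group->mtx);
                group->fired = true;
                handlers.swap(group->handlers);
            }
            for (auto& h : handlers)
                h(aborted);
        });
        m_group = group;
    }

    // Adds a handler to the current expiry. Returns false (dropping the
    // handler) if no expiry is set, or if it has already fired.
    bool asyncWait(std::function<void(bool)> handler) {
        if (!m_group)
            return false;
        std::lock_guard<std::mutex> lk(m_group->mtx);
        if (m_group->fired)
            return false;
        m_group->handlers.push_back(std::move(handler));
        return true;
    }

    // Aborts the current expiry, if any. Its handlers still run, with
    // aborted == true, in whatever thread runs the one-shot.
    void cancel() {
        if (m_group)
            m_q.cancel(m_id);
        m_group.reset();
    }

private:
    struct Group {
        std::mutex mtx;
        bool fired = false;
        std::vector<std::function<void(bool)>> handlers;
    };

    OneShotQueue m_q;
    std::shared_ptr<Group> m_group;
    uint64_t m_id = 0;
};
```

One limitation of this sketch: a handler added after its expiry has already fired is rejected (asyncWait returns false) rather than being run.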
While writing this, I took a quick look at how Boost Asio implements deadline timers (on Windows), to see if it was significantly different. In particular, I was curious how deadline_timer is tied to io_service: whether there is another thread involved in triggering timer-related things, or whether everything is somehow controlled by Windows Completion Ports.
From my short study of the beast’s guts, it seems it uses a somewhat similar method:
- There is a thread for timer-related things (struct win_iocp_io_service::timer_thread_function).
- This thread just loops until shutdown, waiting on a Win32 waitable timer object for something to happen.
- When this waitable timer expires, it uses PostQueuedCompletionStatus on the associated Completion Port, to trigger any handler execution in the appropriate threads (where the user is calling io_service::run).
- Changes to deadline_timer instances set that Win32 waitable timer.
I said somewhat similar, since my implementation executes handlers in the timer queue thread itself when a timer expires. In Boost Asio, when a timer expires, the timer thread sends a signal to the respective io_service, and the handlers are executed there, as explained above.
Feel free to comment with corrections, suggestions, or ask me to cover other areas.
License
The source code in this article is licensed under the CC0 license, so feel free to copy, modify, share, do whatever you want with it.
No attribution is required, but I’ll be happy if you do.
Updates
- 2016-08-25 : Fixed a bug in cancel (Thanks to Daniel’s comment)
- According to the standard, moving an std::function is not guaranteed to leave the source empty. Visual Studio’s implementation empties it, so I missed that bug.
- 2016-09-01 : Fixed a bug in cancelAll (Thanks to Patrick’s comment)
- Because cancel doesn’t remove the original item, but sets its handler to nullptr so that the worker thread ignores it, cancelAll was also skipping those items and not changing their expiry time (for immediate execution). This meant that destroying a TimerQueue would trigger the assert, since the container would not be empty by the time the worker thread finished.
- The behaviour was still correct since the items left in the container at the time of destruction were already cancelled items (no handler to execute).
I tried out your timer queue implementation on OS-X and think I found a bug:
the cancelled timers will fire again later, because it seems the original handler is not set to nullptr automatically with
newItem.handler = std::move(item.handler);
If I set item.handler to nullptr, everything works fine for me. Btw, what is the exact license of your code above?
Hi Daniel,
I’ll take a look at that bug later today and update it if necessary. Many thanks for the heads up. 🙂
The license, yes, good point. I should start putting a note at the end of articles with considerable code. I’ll have to check which license is best, but I’ll go for something quite permissive. I’d say most of the snippets in these articles will be under some version of Creative Commons, maybe https://creativecommons.org/publicdomain/zero/1.0/
Although I believe that license (CC0) requires no attribution, I don’t mind if you do.
Update: Appropriate license note added to articles with considerable source code.
Thanks for updating the license info.
One thing that I find important is to have the possibility that a cancel operation blocks until the timer has really been cancelled. The same for cancelAll. With this approach it can be guaranteed that no async processing for the specified id is taking place afterwards. At the moment cancelled timers fire totally async to the caller.
I would like to implement such behaviour by adding cancelSync() / cancelAllSync() to your timer code. I figure something like adding an extra semaphore and notifying it when a specific timer is cancelled or the timer queue is empty. Any thoughts?
I believe having synchronous cancels is problematic. You would need to be holding a lock while calling unknown code (the user handlers), and that’s always bad. It opens the possibility of all kinds of problems (e.g. deadlocks) and edge cases.
When in doubt, I prefer to have the API give a small set of strong guarantees (preferably without “except when…” cases) and pass responsibility of the rest to the user code.
If you look at the *checkWork* function, you’ll notice I’m temporarily unlocking to call the handler, so that I don’t call unknown code while locking.
Nevertheless, I’m not saying it’s not possible. I would have to think carefully about how to go about it.
I’ll gladly study any changes you make to try and find problems.
Regarding the “std::move” bug you first mentioned, it works fine with Visual Studio. But it seems the standard does not guarantee that *moving* an *std::function* will empty it, so yep, it’s a bug.
See http://stackoverflow.com/questions/13680587/move-semantic-with-stdfunction .
Thanks again.
Update: Updated the *cancel* function to fix the bug.
Thanks for confirming the bug.
Concerning the synchronous design: I see your point with potential race conditions if one uses another set of semaphores.
But if one would introduce cancellation methods that cause the timers not to fire anymore, there would not be any race condition.
This can be easily done by setting the appropriate id and handler to 0 and let the timer thread ignore all those timers.
Is there a particular reason why you want to guarantee that a timer fires even after cancellation? I think the user could benefit from having the choice to either cancel a timer in the original form or the “unfired” way.
Hi Daniel,
Sorry for the long delay. Busy weekend with the family.
When you mentioned “a cancel operation blocks until the timer has really been cancelled” in the previous comment, it seemed to me you meant having the entire thing locked AND also call the timers as cancelled.
Didn’t realize you meant skipping firing the timer as cancelled.
Might be ok. Off the top of my head, I can’t think of anything obviously wrong with that.
I’m using this code in one of our internal products at Cloudgine, and if memory serves me right, I pondered the same thing as you back then.
I ended up leaving it as it is for the following reasons:
* I needed cleanup code for when a timer is cancelled. Having the timer handler called even when cancelled (with ‘aborted’ set to true) allows you to keep the cleanup in the same place as the normal code. You have access to all the data you passed to the lambda there. And you don’t end up splitting “success” and “cancel” all over the application, especially if you need to cancel the timer from several places.
Splitting the success and cancel code means making the data you passed to the lambda available externally (if you need to do any cleanup with that data), which just adds more complexity.
* Another thing I remember keeping my brain busy for a while was the small window when we try to cancel a timer and the cancel fails. Failing to cancel a timer doesn’t necessarily mean the timer fired already. It might be at the point it was removed from the queue and it’s about to fire.
There is no way around this, unless we are constantly locking the queue even when firing timers (bad thing as I mentioned previously).
What this means is that when attempting to cancel a timer, you need to keep in mind a timer handler MIGHT be called after the cancel (if the cancel failed), and you need to design accordingly.
You have this problem regardless of whether cancelled timers are fired as aborted (the current design) or completely skipped (as we are discussing).
In the end, having cancelled timers completely skipped didn’t bring any benefit, and that skipping can be done as easily as putting this in the timer lambda:
“if (aborted) return;”
I might be missing something, so feel free to point it out. But with the current design of not holding locks when calling a timer handler, I don’t see any way to guarantee a synchronous cancel (successful or not). You can be sure it is synchronous if the cancel was successful, but if it wasn’t, it means the handler is already executing or will execute at some point after your cancel attempt.
Hope it makes sense. It’s been a while since I brainstormed about this design.
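To illustrate the first point above, here is a minimal sketch of a handler in the void(bool aborted) shape that TimerQueue::add expects, where the normal and the cancelled paths share the same captured data. PendingRequest and makeTimeoutHandler are hypothetical names used only for this example.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Hypothetical per-timer data that needs cleanup whatever the outcome.
struct PendingRequest {
    std::string name;
    bool released = false;
};

// Returns a void(bool aborted) handler. The same lambda handles both the
// "timer expired" and the "timer cancelled" cases, so the data it captured
// is available for cleanup without being exposed elsewhere.
std::function<void(bool)> makeTimeoutHandler(std::shared_ptr<PendingRequest> req,
                                             std::vector<std::string>& events) {
    return [req, &events](bool aborted) {
        if (!aborted)
            events.push_back("timed out: " + req->name);  // normal timer work
        // Cleanup shared by both paths
        req->released = true;
    };
}
```

Skipping the work on cancellation, as discussed above, is just the `if (!aborted)` check; the cleanup after it still runs either way.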
I think that by fixing this issue in cancel, another bug was introduced. Since the handler of that item is a nullptr, its end won’t be reset by cancelAll, and the queue won’t be empty (causing the assert at the end of run to fail).
Welcome to the blog, Patrick.
I see. Indeed it is a bug. 🙂
But it’s not new. It’s been there from the beginning. Behaviour was still correct for the entire lifetime of the TimerQueue.
What *cancelAll* does is set all timers for immediate execution. That bug leaves timers that were previously cancelled with *cancel* untouched (not set for immediate execution).
Since *handler* is empty for those as a result of *cancel*, they will be ignored as they are pulled out by the worker thread, so behaviour is still correct.
The bug will only show itself if destroying the TimerQueue before the worker thread has time to pull them all out.
The small test case I was using had a sleep at the end, so all Timers were expiring before destroying the TimerQueue instance.
Well spotted. Thanks Patrick.
Going to update the code with a fix and a better test case.
Rui,
I was looking for a modern implementation of something close to a timer wheel. This seems to do the job for me, but I also wanted a periodic option, so I created one. Here is the patch.
```diff
--- TimerQueue.h.org 2017-01-10 21:12:38.359332810 -0800
+++ TimerQueue.h 2017-01-10 21:21:42.066201334 -0800
@@ -24,7 +24,7 @@
     ~TimerQueue() {
         cancelAll();
         // Abusing the timer queue to trigger the shutdown.
-        add(0, [this](bool) { m_finish = true; });
+        add(0, false,[this](bool) { m_finish = true; });
         m_th.join();
     }
@@ -32,7 +32,7 @@
     // return
     // Returns the ID of the new timer. You can use this ID to cancel the
     // timer
-    uint64_t add(int64_t milliseconds, std::function<void(bool)> handler) {
+    uint64_t add(int64_t milliseconds, bool periodic, std::function<void(bool)> handler) {
         WorkItem item;
         item.end = Clock::now() + std::chrono::milliseconds(milliseconds);
         item.handler = std::move(handler);
@@ -40,7 +40,16 @@
         std::unique_lock<std::mutex> lk(m_mtx);
         uint64_t id = ++m_idcounter;
         item.id = id;
+
+        if (periodic == true) {
+            item.periodic = std::chrono::milliseconds(milliseconds);
+        } else {
+            item.periodic = std::chrono::milliseconds(0);
+        }
+
         m_items.push(std::move(item));
+
+
         lk.unlock();
         // Something changed, so wake up timer thread
@@ -71,10 +80,11 @@
                 // the standard does not guarantee moving an std::function will
                 // empty it. Some STL implementation will empty it, others will
                 // not.
+                newItem.periodic = std::chrono::milliseconds(0);
                 newItem.handler = std::move(item.handler);
                 item.handler = nullptr;
                 m_items.push(std::move(newItem));
-
+
                 lk.unlock();
                 // Something changed, so wake up timer thread
                 m_checkWork.notify();
@@ -95,6 +105,7 @@
             if (item.id) {
                 item.end = Clock::time_point();
                 item.id = 0;
+                item.periodic = std::chrono::milliseconds(0);
             }
         }
         auto ret = m_items.size();
@@ -158,6 +169,12 @@
             if (item.handler)
                 item.handler(item.id == 0);
             lk.lock();
+
+
+            if(item.periodic.count() > 0) {
+                item.end = Clock::now() + std::chrono::milliseconds(item.periodic);
+                m_items.push(std::move(item));
+            }
         }
     }
@@ -173,6 +190,7 @@
         bool operator>(const WorkItem& other) const {
             return end > other.end;
         }
+        std::chrono::milliseconds periodic;
     };
     std::mutex m_mtx;
```
Hi Pawell,
Thanks for the patch. I’ll take a look by the end of the week, and I’ll get back to you with feedback.
A small edit to the article messed up the code formatting.
I’ll have to fix it whenever I have a chance.
Sorry for that.