pvAccessCPP  7.1.1
fairQueue.h
1 /**
2  * Copyright - See the COPYRIGHT that is included with this distribution.
3  * pvAccessCPP is distributed subject to a Software License Agreement found
4  * in file LICENSE that is included with this distribution.
5  */
6 
7 #ifndef FAIRQUEUE_H
8 #define FAIRQUEUE_H
9 
10 #include <vector>
11 
12 #ifdef epicsExportSharedSymbols
13 # define fairQueueExportSharedSymbols
14 # undef epicsExportSharedSymbols
15 #endif
16 
17 #include <epicsEvent.h>
18 #include <epicsMutex.h>
19 #include <epicsGuard.h>
20 #include <ellLib.h>
21 #include <dbDefs.h>
22 
23 #include <pv/sharedPtr.h>
24 
25 #ifdef fairQueueExportSharedSymbols
26 # define epicsExportSharedSymbols
27 # undef fairQueueExportSharedSymbols
28 #endif
29 
30 #include <shareLib.h>
31 
32 namespace epics {
33 namespace pvAccess {
34 
35 
36 /** @brief An intrusive, loss-less, unbounded, round-robin queue
37  *
38  * The parameterized type 'T' must be a sub-class of @class fair_queue<T>::entry
39  *
40  * @li Intrusive. Entries in the queue must derive from @class entry
41  *
42  * @li Loss-less. An entry will be returned by pop_front() corresponding to
43  * each call to push_back().
44  *
45  * @li Un-bounded. There is no upper limit to the number of times an entry
46  * may be queued other than machine constraints.
47  *
48  * @li Round robin. The order that entries are returned may not match
49  * the order they were added in. "Fairness" is achived by returning
50  * entries in a rotating fashion based on the order in which they were
51  * first added. Re-adding the same entry before it is popped does not change
52  * this order.
53  * Adding [A, A, B, A, C, C] would give out [A, B, C, A, C, A].
54  *
55  * @warning Only one thread should call pop_front()
56  * as push_back() does not broadcast (only wakes up one waiter)
57  */
58 template<typename T>
59 class fair_queue
60 {
61  typedef epicsGuard<epicsMutex> guard_t;
62 public:
63  typedef std::tr1::shared_ptr<T> value_type;
64 
65  class entry {
66  /* In c++, use of ellLib (which implies offsetof()) should be restricted
67  * to POD structs. So enode_t exists as a POD struct for which offsetof()
68  * is safe and well defined. enode_t::self is used in place of
69  * casting via CONTAINER(penode, entry, enode)
70  */
71  struct enode_t {
72  ELLNODE node;
73  entry *self;
74  } enode;
75  unsigned Qcnt;
76  value_type holder;
77  fair_queue *owner;
78 
79  friend class fair_queue;
80 
81  entry(const entry&);
82  entry& operator=(const entry&);
83  public:
84  entry() :Qcnt(0), holder()
85  , owner(NULL)
86  {
87  enode.node.next = enode.node.previous = NULL;
88  enode.self = this;
89  }
90  ~entry() {
91  // nodes should be removed from the list before deletion
92  assert(!enode.node.next && !enode.node.previous);
93  assert(Qcnt==0 && !holder);
94  assert(!owner);
95  }
96  };
97 
98  fair_queue()
99  {
100  ellInit(&list);
101  }
102  ~fair_queue()
103  {
104  clear();
105  assert(ellCount(&list)==0);
106  }
107 
108  //! Remove all items.
109  //! @post empty()==true
110  void clear()
111  {
112  // destroy after unlock
113  std::vector<value_type> garbage;
114  {
115  guard_t G(mutex);
116 
117  garbage.resize(unsigned(ellCount(&list)));
118  size_t i=0;
119 
120  while(ELLNODE *cur = ellGet(&list)) {
121  typedef typename entry::enode_t enode_t;
122  enode_t *PN = CONTAINER(cur, enode_t, node);
123  entry *P = PN->self;
124  assert(P->owner==this);
125  assert(P->Qcnt>0);
126 
127  PN->node.previous = PN->node.next = NULL;
128  P->owner = NULL;
129  P->Qcnt = 0u;
130  garbage[i++].swap(P->holder);
131  }
132  }
133  }
134 
135  bool empty() const {
136  guard_t G(mutex);
137  return ellFirst(&list)==NULL;
138  }
139 
140  void push_back(const value_type& ent)
141  {
142  bool wake;
143  entry *P = ent.get();
144  {
145  guard_t G(mutex);
146  wake = ellFirst(&list)==NULL; // empty queue
147 
148  if(P->Qcnt++==0) {
149  // not in list
150  assert(P->owner==NULL);
151  P->owner = this;
152  P->holder = ent; // the list will hold a reference
153  ellAdd(&list, &P->enode.node); // push_back
154  } else
155  assert(P->owner==this);
156  }
157  if(wake) wakeup.signal();
158  }
159 
160  bool pop_front_try(value_type& ret)
161  {
162  ret.reset();
163  guard_t G(mutex);
164  ELLNODE *cur = ellGet(&list); // pop_front
165 
166  if(cur) {
167  typedef typename entry::enode_t enode_t;
168  enode_t *PN = CONTAINER(cur, enode_t, node);
169  entry *P = PN->self;
170  assert(P->owner==this);
171  assert(P->Qcnt>0);
172  if(--P->Qcnt==0) {
173  PN->node.previous = PN->node.next = NULL;
174  P->owner = NULL;
175 
176  ret.swap(P->holder);
177  } else {
178  ellAdd(&list, &P->enode.node); // push_back
179 
180  ret = P->holder;
181  }
182  return true;
183  } else {
184  return false;
185  }
186  }
187 
188  void pop_front(value_type& ret)
189  {
190  while(1) {
191  pop_front_try(ret);
192  if(ret)
193  break;
194  wakeup.wait();
195  }
196  }
197 
198  bool pop_front(value_type& ret, double timeout)
199  {
200  while(1) {
201  pop_front_try(ret);
202  if(ret)
203  return true;
204  if(!wakeup.wait(timeout))
205  return false;
206  }
207  }
208 
209 private:
210  ELLLIST list;
211  mutable epicsMutex mutex;
212  mutable epicsEvent wakeup;
213 };
214 
215 }
216 } // namespace
217 
218 #endif // FAIRQUEUE_H
Copyright - See the COPYRIGHT that is included with this distribution.