pvAccessCPP  7.1.7-dev
monitor.h
1 /* monitor.h */
2 /*
3  * Copyright information and license terms for this software can be
4  * found in the file LICENSE that is included with the distribution
5  */
6 /**
7  * @author mrk
8  */
9 #ifndef MONITOR_H
10 #define MONITOR_H
11 
12 #include <list>
13 #include <ostream>
14 
15 #ifdef epicsExportSharedSymbols
16 # define monitorEpicsExportSharedSymbols
17 # undef epicsExportSharedSymbols
18 #endif
19 
20 #include <epicsMutex.h>
21 #include <pv/status.h>
22 #include <pv/pvData.h>
23 #include <pv/sharedPtr.h>
24 #include <pv/bitSet.h>
25 #include <pv/createRequest.h>
26 
27 #ifdef monitorEpicsExportSharedSymbols
28 # define epicsExportSharedSymbols
29 # undef monitorEpicsExportSharedSymbols
30 #endif
31 
32 #include <pv/requester.h>
33 #include <pv/destroyable.h>
34 
35 #include <shareLib.h>
36 
37 namespace epics { namespace pvAccess {
38 
39 class MonitorRequester;
40 class MonitorElement;
41 typedef std::tr1::shared_ptr<MonitorElement> MonitorElementPtr;
42 typedef std::vector<MonitorElementPtr> MonitorElementPtrArray;
43 
44 class Monitor;
45 typedef std::tr1::shared_ptr<Monitor> MonitorPtr;
46 
47 
48 /**
49  * @brief An element for a monitorQueue.
50  *
51  * Class instance representing monitor element.
52  * @author mrk
53  */
54 class epicsShareClass MonitorElement {
55 public:
56  POINTER_DEFINITIONS(MonitorElement);
57  MonitorElement(epics::pvData::PVStructurePtr const & pvStructurePtr);
58  const epics::pvData::PVStructurePtr pvStructurePtr;
59  const epics::pvData::BitSet::shared_pointer changedBitSet;
60  const epics::pvData::BitSet::shared_pointer overrunBitSet;
61 
62  class Ref;
63 };
64 
65 /** Access to Monitor subscription and queue
66  *
67  * Downstream interface to access a monitor queue (via poll() and release() )
68  */
69 class epicsShareClass Monitor : public virtual Destroyable{
70  public:
71  POINTER_DEFINITIONS(Monitor);
72  typedef MonitorRequester requester_type;
73 
74  virtual ~Monitor(){}
75  /**
76  * Start monitoring.
77  * @return completion status.
78  */
79  virtual epics::pvData::Status start() = 0;
80  /**
81  * Stop Monitoring.
82  * @return completion status.
83  */
84  virtual epics::pvData::Status stop() = 0;
85  /**
86  * If monitor has occurred return data.
87  * @return monitorElement for modified data.
88  * Must call get to determine if data is available.
89  *
90  * May recursively call MonitorRequester::unlisten()
91  */
92  virtual MonitorElementPtr poll() = 0;
93  /**
94  * Release a MonitorElement that was returned by poll.
95  * A poll() must be called after the release() to check the presence of any modified data.
96  * @param monitorElement
97  */
98  virtual void release(MonitorElementPtr const & monitorElement) = 0;
99 
100  struct Stats {
101  size_t nfilled; //!< # of elements ready to be poll()d
102  size_t noutstanding; //!< # of elements poll()d but not released()d
103  size_t nempty; //!< # of elements available for new remote data
104  };
105 
106  virtual void getStats(Stats& s) const {
107  s.nfilled = s.noutstanding = s.nempty = 0;
108  }
109 
110  /**
111  * Report remote queue status.
112  * @param freeElements number of free elements.
113  */
114  virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements) {}
115 };
116 
117 /** A (single ownership) smart pointer to extract a MonitorElement from a Monitor queue
118  *
119  * To fetch a single element
120  @code
121  epics::pvAccess::Monitor::shared_pointer mon(....);
122  epics::pvAccess::MonitorElement::Ref elem(mon);
123  if(elem) {
124  // do something with element
125  assert(elem->pvStructurePtr->getSubField("foo"));
126  } else {
127  // queue was empty
128  }
129  @endcode
130  * To fetch all available elements (c++11)
131  @code
132  epics::pvAccess::Monitor::shared_pointer mon(....);
133  for(auto& elem : *mon) {
134  assert(elem.pvStructurePtr->getSubField("foo"));
135  }
136  @endcode
137  * To fetch all available elements (c++98)
138  @code
139  epics::pvAccess::Monitor::shared_pointer mon(....);
140  for(epics::pvAccess::MonitorElement::Ref it(mon); it; ++it) {
141  MonitorElement& elem(*it);
142  assert(elem.pvStructurePtr->getSubField("foo"));
143  }
144  @endcode
145  */
146 class MonitorElement::Ref
147 {
148  Monitor* mon;
149  MonitorElementPtr elem;
150 public:
151  Ref() :mon(0), elem() {}
152  Ref(Monitor& M) :mon(&M), elem(mon->poll()) {}
153  Ref(const Monitor::shared_pointer& M) :mon(M.get()), elem(mon->poll()) {}
154  ~Ref() { reset(); }
155 #if __cplusplus>=201103L
156  Ref(Ref&& o) :mon(o.mon), elem(o.elem) {
157  o.mon = 0;
158  o.elem.reset();
159  }
160 #endif
161  void swap(Ref& o) {
162  std::swap(mon , o.mon);
163  std::swap(elem, o.elem);
164  }
165  //! analogous to auto_ptr<>::release() but given a different name
166  //! to avoid being confused with Monitor::release()
167  MonitorElementPtr letGo() {
168  MonitorElementPtr ret;
169  elem.swap(ret);
170  return ret;
171  }
172  void attach(Monitor& M) {
173  reset();
174  mon = &M;
175  }
176  void attach(const Monitor::shared_pointer& M) {
177  reset();
178  mon = M.get();
179  }
180  bool next() {
181  if(elem) mon->release(elem);
182  elem = mon->poll();
183  return !!elem;
184  }
185  void reset() {
186  if(elem && mon) mon->release(elem);
187  elem.reset();
188  }
189  Ref& operator++() {// prefix increment. aka "++(*this)"
190  next();
191  return *this;
192  }
193 #if __cplusplus>=201103L
194  inline explicit operator bool() const { return elem.get(); }
195 #else
196 private:
197  typedef const Monitor* const * hidden_bool_type;
198 public:
199  operator hidden_bool_type() const { return elem.get() ? &mon : 0; }
200 #endif
201  inline MonitorElement* operator->() { return elem.get(); }
202  inline MonitorElement& operator*() { return *elem; }
203  inline MonitorElement* get() { return elem.get(); }
204 
205  inline bool operator==(const Ref& o) const { return elem==o.elem; }
206  inline bool operator!=(const Ref& o) const { return !(*this==o); }
207 
208  EPICS_NOT_COPYABLE(Ref)
209 };
210 
211 #if __cplusplus>=201103L
212 // used by c++11 for-range
213 inline MonitorElement::Ref begin(Monitor& mon) { return MonitorElement::Ref(mon); }
214 inline MonitorElement::Ref end(Monitor& mon) { return MonitorElement::Ref(); }
215 #endif // __cplusplus<201103L
216 
217 /** Utility implementation of Monitor.
218  *
219  * The Monitor interface defines the downstream (consumer facing) side
220  * of a FIFO. This class is a concrete implementation of this FIFO,
221  * including the upstream (producer facing) side.
222  *
223  * In addition to MonitorRequester, which provides callbacks to the downstream side,
224  * The MonitorFIFO::Source class provides callbacks to the upstream side.
225  *
226  * The simplest usage is to create (as shown below), then put update into the FIFO
227  * using post() and tryPost(). These methods behave the same when the queue is
228  * not full, but differ when it is. Additionally, tryPost() has an argument 'force'.
229  * Together there are three actions
230  *
231  * # post(value, changed) - combines the new update with the last (most recent) in the FIFO.
232  * # tryPost(value, changed, ..., false) - Makes no change to the FIFO and returns false.
233  * # tryPost(value, changed, ..., true) - Over-fills the FIFO with the new element, then returns false.
234  *
235  * @note Calls to post() or tryPost() __must__ be followed with a call to notify().
236  * Callers of notify() __must__ not hold any locks, or a deadlock is possible.
237  *
238  * The intent of tryPost() with force=true is to aid code which is transferring values from
239  * some upstream buffer and this FIFO. Such code can be complicated if an item is removed
240  * from the upstream buffer, but can't be put into this downstream FIFO. Rather than
241  * being forced to effectivly maintain a third FIFO, code can use force=true.
242  *
243  * In either case, tryPost()==false indicates the the FIFO is full.
244  *
245  * eg. simple usage in a sub-class for Channel named MyChannel.
246  @code
247  pva::Monitor::shared_pointer
248  MyChannel::createMonitor(const pva::MonitorRequester::shared_pointer &requester,
249  const pvd::PVStructure::shared_pointer &pvRequest)
250  {
251  std::tr1::shared_ptr<pva::MonitorFIFO> ret(new pva::MonitorFIFO(requester, pvRequest));
252  ret->open(spamtype);
253  ret->notify();
254  // ret->post(...); // maybe initial update
255  }
256  @endcode
257  */
258 class epicsShareClass MonitorFIFO : public Monitor,
259  public std::tr1::enable_shared_from_this<MonitorFIFO>
260 {
261 public:
262  POINTER_DEFINITIONS(MonitorFIFO);
263  //! Source methods may be called with downstream mutex locked.
264  //! Do not call notify(). This is done automatically after return in a way
265  //! which avoids locking and recursion problems.
266  struct epicsShareClass Source {
267  POINTER_DEFINITIONS(Source);
268  virtual ~Source();
269  //! Called when MonitorFIFO::freeCount() rises above the level computed
270  //! from MonitorFIFO::setFreeHighMark().
271  //! @param numEmpty The number of empty slots in the FIFO.
272  virtual void freeHighMark(MonitorFIFO *mon, size_t numEmpty) {}
273  };
274  struct epicsShareClass Config {
275  size_t maxCount, //!< upper limit on requested FIFO size
276  defCount, //!< FIFO size when client makes no request
277  actualCount; //!< filled in with actual FIFO size
278  bool dropEmptyUpdates; //!< default true. Drop updates which don't include an field values.
279  epics::pvData::PVRequestMapper::mode_t mapperMode; //!< default Mask. @see epics::pvData::PVRequestMapper::mode_t
280  Config();
281  };
282 
283  /**
284  * @param requester Downstream/consumer callbacks
285  * @param pvRequest Downstream provided options
286  * @param source Upstream/producer callbacks
287  * @param conf Upstream provided options. Updated with actual values used. May be NULL to use defaults.
288  */
289  MonitorFIFO(const std::tr1::shared_ptr<MonitorRequester> &requester,
290  const pvData::PVStructure::const_shared_pointer &pvRequest,
291  const Source::shared_pointer& source = Source::shared_pointer(),
292  Config *conf=0);
293  virtual ~MonitorFIFO();
294 
295  //! Access to MonitorRequester passed to ctor, or NULL if it has already been destroyed.
296  //! @since >6.1.0
297  inline const std::tr1::shared_ptr<MonitorRequester> getRequester() const { return requester.lock(); }
298 
299  void show(std::ostream& strm) const;
300 
301  virtual void destroy() OVERRIDE FINAL;
302 
303  // configuration
304 
305  //! Level, as a percentage of empty buffer slots, at which to call Source::freeHighMark().
306  //! Trigger condition is when number of free buffer slots goes above this level.
307  //! In range [0.0, 1.0)
308  void setFreeHighMark(double level);
309 
310  // up-stream interface (putting data into FIFO)
311  //! Mark subscription as "open" with the associated structure type.
312  void open(const epics::pvData::StructureConstPtr& type);
313  //! Abnormal closure (eg. due to upstream dis-connection)
314  void close();
315  //! Successful closure (eg. RDB query done)
316  void finish();
317  //! Consume a free slot if available. otherwise ...
318  //! if !force take no action and return false.
319  //! if force then attempt to allocate and fill a new slot, then return false.
320  //! The extra slot will be free'd after it is consumed.
321  bool tryPost(const pvData::PVStructure& value,
322  const epics::pvData::BitSet& changed,
323  const epics::pvData::BitSet& overrun = epics::pvData::BitSet(),
324  bool force =false);
325  //! Consume a free slot if available, otherwise squash with most recent
326  void post(const pvData::PVStructure& value,
327  const epics::pvData::BitSet& changed,
328  const epics::pvData::BitSet& overrun = epics::pvData::BitSet());
329  //! Call after calling any other upstream interface methods (open()/close()/finish()/post()/...)
330  //! when no upstream mutexes are locked.
331  //! Do not call from Source::freeHighMark(). This is done automatically.
332  //! Call any MonitorRequester methods.
333  void notify();
334 
335  // down-stream interface (taking data from FIFO)
336  virtual epics::pvData::Status start() OVERRIDE FINAL;
337  virtual epics::pvData::Status stop() OVERRIDE FINAL;
338  virtual MonitorElementPtr poll() OVERRIDE FINAL;
339  virtual void release(MonitorElementPtr const & monitorElement) OVERRIDE FINAL; // may call Source::freeHighMark()
340  virtual void getStats(Stats& s) const OVERRIDE FINAL;
341  virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements) OVERRIDE FINAL;
342 
343  //! Number of unused FIFO slots at this moment, which may changed in the next.
344  size_t freeCount() const;
345 private:
346  size_t _freeCount() const;
347 
348  friend void providerRegInit(void*);
349  static size_t num_instances;
350 
351  // const after ctor
352  Config conf;
353 
354  // locking here is complicated...
355  // our entry points which make callbacks are:
356  // notify() -> MonitorRequester::monitorConnect()
357  // -> MonitorRequester::monitorEvent()
358  // -> MonitorRequester::unlisten()
359  // -> ChannelBaseRequester::channelDisconnect()
360  // start() -> MonitorRequester::monitorEvent()
361  // release() -> Source::freeHighMark()
362  // -> notify() -> ...
363  // reportRemoteQueueStatus() -> Source::freeHighMark()
364  // -> notify() -> ...
365  mutable epicsMutex mutex;
366 
367  // ownership is archored at the downstream (consumer) end.
368  // strong refs are:
369  // downstream -> MonitorFIFO -> Source
370  // weak refs are:
371  // MonitorRequester <- MonitorFIFO <- upstream
372 
373  // so we expect that downstream will hold a strong ref to us,
374  // and we keep a weak ref to downstream's MonitorRequester
375  const std::tr1::weak_ptr<MonitorRequester> requester;
376 
377  const epics::pvData::PVStructure::const_shared_pointer pvRequest;
378 
379  // then we expect to keep a strong ref to upstream (Source)
380  // and expect that upstream will have only a weak ref to us.
381  const Source::shared_pointer upstream;
382 
383  enum state_t {
384  Closed, // not open()'d
385  Opened, // successful open()
386  Error, // unsuccessful open()
387  } state;
388  bool pipeline; // const after ctor
389  bool running; // start() vs. stop()
390  bool finished; // finish() called
391  epics::pvData::BitSet scratch, oscratch; // using during post to avoid re-alloc
392 
393  bool needConnected;
394  bool needEvent;
395  bool needUnlisten;
396  bool needClosed;
397 
398  epics::pvData::Status error; // Set when entering Error state
399 
400  size_t freeHighLevel;
401  epicsInt32 flowCount;
402 
403  epics::pvData::PVRequestMapper mapper;
404 
405  typedef std::list<MonitorElementPtr> buffer_t;
406  // we allocate one extra buffer element to hold data when post()
407  // while all elements poll()'d. So there will always be one
408  // element on either the empty or inuse lists
409  buffer_t inuse, empty, returned;
410  /* our elements are in one of 4 states
411  * Empty - on empty list
412  * In Use - on inuse list
413  * Polled - Returnedd from poll(). Not tracked
414  * Returned - only if pipeline==true, release()'d but not ack'd
415  */
416 
417  EPICS_NOT_COPYABLE(MonitorFIFO)
418 };
419 
420 static inline
421 std::ostream& operator<<(std::ostream& strm, const MonitorFIFO& fifo) {
422  fifo.show(strm);
423  return strm;
424 }
425 
426 }}
427 
428 namespace epics { namespace pvData {
429 
430 using epics::pvAccess::MonitorElement;
431 using epics::pvAccess::MonitorElementPtr;
432 using epics::pvAccess::MonitorElementPtrArray;
433 using epics::pvAccess::Monitor;
434 using epics::pvAccess::MonitorPtr;
435 }}
436 
437 #endif /* MONITOR_H */