pvDatabaseCPP  4.5.3-dev
monitorFactory.cpp
Go to the documentation of this file.
1 /* monitorFactory.cpp */
12 #include <sstream>
13 
14 #include <epicsGuard.h>
15 #include <pv/thread.h>
16 #include <pv/bitSetUtil.h>
17 #include <pv/pvData.h>
18 #include <pv/pvAccess.h>
19 #include <pv/pvTimeStamp.h>
20 #include <pv/rpcService.h>
21 #include <pv/serverContext.h>
22 #include <pv/timeStamp.h>
23 
24 #define epicsExportSharedSymbols
25 #include "pv/pvStructureCopy.h"
26 #include "pv/pvDatabase.h"
28 
29 using namespace epics::pvData;
30 using namespace epics::pvAccess;
31 using namespace epics::pvCopy;
32 using std::tr1::static_pointer_cast;
33 using std::cout;
34 using std::endl;
35 using std::string;
36 
37 namespace epics { namespace pvDatabase {
38 
39 class MonitorLocal;
40 typedef std::tr1::shared_ptr<MonitorLocal> MonitorLocalPtr;
41 
42 static MonitorPtr nullMonitor;
43 static MonitorElementPtr NULLMonitorElement;
44 static Status failedToCreateMonitorStatus(
45  Status::STATUSTYPE_ERROR,"failed to create monitor");
46 static Status alreadyStartedStatus(Status::STATUSTYPE_ERROR,"already started");
47 static Status notStartedStatus(Status::STATUSTYPE_ERROR,"not started");
48 static Status deletedStatus(Status::STATUSTYPE_ERROR,"record is deleted");
49 
50 class MonitorElementQueue;
51 typedef std::tr1::shared_ptr<MonitorElementQueue> MonitorElementQueuePtr;
52 
53 class MonitorElementQueue
54 {
55 private:
56  MonitorElementPtrArray elements;
57  // TODO use size_t instead
58  int size;
59  int numberFree;
60  int numberUsed;
61  int nextGetFree;
62  int nextSetUsed;
63  int nextGetUsed;
64  int nextReleaseUsed;
65 public:
66  POINTER_DEFINITIONS(MonitorElementQueue);
67 
68  MonitorElementQueue(std::vector<MonitorElementPtr> monitorElementArray)
69  : elements(monitorElementArray),
70  size(monitorElementArray.size()),
71  numberFree(size),
72  numberUsed(0),
73  nextGetFree(0),
74  nextSetUsed(0),
75  nextGetUsed(0),
76  nextReleaseUsed(0)
77  {
78  }
79 
80  virtual ~MonitorElementQueue() {}
81 
82  void clear()
83  {
84  numberFree = size;
85  numberUsed = 0;
86  nextGetFree = 0;
87  nextSetUsed = 0;
88  nextGetUsed = 0;
89  nextReleaseUsed = 0;
90  }
91 
92  MonitorElementPtr getFree()
93  {
94  if(numberFree==0) return MonitorElementPtr();
95  numberFree--;
96  int ind = nextGetFree;
97  MonitorElementPtr queueElement = elements[nextGetFree++];
98  if(nextGetFree>=size) nextGetFree = 0;
99  return elements[ind];
100  }
101 
102  void setUsed(MonitorElementPtr const &element)
103  {
104  if(element!=elements[nextSetUsed++]) {
105  throw std::logic_error("not correct queueElement");
106  }
107  numberUsed++;
108  if(nextSetUsed>=size) nextSetUsed = 0;
109  }
110 
111  MonitorElementPtr getUsed()
112  {
113  if(numberUsed==0) return MonitorElementPtr();
114  int ind = nextGetUsed;
115  MonitorElementPtr queueElement = elements[nextGetUsed++];
116  if(nextGetUsed>=size) nextGetUsed = 0;
117  return elements[ind];
118  }
119  void releaseUsed(MonitorElementPtr const &element)
120  {
121  if(element!=elements[nextReleaseUsed++]) {
122  throw std::logic_error(
123  "not queueElement returned by last call to getUsed");
124  }
125  if(nextReleaseUsed>=size) nextReleaseUsed = 0;
126  numberUsed--;
127  numberFree++;
128  }
129 };
130 
131 
132 typedef std::tr1::shared_ptr<MonitorRequester> MonitorRequesterPtr;
133 
134 
135 class MonitorLocal :
136  public Monitor,
137  public PVListener,
138  public std::tr1::enable_shared_from_this<MonitorLocal>
139 {
140  enum MonitorState {idle,active,deleted};
141 public:
142  POINTER_DEFINITIONS(MonitorLocal);
143  virtual ~MonitorLocal();
144  virtual void destroy() {} // DEPRECATED
145  virtual Status start();
146  virtual Status stop();
147  virtual MonitorElementPtr poll();
148  virtual void detach(PVRecordPtr const & pvRecord){}
149  virtual void release(MonitorElementPtr const & monitorElement);
150  virtual void dataPut(PVRecordFieldPtr const & pvRecordField);
151  virtual void dataPut(
152  PVRecordStructurePtr const & requested,
153  PVRecordFieldPtr const & pvRecordField);
154  virtual void beginGroupPut(PVRecordPtr const & pvRecord);
155  virtual void endGroupPut(PVRecordPtr const & pvRecord);
156  virtual void unlisten(PVRecordPtr const & pvRecord);
157  MonitorElementPtr getActiveElement();
158  void releaseActiveElement();
159  bool init(PVStructurePtr const & pvRequest);
160  MonitorLocal(
161  MonitorRequester::shared_pointer const & channelMonitorRequester,
162  PVRecordPtr const &pvRecord);
163  PVCopyPtr getPVCopy() { return pvCopy;}
164 private:
165  MonitorLocalPtr getPtrSelf()
166  {
167  return shared_from_this();
168  }
169  MonitorRequester::weak_pointer monitorRequester;
170  PVRecordPtr pvRecord;
171  MonitorState state;
172  PVCopyPtr pvCopy;
173  MonitorElementQueuePtr queue;
174  MonitorElementPtr activeElement;
175  bool isGroupPut;
176  bool dataChanged;
177  Mutex mutex;
178  Mutex queueMutex;
179 };
180 
181 MonitorLocal::MonitorLocal(
182  MonitorRequester::shared_pointer const & channelMonitorRequester,
183  PVRecordPtr const &pvRecord)
184 : monitorRequester(channelMonitorRequester),
185  pvRecord(pvRecord),
186  state(idle),
187  isGroupPut(false),
188  dataChanged(false)
189 {
190 }
191 
192 MonitorLocal::~MonitorLocal()
193 {
194 //cout << "MonitorLocal::~MonitorLocal()" << endl;
195 }
196 
197 
198 Status MonitorLocal::start()
199 {
200  if(pvRecord->getTraceLevel()>0)
201  {
202  cout << "MonitorLocal::start state " << state << endl;
203  }
204  {
205  Lock xx(mutex);
206  if(state==active) return alreadyStartedStatus;
207  if(state==deleted) return deletedStatus;
208  }
209  pvRecord->addListener(getPtrSelf(),pvCopy);
210  epicsGuard <PVRecord> guard(*pvRecord);
211  Lock xx(mutex);
212  state = active;
213  queue->clear();
214  isGroupPut = false;
215  activeElement = queue->getFree();
216  activeElement->changedBitSet->clear();
217  activeElement->overrunBitSet->clear();
218  activeElement->changedBitSet->set(0);
219  releaseActiveElement();
220  return Status::Ok;
221 }
222 
223 Status MonitorLocal::stop()
224 {
225  if(pvRecord->getTraceLevel()>0){
226  cout << "MonitorLocal::stop state " << state << endl;
227  }
228  {
229  Lock xx(mutex);
230  if(state==idle) return notStartedStatus;
231  if(state==deleted) return deletedStatus;
232  state = idle;
233  }
234  pvRecord->removeListener(getPtrSelf(),pvCopy);
235  return Status::Ok;
236 }
237 
238 MonitorElementPtr MonitorLocal::poll()
239 {
240  if(pvRecord->getTraceLevel()>1)
241  {
242  cout << "MonitorLocal::poll state " << state << endl;
243  }
244  {
245  Lock xx(queueMutex);
246  if(state!=active) return NULLMonitorElement;
247  return queue->getUsed();
248  }
249 }
250 
251 void MonitorLocal::release(MonitorElementPtr const & monitorElement)
252 {
253  if(pvRecord->getTraceLevel()>1)
254  {
255  cout << "MonitorLocal::release state " << state << endl;
256  }
257  {
258  Lock xx(queueMutex);
259  if(state!=active) return;
260  queue->releaseUsed(monitorElement);
261  }
262 }
263 
264 void MonitorLocal::releaseActiveElement()
265 {
266  if(pvRecord->getTraceLevel()>1)
267  {
268  cout << "MonitorLocal::releaseActiveElement state " << state << endl;
269  }
270  {
271  Lock xx(queueMutex);
272  if(state!=active) return;
273  bool result = pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet);
274  if(!result) return;
275  MonitorElementPtr newActive = queue->getFree();
276  if(!newActive) return;
277  BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
278  BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
279  queue->setUsed(activeElement);
280  activeElement = newActive;
281  activeElement->changedBitSet->clear();
282  activeElement->overrunBitSet->clear();
283  }
284  MonitorRequesterPtr requester = monitorRequester.lock();
285  if(!requester) return;
286  requester->monitorEvent(getPtrSelf());
287  return;
288 }
289 
290 void MonitorLocal::dataPut(PVRecordFieldPtr const & pvRecordField)
291 {
292  if(pvRecord->getTraceLevel()>1)
293  {
294  cout << "MonitorLocal::dataPut(pvRecordField)" << endl;
295  }
296  if(state!=active) return;
297  {
298  Lock xx(mutex);
299  size_t offset = pvCopy->getCopyOffset(pvRecordField->getPVField());
300  BitSetPtr const &changedBitSet = activeElement->changedBitSet;
301  BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
302  bool isSet = changedBitSet->get(offset);
303  changedBitSet->set(offset);
304  if(isSet) overrunBitSet->set(offset);
305  dataChanged = true;
306  }
307  if(!isGroupPut) {
308  releaseActiveElement();
309  dataChanged = false;
310  }
311 }
312 
313 void MonitorLocal::dataPut(
314  PVRecordStructurePtr const & requested,
315  PVRecordFieldPtr const & pvRecordField)
316 {
317  if(pvRecord->getTraceLevel()>1)
318  {
319  cout << "MonitorLocal::dataPut(requested,pvRecordField)" << endl;
320  }
321  if(state!=active) return;
322  {
323  Lock xx(mutex);
324  BitSetPtr const &changedBitSet = activeElement->changedBitSet;
325  BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
326  size_t offsetCopyRequested = pvCopy->getCopyOffset(
327  requested->getPVField());
328  size_t offset = offsetCopyRequested
329  + (pvRecordField->getPVField()->getFieldOffset()
330  - requested->getPVField()->getFieldOffset());
331  bool isSet = changedBitSet->get(offset);
332  changedBitSet->set(offset);
333  if(isSet) overrunBitSet->set(offset);
334  dataChanged = true;
335  }
336  if(!isGroupPut) {
337  releaseActiveElement();
338  dataChanged = false;
339  }
340 }
341 
342 void MonitorLocal::beginGroupPut(PVRecordPtr const & pvRecord)
343 {
344  if(pvRecord->getTraceLevel()>1)
345  {
346  cout << "MonitorLocal::beginGroupPut()" << endl;
347  }
348  if(state!=active) return;
349  {
350  Lock xx(mutex);
351  isGroupPut = true;
352  dataChanged = false;
353  }
354 }
355 
356 void MonitorLocal::endGroupPut(PVRecordPtr const & pvRecord)
357 {
358  if(pvRecord->getTraceLevel()>1)
359  {
360  cout << "MonitorLocal::endGroupPut dataChanged " << dataChanged << endl;
361  }
362  if(state!=active) return;
363  {
364  Lock xx(mutex);
365  isGroupPut = false;
366  }
367  if(dataChanged) {
368  dataChanged = false;
369  releaseActiveElement();
370  }
371 }
372 
373 void MonitorLocal::unlisten(PVRecordPtr const & pvRecord)
374 {
375  if(pvRecord->getTraceLevel()>1)
376  {
377  cout << "MonitorLocal::unlisten\n";
378  }
379  {
380  Lock xx(mutex);
381  state = deleted;
382  }
383  MonitorRequesterPtr requester = monitorRequester.lock();
384  if(requester) {
385  if(pvRecord->getTraceLevel()>1)
386  {
387  cout << "MonitorLocal::unlisten calling requester->unlisten\n";
388  }
389  requester->unlisten(getPtrSelf());
390  }
391 }
392 
393 
394 bool MonitorLocal::init(PVStructurePtr const & pvRequest)
395 {
396  PVFieldPtr pvField;
397  size_t queueSize = 2;
398  PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
399  MonitorRequesterPtr requester = monitorRequester.lock();
400  if(!requester) return false;
401  if(pvOptions) {
402  PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
403  if(pvString) {
404  try {
405  int32 size;
406  std::stringstream ss;
407  ss << pvString->get();
408  ss >> size;
409  queueSize = size;
410  } catch (...) {
411  requester->message("queueSize " +pvString->get() + " illegal",errorMessage);
412  return false;
413  }
414  }
415  }
416  pvField = pvRequest->getSubField("field");
417  if(!pvField) {
418  pvCopy = PVCopy::create(
419  pvRecord->getPVRecordStructure()->getPVStructure(),
420  pvRequest,"");
421  if(!pvCopy) {
422  requester->message("illegal pvRequest",errorMessage);
423  return false;
424  }
425  } else {
426  if(pvField->getField()->getType()!=structure) {
427  requester->message("illegal pvRequest",errorMessage);
428  return false;
429  }
430  pvCopy = PVCopy::create(
431  pvRecord->getPVRecordStructure()->getPVStructure(),
432  pvRequest,"field");
433  if(!pvCopy) {
434  requester->message("illegal pvRequest",errorMessage);
435  return false;
436  }
437  }
438  if(queueSize<2) queueSize = 2;
439  std::vector<MonitorElementPtr> monitorElementArray;
440  monitorElementArray.reserve(queueSize);
441  for(size_t i=0; i<queueSize; i++) {
442  PVStructurePtr pvStructure = pvCopy->createPVStructure();
443  MonitorElementPtr monitorElement(
444  new MonitorElement(pvStructure));
445  monitorElementArray.push_back(monitorElement);
446  }
447  queue = MonitorElementQueuePtr(new MonitorElementQueue(monitorElementArray));
448  requester->monitorConnect(
449  Status::Ok,
450  getPtrSelf(),
451  pvCopy->getStructure());
452  return true;
453 }
454 
456  PVRecordPtr const & pvRecord,
457  MonitorRequester::shared_pointer const & monitorRequester,
458  PVStructurePtr const & pvRequest)
459 {
460  MonitorLocalPtr monitor(new MonitorLocal(
461  monitorRequester,pvRecord));
462  bool result = monitor->init(pvRequest);
463  if(!result) {
464  MonitorPtr monitor;
465  StructureConstPtr structure;
466  monitorRequester->monitorConnect(
467  failedToCreateMonitorStatus,monitor,structure);
468  return nullMonitor;
469  }
470  if(pvRecord->getTraceLevel()>0)
471  {
472  cout << "MonitorFactory::createMonitor"
473  << " recordName " << pvRecord->getRecordName() << endl;
474  }
475  return monitor;
476 }
477 
478 }}
std::tr1::shared_ptr< PVCopy > PVCopyPtr
Definition: pvPlugin.h:25
Listener for PVRecord::message.
Definition: pvDatabase.h:428
std::tr1::shared_ptr< MonitorLocal > MonitorLocalPtr
std::tr1::shared_ptr< MonitorRequester > MonitorRequesterPtr
std::tr1::shared_ptr< PVRecord > PVRecordPtr
Definition: pvDatabase.h:21
std::tr1::shared_ptr< PVRecordStructure > PVRecordStructurePtr
Definition: pvDatabase.h:31
std::tr1::shared_ptr< PVRecordField > PVRecordFieldPtr
Definition: pvDatabase.h:26
std::tr1::shared_ptr< MonitorElementQueue > MonitorElementQueuePtr
MonitorPtr createMonitorLocal(PVRecordPtr const &pvRecord, MonitorRequester::shared_pointer const &monitorRequester, PVStructurePtr const &pvRequest)