pvDatabaseCPP  4.5.1
monitorFactory.cpp
Go to the documentation of this file.
1 /* monitorFactory.cpp */
12 #include <sstream>
13 
14 #include <epicsGuard.h>
15 #include <pv/thread.h>
16 #include <pv/bitSetUtil.h>
17 #include <pv/pvData.h>
18 #include <pv/pvAccess.h>
19 #include <pv/pvTimeStamp.h>
20 #include <pv/rpcService.h>
21 #include <pv/serverContext.h>
22 #include <pv/timeStamp.h>
23 
24 #define epicsExportSharedSymbols
25 #include "pv/pvStructureCopy.h"
26 #include "pv/pvDatabase.h"
28 
29 using namespace epics::pvData;
30 using namespace epics::pvAccess;
31 using namespace epics::pvCopy;
32 using std::tr1::static_pointer_cast;
33 using std::cout;
34 using std::endl;
35 using std::string;
36 
37 namespace epics { namespace pvDatabase {
38 
39 class MonitorLocal;
40 typedef std::tr1::shared_ptr<MonitorLocal> MonitorLocalPtr;
41 
42 static MonitorPtr nullMonitor;
43 static MonitorElementPtr NULLMonitorElement;
44 static Status failedToCreateMonitorStatus(
45  Status::STATUSTYPE_ERROR,"failed to create monitor");
46 static Status alreadyStartedStatus(Status::STATUSTYPE_ERROR,"already started");
47 static Status notStartedStatus(Status::STATUSTYPE_ERROR,"not started");
48 static Status deletedStatus(Status::STATUSTYPE_ERROR,"record is deleted");
49 
50 class MonitorElementQueue;
51 typedef std::tr1::shared_ptr<MonitorElementQueue> MonitorElementQueuePtr;
52 
53 class MonitorElementQueue
54 {
55 private:
56  MonitorElementPtrArray elements;
57  // TODO use size_t instead
58  int size;
59  int numberFree;
60  int numberUsed;
61  int nextGetFree;
62  int nextSetUsed;
63  int nextGetUsed;
64  int nextReleaseUsed;
65 public:
66  POINTER_DEFINITIONS(MonitorElementQueue);
67 
68  MonitorElementQueue(std::vector<MonitorElementPtr> monitorElementArray)
69  : elements(monitorElementArray),
70  size(monitorElementArray.size()),
71  numberFree(size),
72  numberUsed(0),
73  nextGetFree(0),
74  nextSetUsed(0),
75  nextGetUsed(0),
76  nextReleaseUsed(0)
77  {
78  }
79 
80  virtual ~MonitorElementQueue() {}
81 
82  void clear()
83  {
84  numberFree = size;
85  numberUsed = 0;
86  nextGetFree = 0;
87  nextSetUsed = 0;
88  nextGetUsed = 0;
89  nextReleaseUsed = 0;
90  }
91 
92  MonitorElementPtr getFree()
93  {
94  if(numberFree==0) return MonitorElementPtr();
95  numberFree--;
96  int ind = nextGetFree;
97  MonitorElementPtr queueElement = elements[nextGetFree++];
98  if(nextGetFree>=size) nextGetFree = 0;
99  return elements[ind];
100  }
101 
102  void setUsed(MonitorElementPtr const &element)
103  {
104  if(element!=elements[nextSetUsed++]) {
105  throw std::logic_error("not correct queueElement");
106  }
107  numberUsed++;
108  if(nextSetUsed>=size) nextSetUsed = 0;
109  }
110 
111  MonitorElementPtr getUsed()
112  {
113  if(numberUsed==0) return MonitorElementPtr();
114  int ind = nextGetUsed;
115  MonitorElementPtr queueElement = elements[nextGetUsed++];
116  if(nextGetUsed>=size) nextGetUsed = 0;
117  return elements[ind];
118  }
119  void releaseUsed(MonitorElementPtr const &element)
120  {
121  if(element!=elements[nextReleaseUsed++]) {
122  throw std::logic_error(
123  "not queueElement returned by last call to getUsed");
124  }
125  if(nextReleaseUsed>=size) nextReleaseUsed = 0;
126  numberUsed--;
127  numberFree++;
128  }
129 };
130 
131 
132 typedef std::tr1::shared_ptr<MonitorRequester> MonitorRequesterPtr;
133 
134 
135 class MonitorLocal :
136  public Monitor,
137  public PVListener,
138  public std::tr1::enable_shared_from_this<MonitorLocal>
139 {
140  enum MonitorState {idle,active,deleted};
141 public:
142  POINTER_DEFINITIONS(MonitorLocal);
143  virtual ~MonitorLocal();
144  virtual void destroy() {} // DEPRECATED
145  virtual Status start();
146  virtual Status stop();
147  virtual MonitorElementPtr poll();
148  virtual void detach(PVRecordPtr const & pvRecord){}
149  virtual void release(MonitorElementPtr const & monitorElement);
150  virtual void dataPut(PVRecordFieldPtr const & pvRecordField);
151  virtual void dataPut(
152  PVRecordStructurePtr const & requested,
153  PVRecordFieldPtr const & pvRecordField);
154  virtual void beginGroupPut(PVRecordPtr const & pvRecord);
155  virtual void endGroupPut(PVRecordPtr const & pvRecord);
156  virtual void unlisten(PVRecordPtr const & pvRecord);
157  MonitorElementPtr getActiveElement();
158  void releaseActiveElement();
159  bool init(PVStructurePtr const & pvRequest);
160  MonitorLocal(
161  MonitorRequester::shared_pointer const & channelMonitorRequester,
162  PVRecordPtr const &pvRecord);
163  PVCopyPtr getPVCopy() { return pvCopy;}
164 private:
165  MonitorLocalPtr getPtrSelf()
166  {
167  return shared_from_this();
168  }
169  MonitorRequester::weak_pointer monitorRequester;
170  PVRecordPtr pvRecord;
171  MonitorState state;
172  PVCopyPtr pvCopy;
173  MonitorElementQueuePtr queue;
174  MonitorElementPtr activeElement;
175  bool isGroupPut;
176  bool dataChanged;
177  Mutex mutex;
178  Mutex queueMutex;
179 };
180 
181 MonitorLocal::MonitorLocal(
182  MonitorRequester::shared_pointer const & channelMonitorRequester,
183  PVRecordPtr const &pvRecord)
184 : monitorRequester(channelMonitorRequester),
185  pvRecord(pvRecord),
186  state(idle),
187  isGroupPut(false),
188  dataChanged(false)
189 {
190 }
191 
192 MonitorLocal::~MonitorLocal()
193 {
194  if(pvRecord->getTraceLevel()>0)
195  {
196  cout << "MonitorLocal::~MonitorLocal()" << endl;
197  }
198 }
199 
200 
201 Status MonitorLocal::start()
202 {
203  if(pvRecord->getTraceLevel()>0)
204  {
205  cout << "MonitorLocal::start state " << state << endl;
206  }
207  {
208  Lock xx(mutex);
209  if(state==active) return alreadyStartedStatus;
210  if(state==deleted) return deletedStatus;
211  }
212  pvRecord->addListener(getPtrSelf(),pvCopy);
213  epicsGuard <PVRecord> guard(*pvRecord);
214  Lock xx(mutex);
215  state = active;
216  queue->clear();
217  isGroupPut = false;
218  activeElement = queue->getFree();
219  activeElement->changedBitSet->clear();
220  activeElement->overrunBitSet->clear();
221  activeElement->changedBitSet->set(0);
222  releaseActiveElement();
223  return Status::Ok;
224 }
225 
226 Status MonitorLocal::stop()
227 {
228  if(pvRecord->getTraceLevel()>0){
229  cout << "MonitorLocal::stop state " << state << endl;
230  }
231  {
232  Lock xx(mutex);
233  if(state==idle) return notStartedStatus;
234  if(state==deleted) return deletedStatus;
235  state = idle;
236  }
237  pvRecord->removeListener(getPtrSelf(),pvCopy);
238  return Status::Ok;
239 }
240 
241 MonitorElementPtr MonitorLocal::poll()
242 {
243  if(pvRecord->getTraceLevel()>1)
244  {
245  cout << "MonitorLocal::poll state " << state << endl;
246  }
247  {
248  Lock xx(queueMutex);
249  if(state!=active) return NULLMonitorElement;
250  return queue->getUsed();
251  }
252 }
253 
254 void MonitorLocal::release(MonitorElementPtr const & monitorElement)
255 {
256  if(pvRecord->getTraceLevel()>1)
257  {
258  cout << "MonitorLocal::release state " << state << endl;
259  }
260  {
261  Lock xx(queueMutex);
262  if(state!=active) return;
263  queue->releaseUsed(monitorElement);
264  }
265 }
266 
267 void MonitorLocal::releaseActiveElement()
268 {
269  if(pvRecord->getTraceLevel()>1)
270  {
271  cout << "MonitorLocal::releaseActiveElement state " << state << endl;
272  }
273  {
274  Lock xx(queueMutex);
275  if(state!=active) return;
276  bool result = pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet);
277  if(!result) return;
278  MonitorElementPtr newActive = queue->getFree();
279  if(!newActive) return;
280  BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
281  BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
282  queue->setUsed(activeElement);
283  activeElement = newActive;
284  activeElement->changedBitSet->clear();
285  activeElement->overrunBitSet->clear();
286  }
287  MonitorRequesterPtr requester = monitorRequester.lock();
288  if(!requester) return;
289  requester->monitorEvent(getPtrSelf());
290  return;
291 }
292 
293 void MonitorLocal::dataPut(PVRecordFieldPtr const & pvRecordField)
294 {
295  if(pvRecord->getTraceLevel()>1)
296  {
297  cout << "PVCopyMonitor::dataPut(pvRecordField)" << endl;
298  }
299  if(state!=active) return;
300  {
301  Lock xx(mutex);
302  size_t offset = pvCopy->getCopyOffset(pvRecordField->getPVField());
303  BitSetPtr const &changedBitSet = activeElement->changedBitSet;
304  BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
305  bool isSet = changedBitSet->get(offset);
306  changedBitSet->set(offset);
307  if(isSet) overrunBitSet->set(offset);
308  dataChanged = true;
309  }
310  if(!isGroupPut) {
311  releaseActiveElement();
312  dataChanged = false;
313  }
314 }
315 
316 void MonitorLocal::dataPut(
317  PVRecordStructurePtr const & requested,
318  PVRecordFieldPtr const & pvRecordField)
319 {
320  if(pvRecord->getTraceLevel()>1)
321  {
322  cout << "PVCopyMonitor::dataPut(requested,pvRecordField)" << endl;
323  }
324  if(state!=active) return;
325  {
326  Lock xx(mutex);
327  BitSetPtr const &changedBitSet = activeElement->changedBitSet;
328  BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
329  size_t offsetCopyRequested = pvCopy->getCopyOffset(
330  requested->getPVField());
331  size_t offset = offsetCopyRequested
332  + (pvRecordField->getPVField()->getFieldOffset()
333  - requested->getPVField()->getFieldOffset());
334  bool isSet = changedBitSet->get(offset);
335  changedBitSet->set(offset);
336  if(isSet) overrunBitSet->set(offset);
337  dataChanged = true;
338  }
339  if(!isGroupPut) {
340  releaseActiveElement();
341  dataChanged = false;
342  }
343 }
344 
345 void MonitorLocal::beginGroupPut(PVRecordPtr const & pvRecord)
346 {
347  if(pvRecord->getTraceLevel()>1)
348  {
349  cout << "PVCopyMonitor::beginGroupPut()" << endl;
350  }
351  if(state!=active) return;
352  {
353  Lock xx(mutex);
354  isGroupPut = true;
355  dataChanged = false;
356  }
357 }
358 
359 void MonitorLocal::endGroupPut(PVRecordPtr const & pvRecord)
360 {
361  if(pvRecord->getTraceLevel()>1)
362  {
363  cout << "PVCopyMonitor::endGroupPut dataChanged " << dataChanged << endl;
364  }
365  if(state!=active) return;
366  {
367  Lock xx(mutex);
368  isGroupPut = false;
369  }
370  if(dataChanged) {
371  dataChanged = false;
372  releaseActiveElement();
373  }
374 }
375 
376 void MonitorLocal::unlisten(PVRecordPtr const & pvRecord)
377 {
378  if(pvRecord->getTraceLevel()>1)
379  {
380  cout << "PVCopyMonitor::unlisten\n";
381  }
382  {
383  Lock xx(mutex);
384  state = deleted;
385  }
386  MonitorRequesterPtr requester = monitorRequester.lock();
387  if(requester) {
388  if(pvRecord->getTraceLevel()>1)
389  {
390  cout << "PVCopyMonitor::unlisten calling requester->unlisten\n";
391  }
392  requester->unlisten(getPtrSelf());
393  }
394 }
395 
396 
397 bool MonitorLocal::init(PVStructurePtr const & pvRequest)
398 {
399  PVFieldPtr pvField;
400  size_t queueSize = 2;
401  PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
402  MonitorRequesterPtr requester = monitorRequester.lock();
403  if(!requester) return false;
404  if(pvOptions) {
405  PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
406  if(pvString) {
407  try {
408  int32 size;
409  std::stringstream ss;
410  ss << pvString->get();
411  ss >> size;
412  queueSize = size;
413  } catch (...) {
414  requester->message("queueSize " +pvString->get() + " illegal",errorMessage);
415  return false;
416  }
417  }
418  }
419  pvField = pvRequest->getSubField("field");
420  if(!pvField) {
421  pvCopy = PVCopy::create(
422  pvRecord->getPVRecordStructure()->getPVStructure(),
423  pvRequest,"");
424  if(!pvCopy) {
425  requester->message("illegal pvRequest",errorMessage);
426  return false;
427  }
428  } else {
429  if(pvField->getField()->getType()!=structure) {
430  requester->message("illegal pvRequest",errorMessage);
431  return false;
432  }
433  pvCopy = PVCopy::create(
434  pvRecord->getPVRecordStructure()->getPVStructure(),
435  pvRequest,"field");
436  if(!pvCopy) {
437  requester->message("illegal pvRequest",errorMessage);
438  return false;
439  }
440  }
441  if(queueSize<2) queueSize = 2;
442  std::vector<MonitorElementPtr> monitorElementArray;
443  monitorElementArray.reserve(queueSize);
444  for(size_t i=0; i<queueSize; i++) {
445  PVStructurePtr pvStructure = pvCopy->createPVStructure();
446  MonitorElementPtr monitorElement(
447  new MonitorElement(pvStructure));
448  monitorElementArray.push_back(monitorElement);
449  }
450  queue = MonitorElementQueuePtr(new MonitorElementQueue(monitorElementArray));
451  requester->monitorConnect(
452  Status::Ok,
453  getPtrSelf(),
454  pvCopy->getStructure());
455  return true;
456 }
457 
459  PVRecordPtr const & pvRecord,
460  MonitorRequester::shared_pointer const & monitorRequester,
461  PVStructurePtr const & pvRequest)
462 {
463  MonitorLocalPtr monitor(new MonitorLocal(
464  monitorRequester,pvRecord));
465  bool result = monitor->init(pvRequest);
466  if(!result) {
467  MonitorPtr monitor;
468  StructureConstPtr structure;
469  monitorRequester->monitorConnect(
470  failedToCreateMonitorStatus,monitor,structure);
471  return nullMonitor;
472  }
473  if(pvRecord->getTraceLevel()>0)
474  {
475  cout << "MonitorFactory::createMonitor"
476  << " recordName " << pvRecord->getRecordName() << endl;
477  }
478  return monitor;
479 }
480 
481 }}
std::tr1::shared_ptr< PVCopy > PVCopyPtr
Definition: pvPlugin.h:25
Listener for PVRecord::message.
Definition: pvDatabase.h:426
std::tr1::shared_ptr< MonitorLocal > MonitorLocalPtr
std::tr1::shared_ptr< MonitorRequester > MonitorRequesterPtr
std::tr1::shared_ptr< PVRecord > PVRecordPtr
Definition: pvDatabase.h:21
std::tr1::shared_ptr< PVRecordStructure > PVRecordStructurePtr
Definition: pvDatabase.h:31
std::tr1::shared_ptr< PVRecordField > PVRecordFieldPtr
Definition: pvDatabase.h:26
std::tr1::shared_ptr< MonitorElementQueue > MonitorElementQueuePtr
MonitorPtr createMonitorLocal(PVRecordPtr const &pvRecord, MonitorRequester::shared_pointer const &monitorRequester, PVStructurePtr const &pvRequest)