pvDatabaseCPP  4.7.0
monitorFactory.cpp
Go to the documentation of this file.
1 /* monitorFactory.cpp */
12 #include <sstream>
13 
14 #include <epicsGuard.h>
15 #include <pv/thread.h>
16 #include <pv/bitSetUtil.h>
17 #include <pv/pvData.h>
18 #include <pv/pvAccess.h>
19 #include <pv/pvTimeStamp.h>
20 #include <pv/rpcService.h>
21 #include <pv/serverContext.h>
22 #include <pv/timeStamp.h>
23 
24 #define epicsExportSharedSymbols
25 #include "pv/pvStructureCopy.h"
26 #include "pv/pvDatabase.h"
28 
29 using namespace epics::pvData;
30 using namespace epics::pvAccess;
31 using namespace epics::pvCopy;
32 using std::tr1::static_pointer_cast;
33 using std::cout;
34 using std::endl;
35 using std::string;
36 
37 namespace epics { namespace pvDatabase {
38 
39 class MonitorLocal;
40 typedef std::tr1::shared_ptr<MonitorLocal> MonitorLocalPtr;
41 
42 static MonitorPtr nullMonitor;
43 static MonitorElementPtr NULLMonitorElement;
44 static Status failedToCreateMonitorStatus(
45  Status::STATUSTYPE_ERROR,"failed to create monitor");
46 static Status alreadyStartedStatus(Status::STATUSTYPE_ERROR,"already started");
47 static Status notStartedStatus(Status::STATUSTYPE_ERROR,"not started");
48 static Status deletedStatus(Status::STATUSTYPE_ERROR,"record is deleted");
49 
50 class MonitorElementQueue;
51 typedef std::tr1::shared_ptr<MonitorElementQueue> MonitorElementQueuePtr;
52 
53 class MonitorElementQueue
54 {
55 private:
56  MonitorElementPtrArray elements;
57  // TODO use size_t instead
58  int size;
59  int numberFree;
60  int numberUsed;
61  int nextGetFree;
62  int nextSetUsed;
63  int nextGetUsed;
64  int nextReleaseUsed;
65 public:
66  POINTER_DEFINITIONS(MonitorElementQueue);
67 
68  MonitorElementQueue(std::vector<MonitorElementPtr> monitorElementArray)
69  : elements(monitorElementArray),
70  size(monitorElementArray.size()),
71  numberFree(size),
72  numberUsed(0),
73  nextGetFree(0),
74  nextSetUsed(0),
75  nextGetUsed(0),
76  nextReleaseUsed(0)
77  {
78  }
79 
80  virtual ~MonitorElementQueue() {}
81 
82  void clear()
83  {
84  numberFree = size;
85  numberUsed = 0;
86  nextGetFree = 0;
87  nextSetUsed = 0;
88  nextGetUsed = 0;
89  nextReleaseUsed = 0;
90  }
91 
92  MonitorElementPtr getFree()
93  {
94  if(numberFree==0) return MonitorElementPtr();
95  numberFree--;
96  int ind = nextGetFree;
97  MonitorElementPtr queueElement = elements[nextGetFree++];
98  if(nextGetFree>=size) nextGetFree = 0;
99  return elements[ind];
100  }
101 
102  void setUsed(MonitorElementPtr const &element)
103  {
104  if(element!=elements[nextSetUsed++]) {
105  throw std::logic_error("not correct queueElement");
106  }
107  numberUsed++;
108  if(nextSetUsed>=size) nextSetUsed = 0;
109  }
110 
111  MonitorElementPtr getUsed()
112  {
113  if(numberUsed==0) return MonitorElementPtr();
114  int ind = nextGetUsed;
115  MonitorElementPtr queueElement = elements[nextGetUsed++];
116  if(nextGetUsed>=size) nextGetUsed = 0;
117  return elements[ind];
118  }
119  void releaseUsed(MonitorElementPtr const &element)
120  {
121  if(element!=elements[nextReleaseUsed++]) {
122  throw std::logic_error(
123  "not queueElement returned by last call to getUsed");
124  }
125  if(nextReleaseUsed>=size) nextReleaseUsed = 0;
126  numberUsed--;
127  numberFree++;
128  }
129 };
130 
131 
132 typedef std::tr1::shared_ptr<MonitorRequester> MonitorRequesterPtr;
133 
134 
135 class MonitorLocal :
136  public Monitor,
137  public PVListener,
138  public std::tr1::enable_shared_from_this<MonitorLocal>
139 {
140  enum MonitorState {idle,active,deleted};
141 public:
142  POINTER_DEFINITIONS(MonitorLocal);
143  virtual ~MonitorLocal();
144  virtual Status start();
145  virtual Status stop();
146  virtual MonitorElementPtr poll();
147  virtual void detach(PVRecordPtr const & pvRecord){}
148  virtual void release(MonitorElementPtr const & monitorElement);
149  virtual void dataPut(PVRecordFieldPtr const & pvRecordField);
150  virtual void dataPut(
151  PVRecordStructurePtr const & requested,
152  PVRecordFieldPtr const & pvRecordField);
153  virtual void beginGroupPut(PVRecordPtr const & pvRecord);
154  virtual void endGroupPut(PVRecordPtr const & pvRecord);
155  virtual void unlisten(PVRecordPtr const & pvRecord);
156  MonitorElementPtr getActiveElement();
157  void releaseActiveElement();
158  bool init(PVStructurePtr const & pvRequest);
159  MonitorLocal(
160  MonitorRequester::shared_pointer const & channelMonitorRequester,
161  PVRecordPtr const &pvRecord);
162  PVCopyPtr getPVCopy() { return pvCopy;}
163 private:
164  MonitorLocalPtr getPtrSelf()
165  {
166  return shared_from_this();
167  }
168  MonitorRequester::weak_pointer monitorRequester;
169  PVRecordPtr pvRecord;
170  MonitorState state;
171  PVCopyPtr pvCopy;
172  MonitorElementQueuePtr queue;
173  MonitorElementPtr activeElement;
174  bool isGroupPut;
175  bool dataChanged;
176  Mutex mutex;
177  Mutex queueMutex;
178 };
179 
180 MonitorLocal::MonitorLocal(
181  MonitorRequester::shared_pointer const & channelMonitorRequester,
182  PVRecordPtr const &pvRecord)
183 : monitorRequester(channelMonitorRequester),
184  pvRecord(pvRecord),
185  state(idle),
186  isGroupPut(false),
187  dataChanged(false)
188 {
189 }
190 
191 MonitorLocal::~MonitorLocal()
192 {
193 //cout << "MonitorLocal::~MonitorLocal()" << endl;
194 }
195 
196 
197 Status MonitorLocal::start()
198 {
199  if(pvRecord->getTraceLevel()>0)
200  {
201  cout << "MonitorLocal::start state " << state << endl;
202  }
203  {
204  Lock xx(mutex);
205  if(state==active) return alreadyStartedStatus;
206  if(state==deleted) return deletedStatus;
207  }
208  pvRecord->addListener(getPtrSelf(),pvCopy);
209  epicsGuard <PVRecord> guard(*pvRecord);
210  Lock xx(mutex);
211  state = active;
212  queue->clear();
213  isGroupPut = false;
214  activeElement = queue->getFree();
215  activeElement->changedBitSet->clear();
216  activeElement->overrunBitSet->clear();
217  activeElement->changedBitSet->set(0);
218  releaseActiveElement();
219  return Status::Ok;
220 }
221 
222 Status MonitorLocal::stop()
223 {
224  if(pvRecord->getTraceLevel()>0){
225  cout << "MonitorLocal::stop state " << state << endl;
226  }
227  {
228  Lock xx(mutex);
229  if(state==idle) return notStartedStatus;
230  if(state==deleted) return deletedStatus;
231  state = idle;
232  }
233  pvRecord->removeListener(getPtrSelf(),pvCopy);
234  return Status::Ok;
235 }
236 
237 MonitorElementPtr MonitorLocal::poll()
238 {
239  if(pvRecord->getTraceLevel()>1)
240  {
241  cout << "MonitorLocal::poll state " << state << endl;
242  }
243  {
244  Lock xx(queueMutex);
245  if(state!=active) return NULLMonitorElement;
246  return queue->getUsed();
247  }
248 }
249 
250 void MonitorLocal::release(MonitorElementPtr const & monitorElement)
251 {
252  if(pvRecord->getTraceLevel()>1)
253  {
254  cout << "MonitorLocal::release state " << state << endl;
255  }
256  {
257  Lock xx(queueMutex);
258  if(state!=active) return;
259  queue->releaseUsed(monitorElement);
260  }
261 }
262 
263 void MonitorLocal::releaseActiveElement()
264 {
265  if(pvRecord->getTraceLevel()>1)
266  {
267  cout << "MonitorLocal::releaseActiveElement state " << state << endl;
268  }
269  {
270  Lock xx(queueMutex);
271  if(state!=active) return;
272  bool result = pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet);
273  if(!result) return;
274  MonitorElementPtr newActive = queue->getFree();
275  if(!newActive) return;
276  BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
277  BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
278  queue->setUsed(activeElement);
279  activeElement = newActive;
280  activeElement->changedBitSet->clear();
281  activeElement->overrunBitSet->clear();
282  }
283  MonitorRequesterPtr requester = monitorRequester.lock();
284  if(!requester) return;
285  requester->monitorEvent(getPtrSelf());
286  return;
287 }
288 
289 void MonitorLocal::dataPut(PVRecordFieldPtr const & pvRecordField)
290 {
291  if(pvRecord->getTraceLevel()>1)
292  {
293  cout << "MonitorLocal::dataPut(pvRecordField)" << endl;
294  }
295  if(state!=active) return;
296  {
297  Lock xx(mutex);
298  size_t offset = pvCopy->getCopyOffset(pvRecordField->getPVField());
299  BitSetPtr const &changedBitSet = activeElement->changedBitSet;
300  BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
301  bool isSet = changedBitSet->get(offset);
302  changedBitSet->set(offset);
303  if(isSet) overrunBitSet->set(offset);
304  dataChanged = true;
305  }
306  if(!isGroupPut) {
307  releaseActiveElement();
308  dataChanged = false;
309  }
310 }
311 
312 void MonitorLocal::dataPut(
313  PVRecordStructurePtr const & requested,
314  PVRecordFieldPtr const & pvRecordField)
315 {
316  if(pvRecord->getTraceLevel()>1)
317  {
318  cout << "MonitorLocal::dataPut(requested,pvRecordField)" << endl;
319  }
320  if(state!=active) return;
321  {
322  Lock xx(mutex);
323  BitSetPtr const &changedBitSet = activeElement->changedBitSet;
324  BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
325  size_t offsetCopyRequested = pvCopy->getCopyOffset(
326  requested->getPVField());
327  size_t offset = offsetCopyRequested
328  + (pvRecordField->getPVField()->getFieldOffset()
329  - requested->getPVField()->getFieldOffset());
330  bool isSet = changedBitSet->get(offset);
331  changedBitSet->set(offset);
332  if(isSet) overrunBitSet->set(offset);
333  dataChanged = true;
334  }
335  if(!isGroupPut) {
336  releaseActiveElement();
337  dataChanged = false;
338  }
339 }
340 
341 void MonitorLocal::beginGroupPut(PVRecordPtr const & pvRecord)
342 {
343  if(pvRecord->getTraceLevel()>1)
344  {
345  cout << "MonitorLocal::beginGroupPut()" << endl;
346  }
347  if(state!=active) return;
348  {
349  Lock xx(mutex);
350  isGroupPut = true;
351  dataChanged = false;
352  }
353 }
354 
355 void MonitorLocal::endGroupPut(PVRecordPtr const & pvRecord)
356 {
357  if(pvRecord->getTraceLevel()>1)
358  {
359  cout << "MonitorLocal::endGroupPut dataChanged " << dataChanged << endl;
360  }
361  if(state!=active) return;
362  {
363  Lock xx(mutex);
364  isGroupPut = false;
365  }
366  if(dataChanged) {
367  dataChanged = false;
368  releaseActiveElement();
369  }
370 }
371 
372 void MonitorLocal::unlisten(PVRecordPtr const & pvRecord)
373 {
374  if(pvRecord->getTraceLevel()>1)
375  {
376  cout << "MonitorLocal::unlisten\n";
377  }
378  {
379  Lock xx(mutex);
380  state = deleted;
381  }
382  MonitorRequesterPtr requester = monitorRequester.lock();
383  if(requester) {
384  if(pvRecord->getTraceLevel()>1)
385  {
386  cout << "MonitorLocal::unlisten calling requester->unlisten\n";
387  }
388  requester->unlisten(getPtrSelf());
389  }
390 }
391 
392 
393 bool MonitorLocal::init(PVStructurePtr const & pvRequest)
394 {
395  PVFieldPtr pvField;
396  size_t queueSize = 2;
397  PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
398  MonitorRequesterPtr requester = monitorRequester.lock();
399  if(!requester) return false;
400  if(pvOptions) {
401  PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
402  if(pvString) {
403  try {
404  int32 size;
405  std::stringstream ss;
406  ss << pvString->get();
407  ss >> size;
408  queueSize = size;
409  } catch (...) {
410  requester->message("queueSize " +pvString->get() + " illegal",errorMessage);
411  return false;
412  }
413  }
414  }
415  pvField = pvRequest->getSubField("field");
416  if(!pvField) {
417  pvCopy = PVCopy::create(
418  pvRecord->getPVRecordStructure()->getPVStructure(),
419  pvRequest,"");
420  if(!pvCopy) {
421  requester->message("illegal pvRequest",errorMessage);
422  return false;
423  }
424  } else {
425  if(pvField->getField()->getType()!=structure) {
426  requester->message("illegal pvRequest",errorMessage);
427  return false;
428  }
429  pvCopy = PVCopy::create(
430  pvRecord->getPVRecordStructure()->getPVStructure(),
431  pvRequest,"field");
432  if(!pvCopy) {
433  requester->message("illegal pvRequest",errorMessage);
434  return false;
435  }
436  }
437  if(queueSize<2) queueSize = 2;
438  std::vector<MonitorElementPtr> monitorElementArray;
439  monitorElementArray.reserve(queueSize);
440  for(size_t i=0; i<queueSize; i++) {
441  PVStructurePtr pvStructure = pvCopy->createPVStructure();
442  MonitorElementPtr monitorElement(
443  new MonitorElement(pvStructure));
444  monitorElementArray.push_back(monitorElement);
445  }
446  queue = MonitorElementQueuePtr(new MonitorElementQueue(monitorElementArray));
447  requester->monitorConnect(
448  Status::Ok,
449  getPtrSelf(),
450  pvCopy->getStructure());
451  return true;
452 }
453 
455  PVRecordPtr const & pvRecord,
456  MonitorRequester::shared_pointer const & monitorRequester,
457  PVStructurePtr const & pvRequest)
458 {
459  MonitorLocalPtr monitor(new MonitorLocal(
460  monitorRequester,pvRecord));
461  bool result = monitor->init(pvRequest);
462  if(!result) {
463  MonitorPtr monitor;
464  StructureConstPtr structure;
465  monitorRequester->monitorConnect(
466  failedToCreateMonitorStatus,monitor,structure);
467  return nullMonitor;
468  }
469  if(pvRecord->getTraceLevel()>0)
470  {
471  cout << "MonitorFactory::createMonitor"
472  << " recordName " << pvRecord->getRecordName() << endl;
473  }
474  return monitor;
475 }
476 
477 }}
std::tr1::shared_ptr< PVCopy > PVCopyPtr
Definition: pvPlugin.h:25
Listener for PVRecord::message.
Definition: pvDatabase.h:457
std::tr1::shared_ptr< MonitorLocal > MonitorLocalPtr
std::tr1::shared_ptr< MonitorRequester > MonitorRequesterPtr
std::tr1::shared_ptr< PVRecord > PVRecordPtr
Definition: pvDatabase.h:21
std::tr1::shared_ptr< PVRecordStructure > PVRecordStructurePtr
Definition: pvDatabase.h:31
std::tr1::shared_ptr< PVRecordField > PVRecordFieldPtr
Definition: pvDatabase.h:26
std::tr1::shared_ptr< MonitorElementQueue > MonitorElementQueuePtr
MonitorPtr createMonitorLocal(PVRecordPtr const &pvRecord, MonitorRequester::shared_pointer const &monitorRequester, PVStructurePtr const &pvRequest)