14 #include <epicsGuard.h> 15 #include <pv/thread.h> 16 #include <pv/bitSetUtil.h> 17 #include <pv/pvData.h> 18 #include <pv/pvAccess.h> 19 #include <pv/pvTimeStamp.h> 20 #include <pv/rpcService.h> 21 #include <pv/serverContext.h> 22 #include <pv/timeStamp.h> 24 #define epicsExportSharedSymbols 32 using std::tr1::static_pointer_cast;
37 namespace epics {
namespace pvDatabase {
42 static MonitorPtr nullMonitor;
43 static MonitorElementPtr NULLMonitorElement;
44 static Status failedToCreateMonitorStatus(
45 Status::STATUSTYPE_ERROR,
"failed to create monitor");
46 static Status alreadyStartedStatus(Status::STATUSTYPE_ERROR,
"already started");
47 static Status notStartedStatus(Status::STATUSTYPE_ERROR,
"not started");
48 static Status deletedStatus(Status::STATUSTYPE_ERROR,
"record is deleted");
50 class MonitorElementQueue;
53 class MonitorElementQueue
56 MonitorElementPtrArray elements;
66 POINTER_DEFINITIONS(MonitorElementQueue);
68 MonitorElementQueue(std::vector<MonitorElementPtr> monitorElementArray)
69 : elements(monitorElementArray),
70 size(monitorElementArray.size()),
// Virtual destructor with an empty body; nothing is released explicitly here.
80 virtual ~MonitorElementQueue() {}
// Takes an element from the free side of the queue.
// Returns an empty MonitorElementPtr when numberFree is 0.
// NOTE(review): listing lines 93, 95 and 99-101 are absent from this view, so any
// update of numberFree and the final return statement are not shown here.
92 MonitorElementPtr getFree()
94 if(numberFree==0)
return MonitorElementPtr();
96 int ind = nextGetFree;
// Read the element at the nextGetFree cursor and advance the cursor.
97 MonitorElementPtr queueElement = elements[nextGetFree++];
// The cursor wraps back to slot 0 once it reaches size.
98 if(nextGetFree>=size) nextGetFree = 0;
// Accepts `element` only if it is the element at the nextSetUsed cursor;
// any other element raises std::logic_error("not correct queueElement").
// The cursor is advanced by the post-increment inside the comparison, so it
// moves even when the check fails and the exception is thrown.
// NOTE(review): listing lines 103, 106, 107 and 109 are absent from this view;
// presumably a used-count update lives there - confirm against the full file.
102 void setUsed(MonitorElementPtr
const &element)
104 if(element!=elements[nextSetUsed++]) {
105 throw std::logic_error(
"not correct queueElement");
// Wrap the cursor back to slot 0 once it reaches size.
108 if(nextSetUsed>=size) nextSetUsed = 0;
111 MonitorElementPtr getUsed()
113 if(numberUsed==0)
return MonitorElementPtr();
114 int ind = nextGetUsed;
115 MonitorElementPtr queueElement = elements[nextGetUsed++];
116 if(nextGetUsed>=size) nextGetUsed = 0;
117 return elements[ind];
// Accepts `element` only if it is the element at the nextReleaseUsed cursor;
// any other element raises std::logic_error with the message below.
// The cursor is advanced by the post-increment inside the comparison, so it
// moves even when the check fails and the exception is thrown.
// NOTE(review): listing lines 120, 124 and 126-128 are absent from this view;
// presumably the used/free counters are adjusted there - confirm.
119 void releaseUsed(MonitorElementPtr
const &element)
121 if(element!=elements[nextReleaseUsed++]) {
122 throw std::logic_error(
123 "not queueElement returned by last call to getUsed");
// Wrap the cursor back to slot 0 once it reaches size.
125 if(nextReleaseUsed>=size) nextReleaseUsed = 0;
138 public std::tr1::enable_shared_from_this<MonitorLocal>
140 enum MonitorState {idle,active,deleted};
142 POINTER_DEFINITIONS(MonitorLocal);
143 virtual ~MonitorLocal();
144 virtual void destroy() {}
145 virtual Status start();
146 virtual Status stop();
147 virtual MonitorElementPtr poll();
148 virtual void detach(
PVRecordPtr const & pvRecord){}
149 virtual void release(MonitorElementPtr
const & monitorElement);
151 virtual void dataPut(
154 virtual void beginGroupPut(
PVRecordPtr const & pvRecord);
155 virtual void endGroupPut(
PVRecordPtr const & pvRecord);
156 virtual void unlisten(
PVRecordPtr const & pvRecord);
157 MonitorElementPtr getActiveElement();
158 void releaseActiveElement();
159 bool init(PVStructurePtr
const & pvRequest);
161 MonitorRequester::shared_pointer
const & channelMonitorRequester,
165 MonitorLocalPtr getPtrSelf()
167 return shared_from_this();
169 MonitorRequester::weak_pointer monitorRequester;
173 MonitorElementQueuePtr queue;
174 MonitorElementPtr activeElement;
181 MonitorLocal::MonitorLocal(
182 MonitorRequester::shared_pointer
const & channelMonitorRequester,
184 : monitorRequester(channelMonitorRequester),
192 MonitorLocal::~MonitorLocal()
// Starts the monitor.
// Returns alreadyStartedStatus when state is active and deletedStatus when
// state is deleted; the success return is on listing lines not present in
// this view (199, 201, 203-205, 208, 211-214, 220-221 are missing).
198 Status MonitorLocal::start()
// Trace output is emitted when the record's trace level is above 0.
200 if(pvRecord->getTraceLevel()>0)
202 cout <<
"MonitorLocal::start state " << state << endl;
206 if(state==active)
return alreadyStartedStatus;
207 if(state==deleted)
return deletedStatus;
// Register this monitor, together with its pvCopy, as a listener on the record.
209 pvRecord->addListener(getPtrSelf(),pvCopy);
// Guard on the record itself for the remaining visible statements.
210 epicsGuard <PVRecord> guard(*pvRecord);
// NOTE(review): getFree() returns an empty pointer when no element is free, and
// the result is dereferenced below without a check in the visible lines.
// Presumably the missing lines 211-214 reset the queue first - confirm.
215 activeElement = queue->getFree();
216 activeElement->changedBitSet->clear();
217 activeElement->overrunBitSet->clear();
// Set bit 0 of the changed set before the first release
// (presumably bit 0 stands for the whole structure - TODO confirm).
218 activeElement->changedBitSet->set(0);
219 releaseActiveElement();
// Stops the monitor.
// Returns notStartedStatus when state is idle and deletedStatus when state is
// deleted; otherwise this monitor is removed from the record's listeners.
// NOTE(review): listing lines 224, 227-229, 232-233 and 235-236 are absent from
// this view, so the state change and the success return are not shown here.
223 Status MonitorLocal::stop()
// Trace output is emitted when the record's trace level is above 0.
225 if(pvRecord->getTraceLevel()>0){
226 cout <<
"MonitorLocal::stop state " << state << endl;
230 if(state==idle)
return notStartedStatus;
231 if(state==deleted)
return deletedStatus;
234 pvRecord->removeListener(getPtrSelf(),pvCopy);
// Returns the next used element from the queue, or NULLMonitorElement when the
// monitor is not in the active state. queue->getUsed() itself returns an empty
// pointer when no used element is available.
// NOTE(review): listing lines 239, 241, 243-245 and 248-249 are absent from this
// view; any locking done there is not shown.
238 MonitorElementPtr MonitorLocal::poll()
// Trace output is emitted when the record's trace level is above 1.
240 if(pvRecord->getTraceLevel()>1)
242 cout <<
"MonitorLocal::poll state " << state << endl;
246 if(state!=active)
return NULLMonitorElement;
247 return queue->getUsed();
// Gives `monitorElement` back to the queue via queue->releaseUsed().
// Does nothing when the monitor is not in the active state.
// releaseUsed() throws std::logic_error if the element is not the one at its
// release cursor.
// NOTE(review): listing lines 252, 254, 256-258 and 261-262 are absent from this
// view; any locking done there is not shown.
251 void MonitorLocal::release(MonitorElementPtr
const & monitorElement)
// Trace output is emitted when the record's trace level is above 1.
253 if(pvRecord->getTraceLevel()>1)
255 cout <<
"MonitorLocal::release state " << state << endl;
259 if(state!=active)
return;
260 queue->releaseUsed(monitorElement);
// Publishes the current active element and switches to a fresh one.
// Visible steps: update the active element's structure from its changed bit
// set, obtain a free element, compress both bit sets, hand the old element to
// the queue as used, make the free element active with cleared bit sets, then
// notify the requester with monitorEvent().
// NOTE(review): listing lines 265, 267, 269-271, 274, 283 and 287 are absent
// from this view; how `result` is used and any locking scope are not shown.
264 void MonitorLocal::releaseActiveElement()
// Trace output is emitted when the record's trace level is above 1.
266 if(pvRecord->getTraceLevel()>1)
268 cout <<
"MonitorLocal::releaseActiveElement state " << state << endl;
272 if(state!=active)
return;
273 bool result = pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet);
// No free element: return without swapping, so the current active element
// stays in place and keeps its bit sets.
275 MonitorElementPtr newActive = queue->getFree();
276 if(!newActive)
return;
277 BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
278 BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
279 queue->setUsed(activeElement);
280 activeElement = newActive;
281 activeElement->changedBitSet->clear();
282 activeElement->overrunBitSet->clear();
// The requester is held weakly; if it is already gone there is nobody to notify.
284 MonitorRequesterPtr requester = monitorRequester.lock();
285 if(!requester)
return;
286 requester->monitorEvent(getPtrSelf());
292 if(pvRecord->getTraceLevel()>1)
294 cout <<
"MonitorLocal::dataPut(pvRecordField)" << endl;
296 if(state!=active)
return;
299 size_t offset = pvCopy->getCopyOffset(pvRecordField->getPVField());
300 BitSetPtr
const &changedBitSet = activeElement->changedBitSet;
301 BitSetPtr
const &overrunBitSet = activeElement->overrunBitSet;
302 bool isSet = changedBitSet->get(offset);
303 changedBitSet->set(offset);
304 if(isSet) overrunBitSet->set(offset);
308 releaseActiveElement();
// Records a change to `pvRecordField`, located relative to `requested`, in the
// active element's bit sets. Does nothing when the monitor is not active.
// NOTE(review): the parameter list (listing lines 314-316) and lines 318, 320,
// 322-323 and 334-336 are absent from this view; whether the final
// releaseActiveElement() call is conditional cannot be told from here.
313 void MonitorLocal::dataPut(
// Trace output is emitted when the record's trace level is above 1.
317 if(pvRecord->getTraceLevel()>1)
319 cout <<
"MonitorLocal::dataPut(requested,pvRecordField)" << endl;
321 if(state!=active)
return;
324 BitSetPtr
const &changedBitSet = activeElement->changedBitSet;
325 BitSetPtr
const &overrunBitSet = activeElement->overrunBitSet;
// Bit offset = copy offset of the requested field plus the distance between
// the changed field and the requested field (difference of their field offsets).
326 size_t offsetCopyRequested = pvCopy->getCopyOffset(
327 requested->getPVField());
328 size_t offset = offsetCopyRequested
329 + (pvRecordField->getPVField()->getFieldOffset()
330 - requested->getPVField()->getFieldOffset());
// A bit that was already set in the changed set is also flagged in the overrun set.
331 bool isSet = changedBitSet->get(offset);
332 changedBitSet->set(offset);
333 if(isSet) overrunBitSet->set(offset);
337 releaseActiveElement();
// Listener callback for the start of a group put on `pvRecord`.
// Visible behavior: optional trace output, then an early return when the
// monitor is not active.
// NOTE(review): listing lines 343, 345, 347 and 349-354 are absent from this
// view, so the work done for an active monitor is not shown here.
342 void MonitorLocal::beginGroupPut(
PVRecordPtr const & pvRecord)
// Trace output is emitted when the record's trace level is above 1.
344 if(pvRecord->getTraceLevel()>1)
346 cout <<
"MonitorLocal::beginGroupPut()" << endl;
348 if(state!=active)
return;
// Listener callback for the end of a group put on `pvRecord`.
// Visible behavior: optional trace output that includes dataChanged, an early
// return when the monitor is not active, and a call to releaseActiveElement().
// NOTE(review): listing lines 357, 359, 361, 363-368 and 370-371 are absent
// from this view; whether the release is conditional cannot be told from here.
356 void MonitorLocal::endGroupPut(
PVRecordPtr const & pvRecord)
// Trace output is emitted when the record's trace level is above 1.
358 if(pvRecord->getTraceLevel()>1)
360 cout <<
"MonitorLocal::endGroupPut dataChanged " << dataChanged << endl;
362 if(state!=active)
return;
369 releaseActiveElement();
// Listener callback telling this monitor to stop listening to `pvRecord`.
// Visible behavior: lock the weakly held requester and forward the event to it
// with requester->unlisten(getPtrSelf()).
// NOTE(review): listing lines 374, 376, 378-382, 384, 386, 388 and 390-392 are
// absent from this view; presumably line 384 checks that `requester` is
// non-empty before it is used - confirm against the full file.
373 void MonitorLocal::unlisten(
PVRecordPtr const & pvRecord)
// Trace output is emitted when the record's trace level is above 1.
375 if(pvRecord->getTraceLevel()>1)
377 cout <<
"MonitorLocal::unlisten\n";
383 MonitorRequesterPtr requester = monitorRequester.lock();
385 if(pvRecord->getTraceLevel()>1)
387 cout <<
"MonitorLocal::unlisten calling requester->unlisten\n";
389 requester->unlisten(getPtrSelf());
// Initialises the monitor from `pvRequest`.
// Visible steps: read the optional "queueSize" option under "record._options",
// build the PVCopy for the requested fields, allocate the monitor elements, and
// report the result to the requester through monitorConnect().
// Returns false when the requester no longer exists; the other return
// statements are on listing lines that are absent from this view (395-396,
// 401, 403-405, 408-410, 412-415, 417, 420-421, 423-425, 428-429, 432-433,
// 435-437, 446-447, 449-450, 452-454).
394 bool MonitorLocal::init(PVStructurePtr
const & pvRequest)
// Default queue size when the request does not specify one.
397 size_t queueSize = 2;
398 PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>(
"record._options");
// The requester is held weakly; without it there is nobody to report to.
399 MonitorRequesterPtr requester = monitorRequester.lock();
400 if(!requester)
return false;
// NOTE(review): pvOptions is dereferenced here; presumably the missing line
// 401 checks it for null first - confirm.
402 PVStringPtr pvString = pvOptions->getSubField<PVString>(
"queueSize");
// The option value is fed through a stringstream; the extraction into
// queueSize is on lines not shown.
406 std::stringstream ss;
407 ss << pvString->get();
411 requester->message(
"queueSize " +pvString->get() +
" illegal",errorMessage);
// Look up the "field" part of the request.
416 pvField = pvRequest->getSubField(
"field");
418 pvCopy = PVCopy::create(
419 pvRecord->getPVRecordStructure()->getPVStructure(),
422 requester->message(
"illegal pvRequest",errorMessage);
// A "field" entry that is not a structure is reported as an illegal request.
426 if(pvField->getField()->getType()!=structure) {
427 requester->message(
"illegal pvRequest",errorMessage);
430 pvCopy = PVCopy::create(
431 pvRecord->getPVRecordStructure()->getPVStructure(),
434 requester->message(
"illegal pvRequest",errorMessage);
// Enforce a minimum of two queue elements.
438 if(queueSize<2) queueSize = 2;
439 std::vector<MonitorElementPtr> monitorElementArray;
440 monitorElementArray.reserve(queueSize);
// Each queue element gets its own PVStructure created by pvCopy.
441 for(
size_t i=0; i<queueSize; i++) {
442 PVStructurePtr pvStructure = pvCopy->createPVStructure();
443 MonitorElementPtr monitorElement(
444 new MonitorElement(pvStructure));
445 monitorElementArray.push_back(monitorElement);
448 requester->monitorConnect(
451 pvCopy->getStructure());
457 MonitorRequester::shared_pointer
const & monitorRequester,
458 PVStructurePtr
const & pvRequest)
460 MonitorLocalPtr monitor(
new MonitorLocal(
461 monitorRequester,pvRecord));
462 bool result = monitor->init(pvRequest);
465 StructureConstPtr structure;
466 monitorRequester->monitorConnect(
467 failedToCreateMonitorStatus,monitor,structure);
470 if(pvRecord->getTraceLevel()>0)
472 cout <<
"MonitorFactory::createMonitor" 473 <<
" recordName " << pvRecord->getRecordName() << endl;
std::tr1::shared_ptr< PVCopy > PVCopyPtr
Listener for PVRecord::message.
std::tr1::shared_ptr< MonitorLocal > MonitorLocalPtr
std::tr1::shared_ptr< MonitorRequester > MonitorRequesterPtr
std::tr1::shared_ptr< PVRecord > PVRecordPtr
std::tr1::shared_ptr< PVRecordStructure > PVRecordStructurePtr
std::tr1::shared_ptr< PVRecordField > PVRecordFieldPtr
std::tr1::shared_ptr< MonitorElementQueue > MonitorElementQueuePtr
MonitorPtr createMonitorLocal(PVRecordPtr const &pvRecord, MonitorRequester::shared_pointer const &monitorRequester, PVStructurePtr const &pvRequest)