14 #include <epicsGuard.h> 15 #include <pv/thread.h> 16 #include <pv/bitSetUtil.h> 17 #include <pv/pvData.h> 18 #include <pv/pvAccess.h> 19 #include <pv/pvTimeStamp.h> 20 #include <pv/rpcService.h> 21 #include <pv/serverContext.h> 22 #include <pv/timeStamp.h> 24 #define epicsExportSharedSymbols 32 using std::tr1::static_pointer_cast;
37 namespace epics {
namespace pvDatabase {
42 static MonitorPtr nullMonitor;
43 static MonitorElementPtr NULLMonitorElement;
44 static Status failedToCreateMonitorStatus(
45 Status::STATUSTYPE_ERROR,
"failed to create monitor");
46 static Status alreadyStartedStatus(Status::STATUSTYPE_ERROR,
"already started");
47 static Status notStartedStatus(Status::STATUSTYPE_ERROR,
"not started");
48 static Status deletedStatus(Status::STATUSTYPE_ERROR,
"record is deleted");
50 class MonitorElementQueue;
53 class MonitorElementQueue
56 MonitorElementPtrArray elements;
66 POINTER_DEFINITIONS(MonitorElementQueue);
68 MonitorElementQueue(std::vector<MonitorElementPtr> monitorElementArray)
69 : elements(monitorElementArray),
70 size(monitorElementArray.size()),
80 virtual ~MonitorElementQueue() {}
92 MonitorElementPtr getFree()
94 if(numberFree==0)
return MonitorElementPtr();
96 int ind = nextGetFree;
97 MonitorElementPtr queueElement = elements[nextGetFree++];
98 if(nextGetFree>=size) nextGetFree = 0;
102 void setUsed(MonitorElementPtr
const &element)
104 if(element!=elements[nextSetUsed++]) {
105 throw std::logic_error(
"not correct queueElement");
108 if(nextSetUsed>=size) nextSetUsed = 0;
111 MonitorElementPtr getUsed()
113 if(numberUsed==0)
return MonitorElementPtr();
114 int ind = nextGetUsed;
115 MonitorElementPtr queueElement = elements[nextGetUsed++];
116 if(nextGetUsed>=size) nextGetUsed = 0;
117 return elements[ind];
119 void releaseUsed(MonitorElementPtr
const &element)
121 if(element!=elements[nextReleaseUsed++]) {
122 throw std::logic_error(
123 "not queueElement returned by last call to getUsed");
125 if(nextReleaseUsed>=size) nextReleaseUsed = 0;
138 public std::tr1::enable_shared_from_this<MonitorLocal>
140 enum MonitorState {idle,active,deleted};
142 POINTER_DEFINITIONS(MonitorLocal);
143 virtual ~MonitorLocal();
144 virtual void destroy() {}
145 virtual Status start();
146 virtual Status stop();
147 virtual MonitorElementPtr poll();
148 virtual void detach(
PVRecordPtr const & pvRecord){}
149 virtual void release(MonitorElementPtr
const & monitorElement);
151 virtual void dataPut(
154 virtual void beginGroupPut(
PVRecordPtr const & pvRecord);
155 virtual void endGroupPut(
PVRecordPtr const & pvRecord);
156 virtual void unlisten(
PVRecordPtr const & pvRecord);
157 MonitorElementPtr getActiveElement();
158 void releaseActiveElement();
159 bool init(PVStructurePtr
const & pvRequest);
161 MonitorRequester::shared_pointer
const & channelMonitorRequester,
165 MonitorLocalPtr getPtrSelf()
167 return shared_from_this();
169 MonitorRequester::weak_pointer monitorRequester;
173 MonitorElementQueuePtr queue;
174 MonitorElementPtr activeElement;
181 MonitorLocal::MonitorLocal(
182 MonitorRequester::shared_pointer
const & channelMonitorRequester,
184 : monitorRequester(channelMonitorRequester),
192 MonitorLocal::~MonitorLocal()
194 if(pvRecord->getTraceLevel()>0)
196 cout <<
"MonitorLocal::~MonitorLocal()" << endl;
201 Status MonitorLocal::start()
203 if(pvRecord->getTraceLevel()>0)
205 cout <<
"MonitorLocal::start state " << state << endl;
209 if(state==active)
return alreadyStartedStatus;
210 if(state==deleted)
return deletedStatus;
212 pvRecord->addListener(getPtrSelf(),pvCopy);
213 epicsGuard <PVRecord> guard(*pvRecord);
218 activeElement = queue->getFree();
219 activeElement->changedBitSet->clear();
220 activeElement->overrunBitSet->clear();
221 activeElement->changedBitSet->set(0);
222 releaseActiveElement();
226 Status MonitorLocal::stop()
228 if(pvRecord->getTraceLevel()>0){
229 cout <<
"MonitorLocal::stop state " << state << endl;
233 if(state==idle)
return notStartedStatus;
234 if(state==deleted)
return deletedStatus;
237 pvRecord->removeListener(getPtrSelf(),pvCopy);
241 MonitorElementPtr MonitorLocal::poll()
243 if(pvRecord->getTraceLevel()>1)
245 cout <<
"MonitorLocal::poll state " << state << endl;
249 if(state!=active)
return NULLMonitorElement;
250 return queue->getUsed();
254 void MonitorLocal::release(MonitorElementPtr
const & monitorElement)
256 if(pvRecord->getTraceLevel()>1)
258 cout <<
"MonitorLocal::release state " << state << endl;
262 if(state!=active)
return;
263 queue->releaseUsed(monitorElement);
267 void MonitorLocal::releaseActiveElement()
269 if(pvRecord->getTraceLevel()>1)
271 cout <<
"MonitorLocal::releaseActiveElement state " << state << endl;
275 if(state!=active)
return;
276 bool result = pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet);
278 MonitorElementPtr newActive = queue->getFree();
279 if(!newActive)
return;
280 BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
281 BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
282 queue->setUsed(activeElement);
283 activeElement = newActive;
284 activeElement->changedBitSet->clear();
285 activeElement->overrunBitSet->clear();
287 MonitorRequesterPtr requester = monitorRequester.lock();
288 if(!requester)
return;
289 requester->monitorEvent(getPtrSelf());
295 if(pvRecord->getTraceLevel()>1)
297 cout <<
"PVCopyMonitor::dataPut(pvRecordField)" << endl;
299 if(state!=active)
return;
302 size_t offset = pvCopy->getCopyOffset(pvRecordField->getPVField());
303 BitSetPtr
const &changedBitSet = activeElement->changedBitSet;
304 BitSetPtr
const &overrunBitSet = activeElement->overrunBitSet;
305 bool isSet = changedBitSet->get(offset);
306 changedBitSet->set(offset);
307 if(isSet) overrunBitSet->set(offset);
311 releaseActiveElement();
316 void MonitorLocal::dataPut(
320 if(pvRecord->getTraceLevel()>1)
322 cout <<
"PVCopyMonitor::dataPut(requested,pvRecordField)" << endl;
324 if(state!=active)
return;
327 BitSetPtr
const &changedBitSet = activeElement->changedBitSet;
328 BitSetPtr
const &overrunBitSet = activeElement->overrunBitSet;
329 size_t offsetCopyRequested = pvCopy->getCopyOffset(
330 requested->getPVField());
331 size_t offset = offsetCopyRequested
332 + (pvRecordField->getPVField()->getFieldOffset()
333 - requested->getPVField()->getFieldOffset());
334 bool isSet = changedBitSet->get(offset);
335 changedBitSet->set(offset);
336 if(isSet) overrunBitSet->set(offset);
340 releaseActiveElement();
345 void MonitorLocal::beginGroupPut(
PVRecordPtr const & pvRecord)
347 if(pvRecord->getTraceLevel()>1)
349 cout <<
"PVCopyMonitor::beginGroupPut()" << endl;
351 if(state!=active)
return;
359 void MonitorLocal::endGroupPut(
PVRecordPtr const & pvRecord)
361 if(pvRecord->getTraceLevel()>1)
363 cout <<
"PVCopyMonitor::endGroupPut dataChanged " << dataChanged << endl;
365 if(state!=active)
return;
372 releaseActiveElement();
376 void MonitorLocal::unlisten(
PVRecordPtr const & pvRecord)
378 if(pvRecord->getTraceLevel()>1)
380 cout <<
"PVCopyMonitor::unlisten\n";
386 MonitorRequesterPtr requester = monitorRequester.lock();
388 if(pvRecord->getTraceLevel()>1)
390 cout <<
"PVCopyMonitor::unlisten calling requester->unlisten\n";
392 requester->unlisten(getPtrSelf());
397 bool MonitorLocal::init(PVStructurePtr
const & pvRequest)
400 size_t queueSize = 2;
401 PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>(
"record._options");
402 MonitorRequesterPtr requester = monitorRequester.lock();
403 if(!requester)
return false;
405 PVStringPtr pvString = pvOptions->getSubField<PVString>(
"queueSize");
409 std::stringstream ss;
410 ss << pvString->get();
414 requester->message(
"queueSize " +pvString->get() +
" illegal",errorMessage);
419 pvField = pvRequest->getSubField(
"field");
421 pvCopy = PVCopy::create(
422 pvRecord->getPVRecordStructure()->getPVStructure(),
425 requester->message(
"illegal pvRequest",errorMessage);
429 if(pvField->getField()->getType()!=structure) {
430 requester->message(
"illegal pvRequest",errorMessage);
433 pvCopy = PVCopy::create(
434 pvRecord->getPVRecordStructure()->getPVStructure(),
437 requester->message(
"illegal pvRequest",errorMessage);
441 if(queueSize<2) queueSize = 2;
442 std::vector<MonitorElementPtr> monitorElementArray;
443 monitorElementArray.reserve(queueSize);
444 for(
size_t i=0; i<queueSize; i++) {
445 PVStructurePtr pvStructure = pvCopy->createPVStructure();
446 MonitorElementPtr monitorElement(
447 new MonitorElement(pvStructure));
448 monitorElementArray.push_back(monitorElement);
451 requester->monitorConnect(
454 pvCopy->getStructure());
460 MonitorRequester::shared_pointer
const & monitorRequester,
461 PVStructurePtr
const & pvRequest)
463 MonitorLocalPtr monitor(
new MonitorLocal(
464 monitorRequester,pvRecord));
465 bool result = monitor->init(pvRequest);
468 StructureConstPtr structure;
469 monitorRequester->monitorConnect(
470 failedToCreateMonitorStatus,monitor,structure);
473 if(pvRecord->getTraceLevel()>0)
475 cout <<
"MonitorFactory::createMonitor" 476 <<
" recordName " << pvRecord->getRecordName() << endl;
std::tr1::shared_ptr< PVCopy > PVCopyPtr
Listener for PVRecord::message.
std::tr1::shared_ptr< MonitorLocal > MonitorLocalPtr
std::tr1::shared_ptr< MonitorRequester > MonitorRequesterPtr
std::tr1::shared_ptr< PVRecord > PVRecordPtr
std::tr1::shared_ptr< PVRecordStructure > PVRecordStructurePtr
std::tr1::shared_ptr< PVRecordField > PVRecordFieldPtr
std::tr1::shared_ptr< MonitorElementQueue > MonitorElementQueuePtr
MonitorPtr createMonitorLocal(PVRecordPtr const &pvRecord, MonitorRequester::shared_pointer const &monitorRequester, PVStructurePtr const &pvRequest)