10 #include <pv/pvData.h> 11 #include <pv/bitSet.h> 13 #define epicsExportSharedSymbols 19 using std::tr1::static_pointer_cast;
24 namespace epics {
namespace pvCopy {
/**
 * Returns a copy of s with leading whitespace removed.
 *
 * Whitespace is whatever isspace() reports for the current C locale.
 * An empty or all-whitespace input yields an empty string.
 *
 * @param s input string (not modified)
 * @return s without its leading whitespace characters
 */
static std::string leftTrim(const std::string& s)
{
    const std::string::size_type n = s.length();
    std::string::size_type i = 0;
    // isspace() requires a value representable as unsigned char (or EOF);
    // passing a negative plain char is undefined behavior, hence the cast.
    while (i < n && isspace(static_cast<unsigned char>(s[i]))) {
        ++i;
    }
    return s.substr(i, n - i);
}
/**
 * Returns a copy of s with trailing whitespace removed.
 *
 * Whitespace is whatever isspace() reports for the current C locale.
 * An empty or all-whitespace input yields an empty string.
 *
 * @param s input string (not modified)
 * @return s without its trailing whitespace characters
 */
static std::string rightTrim(const std::string& s)
{
    std::string::size_type end = s.length();
    // isspace() requires a value representable as unsigned char (or EOF);
    // passing a negative plain char is undefined behavior, hence the cast.
    while (end > 0 && isspace(static_cast<unsigned char>(s[end - 1]))) {
        --end;
    }
    return s.substr(0, end);
}
51 static std::string trim(
const std::string& s)
53 return rightTrim(leftTrim(s));
56 static std::vector<std::string>& split(
const std::string& s,
char delimiter, std::vector<std::string>& elements)
58 std::stringstream ss(s);
60 while (std::getline(ss, item, delimiter)) {
61 elements.push_back(trim(item));
66 static std::vector<std::string> split(
const std::string& s,
char delimiter)
68 std::vector<std::string> elements;
69 split(s, delimiter, elements);
/**
 * Returns a lower-cased copy of input.
 *
 * Each character is converted with std::tolower for the current C locale;
 * the result always has the same length as input.
 *
 * @param input string to convert (not modified)
 * @return lower-cased copy of input
 */
static std::string toLowerCase(const std::string& input)
{
    std::string lowered;
    lowered.reserve(input.size());
    for (std::string::size_type i = 0; i < input.size(); ++i) {
        // std::tolower() requires a value representable as unsigned char
        // (or EOF); a negative plain char would be undefined behavior.
        const unsigned char c = static_cast<unsigned char>(input[i]);
        lowered.push_back(static_cast<char>(std::tolower(c)));
    }
    return lowered;
}
// Name under which this plugin is registered; it is the first argument passed to
// PVPluginRegistry::registerPlugin() in DataDistributorPlugin::initialize().
85 static std::string name(
"distributor");
// Initialized from the return value of DataDistributorPlugin::initialize(), so
// initialize() is invoked as part of initializing this static member.
86 bool DataDistributorPlugin::initialized(DataDistributorPlugin::initialize());
// Distributor instances, looked up by group id.
88 std::map<std::string, DataDistributorPtr> DataDistributor::dataDistributorMap;
// Mutex locked around accesses to dataDistributorMap.
89 epics::pvData::Mutex DataDistributor::dataDistributorMapMutex;
// NOTE(review): the signature of this definition is not visible here; it is presumably
// DataDistributor::getInstance(groupId), which the filter constructor calls — confirm.
// Looks up groupId in dataDistributorMap while holding dataDistributorMapMutex.
93 epvd::Lock lock(dataDistributorMapMutex);
94 std::map<std::string,DataDistributorPtr>::iterator ddit = dataDistributorMap.find(groupId);
95 if (ddit != dataDistributorMap.end()) {
// Stores ddPtr in the map under groupId.
101 dataDistributorMap[groupId] = ddPtr;
// NOTE(review): the signature of this definition is not visible here; it is presumably
// DataDistributor::removeUnusedInstance(dataDistributorPtr), which the filter destructor
// calls — confirm.
// Looks up the instance's group id in dataDistributorMap while holding dataDistributorMapMutex.
108 epvd::Lock lock(dataDistributorMapMutex);
109 std::string groupId = dataDistributorPtr->getGroupId();
110 std::map<std::string,DataDistributorPtr>::iterator ddit = dataDistributorMap.find(groupId);
111 if (ddit != dataDistributorMap.end()) {
// Number of client sets currently held by ddPtr.
113 int nSets = ddPtr->clientSetMap.size();
// Removes the map entry for this group id.
// NOTE(review): the condition guarding this erase is not visible here; presumably nSets == 0 — confirm.
115 dataDistributorMap.erase(ddit);
// Constructs a distributor for the given group id.
120 DataDistributor::DataDistributor(
const std::string& groupId_)
// The current-set iterator starts at end(); updateClient() moves it to begin()
// whenever it finds it at end().
125 , currentSetIdIter(clientSetIdList.end())
// Destructor: clears the client set map and the set id list while holding the instance mutex.
130 DataDistributor::~DataDistributor()
132 epvd::Lock lock(mutex);
133 clientSetMap.clear();
134 clientSetIdList.clear();
// Registers clientId with the client set identified by setId, under the instance mutex.
//
// If a set with this id already exists, the client id is appended to its client list and
// the set's stored triggerField is returned. Otherwise the client id is appended to setPtr,
// which is stored in clientSetMap under setId, and setId is appended to clientSetIdList.
// NOTE(review): the lines that obtain/construct setPtr and the final return statement are
// not visible here; presumably the new-set path returns the triggerField argument — confirm.
137 std::string DataDistributor::addClient(
int clientId,
const std::string& setId,
const std::string& triggerField,
int nUpdatesPerClient,
int updateMode)
139 epvd::Lock lock(mutex);
140 std::map<std::string,ClientSetPtr>::iterator git = clientSetMap.find(setId);
141 if (git != clientSetMap.end()) {
// Existing set: join it and report the trigger field that set already uses.
143 setPtr->clientIdList.push_back(clientId);
144 return setPtr->triggerField;
// New set: register the client, then the set itself in both the map and the id list.
148 setPtr->clientIdList.push_back(clientId);
149 clientSetMap[setId] = setPtr;
150 clientSetIdList.push_back(setId);
// Removes clientId from the client set identified by setId, under the instance mutex.
// When the set's client list becomes empty, the set is erased from clientSetMap and its
// id is erased from clientSetIdList.
155 void DataDistributor::removeClient(
int clientId,
const std::string& setId)
157 epvd::Lock lock(mutex);
158 std::map<std::string,ClientSetPtr>::iterator git = clientSetMap.find(setId);
159 if (git != clientSetMap.end()) {
161 std::list<int>::iterator cit = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), clientId);
162 if (cit != setPtr->clientIdList.end()) {
// If the client being removed is the set's current client, advance the current-client
// iterator past it before the element is erased.
164 if (cit == setPtr->currentClientIdIter) {
165 setPtr->currentClientIdIter++;
// Remember the id of the current client (-1 when the iterator is at end()) so the
// iterator can be re-established by value after the erase below.
169 int currentClientId = -1;
170 if (setPtr->currentClientIdIter != setPtr->clientIdList.end()) {
171 currentClientId = *(setPtr->currentClientIdIter);
175 setPtr->clientIdList.erase(cit);
// Reset the current-client iterator to end(), then point it back at the remembered
// client id if that id is still present in the list.
178 setPtr->currentClientIdIter = setPtr->clientIdList.end();
179 if (currentClientId >= 0) {
180 std::list<int>::iterator cit2 = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), currentClientId);
181 if (cit2 != setPtr->clientIdList.end()) {
182 setPtr->currentClientIdIter = cit2;
// The set has no clients left: drop it from the map and from the ordered id list.
187 if (setPtr->clientIdList.size() == 0) {
188 clientSetMap.erase(git);
189 std::list<std::string>::iterator git2 = std::find(clientSetIdList.begin(), clientSetIdList.end(), setId);
// NOTE(review): the body of this branch is not visible here; presumably it moves
// currentSetIdIter off the element that is about to be erased — confirm.
190 if (git2 == currentSetIdIter) {
193 if (git2 != clientSetIdList.end()) {
194 clientSetIdList.erase(git2);
// Decides, under the instance mutex, whether the given client should proceed with an
// update for the supplied trigger field value.
//
// Returns proceedWithUpdate, which starts out false and is set to true in the
// mode-specific branches below.
200 bool DataDistributor::updateClient(
int clientId,
const std::string& setId,
const std::string& triggerFieldValue)
202 epvd::Lock lock(mutex);
203 bool proceedWithUpdate =
false;
// Wrap the current-set iterator around to the first set when it is at end().
204 if (currentSetIdIter == clientSetIdList.end()) {
205 currentSetIdIter = clientSetIdList.begin();
// Only clients that belong to the current set are considered; for any other set the
// function returns here.
207 std::string currentSetId = *currentSetIdIter;
208 if (setId != currentSetId) {
210 return proceedWithUpdate;
// Wrap the set's current-client iterator around to the first client when it is at end().
213 if (setPtr->currentClientIdIter == setPtr->clientIdList.end()) {
215 setPtr->currentClientIdIter = setPtr->clientIdList.begin();
// The trigger value equals the distributor-level lastUpdateValue: return without updating.
217 if (lastUpdateValue == triggerFieldValue) {
219 return proceedWithUpdate;
221 switch (setPtr->updateMode) {
// One-per-group mode: only the set's current client proceeds.
222 case(DD_UPDATE_ONE_PER_GROUP): {
223 if (clientId != *(setPtr->currentClientIdIter)) {
225 return proceedWithUpdate;
227 proceedWithUpdate =
true;
// Record the trigger value at both distributor and set level and count the update.
228 lastUpdateValue = triggerFieldValue;
229 setPtr->lastUpdateValue = triggerFieldValue;
230 setPtr->updateCounter++;
// After nUpdatesPerClient updates, advance to the next client and restart the count.
231 if (setPtr->updateCounter >= setPtr->nUpdatesPerClient) {
233 setPtr->currentClientIdIter++;
234 setPtr->updateCounter = 0;
// All-in-group mode: every client of the current set proceeds.
239 case(DD_UPDATE_ALL_IN_GROUP): {
240 proceedWithUpdate =
true;
// NOTE(review): nClientsUpdated is a function-local static, so one counter is shared by
// every call of this function regardless of set or distributor instance — confirm that
// this is intended. The lines that modify it are not visible here.
241 static unsigned int nClientsUpdated = 0;
// The set-level update counter advances once per new trigger value.
242 if (setPtr->lastUpdateValue != triggerFieldValue) {
243 setPtr->lastUpdateValue = triggerFieldValue;
244 setPtr->updateCounter++;
// Once the client count is reached and the set has had nUpdatesPerClient updates,
// record the trigger value at distributor level and restart the set's count.
248 if (nClientsUpdated == setPtr->clientIdList.size() && setPtr->updateCounter >= setPtr->nUpdatesPerClient) {
250 lastUpdateValue = triggerFieldValue;
251 setPtr->updateCounter = 0;
// NOTE(review): the label guarding this assignment is not visible here; presumably it is
// the switch's default case — confirm.
257 proceedWithUpdate =
true;
260 return proceedWithUpdate;
// Default constructor (body not visible here).
263 DataDistributorPlugin::DataDistributorPlugin()
// Destructor (body not visible here).
267 DataDistributorPlugin::~DataDistributorPlugin()
// NOTE(review): body not visible here; behavior cannot be stated from this view.
271 void DataDistributorPlugin::create()
// Registers pvPlugin with PVPluginRegistry under the file-level plugin name. The returned
// bool is used to initialize the static member DataDistributorPlugin::initialized.
276 bool DataDistributorPlugin::initialize()
279 PVPluginRegistry::registerPlugin(name,pvPlugin);
// NOTE(review): the first line of this definition's signature is not visible here;
// it appears to be the plugin's filter factory — confirm.
// Forwards requestValue, pvCopy and master unchanged to DataDistributorFilter::create().
284 const std::string& requestValue,
286 const PVFieldPtr& master)
288 return DataDistributorFilter::create(requestValue,pvCopy,master);
// Destructor: removes this filter's client from its set in the distributor, then asks
// DataDistributor to remove the distributor instance if it is no longer used.
291 DataDistributorFilter::~DataDistributorFilter()
293 dataDistributorPtr->removeClient(clientId, setId);
294 DataDistributor::removeUnusedInstance(dataDistributorPtr);
// NOTE(review): the first line of this definition's signature is not visible here; it is
// presumably DataDistributorFilter::create(requestValue, pvCopy, master) — confirm.
//
// Parses requestValue as a ';'-separated list of "key:value" items and derives the
// filter configuration (updates per client, update mode, group id, set id, trigger field).
298 const std::string& requestValue,
300 const PVFieldPtr& master)
// Function-local static, so the value persists across calls.
// NOTE(review): the line that advances it is not visible here.
302 static int clientId = 0;
// configItems keeps the original case (used for the group/set/trigger values);
// configItems2 is the lower-cased copy used for matching keys. Both are split from the
// same text, so index i refers to the same item in both.
305 std::vector<std::string> configItems = split(requestValue,
';');
307 std::string requestValue2 = toLowerCase(requestValue);
308 std::vector<std::string> configItems2 = split(requestValue2,
';');
// Defaults used when the request does not specify a value.
309 int nUpdatesPerClient = 1;
310 int updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP;
311 std::string groupId =
"default";
312 std::string setId =
"default";
313 std::string triggerField =
"timeStamp";
314 bool hasUpdateMode =
false;
315 bool hasSetId =
false;
316 for(
unsigned int i = 0; i < configItems2.size(); i++) {
317 std::string configItem2 = configItems2[i];
// ind is the position of the key/value separator within the item.
318 size_t ind = configItem2.find(
':');
// Item has no ':' separator (handling not visible here).
319 if (ind == string::npos) {
// Keys are matched by prefix: find(...) == 0 means the item starts with the key text.
322 if(configItem2.find(
"updates") == 0) {
323 std::string svalue = configItem2.substr(ind+1);
324 nUpdatesPerClient = atoi(svalue.c_str());
// Group id: value taken from the original-case item.
326 else if(configItem2.find(
"group") == 0) {
327 std::string configItem = configItems[i];
328 groupId = configItem.substr(ind+1);
// Set id: value taken from the original-case item.
330 else if(configItem2.find(
"set") == 0) {
331 std::string configItem = configItems[i];
332 setId = configItem.substr(ind+1);
// Update mode: "one" or "all", compared in lower case.
335 else if(configItem2.find(
"mode") == 0) {
336 std::string svalue = toLowerCase(configItem2.substr(ind+1));
337 if (svalue ==
"one") {
338 updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP;
339 hasUpdateMode =
true;
341 else if (svalue ==
"all") {
342 updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP;
343 hasUpdateMode =
true;
// Trigger field name: value taken from the original-case item.
346 else if(configItem2.find(
"trigger") == 0) {
347 std::string configItem = configItems[i];
348 triggerField = configItem.substr(ind+1);
// When a set id was given but no explicit mode, use the all-in-group mode.
// NOTE(review): no visible line sets hasSetId to true; confirm that the "set" branch
// above does so, otherwise this condition can never hold.
353 if(!hasUpdateMode && hasSetId) {
354 updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP;
// Non-positive update count (handling not visible here).
358 if(nUpdatesPerClient <= 0) {
// Constructs a filter for one client: obtains the distributor for groupId_, registers the
// client with it, and resolves the field whose value is used as the update trigger.
366 DataDistributorFilter::DataDistributorFilter(
const std::string& groupId_,
int clientId_,
const std::string& setId_,
const std::string& triggerField_,
int nUpdatesPerClient,
int updateMode,
const PVCopyPtr& copyPtr_,
const epvd::PVFieldPtr& masterFieldPtr_)
367 : dataDistributorPtr(DataDistributor::getInstance(groupId_))
368 , clientId(clientId_)
370 , triggerField(triggerField_)
371 , masterFieldPtr(masterFieldPtr_)
// addClient() returns the trigger field name to use, which replaces the requested one
// (for an already existing set it returns that set's stored trigger field).
375 triggerField = dataDistributorPtr->addClient(clientId, setId, triggerField, nUpdatesPerClient, updateMode);
// When the master field is a structure, look the trigger field up by name inside it.
376 if(masterFieldPtr->getField()->getType() == epvd::structure) {
377 epvd::PVStructurePtr pvStructurePtr = static_pointer_cast<epvd::PVStructure>(masterFieldPtr);
379 triggerFieldPtr = pvStructurePtr->getSubField(triggerField);
// Fall back to the master field itself when no trigger field was resolved.
382 if(!triggerFieldPtr) {
383 triggerFieldPtr = masterFieldPtr;
// Filter entry point: asks the distributor whether this client should receive the current
// update and, if so, copies the master field into pvCopy and marks it in bitSet.
// NOTE(review): the conditions around the first two assignments to proceedWithUpdate and
// the return statement are not visible here.
388 bool DataDistributorFilter::filter(
const PVFieldPtr& pvCopy,
const BitSetPtr& bitSet,
bool toCopy)
394 bool proceedWithUpdate =
false;
398 proceedWithUpdate =
true;
// The trigger field is rendered to text via stream insertion; that text is the value the
// distributor compares against the previous update.
401 std::stringstream ss;
402 ss << triggerFieldPtr;
403 std::string triggerFieldValue = ss.str();
404 proceedWithUpdate = dataDistributorPtr->updateClient(clientId, setId, triggerFieldValue);
// Copy the master field into the client's copy and set the bit at the copy's field offset.
407 if(proceedWithUpdate) {
408 pvCopy->copyUnchecked(*masterFieldPtr);
409 bitSet->set(pvCopy->getFieldOffset());
420 string DataDistributorFilter::getName()
std::tr1::shared_ptr< PVFilter > PVFilterPtr
std::tr1::shared_ptr< PVCopy > PVCopyPtr
std::tr1::shared_ptr< ClientSet > ClientSetPtr
A Plugin for a filter that gets a sub array from a PVScalarDeadband.
std::tr1::shared_ptr< DataDistributorFilter > DataDistributorFilterPtr
std::tr1::shared_ptr< DataDistributor > DataDistributorPtr
std::tr1::shared_ptr< DataDistributorPlugin > DataDistributorPluginPtr