pvDatabaseCPP  4.7.1-dev
dataDistributorPlugin.cpp
Go to the documentation of this file.
1 // Copyright information and license terms for this software can be
2 // found in the file LICENSE that is included with the distribution
3 
4 #include <stdlib.h>
5 
6 #include <cctype>
7 #include <string>
8 #include <algorithm>
9 #include <pv/lock.h>
10 #include <pv/pvData.h>
11 #include <pv/bitSet.h>
12 
13 #define epicsExportSharedSymbols
15 
16 using std::string;
17 using std::size_t;
18 using std::endl;
19 using std::tr1::static_pointer_cast;
20 using std::vector;
21 using namespace epics::pvData;
22 namespace epvd = epics::pvData;
23 
24 namespace epics { namespace pvCopy {
25 
26 // Utilities for manipulating strings
/**
 * Remove leading whitespace from a string.
 *
 * @param s input string (not modified)
 * @return copy of s without the leading characters for which isspace() is true;
 *         an empty string if s is empty or consists of whitespace only
 */
static std::string leftTrim(const std::string& s)
{
    const std::string::size_type n = s.size();
    std::string::size_type first = 0;
    // isspace() is only defined for values representable as unsigned char
    // (or EOF), so a plain char must be converted before the call.
    while (first < n && std::isspace(static_cast<unsigned char>(s[first]))) {
        ++first;
    }
    return s.substr(first);
}
38 
/**
 * Remove trailing whitespace from a string.
 *
 * @param s input string (not modified)
 * @return copy of s without the trailing characters for which isspace() is true;
 *         an empty string if s is empty or consists of whitespace only
 */
static std::string rightTrim(const std::string& s)
{
    std::string::size_type last = s.size();
    // isspace() is only defined for values representable as unsigned char
    // (or EOF), so a plain char must be converted before the call.
    while (last > 0 && std::isspace(static_cast<unsigned char>(s[last - 1]))) {
        --last;
    }
    return s.substr(0, last);
}
50 
51 static std::string trim(const std::string& s)
52 {
53  return rightTrim(leftTrim(s));
54 }
55 
56 static std::vector<std::string>& split(const std::string& s, char delimiter, std::vector<std::string>& elements)
57 {
58  std::stringstream ss(s);
59  std::string item;
60  while (std::getline(ss, item, delimiter)) {
61  elements.push_back(trim(item));
62  }
63  return elements;
64 }
65 
66 static std::vector<std::string> split(const std::string& s, char delimiter)
67 {
68  std::vector<std::string> elements;
69  split(s, delimiter, elements);
70  return elements;
71 }
72 
/**
 * Convert a string to lower case, character by character.
 *
 * @param input string to convert (not modified)
 * @return copy of input, same length, with every character passed through tolower()
 */
static std::string toLowerCase(const std::string& input)
{
    std::string result(input);
    for (std::string::size_type i = 0; i < result.size(); ++i) {
        // tolower() is only defined for values representable as unsigned char
        // (or EOF), so a plain char must be converted before the call.
        result[i] = static_cast<char>(std::tolower(static_cast<unsigned char>(result[i])));
    }
    return result;
}
82 
83 // Data distributor class
84 
85 static std::string name("distributor");
86 bool DataDistributorPlugin::initialized(DataDistributorPlugin::initialize());
87 
88 std::map<std::string, DataDistributorPtr> DataDistributor::dataDistributorMap;
89 epics::pvData::Mutex DataDistributor::dataDistributorMapMutex;
90 
91 DataDistributorPtr DataDistributor::getInstance(const std::string& groupId)
92 {
93  epvd::Lock lock(dataDistributorMapMutex);
94  std::map<std::string,DataDistributorPtr>::iterator ddit = dataDistributorMap.find(groupId);
95  if (ddit != dataDistributorMap.end()) {
96  DataDistributorPtr ddPtr = ddit->second;
97  return ddPtr;
98  }
99  else {
100  DataDistributorPtr ddPtr(new DataDistributor(groupId));
101  dataDistributorMap[groupId] = ddPtr;
102  return ddPtr;
103  }
104 }
105 
106 void DataDistributor::removeUnusedInstance(DataDistributorPtr dataDistributorPtr)
107 {
108  epvd::Lock lock(dataDistributorMapMutex);
109  std::string groupId = dataDistributorPtr->getGroupId();
110  std::map<std::string,DataDistributorPtr>::iterator ddit = dataDistributorMap.find(groupId);
111  if (ddit != dataDistributorMap.end()) {
112  DataDistributorPtr ddPtr = ddit->second;
113  int nSets = ddPtr->clientSetMap.size();
114  if (nSets == 0) {
115  dataDistributorMap.erase(ddit);
116  }
117  }
118 }
119 
120 DataDistributor::DataDistributor(const std::string& groupId_)
121  : groupId(groupId_)
122  , mutex()
123  , clientSetMap()
124  , clientSetIdList()
125  , currentSetIdIter(clientSetIdList.end())
126  , lastUpdateValue()
127 {
128 }
129 
130 DataDistributor::~DataDistributor()
131 {
132  epvd::Lock lock(mutex);
133  clientSetMap.clear();
134  clientSetIdList.clear();
135 }
136 
137 std::string DataDistributor::addClient(int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode)
138 {
139  epvd::Lock lock(mutex);
140  std::map<std::string,ClientSetPtr>::iterator git = clientSetMap.find(setId);
141  if (git != clientSetMap.end()) {
142  ClientSetPtr setPtr = git->second;
143  setPtr->clientIdList.push_back(clientId);
144  return setPtr->triggerField;
145  }
146  else {
147  ClientSetPtr setPtr(new ClientSet(setId, triggerField, nUpdatesPerClient, updateMode));
148  setPtr->clientIdList.push_back(clientId);
149  clientSetMap[setId] = setPtr;
150  clientSetIdList.push_back(setId);
151  return triggerField;
152  }
153 }
154 
155 void DataDistributor::removeClient(int clientId, const std::string& setId)
156 {
157  epvd::Lock lock(mutex);
158  std::map<std::string,ClientSetPtr>::iterator git = clientSetMap.find(setId);
159  if (git != clientSetMap.end()) {
160  ClientSetPtr setPtr = git->second;
161  std::list<int>::iterator cit = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), clientId);
162  if (cit != setPtr->clientIdList.end()) {
163  // If we are removing current client id, advance iterator
164  if (cit == setPtr->currentClientIdIter) {
165  setPtr->currentClientIdIter++;
166  }
167 
168  // Find current client id
169  int currentClientId = -1;
170  if (setPtr->currentClientIdIter != setPtr->clientIdList.end()) {
171  currentClientId = *(setPtr->currentClientIdIter);
172  }
173 
174  // Remove client id from the list
175  setPtr->clientIdList.erase(cit);
176 
177  // Reset current client id iterator
178  setPtr->currentClientIdIter = setPtr->clientIdList.end();
179  if (currentClientId >= 0) {
180  std::list<int>::iterator cit2 = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), currentClientId);
181  if (cit2 != setPtr->clientIdList.end()) {
182  setPtr->currentClientIdIter = cit2;
183  }
184  }
185  }
186 
187  if (setPtr->clientIdList.size() == 0) {
188  clientSetMap.erase(git);
189  std::list<std::string>::iterator git2 = std::find(clientSetIdList.begin(), clientSetIdList.end(), setId);
190  if (git2 == currentSetIdIter) {
191  currentSetIdIter++;
192  }
193  if (git2 != clientSetIdList.end()) {
194  clientSetIdList.erase(git2);
195  }
196  }
197  }
198 }
199 
200 bool DataDistributor::updateClient(int clientId, const std::string& setId, const std::string& triggerFieldValue)
201 {
202  epvd::Lock lock(mutex);
203  bool proceedWithUpdate = false;
204  if (currentSetIdIter == clientSetIdList.end()) {
205  currentSetIdIter = clientSetIdList.begin();
206  }
207  std::string currentSetId = *currentSetIdIter;
208  if (setId != currentSetId) {
209  // We are not distributing data to this set at the moment
210  return proceedWithUpdate;
211  }
212  ClientSetPtr setPtr = clientSetMap[currentSetId];
213  if (setPtr->currentClientIdIter == setPtr->clientIdList.end()) {
214  // Move current client iterator to the beginning of the list
215  setPtr->currentClientIdIter = setPtr->clientIdList.begin();
216  }
217  if (lastUpdateValue == triggerFieldValue) {
218  // This update was already distributed.
219  return proceedWithUpdate;
220  }
221  switch (setPtr->updateMode) {
222  case(DD_UPDATE_ONE_PER_GROUP): {
223  if (clientId != *(setPtr->currentClientIdIter)) {
224  // Not this client's turn.
225  return proceedWithUpdate;
226  }
227  proceedWithUpdate = true;
228  lastUpdateValue = triggerFieldValue;
229  setPtr->lastUpdateValue = triggerFieldValue;
230  setPtr->updateCounter++;
231  if (setPtr->updateCounter >= setPtr->nUpdatesPerClient) {
232  // This client and set are done.
233  setPtr->currentClientIdIter++;
234  setPtr->updateCounter = 0;
235  currentSetIdIter++;
236  }
237  break;
238  }
239  case(DD_UPDATE_ALL_IN_GROUP): {
240  proceedWithUpdate = true;
241  static unsigned int nClientsUpdated = 0;
242  if (setPtr->lastUpdateValue != triggerFieldValue) {
243  setPtr->lastUpdateValue = triggerFieldValue;
244  setPtr->updateCounter++;
245  nClientsUpdated = 0;
246  }
247  nClientsUpdated++;
248  if (nClientsUpdated == setPtr->clientIdList.size() && setPtr->updateCounter >= setPtr->nUpdatesPerClient) {
249  // This set is done.
250  lastUpdateValue = triggerFieldValue;
251  setPtr->updateCounter = 0;
252  currentSetIdIter++;
253  }
254  break;
255  }
256  default: {
257  proceedWithUpdate = true;
258  }
259  }
260  return proceedWithUpdate;
261 }
262 
263 DataDistributorPlugin::DataDistributorPlugin()
264 {
265 }
266 
267 DataDistributorPlugin::~DataDistributorPlugin()
268 {
269 }
270 
271 void DataDistributorPlugin::create()
272 {
273  initialize();
274 }
275 
276 bool DataDistributorPlugin::initialize()
277 {
279  PVPluginRegistry::registerPlugin(name,pvPlugin);
280  return true;
281 }
282 
283 PVFilterPtr DataDistributorPlugin::create(
284  const std::string& requestValue,
285  const PVCopyPtr& pvCopy,
286  const PVFieldPtr& master)
287 {
288  return DataDistributorFilter::create(requestValue,pvCopy,master);
289 }
290 
291 DataDistributorFilter::~DataDistributorFilter()
292 {
293  dataDistributorPtr->removeClient(clientId, setId);
294  DataDistributor::removeUnusedInstance(dataDistributorPtr);
295 }
296 
297 DataDistributorFilterPtr DataDistributorFilter::create(
298  const std::string& requestValue,
299  const PVCopyPtr& pvCopy,
300  const PVFieldPtr& master)
301 {
302  static int clientId = 0;
303  clientId++;
304 
305  std::vector<std::string> configItems = split(requestValue, ';');
306  // Use lowercase keys if possible.
307  std::string requestValue2 = toLowerCase(requestValue);
308  std::vector<std::string> configItems2 = split(requestValue2, ';');
309  int nUpdatesPerClient = 1;
310  int updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP;
311  std::string groupId = "default";
312  std::string setId = "default";
313  std::string triggerField = "timeStamp";
314  bool hasUpdateMode = false;
315  bool hasSetId = false;
316  for(unsigned int i = 0; i < configItems2.size(); i++) {
317  std::string configItem2 = configItems2[i];
318  size_t ind = configItem2.find(':');
319  if (ind == string::npos) {
320  continue;
321  }
322  if(configItem2.find("updates") == 0) {
323  std::string svalue = configItem2.substr(ind+1);
324  nUpdatesPerClient = atoi(svalue.c_str());
325  }
326  else if(configItem2.find("group") == 0) {
327  std::string configItem = configItems[i];
328  groupId = configItem.substr(ind+1);
329  }
330  else if(configItem2.find("set") == 0) {
331  std::string configItem = configItems[i];
332  setId = configItem.substr(ind+1);
333  hasSetId = true;
334  }
335  else if(configItem2.find("mode") == 0) {
336  std::string svalue = toLowerCase(configItem2.substr(ind+1));
337  if (svalue == "one") {
338  updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP;
339  hasUpdateMode = true;
340  }
341  else if (svalue == "all") {
342  updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP;
343  hasUpdateMode = true;
344  }
345  }
346  else if(configItem2.find("trigger") == 0) {
347  std::string configItem = configItems[i];
348  triggerField = configItem.substr(ind+1);
349  }
350  }
351  // If request does not have update mode specified, but has set id
352  // then use a different update mode
353  if(!hasUpdateMode && hasSetId) {
354  updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP;
355  }
356 
357  // Make sure request is valid
358  if(nUpdatesPerClient <= 0) {
359  return DataDistributorFilterPtr();
360  }
361  DataDistributorFilterPtr filter =
362  DataDistributorFilterPtr(new DataDistributorFilter(groupId, clientId, setId, triggerField, nUpdatesPerClient, updateMode, pvCopy, master));
363  return filter;
364 }
365 
366 DataDistributorFilter::DataDistributorFilter(const std::string& groupId_, int clientId_, const std::string& setId_, const std::string& triggerField_, int nUpdatesPerClient, int updateMode, const PVCopyPtr& copyPtr_, const epvd::PVFieldPtr& masterFieldPtr_)
367  : dataDistributorPtr(DataDistributor::getInstance(groupId_))
368  , clientId(clientId_)
369  , setId(setId_)
370  , triggerField(triggerField_)
371  , masterFieldPtr(masterFieldPtr_)
372  , triggerFieldPtr()
373  , firstUpdate(true)
374 {
375  triggerField = dataDistributorPtr->addClient(clientId, setId, triggerField, nUpdatesPerClient, updateMode);
376  if(masterFieldPtr->getField()->getType() == epvd::structure) {
377  epvd::PVStructurePtr pvStructurePtr = static_pointer_cast<epvd::PVStructure>(masterFieldPtr);
378  if(pvStructurePtr) {
379  triggerFieldPtr = pvStructurePtr->getSubField(triggerField);
380  }
381  }
382  if(!triggerFieldPtr) {
383  triggerFieldPtr = masterFieldPtr;
384  }
385 }
386 
387 
388 bool DataDistributorFilter::filter(const PVFieldPtr& pvCopy, const BitSetPtr& bitSet, bool toCopy)
389 {
390  if(!toCopy) {
391  return false;
392  }
393 
394  bool proceedWithUpdate = false;
395  if(firstUpdate) {
396  // Always send first update
397  firstUpdate = false;
398  proceedWithUpdate = true;
399  }
400  else {
401  std::stringstream ss;
402  ss << triggerFieldPtr;
403  std::string triggerFieldValue = ss.str();
404  proceedWithUpdate = dataDistributorPtr->updateClient(clientId, setId, triggerFieldValue);
405  }
406 
407  if(proceedWithUpdate) {
408  pvCopy->copyUnchecked(*masterFieldPtr);
409  bitSet->set(pvCopy->getFieldOffset());
410  }
411  else {
412  // Clear all bits
413  //bitSet->clear(pvCopy->getFieldOffset());
414  bitSet->clear();
415  }
416 
417  return true;
418 }
419 
420 string DataDistributorFilter::getName()
421 {
422  return name;
423 }
424 
425 }}
std::tr1::shared_ptr< PVFilter > PVFilterPtr
Definition: pvPlugin.h:29
std::tr1::shared_ptr< PVCopy > PVCopyPtr
Definition: pvPlugin.h:25
std::tr1::shared_ptr< ClientSet > ClientSetPtr
A Plugin for a filter that gets a sub array from a PVScalarDeadband.
std::tr1::shared_ptr< DataDistributorFilter > DataDistributorFilterPtr
std::tr1::shared_ptr< DataDistributor > DataDistributorPtr
std::tr1::shared_ptr< DataDistributorPlugin > DataDistributorPluginPtr