iop_simplenet.cpp
Go to the documentation of this file.
1 /** @file iop_simplenet.cpp Simple networked events via tcp/ip */
2 
3 /*
4  FAU Discrete Event Systems Library (libfaudes)
5 
6  Copyright (C) 2008, 2024 Thomas Moor
7  Exclusive copyright is granted to Klaus Schmidt
8 
9 */
10 
11 
12 
13 #include "iop_simplenet.h"
14 
15 
16 namespace faudes {
17 
18 
19 /*
20  **********************************************
21  **********************************************
22  **********************************************
23 
24  implementation: SimplenetAddress
25 
26  **********************************************
27  **********************************************
28  **********************************************
29  */
30 
31 // std faudes
32 FAUDES_TYPE_IMPLEMENTATION(SimplenetDevice,nDevice,vDevice)
33 
34 // construct
36  mIp="";
37  mPort=-1;
38 }
39 
40 // copy construct
42  mIp=rOther.mIp;
43  mPort=rOther.mPort;
44 }
45 
46 // construct from string
47 SimplenetAddress::SimplenetAddress(const std::string& rString) {
48  IpColonPort(rString);
49 }
50 
51 // validity
52 bool SimplenetAddress::Valid(void) const {
53  if(mPort<=0) return false;
54  if(mIp=="") return false;
55  if(mIp.find(':',0)!=std::string::npos) return false;
56  return true;
57 }
58 
59 // get colon seperated string
60 std::string SimplenetAddress::IpColonPort(void) const {
61  std::string res;
62  if(!Valid()) return res;
63  res=mIp+":"+ToStringInteger(mPort);
64  return res;
65 }
66 
67 // Set from colon seperated string
68 void SimplenetAddress::IpColonPort(std::string ipcolonport) {
69  FD_DHV("SimplenetAddress::IpColonPort(): " << ipcolonport << " --> ?");
70  // invalid
71  mIp="";
72  mPort=-1;
73  // find colon
74  std::size_t cpos = ipcolonport.find(':',0);
75  if(cpos==std::string::npos) return;
76  if(cpos==0) return;
77  if(cpos+1>= ipcolonport.length()) return;
78  // extract ip
79  mIp=ipcolonport.substr(0,cpos);
80  // extract port
81  mPort=ToIdx(ipcolonport.substr(cpos+1));
82  // report
83  FD_DHV("SimplenetAddress::IpColonPort(): " << ipcolonport << " --> " << IpColonPort());
84  // test for errors (might be too strict)
85  if(IpColonPort()!=ipcolonport) {
86  mIp="";
87  mPort=-1;
88  }
89 }
90 
91 // sorting
93  if(this->mIp < rOther.mIp) return true;
94  if(this->mIp > rOther.mIp) return false;
95  if(this->mPort < rOther.mPort) return true;
96  return false;
97 }
98 
99 
100 // only compile for simplenet support
101 #ifdef FAUDES_IODEVICE_SIMPLENET
102 
103 /*
104  **********************************************
105  **********************************************
106  **********************************************
107 
108  implementation: AttributeSimplenetOutput
109 
110  **********************************************
111  **********************************************
112  **********************************************
113  */
114 
115 // std faudes type
117 
118 
119 //DoWrite(rTw);
120 void AttributeSimplenetOutput::DoWrite(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
121  (void) rLabel; (void) pContext;
122 }
123 
124 
125 //DoRead(rTr)
126 void AttributeSimplenetOutput::DoRead(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
127  (void) rLabel; (void) pContext;
128  // report
129  FD_DHV("AttributeSimplenetOutput(" << this << ")::DoRead(tr)");
130 
131  // sense and digest pre 2.16 format
132  Token token;
133  rTr.Peek(token);
134  if(token.Type()==Token::Begin)
135  if(token.StringValue()=="Output") {
136  rTr.ReadBegin("Output");
137  rTr.ReadEnd("Output");
138  }
139 }
140 
141 
142 /*
143  **********************************************
144  **********************************************
145  **********************************************
146 
147  implementation: AttributeSimplenetInput
148 
149  **********************************************
150  **********************************************
151  **********************************************
152  */
153 
154 
155 // std faudes type
157 
158 
159 //DoWrite(rTw);
160 void AttributeSimplenetInput::DoWrite(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
161  (void) rLabel; (void) pContext;
162 }
163 
164 
165 //DoRead(rTr)
166 void AttributeSimplenetInput::DoRead(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
167  (void) rLabel; (void) pContext;
168  // report
169  FD_DHV("AttributeSimplenetInput(" << this << ")::DoRead(tr)");
170 
171  // sense and digest pre 2.16 format
172  Token token;
173  rTr.Peek(token);
174  if(token.Type()==Token::Begin)
175  if(token.StringValue()=="Input") {
176  rTr.ReadBegin("Input");
177  rTr.ReadEnd("Input");
178  }
179 }
180 
181 
182 
183 
184 /*
185  **********************************************
186  **********************************************
187  **********************************************
188 
189  implementation: AttributeSimplenetEvent
190 
191  **********************************************
192  **********************************************
193  **********************************************
194  */
195 
196 
197 
198 // std faudes type
200 
201 // Default constructor, set my prototypes
203  FD_DHV("AttributeSimplenetEvent::AttributeSimplenetEvent(" << this << ")");;
204  pOutputPrototype=OutputPrototypep();
205  pInputPrototype=InputPrototypep();
206 }
207 
208 // Copy constructor
211 {
212  FD_DHV("AttributeSimplenetEvent(" << this << "): form other attr " << &rOtherAttr);
215  DoAssign(rOtherAttr);
216 }
217 
218 
219 // pseudo statics
222  return attrp;
223 }
224 
225 // pseudo statics
228  return attrp;
229 }
230 
231 
232 /*
233  **********************************************
234  **********************************************
235  **********************************************
236 
237  implementation: nDevice
238 
239  **********************************************
240  **********************************************
241  **********************************************
242  */
243 
244 // autoregister
246 
247 // helper: send entire buffer
248 int syncSend(int dest, const char* data, int len, int flag) {
249  int from=0;
250  int left=len;
251  while(left>0) {
252  int rc=send(dest, data+from, left, 0);
253  if(rc<0) {
254  std::stringstream errstr;
255  errstr << "Simplenet fatal network error (cannot send message)";
256  throw Exception("nDevice::syncSend", errstr.str(), 553, true); // mute console out
257  }
258  left-=rc;
259  }
260  return len;
261 }
262 
263 
264 // constructor
266  FD_DHV("nDevice(" << this << ")::nDevice()");
269  // default token section
270  mDefaultLabel="SimplenetDevice";
271  // default network data
272  mName="SimplenetNode";
273  mNetwork="Simplenet";
274  mListenAddress.IpColonPort("localhost:40000");
275  mBroadcastAddress.IpColonPort("255.255.255.255:40000");
276  // clear/prepare state
277  faudes_mutex_init(&mMutex);
278  mListenSocket=-1;
279  mBroadcastSocket=-1;
280 }
281 
282 // destructor
284  FD_DHV("nDevice(" << this << ")::~nDevice()");
285  // stop
286  Stop();
287  // clean up my data
288  faudes_mutex_destroy(&mMutex);
289 }
290 
291 // clear all configuration
292 void nDevice::Clear(void) {
293  FD_DHV("nDevice(" << this << ")::Clear()");
294  // call base, incl stop
295  vDevice::Clear();
296  // clear compiled data
297  mInputSubscriptions.clear();
298 }
299 
300 
301 // programmatic config: server address
302 void nDevice::ServerAddress(const std::string& rAddr) {
303  if(mState!=Down) return;
305 }
306 
307 // programmatic config: broadcast address
308 void nDevice::BroadcastAddress(const std::string& rAddr) {
309  if(mState!=Down) return;
311 }
312 
313 
314 // programmatic config: network name
315 void nDevice::NetworkName(const std::string& rNetwork) {
316  if(mState!=Down) return;
317  mNetwork=rNetwork;
318 }
319 
320 // programmatic config: insert node name
321 void nDevice::InsNode(const std::string& rNodeName) {
322  if(mState!=Down) return;
323  mNetworkNodes[rNodeName]="unknown:0";
324 }
325 
326 // programmatic config: insert node address
327 void nDevice::InsNodeAddress(const std::string& rNodeName, const std::string& rNodeAddress) {
328  if(mState!=Down) return;
329  mNetworkNodes[rNodeName]=rNodeAddress;
330 }
331 
332 // programmatic config: clear known nodes
334  if(mState!=Down) return;
335  mNetworkNodes.clear();
336 }
337 
338 // programmatic config: insert input event
339 void nDevice::InsInputEvent(const std::string& event) {
340  if(mState!=Down) return;
342  inp.DefaultInput();
343  Idx ev=pConfiguration->Insert(event);
344  pConfiguration->Attribute(ev, inp);
345 }
346 
347 // programmatic config: insert output event
348 void nDevice::InsOutputEvent(const std::string& event) {
349  if(mState!=Down) return;
351  outp.DefaultOutput();
352  Idx ev=pConfiguration->Insert(event);
353  pConfiguration->Attribute(ev, outp);
354 }
355 
356 
357 //Compile(void)
358 void nDevice::Compile(void){
359  //setup up internal data structure
360  FD_DHV("nDevice(" << this << ")::Compile()");
361  // call base
363 }
364 
365 
366 //DoWritePreface(rTr,rLabel)
367 void nDevice::DoWritePreface(TokenWriter& rTw, const std::string& rLabel, const Type* pContext) const {
368  FD_DHV("nDevice::DoWrite()");
369  //call base
370  vDevice::DoWritePreface(rTw,rLabel,pContext);
371  // write my data: my server role ip address
372  Token vtag;
373  vtag.SetEmpty("ServerAddress");
375  rTw<<vtag;
376  // write my data: my server broadcast address
377  Token btag;
378  btag.SetEmpty("BroadcastAddress");
380  rTw<<btag;
381  // write my data network topology
382  Token ntag;
383  ntag.SetBegin("Network");
384  ntag.InsAttributeString("name",mNetwork);
385  rTw<<ntag;
386  std::map<std::string,std::string>::const_iterator nit;
387  for(nit=mNetworkNodes.begin();nit!=mNetworkNodes.end();nit++) {
388  vtag.SetEmpty("Node");
389  vtag.InsAttributeString("name",nit->first);
390  SimplenetAddress defaddress(nit->second);
391  if(defaddress.Valid())
392  vtag.InsAttributeString("address",nit->second);
393  rTw<<vtag;
394  }
395  rTw.WriteEnd("Network");
396 }
397 
398 //DoRead(rTr,rLabel)
399 void nDevice::DoReadPreface(TokenReader& rTr, const std::string& rLabel, const Type* pContext) {
400  FD_DHV("nDevice::DoReadPreface()");
401  // call base (reads name and timescale)
402  vDevice::DoReadPreface(rTr,rLabel,pContext);
403 
404  // sense and digest pre 2.16 format
405  Token token;
406  rTr.Peek(token);
407  if(token.IsString()) {
409  if(!mListenAddress.Valid()) {
410  std::stringstream errstr;
411  errstr << "Simplenet address expected at " << rTr.FileLine();
412  throw Exception("nDevice::DoRead", errstr.str(), 50);
413  }
414  mNetwork=rTr.ReadString();
415  mNetworkNodes.clear();
416  rTr.ReadBegin("Network");
417  while(!rTr.Eos("Network")) {
418  mNetworkNodes[rTr.ReadString()]="unknown:0";
419  }
420  rTr.ReadEnd("Network");
421  return;
422  }
423 
424  // read my data: server address
425  Token atag;
426  rTr.ReadBegin("ServerAddress",atag);
428  if(!mListenAddress.Valid()) {
429  std::stringstream errstr;
430  errstr << "Simplenet address expected at " << rTr.FileLine();
431  throw Exception("nDevice::DoRead", errstr.str(), 50);
432  }
433  rTr.ReadEnd("ServerAddress");
434  // read my data: broadcast address (optional)
435  mBroadcastAddress.IpColonPort("255.255.255.255:40000");
436  rTr.Peek(token);
437  if(token.IsBegin("BroadcastAddress")) {
438  rTr.ReadBegin("BroadcastAddress",atag);
440  if(!mBroadcastAddress.Valid()) {
441  std::stringstream errstr;
442  errstr << "Simplenet address expected at " << rTr.FileLine();
443  throw Exception("nDevice::DoRead", errstr.str(), 50);
444  }
445  rTr.ReadEnd("BroadcastAddress");
446  }
447  // read my data: network
448  Token ntag;
449  rTr.ReadBegin("Network",ntag);
450  mNetwork=ntag.AttributeStringValue("name");
451  // loop network nodes
452  while(!rTr.Eos("Network")) {
453  rTr.ReadBegin("Node",ntag);
454  if(!ntag.ExistsAttributeString("name")) {
455  std::stringstream errstr;
456  errstr << "Simplenet node name expected at " << rTr.FileLine();
457  throw Exception("nDevice::DoRead", errstr.str(), 50);
458  }
459  std::string node=ntag.AttributeStringValue("name");
460  InsNode(node);
461  // undocumented feature: explicit server addresses in dev file; tmoor 20121113
462  if(ntag.ExistsAttributeString("address")) {
463  SimplenetAddress defaddress;
464  defaddress.IpColonPort(ntag.AttributeStringValue("address"));
465  if(!defaddress.Valid()) {
466  std::stringstream errstr;
467  errstr << "Simplenet node address expected at " << rTr.FileLine();
468  throw Exception("nDevice::DoRead", errstr.str(), 50);
469  }
470  mNetworkNodes[node]=defaddress.IpColonPort();
471  }
472  rTr.ReadEnd("Node");
473  }
474  rTr.ReadEnd("Network");
475 }
476 
477 
478 // lock - unlock shortcuts;
479 #define LOCK_E {int rc = faudes_mutex_lock(&mMutex); \
480  if(rc) {FD_ERR("nDevice::LOCK_E: lock mutex error\n"); exit(1); }}
481 #define UNLOCK_E {int rc = faudes_mutex_unlock(&mMutex); \
482  if(rc) {FD_ERR("nDevice::LOCK_E: unlock mutex error\n"); exit(1); }}
483 #define TLOCK_E {int rc = faudes_mutex_lock(&ndevice->mMutex); \
484  if(rc) {FD_ERR("nDevice::TLOCK_E: lock mutex error\n"); exit(1); }}
485 #define TUNLOCK_E {int rc = faudes_mutex_unlock(&ndevice->mMutex); \
486  if(rc) {FD_ERR("nDevice::TLOCK_E: unlock mutex error\n"); exit(1); }}
487 
488 
489 // Write Output
490 void nDevice::WriteOutput(Idx output) {
491 
492  FD_DHV("nDevice::WriteOutput(" << mOutputs.SymbolicName(output) << ")");
493 
494  // bail out (do notify clients even when servers incomplete)
495  if(mState!=Up && mState!=StartUp) return;
496 
497  // test event
498  if(!mOutputs.Exists(output)) {
499  std::stringstream errstr;
500  errstr << "Unknown output event " << output;
501  throw Exception("nDevice::WriteOutput", errstr.str(), 65);
502  }
503 
504  // find properties
505  const AttributeSimplenetOutput* aattr = pConfiguration->Attribute(output).Outputp();
506  if(!aattr) {
507  std::stringstream errstr;
508  errstr << "Invalid output attribute " << output;
509  throw Exception("nDevice::WriteOutput", errstr.str(), 65);
510  }
511 
512  // report
513  std::string message= "<Notify> " + mOutputs.SymbolicName(output) + " </Notify>\n";
514  FD_DHV("nDevice::WriteOutput(): message: " << message.substr(0,message.length()-1));
515 
516  // send event to those clients that did subscribe
517  LOCK_E;
518  int clientsock=-1;
519  try {
520  std::map<int,ClientState>::iterator sit=mOutputClientStates.begin();
521  for(;sit!=mOutputClientStates.end();sit++) {
522  if(!sit->second.mEvents.Empty())
523  if(!sit->second.mEvents.Exists(output))
524  continue;
525  clientsock=sit->second.mClientSocket;
526  if(clientsock>0) {
527  FD_DHV("nDevice::WriteOutput(): to socket " << clientsock);
528  syncSend(clientsock, message.c_str(), message.length(), 0);
529  }
530  }
531  } catch (faudes::Exception&) {
532  FD_DH("nDevice::WriteOutput(): failed to notify client on socket " << clientsock);
533  }
534  UNLOCK_E;
535  FD_DHV("nDevice::WriteOutput(): done");
536 }
537 
538 
539 
540 // Start(void)
541 void nDevice::Start(void) {
542  if(mState!=Down) return;
543  FD_DH("nDevice(" << mName <<")::Start()");
544  // call base
545  vDevice::Start();
546  mState=StartUp;
547  // clear event server states
548  mInputServerStates.clear();
549  std::map<std::string,std::string>::iterator nit;
550  for(nit=mNetworkNodes.begin(); nit!=mNetworkNodes.end();nit++) {
551  if(nit->first == mName) continue;
552  mInputServerStates[nit->first].mAddress= SimplenetAddress(nit->second);
553  mInputServerStates[nit->first].mEvents= EventSet();
554  mInputServerStates[nit->first].mServerSocket=-1;
555  mInputServerStates[nit->first].mLineBuffer="";
556  }
557  // clear client states
558  mOutputClientStates.clear();
559  // set my effective address
560  char hostname[1024];
561  int hostname_len =1023;
562  if(gethostname(hostname,hostname_len)!=0) {
563  std::stringstream errstr;
564  errstr << "Simplenet fatal network error (cannot get hostname)";
565  throw Exception("nDevice::Start", errstr.str(), 553);
566  }
567  hostname[hostname_len]=0;
569  mEffectiveListenAddress.Ip(hostname);
570  FD_DH("nDevice::Start(): server adress " << mEffectiveListenAddress.IpColonPort());
571  // open a tcp port to listen: create socket
572  mListenSocket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
573  if(mListenSocket<=0) {
574  std::stringstream errstr;
575  errstr << "Simplenet fatal network error (cannot open server socket)";
576  throw Exception("nDevice::Start", errstr.str(), 553);
577  }
578  int reuse=1;
579  faudes_setsockopt(mListenSocket,SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
580  // open a tcp port to listen: set up address
581  struct sockaddr_in serveraddr;
582  memset(&serveraddr, 0, sizeof(serveraddr));
583  serveraddr.sin_family = AF_INET;
584  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
585  serveraddr.sin_port = htons(mListenAddress.Port());
586  // open a tcp port to listen: bind socket to address
587  if(bind(mListenSocket, (struct sockaddr *) &serveraddr,sizeof(serveraddr)) <0) {
588  std::stringstream errstr;
589  errstr << "Simplenet fatal network error (cannot bind socket)";
590  throw Exception("nDevice::Start", errstr.str(), 553);
591  }
592  // open a tcp port to listen: start to listen
593  if(listen(mListenSocket, 77) < 0) { // todo: max pending connections
594  std::stringstream errstr;
595  errstr << "Simplenet fatal network error (cannot listen from socket)";
596  throw Exception("nDevice::Start", errstr.str(), 553);
597  }
598  // open a udp port to listen: create socket
599  mBroadcastSocket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
600  if(mBroadcastSocket<=0) {
601  std::stringstream errstr;
602  errstr << "Simplenet fatal network error (cannot open broadcast socket)";
603  throw Exception("nDevice::Start", errstr.str(), 553);
604  }
605  //int reuse=1;
606  faudes_setsockopt(mBroadcastSocket,SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
607  faudes_setsockopt(mBroadcastSocket,SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
608  // open a udp port: enable broadcast
609  // int reuse
610  if(faudes_setsockopt(mBroadcastSocket, SOL_SOCKET, SO_BROADCAST, &reuse, sizeof(reuse)) ) {
611  std::stringstream errstr;
612  errstr << "Simplenet fatal network error (cannot setopt broadcast socket)";
613  throw Exception("nDevice::Start", errstr.str(), 553);
614  }
615  // open a udp port to listen: set up address
616  struct sockaddr_in broadcastaddr;
617  memset(&broadcastaddr, 0, sizeof(broadcastaddr));
618  broadcastaddr.sin_family = AF_INET;
619  broadcastaddr.sin_addr.s_addr = htonl(INADDR_ANY);
620  broadcastaddr.sin_port = htons(mBroadcastAddress.Port());
621  // open a udp port to listen: bind socket to address
622  if(bind(mBroadcastSocket, (struct sockaddr *) &broadcastaddr,sizeof(broadcastaddr)) <0) {
623  std::stringstream errstr;
624  errstr << "Simplenet fatal network error (cannot bind broadcast socket)";
625  throw Exception("nDevice::Start", errstr.str(), 553);
626  }
627  // start background thread to listen: create & run thread
628  mStopListen=false;
629  int rc = faudes_thread_create(&mThreadListen, NDeviceListen, this);
630  // thread error
631  if(rc) {
632  std::stringstream errstr;
633  errstr << "Simplenet fatal thread error (cannot create thread)";
634  throw Exception("nDevice::Start", errstr.str(), 554);
635  }
636 }
637 
638 // Stop(void)
639 void nDevice::Stop(void) {
640  // bail out
641  if(mState!=Up && mState!=StartUp) return;
642  FD_DH("nDevice::Stop()");
643  LOCK_E;
644  // stop background threads
645  mStopListen=true;
646  UNLOCK_E;
647  // signal update to my listen thread: via udp message ... release select
648  std::string message= "<Stop> " + mNetwork + " " + mName + " </Stop>\n";
649  struct sockaddr_in broadcastaddr;
650  memset(&broadcastaddr, '\0', sizeof(broadcastaddr));
651  broadcastaddr.sin_family=AF_INET;
652  broadcastaddr.sin_port=htons(mBroadcastAddress.Port());
653  broadcastaddr.sin_addr.s_addr=inet_addr(mBroadcastAddress.Ip().c_str());
654  LOCK_E;
655  sendto(mBroadcastSocket,message.c_str(),message.length(),0,
656  (struct sockaddr *) & broadcastaddr, sizeof(broadcastaddr));
657  UNLOCK_E;
658  // wait until listen thread finished
659  FD_DH("nDevice::Stop(): waiting for listen thread");
660  faudes_thread_join(mThreadListen, NULL);
661  FD_DH("nDevice::Stop(): listen thread finished");
662  // close broadcast socket
663  shutdown(mBroadcastSocket,2);
664  faudes_closesocket(mBroadcastSocket);
665  mBroadcastSocket=-1;
666  // close server socket
667  shutdown(mListenSocket,2);
668  faudes_closesocket(mListenSocket);
669  mListenSocket=-1;
670  // call base (implies reset)
671  vDevice::Stop();
672 }
673 
674 
675 
676 // background thread,
677 // - receiving requests on broadcast port
678 // - sending requests broadcats
679 // - accept connections on listen port for output clients
680 // - notify connected output clients about events
681 // - connecting to input servers to receiving notifications
682 void* NDeviceListen(void* arg){
683  bool term;
684  std::map<std::string,nDevice::ServerState>::iterator sit;
685  std::map<int,nDevice::ClientState>::iterator cit;
686  // cast this object
687  nDevice* ndevice= static_cast<nDevice*>(arg);
688  // say hello
689  FD_DH("nDevice::Listen(" << ndevice << ")");
690  // clear broadcast time stamp
691  faudes_systime_t lastbroadcast;
692  lastbroadcast.tv_sec=0;
693  lastbroadcast.tv_nsec=0;
694 #ifdef FAUDES_DEBUG_IODEVICE
695  // clear debugging time stamp
696  int debuglisten=0;
697 #endif
698 
699  // infinite loop
700  while(true){
701 
702  // detect missing servers
703  int servermis=0;
704  int serverunknown=0;
705  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
706  // have no address?
707  if(!sit->second.mAddress.Valid()) {
708  FD_DH("nDevice::Listen(): missing server address for node: " << sit->first);
709  serverunknown++;
710  servermis++;
711  continue;
712  }
713  // have no connection
714  if(sit->second.mServerSocket<=0) {
715  FD_DH("nDevice::Listen(): missing server connection for node: " << sit->first);
716  servermis++;
717  continue;
718  }
719  }
720 
721  // detect missing clients (extension 2.22i, trust by number, should ask nodename on subscription)
722  int clientmis= ndevice->mNetworkNodes.size()-1;
723  for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
724  if(cit->second.mClientSocket<0) continue;
725  if(cit->second.mConnected) clientmis--;
726  }
727 #ifdef FAUDES_DEBUG_IODEVICE
728  if(clientmis!=servermis)
729  FD_DH("nDevice::Listen(): missing clients to subscribe: #"<< clientmis);
730 #endif
731 
732  // update state
733  if((servermis>0 || clientmis>0) && ndevice->mState==vDevice::Up) {
734  TLOCK_E;
735  ndevice->mState=vDevice::StartUp;
736  TUNLOCK_E;
737  }
738  if(servermis==0 && clientmis==0 && ndevice->mState==vDevice::StartUp) {
739  TLOCK_E;
740  ndevice->mState=vDevice::Up;
741  TUNLOCK_E;
742  }
743 
744 
745  // try to find input servers
746  if(serverunknown>0 && ndevice->mState==vDevice::StartUp) {
747  // is a broadcast due? (period 5sec)
748  faudes_systime_t now;
749  faudes_gettimeofday(&now);
750  faudes_mstime_t diffms;
751  faudes_diffsystime(now,lastbroadcast,&diffms);
752  if(diffms>5000) {
753  // udp message
754  std::string message= "<Request> "
755  + ndevice->mNetwork + " " + ndevice->mName + " </Request>\n";
756  // udp broadcast
757  struct sockaddr_in broadcastaddr;
758  memset(&broadcastaddr, '\0', sizeof(broadcastaddr));
759  broadcastaddr.sin_family=AF_INET;
760  broadcastaddr.sin_port=htons(ndevice->mBroadcastAddress.Port());
761  broadcastaddr.sin_addr.s_addr=inet_addr(ndevice->mBroadcastAddress.Ip().c_str());
762  TLOCK_E;
763  int rc=sendto(ndevice->mBroadcastSocket,message.c_str(),message.length(),
764  0,(struct sockaddr *) & broadcastaddr, sizeof(broadcastaddr));
765  (void) rc;
766  FD_DH("nDevice::Listen(): broadcast request: " << message.substr(0,message.length()-1) << " #" << rc);
767  TUNLOCK_E;
768  faudes_gettimeofday(&lastbroadcast);
769  }
770  }
771 
772  // subscribe to missing servers
773  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
774  // have active connection?
775  if(sit->second.mServerSocket>0) continue;
776  // have no address?
777  if(!sit->second.mAddress.Valid()) continue;
778  // try to connect
779  FD_DH("nDevice::Listen(): subscribing to " << sit->first <<
780  " at " << sit->second.mAddress.IpColonPort());
781  // open a tcp port: create socket
782  int serversock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
783  if(serversock<=0) {
784  FD_DH("nDevice::Listen(): subscription failed: no socket");
785  continue;
786  }
787  // open a tcp port: set up internet address
788  unsigned long int serverinaddr = INADDR_NONE;
789  if(serverinaddr==INADDR_NONE) {
790  FD_DH("nDevice::Listen(): using address as advertised");
791  serverinaddr = inet_addr(sit->second.mAddress.Ip().c_str());
792  }
793  if(serverinaddr==INADDR_NONE) {
794  struct hostent *host;
795  host = gethostbyname(sit->second.mAddress.Ip().c_str());
796  if(host!=0) {
797  FD_DH("nDevice::Listen(): using address by name lookup");
798  serverinaddr = *(unsigned long int*) host->h_addr;
799  }
800  }
801  if(serverinaddr==INADDR_NONE) {
802  FD_DH("nDevice::Listen(): subscription failed: invalid address " << sit->second.mAddress.Ip());
803  faudes_closesocket(serversock);
804  continue;
805  }
806  // open a tcp port: set up socket address
807  struct sockaddr_in serveraddress;
808  memset(&serveraddress, 0, sizeof(serveraddress));
809  serveraddress.sin_family = AF_INET;
810  serveraddress.sin_addr.s_addr=serverinaddr;
811  serveraddress.sin_port = htons(sit->second.mAddress.Port());
812  // open a tcp port: connect
813  if(connect(serversock, (struct sockaddr*) &serveraddress, sizeof(serveraddress))<0) {
814  FD_DH("nDevice::Listen(): subscription failed: connect");
815  faudes_closesocket(serversock);
816  continue;
817  }
818  // say hello to remote input server
819  try {
820  std::string hello;
821  hello="% Simplenet universal event subscription: "+ndevice->mName+" subscribing from "+sit->first+"\n";
822  syncSend(serversock, hello.c_str(), hello.length(), 0);
823  hello="% Expecting notifications in format '<Notify> event_name </Notify>'\n";
824  syncSend(serversock, hello.c_str(), hello.length(), 0);
825  hello="% Trying to subscribe to all required events\n";
826  syncSend(serversock, hello.c_str(), hello.length(), 0);
827  } catch (faudes::Exception&) {
828  faudes_closesocket(serversock);
829  serversock=-1;
830  }
831  if(serversock<0) {
832  FD_DH("nDevice::Listen(): subscription failed: cannot write");
833  faudes_closesocket(serversock);
834  continue;
835  }
836  // record success
837  FD_DH("nDevice::Listen(): subscribing to " << sit->first << " via socket " << serversock);
838  sit->second.mServerSocket=serversock;
839  // subscribe to all input events
840  EventSet sevents=ndevice->Inputs();
841  sevents.Name("Subscribe");
842  std::string message=sevents.ToString() + "\n";
843  syncSend(serversock,message.c_str(), message.length(),0);
844  // used to get info in pre 2.22h
845  /*
846  hello="% Going to Sending info command, explicit subscription may follow\n";
847  hello="<Cmd> Info </Cmd>\n";
848  syncSend(serversock, hello.c_str(), hello.length(), 0);
849  */
850  FD_DH("nDevice::Listen(): subscribing to " << sit->first << " via socket " << serversock << ": ok");
851  }
852 
853 
854  // prepare relevant wait on sources ...
855  fd_set mysocks;
856  int mysocks_max=0;
857  FD_ZERO(&mysocks);
858  // ... my server listen socket, expecting other nodes to connect and subscribe
859  if(mysocks_max<ndevice->mListenSocket) mysocks_max=ndevice->mListenSocket;
860  if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
861  FD_SET(ndevice->mListenSocket, &mysocks);
862  // ... udp port, expecting requests and adverts
863  if(mysocks_max< ndevice->mBroadcastSocket) mysocks_max=ndevice->mBroadcastSocket;
864  if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
865  FD_SET(ndevice->mBroadcastSocket, &mysocks);
866  // ... input server connections, expecting notifications
867  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
868  int serversock=sit->second.mServerSocket;
869  if(serversock<0) continue;
870  if(mysocks_max< serversock) mysocks_max=serversock;
871  if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
872  FD_SET(serversock, &mysocks);
873  }
874  // ... output client connections, expecting commands
875  for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
876  int clientsock=cit->second.mClientSocket;
877  if(clientsock<0) continue;
878  if(mysocks_max< clientsock) mysocks_max=clientsock;
879  if(mysocks_max>= FD_SETSIZE) FD_ERR("NDeviceListen: fail to select socket " << mysocks_max);
880  FD_SET(clientsock, &mysocks);
881  }
882 
883  // wait for traffic with moderate timeout
884  struct timeval tv;
885  tv.tv_sec = 1;
886  tv.tv_usec = 0;
887  int avail=select(mysocks_max+1, &mysocks, NULL, NULL, &tv);
888 
889  // terminate thread on request (before accepting incomming connections)
890  TLOCK_E;
891  term= ndevice->mStopListen;
892  TUNLOCK_E;
893  if(term) break;
894 
895  // reduce debugging output
896  #ifdef FAUDES_DEBUG_IODEVICE
897  debuglisten++;
898  if((debuglisten>10) || (avail>0)) {
899  FD_DH("nDevice::Listen(): listen as node \"" << ndevice->mName << "\" on network \"" << ndevice->mNetwork << "\"" << " #" << avail);
900  debuglisten=0;
901  }
902  #endif
903 
904  // handle incomming connection requests
905  if(avail>0)
906  if(FD_ISSET(ndevice->mListenSocket,&mysocks)) {
907  avail--;
908  int clientsock=-1;
909  struct sockaddr_in clientaddr;
910  socklen_t clientaddr_len = sizeof(clientaddr);
911  clientsock=accept(ndevice->mListenSocket, (struct sockaddr *) &clientaddr, &clientaddr_len );
912  if(clientsock<0) {
913  FD_DH("nDevice::Listen(): failed to accept incomming connection");
914  break;
915  }
916  FD_DH("nDevice::Listen(): accepted connection from client " << inet_ntoa(clientaddr.sin_addr) <<
917  " on socket " << clientsock);
918  // say hello
919  try {
920  std::string hello;
921  hello="% Simplenet Event Server: "+ndevice->mName+" providing events\n";
922  syncSend(clientsock, hello.c_str(), hello.length(), 0);
923  hello="% Notifications will have format '<Notify> event_name </Notify>'\n";
924  syncSend(clientsock, hello.c_str(), hello.length(), 0);
925  hello="% Commands are accepted in format '<Cmd> cmd_name </Cmd>'\n";
926  syncSend(clientsock, hello.c_str(), hello.length(), 0);
927  hello="% Supported commands are Subscribe, Info, Status, and ResetRequest\n";
928  syncSend(clientsock, hello.c_str(), hello.length(), 0);
929  } catch (faudes::Exception&) {
930  faudes_closesocket(clientsock);
931  clientsock=-1;
932  }
933  if(clientsock<0) {
934  FD_DH("nDevice::Listen(): connection test failed: cannot write");
935  faudes_closesocket(clientsock);
936  break;
937  }
938  // record client by socket
939  TLOCK_E;
940  nDevice::ClientState* cstate = &ndevice->mOutputClientStates[clientsock];
941  cstate->mClientSocket=clientsock;
942  cstate->mEvents.Clear();
943  cstate->mConnected=false;
944  cstate->mLineBuffer="";
945  TUNLOCK_E;
946  }
947 
948  // handle incomming broadcast
949  if(avail>0)
950  if(FD_ISSET(ndevice->mBroadcastSocket,&mysocks)) {
951  avail--;
952  // get message
953  char data[1024];
954  int data_len=1023;
955  struct sockaddr_in fromaddr;
956  socklen_t fromaddr_len = sizeof(fromaddr);
957  data_len=recvfrom(ndevice->mBroadcastSocket,data,data_len,0, (struct sockaddr*) &fromaddr,&fromaddr_len);
958  if(data_len<0) data_len=0; // todo: eof
959  data[data_len]=0;
960  if(data_len>=1) if(data[data_len-1]=='\n') data[data_len-1]=0;
961  FD_DH("nDevice::Listen(): received udp datagram " << data <<
962  " from " << inet_ntoa(fromaddr.sin_addr));
963  // interpret message
964  TokenReader tr(TokenReader::String,std::string(data));
965  try {
966  Token token;
967  tr.Peek(token);
968  // interpret udp request ...
969  if(token.IsBegin("Request")) {
970  tr.ReadBegin("Request");
971  if(tr.ReadString()==ndevice->mNetwork) {
972  // extension 2.22i: identify sender (optional for compatibility)
973  std::string snode;
974  Token stoken;
975  tr.Peek(stoken);
976  if(stoken.IsString()) snode=stoken.StringValue();
977  // extension 2.22i: if this is a missing server, reset my request timer
978  sit=ndevice->mInputServerStates.find(snode);
979  if(sit!=ndevice->mInputServerStates.end()) {
980  if(sit->second.mServerSocket==-1) {
981  lastbroadcast.tv_sec=0;
982  lastbroadcast.tv_nsec=0;
983  }
984  }
985  // extension 2.22i: ignore my own requests
986  if(snode!=ndevice->mName) {
987  // set up advert
988  std::string message= "<Advert> "
989  + ndevice->mNetwork + " "
990  + ndevice->mName + " " +
991  ndevice->mEffectiveListenAddress.IpColonPort()+ " </Advert>\n";
992  // udp reply
993  struct sockaddr_in replyaddr;
994  memset(&replyaddr, '\0', sizeof(replyaddr));
995  replyaddr.sin_family=AF_INET;
996  replyaddr.sin_port=htons(ndevice->mBroadcastAddress.Port());
997  //replyaddr.sin_addr.s_addr=fromaddr.sin_addr.s_addr;
998  replyaddr.sin_addr.s_addr=inet_addr(ndevice->mBroadcastAddress.Ip().c_str());
999  //replyaddr.sin_addr.s_addr=htonl(INADDR_BROADCAST);
1000  TLOCK_E;
1001  int rc = sendto(ndevice->mBroadcastSocket,message.c_str(),message.length(),0,(struct sockaddr *) & replyaddr, sizeof(replyaddr));
1002  TUNLOCK_E
1003  FD_DH("nDevice::Listen(): reply advert: " << message.substr(0,message.length()-1) << " #" << rc);
1004  } else {
1005  FD_DH("nDevice::Listen(): ingoring request from myself");
1006  }
1007  } else {
1008  FD_DH("nDevice::Listen(): ingoring request from other network");
1009  }
1010  }
1011  // interpret udp advert ...
1012  if(token.IsBegin("Advert")) {
1013  tr.ReadBegin("Advert");
1014  if(tr.ReadString()==ndevice->mNetwork) {
1015  std::string node = tr.ReadString();
1016  std::string saddr = tr.ReadString();
1017  SimplenetAddress addr(saddr);
1018  addr.Ip(inet_ntoa(fromaddr.sin_addr)); // overwrite with actual ip
1019  FD_DHV("nDevice::Listen(): figure actual ip address " << addr.Ip());
1020  if(!addr.Valid()) {
1021  addr=saddr;
1022  FD_DH("nDevice::Listen(): fallback to explicit ip address " << addr.Ip());
1023  }
1024  std::map<std::string,nDevice::ServerState>::iterator sit;
1025  sit=ndevice->mInputServerStates.find(node);
1026  if(sit==ndevice->mInputServerStates.end()) {
1027  FD_DH("nDevice::Listen(): ignoring irrelevant advert from " << node);
1028  } else if(sit->second.mAddress.Valid()) {
1029  FD_DH("nDevice::Listen(): ignoring address overwrite (hardwired?) " << node);
1030  }
1031  if(sit!=ndevice->mInputServerStates.end())
1032  if(!sit->second.mAddress.Valid()) {
1033  FD_DH("nDevice::Listen(): accept advert " << node);
1034  sit->second.mAddress=addr;
1035  if(sit->second.mServerSocket>=0) faudes_closesocket(sit->second.mServerSocket);
1036  sit->second.mServerSocket=-1;
1037  }
1038  } else {
1039  FD_DH("nDevice::Listen(): ingoring advert from other network");
1040  }
1041  }
1042  } catch (faudes::Exception&) {
1043  FD_DH("nDevice::Listen(): ignore invalid udp message");
1044  }
1045  }
1046 
1047 
1048  // handle input servers: receive event notification
1049  int revcount=0;
1050  if(avail>0)
1051  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
1052  int serversock=sit->second.mServerSocket;
1053  if(serversock<0) continue;
1054  if(FD_ISSET(serversock, &mysocks)) {
1055  avail--;
1056  FD_DH("nDevice::Listen(): reading sock " << serversock);
1057  // buffer data in line buffer
1058  char buffer[1025];
1059  int count = recv(serversock, buffer, 1024, 0);
1060  if(count<=0) { // todo: test eof
1061  FD_DH("nDevice::Listen(): reading server sock " << serversock << " : eof");
1062  faudes_closesocket(serversock);
1063  sit->second.mServerSocket=-1;
1064  continue;
1065  }
1066  FD_DH("nDevice::Listen(): reading server sock " << serversock << ": #" << count);
1067  buffer[count]=0;
1068  sit->second.mLineBuffer +=std::string(buffer);
1069  // interpret line(s)
1070  if(count>0)
1071  if(buffer[count-1]=='\n')
1072  if(sit->second.mLineBuffer.length()>0)
1073  {
1074  const std::string& linebuffer = sit->second.mLineBuffer;
1075 #ifdef FAUDES_DEBUG_IODEVICE
1076  if(linebuffer.length()>0)
1077  if(linebuffer[0]!='%')
1078  FD_DH("nDevice::Listen(): reading server sock " << serversock << ": line: " << linebuffer);
1079 #endif
1080  // tokenise notification
1081  TokenReader tr(TokenReader::String,linebuffer);
1082  try {
1083  Token token;
1084  while(tr.Peek(token)) {
1085  // its an event notify
1086  if(token.Type()==Token::Begin && token.StringValue()=="Notify") {
1087  tr.ReadBegin("Notify");
1088  std::string event = tr.ReadString();
1089  tr.ReadEnd("Notify");
1090  faudes_mutex_lock(ndevice->pBufferMutex);
1091  FD_DH("nDevice::Listen(): found event " << event);
1092  Idx sev=ndevice->mInputs.Index(event);
1093  if(ndevice->mInputs.Exists(sev)) ndevice->pInputBuffer->push_back(sev);
1094  faudes_mutex_unlock(ndevice->pBufferMutex);
1095  revcount++;
1096  continue;
1097  }
1098  // its an info reply (ignored as of 2.22i)
1099  if(token.Type()==Token::Begin && token.StringValue()=="SimplenetDevice") {
1100  FD_DH("nDevice::Listen(): found device info");
1101  nDevice remote;
1102  remote.Read(tr);
1103  FD_DH("nDevice::Listen(): found device with outputs " << remote.Outputs().ToString());
1104  // used to subscribe on relevant events in pre 2.22h
1105  /*
1106  EventSet sevents=ndevice->Inputs();
1107  sevents.SetIntersection(remote.Outputs());
1108  sevents.Name("Subscribe");
1109  std::string message=sevents.ToString();
1110  FD_DH("nDevice::Listen(): subscribing events " << message);
1111  message += "\n";
1112  syncSend(serversock,message.c_str(), message.length(),0);
1113  */
1114  continue;
1115  }
1116  // skip other sections
1117  if(token.Type()==Token::Begin) {
1118  FD_DH("nDevice::Listen(): ignore section " << token.StringValue());
1119  std::string section=token.StringValue();
1120  tr.ReadBegin(section);
1121  while(!tr.Eos(section)) tr.Get(token);
1122  tr.ReadEnd(section);
1123  continue;
1124  }
1125  // ignore token
1126  FD_DH("nDevice::Listen(): error: ignore token");
1127  tr.Get(token);
1128  }
1129  } catch (faudes::Exception&) {
1130  FD_DH("nDevice::Listen(): " << serversock << ": invalid notification");
1131  }
1132  sit->second.mLineBuffer.clear();
1133  }
1134  }
1135  }
1136 
1137  // handle output clients: reply to commands
1138  if(avail>0)
1139  for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
1140  int clientsock=cit->second.mClientSocket;
1141  if(clientsock<0) continue;
1142  if(FD_ISSET(clientsock, &mysocks)) {
1143  avail--;
1144  FD_DH("nDevice::Listen(): reading client sock " << clientsock);
1145  // buffer data in line buffer
1146  char buffer[1025];
1147  int count = recv(clientsock, buffer, 1024, 0);
1148  if(count<=0) { // todo: test eof
1149  FD_DH("nDevice::Listen(): reading client sock " << clientsock << " : eof");
1150  TLOCK_E;
1151  faudes_closesocket(clientsock);
1152  cit->second.mClientSocket=-1;
1153  cit->second.mConnected=false;
1154  TUNLOCK_E;
1155  continue;
1156  }
1157  FD_DH("nDevice::Listen(): reading client sock " << clientsock << ": #" << count);
1158  buffer[count]=0;
1159  cit->second.mLineBuffer +=std::string(buffer);
1160  // interpret line(s)
1161  if(count>0)
1162  if(buffer[count-1]=='\n')
1163  if(cit->second.mLineBuffer.length()>0)
1164  {
1165  const std::string& linebuffer = cit->second.mLineBuffer;
1166 #ifdef FAUDES_DEBUG_IODEVICE
1167  if(linebuffer.length()>0)
1168  if(linebuffer[0]!='%')
1169  FD_DH("nDevice::Listen(): reading client sock " << clientsock << ": line: " << linebuffer);
1170 #endif
1171  // tokenise command
1172  TokenReader tr(TokenReader::String,linebuffer);
1173  try {
1174  Token token;
1175  while(tr.Peek(token)) {
1176  // its a command
1177  if(token.IsBegin("Cmd")) {
1178  tr.ReadBegin("Cmd");
1179  std::string cmd = tr.ReadString();
1180  tr.ReadEnd("Cmd");
1181  std::string response="<NAck> </NAck>\n";
1182  FD_DH("nDevice::Reply(" << clientsock << "): received cmd " << cmd);
1183  // command: info
1184  if(cmd=="Info") {
1185  TLOCK_E;
1186  response=ndevice->ToString() + "\n";
1187  TUNLOCK_E;
1188  }
1189  // command: status
1190  if(cmd=="Status") {
1191  TLOCK_E;
1192  if(ndevice->mState==vDevice::Up) response="<Ack> Up </Ack>\n";
1193  if(ndevice->mState==vDevice::StartUp) response="<Ack> StartUp </Ack>\n";
1194  if(ndevice->mState==vDevice::ShutDown) response="<Ack> ShutDown </Ack>\n";
1195  TUNLOCK_E;
1196  }
1197  // its a reset request
1198  if(cmd=="ResetRequest") {
1199  FD_DH("nDevice::Reply(" << clientsock << "): reset request");
1200  faudes_mutex_lock(ndevice->pBufferMutex);
1201  if(!ndevice->mResetRequest) revcount++;
1202  ndevice->mResetRequest=true;
1203  faudes_mutex_unlock(ndevice->pBufferMutex);
1204  response="";
1205  }
1206  // send reply
1207  syncSend(clientsock, response.c_str(), response.length(), 0);
1208  }
1209  // its a event subscription
1210  if(token.IsBegin("Subscribe")) {
1211  EventSet sevents;
1212  sevents.Read(tr,"Subscribe");
1213  sevents.RestrictSet(ndevice->Outputs());
1214  sevents.Name("Subscribed");
1215  FD_DH("nDevice::Reply(" << clientsock << "): providing events " << sevents.ToString());
1216  TLOCK_E;
1217  cit->second.mEvents.Clear();
1218  cit->second.mEvents.InsertSet(sevents);
1219  cit->second.mConnected=true;
1220  std::string response=sevents.ToString()+"\n";
1221  TUNLOCK_E;
1222  // send reply
1223  syncSend(clientsock, response.c_str(), response.length(), 0);
1224  }
1225  }
1226  } catch (faudes::Exception&) {
1227  FD_DH("nDevice::Reply(" << clientsock << "): invalid cmd");
1228  }
1229  cit->second.mLineBuffer.clear();
1230  }
1231  }
1232  }
1233 
1234  // signal condition for received events / reset requests
1235  if(revcount>0) {
1236  FD_DH("nDevice::Listen(): broadcast condition");
1237  faudes_mutex_lock(ndevice->pWaitMutex);
1238  faudes_cond_broadcast(ndevice->pWaitCondition);
1239  faudes_mutex_unlock(ndevice->pWaitMutex);
1240  revcount=0;
1241  }
1242 
1243  // should remove unconnected clients ?
1244 
1245 
1246  // some error
1247  if(avail<0) {
1248  FD_DH("nDevice::Listen(): select error");
1249  }
1250 
1251  }
1252 
1253  // close clientsockets
1254  FD_DH("nDevice::Listen(): close client sockets");
1255  TLOCK_E;
1256  for(cit=ndevice->mOutputClientStates.begin(); cit!=ndevice->mOutputClientStates.end(); cit++) {
1257  int clientsock= cit->second.mClientSocket;
1258  if(clientsock>0) faudes_closesocket(clientsock);
1259  cit->second.mClientSocket=-1;
1260  cit->second.mConnected=false;
1261  }
1262  ndevice->mOutputClientStates.clear();
1263  TUNLOCK_E;
1264  // close serversockets
1265  FD_DH("nDevice::Listen(): close server sockets");
1266  for(sit=ndevice->mInputServerStates.begin(); sit!=ndevice->mInputServerStates.end(); sit++) {
1267  int serversock=sit->second.mServerSocket;
1268  if(serversock>0) faudes_closesocket(serversock);
1269  sit->second.mServerSocket=-1;
1270  }
1271  FD_DH("nDevice::Listen(): terminating listen thread");
1272  faudes_thread_exit(NULL);
1273  return NULL;
1274 }
1275 
1276 
1277 
1278 
1279 // reset dynamic faudes state (buffered events, current time)
1280 void nDevice::Reset(void) {
1281  // call base for time and input buffer
1282  vDevice::Reset();
1283  // bail out (do notify clients even when servers incomplete)
1284  if(mState!=Up && mState!=StartUp) return;
1285  // have message
1286  std::string message= "<Cmd> ResetRequest </Cmd>\n";
1287  // send cmd to all my servers
1288  LOCK_E;
1289  std::map<std::string,ServerState>::iterator sit=mInputServerStates.begin();
1290  for(; sit!=mInputServerStates.end(); sit++) {
1291  int serversock=sit->second.mServerSocket;
1292  if(serversock<0) continue;
1293  FD_DH("nDevice::Reset(): sending reset request to socket " << serversock);
1294  syncSend(serversock, message.c_str(), message.length(), 0);
1295  }
1296  UNLOCK_E;
1297 }
1298 
1299 
1300 #endif // configured simplenet
1301 
1302 
1303 } // name space
1304 
1305 
1306 
#define FD_ERR(message)
Debug: report more errors with file/line info.
void faudes_diffsystime(const faudes_systime_t &end, const faudes_systime_t &begin, faudes_systime_t *res)
#define FAUDES_TYPE_IMPLEMENTATION(ftype, ctype, cbase)
faudes type implementation macros, overall
Definition: cfl_types.h:946
Attribute for the configuration of a input or output mapping.
Definition: iop_vdevice.h:68
const AttributeVoid * pOutputPrototype
Output Prototype (set to nontrivial attribute in derived classes)
Definition: iop_vdevice.h:148
void DefaultOutput(void)
Set to default output attribute.
Definition: iop_vdevice.h:97
const AttributeVoid * pInputPrototype
Input Prototype (set to nontrivial attribute in derived classes)
Definition: iop_vdevice.h:151
void DefaultInput(void)
Set to default input attribute.
Definition: iop_vdevice.h:103
Configuration of a networked input or output.
void DoAssign(const AttributeSimplenetEvent &rSrc)
DoAssign.
AttributeSimplenetEvent(void)
Default constructor (no mapping at all)
static const AttributeSimplenetInput * InputPrototypep(void)
Prototype, input (construct on first use static)
static const AttributeSimplenetOutput * OutputPrototypep(void)
Prototype, output (construct on first use static)
Configuration of a network input mapping.
virtual void DoRead(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
Reads the attribute from TokenReader, see AttributeVoid for public wrappers.
Configuration of a network output mapping.
Definition: iop_simplenet.h:96
virtual void DoRead(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
Reads the attribute from TokenReader, see AttributeVoid for public wrappers.
Minimal Attribute.
Auto register faudes-type with specified type name.
Definition: cfl_registry.h:460
Faudes exception class.
Set of indices with symbolic names.
Definition: cfl_nameset.h:69
bool Exists(const Idx &rIndex) const
Test existence of index.
void SymbolicName(Idx index, const std::string &rName)
Set new name for existing index.
Idx Index(const std::string &rName) const
Index lookup.
void RestrictSet(const NameSet &rOtherSet)
Restrict to elements specified by rOtherSet.
Simplenet node address.
Definition: iop_simplenet.h:31
int Port(void) const
Get TCP port.
Definition: iop_simplenet.h:51
bool operator<(const SimplenetAddress &rOther) const
Order for sorting containers of addresses.
std::string IpColonPort(void) const
Get as colon seperated string.
bool Valid(void) const
Return true if valid.
SimplenetAddress(void)
Default constructor.
std::string mIp
Ip address.
Definition: iop_simplenet.h:71
std::string Ip(void) const
Get IP address.
Definition: iop_simplenet.h:48
Set of indices with symbolic names and attributes.
Definition: cfl_nameset.h:566
A TokenReader reads sequential tokens from a file or string.
std::string FileLine(void) const
Return "filename:line".
bool Eos(const std::string &rLabel)
Peek a token and check whether it ends the specified section.
void ReadEnd(const std::string &rLabel)
Close the current section by matching the previous ReadBegin().
std::string ReadString(void)
Read string token.
void ReadBegin(const std::string &rLabel)
Open a section by specified label.
bool Get(Token &token)
Get next token.
bool Peek(Token &token)
Peek next token.
A TokenWriter writes sequential tokens to a file, a string or stdout.
void WriteEnd(const std::string &rLabel)
Write end label.
Tokens model atomic data for stream IO.
Definition: cfl_token.h:54
const std::string & StringValue(void) const
Get string value of a name token.
Definition: cfl_token.cpp:178
@ Begin
<label> (begin of section)
Definition: cfl_token.h:84
bool IsString(void) const
Test token Type.
Definition: cfl_token.cpp:244
bool ExistsAttributeString(const std::string &name)
Test attibute existence.
Definition: cfl_token.cpp:356
bool IsBegin(void) const
Test token Type.
Definition: cfl_token.cpp:259
void SetEmpty(const std::string &rName)
Initialize as empty-tag token.
Definition: cfl_token.cpp:106
void SetBegin(const std::string &rName)
Initialize as Begin token.
Definition: cfl_token.cpp:92
void InsAttributeString(const std::string &name, const std::string &value)
Insert named attribute with string value.
Definition: cfl_token.cpp:310
const std::string & AttributeStringValue(const std::string &name)
Access attribute value.
Definition: cfl_token.cpp:386
TokenType Type(void) const
Get token Type.
Definition: cfl_token.cpp:199
Base class of all libFAUDES objects that participate in the run-time interface.
Definition: cfl_types.h:239
void Read(const std::string &rFileName, const std::string &rLabel="", const Type *pContext=0)
Read configuration data from file with label specified.
Definition: cfl_types.cpp:261
std::string ToString(const std::string &rLabel="", const Type *pContext=0) const
Write configuration data to a string.
Definition: cfl_types.cpp:169
An nDevice implements networked IO via a simple TCP/IP protocol.
void BroadcastAddress(const std::string &rAddr)
Set broadcast address for address resolution Note: you can only set the broadcast address while the d...
void InsNode(const std::string &rNodeName)
Add a node to the network configuration.
void ServerAddress(const std::string &rAddr)
Set server address of this node.
virtual void Compile(void)
Set up internal data structures.
virtual void DoReadPreface(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
Actual method to read device configuration from tokenreader.
virtual void Clear(void)
Clear all configuration.
void InsNodeAddress(const std::string &rNode, const std::string &rAddress)
Add entry to node name resolution.
SimplenetAddress mListenAddress
Simplenet: address of my server incl port (localhost:40000)
TaNameSet< AttributeSimplenetEvent > * pConfiguration
Overall configuration (with actual type)
std::string mNetwork
Simplenet: network id.
std::map< std::string, EventSet > mInputSubscriptions
Compiled data: map subscriptions.
void NetworkName(const std::string &rNetwork)
Set network name to participate.
faudes_thread_t mThreadListen
Background: thread handle (global)
virtual ~nDevice(void)
Explicit destructor.
std::map< std::string, ServerState > mInputServerStates
Background: connection states to event servers (by node name)
void InsOutputEvent(const std::string &event)
Insert event as output event.
int mBroadcastSocket
Background: udp broadcast socket (background only)
void ClearNodes(void)
Add a node to the network configuration.
virtual void DoWritePreface(TokenWriter &rTw, const std::string &rLabel="", const Type *pContext=0) const
Actual method to write the device configuration to a TokenWriter.
std::map< int, ClientState > mOutputClientStates
Background: map sockets to connection states (shared)
faudes_mutex_t mMutex
Background: mutex for below shared variables.
friend void * NDeviceListen(void *)
nDevice(void)
Default constructor.
SimplenetAddress mEffectiveListenAddress
Simplenet: effective address of my server port.
virtual void Stop(void)
Deactivate the device.
virtual void WriteOutput(Idx output)
Run output command.
SimplenetAddress mBroadcastAddress
Simplenet: address for udp broadcast (255.255.255.255:40000.
bool mStopListen
Background: request to join via flag (mutexed)
virtual void Reset(void)
Reset device.
std::map< std::string, std::string > mNetworkNodes
Simplenet: list of nodes in this network incl default addresses.
int mListenSocket
Background: server socket to listen (background only)
virtual void Start(void)
Activate the device.
void InsInputEvent(const std::string &event)
Insert event as input event.
Virtual base class to define the interface for event io.
Definition: iop_vdevice.h:261
bool mResetRequest
Reset request marker (mutexed)
Definition: iop_vdevice.h:806
faudes_mutex_t * pBufferMutex
Actual mutex for input buffer (mutexted)
Definition: iop_vdevice.h:803
virtual void Clear(void)
Clear all configuration.
virtual const EventSet & Inputs(void) const
Get inputs as plain set.
virtual void Compile(void)
Compile inner data-structures.
std::string mDefaultLabel
Default label for token io.
Definition: iop_vdevice.h:721
faudes_cond_t * pWaitCondition
Actual Wait Condition.
Definition: iop_vdevice.h:782
virtual void DoReadPreface(TokenReader &rTr, const std::string &rLabel="", const Type *pContext=0)
Reads non-event-configuration data from TokenReader.
virtual void Stop(void)
Deactivate the device.
virtual void Start(void)
Activate the device.
EventSet * mpConfiguration
Overall event configuration (uses cast for type)
Definition: iop_vdevice.h:761
EventSet mInputs
All inputs.
Definition: iop_vdevice.h:764
EventSet mOutputs
All outputs.
Definition: iop_vdevice.h:767
virtual void Reset(void)
Reset device.
virtual void DoWritePreface(TokenWriter &rTw, const std::string &rLabel="", const Type *pContext=0) const
Writes non-event-configuration data from TokenWriter.
faudes_mutex_t * pWaitMutex
Actual Wait Condition Mutex.
Definition: iop_vdevice.h:779
std::string mName
Name.
Definition: iop_vdevice.h:758
std::deque< Idx > * pInputBuffer
Actual Fifo buffer for input readings.
Definition: iop_vdevice.h:797
DeviceState mState
Status: running, starting etc.
Definition: iop_vdevice.h:770
virtual const EventSet & Outputs(void) const
Get outputs as plain set.
virtual void Clear(void)
Clear all set.
Definition: cfl_baseset.h:1911
NameSet EventSet
Convenience typedef for plain event sets.
Definition: cfl_nameset.h:533
const std::string & Name(void) const
Return name of TBaseSet.
Definition: cfl_baseset.h:1764
#define TUNLOCK_E
#define UNLOCK_E
#define TLOCK_E
#define LOCK_E
Simple networked events via TCP/IP.
#define FD_DHV(message)
Definition: iop_vdevice.h:37
#define FD_DH(message)
Definition: iop_vdevice.h:27
libFAUDES resides within the namespace faudes.
uint32_t Idx
Type definition for index type (allways 32bit)
void * NDeviceListen(void *arg)
Idx ToIdx(const std::string &rString)
Convert a string to Idx.
Definition: cfl_utils.cpp:100
std::string ToStringInteger(Int number)
integer to string
Definition: cfl_utils.cpp:43
int syncSend(int dest, const char *data, int len, int flag)
AutoRegisterType< nDevice > gRtiRegisterSimplenetDevice("SimplenetDevice")
Background: state of a connection to a client (shared)

libFAUDES 2.32f --- 2024.12.22 --- c++ api documentaion by doxygen