15 #ifdef FAUDES_IODEVICE_D3RIP_URT
40 void AttributeD3ripURTOutput::DoAssign(
const AttributeD3ripURTOutput& rSrcAttr) {
41 FD_DHV(
"AttributeD3ripURTOutput(" <<
this <<
"):DoAssign(): assignment from " << &rSrcAttr);
42 mChannelToTransmit=rSrcAttr.mChannelToTransmit;
43 mParameterRecords=rSrcAttr.mParameterRecords;
44 mEventId=rSrcAttr.mEventId;
48 void AttributeD3ripURTOutput::DoWrite(TokenWriter& rTw,
const std::string& rLabel,
const Type* pContext)
const {
49 (void) rLabel; (void) pContext;
51 FD_DHV(
"AttributeD3ripURTOutput::DoWrite()" );
53 ptoken.SetEmpty(
"ChannelToTransmit");
54 ptoken.InsAttributeInteger(
"value",mChannelToTransmit);
60 void AttributeD3ripURTOutput::DoRead(TokenReader& rTr,
const std::string& rLabel,
const Type* pContext) {
61 mParameterRecords.reserve(10);
62 (void) rLabel; (void) pContext;
64 FD_DHV(
"AttributeD3ripURTOutput::DoRead()");
68 if(!token.IsBegin())
return;
70 if(token.IsBegin(
"ChannelToTransmit")) {
72 if(token.ExistsAttributeInteger(
"value"))
73 mChannelToTransmit=token.AttributeIntegerValue(
"value");
74 rTr.ReadEnd(
"ChannelToTransmit");
77 if(token.IsBegin(
"EventId")) {
79 if(token.ExistsAttributeInteger(
"value"))
80 mEventId=token.AttributeIntegerValue(
"value");
81 rTr.ReadEnd(
"EventId");
86 if(token.IsBegin(
"ParameterRecord")) {
88 mParameterRecords.resize(mParameterRecords.size()+1);
89 ParameterRecord& record = mParameterRecords.back();
90 rTr.ReadBegin(
"ParameterRecord",token);
92 while(!rTr.Eos(
"ParameterRecord")) {
95 if(token.IsBegin(
"DestinationNode")) {
97 if(token.ExistsAttributeInteger(
"value"))
98 record.destinationNode=token.AttributeIntegerValue(
"value");
99 rTr.ReadEnd(
"DestinationNode");
103 if(token.IsBegin(
"DestinationChannel")) {
105 if(token.ExistsAttributeInteger(
"value"))
106 record.destinationChannel=token.AttributeIntegerValue(
"value");
107 rTr.ReadEnd(
"DestinationChannel");
111 if(token.IsBegin(
"EligibilityTime")) {
113 if(token.ExistsAttributeInteger(
"value"))
114 record.eligibilityTime=token.AttributeIntegerValue(
"value");
115 rTr.ReadEnd(
"EligibilityTime");
119 if(token.IsBegin(
"DeadlineTime")) {
121 if(token.ExistsAttributeInteger(
"value"))
122 record.deadlineTime=token.AttributeIntegerValue(
"value");
123 rTr.ReadEnd(
"DeadlineTime");
127 rTr.ReadEnd(token.StringValue());
129 rTr.ReadEnd(
"ParameterRecord");
158 void AttributeD3ripURTInput::DoAssign(
const AttributeD3ripURTInput& rSrcAttr) {
159 FD_DHV(
"AttributeD3ripURTInput(" <<
this <<
"):DoAssign(): assignment from " << &rSrcAttr);
160 mEventId=rSrcAttr.mEventId;
165 void AttributeD3ripURTInput::DoWrite(TokenWriter& rTw,
const std::string& rLabel,
const Type* pContext)
const {
166 (void) rLabel; (void) pContext;
171 void AttributeD3ripURTInput::DoRead(TokenReader& rTr,
const std::string& rLabel,
const Type* pContext) {
172 (void) rLabel; (void) pContext;
174 FD_DHV(
"AttributeD3ripURTOutput::DoRead()");
178 if(!token.IsBegin())
return;
179 if(token.IsBegin(
"EventId")) {
181 if(token.ExistsAttributeInteger(
"value"))
182 mEventId=token.AttributeIntegerValue(
"value");
183 rTr.ReadEnd(
"EventId");
212 AttributeD3ripURTEvent::AttributeD3ripURTEvent(
void) : AttributeDeviceEvent() {
213 FD_DHV(
"AttributeD3ripURTEvent::AttributeD3ripURTEvent(" <<
this <<
")");
214 pOutputPrototype=OutputPrototypep();
215 pInputPrototype=InputPrototypep();
219 AttributeD3ripURTEvent::AttributeD3ripURTEvent(
const AttributeD3ripURTEvent& rOtherAttr) :
220 AttributeDeviceEvent()
222 FD_DHV(
"AttributeD3ripURTEvent(" <<
this <<
"): form other attr " << &rOtherAttr);
223 pOutputPrototype=OutputPrototypep();
224 pInputPrototype=InputPrototypep();
225 DoAssign(rOtherAttr);
230 AutoRegisterType< TaNameSet<AttributeD3ripURTEvent> >
231 gRti1RegisterD3ripURTDeviceEventSet(
"D3ripURTDeviceEventSet");
232 AutoRegisterXElementTag< TaNameSet<AttributeD3ripURTEvent> >
233 gRti1XElementTagD3ripURTDeviceEventSet(
"D3ripURTDeviceEventSet",
"Event");
236 const AttributeD3ripURTOutput* AttributeD3ripURTEvent::OutputPrototypep(
void){
237 AttributeD3ripURTOutput* attrp=
new AttributeD3ripURTOutput();
242 const AttributeD3ripURTInput* AttributeD3ripURTEvent::InputPrototypep(
void) {
243 AttributeD3ripURTInput* attrp=
new AttributeD3ripURTInput();
261 AutoRegisterType<d3ripURTDevice> gRtiRegisterD3ripURTDevice(
"D3ripURTDevice");
265 void* DoListenModule(
void* threadArg);
271 d3ripURTDevice::d3ripURTDevice(
void) : vDevice() {
272 FD_DHV(
"d3ripURTDevice(" << mName <<
")::d3ripURTDevice()");
274 mpConfiguration =
new TaNameSet<AttributeD3ripURTEvent>;
275 pConfiguration =
dynamic_cast< TaNameSet<AttributeD3ripURTEvent>*
>(mpConfiguration);
277 mDefaultLabel =
"D3ripURTDevice";
279 mName=
"D3ripURTDevice";
284 d3ripURTDevice::~d3ripURTDevice(
void) {
286 FD_DHV(
"d3ripURTDevice(" << mName <<
")::~d3ripURTDevice()");
291 void d3ripURTDevice::Clear(
void) {
293 FD_DHV(
"d3ripURTDevice(" << mName <<
")::Clear()");
302 void d3ripURTDevice::Start(
void) {
306 if( (mState == Up)||(mState == StartUp) )
return;
307 FD_DHV(
"d3ripURTDevice("<<mName<<
")::Start()");
312 if (mlockall(MCL_CURRENT | MCL_FUTURE)) {
313 perror(
"mlockall failed:");
315 struct sched_param scheduling_parameters;
316 scheduling_parameters.sched_priority = sched_get_priority_max(SCHED_FIFO);
317 sched_setscheduler(0, SCHED_FIFO, &scheduling_parameters);
319 NameSet::Iterator eit;
320 for(eit=pConfiguration->Begin(); eit != pConfiguration->End(); eit++) {
322 AttributeD3ripURTEvent attr;
323 attr = pConfiguration->Attribute(*eit);
324 const AttributeD3ripURTOutput* oattr = attr.Outputp();
327 FD_DHV(
"d3ripURTDevice("<<mName<<
") demo: output idx " << *eit);
328 FD_DHV(
"d3ripURTDevice("<<mName<<
") demo: output name " << pConfiguration->SymbolicName(*eit));
329 FD_DHV(
"d3ripURTDevice("<<mName<<
") demo: Channel To Transmit: " << oattr->mChannelToTransmit);
332 pthread_create(&mThreadListenModule, NULL, &DoListenCLModule,
this);
338 void d3ripURTDevice::Stop(
void) {
340 if(mState != Up && mState !=StartUp)
return;
341 FD_DHV(
"d3ripURTDevice("<<mName<<
")::Stop()");
343 mContinueListening=0;
344 mq_close(mMQueueToSend);
345 mq_close(mMQueueToReceive);
346 pthread_kill(mThreadListenModule, SIGTERM);
353 void d3ripURTDevice::Reset(
void){
355 FD_DHV(
"d3ripURTDevice("<<mName<<
")::Reset()");
362 void d3ripURTDevice::Compile(
void){
364 mContinueListening=1;
365 struct mq_attr wanted_attrs;
366 wanted_attrs.mq_flags=0;
367 wanted_attrs.mq_maxmsg=10;
368 wanted_attrs.mq_msgsize=1500;
369 wanted_attrs.mq_curmsgs=0;
371 mMQueueToSend = mq_open(
"/MQUEUE_AP2CL_ID", O_RDWR | O_CREAT, S_IRWXU | S_IRWXG , &wanted_attrs);
373 if(mMQueueToSend==-1 || mMQueueToReceive==-1) {
374 FD_DHV(
"d3ripURTDevice("<<mName<<
"): Cannot Open Message Queue");
379 NameSet::Iterator eit;
380 for(eit=pConfiguration->Begin(); eit != pConfiguration->End(); eit++) {
382 AttributeD3ripURTEvent attr;
383 attr = pConfiguration->Attribute(*eit);
384 const AttributeD3ripURTOutput* oattr = attr.Outputp();
385 const AttributeD3ripURTInput* iattr = attr.Inputp();
389 mEventIdMap[mOutputs.SymbolicName(*eit)] = oattr->mEventId;
391 mEventParameters[i][D3RIP_URT_EVENT_ID]=*eit;
392 mEventParameters[i][D3RIP_URT_CHANNEL_TO_TRANSMIT]=oattr->mChannelToTransmit;
395 for(j=0;j<oattr->mParameterRecords.size();j++) {
396 mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DESTIONATION_NODE]=oattr->mParameterRecords[j].destinationNode;
397 mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DESTIONATION_CHANNEL]=oattr->mParameterRecords[j].destinationChannel;
398 mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_ELIGIBILITY_TIME]=oattr->mParameterRecords[j].eligibilityTime;
399 mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DEADLINE_TIME]=oattr->mParameterRecords[j].deadlineTime;
402 mEventParameters[i][D3RIP_URT_PARAMETER_COUNT]=j;
406 mEventIdMap[mInputs.SymbolicName(*eit)] = iattr->mEventId;
412 FD_DHV(
"d3ripURTDevice(" << mName <<
")::Compile()");
418 void d3ripURTDevice::WriteOutput(
Idx output) {
422 mEventsToSendCount=0;
423 for(i=0;i<mEventCount;i++) {
424 if(mEventParameters[i][D3RIP_URT_EVENT_ID]==output){
426 for(j=0;j<mEventParameters[i][D3RIP_URT_PARAMETER_COUNT];j++){
428 mCommunicationRequests[mCommunicationRequestCount].destinationNode=
429 mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DESTIONATION_NODE];
431 mCommunicationRequests[mCommunicationRequestCount].destinationChannel=
432 mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DESTIONATION_CHANNEL];
434 mCommunicationRequests[mCommunicationRequestCount].eligibilityTime=
435 mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_ELIGIBILITY_TIME];
437 mCommunicationRequests[mCommunicationRequestCount].deadlineTime=
438 mEventParameters[i][D3RIP_URT_HEADER_SIZE+j*D3RIP_URT_PARAMETER_SIZE+D3RIP_URT_DEADLINE_TIME];
441 mCommunicationRequestCount++;
444 mEventIdsToSend[mEventsToSendCount]=output;
445 mEventsToSendCount++;
452 void d3ripURTDevice::FlushOutput(
unsigned char channel) {
455 char msgToSend[D3RIP_RT_MESSAGE_MAX_LENGTH];
457 msgToSend[1]=mCommunicationRequestCount;
458 for(i=0;i<mCommunicationRequestCount;i++){
459 msgToSend[2+D3RIP_URT_DESTIONATION_NODE+i*D3RIP_URT_PARAMETER_SIZE]=mCommunicationRequests[i].destinationNode;
460 msgToSend[2+D3RIP_URT_DESTIONATION_CHANNEL+i*D3RIP_URT_PARAMETER_SIZE]=mCommunicationRequests[i].destinationChannel;
461 msgToSend[2+D3RIP_URT_ELIGIBILITY_TIME+i*D3RIP_URT_PARAMETER_SIZE]=mCommunicationRequests[i].eligibilityTime;
462 msgToSend[2+D3RIP_URT_DEADLINE_TIME+i*D3RIP_URT_PARAMETER_SIZE]=mCommunicationRequests[i].deadlineTime;
465 msgToSend[2+mCommunicationRequestCount*D3RIP_URT_PARAMETER_SIZE]=mEventsToSendCount;
466 for(i=0;i<mEventsToSendCount;i++) {
467 msgToSend[i+3+mCommunicationRequestCount*D3RIP_URT_PARAMETER_SIZE]=mEventIdMap[mOutputs.SymbolicName(mEventIdsToSend[i])];
471 msgLength=i+3+mCommunicationRequestCount*D3RIP_URT_PARAMETER_SIZE;
472 if(msgLength>D3RIP_RT_MESSAGE_MAX_LENGTH){
473 FD_DHV(
"RT Message Length exceeds MAX limit!");
476 mq_send(mMQueueToSend,(
char*)&msgToSend[0],msgLength,0);
479 mCommunicationRequestCount=0;
480 mEventsToSendCount=0;
485 void d3ripURTDevice::DoWritePreface(TokenWriter& rTw,
const std::string& rLabel,
const Type* pContext)
const{
486 FD_DHV(
"d3ripURTDevice("<<mName<<
")::DoWritePreface()");
488 vDevice::DoWritePreface(rTw,rLabel,pContext);
492 void d3ripURTDevice::DoReadPreface(TokenReader& rTr,
const std::string& rLabel,
const Type* pContext){
493 FD_DHV(
"d3ripURTDevice("<<mName<<
")::DoReadPreface()");
495 vDevice::DoReadPreface(rTr,rLabel,pContext);
499 void* DoListenCLModule(
void* threadArg){
500 d3ripURTDevice* pD3ripURTDevice = (d3ripURTDevice*)threadArg;
503 int communicationRequestCount;
505 unsigned char receivedEventId;
506 char msg[D3RIP_RT_MESSAGE_MAX_LENGTH];
507 struct timespec timeout;
509 mq_getattr(pD3ripURTDevice->mMQueueToReceive,&attr);
510 std::map<std::string,int>::iterator ii;
511 int evIndexToProcess;
512 struct timespec ts,tr;
520 mqd_t mMQueueToReceive = mq_open(
"/MQUEUE_CL2AP_ID", O_RDWR | O_CREAT, S_IRWXU | S_IRWXG , 0);
524 clock_gettime(CLOCK_REALTIME, &timeout);
526 length = mq_receive(mMQueueToReceive, msg, 1500, 0);
531 communicationRequestCount=msg[0];
532 if(communicationRequestCount>0) {
536 eventIdCount=msg[1+communicationRequestCount*D3RIP_URT_PARAMETER_SIZE];
537 for(i=0;i<eventIdCount;i++){
538 receivedEventId=msg[i+2+communicationRequestCount*D3RIP_URT_PARAMETER_SIZE];
540 for( ii=pD3ripURTDevice->mEventIdMap.begin(); ii!=pD3ripURTDevice->mEventIdMap.end(); ++ii)
542 if((*ii).second==receivedEventId) {
543 evIndexToProcess=pD3ripURTDevice->mInputs.Index((*ii).first.c_str());
544 if(pD3ripURTDevice->mInputs.Exists(evIndexToProcess)) {
545 pthread_mutex_lock(pD3ripURTDevice->pBufferMutex);
546 pD3ripURTDevice->pInputBuffer->push_back(evIndexToProcess);
547 pthread_mutex_unlock(pD3ripURTDevice->pBufferMutex);
549 pthread_mutex_lock(pD3ripURTDevice->pWaitMutex);
550 pthread_cond_broadcast(pD3ripURTDevice->pWaitCondition);
551 pthread_mutex_unlock(pD3ripURTDevice->pWaitMutex);
559 if( !pD3ripURTDevice->mContinueListening ) {
560 FD_DHV(
"Stopping the CL Listener");
#define FAUDES_TYPE_IMPLEMENTATION(ftype, ctype, cbase)
faudes type implementation macros, overall
iodevice for d3ripURT protocol and friends
libFAUDES resides within the namespace faudes.
uint32_t Idx
Type definition for index type (allways 32bit)