/*----------------------------------------------------------------* |srtp.cpp | |@version: 2.1 | |Last Modification: Oct 24, 2003 , by Fei, based on 06170301 | |Change logs: Bundle header format, mode 1 header format, mode 0 | | header format, and nack header format are now | | following the draft | | NACK mechanism is now following the draft | | Reduannt datastructres, functions, variables were | | removed | | Long message was buffered | | The status of unfinished longmessage will be kept | *----------------------------------------------------------------*/ /*----------------------------------------------------------------* | Copyright 2001-2003 Networking and Simulation Laboratory | | George Mason University, Fairfax, Virginia | | | | Permission to use, copy, modify, and distribute this | | software and its documentation for academic purposes is hereby | | granted without fee, provided that the above copyright notice | | and this permission appear in all copies and in supporting | | documentation, and that the name of George Mason University | | not be used in advertising or publicity pertaining to | | distribution of the software without specific, written prior | | permission. GMU makes no representations about the suitability | | of this software for any purposes. It is provided "AS IS" | | without express or implied warranties. All risk associated | | with use of this software is expressly assumed by the user. | *----------------------------------------------------------------*/ #ifdef UNIX #include #include #include #include #include #else #include #include #include #endif #include #include #include #include #include #include /* the header file for constants and data structures */ #include "srtp_proto.h" /* The UDP port for communication between SRTP daemon */ #define SRTP_UDP_PORT 8000 /* The UDP port that SRTP daemon will listen on for client request */ #define REQ_UDP_PORT 8001 /***************************************************************************************/ /* Global Variables */ /***************************************************************************************/ unsigned long int local_ip_address; /* local ip address */ /* The socket to communicate with remote SRTP daemon */ unsigned int srtp_socket; /* The socket to receive request from and send ack to local clients */ unsigned int client_socket; /* socket to deliver messages to local clients */ unsigned int delivery_socket; /* what has been registered/subscribed */ registered_dataid_t *registered_dataid[MAX_REG_MCA_NUM]; /* registed */ long subscribed_mca[MAX_SUB_MCA_NUM]; /* subscribed */ /* for each subscribed , the list of client's UDP ports */ int client_port[MAX_SUB_MCA_NUM][MAX_CLIENT_PER_MCA]; /* SN of the most recent outgoing m1 messages */ recent_m1_buffer_t *recent_outgoing_m1_sn; /* buffer for the most recent outgoing m1 message for each */ message_buffer_t *buffered_outgoing_m1_message_head; message_buffer_t *buffered_outgoing_m1_message_tail; /* A long mode 1 message maybe divided into multiple segments. * Here, all the messages from local clients is stored in this link list, no mater it * is mode 0 or mode 1. If it is a mode 0 message, or a short mode 1 message, the whole * message is put into one segment. The maximum segment size is defined by * MAX_M1_SEG_SIZE. If a mode 1 message is longer than MAX_M1_SEG_SIZE, the mode 1 * message will be segmented into multiple segments. The maximum mode 1 message length * is defined by MAX_M1_MESSAGE_LEN. * * Writer to this link list: retrieve_thread(), unbundler_thread() ( NACK and resend_m1) * Reader of this link list: bundler_thread() */ outgoing_segment_t *outgoing_segment_head; outgoing_segment_t *outgoing_segment_tail; /* array for storing bundles going to be transmitted through socket * bundler_thread() will retrieve all the segments in outgoing_segment_head, and try to * put multiple segments into one bundle. * transmitter_thread() will read from this array, and send out the bundles through * socket. * * Writer to this array: bundler_thread(), timer_thread() * Reader to this array: transmitter_thread() */ struct outgoing_bundle_t outgoing_bundle_pool[MAX_BUFFERED_OUTGOING_BUNDLE]; /* the link list for the index of outgoing_bundle_pool */ transmit_index_t *transmit_index_head; transmit_index_t *transmit_index_tail; /* link list for bundle timeout timer * each time the bundler_thread() create a new bundle, it will insert a new record in * the link list * timer_thead() will wait for each record to timeout. when timeout, timer_thead() * schedule the corresponding bundle to be transmitted immediately. */ bundle_timeout_timer_t *bundle_timeout_timer_tail; bundle_timeout_timer_t *bundle_timeout_timer_head; /* Packet here means a message received from the socket: can be a bundle or a feedback * Cyclic array for storing packets received from socket at the SRMP receiver side */ char incoming_packet_buffer[MAX_INCOMING_PACKETS][1600]; int incoming_packet_buffer_size[MAX_INCOMING_PACKETS]; long incoming_packet_write_pointer; /* write count */ long incoming_packet_read_pointer; /* read count */ /* Array for storing mode 0, mode 1 and NACK messages received from remote and local * retrieve_thread() will insert messages (mode 0/1) that were sent out from local * clients into this array * unbundler_thread() will extract segments from a bundle that was received from the * socket, and insert each segment into this array * deliver_thread() will retrieve messages from this array and send them to local * clients * * Writer to this array: unbundler_thread(), retrieve_thread() * Reader to this array: deliver_thread() */ char incoming_seg_pool[MAX_INCOM_MSG_NUM][MAX_M1_BUFFER_SIZE]; int incoming_seg_pool_size[MAX_INCOM_MSG_NUM]; /* size of each entry in the seg pool*/ long incoming_pool_write_pointer ; /* write count */ long incoming_pool_read_pointer; /* read count */ int reader1Cleared ; /* wrap around flag */ /* heartbeat signal */ heartbeat_buffer_t *heartbeat_msgqueue_head; heartbeat_buffer_t *heartbeat_msgqueue_tail; /* buffer of all mode 1 messages, that were received from locally or remotely */ recent_m1_buffer_t *m1_message_buffer; /* buffer for unassembled long m1 messages */ struct m1_seg_chain_t m1_seg_chain[MAX_UNASEMBLED_M1_MESSAGES]; char local_buffer[MAX_M1_BUFFER_SIZE]; int srtp_port; int req_port; #ifdef UNIX /* mutex for buffered_outgoing_m1_message_head_head */ pthread_mutex_t mutex_buffered_outgoing_m1_message; /* mutex between retriever_thread() and bundler_thread() */ /* mutex for outgoing_segment_head */ pthread_mutex_t mutex_outgoing_segment_head; /* event for outgoing_segment_head became non-empty */ pthread_cond_t event_outgoing_segment_head; /* mutex between bundler_thread(), timer_thread() */ /* mutext for bundle_timeout_timer_queue */ pthread_mutex_t mutex_bundle_timeout_timer_queue; /* event for bundle_timeout_timer_queue became non-empty */ pthread_cond_t event_bundle_timeout_timer_queue; /* mutex between bundler_thread(), transmitter_thread, and timer_thread() */ /* mutex for transmit_index_head became non-empty */ pthread_mutex_t mutex_transmit_index_head; /* event for transmit_index_head */ pthread_cond_t event_transmit_index_head; /* mutex between bundler_thread(), transmitter_thread, and timer_thread() */ /* mutex for outgoing_bundles_pool[] */ pthread_mutex_t mutex_all_bundles; /* mutex between receiver_thread() and unbundler_thread() */ /* mutex for incoming_packet_buffer */ pthread_mutex_t mutex_incoming_packet_buffer; /* event for incoming_packet_buffer became non-empty */ pthread_cond_t event_incoming_packet_buffer; /* mutex between unbundler_thread() and deliver_thread() */ pthread_mutex_t mutex_incoming_seg_pool; /* event for incoming_seg_pool became non-empty */ pthread_cond_t event_incoming_seg_pool; /* mutex between unbundler_thread(), deliver_thread(), and retrieve_thread() */ /* mutex for m1_message_buffer*/ pthread_mutex_t mutex_m1_message_buffer; /* mutex between retrieve_thread() and transmitter_thread() */ /* mutex for recent_outgoing_m1_sn */ pthread_mutex_t mutex_recent_outgoing_m1_sn; /* main waill wait on this event */ pthread_mutex_t hSemJoin; pthread_mutex_t hSigHeartBeat; #else /* mutex for buffered_outgoing_m1_message_head_head */ HANDLE mutex_buffered_outgoing_m1_message; /* mutex between retriever_thread() and bundler_thread() */ /* mutex for outgoing_segment_head */ HANDLE mutex_outgoing_segment_head; /* event for outgoing_segment_head became non-empty */ HANDLE event_outgoing_segment_head; /* mutex between bundler_thread(), timer_thread() */ /* mutext for bundle_timeout_timer_queue */ HANDLE mutex_bundle_timeout_timer_queue; /* event for bundle_timeout_timer_queue became non-empty */ HANDLE event_bundle_timeout_timer_queue; /* mutex between bundler_thread(), transmitter_thread, and timer_thread() */ /* mutex for transmit_index_head became non-empty */ HANDLE mutex_transmit_index_head; /* event for transmit_index_head */ HANDLE event_transmit_index_head; /* mutex between bundler_thread(), transmitter_thread, and timer_thread() */ /* mutex for ALLL_BUNDLES[] */ HANDLE mutex_all_bundles; /* mutex between receiver_thread() and unbundler_thread() */ /* mutex for incoming_packet_buffer */ HANDLE mutex_incoming_packet_buffer; /* event for incoming_packet_buffer became non-empty */ HANDLE event_incoming_packet_buffer; /* mutex between unbundler_thread() and deliver_thread() */ HANDLE mutex_incoming_seg_pool; /* event for incoming_seg_pool became non-empty */ HANDLE event_incoming_seg_pool; /* mutex between unbundler_thread(), deliver_thread(), and retrieve_thread() */ /* mutex for m1_message_buffer*/ HANDLE mutex_m1_message_buffer; /* mutex between retrieve_thread() and transmitter_thread() */ /* mutex for recent_outgoing_m1_sn */ HANDLE mutex_recent_outgoing_m1_sn; /* main waill wait on this event */ HANDLE hSemJoin; HANDLE hSigHeartBeat; #endif /* used to transfer information to receiver_thread() */ struct socket_info_t socket_info; /***************************************************************************************/ /* Function Name : main */ /* Description : initialize all the data structures */ /* create the sockets */ /* check local ip address */ /* install the signal handler */ /* create all the threads */ /***************************************************************************************/ int main(int argc, char* argv[]) { #ifdef UNIX pthread_t thread_id; #endif printf("SRMP version 2.1\n"); printf("Last modified on Oct 24th, 2003\n"); /* full commandline parameters: srtp [srtp_port] [request_port] * srtp_port is the UDP port that SRTP daemon communicate with remote * SRTP daemon. * request_port is the UDP port that SRTP daemon listening on for * local client request */ if(argc>1) srtp_port=atoi(argv[1]); else srtp_port=SRTP_UDP_PORT; if(argc>2) req_port=atoi(argv[2]); else req_port=REQ_UDP_PORT; /***************************************/ /* create and initiate data structures.*/ /***************************************/ init_server(); /************************************/ /* install signal handler for SIGINT*/ /************************************/ if( signal(SIGINT,sigterm_handler) == SIG_ERR ) { printf("failed to install signal_handler\a\n"); exit(-1); } /******************/ /* create threads */ /******************/ #ifdef UNIX /* the thread for waiting for request from local client*/ pthread_create(&thread_id, NULL, retrieve_thread, NULL); /* the thread for bundling outgoing messages */ pthread_create(&thread_id, NULL, bundler_thread, NULL); /* the thread for transmitting bundles to remote SRTP daemon */ pthread_create(&thread_id, NULL, transmitter_thread, NULL); /* the thread for signalling a bundle has timed out and shoud be transmitted */ pthread_create(&thread_id, NULL, signaller_thread, NULL); /* the thread for parsing messages from a received bundle */ pthread_create(&thread_id, NULL, unbundler_thread, NULL); /* the thread for deliving the received messages to local client */ pthread_create(&thread_id, NULL, deliver_thread, NULL); #else /* the thread for waiting for request from local client*/ _beginthread(retrieve_thread,0,NULL); /* the thread for bundling outgoing messages */ _beginthread(bundler_thread,0,NULL); /* the thread for transmitting bundles to remote SRTP daemon */ _beginthread(transmitter_thread,0,NULL); /* the thread for signalling a bundle has timed out and shoud be transmitted */ _beginthread(signaller_thread,0,NULL); /* the thread for parsing messages from a received bundle */ _beginthread(unbundler_thread,0,NULL); /* the thread for deliving the received messages to local client */ _beginthread(deliver_thread,0,NULL); #endif /* _beginthread(heartbeatter_thread,0,NULL);*/ printf("Initialization finished\n"); printf("SRTP listening on port %d for remote SRTP daemon\n", srtp_port); printf("SRTP listening on port %d for request from local client\n", req_port); printf("To terminate: Ctrl+c\n\n"); /* wait on an event. The program should be terminated by signal SIGINT */ #ifdef UNIX pthread_join(thread_id, (void **) NULL); #else WaitForSingleObject(hSemJoin,INFINITE); #endif return 0; } /***************************************************************************************/ /* Function Name : init_server */ /* Description : create sockets, create datastructrs, initialize */ /* datastructures, and mutex, signals */ /***************************************************************************************/ void init_server() { int i,j; char local_ip_string[30]; buffered_outgoing_m1_message_tail = NULL; outgoing_segment_head = NULL; outgoing_segment_tail= NULL; transmit_index_head = NULL; transmit_index_tail = NULL; bundle_timeout_timer_tail = NULL; bundle_timeout_timer_head = NULL; incoming_packet_write_pointer =0; incoming_packet_read_pointer = 0; incoming_pool_write_pointer = 0; incoming_pool_read_pointer = 0; reader1Cleared = 1; heartbeat_msgqueue_head = NULL; heartbeat_msgqueue_tail = NULL; /* initialize the windows socket library */ #ifndef UNIX WSADATA wsaData; if (WSAStartup(0x0101, &wsaData)) { printf("WSAStartup failed %s\n", WSAGetLastError()); exit (-1); } #endif /* get local hosts' IP address */ local_ip_address=get_local_ipaddress(local_ip_string); printf("\nlocal ip address : %s \n", local_ip_string); /* create socket for communication with other SRMP daemons */ /* the socket used to communicate with remote SRTP daemon */ create_udp_socket(&srtp_socket, srtp_port); /* the socket used to communicate with local clients */ create_udp_socket(&client_socket, req_port); /* the socket to deliver messages to local clients */ create_udp_socket(&delivery_socket, 0); /* datastructure for recording subscribed clients */ for(i=0; ih_addr_list[0], host_ent->h_length); strcpy(address, inet_ntoa(local_addr)); return local_addr.s_addr; #else /* get local ip address, and set the sender_id and receiver_id */ char szHostname[100]; HOSTENT *pHostEnt; int nAdapter; struct in_addr sAddr; nAdapter=0; gethostname( szHostname, sizeof( szHostname )); pHostEnt = gethostbyname( szHostname ); memcpy( &sAddr.s_addr, pHostEnt->h_addr_list[nAdapter], pHostEnt->h_length); strcpy(address, inet_ntoa(sAddr)); return sAddr.s_addr; #endif } /***************************************************************************************/ /* Function Name : create_udp_socket */ /* Description : create a udp socket, and binding to a specified port number */ /* increase the socket's sending buffer size and receiver buffer */ /* size. */ /* Parameters : unsigned int * socket_id will contain the created socke */ /* int port_number the specified port number. if equals to 0, */ /* then do not bind */ /***************************************************************************************/ void create_udp_socket(unsigned int * socket_id, int port_number) { struct sockaddr_in socket_address; unsigned long int sock_buff_len=MAX_M1_BUFFER_SIZE; socket_address.sin_family = AF_INET; socket_address.sin_addr.s_addr = htonl(INADDR_ANY); socket_address.sin_port = htons(port_number); /* create the socket */ *socket_id= socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP); if( (*socket_id) <0 ) { printf("error creating socket\a\n"); exit (-1); } /* increate receiving buffer size and sending buffer size */ if(setsockopt(*socket_id, SOL_SOCKET, SO_SNDBUF, (const char *)&sock_buff_len, sizeof(sock_buff_len)) <0) { printf("set socket option error\a\n"); exit(-1); } sock_buff_len=0xFFFF; if(setsockopt(*socket_id, SOL_SOCKET, SO_RCVBUF, (const char *)&sock_buff_len, sizeof(sock_buff_len)) <0) { printf("set socket option error\a\n"); exit(-1); } /* bind the socket to to specified port number */ if(port_number>0) { if( bind((*socket_id),(sockaddr *)&socket_address, sizeof(socket_address)) < 0 ) { printf("error binding socket address\a\n"); exit(-1); } } return; } /***************************************************************************************/ /* Function Name : sigterm_handler */ /* Description : signal handler for SIGINT */ /* Parameters : SIGINT */ /***************************************************************************************/ void sigterm_handler(int sig) { exit(0); } /***************************************************************************************/ /* Function Name : retrieve_thread */ /* Description : This thread interface with SRMP client */ /* The thread keep on waiting for reqeust from local clients */ /* If the request is a SRMP_INIT, SRMP_REG_DATAID, SRMP_DROP_DATAID */ /* SRMP_SUB_DATAID, SRMP_UNSUB_DATID, the thread will call */ /* corresponding service functions */ /* If the client request is a mode 0 message, the thread will try */ /* to deliver the message to all the clients locally by inserting */ /* the message to incoming_seg_pool, and try to deliver the */ /* message to remote clients by inserting the message to */ /* outgoing_segment_head. */ /* If the client request is a mode 1 message, the thread will first */ /* check its SN, then buffer the mode 1 message. If it is a long */ /* message, the long message will be segmented, and all the */ /* segments will be inserted into incoming_seg_pool, and */ /* outgoing_segment_head */ /***************************************************************************************/ #ifdef UNIX void * retrieve_thread(void *mp) #else void retrieve_thread(void *mp) #endif { int req_len; char req_buffer[MAX_REQUEST_LEN]; /* the buffer for receving a message from client */ char ack_message[20]; int result; struct request_t client_header; sockaddr_in client_addr; /* address that client waiting for ack */ int client_addr_size; int ack_result; client_addr_size=sizeof(client_addr); memset(&client_addr, 0, client_addr_size); client_addr.sin_family = AF_INET; client_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); /* entering loop: wait to receive request from client */ /* listening on client_socket, udp port REQ_UDP_PORT */ while(1) { /*************************************/ /* receive a request from the client */ /*************************************/ if( receive_client_request(req_buffer,&req_len) == -1 ) { printf("error when receiving request from client\a\n"); exit(-1); } if(req_len=MAX_SUB_MCA_NUM) /* the multicast address has never been subscribed before */ /* record the mca in subscribed_mca[] */ { for(k=0; k=MAX_SUB_MCA_NUM) /* no empty slot */ { printf("no empty slot for subscribe new multicast group\a\n"); exit(-1); } } subscribed_address.s_addr = request_header.mca; /* check if the client specified UDP port has already subscribed the mca */ for(i=0; inext) { if (current->mca == request_header.mca) { memcpy(&message_header, current->buffer, MESSAGE_HEADER_LEN); deliver_to_client(request_header.port,current->buffer,message_header); } } #ifdef UNIX pthread_mutex_unlock(&mutex_m1_message_buffer); #else ReleaseMutex(mutex_m1_message_buffer); #endif return 0; } } /* else, no empty slots anymore */ printf("not enough space in client_port\a\n"); exit(-1); } /***************************************************************************************/ /* Function Name : service_unsubscribe_mca */ /* Description : client un-subscribe a multicast group */ /* search for mca in the registered_dataid[] and subscribed_mca[] */ /* removed client's port from client_port[][] */ /* Parameters : struct request_t request_header */ /***************************************************************************************/ int service_unsubscribe_mca(struct request_t request_header) { int i; int k; /* search for the mca */ for(k=0; k=MAX_SUB_MCA_NUM) /* the mca has never been subscribed before */ { printf("unsubscribed multicast address not found\a\n"); return -1; } /* remove client's UDP port number from the client_port[][] */ for(i=0; i */ /* register does not mean that only the registering client can send */ /* out . registeration is just to inform SRMP that */ /* some clients is going to send out message of the specified */ /* */ /* however, given a , the is fixed. */ /* the mode can only be changed after the was dropped */ /* Parameters : struct request_t request_header */ /* Return value : 0 success -1 failed */ /***************************************************************************************/ int service_reg_dataid(struct request_t client_header) { int k; struct ip_mreq mreq; int mcgflag; char tmp; /* check if the has been registered before */ for(k=0; kmca == (unsigned long int) client_header.mca) && (registered_dataid[k]->dataid == (unsigned int) client_header.dataid ) ) break; } if(k>=MAX_REG_MCA_NUM) /* the has never been registered before */ { mcgflag =0; /* check if the mca has been subscribed or registered before */ for(k=0; kmca==client_header.mca) ) { mcgflag = 1; break; } } for (k=0; k= MAX_REG_MCA_NUM) { printf("too many multicast groups registered \a\n"); exit(-1); } /* create and initialize a new group record */ registered_dataid[k]=(registered_dataid_t*)malloc(sizeof(registered_dataid_t)); if (!registered_dataid[k]) { printf("error allocating memory for registered_dataid[]\a\n"); exit(-1); } registered_dataid[k]->mca = client_header.mca; registered_dataid[k]->dataid = client_header.dataid; registered_dataid[k]->mode =client_header.mode; /* set socket option if the mca has never been subscribed nor registered before */ if(!mcgflag) { mreq.imr_multiaddr.s_addr = client_header.mca; mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(srtp_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&mreq, sizeof(mreq)) < 0) { /*printf("setsockopt error!\a\n"); exit(-1);*/ } } /* set socket option to invoid loop back */ tmp = 0; if (setsockopt(srtp_socket, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&tmp, sizeof(tmp)) < 0) { free(registered_dataid[k]); registered_dataid[k] = NULL; printf("error when set socket option to disable loopback \a\n"); } printf("register dataid=%d, mode=%d\n", client_header.dataid, client_header.mode); } else { printf("dataid=%d has already been registered!\n", client_header.dataid); return (-1); } return(0); } /***************************************************************************************/ /* Function Name : service_drop_dataid */ /* Description : client drop a */ /* Parameters : struct request_t request_header */ /* Return value : 0 success -1 failed */ /***************************************************************************************/ int service_drop_dataid(struct request_t client_header) { int j; /* remove the client from registered_dataid[] */ for (j=0; jmca==client_header.mca && registered_dataid[j]->dataid==client_header.dataid ) { free(registered_dataid[j]); registered_dataid[j]=NULL; printf("dataid %d unregistered\n", client_header.dataid); return(0); } } } /*Error! the was not found */ printf(" dataid=%d to be dropped has never been registered before\a\n", client_header.dataid); return(-1); } /***************************************************************************************/ /* Function Name : deliver_to_client */ /* Description : deliver a message to a local client */ /* the message could be a mode0/1 message sent out by other clients */ /* the message is delivered to client's UDP port when subscribed */ /* Parameters : int port -- the udp port number that the client is listenning on */ /* char * msg -- the message body, proceeded with a message_header */ /* struct message_header_t message_header -- the message header */ /***************************************************************************************/ int deliver_to_client(int port, char * msg,struct message_header_t message_header) { char buffer[MAX_M1_MESSAGE_LEN + 6]; /* the message that will be send to client */ /* dataid + mca + message body */ unsigned int dataid; unsigned char high_byte; unsigned char low_byte; unsigned long mca; /* compose the message body */ memcpy(buffer+6,msg+MESSAGE_HEADER_LEN,message_header.len); /* compose dataid */ dataid=message_header.dataid; high_byte=dataid/256; low_byte=dataid%256; memcpy(buffer, &high_byte, 1); memcpy(buffer+1, &low_byte, 1); /* compose mca */ mca=message_header.mca; memcpy(buffer+2, &mca, sizeof(mca)); /* deliver the message to the local client */ if(dataid>0) printf("deliver message to local client: mode=1, dataid=%d, len=%d, port=%d\n", dataid, message_header.len, port); else printf("deliver message to local client: mode=0, len=%d, port=%d\n", message_header.len, port); sockaddr_in client_addr; memset(&client_addr, 0, sizeof(client_addr)); client_addr.sin_family = AF_INET; client_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); client_addr.sin_port= htons(port); sendto(delivery_socket, buffer, message_header.len+6, 0, (struct sockaddr *) &client_addr, sizeof(client_addr)); return 0; } /***************************************************************************************/ /* Function Name : service_data */ /* Description : got a SRMP_DATA request from local client, try send it to all the */ /* other clients of the same group locally and remotely */ /* Parameters : char * req -- the message body, proceeded with a message_header */ /* struct request_t client_header -- the message header */ /***************************************************************************************/ int service_data(struct request_t client_header, char *req) { int mode; int k; /* variables for measuring application sending rate */ /* int drop_rate; double traffic_rate; double packet_rate; static unsigned short start_time=get_timestamp(); unsigned short duration; static int number_of_packet_received=0; static int number_of_bytes_received=0; static int number_of_dropped=0; */ /*********** to measure traffic rate ************/ /* duration=get_timestamp() - start_time; if(duration > 5000) { start_time=get_timestamp(); drop_rate=(int)((double)number_of_dropped/number_of_packet_received*100); traffic_rate=(double)number_of_bytes_received/5; packet_rate=(double) number_of_packet_received/5; number_of_bytes_received=0; number_of_packet_received=0; number_of_dropped=0; printf("\nreceiving rate: %.2f bytes/second, %.2f packets/second, drop rate: %d\n", traffic_rate, packet_rate, drop_rate); } number_of_packet_received=number_of_packet_received+1; number_of_bytes_received=number_of_bytes_received+client_header.len; */ /* search for the in registered_dataid[], to check in what mode the */ /* message should be treated */ for( k=0; kmca == client_header.mca) &&(registered_dataid[k]->dataid== client_header.dataid)) break; } if(kmode; else /* the mode has never been registered before */ mode=0; /* the message will be treat as best effort */ switch (mode) { case 0: /* the has never been registered, or */ /* it has been registered as mode 0 */ if(client_header.len>MAX_M1_SEG_SIZE) { printf("mode 0 message over length\a\n"); return -1; } return(send_m0(client_header, req)); /* send mode 0 message */ case 1: /* the has been registered as mode 1 */ if(client_header.len>MAX_M1_MESSAGE_LEN) { printf("mode 1 message over length\a\n"); return -1; } return(send_m1(client_header, req)); /* send mode 1 message */ case 2: /* the has been registered as mode 2 */ /* currently, the mode 2 message will be discared */ printf("Mode 2 message received from local client\a\n"); return 0; } /* mode>2, which is invalid */ return -1; } /***************************************************************************************/ /* Function Name : send_m0 */ /* Description : processing a mode 0 request from local client, insert it to the */ /* outgoing_segment_head */ /* insert it to incoming_seg_pool as well, to all local */ /* subscribed clients */ /* Parameters : char * req -- the message body, proceeded with a message_header */ /* struct request_t client_header -- the message header */ /***************************************************************************************/ int send_m0(struct request_t client_header, char *req) { struct m0_hdr_t m0_header; char *msg; /* the message buffer, store the out going message within SRTP */ int len; /* len=M0_HEADER_LEN+message_length */ struct message_header_t message_header; /* compose the mode 0 message_header*/ m0_header.len=htons(client_header.len); m0_header.mode=0; m0_header.type=SRTP_DATA; m0_header.version=client_header.version; /* compose the mode 0 message, and insert it to outgoing_segment_head*/ len=M0_HEADER_LEN+client_header.len; msg = (char*)malloc(len); if (!msg) { printf("error allocating memory in send_m0()\a\n"); exit(-1); } memcpy(msg, &m0_header, M0_HEADER_LEN); memcpy(msg + M0_HEADER_LEN, req + REQ_HEADER_LEN, client_header.len); /* insert the message into outgoing_segment_head */ /* bundler_thread will then go on processing the message*/ insert_outgoing_pool(msg, len, client_header.mca, 0 ); /* send it to all other local clients that subscribed the mca */ message_header.len=client_header.len; message_header.port=client_header.port; message_header.mca=client_header.mca; message_header.mode=m0_header.mode; message_header.version=m0_header.version; message_header.type=m0_header.type; message_header.dataid=0; memcpy(local_buffer, &message_header, MESSAGE_HEADER_LEN); memcpy(local_buffer+MESSAGE_HEADER_LEN, req+REQ_HEADER_LEN, client_header.len); /* insert the message into incoming_seg_pool */ /* deliver_thread will then go on processing the message */ insert_incoming_pool(local_buffer, MESSAGE_HEADER_LEN+client_header.len); return(0); } /***************************************************************************************/ /* Function Name : send_m1 */ /* Description : Processing a mode 1 request from local client, */ /* First, get its sn, and buffer the sn for NACK operation */ /* If it is a long message, segment it into multiple segments */ /* Insert all the segments into outgoing_segment_pool, bundler_thread */ /* will then go on processing the segments to transmit it out */ /* Insert the message to incoming_seg_pool, so that deliver_thread() */ /* will then deliver the message to all other local clients */ /* Buffer the mode 1 message in buffered_outgoing_m1_message, for */ /* NACK operation */ /* Deliver_thread() will also buffer the message, this is for a new */ /* client can get the mode 1 message at once */ /* Parameters : char * req -- the message body, proceeded with a message_header */ /* struct request_t client_header -- the message header */ /***************************************************************************************/ int send_m1(struct request_t client_header, char *req) { char *msg; int sn; /* For a single segment message, msglen == message_size. * else, it is the length of a segment */ int msglen; int last_part_size; struct m1_hdr_t m1_header; /* total number of segments if a long mode 1 message has to be segmented */ int total_segments; int segment_no; /* segment number of each segment */ #ifndef UNIX unsigned char low_byte; unsigned char high_byte; #endif struct message_header_t message_header; /* get the sn of the specified */ sn = update_sn_outgoing(client_header.dataid,client_header.mca); if (sn < 0) { printf("incorrect sn\a\n"); exit(-1); } /* compose the mode 1 message header */ m1_header.dataid=htons(client_header.dataid); m1_header.len=htons(client_header.len); m1_header.mode=1; m1_header.sn=sn; m1_header.type=SRTP_DATA; m1_header.version=client_header.version; /* segment the mode 1 message if it is a long one */ /* calculating how many segments the message should be segmented into */ total_segments= client_header.len/ MAX_M1_SEG_SIZE ; if(client_header.len - total_segments * MAX_M1_SEG_SIZE>0) total_segments = total_segments+1; /* calculating the last segment's size */ last_part_size = client_header.len % MAX_M1_SEG_SIZE; if(last_part_size==0) last_part_size=MAX_M1_SEG_SIZE; /* try insert each segment to the outgoing_segment_head */ for(segment_no = 1; segment_no <= total_segments; segment_no=segment_no+1) { /* determine the current segment's size */ if( segment_no < total_segments ) msglen = MAX_M1_SEG_SIZE; else msglen = last_part_size; m1_header.nosegs=segment_no; /* allocating memory for current segment */ msg = (char*)malloc(M1_HEADER_LEN + msglen); if(!msg) { printf("error allocating memory in send_m02()\a\n"); exit(-1); } /* compose a segment */ memcpy(msg, &m1_header, M1_HEADER_LEN); #ifndef UNIX low_byte=((*(msg+7))>>1) & 0x7f; low_byte=low_byte |(((*(msg+6))<<7) & 0x80); high_byte=((*(msg+6))>>1) & 0x7f; high_byte=high_byte |(((*(msg+7))<<7) & 0x80); *(msg+7)=low_byte; *(msg+6)=high_byte; #endif memcpy(msg + M1_HEADER_LEN, req + REQ_HEADER_LEN+(segment_no-1)*MAX_M1_SEG_SIZE, msglen); /* insert the segment into the outgoing message pool */ /* bundler_thread will then go on processing the segment or message */ insert_outgoing_pool(msg,M1_HEADER_LEN+msglen, client_header.mca, client_header.dataid ); } /* end for each segment */ /* try insert the message into incoming_seg_pool */ /* compose the message */ message_header.dataid=client_header.dataid; message_header.len=client_header.len; message_header.mca=client_header.mca; message_header.mode=1; message_header.port=client_header.port; message_header.sn=sn; message_header.type=SRTP_DATA; message_header.version=client_header.version; memcpy(local_buffer, &message_header, MESSAGE_HEADER_LEN); memcpy(local_buffer+MESSAGE_HEADER_LEN, req+REQ_HEADER_LEN, client_header.len); /* insert the message to incoming_seg_pool */ /* deliver_thread will then go on processing the message */ insert_incoming_pool(local_buffer, MESSAGE_HEADER_LEN+client_header.len); /* buffer the mode 1 message, for NACK, or new clients */ if (buffer_outgoing_message_m1(local_buffer, MESSAGE_HEADER_LEN+client_header.len, client_header.dataid, client_header.mca) < 0) { printf("error buffer outoing mode 1 message\a\n"); if (msg) { free(msg); msg=NULL; } return(-1); } /* Schedule a Heart Beat Message */ /* time(&heartbeat_send_time); heartbeat_send_time += HEARTBEAT_TIMEOUT; if( heartbeat_msgqueue_head == NULL ) sendsignal = 1; else sendsignal = 0; schedule_heartbeat_msg(ttl,dest,heartbeat_send_time); if( heartbeat_msgqueue_head == NULL ) {} */ return(0); } /***************************************************************************************/ /* Function Name : update_sn_outgoing */ /* Description : increase the sn for */ /* if it is a new one, create a random sn number */ /* if it is a existing one, buffer the current sn for NACK operation */ /* Parameters : unsigned int dataid -- dataid */ /* unsigned long mca -- multicast address */ /***************************************************************************************/ int update_sn_outgoing(unsigned int dataid,unsigned long mca) { recent_m1_buffer_t *current; recent_m1_buffer_t *last_entry; /* point to the end of the link list */ recent_m1_buffer_t *new_entry; #ifdef UNIX pthread_mutex_lock(&mutex_recent_outgoing_m1_sn); #else WaitForSingleObject(mutex_recent_outgoing_m1_sn, INFINITE); #endif /* seach all the outoing , for the sn */ for (current=recent_outgoing_m1_sn; current;current=current->next) { if(current->mca == mca && current->dataid == dataid) { current->sn=current->sn+1; if(current->sn>MAX_SN) current->sn=1; #ifdef UNIX pthread_mutex_unlock(&mutex_recent_outgoing_m1_sn); #else ReleaseMutex(mutex_recent_outgoing_m1_sn); #endif return current->sn; } last_entry=current; } /* the has never been send out before */ /* create a new record for the new , set sn=1*/ if (recent_outgoing_m1_sn == NULL) /* the outgoing m1 buffer is empty */ { recent_outgoing_m1_sn = (recent_m1_buffer_t *)malloc(sizeof(recent_m1_buffer_t)); if (!recent_outgoing_m1_sn) { printf("erro allocating memory in update_sn_outgoing\a\n"); exit(-1); } recent_outgoing_m1_sn->dataid = dataid; recent_outgoing_m1_sn->mca = mca; recent_outgoing_m1_sn->sn = 1; recent_outgoing_m1_sn->next = NULL; } else /* append the new entry at the tail of the link list */ { new_entry = (recent_m1_buffer_t *)malloc(sizeof(recent_m1_buffer_t)); if (!new_entry) { printf("error allocating memory in update_sn_outgoing\a\n"); exit(-1); } new_entry->mca = mca; new_entry->dataid = dataid; new_entry->sn = 1; new_entry->next = NULL; last_entry->next = new_entry; } #ifdef UNIX pthread_mutex_unlock(&mutex_recent_outgoing_m1_sn); #else ReleaseMutex(mutex_recent_outgoing_m1_sn); #endif return 1; } /***************************************************************************************/ /* Function Name : insert_outgoing_pool */ /* Description : insert a segment to the tail of link list outgoing_segment_head */ /* bundler_thread() will then try to add all the entries in the */ /* link list to a bundle */ /* producer: retrieve_thread(), consumer: bundler_thread() */ /* Parameters : char * msg -- m0/1 header + message body */ /* int msg_len -- total length (including the header) */ /* unsigned long int mca -- the multicast address */ /* unsigned int dataid -- dataid */ /***************************************************************************************/ void insert_outgoing_pool(char *msg, int msg_len, unsigned long int mca, unsigned int dataid) { outgoing_segment_t *temp; /* create a new record, for the outgoing message */ temp = (outgoing_segment_t *) malloc( sizeof(outgoing_segment_t) ); if( temp == NULL ) { printf("error allocating memory in insert_outgoing_pool\a\n"); exit(-1); } /* compose the new outgoing message */ temp->msg=msg; temp->mca=mca; temp->dataid=dataid; temp->length = msg_len; temp->next = NULL; #ifdef UNIX pthread_mutex_lock(&mutex_outgoing_segment_head); #else WaitForSingleObject(mutex_outgoing_segment_head,INFINITE); #endif /* no current outgoing message, the link list is empty */ if( outgoing_segment_head == NULL ) { /* put the the new message at the link list head */ outgoing_segment_head = temp; outgoing_segment_tail = temp; #ifdef UNIX pthread_cond_signal(&event_outgoing_segment_head); #else SetEvent(event_outgoing_segment_head); /* inform bundler_thread */ #endif } else /* outgoing_segment_head is not empty, insert the new record at the tail */ { if( outgoing_segment_tail != NULL ) { outgoing_segment_tail->next = temp; outgoing_segment_tail = temp; } else { printf("synchronization error\a\n"); #ifdef UNIX pthread_mutex_unlock(&mutex_outgoing_segment_head); #else ReleaseMutex(mutex_outgoing_segment_head); #endif exit(-1); } } if( outgoing_segment_head == NULL ) { printf("synchronization error\a\n"); } #ifdef UNIX pthread_mutex_unlock(&mutex_outgoing_segment_head); #else ReleaseMutex(mutex_outgoing_segment_head); #endif return; } /***************************************************************************************/ /* Function Name : insert_incoming_pool */ /* Description : insert a message to the incoming_seg_pool */ /* deliver_thread() will then try to deliver the message to all */ /* the client the subscribed the mca */ /* producer: retrieve_thread(), consumer: bundler_thread() */ /* Parameters : char * indiv_msg -- message_header + message body */ /* int len -- total length (including the header) */ /***************************************************************************************/ void insert_incoming_pool(char *indiv_msg, int len) { #ifdef UNIX pthread_mutex_lock(&mutex_incoming_seg_pool); #else WaitForSingleObject(mutex_incoming_seg_pool, INFINITE); #endif if( incoming_pool_write_pointer == (( incoming_pool_read_pointer + 1 ) % MAX_INCOM_MSG_NUM) && reader1Cleared == 0 ) { printf("incoming_seg_pool overflow\a\n"); #ifdef UNIX pthread_mutex_unlock(&mutex_incoming_seg_pool); #else ReleaseMutex(mutex_incoming_seg_pool); #endif exit(-1); } /* insert the new message into the incoming_seg_pool */ memcpy(incoming_seg_pool[(incoming_pool_write_pointer )%MAX_INCOM_MSG_NUM], indiv_msg,len); incoming_seg_pool_size[( incoming_pool_write_pointer )%MAX_INCOM_MSG_NUM] = len; /* update the write_pointer and read_pointer */ incoming_pool_write_pointer = (incoming_pool_write_pointer+1)%MAX_INCOM_MSG_NUM; if( incoming_pool_write_pointer == MAX_INCOM_MSG_NUM ) reader1Cleared = 0; /* if the incoming message pool change from empty to non-empty, *inform the deliver_thread() */ if(incoming_pool_write_pointer == ( incoming_pool_read_pointer + 1 ) % MAX_INCOM_MSG_NUM) { #ifdef UNIX pthread_cond_signal(&event_incoming_seg_pool); #else SetEvent(event_incoming_seg_pool); #endif } #ifdef UNIX pthread_mutex_unlock(&mutex_incoming_seg_pool); #else ReleaseMutex(mutex_incoming_seg_pool); #endif return; } /***************************************************************************************/ /* Function Name : buffer_outgoing_message_m1 */ /* Description : buffer a outgoing m1 message into */ /* buffered_outgoing_m1_message_head */ /* Parameters : char * indiv_msg -- message_header + message body */ /* int len -- total length (including the header) */ /***************************************************************************************/ int buffer_outgoing_message_m1(char *mesg, int len,unsigned int dataid, unsigned long dest_mca) { message_buffer_t *current; /* search in link list buffered_outgoing_m1_message_head for */ #ifdef UNIX pthread_mutex_lock(&mutex_buffered_outgoing_m1_message); #else WaitForSingleObject(mutex_buffered_outgoing_m1_message, INFINITE); #endif for (current = buffered_outgoing_m1_message_head; current; current = current->next) { /* found the */ if ( current->dest_mca == dest_mca && current->dataid == dataid) { /* relase the old version of the m1 message */ if (current->message) free(current->message); /* update the entry with the new m1 message */ current->message = (char *) malloc(len); if (current->message != NULL) { memcpy(current->message, mesg, len); current->length = len; #ifdef UNIX pthread_mutex_unlock(&mutex_buffered_outgoing_m1_message); #else ReleaseMutex(mutex_buffered_outgoing_m1_message); #endif return 0; } else { #ifdef UNIX pthread_mutex_unlock(&mutex_buffered_outgoing_m1_message); #else ReleaseMutex(mutex_buffered_outgoing_m1_message); #endif return -1; } } } /* end of for loop */ /* the specified was not found */ /* create a new entry for the new m1 message */ current = (message_buffer_t *)malloc(sizeof(message_buffer_t)); if (!current) { printf("error allocating memory in buffer_outgoing_message_m1\a\n"); #ifdef UNIX pthread_mutex_unlock(&mutex_buffered_outgoing_m1_message); #else ReleaseMutex(mutex_buffered_outgoing_m1_message); #endif exit(-1); } current->dest_mca = dest_mca; current->dataid = dataid; current->next = NULL; current->message = (char *) malloc(len); if(current->message == NULL) { printf("erro allocating memory in buffer_outgoing_message_m1\a\n"); #ifdef UNIX pthread_mutex_unlock(&mutex_buffered_outgoing_m1_message); #else ReleaseMutex(mutex_buffered_outgoing_m1_message); #endif exit(-1); } memcpy(current->message, mesg, len); current->length = len; /* insert the new entry at the tail of the link list */ if (buffered_outgoing_m1_message_tail) { buffered_outgoing_m1_message_tail->next = current; buffered_outgoing_m1_message_tail = current; } else { buffered_outgoing_m1_message_tail = current; buffered_outgoing_m1_message_head = current; } #ifdef UNIX pthread_mutex_unlock(&mutex_buffered_outgoing_m1_message); #else ReleaseMutex(mutex_buffered_outgoing_m1_message); #endif return 0; } /***************************************************************************************/ /* Function Name : bundler_thread */ /* Description: This thread will read each entry in outgoing_segment_head, */ /* then try to bundle multiple messages into one bundle */ /* then but the bundle to outgoing_bundle_pool[] */ /* IF of the new segment is not found in outgoing_bundle_pool[] */ /* THEN */ /* create a new bundle */ /* IF the new segmet's len >=MAX_BUNDLE_SIZE */ /* schedule to transmit the new created bundle */ /* ELSE */ /* set a timer for bundle timeout */ /* ELSE */ /* IF the new segment's len + old bundle len >= MAX_BUNDLE_SIZE */ /* transmit the old bundle */ /* create a new bundle for the new segment */ /* IF the new segment's len >= MAX_BUNDLE_SIZE */ /* schedule to transmit the new created bundle */ /* ELSE */ /* set a timer for bundle timeout */ /* ELSE */ /* append the new segmet to the end of the old bundle */ /* */ /***************************************************************************************/ #ifdef UNIX void * bundler_thread(void *mp) #else void bundler_thread(void *mp) #endif { int i; outgoing_segment_t *current; unsigned long int mca; int bundle_pos; int cancel_status; /* thread while loop */ while(1) { /* check outgoing_segment_head */ #ifdef UNIX pthread_mutex_lock(&mutex_outgoing_segment_head); while( outgoing_segment_head == NULL ) { pthread_cond_wait(&event_outgoing_segment_head, &mutex_outgoing_segment_head); } #else WaitForSingleObject(mutex_outgoing_segment_head,INFINITE); if( outgoing_segment_head == NULL ) { /* the outgoing_segment_head is empty, waiting for non-empty event */ ReleaseMutex(mutex_outgoing_segment_head); WaitForSingleObject(event_outgoing_segment_head,INFINITE); ResetEvent(event_outgoing_segment_head); /* new segment is inserted */ continue; } #endif current = outgoing_segment_head; outgoing_segment_head = outgoing_segment_head->next; #ifdef UNIX pthread_mutex_unlock(&mutex_outgoing_segment_head); #else ReleaseMutex(mutex_outgoing_segment_head); #endif /******************************************************************************/ /* the outgoing_segment_head is not empty, process the first outgoing segment */ /******************************************************************************/ /* seach in outgoing_bundle_pool, to see if there is existing unfinished bundle */ #ifdef UNIX pthread_mutex_lock(&mutex_all_bundles); #else WaitForSingleObject(mutex_all_bundles, INFINITE); #endif mca=current->mca; for(bundle_pos=0; bundle_pos=MAX_BUFFERED_OUTGOING_BUNDLE ) { /*************************************************/ /* no same mca bundle exist, create a new bundle */ /*************************************************/ /* create the bundle by creating the bundle header and add * the message to the bundle */ bundle_pos = set_bundle_pos(mca); if(bundle_pos<0) { printf("no empty slot in outgoing_bundle_pool\a\n"); #ifdef UNIX pthread_mutex_unlock(&mutex_all_bundles); #else ReleaseMutex(mutex_all_bundles); #endif continue; } memcpy(outgoing_bundle_pool[bundle_pos].body, current->msg,current->length); outgoing_bundle_pool[bundle_pos].len =current->length; if(current->dataid >0) outgoing_bundle_pool[bundle_pos].dataid[0]=current->dataid; #ifdef UNIX pthread_mutex_unlock(&mutex_all_bundles); #else ReleaseMutex(mutex_all_bundles); #endif /* check the new segment's length */ if(current->length < (M1_HEADER_LEN + MAX_M1_SEG_SIZE)) schedule_signal(mca,bundle_pos); /* set a timer for bundle timeout */ else { /* schedule to transmit the new bundle */ outgoing_bundle_pool[bundle_pos].flag= 2; schedule_transmission(bundle_pos,mca,outgoing_bundle_pool[bundle_pos].len); } } else { /*************************************************/ /* unfinished bundle of the same was found */ /*************************************************/ if((current->length+outgoing_bundle_pool[bundle_pos].len) < (M1_HEADER_LEN + MAX_M1_SEG_SIZE )) { /* append the new segment at the tail of the existing bundle */ memcpy( outgoing_bundle_pool[bundle_pos].body+outgoing_bundle_pool[bundle_pos].len, current->msg,current->length); outgoing_bundle_pool[bundle_pos].len= outgoing_bundle_pool[bundle_pos].len+current->length; for(i=0; idataid) break; else if (outgoing_bundle_pool[bundle_pos].dataid[i] <0) { outgoing_bundle_pool[bundle_pos].dataid[i]=current->dataid; break; } } #ifdef UNIX pthread_mutex_unlock(&mutex_all_bundles); #else ReleaseMutex(mutex_all_bundles); #endif } else { /* schedule transmit existing bundle */ outgoing_bundle_pool[bundle_pos].flag= 2; #ifdef UNIX pthread_mutex_unlock(&mutex_all_bundles); #else ReleaseMutex(mutex_all_bundles); #endif cancel_status = cancel_signal(mca); /* cancel old bundle timeout timer */ if( cancel_status == 1 ) schedule_transmission(bundle_pos, mca,outgoing_bundle_pool[bundle_pos].len); /* create the bundle by creating the bundle header * and add the message to the bundle */ #ifdef UNIX pthread_mutex_lock(&mutex_all_bundles); #else WaitForSingleObject(mutex_all_bundles, INFINITE); #endif bundle_pos = set_bundle_pos(mca); if(bundle_pos<0) { printf("not empty slot in outgoing_bundle_pool\a\n"); #ifdef UNIX pthread_mutex_unlock(&mutex_all_bundles); #else ReleaseMutex(mutex_all_bundles); #endif continue; } memcpy(outgoing_bundle_pool[bundle_pos].body, current->msg,current->length); outgoing_bundle_pool[bundle_pos].dataid[0]=current->dataid; outgoing_bundle_pool[bundle_pos].len =current->length; #ifdef UNIX pthread_mutex_unlock(&mutex_all_bundles); #else ReleaseMutex(mutex_all_bundles); #endif /* check the new segment's length */ if(current->length < (M1_HEADER_LEN + MAX_M1_SEG_SIZE)) schedule_signal(mca,bundle_pos); /* set a timer for bundle timeout */ else { /* schedule to transmit the new bundle */ outgoing_bundle_pool[bundle_pos].flag= 2; schedule_transmission( bundle_pos,mca,outgoing_bundle_pool[bundle_pos].len); } } } free(current->msg); free(current); } } /***************************************************************************************/ /* Function Name : set_bundle_pos */ /* Description : find an empty slot in the outgoing_bundle_pool[] */ /* Parameters : unsigned long int address -- the multicast address */ /***************************************************************************************/ int set_bundle_pos(unsigned long int address) { int i; for(i=0; imca=address; sig_pointer->index = pos; #ifdef UNIX ftime(&timebuffer); #else _ftime(&timebuffer); #endif sig_pointer->bundle_timeout=(timebuffer.time*1000)+timebuffer.millitm +BUNDLE_TIMEOUT; sig_pointer->sendflag = 1; sig_pointer->next = NULL; /* insert the new record to the tail of the link list */ #ifdef UNIX pthread_mutex_lock(&mutex_bundle_timeout_timer_queue); #else WaitForSingleObject(mutex_bundle_timeout_timer_queue, INFINITE); #endif if( bundle_timeout_timer_tail == NULL ) { bundle_timeout_timer_head = sig_pointer; bundle_timeout_timer_tail = sig_pointer; #ifdef UNIX pthread_cond_signal(&event_bundle_timeout_timer_queue); #else SetEvent(event_bundle_timeout_timer_queue); #endif } else { bundle_timeout_timer_tail->next = sig_pointer; bundle_timeout_timer_tail=sig_pointer; } #ifdef UNIX pthread_mutex_unlock(&mutex_bundle_timeout_timer_queue); #else ReleaseMutex(mutex_bundle_timeout_timer_queue); #endif return; } /***************************************************************************************/ /* Function Name : cancel_signal */ /* Description : remove an entry into bundle_timeout_timer_head, because a bundle */ /* will be transmitted before the bundle time out */ /* Parameters : unsigned long int address -- the multicast address */ /***************************************************************************************/ int cancel_signal(unsigned long int address) { bundle_timeout_timer_t *sig_pointer = NULL; bundle_timeout_timer_t *prev = NULL; #ifdef UNIX pthread_mutex_lock(&mutex_bundle_timeout_timer_queue); #else WaitForSingleObject(mutex_bundle_timeout_timer_queue, INFINITE); #endif sig_pointer = bundle_timeout_timer_head; while(sig_pointer != NULL ) { /* search the oldest timer that belong's to the specified multicast group */ if( sig_pointer->mca==address ) { /* remove the entry from the link list */ if( prev != NULL ) { prev->next = sig_pointer->next; if(prev->next==NULL) bundle_timeout_timer_tail = NULL; } else { bundle_timeout_timer_head = sig_pointer->next; if( bundle_timeout_timer_head == NULL ) { bundle_timeout_timer_tail = NULL; } } free(sig_pointer); #ifdef UNIX pthread_mutex_unlock(&mutex_bundle_timeout_timer_queue); #else ReleaseMutex(mutex_bundle_timeout_timer_queue); #endif return(1); } prev = sig_pointer; sig_pointer = sig_pointer->next; } #ifdef UNIX pthread_mutex_unlock(&mutex_bundle_timeout_timer_queue); #else ReleaseMutex(mutex_bundle_timeout_timer_queue); #endif return(0); } /***************************************************************************************/ /* Function Name : schedule_transmission */ /* Description : create a new entry at the tail of link list transmit_index_head */ /* the new entry point to a entry in outgoing_bundle_pool[] */ /* each entry in transmit_index_head will be read by */ /* transmitter_thread, and send out through the socket */ /* Parameters : int index -- index to outgoing_bundle_pool[] */ /* unsigned long int address -- the multicast address */ /* int len -- bundle length ( do not include bundle_header and mca */ /***************************************************************************************/ void schedule_transmission(int index, unsigned long int address, int len) { transmit_index_t *temp; /* create a new entry */ temp=(transmit_index_t * ) malloc(sizeof(transmit_index_t)); if(temp==NULL) { printf("error allocating memory in schedule_transmission()\a\n"); exit(-1); } temp->SEL_INDEX = index; temp->SEL_MULTICAST_ADDRESS=address; temp->SEL_LENGTH = len; temp->next = NULL; /* insert the new entry to the tail of the link list */ #ifdef UNIX pthread_mutex_lock(&mutex_transmit_index_head); #else WaitForSingleObject(mutex_transmit_index_head,INFINITE); #endif if( transmit_index_head == NULL ) { transmit_index_head = temp; transmit_index_tail = temp; #ifdef UNIX pthread_cond_signal(&event_transmit_index_head); #else SetEvent(event_transmit_index_head); #endif } else { transmit_index_tail->next = temp; transmit_index_tail = temp; } #ifdef UNIX pthread_mutex_unlock(&mutex_transmit_index_head); #else ReleaseMutex(mutex_transmit_index_head); #endif return; } /***************************************************************************************/ /* Function Name : transmitter_thread */ /* Description : retrieve a outgoing bundle referenced by transmit_index_head */ /* then append the bundle header */ /* append as much DSNs as possible */ /* append the outgoing bundle after all the DSNs */ /* transmit the bundle throught the socket */ /***************************************************************************************/ #ifdef UNIX void * transmitter_thread(void *mp) #else void transmitter_thread(void *mp) #endif { char transmit_buffer[1500]; /* the buffer for sending out bundle through socket*/ struct sockaddr_in sa; unsigned long mca; int index; int len ; /* bundle payload ength */ transmit_index_t * temp; struct bundle_hdr_t bundle_header; int i; int dsn_count; /* number of DSNs in the bundle */ int offset; int first_dataid; unsigned int total_len; /* bundle_header + DSNs + destination mca + payload */ unsigned int dataid; int sn; struct dsn_t dsn; #ifndef UNIX unsigned char low_byte; unsigned char high_byte; #endif char loop; loop=0; memset(&sa, 0, sizeof(sa)); sa.sin_family = AF_INET; sa.sin_port = htons(8000); /* set the socket option to disable loopback */ if (setsockopt(srtp_socket, IPPROTO_IP, IP_MULTICAST_LOOP, (const char*)&loop, sizeof(loop)) < 0) { printf("Error: multicast loop back failed\a\n"); exit(-1); } while(1) { /* wait for the transmit_index_head became non-empty */ #ifdef UNIX pthread_mutex_lock(&mutex_transmit_index_head); while( transmit_index_head == NULL ) { pthread_cond_wait(&event_transmit_index_head, &mutex_transmit_index_head); } #else WaitForSingleObject(mutex_transmit_index_head,INFINITE); if( transmit_index_head == NULL ) { ReleaseMutex(mutex_transmit_index_head); /*WaitForSingleObject(event_transmit_index_head,1000);*/ WaitForSingleObject(event_transmit_index_head,INFINITE); ResetEvent(event_transmit_index_head); continue; } #endif len = transmit_index_head->SEL_LENGTH; index = transmit_index_head->SEL_INDEX; mca = transmit_index_head->SEL_MULTICAST_ADDRESS; if( transmit_index_head != NULL ) { temp=transmit_index_head; transmit_index_head = transmit_index_head->next; free(temp); if( transmit_index_head == NULL ) transmit_index_tail = NULL; } #ifdef UNIX pthread_mutex_unlock(&mutex_transmit_index_head); #else ReleaseMutex(mutex_transmit_index_head); #endif /* bundle header+payload */ total_len=len+ BUNDLE_HEADER_SIZE; /**************************/ /* add DSNs to the bundle */ /**************************/ dsn_count=0; offset=BUNDLE_HEADER_SIZE; first_dataid=-1; /* avoid searching wrapped in the recent_outgoing_m1_sn */ #ifdef UNIX pthread_mutex_lock(&mutex_all_bundles); #else WaitForSingleObject(mutex_all_bundles, INFINITE); #endif while(total_len < (MAX_BUNDLE_SIZE - sizeof(struct dsn_t)) && dsn_count */ #ifdef UNIX pthread_mutex_lock(&mutex_recent_outgoing_m1_sn); #else WaitForSingleObject(mutex_recent_outgoing_m1_sn, INFINITE); #endif sn=oldest_unrefreshed_outgoing_m1_sn(mca, &dataid); #ifdef UNIX pthread_mutex_unlock(&mutex_recent_outgoing_m1_sn); #else ReleaseMutex(mutex_recent_outgoing_m1_sn); #endif if(sn<0) /* no such mca found in recent_outgoing_m1_sn */ break; if((int)dataid==first_dataid) /* searching wrapped */ break; /* check if the m1 dataid already exist in the current bundle, if yes, then no need to put that dataid in DSN */ for(i=0; i=MAX_M1_PER_BUNDLE) /* no such dataid exist in the bundle payload*/ { /* put the m1 dataid into a DSN */ dsn.sn=sn; dsn.dataid=htons(dataid); memcpy(transmit_buffer+offset, &dsn, sizeof(struct dsn_t)); #ifndef UNIX low_byte=((*(transmit_buffer+offset+3))>>1) & 0x7f; low_byte=low_byte |(((*(transmit_buffer+offset+2))<<7) & 0x80); high_byte=((*(transmit_buffer+offset+2))>>1) & 0x7f; high_byte=high_byte |(((*(transmit_buffer+offset+3))<<7) & 0x80); *(transmit_buffer+offset+3)=low_byte; *(transmit_buffer+offset+2)=high_byte; #endif offset=offset+sizeof(struct dsn_t); dsn_count=dsn_count+1; total_len=total_len+sizeof(struct dsn_t); } if(first_dataid <0) /* record the first DSN, to avoid searching wrap */ first_dataid=dataid; } /* finished adding DSNs */ /*******************************/ /* fill in the bundle header */ /*******************************/ bundle_header.version=2; bundle_header.type=0; /*bundle_header.sender_timestamp=get_timestamp();*/ bundle_header.dsn_count=dsn_count; memcpy(transmit_buffer, &bundle_header, BUNDLE_HEADER_SIZE); /******************************/ /* fill in the payload */ /******************************/ memcpy(transmit_buffer+offset,outgoing_bundle_pool[index].body,len); /*******************************/ /* reset data structure */ /*******************************/ outgoing_bundle_pool[index].flag = 0; outgoing_bundle_pool[index].len = 0; for(i=0; i 0 ) { sa.sin_addr.s_addr = mca; if (sendto(srtp_socket, transmit_buffer, total_len, 0, (struct sockaddr*) &sa, sizeof(sa)) < 0) printf("error sending out through socket\a\n"); } } /* end of while loop */ } /***************************************************************************************/ /* Function Name : oldest_unrefreshed_outgoing_m1_sn */ /* Description : search in recent_outgoing_m1_sn in a round ribin manner */ /* and find the oldest m1 dataid that has not been put into DSN */ /* Parameters : unsigned long mca -- multicast group address */ /* unsigned int * m1_dataid -- after execution, equals to the oldest */ /* dataid that has not been refreshed */ /* Return value : return the sn value of the corrsponding m1_dataid */ /***************************************************************************************/ int oldest_unrefreshed_outgoing_m1_sn( unsigned long mca, unsigned int *m1_dataid) { recent_m1_buffer_t *current; int exist; int mode1_sn; exist=1; /* searh in the link list, and find the oldest, unfreshed mode 1 dataid */ for (current=recent_outgoing_m1_sn;current;current=current->next) { if (current->mca == mca ) { exist=1; /* mca exist */ if(current->refresh==0) { current->refresh=1; *m1_dataid = current->dataid; mode1_sn = current->sn; return(mode1_sn); } } } /* there exist the mca, but all dataid of that mca has been refreshed recently*/ if(exist==1 ) { /* reset all the refresh flag */ for (current=recent_outgoing_m1_sn;current;current=current->next) if (current->mca == mca ) current->refresh=0; /* find the first one match */ for (current=recent_outgoing_m1_sn;current;current=current->next) { current->refresh=1; *m1_dataid = current->dataid; mode1_sn = current->sn; return(mode1_sn); } } return(-1); } /***************************************************************************************/ /* Function Name : signaller_thread */ /* Description : retrieve the first entry in mutex_bundle_timeout_timer_queue */ /* sleep for a while, */ /* when bundle timed out, check to see if the timer has been cancled */ /* if not, schedule transmission */ /***************************************************************************************/ #ifdef UNIX void * signaller_thread(void *mp) #else void signaller_thread(void *mp) #endif { #ifdef UNIX struct timeb curr_time; #else _timeb curr_time; #endif int bundle_len; int time_expire; struct bundle_timeout_timer_t * sig_pointer; while(1) { /* wait till the bundle_timeout_timer_queue became non-empty */ #ifdef UNIX pthread_mutex_lock(&mutex_bundle_timeout_timer_queue); while( bundle_timeout_timer_head == NULL ) { pthread_cond_wait(&event_bundle_timeout_timer_queue, &mutex_bundle_timeout_timer_queue); } #else WaitForSingleObject(mutex_bundle_timeout_timer_queue,INFINITE); if( bundle_timeout_timer_head == NULL ) { ReleaseMutex(mutex_bundle_timeout_timer_queue); WaitForSingleObject(event_bundle_timeout_timer_queue,INFINITE); ResetEvent(event_bundle_timeout_timer_queue); continue; } #endif /*********************************************/ /* read the node at the head of signal queue */ /*********************************************/ time_expire=bundle_timeout_timer_head->bundle_timeout; #ifdef UNIX pthread_mutex_unlock(&mutex_bundle_timeout_timer_queue); #else ReleaseMutex(mutex_bundle_timeout_timer_queue); #endif /******************************/ /* wait for bundle time out */ /******************************/ #ifdef UNIX ftime(&curr_time); #else _ftime(&curr_time); #endif while( (curr_time.time * 1000 + curr_time.millitm) < time_expire ) { #ifdef UNIX usleep((time_expire -(curr_time.time * 1000 + curr_time.millitm)) * 1000); ftime(&curr_time); #else Sleep(time_expire -(curr_time.time * 1000 + curr_time.millitm)); _ftime(&curr_time); #endif } /******************************/ /* wait for bundle time out */ /******************************/ #ifdef UNIX pthread_mutex_lock(&mutex_bundle_timeout_timer_queue); #else WaitForSingleObject(mutex_bundle_timeout_timer_queue,INFINITE); #endif if( bundle_timeout_timer_head != NULL ) { sig_pointer=bundle_timeout_timer_head; #ifdef UNIX pthread_mutex_lock(&mutex_all_bundles); #else WaitForSingleObject(mutex_all_bundles, INFINITE); #endif bundle_len = outgoing_bundle_pool[bundle_timeout_timer_head->index].len; outgoing_bundle_pool[bundle_timeout_timer_head->index].flag = 2; #ifdef UNIX pthread_mutex_unlock(&mutex_all_bundles); #else ReleaseMutex(mutex_all_bundles); #endif schedule_transmission(bundle_timeout_timer_head->index, bundle_timeout_timer_head->mca,bundle_len); } /******************************/ /* remove the head node */ /******************************/ if( bundle_timeout_timer_head != NULL ) { bundle_timeout_timer_head = bundle_timeout_timer_head->next; if( bundle_timeout_timer_head == NULL ) bundle_timeout_timer_tail = NULL; free(sig_pointer); } #ifdef UNIX pthread_mutex_unlock(&mutex_bundle_timeout_timer_queue); #else ReleaseMutex(mutex_bundle_timeout_timer_queue); #endif } } /***************************************************************************************/ /* Function Name : receiver_thread */ /* Description : receive a packet from the socket */ /* check to see if it is a feedback or a bundle */ /* if it is a feedback, call feedback_received */ /* else, if it is a bundle, put it into incoming_packet_buffer */ /***************************************************************************************/ #ifdef UNIX void * receiver_thread(void * mp) #else void receiver_thread(void *mp) #endif { int len; /* the length of the received bundle, include bundle header */ struct sockaddr_in sa; int sa_size; char msg[1600]; /* buffer for receiving message from socket */ struct socket_info_t socket_in; struct bundle_hdr_t bundle_header; /*struct feedback_header_t feedback_header_para; struct feedback_hdr_t feedback_header;*/ char * bundle_buffer; /* the pointer to incoming_packet_buffer[] */ long source_ip; memcpy(&socket_in, mp, sizeof(struct socket_info_t)); #ifndef UNIX WSADATA wsadata; if( WSAStartup(0x101,&wsadata) <0 ) { printf("Unable to initialize WinSock library\n"); exit(-1); } #endif sa_size=sizeof(sa); while(1) { /**************************************/ /* receive a packet from the socket */ /**************************************/ if( (len=recvfrom(socket_in.socket_id, msg, 1600, 0, (struct sockaddr*) &sa, &sa_size)) <= 0 ) { perror("error receiving from remote SRTP daemon"); continue; } source_ip=sa.sin_addr.s_addr; /**********************************************/ /* check to see if it is a bundle or feedback */ /**********************************************/ memcpy(&bundle_header,msg,BUNDLE_HEADER_SIZE); /* check the version */ if(bundle_header.version!=2) continue; switch(bundle_header.type) { case 1: /*********************/ /* feedback received */ /*********************/ /*memcpy(&feedback_header,msg,FEEDBACK_HEADER_SIZE); feedback_header_para.received_time=get_timestamp(); dessemble_feedback_header(&feedback_header_para, feedback_header); upon_receive_feedback(feedback_header_para); printf("\n --- feedback received_time %d\n", get_timestamp());*/ break; case 0: /*********************/ /* bundle received */ /*********************/ /* append the source_ip address to the head of the message */ /* now what inside the message is like this: | source_ip | dest_mca | bundle_header| Dsns | payload| */ #ifdef UNIX pthread_mutex_lock(&mutex_incoming_packet_buffer); #else WaitForSingleObject(mutex_incoming_packet_buffer, INFINITE); #endif bundle_buffer=incoming_packet_buffer[incoming_packet_write_pointer]; memcpy(bundle_buffer, &source_ip, sizeof(unsigned long int)); memcpy(bundle_buffer+sizeof(unsigned long int), &socket_in.mca, sizeof(unsigned long int)); /* copy the packet body into incoming_packet_buffer */ memcpy(bundle_buffer+2*sizeof(unsigned long int),msg,len); incoming_packet_buffer_size[incoming_packet_write_pointer] = len+2*sizeof(unsigned long int); incoming_packet_write_pointer = ( incoming_packet_write_pointer + 1 ) % MAX_INCOMING_PACKETS; /* inform unbundler thread that the incoming packet *buffer became non-empty */ if( incoming_packet_write_pointer == (incoming_packet_read_pointer + 1) % MAX_INCOMING_PACKETS ) #ifdef UNIX pthread_cond_signal(&event_incoming_packet_buffer); #else SetEvent(event_incoming_packet_buffer); #endif #ifdef UNIX pthread_mutex_unlock(&mutex_incoming_packet_buffer); #else ReleaseMutex(mutex_incoming_packet_buffer); #endif } /* end of switch */ } /* end of while loop */ } /***************************************************************************************/ /* Function Name : unbundler_thread */ /* Description : retrieve a packet from the incoming_packet_buffer */ /* parse all the DSNs and triger NACK if necessary */ /* parse each message, and put the message to */ /* incoming_message_pool */ /***************************************************************************************/ #ifdef UNIX void * unbundler_thread(void *mp) #else void unbundler_thread(void *mp) #endif { char bundled_msg[1600]; int total_len; int curr_pos; char indiv_msg[MAX_M1_SEG_SIZE+MESSAGE_HEADER_LEN+12]; int i; int result; int temp; struct bundle_hdr_t bundle_header; struct dsn_t dsn; struct message_header_t message_header; struct m0_hdr_t m0_header; struct m1_hdr_t m1_header; struct nack_hdr_t nack_header; #ifndef UNIX unsigned char low_byte; unsigned char high_byte; #endif int total_parts, last_part_size; unsigned long int source_ip; unsigned long int mca; curr_pos=0; while(1) { /* wait till the incoming_packet_buffer became non-empty */ #ifdef UNIX pthread_mutex_lock(&mutex_incoming_packet_buffer); while( incoming_packet_read_pointer == incoming_packet_write_pointer ) { pthread_cond_wait(&event_incoming_packet_buffer, &mutex_incoming_packet_buffer); } #else WaitForSingleObject(mutex_incoming_packet_buffer, INFINITE); if( incoming_packet_read_pointer == incoming_packet_write_pointer ) { ReleaseMutex(mutex_incoming_packet_buffer); WaitForSingleObject(event_incoming_packet_buffer,INFINITE); ResetEvent(event_incoming_packet_buffer); continue; } #endif /**************************************************/ /* read the packet from the incoming_packet_buffer */ /**************************************************/ total_len = incoming_packet_buffer_size[incoming_packet_read_pointer]; memcpy(bundled_msg, incoming_packet_buffer[incoming_packet_read_pointer], total_len); incoming_packet_read_pointer = ( incoming_packet_read_pointer + 1 ) % MAX_INCOMING_PACKETS; #ifdef UNIX pthread_mutex_unlock(&mutex_incoming_packet_buffer); #else ReleaseMutex(mutex_incoming_packet_buffer); #endif /* what inside the buffer is like this: */ /* |source_ip|dest_mca | bundle_header| DSNs| payload | */ memcpy(&source_ip, bundled_msg, sizeof(unsigned long int)); memcpy(&mca, bundled_msg+sizeof(unsigned long int), sizeof(unsigned long int)); memcpy(&bundle_header, bundled_msg+2*sizeof(unsigned long int), sizeof(struct bundle_hdr_t)); message_header.source_ip=source_ip; message_header.mca=mca; /*****************/ /* parse each DSN */ /*****************/ for(i=1; i<=bundle_header.dsn_count; i=i+1) { curr_pos=2*sizeof(unsigned long int) +BUNDLE_HEADER_SIZE+(i-1)*sizeof(dsn); #ifndef UNIX low_byte=((*(bundled_msg+curr_pos+3))<<1) & 0xfe; low_byte=low_byte |(((*(bundled_msg+curr_pos+2))>>7) & 0x01); high_byte=((*(bundled_msg+curr_pos+2))<<1) & 0xfe; high_byte=high_byte |(((*(bundled_msg+curr_pos+3))>>7) & 0x01); *(bundled_msg+curr_pos+3)=low_byte; *(bundled_msg+curr_pos+2)=high_byte; #endif memcpy(&dsn, bundled_msg+curr_pos, sizeof(dsn)); message_header.sn=dsn.sn; message_header.dataid=ntohs(dsn.dataid); result=most_recent_incoming_m1_sn(message_header); switch(result) { case NEW: case NONE: send_nak(dsn.dataid, mca, source_ip); /* trigger NACK */ break; } } /****************************************/ /* parse each message inside the bundle */ /****************************************/ curr_pos =2*sizeof(unsigned long int)+BUNDLE_HEADER_SIZE+4*bundle_header.dsn_count; while( curr_pos < total_len ) { memset(&m0_header, 1, sizeof(m0_header)); memcpy(&m0_header, bundled_msg+curr_pos, M0_HEADER_LEN); temp=m0_header.len; m0_header.len=ntohs(temp); switch(m0_header.mode) { case SRTP_M0: message_header.len=m0_header.len; message_header.port=0; /* indicate the message is from remote */ message_header.mca=mca; message_header.mode=0; message_header.version=m0_header.version; message_header.type=m0_header.type; message_header.source_ip=source_ip; message_header.dataid=0; memcpy(indiv_msg, &message_header, MESSAGE_HEADER_LEN); memcpy(indiv_msg+MESSAGE_HEADER_LEN, bundled_msg+curr_pos+M0_HEADER_LEN, message_header.len); insert_incoming_pool(indiv_msg,MESSAGE_HEADER_LEN+message_header.len); curr_pos += M0_HEADER_LEN + message_header.len; break; case SRTP_M1: #ifndef UNIX low_byte=((*(bundled_msg+curr_pos+7))<<1) & 0xfe; low_byte=low_byte |(((*(bundled_msg+curr_pos+6))>>7) & 0x01); high_byte=((*(bundled_msg+curr_pos+6))<<1) & 0xfe; high_byte=high_byte |(((*(bundled_msg+curr_pos+7))>>7) & 0x01); *(bundled_msg+curr_pos+7)=low_byte; *(bundled_msg+curr_pos+6)=high_byte; #endif memcpy(&m1_header, bundled_msg+curr_pos, M1_HEADER_LEN); message_header.dataid=ntohs(m1_header.dataid); message_header.len=ntohs(m1_header.len); message_header.mca=mca; message_header.mode=1; message_header.port=0; /* indicate the message is from remote */ message_header.sn=m1_header.sn; message_header.type=m1_header.type; message_header.version=m1_header.version; message_header.nosegs=m1_header.nosegs; message_header.source_ip=source_ip; total_parts = message_header.len/ MAX_M1_SEG_SIZE ; if ( message_header.len - total_parts * MAX_M1_SEG_SIZE > 0 ) total_parts = total_parts + 1; memcpy(indiv_msg, &message_header, MESSAGE_HEADER_LEN); if(message_header.nosegs==total_parts) { last_part_size=message_header.len-(total_parts-1)*MAX_M1_SEG_SIZE; memcpy(indiv_msg+MESSAGE_HEADER_LEN, bundled_msg+curr_pos+M1_HEADER_LEN, last_part_size); insert_incoming_pool(indiv_msg,MESSAGE_HEADER_LEN+last_part_size); curr_pos += M1_HEADER_LEN + last_part_size; } else { memcpy(indiv_msg+MESSAGE_HEADER_LEN, bundled_msg+curr_pos+M1_HEADER_LEN, MAX_M1_SEG_SIZE); insert_incoming_pool(indiv_msg,MESSAGE_HEADER_LEN+MAX_M1_SEG_SIZE); curr_pos += M1_HEADER_LEN + MAX_M1_SEG_SIZE; } break; case SRTP_NACK: memcpy(&nack_header, bundled_msg+curr_pos, NACK_HEADER_LEN); curr_pos=curr_pos+NACK_HEADER_LEN; if(nack_header.source_ip==local_ip_address) { message_header.dataid=ntohs(nack_header.dataid); message_header.len=NACK_HEADER_LEN; message_header.mca=mca; message_header.mode=7; message_header.port=0; message_header.type=nack_header.type; message_header.version=nack_header.version; message_header.source_ip=nack_header.source_ip; memcpy(indiv_msg, &message_header, MESSAGE_HEADER_LEN); insert_incoming_pool(indiv_msg,MESSAGE_HEADER_LEN); } break; } /* end of switch */ } /* end of while, for parsing current bundle */ } /* end of thread while loop */ } /***************************************************************************************/ /* Function Name : most_recent_incoming_m1_sn */ /* Description : compare the sn carried in DSN with the sn of the most */ /* recent received mode 1 message of the same */ /* Parameters : struct message_header_t message_header -- carried the */ /* of the DSN */ /* Return value : EQUAL -- sn of the DSN equals to sn of the most recnet */ /* mode 1 message */ /* NEW -- sn of the DSN is new to mode 1 message */ /* OLD -- sn of the DSN is old */ /* NONE -- has never been received */ /* before */ /***************************************************************************************/ int most_recent_incoming_m1_sn(struct message_header_t message_header) { recent_m1_buffer_t *current; int sn; unsigned int dataid; unsigned long mca; unsigned long int source_ip; int result; sn=message_header.sn; dataid=message_header.dataid; mca=message_header.mca; source_ip=message_header.source_ip; result=NONE; #ifdef UNIX pthread_mutex_lock(&mutex_m1_message_buffer); #else WaitForSingleObject(mutex_m1_message_buffer, INFINITE); #endif if(m1_message_buffer==NULL) { #ifdef UNIX pthread_mutex_unlock(&mutex_m1_message_buffer); #else ReleaseMutex(mutex_m1_message_buffer); #endif return NONE; } /* search in the m1_message_buffer for the */ for (current=m1_message_buffer; current; current=current->next) { if (current->mca == mca && current->dataid == dataid && current->source_ip==source_ip) { /* found a message with same eid and mca */ if (sn == current->sn) result=EQUAL; /* have recvd corresponding mode 1 message */ else if (sn < current->sn && ((current->sn - sn) < (MAX_SN/2) ) ) result=OLD; /* late DSN */ else if (sn > current->sn && ((sn - current->sn ) > (MAX_SN/2)) ) result=OLD; /* late DSN */ else if (sn > current->sn && sn - current->sn < (MAX_SN/2)) result=NEW; /* new DSN */ else if (sn < current->sn && current->sn -sn > (MAX_SN/2)) result=NEW; /* new DSN */ else { result=NONE; } } } /* couldn't find such a pair */ #ifdef UNIX pthread_mutex_unlock(&mutex_m1_message_buffer); #else ReleaseMutex(mutex_m1_message_buffer); #endif return result; } /***************************************************************************************/ /* Function Name : send_nak */ /* Description : construct a NACK message with */ /* insert the NACK message into the outgoing message buffer */ /* Parameters : unsigned int dataid -- dataid to be nacked */ /* unsigned long int mca -- multicast group address */ /* unsigned long int source_ip -- source ip address */ /***************************************************************************************/ int send_nak(unsigned int dataid, unsigned long int mca, unsigned long int source_ip) { char *msg; /* NACK message is stored in this buffer */ struct nack_hdr_t nack_header; /* construct the NACK message */ nack_header.version=2; nack_header.type=SRTP_DATA; nack_header.mode=7; nack_header.len=htons(NACK_HEADER_LEN); nack_header.dataid=dataid; nack_header.source_ip=source_ip; msg = (char*)malloc(NACK_HEADER_LEN); if (!msg) { printf("eoor allocating memory\a\n"); exit(-1); } memcpy(msg, &nack_header, NACK_HEADER_LEN); /* ready to transmit the NACK */ #ifdef UNIX printf("triger nack for dataid=%d\n", dataid); #else printf("triger nack for dataid=%d\n", ntohs(dataid)); #endif insert_outgoing_pool(msg,NACK_HEADER_LEN, mca, 0 ); return(0); } /***************************************************************************************/ /* Function Name : deliver_thread */ /* Description : in turn process each message in incoming_seg_pool */ /* if it is a mode 0 message, call service_m0() */ /* else if it is a mode 1 message, call service_m1() */ /* else if it is a NACK, call service_nak() */ /***************************************************************************************/ #ifdef UNIX void * deliver_thread(void *mp) #else void deliver_thread(void *mp) #endif { char msg[MAX_M1_BUFFER_SIZE]; int len; struct message_header_t message_header; while(1) { /* wait the incoming pool to be non_empty */ #ifdef UNIX pthread_mutex_lock(&mutex_incoming_seg_pool); while( incoming_pool_read_pointer == incoming_pool_write_pointer ) { pthread_cond_wait(&event_incoming_seg_pool, &mutex_incoming_seg_pool); } #else WaitForSingleObject(mutex_incoming_seg_pool, INFINITE); if( incoming_pool_read_pointer == incoming_pool_write_pointer ) { ReleaseMutex(mutex_incoming_seg_pool); WaitForSingleObject(event_incoming_seg_pool,INFINITE); ResetEvent(event_incoming_seg_pool); continue; } #endif /* retrieve a message from the incoming message pool */ len=incoming_seg_pool_size[incoming_pool_read_pointer]; memcpy( msg,incoming_seg_pool[incoming_pool_read_pointer], len ); /* move forward the read pointer */ incoming_pool_read_pointer = ( incoming_pool_read_pointer + 1 ) % MAX_INCOM_MSG_NUM; if( incoming_pool_read_pointer == MAX_INCOM_MSG_NUM ) reader1Cleared = 0; #ifdef UNIX pthread_mutex_unlock(&mutex_incoming_seg_pool); #else ReleaseMutex(mutex_incoming_seg_pool); #endif /* parse the message */ memcpy(&message_header, msg, MESSAGE_HEADER_LEN); /* check the message version */ if(message_header.version != SRTP_VERSION) { printf("invalid version message received from remote host\a\n"); continue; /* discard the message */ } switch (message_header.type) { case SRTP_DATA: switch (message_header.mode) { case 0: /* process a received mode 0 message */ service_m0(msg, message_header); break; case 1: /* process a received mode 1 message */ service_m1(msg,message_header, len); break; case 2: /* mode 2, currently ignore */ break; case 7: /* process a received NACK */ service_nak(message_header); break; default: printf("invalid message received\a\n"); } break; case SRTP_HBEAT: service_heartbeat(msg,len,0); break; default: printf("invalid message received\a\n"); } } /* end of while loop */ } /***************************************************************************************/ /* Function Name : service_m0 */ /* Description : deiver the received mode 0 message to all clients that */ /* has subscribed the multicast group */ /* Parameters : char * msg -- the message_header + message body */ /* struct message_header_t message_header -- the message header */ /***************************************************************************************/ void service_m0(char *msg, struct message_header_t message_header) { if( deliver_message(msg, message_header) == -1 ) { printf("error deliver mode 0 message \a\n"); } return; } /***************************************************************************************/ /* Function Name : service_m1 */ /* Description : deiver the received mode 1 message to all clients that */ /* has subscribed the multicast group */ /* if it is an old message or segment, discard it */ /* if it is an new message or segment, */ /* if it is a message, deliver it */ /* else, buffer the segment */ /* if the received sn equals to the most recently received sn */ /* if it is a message, discard it */ /* else, check if all segments of a long message has been */ /* received, if yes, resemble the message and deliver */ /* it */ /* Parameters : char * msg -- the message_header + message body */ /* struct message_header_t message_header -- the message header */ /***************************************************************************************/ void service_m1(char *msg, struct message_header_t message_header, int len) { int index; struct m1_seg_buffer_t * seg_pointer; struct m1_seg_buffer_t * seg_pointer2; struct m1_seg_buffer_t * new_seg; int offset; /* check to see if this is an old/new message */ switch (most_recent_incoming_m1_sn(message_header)) { case OLD: /* the sn is older than current */ return; /* the message/segment should be discarded */ case EQUAL: /* the sn equals to current sn */ if(message_header.len == (len-MESSAGE_HEADER_LEN)) /* it is a message, and a duplicate one */ return; /* the duplicate message should be discarded */ /* else, what received is a segment of current m1 message */ /* try to insert it to m1_seg_chain[] */ for(index=0; index= MAX_UNASEMBLED_M1_MESSAGES) /* the m1 chain can't be find, one reason is that the current */ /* m1 message has been resembled, and then a duplicate segment arrived */ /* should be discarded */ return; /* now the index is the m1_chain, try insert current segment to the chain */ /* at this point, the chain must not be empty */ seg_pointer=m1_seg_chain[index].chain_tail; while(seg_pointer!=NULL) { if(seg_pointer->seg_number == message_header.nosegs) /* duplicate segment */ return; /* duplicat segment should be discarded */ if(seg_pointer->seg_number < message_header.nosegs) break; /* found the correct position */ seg_pointer=seg_pointer->prev; } /* initialize a new struct for the segment */ new_seg=(struct m1_seg_buffer_t *) malloc(sizeof(m1_seg_buffer_t)); memset(new_seg, 0, sizeof(m1_seg_buffer_t)); new_seg->seg_number=message_header.nosegs; new_seg->seg_len=len-MESSAGE_HEADER_LEN; memcpy(new_seg->seg_body, msg+MESSAGE_HEADER_LEN, len-MESSAGE_HEADER_LEN); /* insert the new segment at proper position */ if(seg_pointer==NULL) /* seach back to the chain head */ { m1_seg_chain[index].chain_head->prev=new_seg; new_seg->next=m1_seg_chain[index].chain_head; m1_seg_chain[index].chain_head=new_seg; } else { new_seg->next=seg_pointer->next; new_seg->prev=seg_pointer; seg_pointer->next=new_seg; if(new_seg->next==NULL) /* the new segment was inserted at the tail */ m1_seg_chain[index].chain_tail=new_seg; } /* check to see if all the segments have been received */ /* if yes, resemble the long message */ m1_seg_chain[index].len_received= m1_seg_chain[index].len_received+len-MESSAGE_HEADER_LEN; if(m1_seg_chain[index].len_received==m1_seg_chain[index].message_header.len) { memcpy(msg, &(m1_seg_chain[index].message_header), MESSAGE_HEADER_LEN); /* resemble all the segments into one message */ offset=MESSAGE_HEADER_LEN; seg_pointer=m1_seg_chain[index].chain_head; while(seg_pointer!=NULL) { memcpy(msg+offset, seg_pointer->seg_body, seg_pointer->seg_len); offset=offset+seg_pointer->seg_len; seg_pointer2=seg_pointer; seg_pointer=seg_pointer->next; free(seg_pointer2); /* release the memory used by resembled segments */ } /* send the message to clients */ deliver_message(msg, message_header); /* delete the recoreds in m1_seg_chain[index] */ memset(&(m1_seg_chain[index]), 0, sizeof(struct m1_seg_chain_t)); } break; case NEW: /* the received sn is new to the current sn */ /* if it is a whole message, deliver it to the clients */ if(message_header.len == (len-MESSAGE_HEADER_LEN)) { deliver_message(msg, message_header); return; } /* else, it is the first arrived sigment of a long message */ /* remove the old un_resembled m1_seg_chain[] if there exist */ for(index=0; indexnext; free(seg_pointer2); /* release the memory used by resembled segments */ } /* delete the recoreds in m1_seg_chain[index] */ memset(&(m1_seg_chain[index]), 0, sizeof(struct m1_seg_chain_t)); } /* find an empty slot for the new segment */ for(index=0; index= MAX_UNASEMBLED_M1_MESSAGES) /* no empty slot found */ { printf( " m1_seg_chain buffer over flew, maximum %d m1 long message\a\n", MAX_UNASEMBLED_M1_MESSAGES ); return; } /* insert the new segment to the chain */ m1_seg_chain[index].message_header=message_header; /* create a new struct for the segment */ new_seg=(struct m1_seg_buffer_t *) malloc(sizeof(m1_seg_buffer_t)); memset(new_seg, 0, sizeof(struct m1_seg_buffer_t)); new_seg->seg_number=message_header.nosegs; new_seg->seg_len=len-MESSAGE_HEADER_LEN; memcpy(new_seg->seg_body, msg+MESSAGE_HEADER_LEN, len-MESSAGE_HEADER_LEN); m1_seg_chain[index].chain_head=new_seg; m1_seg_chain[index].chain_tail=new_seg; m1_seg_chain[index].len_received=len-MESSAGE_HEADER_LEN; /* update the sn, but do not buffer the message */ update_recvd_m1_sn(message_header, NULL, -1); break; case NONE: /* if it is not a long message, deliver it to the clients */ if(message_header.len == (len-MESSAGE_HEADER_LEN)) { deliver_message(msg, message_header); return; } /* else, it is the first arrived sigment of a long message */ /* find a empty slot in m1_seg_chain[] */ for(index=0; index= MAX_UNASEMBLED_M1_MESSAGES) /* no empty slot found */ { printf(" m1_seg_chain buffer over flew, maximum %d m1 long message\a\n", MAX_UNASEMBLED_M1_MESSAGES); return; } /* insert the new segment to the chain */ m1_seg_chain[index].message_header=message_header; /* create a new struct for the segment */ new_seg=(struct m1_seg_buffer_t *) malloc(sizeof(m1_seg_buffer_t)); memset(new_seg, 0, sizeof(struct m1_seg_buffer_t)); new_seg->seg_number=message_header.nosegs; new_seg->seg_len=len-MESSAGE_HEADER_LEN; memcpy(new_seg->seg_body, msg+MESSAGE_HEADER_LEN, len-MESSAGE_HEADER_LEN); m1_seg_chain[index].chain_head=new_seg; m1_seg_chain[index].chain_tail=new_seg; m1_seg_chain[index].len_received=len-MESSAGE_HEADER_LEN; /* update the sn, but do not buffer the message */ update_recvd_m1_sn(message_header, NULL, -1); break; } return; } /***************************************************************************************/ /* Function Name : update_recvd_m1_sn */ /* Description : update the sn of current received mode 1 message */ /* if SRTP just received a message, this function will update */ /* the sn, as well as buffer the new received message */ /* if only a segment is received, this function will update the */ /* sn, but do not buffer the message */ /* Parameters : struct message_header_t message_header -- the message header */ /* char * msgbuf -- the message body. can be NULL in case */ /* only sn update needed */ /* int buffer_flag -- >0 buffer the message */ /* <0 do not buffer the message */ /***************************************************************************************/ int update_recvd_m1_sn(struct message_header_t message_header, char * msgbuf, int buffer_flag) { unsigned int dataid; unsigned long mca; unsigned long source_ip; int sn; int len; if(message_header.mode!=1) return 0; dataid=message_header.dataid; mca=message_header.mca; source_ip=message_header.source_ip; sn=message_header.sn; len=message_header.len+MESSAGE_HEADER_LEN; static recent_m1_buffer_t *last_entry_recv_m1; recent_m1_buffer_t *current; /* search in m1_message_buffer for */ #ifdef UNIX pthread_mutex_lock(&mutex_m1_message_buffer); #else WaitForSingleObject(mutex_m1_message_buffer, INFINITE); #endif for (current=m1_message_buffer; current;current=current->next) { if (current->dataid == dataid && current->mca == mca && current->source_ip==source_ip) { /* found the specified */ current->sn = sn; /* update the received sn */ if(buffer_flag>0) { /* buffer the new mode 1 message */ current->len=len; memset(current->buffer, 0, MAX_M1_BUFFER_SIZE); memcpy(current->buffer, msgbuf, len); } sn=current->sn; #ifdef UNIX pthread_mutex_unlock(&mutex_m1_message_buffer); #else ReleaseMutex(mutex_m1_message_buffer); #endif return sn; } } /* the specified is not found */ if (m1_message_buffer == NULL) { /* no other mode 1 message has ever been buffered before */ m1_message_buffer = (recent_m1_buffer_t *) malloc(sizeof(recent_m1_buffer_t)); memset(m1_message_buffer, 0, sizeof(struct recent_m1_buffer_t)); if (!m1_message_buffer) { printf("error allocating memory\a\n"); #ifdef UNIX pthread_mutex_unlock(&mutex_m1_message_buffer); #else ReleaseMutex(mutex_m1_message_buffer); #endif return(-1); } m1_message_buffer->dataid = dataid; m1_message_buffer->mca = mca; m1_message_buffer->source_ip=source_ip; m1_message_buffer->sn = sn; m1_message_buffer->next = NULL; if(buffer_flag>0) { m1_message_buffer->len=len; memset((void *)m1_message_buffer->buffer, 0, len); memcpy(m1_message_buffer->buffer, msgbuf, len); } last_entry_recv_m1 = m1_message_buffer; } else { /* insert a new record at the tail of the link list */ recent_m1_buffer_t *new_entry; new_entry = (recent_m1_buffer_t *)malloc(sizeof(recent_m1_buffer_t)); if (!new_entry) { printf("error allocating memory\a\n"); #ifdef UNIX pthread_mutex_unlock(&mutex_m1_message_buffer); #else ReleaseMutex(mutex_m1_message_buffer); #endif return(-1); } new_entry->dataid = dataid; new_entry->mca = mca; new_entry->sn = sn; new_entry->source_ip=source_ip; new_entry->next = NULL; if(buffer_flag>0) { new_entry->len=len; memset(new_entry->buffer, 0, MAX_M1_BUFFER_SIZE); memcpy(new_entry->buffer, msgbuf, len); } last_entry_recv_m1->next = new_entry; last_entry_recv_m1 = new_entry; } #ifdef UNIX pthread_mutex_unlock(&mutex_m1_message_buffer); #else ReleaseMutex(mutex_m1_message_buffer); #endif return 0; } /***************************************************************************************/ /* Function Name : deliver_message */ /* Description : deliver a message to all the client that subscribed */ /* Parameters : struct message_header_t message_header -- the message header */ /* char * msg -- message_header + message_body */ /***************************************************************************************/ int deliver_message(char * msg, struct message_header_t message_header) { int m; int i; /* buffer mode 1 message */ if(message_header.mode==1) update_recvd_m1_sn(message_header, msg, 1); /* search for the index of the subscribed mca in subscribed_mca[] */ for(m=0; m<20; m++) { if(subscribed_mca[m]==(long int)message_header.mca) break; } if(m>=20) /* the mca was not subscribed by any clients */ return 0; /* deliver the message to every client that subscribed */ for(i=0; i0) && (client_port[m][i]!=(int) message_header.port)) deliver_to_client(client_port[m][i],msg, message_header); } return 0; } /***************************************************************************************/ /* Function Name : service_nak */ /* Description : find the buffered mode 1 message of the specified */ /* resend the nacked mode 1 message */ /* Parameters : struct message_header_t message_header -- the message header */ /***************************************************************************************/ void service_nak(struct message_header_t message_header) { message_buffer_t *mode1_message; message_buffer_t * current; struct in_addr in; char * msg; mode1_message=NULL; in.s_addr = message_header.mca; fprintf(stderr, "NACK received for dataid=%d, mca=%s\n", message_header.dataid, inet_ntoa(in)); /* Check if this instance of SRTP has sent the Mode 1 */ #ifdef UNIX pthread_mutex_lock(&mutex_buffered_outgoing_m1_message); #else WaitForSingleObject(mutex_buffered_outgoing_m1_message, INFINITE); #endif for (current = buffered_outgoing_m1_message_head; current; current = current->next) { if(current->dataid == message_header.dataid && current->dest_mca == message_header.mca) { mode1_message=current; break; } } if (mode1_message != NULL) /* got the most recent mode message */ { msg=(char *)malloc(mode1_message->length); if(msg==NULL) { printf("malloc error!\a\n"); exit(-1); } memcpy(msg, mode1_message->message, mode1_message->length); #ifdef UNIX pthread_mutex_unlock(&mutex_buffered_outgoing_m1_message); #else ReleaseMutex(mutex_buffered_outgoing_m1_message); #endif /* resent the mode 1 message */ resend_m1(msg); } #ifdef UNIX pthread_mutex_unlock(&mutex_buffered_outgoing_m1_message); #else ReleaseMutex(mutex_buffered_outgoing_m1_message); #endif return; } /***************************************************************************************/ /* Function Name : resend_m1 */ /* Description : find the buffered mode 1 message of the specified */ /* resend the nacked mode 1 message */ /* Parameters : struct message_header_t message_header -- the message header */ /***************************************************************************************/ int resend_m1(char * buffered_m1) /* buffered_m1 = message_header + message_body */ { char *msg; int msglen; unsigned short total_parts; short last_part_size; unsigned short part_no; struct m1_hdr_t m1_header; struct message_header_t message_header; #ifndef UNIX unsigned char low_byte; unsigned char high_byte; #endif memcpy(&message_header, buffered_m1, MESSAGE_HEADER_LEN); /* construct the mode 1 message header */ m1_header.dataid=htons(message_header.dataid); m1_header.len=htons(message_header.len); m1_header.mode=1; m1_header.sn=message_header.sn; m1_header.type=SRTP_DATA; m1_header.version=message_header.version; /* if it is a long message, break it into multiple segments */ total_parts = message_header.len/ MAX_M1_SEG_SIZE ; if ( message_header.len - total_parts * MAX_M1_SEG_SIZE > 0 ) total_parts=total_parts+1; last_part_size = message_header.len % MAX_M1_SEG_SIZE; if(last_part_size==0) last_part_size=MAX_M1_SEG_SIZE; for(part_no = 1; part_no <= total_parts; part_no=part_no+1) { if( part_no < total_parts ) msglen = MAX_M1_SEG_SIZE; else msglen = last_part_size; m1_header.nosegs=part_no; msg = (char*)malloc(M1_HEADER_LEN + msglen); if (!msg) { printf("error allocating memory\a\n"); exit(-1); } memcpy(msg + M1_HEADER_LEN, buffered_m1 + MESSAGE_HEADER_LEN+(part_no-1)*MAX_M1_SEG_SIZE, msglen); memcpy(msg, &m1_header, M1_HEADER_LEN); #ifndef UNIX low_byte=((*(msg+7))>>1) & 0x7f; low_byte=low_byte |(((*(msg+6))<<7) & 0x80); high_byte=((*(msg+6))>>1) & 0x7f; high_byte=high_byte |(((*(msg+7))<<7) & 0x80); *(msg+7)=low_byte; *(msg+6)=high_byte; #endif insert_outgoing_pool(msg,M1_HEADER_LEN + msglen, message_header.mca, message_header.dataid ); } /* end for each part_no */ return(0); } /***************************************************************************************/ /* service this message as a mode 0 packet. Checks if a */ /* corresponding mode 1 packet is lost. If lost call for */ /* generation of NAK. And delivers the packet to all local */ /* clients registered for the category of the packet */ /***************************************************************************************/ void service_heartbeat(char *msg, int n, int p_eid) { int datalen; unsigned long mca; short sn; int result; /* put arguments in variables */ memcpy(&sn, msg+2, sizeof(sn)); sn = ntohs(sn); memcpy(&mca, msg+2+sizeof(sn), sizeof(mca)); mca = ntohl(mca); datalen = n - SRTP_HEADER_LENGTH; /* entity code is non zero */ /* then check if sn is equal to sn of most recently */ /* received mode 1 msg */ result =1; switch (result) { case WITHIN_WINDOW: case NONE: /* Missed a prev. mode 1 message, send a NAK */ /** send_nak( sn, mca);*/ printf("Sent a nak (WITHIN_WINDOW/NONE)\n"); case EQUAL: break; case LESS_BUT_WITHIN_WINDOW: /* Too late.. discard the message */ if (msg) free(msg); break; default: break; } return; } /***************************************************************************************/ /* Function Name : HeartBeatFunc */ /* Description : This thread reads from the scheduled list of heartbeat */ /* messages and sends the messages after checking that the message has not */ /* been cancelled. */ /***************************************************************************************/ void heartbeatter_thread(void *mp) { heartbeat_buffer_t *temp; #ifdef UNIX struct timeb curr_time; #else struct _timeb curr_time; #endif char *msg; /* packet is constructed into this */ unsigned char ttl; int sn; unsigned long net_dest; /* header contents converted to */ unsigned short net_sn; unsigned short net_len; struct sockaddr_in sa; struct sockaddr_in sa_local; struct in_addr in; char multicast_address[20]; while(1) { if( heartbeat_msgqueue_head == NULL ) { #ifdef UNIX pthread_mutex_lock(&hSigHeartBeat); #else WaitForSingleObject(hSigHeartBeat,INFINITE); #endif } /* Some heartbeat messages have been scheduled. Read the messages and send them at the time required to send */ if( heartbeat_msgqueue_head == NULL ) { continue; } temp = heartbeat_msgqueue_head; if( temp->send_flag == 0 ) { /* remove the message from the list */ heartbeat_msgqueue_head = temp->next; free(temp); continue; } /* wait for scheduled time of the heartbeat message and then send it to the entity and destination */ #ifdef UNIX ftime(&curr_time); usleep((temp->send_time - curr_time.time )*1000); #else _ftime(&curr_time); Sleep(temp->send_time - curr_time.time ); #endif if( temp->send_flag == 0 ) { heartbeat_msgqueue_head = temp->next; free(temp); continue; } heartbeat_msgqueue_head = temp->next; if( heartbeat_msgqueue_head == NULL ) { heartbeat_msgqueue_tail = NULL; } /* check whether the heartbeat message has been cancelled */ ttl = temp->ttl; net_dest = temp->mca; in.s_addr = temp->mca; /* get the Sequence number of most recent mode 1 message */ /* with entity id eid and destination as net_dest */ sn=1; if (sn == -1) { /* No Mode 1 message was sent with this eid to this dest */ /* Report Failure to the Application */ in.s_addr = net_dest; } /* end of change by Vinod */ net_sn = htons(sn); net_len = htons(0); /* build Mode0 message */ msg = (char*)malloc(SRTP_HEADER_LENGTH ); if (!msg) { printf("error allocating memory\a\n"); } msg[0] = SRTP_VERSION; msg[1] = ((SRTP_HBEAT) << 4) | 0x3; /* type is NAK */ memcpy(msg + 2 , &net_sn, sizeof(net_sn)); memcpy(msg + 2 + sizeof(net_sn), &(temp->mca), sizeof(temp->mca)); net_len = htons(0); memcpy(msg + 2 + sizeof(net_sn)+sizeof(net_dest), &net_len, sizeof(net_len)); memcpy(msg + 12 , &net_len, sizeof(net_len)); /* send the message */ memset(&sa, 0, sizeof(sa)); sa.sin_family = AF_INET; sa.sin_addr.s_addr = net_dest; in.s_addr = net_dest; strcpy(multicast_address,inet_ntoa(in)); in.s_addr = net_dest; /* free the message and return success */ memset(&sa_local, 0, sizeof(sa_local)); sa_local.sin_family = AF_INET; sa_local.sin_addr.s_addr = htonl(INADDR_ANY); if (msg) free(msg); free(temp); } } /***************************************************************************************/ /* Send the requested packet from the client to the multicast */ /* group specified in mode 0. This packets goes unreliably */ /* loss of this packet does not matter */ /***************************************************************************************/ int schedule_heartbeat_msg(int ttl, unsigned long dest, long send_time) { /* allocate a new node for the heartbeat message */ struct in_addr in; heartbeat_buffer_t *hbeat_node, *temp; in.s_addr = dest; hbeat_node = ( heartbeat_buffer_t *) malloc(sizeof(heartbeat_buffer_t)); if(hbeat_node == NULL ) return(-1); hbeat_node->ttl = ttl; hbeat_node->mca = dest; hbeat_node->send_time = send_time; hbeat_node->next = NULL; hbeat_node->send_flag = 1; /* Check whether a heartbeat message has already been scheduled for this entity. */ temp = heartbeat_msgqueue_head; while( temp != NULL ) { if( temp->mca == dest && temp->send_flag == 1 ) return(0); temp = temp->next; } /* add the new node to the end of the list of heartbeat messages */ if( heartbeat_msgqueue_tail != NULL ) heartbeat_msgqueue_tail->next = hbeat_node; else heartbeat_msgqueue_head = hbeat_node; heartbeat_msgqueue_tail = hbeat_node; return(0); } /***************************************************************************************/ /* Cancel */ /* group specified in mode 0. This packets goes unreliably */ /* loss of this packet does not matter */ /***************************************************************************************/ int cancell_heartbeat_msg( unsigned long dest ) { heartbeat_buffer_t *temp; temp = heartbeat_msgqueue_head; while( temp != NULL ) { if( temp->mca == dest) { temp->send_flag = 0; return(0); } temp = temp->next; } return(-1); }