diff --git a/rtmp.c b/rtmp.c index 8a374bd..19e2074 100644 --- a/rtmp.c +++ b/rtmp.c @@ -1,7 +1,9 @@ /* -** server.c -- a stream socket server demo +RTMP Server */ +#include +#include #include #include #include @@ -14,33 +16,15 @@ #include #include #include - -#define MYPORT 2036 // the port users will be connecting to - -#define BACKLOG 10 // how many pending connections queue will hold - -typedef struct { - char unknown[3]; - char amfSize[3]; - char amfType[1]; - char srcDest[4]; - } amfHeader; - -typedef struct { - char unknown[3]; - int amfSize; - int amfType; - char srcDest[4]; - int bodySize; - int bodyCount; - char *body; - } amfHeader_real; +#include "rtmp.h" amfHeader_real *AMFS = 0x0; int main(void) { - int sockfd, new_fd; // listen on sock_fd, new connection on new_fd + int sockFD = 0x0; //Listener Socket + int newFD = 0x0; //New Socket; + struct sockaddr_in my_addr; // my address information struct sockaddr_in their_addr; // connector's address information socklen_t sin_size; @@ -49,12 +33,12 @@ AMFS = (amfHeader_real *)malloc(sizeof(amfHeader_real) * 128); - if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) { + if ((sockFD = socket(PF_INET, SOCK_STREAM, 0)) == -1) { perror("socket"); exit(1); } - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { + if (setsockopt(sockFD, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { perror("setsockopt"); exit(1); } @@ -64,12 +48,12 @@ my_addr.sin_addr.s_addr = INADDR_ANY; // automatically fill with my IP memset(my_addr.sin_zero, '\0', sizeof my_addr.sin_zero); - if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) { + if (bind(sockFD, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) { perror("bind"); exit(1); } - if (listen(sockfd, BACKLOG) == -1) { + if (listen(sockFD, BACKLOG) == -1) { perror("listen"); exit(1); } @@ -85,23 +69,24 @@ while(1) { // main accept() loop sin_size = sizeof(struct sockaddr_in); - if ((new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size)) == -1) { + if ((newFD = accept(sockFD, (struct sockaddr *)&their_addr, &sin_size)) == -1) { perror("accept"); continue; } + fcntl(newFD, F_SETFL, fcntl(newFD, F_GETFL, 0) | O_NONBLOCK); printf("server: got connection from %s\n",inet_ntoa(their_addr.sin_addr)); /* if (!fork()) { // this is the child process - close(sockfd); // child doesn't need the listener + close(sockFD); // child doesn't need the listener */ - if (doHandshake(new_fd) != 0) { - close(new_fd); + if (doHandshake(newFD) != 0) { + close(newFD); exit(0); } /* - if (doAccept(new_fd) != 0) { - close(new_fd); + if (doAccept(newFD) != 0) { + close(newFD); exit(0); } */ @@ -110,18 +95,18 @@ //bzero(&data,4096); //recv(new_fd,&data,1,MSG_WAITALL); /* - if (getAMF(new_fd,data[0]) != 0x0) { + if (getAMF(newFD,data[0]) != 0x0) { */ - if (getAMF(new_fd) != 0x0) { + if (getAMF(newFD) != 0x0) { printf("Error"); - close(new_fd); + close(newFD); exit(0); } } /* } */ - close(new_fd); // parent doesn't need this + close(newFD); // parent doesn't need this } return 0; @@ -136,6 +121,16 @@ int i = 0x0; char data[12]; amfHeader *amfHdr = (amfHeader *)&data; + fd_set readset; + + FD_ZERO(&readset); + FD_SET(fd,&readset); + + + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } recLen = recv(fd,&AMF,1,0); if (recLen != 1) @@ -146,6 +141,11 @@ switch(headerSize) { case 0: + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } + recLen = recv(fd,&data,11,0); printf("12 Byte Header\n"); packetSize = (amfHdr->amfSize[0] * 256 * 256) + (amfHdr->amfSize[1] * 256) + amfHdr->amfSize[2]; @@ -155,6 +155,11 @@ AMFS[amfID].body = (char *)malloc(packetSize); AMFS[amfID].amfType = amfHdr->amfType[0]; if (packetSize > 128) { + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } + recv(fd,AMFS[amfID].body + AMFS[amfID].bodyCount,128,0); AMFS[amfID].bodyCount = 128; } @@ -162,11 +167,21 @@ case 3: printf("4 Byte Header\n"); if ((AMFS[amfID].bodySize - AMFS[amfID].bodyCount) > 128) { + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } + recv(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),128,0); AMFS[amfID].bodyCount += 128; } else { printf("Bal: (%i)\n",(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } + recLen = recv(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)-1,0); printf("Received: [%i]\n",recLen); AMFS[amfID].bodyCount += (AMFS[amfID].bodySize - AMFS[amfID].bodyCount)-1; @@ -194,6 +209,11 @@ int packetSize = 0x0; char data[12]; amfHeader *amfHdr = (amfHeader *)&data; + fd_set readset; + + FD_ZERO(&readset); + FD_SET(fd,&readset); + headerSize = (amf & 0xC0) >> 6; @@ -204,6 +224,11 @@ switch(headerSize) { case 0: printf("12 Byte Header\n"); + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } + recLen = recv(fd,&data,11,0); printf("Size: [0x%X:0x%X:0x%X]\n",amfHdr->amfSize[0] & 0xFF,amfHdr->amfSize[1] & 0xFF,amfHdr->amfSize[2] & 0xFF); packetSize = (amfHdr->amfSize[0] * 256 * 256) + (amfHdr->amfSize[1] * 256) + amfHdr->amfSize[2]; @@ -240,9 +265,19 @@ int ch = 0x0; char *packet = 0x0; char function[256]; + fd_set readset; + + FD_ZERO(&readset); + FD_SET(fd,&readset); + printf("Reading Packet"); packet = (char *)malloc(packetSize); + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } + recLen = recv(fd,packet,packetSize,MSG_WAITALL); for (i = 0;i < packet[2];i++) { @@ -271,14 +306,30 @@ int cmd = 0x0; char data1[1536]; char data2[1536]; + fd_set readset; + + FD_ZERO(&readset); + FD_SET(fd,&readset); + printf("Hi"); + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } + printf("Bye"); recLen = read(fd,&cmd,1); + printf("recLen: %i\n",recLen); if (cmd != 0x03) { printf("Invalid Handshake\n"); return(-1); } - recLen = recv(fd,&data1,1536,MSG_WAITALL); + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } + recLen = recv(fd,&data1,1536,0); + printf("recLen: %i\n",recLen); for (i = 0;i < 1536;i++) { data2[i] = (char)i; @@ -295,8 +346,13 @@ printf("Error Sending Hand1\n"); return(-1); } - recLen = recv(fd,&data1,1536,MSG_WAITALL); + while (select(fd + 1,&readset,0x0,0x0,0x0)) { + if (FD_ISSET(fd,&readset)) + break; + } + recLen = recv(fd,&data1,1536,0); printf("recLen: [%i]\n",recLen); + if (recLen return(0x0); } diff --git a/rtmp.h b/rtmp.h new file mode 100644 index 0000000..857f1c4 --- /dev/null +++ b/rtmp.h @@ -0,0 +1,21 @@ +#define MYPORT 2036 // the port users will be connecting to + +#define BACKLOG 10 // how many pending connections queue will hold + +typedef struct { + char unknown[3]; + char amfSize[3]; + char amfType[1]; + char srcDest[4]; + } amfHeader; + +typedef struct { + char unknown[3]; + int amfSize; + int amfType; + char srcDest[4]; + int bodySize; + int bodyCount; + char *body; + } amfHeader_real; +