diff --git a/Makefile b/Makefile index 0202448..2460765 100644 --- a/Makefile +++ b/Makefile @@ -15,14 +15,14 @@ REMOVE = rm -f #Objects -OBJS = socket.o amf.o rtmp.o +OBJS = rtmp.o socket.o amf.o -LIBRARIES = -lpthread -CFLAGS = -ggdb -Wall -O2 -D_POSIX_PTHREAD_SEMANTICS +LIBRARIES = +CFLAGS = -Wall -O # Link The Binary $(BINARY) : $(OBJS) - $(CC) $(CFLAGS) -o $@ $(STARTUP) $(LIBRARIES) $(OBJS) + $(CC) $(CFLAGS) -o $@ $(LIBRARIES) $(OBJS) # Compile the source files .cc.o: diff --git a/amf.c b/amf.c index 4e9dcfe..c12b9a9 100644 --- a/amf.c +++ b/amf.c @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include "rtmp.h" @@ -19,7 +21,7 @@ recLen = sReadSocket(fd,&data1,1536); for (i = 0;i < 1536;i++) { - data2[i] = 'a'; + data2[i] = (char)i; } if (send(fd,&cmd,1,MSG_NOSIGNAL) != 1) { printf("Error Sending Header\n"); @@ -54,3 +56,350 @@ return(0x0); } +int amfGetData(int fd) { + int AMF; + int recLen = 0x0; + int amfID = 0x0; + int headerSize = 0x0; + int packetSize = 0x0; + char data[12]; + amfHeader *amfHdr = (amfHeader *)&data; + + recLen = sReadSocket(fd,&AMF,1); + if (recLen != 1) + return(-1); + + headerSize = (AMF & 0xC0) >> 6; + amfID = (AMF & 0x3F); + + printf("HeaderSize: [%i]\n",headerSize); + + switch(headerSize) { + case 0: + recLen = sReadSocket(fd,&data,11); + printf("12 Byte Header\n"); + packetSize = (amfHdr->amfSize[0] * 256 * 256) + (amfHdr->amfSize[1] * 256) + amfHdr->amfSize[2]; + printf("PacketSize: [%i]\n",packetSize); + AMFS[amfID].bodySize = packetSize; + AMFS[amfID].bodyCount = 0; + AMFS[amfID].body = (char *)malloc(packetSize); + AMFS[amfID].amfType = amfHdr->amfType[0]; + if (packetSize > 128) { + sReadSocket(fd,AMFS[amfID].body + AMFS[amfID].bodyCount,128); + AMFS[amfID].bodyCount = 128; + } + else { + sReadSocket(fd,AMFS[amfID].body + AMFS[amfID].bodyCount,packetSize); + AMFS[amfID].bodyCount = packetSize; + } + break; + case 1: + printf("8 Byte Header\n"); + recLen = sReadSocket(fd,&data,7); + packetSize = (amfHdr->amfSize[0] * 256 * 256) + (amfHdr->amfSize[1] * 256) + amfHdr->amfSize[2]; + printf("PacketSize: [%i]\n",packetSize); + AMFS[amfID].bodySize = packetSize; + AMFS[amfID].bodyCount = 0; + AMFS[amfID].body = (char *)malloc(packetSize); + AMFS[amfID].amfType = amfHdr->amfType[0]; + if ((AMFS[amfID].bodySize - AMFS[amfID].bodyCount) > 128) { + sReadSocket(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount),128); + AMFS[amfID].bodyCount += 128; + } + else { + printf("Bal: (%i)\n",(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); + recLen = sReadSocket(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); + AMFS[amfID].bodyCount += (AMFS[amfID].bodySize - AMFS[amfID].bodyCount); + printf("Received: [%i]\n",recLen); + } + break; + case 2: + printf("4 Byte Header\n"); + if ((AMFS[amfID].bodySize - AMFS[amfID].bodyCount) > 128) { + sReadSocket(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),128); + AMFS[amfID].bodyCount += 128; + } + else { + printf("Bal: (%i)\n",(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); + recLen = sReadSocket(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); + AMFS[amfID].bodyCount += (AMFS[amfID].bodySize - AMFS[amfID].bodyCount); + printf("Received: [%i]\n",recLen); + } + break; + case 3: + printf("1 Byte Header\n"); + if ((AMFS[amfID].bodySize - AMFS[amfID].bodyCount) > 128) { + //sReadSocket(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),128); + sReadSocket(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount),128); + printf("Bigger Before: [%c:%c:%c:%c]\n",AMFS[amfID].body[AMFS[amfID].bodyCount-3],AMFS[amfID].body[AMFS[amfID].bodyCount-2],AMFS[amfID].body[AMFS[amfID].bodyCount-1],AMFS[amfID].body[AMFS[amfID].bodyCount]); + AMFS[amfID].bodyCount += 128; + printf("Bigger After: [%c:%c:%c:%c]\n",AMFS[amfID].body[AMFS[amfID].bodyCount-3],AMFS[amfID].body[AMFS[amfID].bodyCount-2],AMFS[amfID].body[AMFS[amfID].bodyCount-1],AMFS[amfID].body[AMFS[amfID].bodyCount]); + } + else { + printf("Bal: (%i)\n",(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); + //recLen = sReadSocket(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); + recLen = sReadSocket(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount),(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); + printf("Received: [%i]\n",recLen); + printf("Less Before: [%c:%c:%c:%c]\n",AMFS[amfID].body[AMFS[amfID].bodyCount-3],AMFS[amfID].body[AMFS[amfID].bodyCount-2],AMFS[amfID].body[AMFS[amfID].bodyCount-1],AMFS[amfID].body[AMFS[amfID].bodyCount]); + AMFS[amfID].bodyCount += (AMFS[amfID].bodySize - AMFS[amfID].bodyCount); + printf("Less After: [%c:%c:%c:%c]\n",AMFS[amfID].body[AMFS[amfID].bodyCount-3],AMFS[amfID].body[AMFS[amfID].bodyCount-2],AMFS[amfID].body[AMFS[amfID].bodyCount-1],AMFS[amfID].body[AMFS[amfID].bodyCount]); + } + break; + + default: + printf("Unhandled Header Size: [%i:0x%X]\n",headerSize,amfID); + return(-1); + break; + } + + if (AMFS[amfID].bodySize == AMFS[amfID].bodyCount) { + amfProcessPacket(fd,amfID); + } + return(0x0); + } + +int amfProcessPacket(fd,amfID) { + switch (AMFS[amfID].amfType) { + case 0x11: + amfMessage(fd,amfID); + break; + case 0x14: + amfFunction(fd,amfID); + printf("AMF FUNCTION\n"); + break; + default: + printf("Not a supported type: [0x%X]\n",AMFS[amfID].amfType); + break; + } + return(0x0); + } + +int amfMessage(int fd, int amfID) { + int i = 0x0; + int s = 0x0; + int mI = 0x0; + int ch = 0x0; + int x = 0x0; + char *packet = 0x0; + char data[1024]; + + packet = AMFS[amfID].body; + + for (i=0;i= 60 && ch <= 128) + printf("[%c]",ch); + else + printf("[0x%X]",ch); + + } + + + /* Skip to message not sure what the header information is yet */ + for (i=0;i= 10 && ch <= 128) + printf("[%c]",ch); + else + printf("[0x%X]",ch); + } + printf("\n"); + return(0x0); + } + diff --git a/rtmp.c b/rtmp.c index 7f3f6fb..4d26f54 100644 --- a/rtmp.c +++ b/rtmp.c @@ -4,265 +4,31 @@ #include #include -#include -#include -#include -#include -#include -#include -#include +#include #include "rtmp.h" amfHeader_real *AMFS = 0x0; -int getAMF(int); -int doProcessPacket(int,int); -int doAmfFunction(int fd,int amfID,int packetSize); - - -int main(void) { - int newFD = 0x0; //New Socket; +int main(int argc,char **argv) { fd_set readset; + int readSocks = 0x0; AMFS = (amfHeader_real *)malloc(sizeof(amfHeader_real) * 128); sStartListener(); while (1) { - if (sGetConnections(&readset) != 0) - perror("Problem Building readset"); - } + sGetConnections(&readset); - while(1) { // main accept() loop - while (1); -/* - sin_size = sizeof(struct sockaddr_in); - if ((newFD = accept(listenerFD, (struct sockaddr *)&remoteAddr, &sin_size)) == -1) { - perror("accept"); - continue; + readSocks = select(highSock+1,&readset,0x0,0x0,0x0); + if (readSocks < 0) { + perror("select failed"); + exit(0x0); } - fcntl(newFD, F_SETFL, fcntl(newFD, F_GETFL, 0) | O_NONBLOCK); - printf("server: got connection from %s\n",inet_ntoa(remoteAddr.sin_addr)); - if (doHandshake(newFD) != 0) { - close(newFD); - exit(0); - } -*/ - while (1) { - if (getAMF(newFD) != 0x0) { - printf("Error"); - close(newFD); - exit(0); - } - } - close(newFD); // parent doesn't need this + else if (readSocks > 0) + sProcessConnections(&readset); } - return 0; + + return(0); } - -int getAMF(int fd) { - int AMF; - int recLen = 0x0; - int amfID = 0x0; - int headerSize = 0x0; - int packetSize = 0x0; - char data[12]; - amfHeader *amfHdr = (amfHeader *)&data; - fd_set readset; - - FD_ZERO(&readset); - FD_SET(fd,&readset); - - - while (select(fd + 1,&readset,0x0,0x0,0x0)) { - if (FD_ISSET(fd,&readset)) - break; - } - - recLen = recv(fd,&AMF,1,0); - if (recLen != 1) - return(-1); - - headerSize = (AMF & 0xC0) >> 6; - amfID = (AMF & 0x3F); - - switch(headerSize) { - case 0: - while (select(fd + 1,&readset,0x0,0x0,0x0)) { - if (FD_ISSET(fd,&readset)) - break; - } - - recLen = recv(fd,&data,11,0); - printf("12 Byte Header\n"); - packetSize = (amfHdr->amfSize[0] * 256 * 256) + (amfHdr->amfSize[1] * 256) + amfHdr->amfSize[2]; - printf("PacketSize: [%i]\n",packetSize); - AMFS[amfID].bodySize = packetSize; - AMFS[amfID].bodyCount = 0; - AMFS[amfID].body = (char *)malloc(packetSize); - AMFS[amfID].amfType = amfHdr->amfType[0]; - if (packetSize > 128) { - while (select(fd + 1,&readset,0x0,0x0,0x0)) { - if (FD_ISSET(fd,&readset)) - break; - } - recv(fd,AMFS[amfID].body + AMFS[amfID].bodyCount,128,0); - AMFS[amfID].bodyCount = 128; - } - break; - case 3: - printf("4 Byte Header\n"); - if ((AMFS[amfID].bodySize - AMFS[amfID].bodyCount) > 128) { - while (select(fd + 1,&readset,0x0,0x0,0x0)) { - if (FD_ISSET(fd,&readset)) - break; - } - recv(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),128,0); - AMFS[amfID].bodyCount += 128; - } - else { - printf("Bal: (%i)\n",(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); - while (select(fd + 1,&readset,0x0,0x0,0x0)) { - if (FD_ISSET(fd,&readset)) - break; - } - recLen = recv(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)-1,0); - printf("Received: [%i]\n",recLen); - AMFS[amfID].bodyCount += (AMFS[amfID].bodySize - AMFS[amfID].bodyCount)-1; - } - break; - case 2: - printf("2 Byte Header\n"); - if ((AMFS[amfID].bodySize - AMFS[amfID].bodyCount) > 128) { - while (select(fd + 1,&readset,0x0,0x0,0x0)) { - if (FD_ISSET(fd,&readset)) - break; - } - recv(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),128,0); - AMFS[amfID].bodyCount += 128; - } - else { - printf("Bal: (%i)\n",(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)); - while (select(fd + 1,&readset,0x0,0x0,0x0)) { - if (FD_ISSET(fd,&readset)) - break; - } - recLen = recv(fd,AMFS[amfID].body + (AMFS[amfID].bodyCount - 1),(AMFS[amfID].bodySize - AMFS[amfID].bodyCount)-1,0); - printf("Received: [%i]\n",recLen); - AMFS[amfID].bodyCount += (AMFS[amfID].bodySize - AMFS[amfID].bodyCount)-1; - } - break; - - default: - printf("Unhandled Header Size: [%i]\n",headerSize); - return(-1); - break; - } - if (AMFS[amfID].bodySize == AMFS[amfID].bodyCount) - doProcessPacket(fd,amfID); - return(0x0); - } - -int doProcessPacket(int fd,int amfID) { - printf("WOOP"); - return(0x0); - } - -int getAMF_old(int fd,int amf) { - int recLen = 0x0; - int amfID = 0x0; - int headerSize = 0x0; - int packetSize = 0x0; - char data[12]; - amfHeader *amfHdr = (amfHeader *)&data; - fd_set readset; - - FD_ZERO(&readset); - FD_SET(fd,&readset); - - - - headerSize = (amf & 0xC0) >> 6; - amfID = amf & 0x3F; - - printf("AMF ID: [0x%X]\n",amfID); - - switch(headerSize) { - case 0: - printf("12 Byte Header\n"); - while (select(fd + 1,&readset,0x0,0x0,0x0)) { - if (FD_ISSET(fd,&readset)) - break; - } - - recLen = recv(fd,&data,11,0); - printf("Size: [0x%X:0x%X:0x%X]\n",amfHdr->amfSize[0] & 0xFF,amfHdr->amfSize[1] & 0xFF,amfHdr->amfSize[2] & 0xFF); - packetSize = (amfHdr->amfSize[0] * 256 * 256) + (amfHdr->amfSize[1] * 256) + amfHdr->amfSize[2]; - printf("PacketSize: [%i]\n",packetSize); -/* - recLen = recv(fd,&packet,packetSize,0); - if (recLen != packetSize) { - printf("Error Reading Packet\n"); - return(-1); - } -*/ - break; - default: - printf("Unhandled Header Size: [%i]\n",headerSize); - return(-1); - break; - } - - switch (amfHdr->amfType[0]) { - case 0x14: - doAmfFunction(fd,amfID,packetSize); - break; - default: - printf("Unhandled amfType: [0x%X]\n",amfHdr->amfType[0]); - return(-1); - break; - } - return(0x0); - } - -int doAmfFunction(int fd,int amfID,int packetSize) { - int recLen = 0x0; - int i = 0x0; - int ch = 0x0; - char *packet = 0x0; - char function[256]; - fd_set readset; - - FD_ZERO(&readset); - FD_SET(fd,&readset); - - - printf("Reading Packet"); - packet = (char *)malloc(packetSize); - while (select(fd + 1,&readset,0x0,0x0,0x0)) { - if (FD_ISSET(fd,&readset)) - break; - } - - recLen = recv(fd,packet,packetSize,MSG_WAITALL); - - for (i = 0;i < packet[2];i++) { - function[i] = packet[i + 3]; - } - function[i] = '\0'; - printf("Function: [%s]\n",function); - if (!strcmp(function,"connect")) - amfDoAccept(fd); - - for (i=0;i= 10 && ch <= 128) - printf("[%c]",ch); - else - printf("[0x%X]",ch); - } - printf("\n"); - return(0x0); - } - diff --git a/rtmp.h b/rtmp.h index 7b9d790..aaa453e 100644 --- a/rtmp.h +++ b/rtmp.h @@ -1,10 +1,9 @@ +#include + #define MYPORT 2036 // the port users will be connecting to #define BACKLOG 10 // how many pending connections queue will hold -#define N 1000 -#define MEGEXTRA 1000000 - typedef struct { char unknown[3]; char amfSize[3]; @@ -28,10 +27,23 @@ int socketFD; } myConnections_t; -int amfDoAccept(int); +/* + Global variables very not safe + */ +extern int listenerFD; +extern int highSock; +extern amfHeader_real *AMFS; + +/* AMF Functions */ int amfDoHandshake(int); -int sStartListener(); -void *sListenerThread(void *); -int sAddConnection(int); +int amfGetData(int); +int amfProcessPacket(int,int); +int amfFunction(int,int); +int amfMessage(int, int); + +/* Socket Functions */ ssize_t sReadSocket(int socketFD,void *buffer,size_t length); +int sStartListener(); +int sAddConnection(int); int sGetConnections(fd_set *); +int sProcessConnections(fd_set *); diff --git a/socket.c b/socket.c index 6a017cc..e545bbd 100644 --- a/socket.c +++ b/socket.c @@ -12,22 +12,18 @@ #include #include #include -#include #include #include #include "rtmp.h" -pthread_t listenerThread; -pthread_mutex_t sConnectionsMutex; -pthread_attr_t attr; - myConnections_t *connections = 0x0; +int listenerFD = 0x0; +int highSock = 0x0; + int sStartListener() { int optVal = 0x1; - int listenerFD = 0x0; - size_t stacksize; struct sockaddr_in myAddr; // my address information @@ -56,13 +52,10 @@ exit(1); } - pthread_mutex_init(&sConnectionsMutex,NULL); - pthread_attr_getstacksize(&attr,&stacksize); - printf("SS: [0x%X]\n",stacksize); - pthread_attr_setstacksize(&attr,sizeof(double)*N*N+MEGEXTRA); - printf("SS: [0x%X]\n",stacksize); + fcntl(listenerFD, F_SETFL, fcntl(listenerFD, F_GETFL, 0) | O_NONBLOCK); - pthread_create(&listenerThread,&attr,sListenerThread,(void *)listenerFD); + highSock = listenerFD; + return(0x0); } @@ -92,39 +85,36 @@ return(recLen); } -void *sListenerThread(void *lFD) { - int listenerFD = (int)lFD; - int newFD = 0x0; //New Socket; +int sGetConnection() { + int newFD = 0x0; // New Socket; socklen_t sin_size; - struct sockaddr_in remoteAddr; // connector's address information + struct sockaddr_in remoteAddr; sin_size = sizeof(struct sockaddr_in); - while (1) { - printf("Waiting On Connection\n"); - if ((newFD = accept(listenerFD, (struct sockaddr *)&remoteAddr, &sin_size)) == -1) { - perror("accept"); - } - else { - fcntl(newFD, F_SETFL, fcntl(newFD, F_GETFL, 0) | O_NONBLOCK); - - printf("server: got connection from %s\n",inet_ntoa(remoteAddr.sin_addr)); - - if (amfDoHandshake(newFD) == -1) { - printf("Error: Bad Handshake\n"); - close(newFD); - } - printf("A"); - sAddConnection(newFD); - } + if ((newFD = accept(listenerFD, (struct sockaddr *)&remoteAddr, &sin_size)) == -1) { + perror("accept"); } + + printf("Server: New connection from: %s\n",inet_ntoa(remoteAddr.sin_addr)); + + if (amfDoHandshake(newFD) == -1) { + printf("Error: Bad Handshake\n"); + // sRemoveConnection(newFD); + close(newFD); + } + else { + sAddConnection(newFD); + highSock = newFD; + } + + /* Return */ + return(0x0); } int sAddConnection(int socketFD) { myConnections_t *tmpConnection = 0x0; - printf("Adding Socket"); - - pthread_mutex_lock(&sConnectionsMutex); + printf("Adding Socket\n"); if (connections == 0x0) { connections = (myConnections_t *)malloc(sizeof(myConnections_t)); @@ -140,19 +130,43 @@ connections->prev = tmpConnection; connections = tmpConnection; } - printf("Socket Added"); - pthread_mutex_unlock(&sConnectionsMutex); + printf("Socket Added\n"); return(0x0); } +int sRemoveConnection(int socketFD) { + myConnections_t *tmpConnection = 0x0; + printf("Removing Socket\n"); + + for (tmpConnection = connections;tmpConnection != 0x0;tmpConnection = tmpConnection->next) { + if (tmpConnection->socketFD == socketFD) { + if (tmpConnection == connections) { + connections = tmpConnection->next; + free(tmpConnection); + } + else { + if (tmpConnection->prev != 0x0) + tmpConnection->prev->next = tmpConnection->next; + if (tmpConnection->next != 0x0) + tmpConnection->next->prev = tmpConnection->prev; + free(tmpConnection); + } + return(0x0); + } + } + + printf("Socket Removed\n"); + return(-1); + } + int sGetConnections(fd_set *readset) { int retVal = 0; myConnections_t *tmpConnection = 0x0; FD_ZERO(readset); - pthread_mutex_lock(&sConnectionsMutex); + FD_SET(listenerFD,readset); if (connections != 0x0) { for (tmpConnection = connections;tmpConnection != 0x0;tmpConnection = tmpConnection->next) { @@ -161,6 +175,17 @@ retVal = 1; } - pthread_mutex_unlock(&sConnectionsMutex); return(retVal); } + +int sProcessConnections(fd_set *readset) { + myConnections_t *tmpConnection = 0x0; + if (FD_ISSET(listenerFD,readset)) + sGetConnection(); + + for (tmpConnection = connections;tmpConnection != 0x0;tmpConnection = tmpConnection->next) { + if (FD_ISSET(tmpConnection->socketFD,readset)) + amfGetData(tmpConnection->socketFD); + } + return(0x0); + }