[mw-devel] MW3 r1265 - in trunk/src: . server

arthur at sucs.org arthur at sucs.org
Sat Nov 3 21:16:43 GMT 2012


Author: arthur
Date: 2012-11-03 21:16:43 +0000 (Sat, 03 Nov 2012)
New Revision: 1265

Added:
   trunk/src/list.h
   trunk/src/server/
   trunk/src/server/Makefile
   trunk/src/server/mwserv.c
   trunk/src/server/servsock.c
   trunk/src/server/servsock.h
   trunk/src/socket.c
   trunk/src/socket.h
Modified:
   trunk/src/Makefile
   trunk/src/files.c
   trunk/src/incoming.c
   trunk/src/ipc.c
   trunk/src/ipc.h
   trunk/src/main.c
Log:
Client-Server mode, with simplistic server


Modified: trunk/src/Makefile
===================================================================
--- trunk/src/Makefile	2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/Makefile	2012-11-03 21:16:43 UTC (rev 1265)
@@ -59,7 +59,7 @@
 topten.o sort.o tidyup.o gags.o script_inst.o script.o\
 incoming.o command.o chattable.o alias.o frl.o hash.o vars.o expand.o\
 files.o completion.o iconv.o gagtable.o \
-js.o sqlite.o ipc.o log.o uri.o
+js.o sqlite.o ipc.o log.o uri.o socket.o
 	$(CC) $(LDFLAGS) $(LDLIBS) -o $@ $^
 
 del_user: del_user.o perms.o strings.o

Modified: trunk/src/files.c
===================================================================
--- trunk/src/files.c	2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/files.c	2012-11-03 21:16:43 UTC (rev 1265)
@@ -18,9 +18,8 @@
 #include "perms.h"
 #include "files.h"
 #include "who.h"
+#include "ipc.h"
 
-int incoming_pipe;
-
 #define MAXPIPELENGTH 2048
 
 static char *get_pipe_name(int pid)
@@ -32,20 +31,11 @@
 
 void create_pipe(void)
 {
-	if (mkfifo(get_pipe_name(getpid()),0600))
-	{
-		perror("Error in Creating mesg pipe");
-		exit(-1);
-	}
 }
 
 void open_incoming_fifo(void)
 {
-	if ((incoming_pipe=open(get_pipe_name(getpid()),O_RDWR))<0)
-	{
-		perror("Error opening incoming pipe");
-		exit(-1);
-	}
+	ipc_connect("localhost");
 }
 
 int open_outgoing_fifo(pid_t dest_pid)
@@ -60,9 +50,7 @@
 
 void close_fifo(void)
 {
-	if(incoming_pipe)
-		close(incoming_pipe);
-	unlink(get_pipe_name(getpid()));
+	ipc_close();
 }
 
 void Lock_File(int f)

Modified: trunk/src/incoming.c
===================================================================
--- trunk/src/incoming.c	2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/incoming.c	2012-11-03 21:16:43 UTC (rev 1265)
@@ -28,6 +28,7 @@
 #include "mesg.h"
 #include "echo.h"
 #include "intl.h"
+#include "socket.h"
 
 extern int script_terminate;
 extern Alias rpc_list;
@@ -40,6 +41,7 @@
 extern struct room myroom;
 extern int runautoexec;
 extern int quietmode;
+extern ipc_connection_t *ipcsock;
 
 int new_mail_waiting=0;
 int mesg_waiting = 0;
@@ -68,6 +70,8 @@
 static void force_protlevel(char *text, unsigned long theirprivs, const char *from);
 static void force_protpower(char *text);
 
+#define _MIN(a,b) (a<b)?a:b
+
 void InsertMesg(struct mstack *new)
 {
 	new->next=NULL;
@@ -447,56 +451,21 @@
 	enum ipc_types msgtype = IPC_NOOP;
 
 	mesg_waiting = 0;
-	fl = fcntl(incoming_pipe, F_GETFL);
-	fcntl(incoming_pipe, F_SETFL, fl | O_NDELAY);
 
-	while ((nos=read(incoming_pipe,buff,MAXTEXTLENGTH))>0)
-	{
-		for (i = 0; i < nos; i++) {
-			if (! midread) {
-				midread = 1;
-				msgtype = buff[i];
-				if (ptr>0) /* discard partial message */
-					printf("\n*** handle_mesg: discarded %d chars.\n\007", ptr);
-				pidp=0;
-			} else {
-				if (pidp<4) {
-					pidbuff[pidp] = buff[i];
-					pidp++;
-					if (pidp>=4) {
-						memcpy(&mesg_pid, &pidbuff, 4);
-						if ((mesg_posn=get_who_userposn(mesg_pid))<0)
-							strcpy(mesg_user.name,"System");
-						else
-							fetch_user(&mesg_user, mesg_posn);
-						ptr=0;
-					}
-					continue;
-				}
-				if (! quotemode) {
-					if (buff[i] == 1) {
-						quotemode = 1;
-						continue;
-					} else if (buff[i] == '|') {
-						newbuff[ptr] = 0;
-						accept_pipe_cmd(msgtype, newbuff, mesg_pid, &mesg_user);
-						ptr = 0;
-						midread = 0;
-						continue;
-					}
-				}
-				/* just another character, buffer it for now */
-				if (ptr < MAXPIPELENGTH)
-				{
-					newbuff[ptr]=buff[i];
-					ptr++;
-				}
-				quotemode=0;
-			}
-		}
+	ipc_message_t * msg = read_socket(ipcsock, 1);
+
+	while (msg != NULL) {
+		if ((mesg_posn=get_who_userposn(msg->head.src))<0)
+			strcpy(mesg_user.name,"System");
+		else
+			fetch_user(&mesg_user, mesg_posn);
+		bzero(newbuff, sizeof(newbuff));
+		memcpy(newbuff, msg->body, _MIN(sizeof(newbuff)-1, msg->bodylen));
+		accept_pipe_cmd(msg->head.type, newbuff, msg->head.src, &mesg_user);
+
+		ipcmsg_destroy(msg);
+		msg = read_socket(ipcsock, 0);
 	}
-	/*if (midread != 0) printf("\n*** handle_mesg: partial message read - will resume\n\007");*/
-	fcntl(incoming_pipe, F_SETFL, fl);
 	if (cm_flags(user->chatmode,CM_ONCHAT,CM_MODE_ANY)) set_talk_rights(); else set_rights();
 }
 

Modified: trunk/src/ipc.c
===================================================================
--- trunk/src/ipc.c	2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/ipc.c	2012-11-03 21:16:43 UTC (rev 1265)
@@ -13,7 +13,62 @@
 #include "mesg.h"
 #include "who.h"
 #include "files.h"
+#include "socket.h"
 
+ipc_connection_t * ipcsock = NULL;
+char *ipc_parent = NULL;
+
+void ipc_connect(const char *target)
+{
+	const char * host = target;
+
+	if (ipcsock == NULL) 
+		ipcsock = ipcconn_create();
+
+	if (target == NULL) host = ipc_parent;
+
+	int fd = ipcconn_connect(host);
+	if (fd < -1) {
+		free(ipcsock);
+		ipcsock = NULL;
+		fprintf(stderr, "Conneciton to server failed.\n");
+		return;
+	}
+
+	if (target != NULL) {
+		if (ipc_parent != NULL) free(ipc_parent);
+		ipc_parent = strdup(target);
+	}
+
+	ipcsock = ipcconn_create();
+	ipcsock->fd = fd;
+
+	pid_t mypid = getpid();
+	ipc_message_t * msg = ipcmsg_create(FOURCC("HELO"), mypid);
+	ipcmsg_send(msg, ipcsock);
+}
+
+void ipc_check(void)
+{
+	if (ipcsock == NULL || ipcsock->fd == -1)
+		ipc_connect(NULL);
+}
+
+void ipc_close()
+{
+	if (ipcsock == NULL) return;
+	close(ipcsock->fd);
+	free(ipcsock);
+	ipcsock=NULL;
+}
+
+int ipc_getfd()
+{
+	if (ipcsock == NULL) return -1;
+	return ipcsock->fd;
+}
+
+
 /*
  * Send a message to a remote process addressed by PID
  * Returns:
@@ -29,44 +84,30 @@
  *	C	- An arbitrary length of data
  */
 int ipc_send_to_pid(pid_t dest, enum ipc_types msgtype, const char * data) {
-	char	buff[MAXTEXTLENGTH];
-	int	fd;
-	ssize_t	dgram_len;
-	pid_t	mypid;
-	char *	qdata;
 	int	ret = 0;
 
 	if (dest < 1) return -1;	// Invalid PID
 	if (! data) data = "";
 
-	fd = open_outgoing_fifo(dest);
-	if (fd < 0) {
-		who_delete(dest);
-		return -1;
-	}
-
 	if (msgtype != IPC_NOOP) {
-		memset(buff, 0, MAXTEXTLENGTH);
-		qdata = quotetext(data);
-		snprintf(buff, MAXTEXTLENGTH - 1, "%cxxxx%s",(char) msgtype, qdata);
-		free(qdata);
+		char	buff[MAXTEXTLENGTH];
+		snprintf(buff, MAXTEXTLENGTH - 1, "%s",data);
 		strip_quote(buff);
-		strcat(buff,"|");
-		mypid = getpid();
-		memcpy(buff + 1, &mypid, 4);
+		pid_t mypid = getpid();
 		//ensure text (everything after the first 5 bytes) going into the pipes is clean utf-8 after all the truncating etc. that might have happened to it.
-		if(utf8_cleanup(buff+5)>=0) {
-			// the first 5 bytes are binary data that break strlen
-			dgram_len = strlen(buff+5)+5;
-			if (write(fd, buff, dgram_len) <= 0) {
-				//fprintf(stderr, "Write to %s failed: %s\n", fifo_path, strerror(errno));
-				ret = -2;
-			}
+		//
+
+		ipc_check();
+		if(utf8_cleanup(buff)>=0) {
+			ssize_t dgram_len = strlen(buff);
+			ipc_message_t *msg = ipcmsg_create(msgtype, mypid);
+			ipcmsg_destination(msg, dest);
+			ipcmsg_append(msg, buff, dgram_len);
+			ipcmsg_send(msg, ipcsock);
 		} else {
 			fprintf(stderr, "Failed to utf8_cleanup the message\n"); 
 		}
 	}
-	close(fd);
 	return ret;
 }
 

Modified: trunk/src/ipc.h
===================================================================
--- trunk/src/ipc.h	2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/ipc.h	2012-11-03 21:16:43 UTC (rev 1265)
@@ -35,6 +35,9 @@
 
 typedef int (send_filter)(const struct person * usr, const struct who * who, const void * info);
 
+void ipc_connect(const char *target);
+void ipc_close(void);
+int ipc_getfd(void);
 int ipc_send_to_pid(pid_t dest, enum ipc_types msgtype, const char * data);
 unsigned int ipc_send_to_username(const char * dest, enum ipc_types msgtype, const char * data);
 unsigned int ipc_send_to_all(enum ipc_types msgtype, const char * data, send_filter * filter, const void * filterinfo);

Added: trunk/src/list.h
===================================================================
--- trunk/src/list.h	                        (rev 0)
+++ trunk/src/list.h	2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,241 @@
+#ifndef __LIST_H
+#define __LIST_H
+
+/*
+ * Simple doubly linked list implementation.
+ *
+ * Some of the internal functions ("__xxx") are useful when
+ * manipulating whole lists rather than single entries, as
+ * sometimes we already know the next/prev entries and we can
+ * generate better code by using them directly rather than
+ * using the generic single-entry routines.
+ */
+
+struct list_head {
+	struct list_head *next, *prev;
+};
+
+#define LIST_HEAD_INIT(name) { &(name), &(name) }
+
+#define LIST_HEAD(name) \
+	struct list_head name = LIST_HEAD_INIT(name)
+
+#define INIT_LIST_HEAD(ptr) do { \
+	(ptr)->next = (ptr); (ptr)->prev = (ptr); \
+} while (0)
+
+/*
+ * Insert a new entry between two known consecutive entries. 
+ *
+ * This is only for internal list manipulation where we know
+ * the prev/next entries already!
+ */
+static inline void __list_add(struct list_head *new,
+			      struct list_head *prev,
+			      struct list_head *next)
+{
+	next->prev = new;
+	new->next = next;
+	new->prev = prev;
+	prev->next = new;
+}
+
+/**
+ * list_add - add a new entry
+ * @new: new entry to be added
+ * @head: list head to add it after
+ *
+ * Insert a new entry after the specified head.
+ * This is good for implementing stacks.
+ */
+static inline void list_add(struct list_head *new, struct list_head *head)
+{
+	__list_add(new, head, head->next);
+}
+
+/**
+ * list_add_tail - add a new entry
+ * @new: new entry to be added
+ * @head: list head to add it before
+ *
+ * Insert a new entry before the specified head.
+ * This is useful for implementing queues.
+ */
+static inline void list_add_tail(struct list_head *new, struct list_head *head)
+{
+	__list_add(new, head->prev, head);
+}
+
+/*
+ * Delete a list entry by making the prev/next entries
+ * point to each other.
+ *
+ * This is only for internal list manipulation where we know
+ * the prev/next entries already!
+ */
+static inline void __list_del(struct list_head *prev, struct list_head *next)
+{
+	next->prev = prev;
+	prev->next = next;
+}
+
+/**
+ * list_del - deletes entry from list.
+ * @entry: the element to delete from the list.
+ * Note: list_empty on entry does not return true after this, the entry is in an undefined state.
+ */
+static inline void list_del(struct list_head *entry)
+{
+	__list_del(entry->prev, entry->next);
+	entry->next = (void *) 0;
+	entry->prev = (void *) 0;
+}
+
+/**
+ * list_del_init - deletes entry from list and reinitialize it.
+ * @entry: the element to delete from the list.
+ */
+static inline void list_del_init(struct list_head *entry)
+{
+	__list_del(entry->prev, entry->next);
+	INIT_LIST_HEAD(entry); 
+}
+
+/**
+ * list_move - delete from one list and add as another's head
+ * @list: the entry to move
+ * @head: the head that will precede our entry
+ */
+static inline void list_move(struct list_head *list, struct list_head *head)
+{
+        __list_del(list->prev, list->next);
+        list_add(list, head);
+}
+
+/**
+ * list_move_tail - delete from one list and add as another's tail
+ * @list: the entry to move
+ * @head: the head that will follow our entry
+ */
+static inline void list_move_tail(struct list_head *list,
+				  struct list_head *head)
+{
+        __list_del(list->prev, list->next);
+        list_add_tail(list, head);
+}
+
+/**
+ * list_empty - tests whether a list is empty
+ * @head: the list to test.
+ */
+static inline int list_empty(struct list_head *head)
+{
+	return head->next == head;
+}
+
+static inline void __list_splice(struct list_head *list,
+				 struct list_head *head)
+{
+	struct list_head *first = list->next;
+	struct list_head *last = list->prev;
+	struct list_head *at = head->next;
+
+	first->prev = head;
+	head->next = first;
+
+	last->next = at;
+	at->prev = last;
+}
+
+/**
+ * list_splice - join two lists
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ */
+static inline void list_splice(struct list_head *list, struct list_head *head)
+{
+	if (!list_empty(list))
+		__list_splice(list, head);
+}
+
+/**
+ * list_splice_init - join two lists and reinitialise the emptied list.
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ *
+ * The list at @list is reinitialised
+ */
+static inline void list_splice_init(struct list_head *list,
+				    struct list_head *head)
+{
+	if (!list_empty(list)) {
+		__list_splice(list, head);
+		INIT_LIST_HEAD(list);
+	}
+}
+
+/**
+ * list_entry - get the struct for this entry
+ * @ptr:	the &struct list_head pointer.
+ * @type:	the type of the struct this is embedded in.
+ * @member:	the name of the list_struct within the struct.
+ */
+#define list_entry(ptr, type, member) \
+	((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
+
+#define list_first(head) (head)->next
+#define list_last(head) (head)->prev
+
+/**
+ * list_for_each	-	iterate over a list
+ * @pos:	the &struct list_head to use as a loop counter.
+ * @head:	the head for your list.
+ */
+#define list_for_each(pos, head) \
+	for (pos = (head)->next; pos != (head); \
+        	pos = pos->next)
+/**
+ * list_for_each_prev	-	iterate over a list backwards
+ * @pos:	the &struct list_head to use as a loop counter.
+ * @head:	the head for your list.
+ */
+#define list_for_each_prev(pos, head) \
+	for (pos = (head)->prev; pos != (head); \
+        	pos = pos->prev)
+        	
+/**
+ * list_for_each_safe	-	iterate over a list safe against removal of list entry
+ * @pos:	the &struct list_head to use as a loop counter.
+ * @n:		another &struct list_head to use as temporary storage
+ * @head:	the head for your list.
+ */
+#define list_for_each_safe(pos, n, head) \
+	for (pos = (head)->next, n = pos->next; pos != (head); \
+		pos = n, n = pos->next)
+
+/**
+ * list_for_each_entry	-	iterate over list of given type
+ * @pos:	the type * to use as a loop counter.
+ * @head:	the head for your list.
+ * @member:	the name of the list_struct within the struct.
+ */
+#define list_for_each_entry(pos, head, member)				\
+	for (pos = list_entry((head)->next, typeof(*pos), member);	\
+	     &pos->member != (head); 					\
+	     pos = list_entry(pos->member.next, typeof(*pos), member))
+
+/**
+ * list_for_each_entry_safe - iterate over list of given type safe against removal of list entry
+ * @pos:	the type * to use as a loop counter.
+ * @n:		another type * to use as temporary storage
+ * @head:	the head for your list.
+ * @member:	the name of the list_struct within the struct.
+ */
+#define list_for_each_entry_safe(pos, n, head, member)			\
+	for (pos = list_entry((head)->next, typeof(*pos), member),	\
+		n = list_entry(pos->member.next, typeof(*pos), member);	\
+	     &pos->member != (head); 					\
+	     pos = n, n = list_entry(n->member.next, typeof(*n), member))
+
+
+#endif

Modified: trunk/src/main.c
===================================================================
--- trunk/src/main.c	2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/main.c	2012-11-03 21:16:43 UTC (rev 1265)
@@ -1031,6 +1031,7 @@
 
 	in_idle++;
 
+	int incoming_pipe = ipc_getfd();
 	FD_ZERO(&readfds);
 	FD_ZERO(&exceptfds);
 	FD_SET(incoming_pipe, &readfds);
@@ -1070,7 +1071,7 @@
 	if (nfds<0 && select_error == EINVAL)
 	{
 		char buf[256];
-		snprintf(buf, 255, "\n{EINVAL: fd=%d, incoming_pipe=%d}\n", fd, incoming_pipe);
+		snprintf(buf, 255, "\n{EINVAL: fd=%d, incoming_pipe=%d}\n", fd, ipc_getfd());
 		write(1,buf,strlen(buf));
 	}
 	if (nfds > 0 && fd >= 0 && FD_ISSET(fd, &readfds)) return 1;

Added: trunk/src/server/Makefile
===================================================================
--- trunk/src/server/Makefile	                        (rev 0)
+++ trunk/src/server/Makefile	2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,73 @@
+include ../../Makefile.common
+#libdir := /usr/lib
+#localstatedir := /var
+
+LOGDIR := $(localstatedir)/log/mw
+MSGDIR := $(localstatedir)/run/mw
+STATEDIR := $(localstatedir)/lib/mw
+HOMEPATH := $(libdir)/mw
+
+# cflags for standard 'cc' compiler
+CFLAGS= -Wall -pedantic --std=gnu99 -D_GNU_SOURCE -I.. -I/usr/include/postgresql
+#LDFLAGS+= -pie
+
+# info strings, do not edit.
+DEFS:= -DBUILD_DATE=\"$(shell date +%Y%m%d)\"
+DEFS+= -DBUILD_USER=\"$(shell whoami | awk -f ../capitalise.awk)\"
+DEFS+= -DVER_MAJ=\"$(VERSION_MAJOR)\"
+DEFS+= -DVER_MIN=\"$(VERSION_MINOR)\"
+DEFS+= -DVER_TWK=\"$(VERSION_TWEAK)\"
+DEFS+= -DHOMEPATH=\"$(HOMEPATH)\"
+DEFS+= -DLOGDIR=\"$(LOGDIR)\"
+DEFS+= -DSTATEDIR=\"$(STATEDIR)\"
+DEFS+= -DMSGDIR=\"$(MSGDIR)\"
+
+### uncomment for gdb debugging
+LDFLAGS+= -ggdb -g -lcrypt -lpq
+CFLAGS+= -ggdb -g -D__NO_STRING_INLINE -fstack-protector-all -std=c99
+
+### Optimisation - uncomment for release & extra testing
+#CFLAGS+=-O3
+
+### Only ever uncomment for final release versions
+DEFS+= -DRELEASE
+
+CFLAGS += $(DEFS)
+
+build: mwserv
+
+### The magic which lets us autogenerate dependencies
+CFLAGS += -MMD
+
+CODE=$(wildcard *.c)
+HDRS=$(wildcard *.h)
+
+%.o: %.c
+	$(CC) $(CFLAGS) -c -o $@ $<
+
+-include $(CODE:.c=.d)
+
+.PHONY: build install clean test
+
+mwserv: mwserv.o servsock.o ../socket.o
+	$(CC) $(LDFLAGS) $(LDLIBS) -o $@ $^
+
+clean:
+	-rm -f *.o *.d mwserv
+
+install: mwserv
+	install -Ds mwserv $(DESTDIR)$(HOMEPATH)/mwserv
+
+ifndef TESTDIR
+test:
+	make TESTDIR=$(CURDIR)/mwtest $@
+else
+test:
+	mkdir -p "$(TESTDIR)"
+	cd "$(TESTDIR)" && mkdir -p mw run/mw log/mw lib/mw
+	for d in $(INSTALLFILES); do \
+		svn export --force ../$$d "$(TESTDIR)/mw/$$d"; \
+	done
+	make libdir="$(TESTDIR)" localstatedir="$(TESTDIR)"
+
+endif

Added: trunk/src/server/mwserv.c
===================================================================
--- trunk/src/server/mwserv.c	                        (rev 0)
+++ trunk/src/server/mwserv.c	2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,23 @@
+#include <stdio.h>
+#include <fcntl.h>
+#include "../socket.h"
+#include "servsock.h"
+
+int main(int argc, char **argv)
+{
+	int mainsock = -1;
+	init_server();
+
+	mainsock = open_mainsock(IPCPORT_DEFAULT);
+
+	if (mainsock < 0) {
+		fprintf(stderr, "Failed.\n");
+		return 1;
+	}
+
+	uint32_t test = FOURCC("CAST");
+	printf("test %4.4s == 0x%X\n", (char *)&test, test);
+
+	watch_mainsock(mainsock);
+	printf("Done.\n");
+}

Added: trunk/src/server/servsock.c
===================================================================
--- trunk/src/server/servsock.c	                        (rev 0)
+++ trunk/src/server/servsock.c	2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,282 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <string.h>
+#include <errno.h>
+#include <arpa/inet.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <sys/param.h>
+#include <time.h>
+#include <sys/time.h>
+
+#include "../socket.h"
+#include "servsock.h"
+
+struct list_head connection_list;
+
+static int pollfd = -1;
+int mainsock_die = 0;
+
+typedef struct {
+	struct list_head list;
+
+	ipc_message_t * msg;
+
+	int headsent;
+	int used;
+} outq_msg_t;
+
+int open_mainsock(uint16_t port)
+{
+	int fd = -1;
+	if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+		fprintf(stderr, "Error creating master socket. %s\n", strerror(errno));
+		return -1;
+	}
+
+	/* we could have done v6 here as well, but the 4-in-6 stuff is
+	 * now deprecated so we would have to open a second v6 only socket
+	 * if we really want v6 mode */
+	struct sockaddr_in in;
+	in.sin_family = AF_INET;
+	in.sin_port = htons(port);
+	in.sin_addr.s_addr = INADDR_ANY;
+
+	int i = 1;
+	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i))) {
+		fprintf(stderr, "Error enabling rebind on port. %s\n", strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	if (bind(fd, (struct sockaddr *)&in, sizeof(in))) {
+		fprintf(stderr, "Error binding port: %s\n", strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	if (listen(fd, 2)) {
+		fprintf(stderr, "Error listening on port: %s\n", strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	return fd;
+}
+
+ipc_connection_t * add_connection(int fd)
+{
+	/* create new structure */
+	ipc_connection_t *new = ipcconn_create();
+	new->fd = fd;
+
+	/* add to list of all connections */
+	list_add_tail(&(new->list), &connection_list);
+
+	/* register interest in read events */
+	struct epoll_event ev;
+	bzero(&ev, sizeof(ev));
+	ev.events = EPOLLIN | EPOLLERR;
+	ev.data.ptr = new;
+	if (epoll_ctl( pollfd, EPOLL_CTL_ADD, new->fd, &ev)) {
+		fprintf(stderr, "Error adding new conn to poll: %s\n", strerror(errno));
+	}
+
+	new->state = IPCSTATE_CONNECTED;
+
+	return new;
+}
+
+void accept_connection(int mainsock)
+{
+	struct sockaddr_in in;
+	socklen_t inlen = sizeof(in);
+	int sock;
+
+	if ((sock=accept(mainsock, (struct sockaddr *)&in, &inlen))<0) {
+		fprintf(stderr, "Error accepting new connection: %s\n", strerror(errno));
+		return;
+	}
+
+	ipc_connection_t * c = add_connection(sock);
+
+	printf("Accept fd=%d %s:%d\n", c->fd, inet_ntoa(in.sin_addr), ntohs(in.sin_port));
+}
+
+void drop_connection(ipc_connection_t * conn)
+{
+	struct epoll_event ev;
+
+	printf("Drop connection\n");
+	bzero(&ev, sizeof(ev));
+	epoll_ctl(pollfd, EPOLL_CTL_DEL, conn->fd, &ev);
+	list_del_init(&conn->list);
+	if (conn->fd != -1) close(conn->fd);
+	conn->fd = -1;
+	conn->state = IPCSTATE_ERROR;
+	if (!list_empty(&conn->outq)) {
+		struct list_head *pos, *q;
+		list_for_each_safe(pos, q, &conn->outq) {
+			ipc_message_t * msg = list_entry(pos, ipc_message_t, list);
+			list_del(pos);
+			ipcmsg_destroy(msg);
+		}
+	}
+	bzero(conn, sizeof(ipc_connection_t));
+	free(conn);
+}
+
+void watch_mainsock(int mainsock)
+{
+	struct epoll_event ev;
+
+	/* add the mainsock to the epoll list
+	 * it will only generate read events */
+	bzero(&ev, sizeof(ev));
+	ev.events = EPOLLIN | EPOLLERR;
+	ev.data.ptr = NULL;
+	if (epoll_ctl(pollfd, EPOLL_CTL_ADD, mainsock, &ev)) {
+		fprintf(stderr, "Error adding mainsock: %s\n", strerror(errno));
+		return;
+	}
+
+	struct epoll_event event[20];
+	int ret;
+	struct timeval last;
+	gettimeofday(&last, NULL);
+
+bodge:
+	while ((ret = epoll_wait(pollfd, event, 20, 1000)) >= 0) {
+		for (int i=0; i<ret; i++) {
+			/* event on mainsock */
+			if (event[i].data.ptr == NULL) {
+				if (event[i].events & (EPOLLERR | EPOLLHUP)) {
+					return;
+				} else
+				if (event[i].events & EPOLLIN) {
+					accept_connection(mainsock);
+				} else {
+					fprintf(stderr, "unexpected event on mainsock.\n");
+				}
+			} else {
+				ipc_connection_t *c = event[i].data.ptr;
+				if (event[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
+					drop_connection(c);
+				} else
+				if (event[i].events & (EPOLLIN | EPOLLPRI)) {
+					ipc_message_t *msg = read_socket(c, 1);
+					if (c->fd == -1) {
+						drop_connection(c);
+					}else
+					while (msg != NULL) {
+						process_msg(c, msg);
+						msg=read_socket(c,0);
+					}
+				} else
+				if (event[i].events & EPOLLOUT) {
+					write_socket(c);
+				} else {
+					fprintf(stderr, "unexpected event on fd=%d.\n", c->fd);
+				}
+			}
+		}
+		/* end of events handling, do periodic stuff here */
+	}
+
+	/* epoll got interrupted, not actually an error, go around again */
+	if (ret == -1 && errno == EINTR) goto bodge;
+
+	/* epoll gave an error */
+	fprintf(stderr, "epoll error: %s\n", strerror(errno));
+}
+
+void write_socket(ipc_connection_t * conn)
+{
+	if (list_empty(&conn->outq)) {
+		/* tx queue is empty, stop write events */
+		struct epoll_event ev;
+		bzero(&ev, sizeof(ev));
+		ev.events = EPOLLIN | EPOLLERR;
+		ev.data.ptr = conn;
+		if (epoll_ctl(pollfd, EPOLL_CTL_MOD, conn->fd, &ev)) {
+			fprintf(stderr, "Error updating poll fd=%d: %s\n", conn->fd, strerror(errno));
+		}
+		return;
+	}
+	struct list_head * pos = conn->outq.next;
+	outq_msg_t * mtx = list_entry(pos, outq_msg_t, list);
+	list_del(pos);
+
+	ipcmsg_send(mtx->msg, conn);
+	mtx->msg->refcount--;
+	if (mtx->msg->refcount <= 0) ipcmsg_destroy(mtx->msg);
+	free(mtx);
+}
+
+
+void process_msg(ipc_connection_t *conn, ipc_message_t *msg)
+{
+	printf("src: %X, dst: %X, Type: %4.4s, len=%d\n", msg->head.src, msg->head.dst, (char *)&msg->head.type, msg->head.len);
+
+	/* client just told us who they are */
+	if (msg->head.type == FOURCC("HELO")) {
+		memcpy(&conn->addr, &msg->head.src, sizeof(conn->addr));
+		conn->state = IPCSTATE_VALID;
+		ipcmsg_destroy(msg);
+		return;
+	}
+
+	/* otherwise redistribute this message to intended target */
+	struct list_head *pos;
+	list_for_each(pos, &connection_list) {
+		ipc_connection_t * conn = list_entry(pos, ipc_connection_t, list);
+		if (conn->state == IPCSTATE_VALID) {
+			if (msg->head.dst == 0) {
+				/* broadcast */
+				msg_attach(msg, conn);
+			} else
+			if (msg->head.dst == conn->addr) {
+				/* unicast */
+				msg_attach(msg, conn);
+			}
+		}
+	}
+	if (msg->refcount <= 0) ipcmsg_destroy(msg);
+}
+
+void msg_attach(ipc_message_t *msg, ipc_connection_t *conn)
+{
+	if (conn == NULL) return;
+	/* add to end of tx queue */
+
+	int wasempty = list_empty(&conn->outq);
+
+	outq_msg_t * new = malloc(sizeof(outq_msg_t));
+	bzero(new, sizeof(outq_msg_t));
+	new->msg = msg;
+	new->msg->refcount++;
+	list_add_tail(&(new->list), &(conn->outq));
+
+	/* nothing was queued, switch on write notifications */
+	if (wasempty) {
+		struct epoll_event ev;
+		bzero(&ev, sizeof(ev));
+		ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
+		ev.data.ptr = conn;
+		epoll_ctl(pollfd, EPOLL_CTL_MOD, conn->fd, &ev);
+	}
+}
+
+void init_server()
+{
+	INIT_LIST_HEAD(&connection_list);
+	if (pollfd == -1) {
+		pollfd = epoll_create(30);
+	}
+}

Added: trunk/src/server/servsock.h
===================================================================
--- trunk/src/server/servsock.h	                        (rev 0)
+++ trunk/src/server/servsock.h	2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,10 @@
+/* servsock.c */
+int open_mainsock(uint16_t port);
+ipc_connection_t *add_connection(int fd);
+void accept_connection(int mainsock);
+void drop_connection(ipc_connection_t *conn);
+void watch_mainsock(int mainsock);
+void write_socket(ipc_connection_t *conn);
+void process_msg(ipc_connection_t *conn, ipc_message_t *msg);
+void msg_attach(ipc_message_t *msg, ipc_connection_t *conn);
+void init_server(void);

Added: trunk/src/socket.c
===================================================================
--- trunk/src/socket.c	                        (rev 0)
+++ trunk/src/socket.c	2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,202 @@
+#include <features.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include "socket.h"
+
+#define _MIN(a,b) (a<b)?a:b
+
+inline unsigned int FOURCC(char *a)
+{
+	        return ((a[3]<<24)|(a[2]<<16)|(a[1]<<8)|a[0]);
+}
+
+ipc_message_t * ipcmsg_create(uint32_t type, uint32_t src)
+{
+	ipc_message_t *new = malloc(sizeof(ipc_message_t));
+	bzero(new, sizeof(ipc_message_t));
+	new->head.src = src;
+	new->head.type = type;
+	return new;
+}
+
+void ipcmsg_append(ipc_message_t *msg, const char * data, int len)
+{
+	if (msg == NULL) return;
+	int want = msg->bodylen + len;
+	msg->body = realloc(msg->body, want);
+	memcpy(&msg->body[msg->bodylen], data, len);
+	msg->bodylen += len;
+}
+
+void ipcmsg_destination(ipc_message_t *msg, uint32_t dest)
+{
+	if (msg == NULL) return;
+	msg->head.dst = dest;
+}
+
+void ipcmsg_destroy(ipc_message_t * msg)
+{
+	if (msg->body) free(msg->body);
+	bzero(msg, sizeof(ipc_message_t));
+	free(msg);
+}
+
+void ipcmsg_send(ipc_message_t *msg, ipc_connection_t *conn)
+{
+	struct iovec iov[2];
+	int iovused = 0;
+
+	bzero(&iov, sizeof(iov));
+	iov[0].iov_base = &msg->head;
+	iov[0].iov_len = sizeof(msg->head);
+	iovused++;
+	if (msg->bodylen > 0) {
+		msg->head.len = msg->bodylen;
+		iov[1].iov_base = msg->body;
+		iov[1].iov_len = msg->bodylen;
+		iovused++;
+	}
+
+	int bytes = 0;
+	for (int i=0;i<iovused;i++) bytes += iov[i].iov_len;
+
+	int n = writev(conn->fd, iov, iovused);
+	if (n == -1) {
+		ipcconn_bad(conn);
+	} else
+	if (n < bytes) {
+		fprintf(stderr, "Short write on ipc socket.\n");
+	}
+}
+
+void ipcconn_bad(ipc_connection_t * conn)
+{
+	close(conn->fd);
+	conn->fd = -1;
+}
+
+ipc_connection_t * ipcconn_create(void)
+{
+	ipc_connection_t * conn = malloc(sizeof(ipc_connection_t));
+	bzero(conn, sizeof(ipc_connection_t));
+	conn->fd = -1;
+	INIT_LIST_HEAD(&(conn->outq));
+	return conn;
+}
+
+int ipcconn_connect(const char * target)
+{
+	struct addrinfo hint;
+	struct addrinfo *list;
+	int fd;
+
+	char *host = alloca(strlen(target));
+	char *port = NULL;
+	strcpy(host, target);
+
+	if ((port = strrchr(host, ':'))!=NULL) {
+		*port = 0;
+		port++;
+	} else {
+		port = alloca(6);
+		snprintf(port, 6, "%d", IPCPORT_DEFAULT);
+	}
+
+	bzero(&hint, sizeof(hint));
+	hint.ai_family = AF_INET;
+	hint.ai_socktype = SOCK_STREAM;
+	hint.ai_protocol = IPPROTO_TCP;
+	hint.ai_flags = 0;
+
+	int err = 0;
+	if ((err=getaddrinfo(host, port, &hint, &list))) {
+		fprintf(stderr, "Lookup failed for %s:%s %s", host, port, gai_strerror(err));
+		return -1;
+	}
+
+	struct addrinfo *res = NULL;
+	for (res = list; res != NULL; res = res->ai_next) {
+		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+		if (fd == -1) continue;
+
+		int i = 0;
+		setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &i, sizeof(i));
+
+		if (connect(fd, res->ai_addr, res->ai_addrlen) == 0) {
+			break;
+		}
+		close(fd);
+	}
+	if (list) freeaddrinfo(list);
+	if (res == NULL) {
+		fprintf(stderr, "Failed to connect to %s:%s\n", host, port);
+		return -1;
+	}
+	return fd;
+}
+
+
+/** 
+ * doread=1
+ * We got an okay to read from this socket/connection
+ * make one read and see what we can do with it
+ *
+ * doread=0
+ * Try to process the existing buffer for more messages
+ * return NULL if we don't have a complete one
+ */
+ipc_message_t * read_socket(ipc_connection_t * conn, int doread)
+{
+	if (doread) {
+		int remain = sizeof(conn->p_buffer) - conn->i_buffer;
+		int ret = read(conn->fd, &conn->p_buffer[conn->i_buffer], remain);
+		if (ret <= 0) {
+			ipcconn_bad(conn);
+			return NULL;
+		}
+		conn->i_buffer += ret;
+	}
+
+	int used = 0;
+
+	if (conn->incoming == NULL) {
+	       if (conn->i_buffer < sizeof(ipc_msghead_t)) return NULL;
+	       conn->incoming = ipcmsg_create(0, 0);
+	       memcpy(&conn->incoming->head, conn->p_buffer, sizeof(ipc_msghead_t));
+	       used += sizeof(ipc_msghead_t);
+
+	       if (conn->incoming->head.len > 8192){
+		       fprintf(stderr, "fd=%d Stupidly long message %d bytes.\n", conn->fd, conn->incoming->head.len);
+		       ipcconn_bad(conn);
+		       return NULL;
+	       }
+	}
+
+	if (conn->incoming->bodylen < conn->incoming->head.len) {
+		int wanted = conn->incoming->head.len - conn->incoming->bodylen;
+		int available = conn->i_buffer - used;
+		int using = _MIN(wanted, available);
+		ipcmsg_append(conn->incoming, &conn->p_buffer[used], using);
+		used += using;
+	}
+
+	if (used >= conn->i_buffer) {
+		conn->i_buffer = 0;
+	} else {
+		conn->i_buffer -= used;
+		memmove(conn->p_buffer, &conn->p_buffer[used], conn->i_buffer);
+	}
+
+	if (conn->incoming->bodylen >= conn->incoming->head.len) {
+		ipc_message_t * msg = conn->incoming;
+		conn->incoming = NULL;
+		return msg;
+	}
+
+	return NULL;
+}

Added: trunk/src/socket.h
===================================================================
--- trunk/src/socket.h	                        (rev 0)
+++ trunk/src/socket.h	2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,59 @@
+#ifndef SOCKET_H
+#define SOCKET_H
+
+#include <unistd.h>
+#include <stdint.h>
+#include "list.h"
+
+#define IPCPORT_DEFAULT	9999
+
+enum ipcsock_state {
+	IPCSTATE_CONNECTED,
+	IPCSTATE_VALID,
+	IPCSTATE_ERROR
+};
+
+/* packed struct for socket ipc */
+typedef struct {
+	uint32_t src;
+	uint32_t dst;
+	uint32_t type;
+	uint32_t len;
+} __attribute__((packed)) ipc_msghead_t;
+
+typedef struct {
+	struct list_head	list;
+	ipc_msghead_t		head;
+	char *			body;
+	int			bodylen;
+	int			refcount;
+} ipc_message_t;
+
+typedef struct {
+	struct list_head 	list;	/* list of connections */
+	uint32_t		addr;	/* id of client at other end */
+	enum ipcsock_state	state;
+
+	int 			fd;
+	char 			p_buffer[4096]; /* incoming data buffer */
+	int 			i_buffer;
+
+	ipc_message_t *		incoming; /* partial incoming message */
+
+	struct list_head	outq;	/* outgoing message queue */
+
+} ipc_connection_t;
+
+unsigned int FOURCC(char *a);
+/* socket.c */
+ipc_message_t *ipcmsg_create(uint32_t type,uint32_t src);
+void ipcmsg_append(ipc_message_t *msg, const char *data, int len);
+void ipcmsg_destination(ipc_message_t *msg, uint32_t dest);
+void ipcmsg_destroy(ipc_message_t *msg);
+void ipcmsg_send(ipc_message_t *msg, ipc_connection_t *conn);
+void ipcconn_bad(ipc_connection_t *conn);
+ipc_connection_t *ipcconn_create(void);
+int ipcconn_connect(const char *target);
+ipc_message_t *read_socket(ipc_connection_t *conn, int doread);
+
+#endif




More information about the mw-devel mailing list