[mw-devel] MW3 r1265 - in trunk/src: . server
arthur at sucs.org
arthur at sucs.org
Sat Nov 3 21:16:43 GMT 2012
Author: arthur
Date: 2012-11-03 21:16:43 +0000 (Sat, 03 Nov 2012)
New Revision: 1265
Added:
trunk/src/list.h
trunk/src/server/
trunk/src/server/Makefile
trunk/src/server/mwserv.c
trunk/src/server/servsock.c
trunk/src/server/servsock.h
trunk/src/socket.c
trunk/src/socket.h
Modified:
trunk/src/Makefile
trunk/src/files.c
trunk/src/incoming.c
trunk/src/ipc.c
trunk/src/ipc.h
trunk/src/main.c
Log:
Client-Server mode, with simplistic server
Modified: trunk/src/Makefile
===================================================================
--- trunk/src/Makefile 2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/Makefile 2012-11-03 21:16:43 UTC (rev 1265)
@@ -59,7 +59,7 @@
topten.o sort.o tidyup.o gags.o script_inst.o script.o\
incoming.o command.o chattable.o alias.o frl.o hash.o vars.o expand.o\
files.o completion.o iconv.o gagtable.o \
-js.o sqlite.o ipc.o log.o uri.o
+js.o sqlite.o ipc.o log.o uri.o socket.o
$(CC) $(LDFLAGS) $(LDLIBS) -o $@ $^
del_user: del_user.o perms.o strings.o
Modified: trunk/src/files.c
===================================================================
--- trunk/src/files.c 2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/files.c 2012-11-03 21:16:43 UTC (rev 1265)
@@ -18,9 +18,8 @@
#include "perms.h"
#include "files.h"
#include "who.h"
+#include "ipc.h"
-int incoming_pipe;
-
#define MAXPIPELENGTH 2048
static char *get_pipe_name(int pid)
@@ -32,20 +31,11 @@
void create_pipe(void)
{
- if (mkfifo(get_pipe_name(getpid()),0600))
- {
- perror("Error in Creating mesg pipe");
- exit(-1);
- }
}
void open_incoming_fifo(void)
{
- if ((incoming_pipe=open(get_pipe_name(getpid()),O_RDWR))<0)
- {
- perror("Error opening incoming pipe");
- exit(-1);
- }
+ ipc_connect("localhost");
}
int open_outgoing_fifo(pid_t dest_pid)
@@ -60,9 +50,7 @@
void close_fifo(void)
{
- if(incoming_pipe)
- close(incoming_pipe);
- unlink(get_pipe_name(getpid()));
+ ipc_close();
}
void Lock_File(int f)
Modified: trunk/src/incoming.c
===================================================================
--- trunk/src/incoming.c 2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/incoming.c 2012-11-03 21:16:43 UTC (rev 1265)
@@ -28,6 +28,7 @@
#include "mesg.h"
#include "echo.h"
#include "intl.h"
+#include "socket.h"
extern int script_terminate;
extern Alias rpc_list;
@@ -40,6 +41,7 @@
extern struct room myroom;
extern int runautoexec;
extern int quietmode;
+extern ipc_connection_t *ipcsock;
int new_mail_waiting=0;
int mesg_waiting = 0;
@@ -68,6 +70,8 @@
static void force_protlevel(char *text, unsigned long theirprivs, const char *from);
static void force_protpower(char *text);
+/* Smaller of a and b. Arguments and result are fully parenthesised so the
+ * macro is safe inside larger expressions; each argument may be evaluated
+ * twice, so do not pass expressions with side effects. */
+#define _MIN(a,b) (((a)<(b))?(a):(b))
+
void InsertMesg(struct mstack *new)
{
new->next=NULL;
@@ -447,56 +451,21 @@
enum ipc_types msgtype = IPC_NOOP;
mesg_waiting = 0;
- fl = fcntl(incoming_pipe, F_GETFL);
- fcntl(incoming_pipe, F_SETFL, fl | O_NDELAY);
- while ((nos=read(incoming_pipe,buff,MAXTEXTLENGTH))>0)
- {
- for (i = 0; i < nos; i++) {
- if (! midread) {
- midread = 1;
- msgtype = buff[i];
- if (ptr>0) /* discard partial message */
- printf("\n*** handle_mesg: discarded %d chars.\n\007", ptr);
- pidp=0;
- } else {
- if (pidp<4) {
- pidbuff[pidp] = buff[i];
- pidp++;
- if (pidp>=4) {
- memcpy(&mesg_pid, &pidbuff, 4);
- if ((mesg_posn=get_who_userposn(mesg_pid))<0)
- strcpy(mesg_user.name,"System");
- else
- fetch_user(&mesg_user, mesg_posn);
- ptr=0;
- }
- continue;
- }
- if (! quotemode) {
- if (buff[i] == 1) {
- quotemode = 1;
- continue;
- } else if (buff[i] == '|') {
- newbuff[ptr] = 0;
- accept_pipe_cmd(msgtype, newbuff, mesg_pid, &mesg_user);
- ptr = 0;
- midread = 0;
- continue;
- }
- }
- /* just another character, buffer it for now */
- if (ptr < MAXPIPELENGTH)
- {
- newbuff[ptr]=buff[i];
- ptr++;
- }
- quotemode=0;
- }
- }
+ ipc_message_t * msg = read_socket(ipcsock, 1);
+
+ while (msg != NULL) {
+ if ((mesg_posn=get_who_userposn(msg->head.src))<0)
+ strcpy(mesg_user.name,"System");
+ else
+ fetch_user(&mesg_user, mesg_posn);
+ bzero(newbuff, sizeof(newbuff));
+ memcpy(newbuff, msg->body, _MIN(sizeof(newbuff)-1, msg->bodylen));
+ accept_pipe_cmd(msg->head.type, newbuff, msg->head.src, &mesg_user);
+
+ ipcmsg_destroy(msg);
+ msg = read_socket(ipcsock, 0);
}
- /*if (midread != 0) printf("\n*** handle_mesg: partial message read - will resume\n\007");*/
- fcntl(incoming_pipe, F_SETFL, fl);
if (cm_flags(user->chatmode,CM_ONCHAT,CM_MODE_ANY)) set_talk_rights(); else set_rights();
}
Modified: trunk/src/ipc.c
===================================================================
--- trunk/src/ipc.c 2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/ipc.c 2012-11-03 21:16:43 UTC (rev 1265)
@@ -13,7 +13,62 @@
#include "mesg.h"
#include "who.h"
#include "files.h"
+#include "socket.h"
+ipc_connection_t * ipcsock = NULL;
+char *ipc_parent = NULL;
+
+/*
+ * Connect (or reconnect) to the message server and announce ourselves.
+ *
+ * target: "host" or "host:port" to connect to. When NULL, the target that
+ *         was remembered from an earlier call (ipc_parent) is reused.
+ *
+ * Unless allocation fails, ipcsock points at a fresh connection object on
+ * return; its fd stays -1 when the connection could not be made, which is
+ * the state ipc_check() looks for when deciding to retry.
+ */
+void ipc_connect(const char *target)
+{
+	/* Remember an explicit target up front so that a failed first attempt
+	 * can still be retried later. Copy before freeing in case the caller
+	 * handed us ipc_parent itself. */
+	if (target != NULL) {
+		char *copy = strdup(target);
+		free(ipc_parent);
+		ipc_parent = copy;
+	}
+	const char *host = ipc_parent;
+
+	/* Throw away any previous connection, including buffered input that
+	 * belongs to the old byte stream, and start from a clean object. */
+	if (ipcsock != NULL) {
+		if (ipcsock->fd != -1) close(ipcsock->fd);
+		if (ipcsock->incoming != NULL) ipcmsg_destroy(ipcsock->incoming);
+		free(ipcsock);
+	}
+	ipcsock = ipcconn_create();
+	if (ipcsock == NULL) return;
+
+	/* ipcconn_connect() reports failure as -1; with no host at all there
+	 * is nothing to try. */
+	int fd = (host != NULL) ? ipcconn_connect(host) : -1;
+	if (fd < 0) {
+		fprintf(stderr, "Connection to server failed.\n");
+		return;
+	}
+	ipcsock->fd = fd;
+
+	/* Tell the server who we are: the HELO message carries our pid as its
+	 * source address. The send is synchronous, so the message can be
+	 * released straight away. */
+	ipc_message_t *msg = ipcmsg_create(FOURCC("HELO"), getpid());
+	if (msg != NULL) {
+		ipcmsg_send(msg, ipcsock);
+		ipcmsg_destroy(msg);
+	}
+}
+
+/* Make sure a server connection exists, reconnecting to the remembered
+ * target when the socket is missing or has been marked bad (fd == -1). */
+void ipc_check(void)
+{
+	int alive = (ipcsock != NULL) && (ipcsock->fd != -1);
+
+	if (!alive) {
+		ipc_connect(NULL);
+	}
+}
+
+/* Shut down the server connection and release everything attached to it.
+ * Does nothing when no connection object exists. */
+void ipc_close(void)
+{
+	if (ipcsock == NULL) return;
+	/* fd is -1 once the connection has been marked bad; nothing to close */
+	if (ipcsock->fd != -1) close(ipcsock->fd);
+	/* a partially received message is owned by the connection */
+	if (ipcsock->incoming != NULL) ipcmsg_destroy(ipcsock->incoming);
+	free(ipcsock);
+	ipcsock = NULL;
+}
+
+/* Descriptor of the server connection, or -1 when there is no connection
+ * object at all. */
+int ipc_getfd()
+{
+	return (ipcsock != NULL) ? ipcsock->fd : -1;
+}
+
+
/*
* Send a message to a remote process addressed by PID
* Returns:
@@ -29,44 +84,30 @@
* C - An arbitrary length of data
*/
int ipc_send_to_pid(pid_t dest, enum ipc_types msgtype, const char * data) {
- char buff[MAXTEXTLENGTH];
- int fd;
- ssize_t dgram_len;
- pid_t mypid;
- char * qdata;
int ret = 0;
if (dest < 1) return -1; // Invalid PID
if (! data) data = "";
- fd = open_outgoing_fifo(dest);
- if (fd < 0) {
- who_delete(dest);
- return -1;
- }
-
if (msgtype != IPC_NOOP) {
- memset(buff, 0, MAXTEXTLENGTH);
- qdata = quotetext(data);
- snprintf(buff, MAXTEXTLENGTH - 1, "%cxxxx%s",(char) msgtype, qdata);
- free(qdata);
+ char buff[MAXTEXTLENGTH];
+ snprintf(buff, MAXTEXTLENGTH - 1, "%s",data);
strip_quote(buff);
- strcat(buff,"|");
- mypid = getpid();
- memcpy(buff + 1, &mypid, 4);
+ pid_t mypid = getpid();
//ensure text (everything after the first 5 bytes) going into the pipes is clean utf-8 after all the truncating etc. that might have happened to it.
- if(utf8_cleanup(buff+5)>=0) {
- // the first 5 bytes are binary data that break strlen
- dgram_len = strlen(buff+5)+5;
- if (write(fd, buff, dgram_len) <= 0) {
- //fprintf(stderr, "Write to %s failed: %s\n", fifo_path, strerror(errno));
- ret = -2;
- }
+ //
+
+ ipc_check();
+ if (ipcsock == NULL || ipcsock->fd == -1) {
+ /* still no server connection after the reconnect attempt: report it
+ * rather than handing a dead connection to ipcmsg_send() */
+ ret = -2;
+ } else
+ if(utf8_cleanup(buff)>=0) {
+ ssize_t dgram_len = strlen(buff);
+ ipc_message_t *msg = ipcmsg_create(msgtype, mypid);
+ if (msg != NULL) {
+ ipcmsg_destination(msg, dest);
+ ipcmsg_append(msg, buff, dgram_len);
+ ipcmsg_send(msg, ipcsock);
+ /* ipcmsg_send() writes synchronously and keeps no reference,
+ * so the message has to be released here */
+ ipcmsg_destroy(msg);
+ } else {
+ ret = -2;
+ }
} else {
fprintf(stderr, "Failed to utf8_cleanup the message\n");
}
}
- close(fd);
return ret;
}
Modified: trunk/src/ipc.h
===================================================================
--- trunk/src/ipc.h 2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/ipc.h 2012-11-03 21:16:43 UTC (rev 1265)
@@ -35,6 +35,9 @@
typedef int (send_filter)(const struct person * usr, const struct who * who, const void * info);
+void ipc_connect(const char *target);
+void ipc_close(void);
+int ipc_getfd(void);
int ipc_send_to_pid(pid_t dest, enum ipc_types msgtype, const char * data);
unsigned int ipc_send_to_username(const char * dest, enum ipc_types msgtype, const char * data);
unsigned int ipc_send_to_all(enum ipc_types msgtype, const char * data, send_filter * filter, const void * filterinfo);
Added: trunk/src/list.h
===================================================================
--- trunk/src/list.h (rev 0)
+++ trunk/src/list.h 2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,241 @@
+#ifndef __LIST_H
+#define __LIST_H
+
+/*
+ * Simple doubly linked list implementation.
+ *
+ * Some of the internal functions ("__xxx") are useful when
+ * manipulating whole lists rather than single entries, as
+ * sometimes we already know the next/prev entries and we can
+ * generate better code by using them directly rather than
+ * using the generic single-entry routines.
+ */
+
+struct list_head {
+ struct list_head *next, *prev;
+};
+
+#define LIST_HEAD_INIT(name) { &(name), &(name) }
+
+#define LIST_HEAD(name) \
+ struct list_head name = LIST_HEAD_INIT(name)
+
+#define INIT_LIST_HEAD(ptr) do { \
+ (ptr)->next = (ptr); (ptr)->prev = (ptr); \
+} while (0)
+
+/*
+ * Insert a new entry between two known consecutive entries.
+ *
+ * This is only for internal list manipulation where we know
+ * the prev/next entries already!
+ */
+static inline void __list_add(struct list_head *new,
+ struct list_head *prev,
+ struct list_head *next)
+{
+ next->prev = new;
+ new->next = next;
+ new->prev = prev;
+ prev->next = new;
+}
+
+/**
+ * list_add - add a new entry
+ * @new: new entry to be added
+ * @head: list head to add it after
+ *
+ * Insert a new entry after the specified head.
+ * This is good for implementing stacks.
+ */
+static inline void list_add(struct list_head *new, struct list_head *head)
+{
+ __list_add(new, head, head->next);
+}
+
+/**
+ * list_add_tail - add a new entry
+ * @new: new entry to be added
+ * @head: list head to add it before
+ *
+ * Insert a new entry before the specified head.
+ * This is useful for implementing queues.
+ */
+static inline void list_add_tail(struct list_head *new, struct list_head *head)
+{
+ __list_add(new, head->prev, head);
+}
+
+/*
+ * Delete a list entry by making the prev/next entries
+ * point to each other.
+ *
+ * This is only for internal list manipulation where we know
+ * the prev/next entries already!
+ */
+static inline void __list_del(struct list_head *prev, struct list_head *next)
+{
+ next->prev = prev;
+ prev->next = next;
+}
+
+/**
+ * list_del - deletes entry from list.
+ * @entry: the element to delete from the list.
+ * Note: list_empty on entry does not return true after this, the entry is in an undefined state.
+ */
+static inline void list_del(struct list_head *entry)
+{
+ __list_del(entry->prev, entry->next);
+ entry->next = (void *) 0;
+ entry->prev = (void *) 0;
+}
+
+/**
+ * list_del_init - deletes entry from list and reinitialize it.
+ * @entry: the element to delete from the list.
+ */
+static inline void list_del_init(struct list_head *entry)
+{
+ __list_del(entry->prev, entry->next);
+ INIT_LIST_HEAD(entry);
+}
+
+/**
+ * list_move - delete from one list and add as another's head
+ * @list: the entry to move
+ * @head: the head that will precede our entry
+ */
+static inline void list_move(struct list_head *list, struct list_head *head)
+{
+ __list_del(list->prev, list->next);
+ list_add(list, head);
+}
+
+/**
+ * list_move_tail - delete from one list and add as another's tail
+ * @list: the entry to move
+ * @head: the head that will follow our entry
+ */
+static inline void list_move_tail(struct list_head *list,
+ struct list_head *head)
+{
+ __list_del(list->prev, list->next);
+ list_add_tail(list, head);
+}
+
+/**
+ * list_empty - tests whether a list is empty
+ * @head: the list to test.
+ */
+static inline int list_empty(struct list_head *head)
+{
+ return head->next == head;
+}
+
+/*
+ * Link every entry of @list in directly after @head.
+ *
+ * Internal helper: the caller must make sure @list is not empty (see
+ * list_splice). @list's own head is not touched, so it still points at
+ * the moved entries afterwards.
+ */
+static inline void __list_splice(struct list_head *list,
+ struct list_head *head)
+{
+ struct list_head *first = list->next;
+ struct list_head *last = list->prev;
+ struct list_head *at = head->next;
+
+ /* hook the first moved entry in behind @head ... */
+ first->prev = head;
+ head->next = first;
+
+ /* ... and the last moved entry in front of @head's old successor */
+ last->next = at;
+ at->prev = last;
+}
+
+/**
+ * list_splice - join two lists
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ */
+static inline void list_splice(struct list_head *list, struct list_head *head)
+{
+ if (!list_empty(list))
+ __list_splice(list, head);
+}
+
+/**
+ * list_splice_init - join two lists and reinitialise the emptied list.
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ *
+ * The list at @list is reinitialised
+ */
+static inline void list_splice_init(struct list_head *list,
+ struct list_head *head)
+{
+ if (!list_empty(list)) {
+ __list_splice(list, head);
+ INIT_LIST_HEAD(list);
+ }
+}
+
+/**
+ * list_entry - get the struct for this entry
+ * @ptr: the &struct list_head pointer.
+ * @type: the type of the struct this is embedded in.
+ * @member: the name of the list_struct within the struct.
+ */
+#define list_entry(ptr, type, member) \
+ ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
+
+/* First and last link of a list. On an initialised empty list both expand
+ * to the head itself, because an empty head points at itself. */
+#define list_first(head) (head)->next
+#define list_last(head) (head)->prev
+
+/**
+ * list_for_each - iterate over a list
+ * @pos: the &struct list_head to use as a loop counter.
+ * @head: the head for your list.
+ */
+#define list_for_each(pos, head) \
+ for (pos = (head)->next; pos != (head); \
+ pos = pos->next)
+/**
+ * list_for_each_prev - iterate over a list backwards
+ * @pos: the &struct list_head to use as a loop counter.
+ * @head: the head for your list.
+ */
+#define list_for_each_prev(pos, head) \
+ for (pos = (head)->prev; pos != (head); \
+ pos = pos->prev)
+
+/**
+ * list_for_each_safe - iterate over a list safe against removal of list entry
+ * @pos: the &struct list_head to use as a loop counter.
+ * @n: another &struct list_head to use as temporary storage
+ * @head: the head for your list.
+ */
+#define list_for_each_safe(pos, n, head) \
+ for (pos = (head)->next, n = pos->next; pos != (head); \
+ pos = n, n = pos->next)
+
+/**
+ * list_for_each_entry - iterate over list of given type
+ * @pos: the type * to use as a loop counter.
+ * @head: the head for your list.
+ * @member: the name of the list_struct within the struct.
+ *
+ * Spelled with __typeof__ rather than typeof so the macro also compiles in
+ * strict ISO modes (-std=c99), where GCC does not treat plain typeof as a
+ * keyword. Entries must not be removed while iterating; use
+ * list_for_each_entry_safe for that.
+ */
+#define list_for_each_entry(pos, head, member) \
+	for (pos = list_entry((head)->next, __typeof__(*pos), member); \
+	     &pos->member != (head); \
+	     pos = list_entry(pos->member.next, __typeof__(*pos), member))
+
+/**
+ * list_for_each_entry_safe - iterate over list of given type safe against removal of list entry
+ * @pos: the type * to use as a loop counter.
+ * @n: another type * to use as temporary storage
+ * @head: the head for your list.
+ * @member: the name of the list_struct within the struct.
+ *
+ * @n always holds the entry after @pos, so @pos itself may be unlinked and
+ * freed inside the loop body. Spelled with __typeof__ rather than typeof so
+ * the macro also compiles in strict ISO modes (-std=c99).
+ */
+#define list_for_each_entry_safe(pos, n, head, member) \
+	for (pos = list_entry((head)->next, __typeof__(*pos), member), \
+	     n = list_entry(pos->member.next, __typeof__(*pos), member); \
+	     &pos->member != (head); \
+	     pos = n, n = list_entry(n->member.next, __typeof__(*n), member))
+
+
+#endif
Modified: trunk/src/main.c
===================================================================
--- trunk/src/main.c 2012-10-17 14:24:14 UTC (rev 1264)
+++ trunk/src/main.c 2012-11-03 21:16:43 UTC (rev 1265)
@@ -1031,6 +1031,7 @@
in_idle++;
+ int incoming_pipe = ipc_getfd();
FD_ZERO(&readfds);
FD_ZERO(&exceptfds);
FD_SET(incoming_pipe, &readfds);
@@ -1070,7 +1071,7 @@
if (nfds<0 && select_error == EINVAL)
{
char buf[256];
- snprintf(buf, 255, "\n{EINVAL: fd=%d, incoming_pipe=%d}\n", fd, incoming_pipe);
+ snprintf(buf, 255, "\n{EINVAL: fd=%d, incoming_pipe=%d}\n", fd, ipc_getfd());
write(1,buf,strlen(buf));
}
if (nfds > 0 && fd >= 0 && FD_ISSET(fd, &readfds)) return 1;
Added: trunk/src/server/Makefile
===================================================================
--- trunk/src/server/Makefile (rev 0)
+++ trunk/src/server/Makefile 2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,73 @@
+include ../../Makefile.common
+#libdir := /usr/lib
+#localstatedir := /var
+
+LOGDIR := $(localstatedir)/log/mw
+MSGDIR := $(localstatedir)/run/mw
+STATEDIR := $(localstatedir)/lib/mw
+HOMEPATH := $(libdir)/mw
+
+# cflags for standard 'cc' compiler
+CFLAGS= -Wall -pedantic --std=gnu99 -D_GNU_SOURCE -I.. -I/usr/include/postgresql
+#LDFLAGS+= -pie
+
+# info strings, do not edit.
+DEFS:= -DBUILD_DATE=\"$(shell date +%Y%m%d)\"
+DEFS+= -DBUILD_USER=\"$(shell whoami | awk -f ../capitalise.awk)\"
+DEFS+= -DVER_MAJ=\"$(VERSION_MAJOR)\"
+DEFS+= -DVER_MIN=\"$(VERSION_MINOR)\"
+DEFS+= -DVER_TWK=\"$(VERSION_TWEAK)\"
+DEFS+= -DHOMEPATH=\"$(HOMEPATH)\"
+DEFS+= -DLOGDIR=\"$(LOGDIR)\"
+DEFS+= -DSTATEDIR=\"$(STATEDIR)\"
+DEFS+= -DMSGDIR=\"$(MSGDIR)\"
+
+### uncomment for gdb debugging
+LDFLAGS+= -ggdb -g -lcrypt -lpq
+CFLAGS+= -ggdb -g -D__NO_STRING_INLINE -fstack-protector-all -std=c99
+
+### Optimisation - uncomment for release & extra testing
+#CFLAGS+=-O3
+
+### Only ever uncomment for final release versions
+DEFS+= -DRELEASE
+
+CFLAGS += $(DEFS)
+
+build: mwserv
+
+### The magic which lets us autogenerate dependencies
+CFLAGS += -MMD
+
+CODE=$(wildcard *.c)
+HDRS=$(wildcard *.h)
+
+%.o: %.c
+ $(CC) $(CFLAGS) -c -o $@ $<
+
+-include $(CODE:.c=.d)
+
+.PHONY: build install clean test
+
+mwserv: mwserv.o servsock.o ../socket.o
+ $(CC) $(LDFLAGS) $(LDLIBS) -o $@ $^
+
+clean:
+ -rm -f *.o *.d mwserv
+
+install: mwserv
+ install -Ds mwserv $(DESTDIR)$(HOMEPATH)/mwserv
+
+ifndef TESTDIR
+test:
+ make TESTDIR=$(CURDIR)/mwtest $@
+else
+test:
+ mkdir -p "$(TESTDIR)"
+ cd "$(TESTDIR)" && mkdir -p mw run/mw log/mw lib/mw
+ for d in $(INSTALLFILES); do \
+ svn export --force ../$$d "$(TESTDIR)/mw/$$d"; \
+ done
+ make libdir="$(TESTDIR)" localstatedir="$(TESTDIR)"
+
+endif
Added: trunk/src/server/mwserv.c
===================================================================
--- trunk/src/server/mwserv.c (rev 0)
+++ trunk/src/server/mwserv.c 2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,23 @@
+#include <stdio.h>
+#include <fcntl.h>
+#include "../socket.h"
+#include "servsock.h"
+
+/* Entry point of the message server: initialise the server state, open the
+ * listening socket on the default port and run the event loop.
+ * Returns 1 when the listening socket cannot be opened. */
+int main(int argc, char **argv)
+{
+ int mainsock = -1;
+ init_server();
+
+ mainsock = open_mainsock(IPCPORT_DEFAULT);
+
+ if (mainsock < 0) {
+ fprintf(stderr, "Failed.\n");
+ return 1;
+ }
+
+ /* prints a four-character code both as text and as its integer value */
+ uint32_t test = FOURCC("CAST");
+ printf("test %4.4s == 0x%X\n", (char *)&test, test);
+
+ /* does not return until the event loop stops */
+ watch_mainsock(mainsock);
+ printf("Done.\n");
+}
Added: trunk/src/server/servsock.c
===================================================================
--- trunk/src/server/servsock.c (rev 0)
+++ trunk/src/server/servsock.c 2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,282 @@
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <string.h>
+#include <errno.h>
+#include <arpa/inet.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <sys/param.h>
+#include <time.h>
+#include <sys/time.h>
+
+#include "../socket.h"
+#include "servsock.h"
+
+struct list_head connection_list;
+
+static int pollfd = -1;
+int mainsock_die = 0;
+
+/* One entry in a connection's outgoing queue. The same message can be
+ * queued on several connections (it is reference counted), so each queue
+ * gets its own link wrapper around it. */
+typedef struct {
+ struct list_head list; /* link in the owning connection's outq */
+
+ ipc_message_t * msg; /* queued message; refcount is bumped in msg_attach */
+
+ /* NOTE(review): not referenced anywhere in this file; presumably meant
+ * for tracking partially written messages - confirm before relying on them */
+ int headsent;
+ int used;
+} outq_msg_t;
+
+/*
+ * Create the listening TCP socket of the server.
+ *
+ * port: TCP port (host byte order) to listen on, on all IPv4 addresses.
+ *
+ * Returns the listening descriptor, or -1 after printing the reason to
+ * stderr. No descriptor is left open on failure.
+ */
+int open_mainsock(uint16_t port)
+{
+	int fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (fd < 0) {
+		fprintf(stderr, "Error creating master socket. %s\n", strerror(errno));
+		return -1;
+	}
+
+	/* we could have done v6 here as well, but the 4-in-6 stuff is
+	 * now deprecated so we would have to open a second v6 only socket
+	 * if we really want v6 mode */
+	struct sockaddr_in in;
+	/* clear the whole structure so the padding does not carry stack garbage */
+	memset(&in, 0, sizeof(in));
+	in.sin_family = AF_INET;
+	in.sin_port = htons(port);
+	in.sin_addr.s_addr = htonl(INADDR_ANY);
+
+	int i = 1;
+	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i))) {
+		fprintf(stderr, "Error enabling rebind on port. %s\n", strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	if (bind(fd, (struct sockaddr *)&in, sizeof(in))) {
+		fprintf(stderr, "Error binding port: %s\n", strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	if (listen(fd, 2)) {
+		fprintf(stderr, "Error listening on port: %s\n", strerror(errno));
+		close(fd);
+		return -1;
+	}
+
+	return fd;
+}
+
+/*
+ * Wrap an accepted socket in a connection object, put it on
+ * connection_list and start polling it for input.
+ *
+ * fd: the connected socket; stored in the returned connection.
+ *
+ * Returns the new connection in state IPCSTATE_CONNECTED. A failure to
+ * register with epoll is only logged; the connection is still returned.
+ */
+ipc_connection_t * add_connection(int fd)
+{
+ /* create new structure */
+ ipc_connection_t *new = ipcconn_create();
+ new->fd = fd;
+
+ /* add to list of all connections */
+ list_add_tail(&(new->list), &connection_list);
+
+ /* register interest in read events */
+ struct epoll_event ev;
+ bzero(&ev, sizeof(ev));
+ ev.events = EPOLLIN | EPOLLERR;
+ /* the event loop gets the connection back through this pointer */
+ ev.data.ptr = new;
+ if (epoll_ctl( pollfd, EPOLL_CTL_ADD, new->fd, &ev)) {
+ fprintf(stderr, "Error adding new conn to poll: %s\n", strerror(errno));
+ }
+
+ new->state = IPCSTATE_CONNECTED;
+
+ return new;
+}
+
+/*
+ * Accept one pending client on the listening socket and register it via
+ * add_connection(). An accept() failure is logged and otherwise ignored.
+ *
+ * mainsock: the listening descriptor returned by open_mainsock().
+ */
+void accept_connection(int mainsock)
+{
+ struct sockaddr_in in;
+ socklen_t inlen = sizeof(in);
+ int sock;
+
+ if ((sock=accept(mainsock, (struct sockaddr *)&in, &inlen))<0) {
+ fprintf(stderr, "Error accepting new connection: %s\n", strerror(errno));
+ return;
+ }
+
+ ipc_connection_t * c = add_connection(sock);
+
+ /* log the peer address of the new client */
+ printf("Accept fd=%d %s:%d\n", c->fd, inet_ntoa(in.sin_addr), ntohs(in.sin_port));
+}
+
+/*
+ * Tear down a client connection: stop polling it, unlink it from
+ * connection_list, close the socket, release everything still queued for
+ * it and free the connection itself. conn must not be used afterwards.
+ */
+void drop_connection(ipc_connection_t * conn)
+{
+	printf("Drop connection\n");
+
+	if (conn->fd != -1) {
+		/* the event argument is ignored for EPOLL_CTL_DEL, but old
+		 * kernels insist on a non-NULL pointer */
+		struct epoll_event ev;
+		bzero(&ev, sizeof(ev));
+		epoll_ctl(pollfd, EPOLL_CTL_DEL, conn->fd, &ev);
+		close(conn->fd);
+		conn->fd = -1;
+	}
+	list_del_init(&conn->list);
+	conn->state = IPCSTATE_ERROR;
+
+	/* The queue holds outq_msg_t wrappers (see msg_attach), not bare
+	 * messages: give up this queue's reference on each message, destroy
+	 * the message once nobody else holds it, and free the wrapper. */
+	struct list_head *pos, *q;
+	list_for_each_safe(pos, q, &conn->outq) {
+		outq_msg_t *mtx = list_entry(pos, outq_msg_t, list);
+		list_del(pos);
+		mtx->msg->refcount--;
+		if (mtx->msg->refcount <= 0) ipcmsg_destroy(mtx->msg);
+		free(mtx);
+	}
+
+	/* a partially received message is owned by the connection */
+	if (conn->incoming != NULL) ipcmsg_destroy(conn->incoming);
+
+	bzero(conn, sizeof(ipc_connection_t));
+	free(conn);
+}
+
+/*
+ * Main event loop of the server.
+ *
+ * Registers the listening socket with epoll and then dispatches events
+ * until the listening socket reports an error/hangup or epoll_wait fails
+ * with anything other than EINTR. Events with a NULL data pointer belong
+ * to the listening socket; all others carry the ipc_connection_t they
+ * were registered with.
+ *
+ * mainsock: the listening descriptor returned by open_mainsock().
+ */
+void watch_mainsock(int mainsock)
+{
+	struct epoll_event ev;
+
+	/* add the mainsock to the epoll list
+	 * it will only generate read events */
+	bzero(&ev, sizeof(ev));
+	ev.events = EPOLLIN | EPOLLERR;
+	ev.data.ptr = NULL;
+	if (epoll_ctl(pollfd, EPOLL_CTL_ADD, mainsock, &ev)) {
+		fprintf(stderr, "Error adding mainsock: %s\n", strerror(errno));
+		return;
+	}
+
+	struct epoll_event event[20];
+
+	for (;;) {
+		int ret = epoll_wait(pollfd, event, 20, 1000);
+		if (ret < 0) {
+			/* epoll got interrupted, not actually an error, go around again */
+			if (errno == EINTR) continue;
+			break;
+		}
+
+		for (int i=0; i<ret; i++) {
+			/* event on mainsock */
+			if (event[i].data.ptr == NULL) {
+				if (event[i].events & (EPOLLERR | EPOLLHUP)) {
+					return;
+				} else if (event[i].events & EPOLLIN) {
+					accept_connection(mainsock);
+				} else {
+					fprintf(stderr, "unexpected event on mainsock.\n");
+				}
+				continue;
+			}
+
+			ipc_connection_t *c = event[i].data.ptr;
+			if (event[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
+				drop_connection(c);
+			} else if (event[i].events & (EPOLLIN | EPOLLPRI)) {
+				/* one read, then drain every complete message from the buffer */
+				ipc_message_t *msg = read_socket(c, 1);
+				while (msg != NULL) {
+					process_msg(c, msg);
+					msg = read_socket(c, 0);
+				}
+				/* read_socket marks the connection bad (fd == -1) on EOF,
+				 * read errors and oversized messages, on the first read
+				 * or while draining; either way it has to go */
+				if (c->fd == -1) drop_connection(c);
+			} else if (event[i].events & EPOLLOUT) {
+				write_socket(c);
+				/* a failed send marks the connection bad as well */
+				if (c->fd == -1) drop_connection(c);
+			} else {
+				fprintf(stderr, "unexpected event on fd=%d.\n", c->fd);
+			}
+		}
+		/* end of events handling, do periodic stuff here */
+	}
+
+	/* epoll gave an error */
+	fprintf(stderr, "epoll error: %s\n", strerror(errno));
+}
+
+/*
+ * Handle a "writable" event for a connection: send the message at the
+ * front of its outgoing queue, or switch write notifications off again
+ * when the queue is empty. At most one message is sent per call.
+ */
+void write_socket(ipc_connection_t * conn)
+{
+ if (list_empty(&conn->outq)) {
+ /* tx queue is empty, stop write events */
+ struct epoll_event ev;
+ bzero(&ev, sizeof(ev));
+ ev.events = EPOLLIN | EPOLLERR;
+ ev.data.ptr = conn;
+ if (epoll_ctl(pollfd, EPOLL_CTL_MOD, conn->fd, &ev)) {
+ fprintf(stderr, "Error updating poll fd=%d: %s\n", conn->fd, strerror(errno));
+ }
+ return;
+ }
+ /* take the oldest queued entry off the list */
+ struct list_head * pos = conn->outq.next;
+ outq_msg_t * mtx = list_entry(pos, outq_msg_t, list);
+ list_del(pos);
+
+ ipcmsg_send(mtx->msg, conn);
+ /* this queue no longer references the message; destroy it once no
+ * other queue does either */
+ mtx->msg->refcount--;
+ if (mtx->msg->refcount <= 0) ipcmsg_destroy(mtx->msg);
+ free(mtx);
+}
+
+
+/*
+ * Act on one complete message received from conn.
+ *
+ * A HELO message records the sender's address on the connection and marks
+ * it valid. Anything else is queued on every valid connection whose
+ * address matches the destination, or on all valid connections when the
+ * destination is 0. The message is destroyed here unless some queue took
+ * a reference to it.
+ */
+void process_msg(ipc_connection_t *conn, ipc_message_t *msg)
+{
+	printf("src: %X, dst: %X, Type: %4.4s, len=%d\n", msg->head.src, msg->head.dst, (char *)&msg->head.type, msg->head.len);
+
+	/* client just told us who they are */
+	if (msg->head.type == FOURCC("HELO")) {
+		memcpy(&conn->addr, &msg->head.src, sizeof(conn->addr));
+		conn->state = IPCSTATE_VALID;
+		ipcmsg_destroy(msg);
+		return;
+	}
+
+	/* otherwise redistribute this message to intended target */
+	struct list_head *node;
+	list_for_each(node, &connection_list) {
+		ipc_connection_t *peer = list_entry(node, ipc_connection_t, list);
+
+		/* only clients that have identified themselves take part */
+		if (peer->state != IPCSTATE_VALID) continue;
+
+		int broadcast = (msg->head.dst == 0);
+		if (broadcast || msg->head.dst == peer->addr) {
+			msg_attach(msg, peer);
+		}
+	}
+	if (msg->refcount <= 0) ipcmsg_destroy(msg);
+}
+
+/*
+ * Queue msg for delivery to conn.
+ *
+ * The message is shared, not copied: its refcount is incremented and a
+ * small wrapper is appended to the connection's outgoing queue. If the
+ * queue was empty, write notifications are switched on so the event loop
+ * starts sending. Does nothing when conn is NULL; an allocation failure
+ * is logged and the message is simply not queued for this connection.
+ */
+void msg_attach(ipc_message_t *msg, ipc_connection_t *conn)
+{
+	if (conn == NULL) return;
+
+	outq_msg_t *new = calloc(1, sizeof(*new));
+	if (new == NULL) {
+		fprintf(stderr, "Out of memory queueing message for fd=%d.\n", conn->fd);
+		return;
+	}
+
+	int wasempty = list_empty(&conn->outq);
+
+	/* add to end of tx queue */
+	new->msg = msg;
+	new->msg->refcount++;
+	list_add_tail(&(new->list), &(conn->outq));
+
+	/* nothing was queued, switch on write notifications */
+	if (wasempty) {
+		struct epoll_event ev;
+		bzero(&ev, sizeof(ev));
+		ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
+		ev.data.ptr = conn;
+		if (epoll_ctl(pollfd, EPOLL_CTL_MOD, conn->fd, &ev)) {
+			fprintf(stderr, "Error updating poll fd=%d: %s\n", conn->fd, strerror(errno));
+		}
+	}
+}
+
+/* Prepare the server state: an empty connection list and, on first call,
+ * the epoll instance used by the event loop. A failure to create the epoll
+ * instance is reported on stderr and pollfd stays -1. */
+void init_server(void)
+{
+	INIT_LIST_HEAD(&connection_list);
+	if (pollfd == -1) {
+		pollfd = epoll_create(30);
+		if (pollfd == -1) {
+			fprintf(stderr, "Error creating epoll instance: %s\n", strerror(errno));
+		}
+	}
+}
Added: trunk/src/server/servsock.h
===================================================================
--- trunk/src/server/servsock.h (rev 0)
+++ trunk/src/server/servsock.h 2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,10 @@
+/* servsock.c */
+int open_mainsock(uint16_t port);
+ipc_connection_t *add_connection(int fd);
+void accept_connection(int mainsock);
+void drop_connection(ipc_connection_t *conn);
+void watch_mainsock(int mainsock);
+void write_socket(ipc_connection_t *conn);
+void process_msg(ipc_connection_t *conn, ipc_message_t *msg);
+void msg_attach(ipc_message_t *msg, ipc_connection_t *conn);
+void init_server(void);
Added: trunk/src/socket.c
===================================================================
--- trunk/src/socket.c (rev 0)
+++ trunk/src/socket.c 2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,202 @@
+#include <features.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include "socket.h"
+
+/* Smaller of a and b. Arguments and result are fully parenthesised so the
+ * macro is safe inside larger expressions; each argument may be evaluated
+ * twice, so do not pass expressions with side effects. */
+#define _MIN(a,b) (((a)<(b))?(a):(b))
+
+/*
+ * Pack the first four characters of a into one integer, a[0] in the least
+ * significant byte. a must point at at least four readable characters.
+ *
+ * The bytes are read as unsigned so characters >= 0x80 neither sign-extend
+ * into the higher bytes nor shift into the sign bit. The function is not
+ * marked inline: socket.h declares it as an ordinary external function.
+ */
+unsigned int FOURCC(char *a)
+{
+	const unsigned char *b = (const unsigned char *)a;
+
+	return ((unsigned int)b[3] << 24) | ((unsigned int)b[2] << 16) |
+	       ((unsigned int)b[1] << 8) | (unsigned int)b[0];
+}
+
+/*
+ * Allocate an empty message.
+ *
+ * type: message type code stored in the header.
+ * src:  sender id stored in the header.
+ *
+ * Returns a zero-initialised message with no body, or NULL when memory is
+ * exhausted. Release it with ipcmsg_destroy().
+ */
+ipc_message_t * ipcmsg_create(uint32_t type, uint32_t src)
+{
+	ipc_message_t *new = calloc(1, sizeof(*new));
+	if (new == NULL) return NULL;
+	new->head.src = src;
+	new->head.type = type;
+	return new;
+}
+
+/*
+ * Append len bytes of data to the body of msg.
+ *
+ * Does nothing when msg or data is NULL or len is not positive. If the
+ * body cannot be grown the existing body is kept unchanged and the
+ * failure is reported on stderr.
+ */
+void ipcmsg_append(ipc_message_t *msg, const char * data, int len)
+{
+	if (msg == NULL || data == NULL || len <= 0) return;
+
+	/* keep the old pointer until realloc has succeeded, otherwise a
+	 * failure would both leak the body and leave msg->body NULL */
+	char *grown = realloc(msg->body, (size_t)msg->bodylen + (size_t)len);
+	if (grown == NULL) {
+		fprintf(stderr, "Out of memory growing ipc message body.\n");
+		return;
+	}
+	memcpy(grown + msg->bodylen, data, (size_t)len);
+	msg->body = grown;
+	msg->bodylen += len;
+}
+
+/* Set the destination id in the header of msg; a NULL msg is ignored. */
+void ipcmsg_destination(ipc_message_t *msg, uint32_t dest)
+{
+	if (msg != NULL) {
+		msg->head.dst = dest;
+	}
+}
+
+/* Free a message and its body. Like free(), accepts NULL and does nothing.
+ * The structure is wiped first so stale pointers to it are easier to spot. */
+void ipcmsg_destroy(ipc_message_t * msg)
+{
+	if (msg == NULL) return;
+	free(msg->body);
+	bzero(msg, sizeof(ipc_message_t));
+	free(msg);
+}
+
+/*
+ * Write msg (header followed by body, if any) to conn in one writev call.
+ *
+ * The header's length field is set from the body length before sending.
+ * The message is not consumed; the caller still owns it afterwards.
+ * Nothing is sent when msg or conn is NULL or the connection is already
+ * closed. On a write error, or a short write that leaves a truncated
+ * message in the stream, the connection is marked bad (fd == -1).
+ */
+void ipcmsg_send(ipc_message_t *msg, ipc_connection_t *conn)
+{
+	if (msg == NULL || conn == NULL || conn->fd == -1) return;
+
+	struct iovec iov[2];
+	int iovused = 0;
+	int bytes = 0;
+
+	/* the receiver relies on head.len to find the end of the body */
+	msg->head.len = (msg->bodylen > 0) ? msg->bodylen : 0;
+
+	bzero(&iov, sizeof(iov));
+	iov[iovused].iov_base = &msg->head;
+	iov[iovused].iov_len = sizeof(msg->head);
+	bytes += sizeof(msg->head);
+	iovused++;
+	if (msg->bodylen > 0) {
+		iov[iovused].iov_base = msg->body;
+		iov[iovused].iov_len = msg->bodylen;
+		bytes += msg->bodylen;
+		iovused++;
+	}
+
+	ssize_t n = writev(conn->fd, iov, iovused);
+	if (n == -1) {
+		ipcconn_bad(conn);
+	} else if (n < bytes) {
+		/* messages are length-prefixed with no way to resynchronise, so a
+		 * truncated one would corrupt everything sent after it */
+		fprintf(stderr, "Short write on ipc socket.\n");
+		ipcconn_bad(conn);
+	}
+}
+
+/* Mark a connection as unusable: close its socket and set fd to -1.
+ * Calling it again, or with NULL, is harmless. The connection object
+ * itself is left for its owner to clean up. */
+void ipcconn_bad(ipc_connection_t * conn)
+{
+	if (conn == NULL || conn->fd == -1) return;
+	close(conn->fd);
+	conn->fd = -1;
+}
+
+/* Allocate a connection object with no socket (fd == -1), an empty input
+ * buffer and an empty outgoing queue. Returns NULL when memory is
+ * exhausted. */
+ipc_connection_t * ipcconn_create(void)
+{
+	ipc_connection_t *conn = calloc(1, sizeof(*conn));
+	if (conn == NULL) return NULL;
+	conn->fd = -1;
+	INIT_LIST_HEAD(&(conn->outq));
+	return conn;
+}
+
+/*
+ * Open a TCP connection to the message server.
+ *
+ * target: "host" or "host:port"; the text after the last ':' is taken as
+ *         the port, otherwise IPCPORT_DEFAULT is used.
+ *
+ * Each address the lookup returns is tried in turn. Returns the connected
+ * socket, or -1 (after printing the reason to stderr) when target is NULL,
+ * the lookup fails or no address accepts the connection.
+ */
+int ipcconn_connect(const char * target)
+{
+	if (target == NULL) return -1;
+
+	struct addrinfo hint;
+	struct addrinfo *list = NULL;
+	int fd = -1;
+
+	/* +1 for the terminating NUL that strcpy writes */
+	char *host = alloca(strlen(target) + 1);
+	char *port = NULL;
+	char defport[6];
+	strcpy(host, target);
+
+	if ((port = strrchr(host, ':'))!=NULL) {
+		/* split "host:port" in place */
+		*port = 0;
+		port++;
+	} else {
+		snprintf(defport, sizeof(defport), "%d", IPCPORT_DEFAULT);
+		port = defport;
+	}
+
+	bzero(&hint, sizeof(hint));
+	hint.ai_family = AF_INET;
+	hint.ai_socktype = SOCK_STREAM;
+	hint.ai_protocol = IPPROTO_TCP;
+	hint.ai_flags = 0;
+
+	int err = getaddrinfo(host, port, &hint, &list);
+	if (err) {
+		fprintf(stderr, "Lookup failed for %s:%s %s\n", host, port, gai_strerror(err));
+		return -1;
+	}
+
+	struct addrinfo *res = NULL;
+	for (res = list; res != NULL; res = res->ai_next) {
+		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+		if (fd == -1) continue;
+
+		int i = 0;
+		setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &i, sizeof(i));
+
+		if (connect(fd, res->ai_addr, res->ai_addrlen) == 0) {
+			break;
+		}
+		close(fd);
+		fd = -1;
+	}
+
+	/* note the outcome before the list (and with it res) is freed */
+	int connected = (res != NULL);
+	if (list) freeaddrinfo(list);
+	if (!connected) {
+		fprintf(stderr, "Failed to connect to %s:%s\n", host, port);
+		return -1;
+	}
+	return fd;
+}
+
+
+/**
+ * Pull the next complete message out of a connection.
+ *
+ * doread=1
+ *   We got an okay to read from this socket/connection:
+ *   make one read and see what we can do with it.
+ *
+ * doread=0
+ *   Try to process the existing buffer for more messages.
+ *
+ * Returns a complete message, which the caller now owns, or NULL if there
+ * is not a complete one yet. NULL is also returned after the connection
+ * has been marked bad (conn->fd == -1): on end of file, on a read error
+ * and when a header announces a body longer than 8192 bytes.
+ */
+ipc_message_t * read_socket(ipc_connection_t * conn, int doread)
+{
+	if (doread) {
+		int remain = sizeof(conn->p_buffer) - conn->i_buffer;
+		int ret = read(conn->fd, &conn->p_buffer[conn->i_buffer], remain);
+		if (ret <= 0) {
+			ipcconn_bad(conn);
+			return NULL;
+		}
+		conn->i_buffer += ret;
+	}
+
+	/* bytes of p_buffer consumed by this call */
+	int used = 0;
+
+	if (conn->incoming == NULL) {
+		/* start of a new message: wait until the whole header is here */
+		if (conn->i_buffer < (int)sizeof(ipc_msghead_t)) return NULL;
+		conn->incoming = ipcmsg_create(0, 0);
+		if (conn->incoming == NULL) return NULL;
+		memcpy(&conn->incoming->head, conn->p_buffer, sizeof(ipc_msghead_t));
+		used += sizeof(ipc_msghead_t);
+
+		if (conn->incoming->head.len > 8192){
+			fprintf(stderr, "fd=%d Stupidly long message %d bytes.\n", conn->fd, conn->incoming->head.len);
+			/* do not leave the rejected message, or the bytes that
+			 * produced it, behind on the connection */
+			ipcmsg_destroy(conn->incoming);
+			conn->incoming = NULL;
+			conn->i_buffer = 0;
+			ipcconn_bad(conn);
+			return NULL;
+		}
+	}
+
+	if (conn->incoming->bodylen < conn->incoming->head.len) {
+		/* move as much of the body as has arrived so far */
+		int wanted = conn->incoming->head.len - conn->incoming->bodylen;
+		int available = conn->i_buffer - used;
+		int using = (wanted < available) ? wanted : available;
+		ipcmsg_append(conn->incoming, &conn->p_buffer[used], using);
+		used += using;
+	}
+
+	/* shift whatever is left to the front of the buffer */
+	if (used >= conn->i_buffer) {
+		conn->i_buffer = 0;
+	} else {
+		conn->i_buffer -= used;
+		memmove(conn->p_buffer, &conn->p_buffer[used], conn->i_buffer);
+	}
+
+	if (conn->incoming->bodylen >= conn->incoming->head.len) {
+		/* body complete: hand the message over to the caller */
+		ipc_message_t * msg = conn->incoming;
+		conn->incoming = NULL;
+		return msg;
+	}
+
+	return NULL;
+}
Added: trunk/src/socket.h
===================================================================
--- trunk/src/socket.h (rev 0)
+++ trunk/src/socket.h 2012-11-03 21:16:43 UTC (rev 1265)
@@ -0,0 +1,59 @@
+#ifndef SOCKET_H
+#define SOCKET_H
+
+#include <unistd.h>
+#include <stdint.h>
+#include "list.h"
+
+#define IPCPORT_DEFAULT 9999
+
+/* State of a connection as tracked by the server. */
+enum ipcsock_state {
+ IPCSTATE_CONNECTED, /* set when the socket has been accepted */
+ IPCSTATE_VALID, /* set once the client's HELO has been processed; only these connections receive messages */
+ IPCSTATE_ERROR /* set while the connection is being dropped */
+};
+
+/* packed struct for socket ipc
+ *
+ * Header that precedes every message body on the socket. It is written
+ * and read as raw memory, with no byte-order conversion in this code. */
+typedef struct {
+ uint32_t src; /* sender id; clients fill in their pid */
+ uint32_t dst; /* recipient id; the server treats 0 as broadcast */
+ uint32_t type; /* message type code, e.g. FOURCC("HELO") */
+ uint32_t len; /* number of body bytes following the header */
+} __attribute__((packed)) ipc_msghead_t;
+
+/* A message in memory: the wire header plus a heap-allocated body. */
+typedef struct {
+ struct list_head list; /* NOTE(review): no visible code links messages through this member - confirm whether it is still needed */
+ ipc_msghead_t head; /* header as sent on the wire */
+ char * body; /* body bytes, NULL until something is appended */
+ int bodylen; /* number of bytes currently in body */
+ int refcount; /* number of outgoing queues holding this message (server side) */
+} ipc_message_t;
+
+/* One socket connection, used by the client for its link to the server
+ * and by the server for each connected client. */
+typedef struct {
+ struct list_head list; /* list of connections */
+ uint32_t addr; /* id of client at other end */
+ enum ipcsock_state state;
+
+ int fd; /* socket descriptor, -1 when not connected or marked bad */
+ char p_buffer[4096]; /* incoming data buffer */
+ int i_buffer; /* number of valid bytes currently in p_buffer */
+
+ ipc_message_t * incoming; /* partial incoming message */
+
+ struct list_head outq; /* outgoing message queue */
+
+} ipc_connection_t;
+
+unsigned int FOURCC(char *a);
+/* socket.c */
+ipc_message_t *ipcmsg_create(uint32_t type,uint32_t src);
+void ipcmsg_append(ipc_message_t *msg, const char *data, int len);
+void ipcmsg_destination(ipc_message_t *msg, uint32_t dest);
+void ipcmsg_destroy(ipc_message_t *msg);
+void ipcmsg_send(ipc_message_t *msg, ipc_connection_t *conn);
+void ipcconn_bad(ipc_connection_t *conn);
+ipc_connection_t *ipcconn_create(void);
+int ipcconn_connect(const char *target);
+ipc_message_t *read_socket(ipc_connection_t *conn, int doread);
+
+#endif
More information about the mw-devel
mailing list