[mw-devel] [Git][arthur/mw][master] Make the replay cache persistent

Andrew Price welshbyte at sucs.org
Sat Aug 5 16:16:44 BST 2017


Andrew Price pushed to branch master at Justin Mitchell / mw


Commits:
822ce86e by Andrew Price at 2017-08-05T16:10:58+01:00
Make the replay cache persistent

Just a simple store that uses one file per message.

- - - - -


5 changed files:

- src/server/mwserv.c
- src/server/replay.c
- src/server/replay.h
- src/server/servsock.c
- src/server/servsock.h


Changes:

=====================================
src/server/mwserv.c
=====================================
--- a/src/server/mwserv.c
+++ b/src/server/mwserv.c
@@ -155,7 +155,8 @@ int main(int argc, char **argv)
 	if (err)
 		return err;
 
-	init_server();
+	if (init_server() != 0)
+		return 1;
 
 	mainsock = open_mainsock(cfg_get_int("port"));
 


=====================================
src/server/replay.c
=====================================
--- a/src/server/replay.c
+++ b/src/server/replay.c
@@ -2,6 +2,7 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <sys/types.h>
+#include <sys/stat.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <string.h>
@@ -23,7 +24,14 @@
 #include "servsock.h"
 #include "replay.h"
 
+#define REPLAY_DIR STATEDIR "/replay"
+
+#define _STR(x) #x
+#define STR(x) _STR(x)
 #define STORE_SIZE 1000
+#define STORE_SIZE_STR STR(STORE_SIZE)
+#define STORE_SIZE_LEN (sizeof(STORE_SIZE_STR) - 1)
+#define STORE_FILE_NAME_LEN (sizeof(REPLAY_DIR) + STORE_SIZE_LEN + 1)
 
 static uint64_t serial = 0;
 
@@ -31,14 +39,6 @@ static ipc_message_t ** store = NULL;
 static int store_next = 0;
 static int store_len = 0;
 
-/* look at the history and set the serial number
- * appropriately.
- */
-void load_serial(void)
-{
-	serial = 1;
-}
-
 static int store_wrap(int index)
 {
 	while (index < 0) index += STORE_SIZE;
@@ -46,16 +46,36 @@ static int store_wrap(int index)
 	return index;
 }
 
-/* store the message for later replay */
-void store_message(ipc_message_t *msg)
+static int write_message(ipc_message_t *msg, unsigned n)
 {
-	if (store == NULL) {
-		/* create the store */
-		store = calloc(STORE_SIZE, sizeof(ipc_message_t *));
-		store_next = 0;
-		store_len = 0;
+	char pathname[STORE_FILE_NAME_LEN];
+	struct iovec iov[2];
+	int fd;
+
+	sprintf(pathname, REPLAY_DIR "/%0*u", (int)STORE_SIZE_LEN, n);
+	fd = open(pathname, O_RDWR|O_CLOEXEC|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR);
+	if (fd < 0) {
+		perror(pathname);
+		return 1;
 	}
+	iov[0] = (struct iovec) {
+		.iov_base = msg,
+		.iov_len = sizeof(*msg)
+	};
+	iov[1] = (struct iovec) {
+		.iov_base = msg->body,
+		.iov_len = msg->bodylen
+	};
+	if (pwritev(fd, iov, 2, 0) != (iov[0].iov_len + iov[1].iov_len))
+		perror(pathname);
+	fsync(fd);
+	close(fd);
+	return 0;
+}
 
+/* store the message for later replay */
+void store_message(ipc_message_t *msg)
+{
 	/* only store info/message, not actions */
 	if (msg->head.type <= 26 &&
 	   !( msg->head.type == IPC_TEXT || msg->head.type == IPC_WIZ))
@@ -69,6 +89,7 @@ void store_message(ipc_message_t *msg)
 		store[store_next] = NULL;
 	}
 
+	write_message(msg, store_next);
 	/* add to ref count so it wont get cleaned away yet
 	 * insert it at the current location and bump pointers
 	 */
@@ -84,7 +105,6 @@ void store_message(ipc_message_t *msg)
  */
 void assign_serial( ipc_message_t *msg )
 {
-	if (serial == 0) load_serial();
 	msg->head.serial = serial++;
 	msg->head.when = time(NULL);
 }
@@ -203,3 +223,80 @@ void replay(ipc_connection_t *conn, ipc_message_t *msg)
 
 	return;
 }
+
+int replay_init(void)
+{
+	uint64_t highest_serial = 0;
+	int ret;
+
+	store = calloc(STORE_SIZE, sizeof(ipc_message_t *));
+	if (store == NULL) {
+		perror("Failed to allocate replay store");
+		return 1;
+	}
+	store_next = 0;
+	store_len = 0;
+
+	ret = mkdir(REPLAY_DIR, S_IRWXU|S_IXGRP|S_IXOTH);
+	if (ret == -1 && errno != EEXIST) {
+		perror(REPLAY_DIR);
+		return 1;
+	}
+	for (unsigned i = 0; i < STORE_SIZE; i++) {
+		char pathname[STORE_FILE_NAME_LEN];
+		ipc_message_t *msg;
+		struct stat st;
+		int fd;
+
+		sprintf(pathname, REPLAY_DIR "/%0*u", (int)STORE_SIZE_LEN, i);
+		fd = open(pathname, O_RDONLY|O_CLOEXEC|O_CREAT, S_IRUSR|S_IWUSR);
+		if (fd < 0) {
+			perror(pathname);
+			return 1;
+		}
+		if (fstat(fd, &st) != 0) {
+			perror(pathname);
+			close(fd);
+			return 1;
+		}
+		if (st.st_size == 0) {
+			close(fd);
+			continue;
+		}
+		msg = malloc(sizeof(*msg));
+		if (msg == NULL) {
+			perror("Failed to allocate message header");
+			close(fd);
+			return 1;
+		}
+		if (pread(fd, msg, sizeof(*msg), 0) != sizeof(*msg)) {
+			perror("Failed to read message file");
+			close(fd);
+			return 1;
+		}
+		msg->body = malloc(msg->bodylen);
+		if (msg->body == NULL) {
+			perror("Failed to allocate message read buffer");
+			free(msg);
+			close(fd);
+			return 1;
+		}
+		if (pread(fd, msg->body, msg->bodylen, sizeof(*msg)) != msg->bodylen) {
+			perror("Failed to read message file");
+			free(msg->body);
+			free(msg);
+			close(fd);
+			return 1;
+		}
+		if (msg->head.serial > highest_serial) {
+			highest_serial = msg->head.serial;
+			store_next = store_wrap(i + 1);
+		}
+		msg->refcount = 1;
+		store_len++;
+		store[i] = msg;
+		close(fd);
+	}
+	serial = highest_serial + 1;
+	return 0;
+}


=====================================
src/server/replay.h
=====================================
--- a/src/server/replay.h
+++ b/src/server/replay.h
@@ -1,5 +1,9 @@
-/* replay.c */
-void load_serial(void);
+#ifndef REPLAY_H
+#define REPLAY_H
+
+int replay_init(void);
 void store_message(ipc_message_t *msg);
 void assign_serial(ipc_message_t *msg);
 void replay(ipc_connection_t *conn, ipc_message_t *msg);
+
+#endif /* REPLAY_H */


=====================================
src/server/servsock.c
=====================================
--- a/src/server/servsock.c
+++ b/src/server/servsock.c
@@ -729,10 +729,13 @@ void migrate_old_folders(void)
 	}
 }
 
-void init_server()
+int init_server()
 {
 	INIT_LIST_HEAD(&connection_list);
+	if (replay_init() != 0)
+		return 1;
 	poll_init();
+	return 0;
 }
 
 ipc_message_t * msg_wholist(void)


=====================================
src/server/servsock.h
=====================================
--- a/src/server/servsock.h
+++ b/src/server/servsock.h
@@ -23,7 +23,7 @@ void msg_attach_to_channel(ipc_message_t *msg, int channel, const char * exclude
 void msg_attach_to_perm(ipc_message_t *msg, perm_t perm);
 void msg_attach(ipc_message_t *msg, ipc_connection_t *conn);
 void migrate_old_folders(void);
-void init_server(void);
+int init_server(void);
 void send_error(ipc_connection_t *conn, ipc_message_t *orig, const char *format, ...);
 void msg_apply_gag(struct user * from, ipc_message_t * msg, const char * field);
 ipc_message_t * msg_wholist(void);



View it on GitLab: https://projects.sucs.org/arthur/mw/commit/822ce86e237bf1e82a66160519dd35d89e31bbe5

---
View it on GitLab: https://projects.sucs.org/arthur/mw/commit/822ce86e237bf1e82a66160519dd35d89e31bbe5
You're receiving this email because of your account on projects.sucs.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.sucs.org/pipermail/mw-devel/attachments/20170805/5a064efd/attachment-0001.html>


More information about the mw-devel mailing list