Search
j0ke.net Open Build Service
>
Projects
>
internetx
:
projects
:
sysutils
>
repcached
> repcached-2.3.1-1.4.13.patch
Sign Up
|
Log In
Username
Password
Cancel
Overview
Repositories
Revisions
Requests
Users
Advanced
Attributes
Meta
File repcached-2.3.1-1.4.13.patch of Package repcached
diff -urN memcached-1.4.13/AUTHORS.repcached memcached-1.4.13-repcached-2.3.1/AUTHORS.repcached --- memcached-1.4.13/AUTHORS.repcached Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/AUTHORS.repcached Mon Feb 13 05:07:52 2012 @@ -0,0 +1,2 @@ +YASUI Masanobu <yasui-m at klab.org> +HIROSE Masaaki <hirose at klab.org> diff -urN memcached-1.4.13/ChangeLog.repcached memcached-1.4.13-repcached-2.3.1/ChangeLog.repcached --- memcached-1.4.13/ChangeLog.repcached Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/ChangeLog.repcached Mon Feb 13 05:07:52 2012 @@ -0,0 +1,63 @@ +2011-02-14 Maxim Dounin + + * release of repcached-2.3.1 for memcached-1.4.5 + * fixed another cpu hog on network problems + +2010-11-24 Maxim Dounin + + * release of repcached-2.3 for memcached-1.4.5 + * fixed cpu hog on transient network problems + +2009-05-26 YASUI Masanobu + * release of repcached-2.2 for memcached-1.2.8 + +2009-01-05 YASUI Masanobu + * fixed segmentation fault on marugoto copying for 64bit platform + +2008-08-26 HIROSE Masaaki + + * release of repcached-2.1 for memcached-1.2.6 + +2008-04-11 YASUI Masanobu + * release of repcached-2.0 for memcached-1.2.5 + * multi master replication support + * replication able to cas id + * fixed replication connect error + +2008-03-10 YASUI Masanobu + * release of repcached-1.2 for memcached-1.2.4 + * bug fix. + * flags was not possible to replication. + always stored to zero in backup. + master> set key flags expire len + backup> set key 0 expire len + This bug was corrected. + * fixed memory leaks when replicating the flush_all command. + +2008-02-15 HIROSE Masaaki + + * release of repcached-1.1 for memcached-1.2.4 + +2008-02-05 YASUI Masanobu + + * support memcached-1.2.4's new commands (append, prepend, cas) + known problem: repcached-1.1 cannot replicate cas unique id. so + it is possible that cas operation is fail if master/backup + fail-over occur between gets and cas. + +2008-02-04 HIROSE Masaaki + + * include test suite for replication. + prove -l t.rep + +2007-11-13 HIROSE Masaaki + + * release of repcached-1.0 for memcached-1.2.2 + + * first release. + + + +Local Variables: +mode: change-log +End: diff -urN memcached-1.4.13/Makefile.am memcached-1.4.13-repcached-2.3.1/Makefile.am --- memcached-1.4.13/Makefile.am Fri Jan 6 22:19:50 2012 +++ memcached-1.4.13-repcached-2.3.1/Makefile.am Mon Feb 13 01:13:58 2012 @@ -31,6 +31,10 @@ memcached_SOURCES += sasl_defs.c endif +if ENABLE_REPLICATION +memcached_SOURCES += replication.h replication.c +endif + memcached_debug_SOURCES = $(memcached_SOURCES) memcached_CPPFLAGS = -DNDEBUG memcached_debug_LDADD = @PROFILER_LDFLAGS@ diff -urN memcached-1.4.13/Makefile.in memcached-1.4.13-repcached-2.3.1/Makefile.in --- memcached-1.4.13/Makefile.in Fri Feb 3 01:24:19 2012 +++ memcached-1.4.13-repcached-2.3.1/Makefile.in Mon Feb 13 05:07:10 2012 @@ -43,13 +43,14 @@ @BUILD_CACHE_TRUE@am__append_2 = cache.c @BUILD_SOLARIS_PRIVS_TRUE@am__append_3 = solaris_priv.c @ENABLE_SASL_TRUE@am__append_4 = sasl_defs.c -@BUILD_DTRACE_TRUE@am__append_5 = memcached_dtrace.h +@ENABLE_REPLICATION_TRUE@am__append_5 = replication.h replication.c @BUILD_DTRACE_TRUE@am__append_6 = memcached_dtrace.h -@DTRACE_INSTRUMENT_OBJ_TRUE@am__append_7 = memcached_dtrace.o +@BUILD_DTRACE_TRUE@am__append_7 = memcached_dtrace.h @DTRACE_INSTRUMENT_OBJ_TRUE@am__append_8 = memcached_dtrace.o -@DTRACE_INSTRUMENT_OBJ_TRUE@am__append_9 = memcached_debug_dtrace.o +@DTRACE_INSTRUMENT_OBJ_TRUE@am__append_9 = memcached_dtrace.o @DTRACE_INSTRUMENT_OBJ_TRUE@am__append_10 = memcached_debug_dtrace.o -@DTRACE_INSTRUMENT_OBJ_TRUE@am__append_11 = memcached_dtrace.o memcached_debug_dtrace.o +@DTRACE_INSTRUMENT_OBJ_TRUE@am__append_11 = memcached_debug_dtrace.o +@DTRACE_INSTRUMENT_OBJ_TRUE@am__append_12 = memcached_dtrace.o memcached_debug_dtrace.o subdir = . DIST_COMMON = README $(am__configure_deps) $(pkginclude_HEADERS) \ $(srcdir)/Makefile.am $(srcdir)/Makefile.in \ @@ -72,34 +73,41 @@ am__memcached_SOURCES_DIST = memcached.c memcached.h hash.c hash.h \ slabs.c slabs.h items.c items.h assoc.c assoc.h thread.c \ daemon.c stats.c stats.h util.c util.h trace.h cache.h \ - sasl_defs.h cache.c solaris_priv.c sasl_defs.c + sasl_defs.h cache.c solaris_priv.c sasl_defs.c replication.h \ + replication.c @BUILD_CACHE_TRUE@am__objects_1 = memcached-cache.$(OBJEXT) @BUILD_SOLARIS_PRIVS_TRUE@am__objects_2 = \ @BUILD_SOLARIS_PRIVS_TRUE@ memcached-solaris_priv.$(OBJEXT) @ENABLE_SASL_TRUE@am__objects_3 = memcached-sasl_defs.$(OBJEXT) +@ENABLE_REPLICATION_TRUE@am__objects_4 = \ +@ENABLE_REPLICATION_TRUE@ memcached-replication.$(OBJEXT) am_memcached_OBJECTS = memcached-memcached.$(OBJEXT) \ memcached-hash.$(OBJEXT) memcached-slabs.$(OBJEXT) \ memcached-items.$(OBJEXT) memcached-assoc.$(OBJEXT) \ memcached-thread.$(OBJEXT) memcached-daemon.$(OBJEXT) \ memcached-stats.$(OBJEXT) memcached-util.$(OBJEXT) \ - $(am__objects_1) $(am__objects_2) $(am__objects_3) + $(am__objects_1) $(am__objects_2) $(am__objects_3) \ + $(am__objects_4) memcached_OBJECTS = $(am_memcached_OBJECTS) am__memcached_debug_SOURCES_DIST = memcached.c memcached.h hash.c \ hash.h slabs.c slabs.h items.c items.h assoc.c assoc.h \ thread.c daemon.c stats.c stats.h util.c util.h trace.h \ - cache.h sasl_defs.h cache.c solaris_priv.c sasl_defs.c -@BUILD_CACHE_TRUE@am__objects_4 = memcached_debug-cache.$(OBJEXT) -@BUILD_SOLARIS_PRIVS_TRUE@am__objects_5 = memcached_debug-solaris_priv.$(OBJEXT) -@ENABLE_SASL_TRUE@am__objects_6 = memcached_debug-sasl_defs.$(OBJEXT) -am__objects_7 = memcached_debug-memcached.$(OBJEXT) \ + cache.h sasl_defs.h cache.c solaris_priv.c sasl_defs.c \ + replication.h replication.c +@BUILD_CACHE_TRUE@am__objects_5 = memcached_debug-cache.$(OBJEXT) +@BUILD_SOLARIS_PRIVS_TRUE@am__objects_6 = memcached_debug-solaris_priv.$(OBJEXT) +@ENABLE_SASL_TRUE@am__objects_7 = memcached_debug-sasl_defs.$(OBJEXT) +@ENABLE_REPLICATION_TRUE@am__objects_8 = memcached_debug-replication.$(OBJEXT) +am__objects_9 = memcached_debug-memcached.$(OBJEXT) \ memcached_debug-hash.$(OBJEXT) memcached_debug-slabs.$(OBJEXT) \ memcached_debug-items.$(OBJEXT) \ memcached_debug-assoc.$(OBJEXT) \ memcached_debug-thread.$(OBJEXT) \ memcached_debug-daemon.$(OBJEXT) \ memcached_debug-stats.$(OBJEXT) memcached_debug-util.$(OBJEXT) \ - $(am__objects_4) $(am__objects_5) $(am__objects_6) -am_memcached_debug_OBJECTS = $(am__objects_7) + $(am__objects_5) $(am__objects_6) $(am__objects_7) \ + $(am__objects_8) +am_memcached_debug_OBJECTS = $(am__objects_9) memcached_debug_OBJECTS = $(am_memcached_debug_OBJECTS) memcached_debug_LINK = $(CCLD) $(memcached_debug_CFLAGS) $(CFLAGS) \ $(AM_LDFLAGS) $(LDFLAGS) -o $@ @@ -107,8 +115,9 @@ sizes_OBJECTS = sizes.$(OBJEXT) sizes_LDADD = $(LDADD) am__testapp_SOURCES_DIST = testapp.c util.c util.h cache.c -@BUILD_CACHE_TRUE@am__objects_8 = cache.$(OBJEXT) -am_testapp_OBJECTS = testapp.$(OBJEXT) util.$(OBJEXT) $(am__objects_8) +@BUILD_CACHE_TRUE@am__objects_10 = cache.$(OBJEXT) +am_testapp_OBJECTS = testapp.$(OBJEXT) util.$(OBJEXT) \ + $(am__objects_10) testapp_OBJECTS = $(am_testapp_OBJECTS) testapp_LDADD = $(LDADD) am_timedrun_OBJECTS = timedrun.$(OBJEXT) @@ -307,21 +316,22 @@ top_builddir = @top_builddir@ top_srcdir = @top_srcdir@ pkginclude_HEADERS = protocol_binary.h -BUILT_SOURCES = $(am__append_5) +BUILT_SOURCES = $(am__append_6) testapp_SOURCES = testapp.c util.c util.h $(am__append_2) timedrun_SOURCES = timedrun.c memcached_SOURCES = memcached.c memcached.h hash.c hash.h slabs.c \ slabs.h items.c items.h assoc.c assoc.h thread.c daemon.c \ stats.c stats.h util.c util.h trace.h cache.h sasl_defs.h \ - $(am__append_1) $(am__append_3) $(am__append_4) + $(am__append_1) $(am__append_3) $(am__append_4) \ + $(am__append_5) memcached_debug_SOURCES = $(memcached_SOURCES) memcached_CPPFLAGS = -DNDEBUG -memcached_debug_LDADD = @PROFILER_LDFLAGS@ $(am__append_9) +memcached_debug_LDADD = @PROFILER_LDFLAGS@ $(am__append_10) memcached_debug_CFLAGS = @PROFILER_FLAGS@ -memcached_LDADD = $(am__append_7) -memcached_DEPENDENCIES = $(am__append_8) -memcached_debug_DEPENDENCIES = $(am__append_10) -CLEANFILES = $(am__append_6) $(am__append_11) +memcached_LDADD = $(am__append_8) +memcached_DEPENDENCIES = $(am__append_9) +memcached_debug_DEPENDENCIES = $(am__append_11) +CLEANFILES = $(am__append_7) $(am__append_12) SUBDIRS = doc DIST_DIRS = scripts EXTRA_DIST = doc scripts t memcached.spec memcached_dtrace.d version.m4 @@ -451,6 +461,7 @@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached-hash.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached-items.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached-memcached.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached-replication.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached-sasl_defs.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached-slabs.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached-solaris_priv.Po@am__quote@ @@ -463,6 +474,7 @@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached_debug-hash.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached_debug-items.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached_debug-memcached.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached_debug-replication.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached_debug-sasl_defs.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached_debug-slabs.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/memcached_debug-solaris_priv.Po@am__quote@ @@ -656,6 +668,20 @@ @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(memcached_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o memcached-sasl_defs.obj `if test -f 'sasl_defs.c'; then $(CYGPATH_W) 'sasl_defs.c'; else $(CYGPATH_W) '$(srcdir)/sasl_defs.c'; fi` +memcached-replication.o: replication.c +@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(memcached_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT memcached-replication.o -MD -MP -MF $(DEPDIR)/memcached-replication.Tpo -c -o memcached-replication.o `test -f 'replication.c' || echo '$(srcdir)/'`replication.c +@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/memcached-replication.Tpo $(DEPDIR)/memcached-replication.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='replication.c' object='memcached-replication.o' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(memcached_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o memcached-replication.o `test -f 'replication.c' || echo '$(srcdir)/'`replication.c + +memcached-replication.obj: replication.c +@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(memcached_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT memcached-replication.obj -MD -MP -MF $(DEPDIR)/memcached-replication.Tpo -c -o memcached-replication.obj `if test -f 'replication.c'; then $(CYGPATH_W) 'replication.c'; else $(CYGPATH_W) '$(srcdir)/replication.c'; fi` +@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/memcached-replication.Tpo $(DEPDIR)/memcached-replication.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='replication.c' object='memcached-replication.obj' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(memcached_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o memcached-replication.obj `if test -f 'replication.c'; then $(CYGPATH_W) 'replication.c'; else $(CYGPATH_W) '$(srcdir)/replication.c'; fi` + memcached_debug-memcached.o: memcached.c @am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(memcached_debug_CFLAGS) $(CFLAGS) -MT memcached_debug-memcached.o -MD -MP -MF $(DEPDIR)/memcached_debug-memcached.Tpo -c -o memcached_debug-memcached.o `test -f 'memcached.c' || echo '$(srcdir)/'`memcached.c @am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/memcached_debug-memcached.Tpo $(DEPDIR)/memcached_debug-memcached.Po @@ -823,6 +849,20 @@ @AMDEP_TRUE@@am__fastdepCC_FALSE@ source='sasl_defs.c' object='memcached_debug-sasl_defs.obj' libtool=no @AMDEPBACKSLASH@ @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(memcached_debug_CFLAGS) $(CFLAGS) -c -o memcached_debug-sasl_defs.obj `if test -f 'sasl_defs.c'; then $(CYGPATH_W) 'sasl_defs.c'; else $(CYGPATH_W) '$(srcdir)/sasl_defs.c'; fi` + +memcached_debug-replication.o: replication.c +@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(memcached_debug_CFLAGS) $(CFLAGS) -MT memcached_debug-replication.o -MD -MP -MF $(DEPDIR)/memcached_debug-replication.Tpo -c -o memcached_debug-replication.o `test -f 'replication.c' || echo '$(srcdir)/'`replication.c +@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/memcached_debug-replication.Tpo $(DEPDIR)/memcached_debug-replication.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='replication.c' object='memcached_debug-replication.o' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(memcached_debug_CFLAGS) $(CFLAGS) -c -o memcached_debug-replication.o `test -f 'replication.c' || echo '$(srcdir)/'`replication.c + +memcached_debug-replication.obj: replication.c +@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(memcached_debug_CFLAGS) $(CFLAGS) -MT memcached_debug-replication.obj -MD -MP -MF $(DEPDIR)/memcached_debug-replication.Tpo -c -o memcached_debug-replication.obj `if test -f 'replication.c'; then $(CYGPATH_W) 'replication.c'; else $(CYGPATH_W) '$(srcdir)/replication.c'; fi` +@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/memcached_debug-replication.Tpo $(DEPDIR)/memcached_debug-replication.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='replication.c' object='memcached_debug-replication.obj' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(memcached_debug_CFLAGS) $(CFLAGS) -c -o memcached_debug-replication.obj `if test -f 'replication.c'; then $(CYGPATH_W) 'replication.c'; else $(CYGPATH_W) '$(srcdir)/replication.c'; fi` install-pkgincludeHEADERS: $(pkginclude_HEADERS) @$(NORMAL_INSTALL) test -z "$(pkgincludedir)" || $(MKDIR_P) "$(DESTDIR)$(pkgincludedir)" diff -urN memcached-1.4.13/assoc.c memcached-1.4.13-repcached-2.3.1/assoc.c --- memcached-1.4.13/assoc.c Fri Jan 6 22:19:50 2012 +++ memcached-1.4.13-repcached-2.3.1/assoc.c Mon Feb 13 01:13:58 2012 @@ -270,3 +270,51 @@ } +#ifdef USE_REPLICATION +char *assoc_key_snap(int *n) +{ + char *p = NULL; + char *b = NULL; + item *i = NULL; + int co = 0; + int sz = 1; + int hs = 0; + int hm = hashsize(hashpower); + + hs = hm; + while(hs--){ + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ + i = old_hashtable[hs]; + }else{ + i = primary_hashtable[hs]; + } + while(i){ + sz += i->nkey + 1; + co++; + i = i->h_next; + } + } + + if(co){ + if((p = b = malloc(sz))){ + hs = hm; + while(hs--){ + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ + i = old_hashtable[hs]; + }else{ + i = primary_hashtable[hs]; + } + while(i){ + memcpy(p, ITEM_key(i), i->nkey); + p += i->nkey; + *(p++) = 0; + i = i->h_next; + } + } + *(p++) = 0; + } + } + if(n) *n = co; + return(b); +} +#endif /* USE_REPLICATION */ diff -urN memcached-1.4.13/assoc.h memcached-1.4.13-repcached-2.3.1/assoc.h --- memcached-1.4.13/assoc.h Fri Jan 6 22:19:50 2012 +++ memcached-1.4.13-repcached-2.3.1/assoc.h Mon Feb 13 01:13:58 2012 @@ -7,3 +7,6 @@ int start_assoc_maintenance_thread(void); void stop_assoc_maintenance_thread(void); +#ifdef USE_REPLICATION +char *assoc_key_snap(int *n); +#endif /*USE_REPLICATION*/ diff -urN memcached-1.4.13/config.guess memcached-1.4.13-repcached-2.3.1/config.guess diff -urN memcached-1.4.13/config.h.in memcached-1.4.13-repcached-2.3.1/config.h.in --- memcached-1.4.13/config.h.in Fri Feb 3 01:24:25 2012 +++ memcached-1.4.13-repcached-2.3.1/config.h.in Mon Feb 13 05:07:26 2012 @@ -117,6 +117,9 @@ /* Define to 1 if you have the ANSI C header files. */ #undef STDC_HEADERS +/* Define this if you want to use replication */ +#undef USE_REPLICATION + /* Version number of package */ #undef VERSION diff -urN memcached-1.4.13/config.sub memcached-1.4.13-repcached-2.3.1/config.sub diff -urN memcached-1.4.13/configure memcached-1.4.13-repcached-2.3.1/configure --- memcached-1.4.13/configure Fri Feb 3 01:24:20 2012 +++ memcached-1.4.13-repcached-2.3.1/configure Mon Feb 13 05:07:10 2012 @@ -606,6 +606,8 @@ BUILD_CACHE_TRUE BUILD_SOLARIS_PRIVS_FALSE BUILD_SOLARIS_PRIVS_TRUE +ENABLE_REPLICATION_FALSE +ENABLE_REPLICATION_TRUE PROFILER_FLAGS PROFILER PROFILER_LDFLAGS @@ -720,6 +722,7 @@ enable_coverage enable_64bit with_libevent +enable_replication enable_docs ' ac_precious_vars='build_alias @@ -1358,6 +1361,7 @@ --enable-dtrace Enable dtrace probes --disable-coverage Disable code coverage --enable-64bit build 64bit version + --enable-replication support replication --disable-docs Disable documentation generation Optional Packages: @@ -5948,6 +5952,28 @@ as_fn_error "Can't enable threads without the POSIX thread library." "$LINENO" 5 fi +# Check whether --enable-replication was given. +if test "${enable_replication+set}" = set; then : + enableval=$enable_replication; if test "x$enable_threads" = "xyes"; then + as_fn_error "Can't enable threads and replication together." "$LINENO" 5 + else + +$as_echo "#define USE_REPLICATION /**/" >>confdefs.h + + fi + +fi + + + if test "x$enable_replication" = "xyes"; then + ENABLE_REPLICATION_TRUE= + ENABLE_REPLICATION_FALSE='#' +else + ENABLE_REPLICATION_TRUE='#' + ENABLE_REPLICATION_FALSE= +fi + + for ac_func in mlockall do : ac_fn_c_check_func "$LINENO" "mlockall" "ac_cv_func_mlockall" @@ -6400,6 +6426,10 @@ fi if test -z "${ENABLE_SASL_TRUE}" && test -z "${ENABLE_SASL_FALSE}"; then as_fn_error "conditional \"ENABLE_SASL\" was never defined. +Usually this means the macro was only invoked conditionally." "$LINENO" 5 +fi +if test -z "${ENABLE_REPLICATION_TRUE}" && test -z "${ENABLE_REPLICATION_FALSE}"; then + as_fn_error "conditional \"ENABLE_REPLICATION\" was never defined. Usually this means the macro was only invoked conditionally." "$LINENO" 5 fi if test -z "${BUILD_SOLARIS_PRIVS_TRUE}" && test -z "${BUILD_SOLARIS_PRIVS_FALSE}"; then diff -urN memcached-1.4.13/configure.ac memcached-1.4.13-repcached-2.3.1/configure.ac --- memcached-1.4.13/configure.ac Thu Feb 2 10:01:29 2012 +++ memcached-1.4.13-repcached-2.3.1/configure.ac Mon Feb 13 01:14:16 2012 @@ -441,6 +441,18 @@ AC_MSG_ERROR([Can't enable threads without the POSIX thread library.]) fi +dnl Check whether the user wants replication or not +AC_ARG_ENABLE(replication, + [AS_HELP_STRING([--enable-replication],[support replication])], + [if test "x$enable_threads" = "xyes"; then + AC_MSG_ERROR([Can't enable threads and replication together.]) + else + AC_DEFINE([USE_REPLICATION],,[Define this if you want to use replication]) + fi + ]) + +AM_CONDITIONAL(ENABLE_REPLICATION, test "x$enable_replication" = "xyes") + AC_CHECK_FUNCS(mlockall) AC_CHECK_FUNCS(getpagesizes) AC_CHECK_FUNCS(memcntl) diff -urN memcached-1.4.13/doc/Makefile memcached-1.4.13-repcached-2.3.1/doc/Makefile diff -urN memcached-1.4.13/items.c memcached-1.4.13-repcached-2.3.1/items.c --- memcached-1.4.13/items.c Thu Feb 2 06:02:02 2012 +++ memcached-1.4.13-repcached-2.3.1/items.c Mon Feb 13 01:18:17 2012 @@ -141,6 +141,9 @@ STATS_LOCK(); stats.evictions++; STATS_UNLOCK(); +#ifdef USE_REPLICATION + replication_call_del(ITEM_key(search), search->nkey); +#endif /* USE_REPLICATION */ it = search; slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal); do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0)); @@ -283,8 +286,14 @@ stats.total_items += 1; STATS_UNLOCK(); +#ifdef USE_REPLICATION + /* Allocate a new CAS ID on link. */ + if(!(it->it_flags & ITEM_REPDATA)) + ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); +#else /* Allocate a new CAS ID on link. */ ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); +#endif /* USE_REPLICATION */ assoc_insert(it, hv); item_link_q(it); refcount_incr(&it->refcount); diff -urN memcached-1.4.13/memcached.c memcached-1.4.13-repcached-2.3.1/memcached.c --- memcached-1.4.13/memcached.c Sun Jan 29 02:37:59 2012 +++ memcached-1.4.13-repcached-2.3.1/memcached.c Mon Feb 13 04:26:01 2012 @@ -97,6 +97,29 @@ static void conn_free(conn *c); +#ifdef USE_REPLICATION +static int rep_exit = 0; +static conn *rep_recv = NULL; +static conn *rep_send = NULL; +static conn *rep_conn = NULL; +static conn *rep_serv = NULL; +static int server_socket_replication(const int); +static void server_close_replication(void); +static int replication_init(void); +static int replication_server_init(void); +static int replication_client_init(void); +static int replication_start(void); +static int replication_connect(void); +static int replication_close(void); +static void replication_dispatch_close(void); +static int replication_marugoto(int); +static int replication_send(conn *); +static int replication_pop(void); +static int replication_exit(void); +static int replication_item(Q_ITEM *); +static pthread_mutex_t replication_pipe_lock = PTHREAD_MUTEX_INITIALIZER; +#endif /* USE_REPLICATION */ + /** exported globals **/ struct stats stats; struct settings settings; @@ -223,6 +246,11 @@ settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */ settings.maxconns_fast = false; settings.hashpower_init = 0; +#ifdef USE_REPLICATION + settings.rep_addr.s_addr = htonl(INADDR_ANY); + settings.rep_port = 11212; + settings.rep_qmax = 8192; +#endif /* USE_REPLICATION */ settings.slab_reassign = false; settings.slab_automove = false; } @@ -413,6 +441,10 @@ prot_text(c->protocol)); } else if (IS_UDP(transport)) { fprintf(stderr, "<%d server listening (udp)\n", sfd); +#ifdef USE_REPLICATION + } else if (init_state == conn_rep_listen) { + fprintf(stderr, "<%d server listening (replication)\n", sfd); +#endif /* USE_REPLICATION */ } else if (c->protocol == negotiating_prot) { fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd); @@ -630,7 +662,11 @@ "conn_nread", "conn_swallow", "conn_closing", - "conn_mwrite" }; + "conn_mwrite", + "conn_repconnect", + "conn_rep_listen", + "conn_pipe_recv", + "conn_pipe_send" }; return statenames[state]; } @@ -788,6 +824,14 @@ assert(c != NULL); +#ifdef USE_REPLICATION + if (c == rep_conn){ + if (settings.verbose > 1) + fprintf(stderr, "REP>%d %s\n", c->sfd, str); + conn_set_state(c, conn_new_cmd); + return; + } +#endif /* USE_REPLICATION */ if (c->noreply) { if (settings.verbose > 1) fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str); @@ -833,9 +877,11 @@ int comm = c->cmd; enum store_item_type ret; - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { out_string(c, "CLIENT_ERROR bad data chunk"); @@ -874,6 +920,11 @@ switch (ret) { case STORED: +#ifdef USE_REPLICATION + if( c != rep_conn ){ + replication_call_rep(ITEM_key(it), it->nkey); + } +#endif /* USE_REPLICATION */ out_string(c, "STORED"); break; case EXISTS: @@ -2587,6 +2638,11 @@ APPEND_STAT("hash_is_expanding", "%u", stats.hash_is_expanding); APPEND_STAT("expired_unfetched", "%llu", stats.expired_unfetched); APPEND_STAT("evicted_unfetched", "%llu", stats.evicted_unfetched); +#ifdef USE_REPLICATION + APPEND_STAT("replication", "MASTER", 0); + APPEND_STAT("repcached_version", "%s", REPCACHED_VERSION); + APPEND_STAT("repcached_qi_free", "%u", settings.rep_qmax - get_qi_count()); +#endif /*USE_REPLICATION*/ if (settings.slab_reassign) { APPEND_STAT("slab_reassign_running", "%u", stats.slab_reassign_running); APPEND_STAT("slabs_moved", "%llu", stats.slabs_moved); @@ -2979,6 +3035,11 @@ c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++; pthread_mutex_unlock(&c->thread->stats.mutex); +#ifdef USE_REPLICATION + if (c != rep_conn) + replication_call_rep(ITEM_key(it), it->nkey); +#endif /* USE_REPLICATION */ + out_string(c, "TOUCHED"); item_remove(it); } else { @@ -3125,6 +3186,12 @@ do_item_update(it); } +#ifdef USE_REPLICATION + if (c != rep_conn) { + replication_call_rep(ITEM_key(it), it->nkey); + } +#endif /* USE_REPLICATION */ + if (cas) { *cas = ITEM_get_cas(it); /* swap the incoming CAS value */ } @@ -3168,17 +3235,25 @@ if (it) { MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey); - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } item_unlink(it); item_remove(it); /* release our reference */ +#ifdef USE_REPLICATION + if( c != rep_conn ) + replication_call_del(key, nkey); +#endif /* USE_REPLICATION */ out_string(c, "DELETED"); } else { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.delete_misses++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.delete_misses++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } out_string(c, "NOT_FOUND"); } @@ -3263,6 +3338,22 @@ process_update_command(c, tokens, ntokens, comm, true); +#ifdef USE_REPLICATION + } else if ((ntokens == 7) && (strcmp(tokens[COMMAND_TOKEN].value, "rep") == 0 && (comm = NREAD_SET)) && (c == rep_conn)) { + + process_update_command(c, tokens, ntokens, comm, true); + if(c->item) + ((item *)(c->item))->it_flags |= ITEM_REPDATA; + + } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, "marugoto_end") == 0) && (c == rep_conn)) { + if(replication_start() == -1) + exit(EXIT_FAILURE); + if (settings.verbose > 0) + fprintf(stderr,"replication: start\n"); + out_string(c, "OK"); + return; + +#endif /* USE_REPLICATION */ } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { process_arithmetic_command(c, tokens, ntokens, 1); @@ -3292,11 +3383,17 @@ set_noreply_maybe(c, tokens, ntokens); - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.flush_cmds++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.flush_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } if(ntokens == (c->noreply ? 3 : 2)) { +#ifdef USE_REPLICATION + if( c != rep_conn ) + replication_call_flush_all(); +#endif settings.oldest_live = current_time - 1; item_flush_expired(); out_string(c, "OK"); @@ -3309,6 +3406,11 @@ return; } +#ifdef USE_REPLICATION + if( c != rep_conn ) + replication_call_defer_flush_all(realtime(exptime) + process_started); +#endif + settings.oldest_live = realtime(exptime) - 1; /* If exptime is zero realtime() would return zero too, and realtime(exptime) - 1 would overflow to the max unsigned @@ -3603,9 +3705,11 @@ int avail = c->rsize - c->rbytes; res = read(c->sfd, c->rbuf + c->rbytes, avail); if (res > 0) { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.bytes_read += res; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.bytes_read += res; + pthread_mutex_unlock(&c->thread->stats.mutex); + } gotdata = READ_DATA_RECEIVED; c->rbytes += res; if (res == avail) { @@ -3754,6 +3858,15 @@ assert(c != NULL); +#ifdef USE_REPLICATION + if(rep_exit && (c->state != conn_pipe_recv)){ + if (c == rep_conn && c->wbytes) { + replication_send(c); + } + return; + } +#endif /* USE_REPLICATION */ + while (!stop) { switch(c->state) { @@ -3844,9 +3957,11 @@ if (nreqs >= 0) { reset_cmd_handler(c); } else { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.conn_yields++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.conn_yields++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } if (c->rbytes > 0) { /* We have already read in data into the input buffer, so libevent will most likely not signal read events @@ -3887,9 +4002,11 @@ /* now try reading from the socket */ res = read(c->sfd, c->ritem, c->rlbytes); if (res > 0) { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.bytes_read += res; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.bytes_read += res; + pthread_mutex_unlock(&c->thread->stats.mutex); + } if (c->rcurr == c->ritem) { c->rcurr += res; } @@ -3942,9 +4059,11 @@ /* now try reading from the socket */ res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize); if (res > 0) { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.bytes_read += res; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.bytes_read += res; + pthread_mutex_unlock(&c->thread->stats.mutex); + } c->sbytes -= res; break; } @@ -4040,6 +4159,10 @@ case conn_closing: if (IS_UDP(c->transport)) conn_cleanup(c); +#ifdef USE_REPLICATION + else if(c == rep_conn) + replication_close(); +#endif /*USE_REPLICATION*/ else conn_close(c); stop = true; @@ -4048,9 +4171,79 @@ case conn_max_state: assert(false); break; + +#ifdef USE_REPLICATION + case conn_pipe_recv: + if(replication_pop()){ + replication_close(); + }else{ + replication_send(rep_conn); + } + stop = true; + break; + + case conn_rep_listen: + if (settings.verbose > 0) + fprintf(stderr,"replication: accept\n"); + addrlen = sizeof(addr); + res = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); + if(res == -1){ + if(errno == EAGAIN || errno == EWOULDBLOCK) { + } else if (errno == EMFILE) { + fprintf(stderr, "replication: Too many opened connections\n"); + } else { + fprintf(stderr, "replication: accept error\n"); + } + }else{ + if(rep_conn){ + close(res); + fprintf(stderr,"replication: already connected\n"); + }else{ + if((flags = fcntl(res, F_GETFL, 0)) < 0 || fcntl(res, F_SETFL, flags | O_NONBLOCK) < 0){ + close(res); + fprintf(stderr, "replication: Can't Setting O_NONBLOCK\n"); + }else{ + server_close_replication(); + rep_conn = conn_new(res, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); + rep_conn->item = NULL; + rep_conn->rbytes = 0; + rep_conn->rcurr = rep_conn->rbuf; + replication_connect(); + replication_marugoto(1); + replication_marugoto(0); + } + } + } + stop = true; + break; + + case conn_repconnect: + rep_conn = c; + replication_connect(); + conn_set_state(c, conn_read); + if (settings.verbose > 0) + fprintf(stderr,"replication: marugoto copying\n"); + if(!update_event(c, EV_READ | EV_PERSIST)){ + fprintf(stderr, "replication: Couldn't update event\n"); + conn_set_state(c, conn_closing); + } + stop = true; + break; + + case conn_pipe_send: + /* should not happen */ + fprintf(stderr, "replication: unexpected conn_pipe_send state\n"); + break; +#endif /* USE_REPLICATION */ } } +#ifdef USE_REPLICATION + if (c == rep_conn && c->wbytes) { + replication_send(c); + } +#endif /* USE_REPLICATION */ + return; } @@ -4389,6 +4582,89 @@ return 0; } +#ifdef USE_REPLICATION +static int server_socket_replication(const int port) { + int sfd; + struct linger ling = {0, 0}; + struct addrinfo *ai; + struct addrinfo *next; + struct addrinfo hints; + char port_buf[NI_MAXSERV]; + int error; + int success = 0; + + int flags =1; + + memset(&hints, 0, sizeof (hints)); + hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG; + hints.ai_family = AF_UNSPEC; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_socktype = SOCK_STREAM; + snprintf(port_buf, NI_MAXSERV, "%d", port); + error= getaddrinfo(settings.inter, port_buf, &hints, &ai); + if (error != 0) { + if (error != EAI_SYSTEM) + fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error)); + else + perror("getaddrinfo()"); + + return 1; + } + + for (next= ai; next; next= next->ai_next) { + conn *rep_serv_add; + if ((sfd = new_socket(next)) == -1) { + freeaddrinfo(ai); + return 1; + } + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); + setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); + setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); + setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); + + if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) { + if (errno != EADDRINUSE) { + perror("bind()"); + close(sfd); + freeaddrinfo(ai); + return 1; + } + close(sfd); + continue; + } else { + success++; + if (listen(sfd, 1024) == -1) { + perror("listen()"); + close(sfd); + freeaddrinfo(ai); + return 1; + } + } + + if (!(rep_serv_add = conn_new(sfd, conn_rep_listen, + EV_READ | EV_PERSIST, 1, tcp_transport, main_base))) { + fprintf(stderr, "failed to create replication server connection\n"); + exit(EXIT_FAILURE); + } + + rep_serv_add->next = rep_serv; + rep_serv = rep_serv_add; + } + + freeaddrinfo(ai); + + /* Return zero iff we detected no errors in starting up connections */ + return success == 0; +} + +static void server_close_replication(void) { + while(rep_serv){ + conn_close(rep_serv); + rep_serv = rep_serv->next; + } +} +#endif /* USE_REPLICATION */ + /* * We keep the current time of day in a global variable that's updated by a * timer event. This saves us a bunch of time() system calls (we really only @@ -4450,6 +4726,9 @@ static void usage(void) { printf(PACKAGE " " VERSION "\n"); +#ifdef USE_REPLICATION + printf("repcached %s\n",REPCACHED_VERSION); +#endif /* USE_REPLICATION */ printf("-p <num> TCP port number to listen on (default: 11211)\n" "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n" "-s <file> UNIX socket path to listen on (disables network support)\n" @@ -4502,6 +4781,10 @@ #ifdef ENABLE_SASL printf("-S Turn on Sasl authentication\n"); #endif +#ifdef USE_REPLICATION + printf("-x <ip_addr> hostname or IP address of peer repcached\n"); + printf("-X <num> TCP port number for replication (default: 11212)\n"); +#endif /* USE_REPLICATION */ printf("-o Comma separated list of extended or experimental options\n" " - (EXPERIMENTAL) maxconns_fast: immediately close new\n" " connections if over maxconns limit\n" @@ -4625,6 +4908,26 @@ exit(EXIT_SUCCESS); } +#ifdef USE_REPLICATION +static void sig_handler_cb(int fd, short event, void *arg) +{ + struct event *signal = arg; + + if (settings.verbose) + fprintf(stderr, "got signal %d\n", EVENT_SIGNAL(signal)); + + if (replication_exit()) { + exit(EXIT_FAILURE); + } + + pthread_mutex_lock(&replication_pipe_lock); + if (!rep_send) { + exit(EXIT_SUCCESS); + } + pthread_mutex_unlock(&replication_pipe_lock); +} +#endif /* USE_REPLICATION */ + #ifndef HAVE_SIGIGNORE static int sigignore(int sig) { struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 }; @@ -4680,6 +4983,57 @@ #endif } +static void create_listening_sockets(void) +{ + /* create unix mode sockets after dropping privileges */ + if (settings.socketpath != NULL) { + errno = 0; + if (server_socket_unix(settings.socketpath,settings.access)) { + vperror("failed to listen on UNIX socket: %s", settings.socketpath); + exit(EX_OSERR); + } + } + + /* create the listening socket, bind it, and init */ + if (settings.socketpath == NULL) { + const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); + char temp_portnumber_filename[PATH_MAX]; + FILE *portnumber_file = NULL; + + if (portnumber_filename != NULL) { + snprintf(temp_portnumber_filename, + sizeof(temp_portnumber_filename), + "%s.lck", portnumber_filename); + + portnumber_file = fopen(temp_portnumber_filename, "a"); + if (portnumber_file == NULL) { + fprintf(stderr, "Failed to open \"%s\": %s\n", + temp_portnumber_filename, strerror(errno)); + } + } + + errno = 0; + if (settings.port && server_sockets(settings.port, tcp_transport, + portnumber_file)) { + vperror("failed to listen on TCP port %d", settings.port); + exit(EX_OSERR); + } + + /* create the UDP listening socket and bind it */ + errno = 0; + if (settings.udpport && server_sockets(settings.udpport, udp_transport, + portnumber_file)) { + vperror("failed to listen on UDP port %d", settings.udpport); + exit(EX_OSERR); + } + + if (portnumber_file) { + fclose(portnumber_file); + rename(temp_portnumber_filename, portnumber_filename); + } + } +} + /** * Do basic sanity check of the runtime environment * @return true if no errors found, false if we can't use this env @@ -4714,6 +5068,11 @@ struct rlimit rlim; char unit = '\0'; int size_max = 0; +#ifdef USE_REPLICATION + struct in_addr addr; + struct addrinfo master_hint; + struct addrinfo *master_addr; +#endif /* USE_REPLICATION */ int retval = EXIT_SUCCESS; /* listening sockets */ static int *l_socket = NULL; @@ -4781,6 +5140,11 @@ "B:" /* Binding protocol */ "I:" /* Max item size */ "S" /* Sasl ON */ +#ifdef USE_REPLICATION + "X:" /* replication port */ + "x:" /* replication master */ + "q:" /* replication queue length */ +#endif /* USE_REPLICATION */ "o:" /* Extended generic options */ ))) { switch (c) { @@ -4949,6 +5313,31 @@ ); } break; +#ifdef USE_REPLICATION + case 'x': + if (inet_pton(AF_INET, optarg, &addr) <= 0) { + memset(&master_hint, 0, sizeof(master_hint)); + master_hint.ai_flags = 0; + master_hint.ai_socktype = 0; + master_hint.ai_protocol = 0; + if(!getaddrinfo(optarg, NULL, &master_hint, &master_addr)){ + settings.rep_addr = ((struct sockaddr_in *)(master_addr->ai_addr)) -> sin_addr; + freeaddrinfo(master_addr); + }else{ + fprintf(stderr, "Illegal address: %s\n", optarg); + return 1; + } + } else { + settings.rep_addr = addr; + } + break; + case 'X': + settings.rep_port = atoi(optarg); + break; + case 'q': + settings.rep_qmax = atoi(optarg); + break; +#endif /* USE_REPLICATION */ case 'S': /* set Sasl authentication to true. Default is false */ #ifndef ENABLE_SASL fprintf(stderr, "This server is not built with SASL support.\n"); @@ -5120,6 +5509,17 @@ /* initialize main thread libevent instance */ main_base = event_init(); +#ifdef USE_REPLICATION + /* register events for SIGINT and SIGTERM to handle them in main thread */ + struct event signal_int, signal_term; + event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, sig_handler_cb, + &signal_int); + event_add(&signal_int, NULL); + event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler_cb, + &signal_term); + event_add(&signal_term, NULL); +#endif + /* initialize other stuff */ stats_init(); assoc_init(settings.hashpower_init); @@ -5149,60 +5549,21 @@ /* initialise clock event */ clock_handler(0, 0, 0); - /* create unix mode sockets after dropping privileges */ - if (settings.socketpath != NULL) { - errno = 0; - if (server_socket_unix(settings.socketpath,settings.access)) { - vperror("failed to listen on UNIX socket: %s", settings.socketpath); - exit(EX_OSERR); - } - } - - /* create the listening socket, bind it, and init */ - if (settings.socketpath == NULL) { - const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); - char temp_portnumber_filename[PATH_MAX]; - FILE *portnumber_file = NULL; - - if (portnumber_filename != NULL) { - snprintf(temp_portnumber_filename, - sizeof(temp_portnumber_filename), - "%s.lck", portnumber_filename); - - portnumber_file = fopen(temp_portnumber_filename, "a"); - if (portnumber_file == NULL) { - fprintf(stderr, "Failed to open \"%s\": %s\n", - temp_portnumber_filename, strerror(errno)); - } - } - - errno = 0; - if (settings.port && server_sockets(settings.port, tcp_transport, - portnumber_file)) { - vperror("failed to listen on TCP port %d", settings.port); - exit(EX_OSERR); - } - - /* - * initialization order: first create the listening sockets - * (may need root on low ports), then drop root if needed, - * then daemonise if needed, then init libevent (in some cases - * descriptors created by libevent wouldn't survive forking). - */ - - /* create the UDP listening socket and bind it */ - errno = 0; - if (settings.udpport && server_sockets(settings.udpport, udp_transport, - portnumber_file)) { - vperror("failed to listen on UDP port %d", settings.udpport); - exit(EX_OSERR); - } + /* + * initialization order: first create the listening sockets + * (may need root on low ports), then drop root if needed, + * then daemonise if needed, then init libevent (in some cases + * descriptors created by libevent wouldn't survive forking). + */ - if (portnumber_file) { - fclose(portnumber_file); - rename(temp_portnumber_filename, portnumber_filename); - } +#ifdef USE_REPLICATION + if(replication_init() == -1){ + fprintf(stderr, "faild to replication init\n"); + exit(EXIT_FAILURE); } +#else + create_listening_sockets(); +#endif /* Give the sockets a moment to open. I know this is dumb, but the error * is only an advisory. @@ -5240,3 +5601,362 @@ return retval; } + +#ifdef USE_REPLICATION +static int replication_start(void) +{ + static int start = 0; + if(start) + return(0); + + create_listening_sockets(); + + start = 1; + return(0); +} + +static int replication_server_init(void) +{ + rep_recv = NULL; + rep_send = NULL; + rep_conn = NULL; + if(server_socket_replication(settings.rep_port)){ + fprintf(stderr, "replication: failed to initialize replication server socket\n"); + return(-1); + } + if (settings.verbose > 0) + fprintf(stderr, "replication: listen\n"); + return(replication_start()); +} + +static int replication_client_init(void) +{ + int s; + conn *c; + struct addrinfo ai; + struct sockaddr_in server; + + rep_recv = NULL; + rep_send = NULL; + rep_conn = NULL; + + memset(&ai,0,sizeof(ai)); + ai.ai_family = AF_INET; + ai.ai_socktype = SOCK_STREAM; + s = new_socket(&ai); + + if(s == -1) { + fprintf(stderr, "replication: failed to replication client socket\n"); + return(-1); + }else{ + /* connect */ + memset((char *)&server, 0, sizeof(server)); + server.sin_family = AF_INET; + server.sin_addr = settings.rep_addr; + server.sin_port = htons(settings.rep_port); + if (settings.verbose > 0) + fprintf(stderr,"replication: connect (peer=%s:%d)\n", inet_ntoa(settings.rep_addr), settings.rep_port); + if(connect(s,(struct sockaddr *)&server, sizeof(server)) == 0){ + c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base); + if(c == NULL){ + fprintf(stderr, "replication: failed to create client conn"); + close(s); + return(-1); + } + drive_machine(c); + }else{ + if(errno == EINPROGRESS){ + c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base); + if(c == NULL){ + fprintf(stderr, "replication: failed to create client conn"); + close(s); + return(-1); + } + }else{ + fprintf(stdout,"replication: can't connect %s:%d\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port)); + close(s); + return(-1); + } + } + } + return(0); +} + +static int replication_init(void) +{ + if(settings.rep_addr.s_addr != htonl(INADDR_ANY)){ + if(replication_client_init() != -1){ + return(0); + } + } + return(replication_server_init()); +} + +static int replication_connect(void) +{ + int f; + int p[2]; + + if(pipe(p) == -1){ + fprintf(stderr, "replication: can't create pipe\n"); + return(-1); + }else{ + if((f = fcntl(p[0], F_GETFL, 0)) < 0 || fcntl(p[0], F_SETFL, f | O_NONBLOCK) < 0) { + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n"); + return(-1); + } + if((f = fcntl(p[1], F_GETFL, 0)) < 0 || fcntl(p[1], F_SETFL, f | O_NONBLOCK) < 0) { + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n"); + return(-1); + } + pthread_mutex_lock(&replication_pipe_lock); + rep_recv = conn_new(p[0], conn_pipe_recv, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); + rep_send = conn_new(p[1], conn_pipe_send, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); + event_del(&rep_send->event); + pthread_mutex_unlock(&replication_pipe_lock); + } + return(0); +} + +static int replication_close(void) +{ + Q_ITEM *q; + + if(settings.verbose > 0) + fprintf(stderr,"replication: close\n"); + if(rep_recv){ + conn_close(rep_recv); + rep_recv = NULL; + } + pthread_mutex_lock(&replication_pipe_lock); + if(rep_send){ + conn_close(rep_send); + rep_send = NULL; + if (settings.verbose > 1) + fprintf(stderr,"replication: close send\n"); + } + pthread_mutex_unlock(&replication_pipe_lock); + while ((q = replication_queue_pop()) != NULL) { + qi_free(q); + } + if (settings.verbose > 1) { + fprintf(stderr, "replication: qitem free %d items\n", qi_free_list()); + } + if(rep_conn){ + conn_close(rep_conn); + rep_conn = NULL; + if (settings.verbose > 1) + fprintf(stderr,"replication: close conn\n"); + } + if(!rep_exit) + replication_server_init(); + return(0); +} + +static void replication_dispatch_close(void) +{ + if (settings.verbose > 1) + fprintf(stderr, "replication: dispatch close\n"); + pthread_mutex_lock(&replication_pipe_lock); + if (rep_send) { + conn_close(rep_send); + rep_send = NULL; + } + pthread_mutex_unlock(&replication_pipe_lock); +} + +static int replication_marugoto(int f) +{ + static int keysend = 0; + static int keycount = 0; + static char *keylist = NULL; + static char *keyptr = NULL; + + if(f){ + if(keylist){ + free(keylist); + keylist = NULL; + keyptr = NULL; + keycount = 0; + keysend = 0; + } + pthread_mutex_lock(&cache_lock); + keylist = (char *)assoc_key_snap((int *)&keycount); + pthread_mutex_unlock(&cache_lock); + keyptr = keylist; + if (!keyptr){ + replication_call_marugoto_end(); + }else{ + if (settings.verbose > 0) + fprintf(stderr,"replication: marugoto start\n"); + } + }else{ + if(keyptr){ + while(*keyptr){ + item *it = item_get(keyptr, strlen(keyptr)); + if(it){ + item_remove(it); + if(replication_call_rep(keyptr, strlen(keyptr)) == -1){ + return(-1); + }else{ + keysend++; + keyptr += strlen(keyptr) + 1; + return(0); + } + } + keyptr += strlen(keyptr) + 1; + } + if(settings.verbose > 0) + fprintf(stderr,"replication: marugoto %d\n", keysend); + replication_call_marugoto_end(); + if(settings.verbose > 0) + fprintf(stderr,"replication: marugoto owari\n"); + free(keylist); + keylist = NULL; + keyptr = NULL; + keycount = 0; + keysend = 0; + } + } + return(0); +} + +static int replication_send(conn *c) +{ + int w; + + w = write(c->sfd, c->wcurr, c->wbytes); + + if (w == -1 && !(errno == EAGAIN || errno == EINTR)) { + fprintf(stderr,"replication: send error %d\n", errno); + replication_close(); + return -1; + } + + if (w > 0) { + c->wbytes -= w; + c->wcurr += w; + } + + if (c->wbytes < 1024 * 1024 && replication_pop()) { + replication_close(); + return -1; + } + + if (!update_event(c, (c->wbytes ? EV_WRITE : 0) | EV_READ | EV_PERSIST)) { + fprintf(stderr, "replication: couldn't update event\n"); + replication_close(); + return -1; + } + + if (rep_exit && c->wbytes == 0) { + replication_close(); + if (settings.verbose) + fprintf(stderr,"replication: cleanup complete\n"); + exit(EXIT_SUCCESS); + } + + return 0; +} + +static int replication_pop(void) +{ + int r; + Q_ITEM *q; + + if (settings.verbose > 1) + fprintf(stderr, "replication: pop\n"); + + if (!rep_recv) + return 0; + + r = read(rep_recv->sfd, rep_recv->rbuf, rep_recv->rsize); + + if (r == -1 && !(errno == EAGAIN || errno == EINTR)) { + fprintf(stderr, "replication: pop error %d\n", errno); + return -1; + } + if (r == 0) { + /* other end closed, trigger replication_close() */ + return -1; + } + + /* process queue */ + + while (rep_conn->wbytes < 1024 * 1024 && + (q = replication_queue_pop()) != NULL) + { + if (replication_cmd(rep_conn, q)) { + return -1; + } + + qi_free(q); + } + + if (rep_conn->wbytes < 1024 * 1024) { + replication_marugoto(0); + } + + return 0; +} + +static int replication_exit(void) +{ + rep_exit = 1; + return(replication_item(NULL)); +} + +static int replication_item(Q_ITEM *q) +{ + int w; + + pthread_mutex_lock(&replication_pipe_lock); + if (!rep_send) { + qi_free(q); + pthread_mutex_unlock(&replication_pipe_lock); + return 0; + } + + /* add item to queue */ + + if (q) { + replication_queue_push(q); + } + + /* notify main thread we have more data */ + + w = write(rep_send->sfd, "", 1); + + if (w == -1 && !(errno == EAGAIN || errno == EINTR)) { + fprintf(stderr,"replication: pipe write error %d\n", errno); + qi_free(q); + pthread_mutex_unlock(&replication_pipe_lock); + replication_dispatch_close(); + return -1; + } + + pthread_mutex_unlock(&replication_pipe_lock); + return 0; +} + +int replication(enum CMD_TYPE type, R_CMD *cmd) +{ + Q_ITEM *q; + + pthread_mutex_lock(&replication_pipe_lock); + if (!rep_send) { + pthread_mutex_unlock(&replication_pipe_lock); + return 0; + } + pthread_mutex_unlock(&replication_pipe_lock); + + if((q = qi_new(type, cmd, false))) { + replication_item(q); + }else{ + fprintf(stderr,"replication: can't create Q_ITEM\n"); + replication_dispatch_close(); + return(-1); + } + return(0); +} +#endif /* USE_REPLICATION */ diff -urN memcached-1.4.13/memcached.h memcached-1.4.13-repcached-2.3.1/memcached.h --- memcached-1.4.13/memcached.h Wed Jan 11 03:09:09 2012 +++ memcached-1.4.13-repcached-2.3.1/memcached.h Mon Feb 13 01:14:16 2012 @@ -151,6 +151,12 @@ conn_swallow, /**< swallowing unnecessary bytes w/o storing */ conn_closing, /**< closing this connection */ conn_mwrite, /**< writing out many items sequentially */ +#ifdef USE_REPLICATION + conn_repconnect, /**< replication connecting to master */ + conn_rep_listen, /**< replication listening socket */ + conn_pipe_recv, /**< replication command pipe recv */ + conn_pipe_send, /**< replication command pipe send */ +#endif /* USE_REPLICATION */ conn_max_state /**< Max state value (used for assertion) */ }; @@ -303,6 +309,11 @@ bool slab_reassign; /* Whether or not slab reassignment is allowed */ bool slab_automove; /* Whether or not to automatically move slabs */ int hashpower_init; /* Starting hash power level */ +#ifdef USE_REPLICATION + struct in_addr rep_addr; /* replication addr */ + int rep_port; /* replication port */ + int rep_qmax; /* replication QITEM max */ +#endif /*USE_REPLICATION*/ }; extern struct stats stats; @@ -317,6 +328,10 @@ #define ITEM_FETCHED 8 +#ifdef USE_REPLICATION +#define ITEM_REPDATA 128 +#endif /*USE_REPLICATION*/ + /** * Structure for storing items within memcached. */ @@ -498,6 +513,10 @@ #include "trace.h" #include "hash.h" #include "util.h" + +#ifdef USE_REPLICATION +#include "replication.h" +#endif /* USE_REPLICATION */ /* * Functions such as the libevent-related calls that need to do cross-thread diff -urN memcached-1.4.13/replication.c memcached-1.4.13-repcached-2.3.1/replication.c --- memcached-1.4.13/replication.c Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/replication.c Mon Feb 13 01:13:58 2012 @@ -0,0 +1,383 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * + */ +#include "memcached.h" +#include "replication.h" +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> + +static Q_ITEM *q_head = NULL; +static Q_ITEM *q_tail = NULL; +static Q_ITEM *q_freelist = NULL; +static int q_itemcount = 0; +static pthread_mutex_t replication_queue_lock = PTHREAD_MUTEX_INITIALIZER; + +int get_qi_count(void) +{ + int c; + pthread_mutex_lock(&replication_queue_lock); + c = q_itemcount; + pthread_mutex_unlock(&replication_queue_lock); + return(c); +} + +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool reuse) +{ + Q_ITEM *q = NULL; + char *key = NULL; + uint32_t keylen = 0; + rel_time_t time = 0; + + pthread_mutex_lock(&replication_queue_lock); + if(q_freelist){ + q = q_freelist; + q_freelist = q->next; + } + + if(NULL == q){ + if(reuse) { + pthread_mutex_unlock(&replication_queue_lock); + return(NULL); + } + if(q_itemcount >= settings.rep_qmax) { + pthread_mutex_unlock(&replication_queue_lock); + return(NULL); + } + q = malloc(sizeof(Q_ITEM)); + if (NULL == q){ + fprintf(stderr,"replication: qi_new out of memory\n"); + pthread_mutex_unlock(&replication_queue_lock); + return(NULL); + } + q_itemcount++; + if (settings.verbose > 2) + fprintf(stderr,"replication: alloc c=%d\n", q_itemcount); + } + + pthread_mutex_unlock(&replication_queue_lock); + + switch (type) { + case REPLICATION_REP: + case REPLICATION_DEL: + key = cmd->key; + keylen = cmd->keylen; + break; + case REPLICATION_FLUSH_ALL: + break; + case REPLICATION_DEFER_FLUSH_ALL: + time = cmd->time; + break; + case REPLICATION_MARUGOTO_END: + break; + default: + fprintf(stderr,"replication: got unknown command: %d\n", type); + return(NULL); + } + + q->key = NULL; + q->type = type; + q->time = time; + q->next = NULL; + if (keylen) { + q->key = malloc(keylen + 1); + if(NULL == q->key){ + qi_free(q); + q = NULL; + }else{ + memcpy(q->key, key, keylen); + *(q->key + keylen) = 0; + } + } + + return(q); +} + +void qi_free(Q_ITEM *q) +{ + if(q){ + if(q->key){ + free(q->key); + q->key = NULL; + } + pthread_mutex_lock(&replication_queue_lock); + q->next = q_freelist; + q_freelist = q; + pthread_mutex_unlock(&replication_queue_lock); + } +} + +int qi_free_list() +{ + int c = 0; + Q_ITEM *q = NULL; + + pthread_mutex_lock(&replication_queue_lock); + while((q = q_freelist)){ + q_itemcount--; + c++; + q_freelist = q->next; + free(q); + } + pthread_mutex_unlock(&replication_queue_lock); + return(c); +} + +void replication_queue_push(Q_ITEM *q) +{ + pthread_mutex_lock(&replication_queue_lock); + if (q_tail != NULL) { + q_tail->next = q; + } else { + q_head = q; + } + q_tail = q; + q->next = NULL; + pthread_mutex_unlock(&replication_queue_lock); +} + +Q_ITEM *replication_queue_pop(void) +{ + Q_ITEM *q; + + pthread_mutex_lock(&replication_queue_lock); + q = q_head; + if (q != NULL) { + q_head = q->next; + if (q->next == NULL) { + q_tail = NULL; + } + } + pthread_mutex_unlock(&replication_queue_lock); + return q; +} + +static int replication_get_num(char *p, int n) +{ + int l; + char b[64]; + if(p) + l = sprintf(p, "%u", n); + else + l = sprintf(b, "%u", n); + return(l); +} + +int replication_call_rep(char *key, size_t keylen) +{ + R_CMD r; + r.key = key; + r.keylen = keylen; + return(replication(REPLICATION_REP, &r)); +} + +int replication_call_del(char *key, size_t keylen) +{ + R_CMD r; + r.key = key; + r.keylen = keylen; + return(replication(REPLICATION_DEL, &r)); +} + +int replication_call_flush_all() +{ + R_CMD r; + r.key = NULL; + return(replication(REPLICATION_FLUSH_ALL, &r)); +} + +int replication_call_defer_flush_all(const rel_time_t time) +{ + R_CMD r; + r.key = NULL; + r.time = time; + return(replication(REPLICATION_DEFER_FLUSH_ALL, &r)); +} + +int replication_call_marugoto_end() +{ + R_CMD r; + r.key = NULL; + return(replication(REPLICATION_MARUGOTO_END, &r)); +} + +static int replication_alloc(conn *c, int s) +{ + char *p; + s += c->wbytes; + if(c->wsize < s + (c->wcurr - c->wbuf)){ + while(c->wsize < s) + c->wsize += 4096; + if((p = malloc(c->wsize))){ + memcpy(p, c->wcurr, c->wbytes); + free(c->wbuf); + c->wbuf = p; + c->wcurr = p; + }else{ + return(-1); + } + } + return(0); +} + +static int replication_del(conn *c, char *k) +{ + int l = 0; + char *s = "delete "; + char *n = "\r\n"; + char *p = NULL; + + l += strlen(s); + l += strlen(k); + l += strlen(n); + if(replication_alloc(c,l) == -1){ + fprintf(stderr, "replication: del malloc error\n"); + return(-1); + } + p = c->wcurr + c->wbytes; + memcpy(p, s, strlen(s)); + p += strlen(s); + memcpy(p, k, strlen(k)); + p += strlen(k); + memcpy(p, n, strlen(n)); + p += strlen(n); + c->wbytes = p - c->wcurr; + return(0); +} + +static int replication_rep(conn *c, item *it) +{ + int exp = 0; + int len = 0; + char *s = "rep "; + char *n = "\r\n"; + char *p = NULL; + char flag[40]; + + if(it->exptime) + exp = it->exptime + process_started; + flag[0]=0; + if((p=ITEM_suffix(it))){ + int i; + memcpy(flag, p, it->nsuffix - 2); + flag[it->nsuffix - 2] = 0; + for(i=0;i<strlen(flag);i++){ + if(flag[i] > ' ') + break; + } + memmove(flag,&flag[i],strlen(flag)-i); + for(p=flag;*p>' ';p++); + *p=0; + } + len += strlen(s); + len += it->nkey; + len += 1; + len += strlen(flag); + len += 1; + len += replication_get_num(NULL, exp); + len += 1; + len += replication_get_num(NULL, it->nbytes - 2); + len += 1; + len += replication_get_num(NULL, ITEM_get_cas(it)); + len += strlen(n); + len += it->nbytes; + len += strlen(n); + if(replication_alloc(c,len) == -1){ + fprintf(stderr, "replication: rep malloc error\n"); + return(-1); + } + p = c->wcurr + c->wbytes; + memcpy(p, s, strlen(s)); + p += strlen(s); + memcpy(p, ITEM_key(it), it->nkey); + p += it->nkey; + *(p++) = ' '; + memcpy(p, flag, strlen(flag)); + p += strlen(flag); + *(p++) = ' '; + p += replication_get_num(p, exp); + *(p++) = ' '; + p += replication_get_num(p, it->nbytes - 2); + *(p++) = ' '; + p += replication_get_num(p, ITEM_get_cas(it)); + memcpy(p, n, strlen(n)); + p += strlen(n); + memcpy(p, ITEM_data(it), it->nbytes); + p += it->nbytes; + c->wbytes = p - c->wcurr; + return(0); +} + +static int replication_flush_all(conn *c, rel_time_t exp) +{ + char *s = "flush_all "; + char *n = "\r\n"; + char *p = NULL; + + int l = strlen(s) + strlen(n); + if (exp > 0) + l += replication_get_num(NULL, exp); + if(replication_alloc(c,l) == -1){ + fprintf(stderr, "replication: flush_all malloc error\n"); + return(-1); + } + p = c->wcurr + c->wbytes; + memcpy(p, s, strlen(s)); + p += strlen(s); + if (exp > 0) + p += replication_get_num(p, exp); + memcpy(p, n, strlen(n)); + p += strlen(n); + c->wbytes = p - c->wcurr; + return(0); +} + +static int replication_marugoto_end(conn *c) +{ + char *s = "marugoto_end"; + char *n = "\r\n"; + char *p = NULL; + + int l = strlen(s) + strlen(n); + if(replication_alloc(c,l) == -1){ + fprintf(stderr, "replication: marugoto_end malloc error\n"); + return(-1); + } + p = c->wcurr + c->wbytes; + memcpy(p, s, strlen(s)); + p += strlen(s); + memcpy(p, n, strlen(n)); + p += strlen(n); + c->wbytes = p - c->wcurr; + return(0); +} + +int replication_cmd(conn *c, Q_ITEM *q) +{ + item *it; + int r; + + switch (q->type) { + case REPLICATION_REP: + it = item_get(q->key, strlen(q->key)); + if (!it) + return(replication_del(c, q->key)); + r = replication_rep(c, it); + item_remove(it); + return r; + case REPLICATION_DEL: + return(replication_del(c, q->key)); + case REPLICATION_FLUSH_ALL: + return(replication_flush_all(c, 0)); + case REPLICATION_DEFER_FLUSH_ALL: + return(replication_flush_all(c, q->time)); + case REPLICATION_MARUGOTO_END: + return(replication_marugoto_end(c)); + default: + fprintf(stderr,"replication: got unknown command:%d\n", q->type); + return(0); + } +} diff -urN memcached-1.4.13/replication.h memcached-1.4.13-repcached-2.3.1/replication.h --- memcached-1.4.13/replication.h Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/replication.h Mon Feb 13 01:13:58 2012 @@ -0,0 +1,45 @@ +#ifndef MEMCACHED_REPLICATION_H +#define MEMCACHED_REPLICATION_H +#define REPCACHED_VERSION "2.3.1" +#include <netdb.h> + +enum CMD_TYPE { + REPLICATION_REP, + REPLICATION_DEL, + REPLICATION_FLUSH_ALL, + REPLICATION_DEFER_FLUSH_ALL, + REPLICATION_MARUGOTO_END, +}; + +typedef struct queue_item_t Q_ITEM; +struct queue_item_t { + enum CMD_TYPE type; + char *key; + rel_time_t time; + Q_ITEM *next; +}; + +typedef struct replication_cmd_t R_CMD; +struct replication_cmd_t { + char *key; + int keylen; + rel_time_t time; +}; + +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool); +void qi_free(Q_ITEM *); +int qi_free_list(void); +int replication_cmd(conn *, Q_ITEM *); +int get_qi_count(void); + +void replication_queue_push(Q_ITEM *); +Q_ITEM *replication_queue_pop(void); + +int replication_call_rep(char *key, size_t keylen); +int replication_call_del(char *key, size_t keylen); +int replication_call_flush_all(void); +int replication_call_defer_flush_all(const rel_time_t time); +int replication_call_marugoto_end(void); +int replication(enum CMD_TYPE type, R_CMD *cmd); + +#endif diff -urN memcached-1.4.13/t/binary.t memcached-1.4.13-repcached-2.3.1/t/binary.t --- memcached-1.4.13/t/binary.t Thu Jan 26 10:56:02 2012 +++ memcached-1.4.13-repcached-2.3.1/t/binary.t Mon Feb 13 01:16:42 2012 @@ -2,10 +2,12 @@ use strict; use warnings; -use Test::More tests => 3539; +use Test::More; use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; + +Test::More::plan(tests => 3539 + (support_replication() ? 36 : 0)); my $server = new_memcached(); ok($server, "started the server"); diff -urN memcached-1.4.13/t/issue_67.t memcached-1.4.13-repcached-2.3.1/t/issue_67.t --- memcached-1.4.13/t/issue_67.t Fri Jan 6 22:19:50 2012 +++ memcached-1.4.13-repcached-2.3.1/t/issue_67.t Mon Feb 13 01:13:58 2012 @@ -41,6 +41,10 @@ my $exe = "$builddir/memcached-debug"; croak("memcached binary doesn't exist. Haven't run 'make' ?\n") unless -e $exe; + if (support_replication()) { + $args .= ' -X 0'; + } + my $childpid = fork(); my $root = ''; diff -urN memcached-1.4.13/t/lib/MemcachedTest.pm memcached-1.4.13-repcached-2.3.1/t/lib/MemcachedTest.pm --- memcached-1.4.13/t/lib/MemcachedTest.pm Fri Jan 6 22:19:50 2012 +++ memcached-1.4.13-repcached-2.3.1/t/lib/MemcachedTest.pm Mon Feb 13 01:13:58 2012 @@ -13,7 +13,8 @@ @EXPORT = qw(new_memcached sleep mem_get_is mem_gets mem_gets_is mem_stats - supports_sasl free_port); + supports_sasl free_port support_replication memcached_version + version2num); sub sleep { my $n = shift; @@ -148,6 +149,23 @@ return 0; } +sub support_replication { + my $output = `$builddir/memcached-debug -h`; + return 1 if $output =~ /^-x <ip_addr>/m; + return 0; +} + +sub memcached_version { + my $output = `$builddir/memcached-debug -h`; + return $1 if $output =~ /^memcached (\d[\d\.]+)/; + return 0; +} + +sub version2num { + my($major,$minor,$pl) = ($_[0] =~ /^(\d+)\.(\d+)\.(\d+)$/); + return $major*100**2 + $minor*100 + $pl +} + sub new_memcached { my ($args, $passed_port) = @_; my $port = $passed_port || free_port(); @@ -171,6 +189,9 @@ } if ($< == 0) { $args .= " -u root"; + } + if (support_replication() && $args !~ m/-X/) { + $args .= ' -X 0'; } my $childpid = fork(); diff -urN memcached-1.4.13/t/stats.t memcached-1.4.13-repcached-2.3.1/t/stats.t --- memcached-1.4.13/t/stats.t Fri Jan 6 22:19:50 2012 +++ memcached-1.4.13-repcached-2.3.1/t/stats.t Mon Feb 13 01:13:58 2012 @@ -58,7 +58,8 @@ my $stats = mem_stats($sock); # Test number of keys -is(scalar(keys(%$stats)), 48, "48 stats values"); +is(scalar(keys(%$stats)), 48 + (support_replication() ? 3 : 0), + "number of stats values"); # Test initial state foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses diff -urN memcached-1.4.13/t.rep/00-startup.t memcached-1.4.13-repcached-2.3.1/t.rep/00-startup.t --- memcached-1.4.13/t.rep/00-startup.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/00-startup.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,12 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m,$b) = new_repcached(); +ok($m, "started the master"); +ok($b, "started the backup"); + diff -urN memcached-1.4.13/t.rep/11-set.t memcached-1.4.13-repcached-2.3.1/t.rep/11-set.t --- memcached-1.4.13/t.rep/11-set.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/11-set.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,22 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $uniq = int(rand()*100000); + my($key, $val) = ('settest'.$uniq, 'setval'.$uniq); + my $vallen = length $val; + + print $sock_m "set $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val); +} diff -urN memcached-1.4.13/t.rep/12-add.t memcached-1.4.13-repcached-2.3.1/t.rep/12-add.t --- memcached-1.4.13/t.rep/12-add.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/12-add.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,29 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1,$m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $uniq = int(rand()*100000); + my($key, $val) = ('addtest'.$uniq, 'addval'.$uniq); + my $vallen = length $val; + + print $sock_m "add $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val); + + my $val2 = 'addval2'.$uniq; + my $vallen2 = length $val2; + + print $sock_m "add $key 0 0 $vallen2\r\n$val2\r\n"; + is(scalar <$sock_m>, "NOT_STORED\r\n", "not stored"); + sync_get_is($sock_m, $sock_b, $key, $val); # not $val2 +} diff -urN memcached-1.4.13/t.rep/13-replace.t memcached-1.4.13-repcached-2.3.1/t.rep/13-replace.t --- memcached-1.4.13/t.rep/13-replace.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/13-replace.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,31 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $uniq = int(rand()*100000); + my($key, $val) = ('replacetest'.$uniq, 'replaceval'.$uniq); + my $vallen = length $val; + + print $sock_m "replace $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m>, "NOT_STORED\r\n", "not stored"); + + my $val2 = 'setval_replace'.$$; + my $vallen2 = length $val2; + print $sock_m "set $key 0 0 $vallen2\r\n$val2\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val2); + + print $sock_m "replace $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val); +} diff -urN memcached-1.4.13/t.rep/21-set-exptime.t memcached-1.4.13-repcached-2.3.1/t.rep/21-set-exptime.t --- memcached-1.4.13/t.rep/21-set-exptime.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/21-set-exptime.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,26 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $uniq = int(rand()*100000); + my($key, $val) = ('settest'.$uniq, 'setval'.$uniq); + my $vallen = length $val; + + my $exptime = 3; + + print $sock_m "set $key 0 $exptime $vallen\r\n$val\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val); + sleep $exptime+1; + sync_get_is($sock_m, $sock_b, $key, undef) +} diff -urN memcached-1.4.13/t.rep/22-add-exptime.t memcached-1.4.13-repcached-2.3.1/t.rep/22-add-exptime.t --- memcached-1.4.13/t.rep/22-add-exptime.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/22-add-exptime.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,26 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $uniq = int(rand()*100000); + my($key, $val) = ('addtest'.$uniq, 'addval'.$uniq); + my $vallen = length $val; + + my $exptime = 3; + + print $sock_m "add $key 0 $exptime $vallen\r\n$val\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val); + sleep $exptime+1; + sync_get_is($sock_m, $sock_b, $key, undef) +} diff -urN memcached-1.4.13/t.rep/23-replace-exptime.t memcached-1.4.13-repcached-2.3.1/t.rep/23-replace-exptime.t --- memcached-1.4.13/t.rep/23-replace-exptime.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/23-replace-exptime.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,32 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $uniq = int(rand()*100000); + my($key, $val) = ('replacetest'.$uniq, 'setval_replace'.$uniq); + my $vallen = length $val; + + print $sock_m "set $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val); + + my $exptime = 2; + my $val2 = 'replaceval'.$uniq; + my $vallen2 = length $val2; + + print $sock_m "replace $key 0 $exptime $vallen2\r\n$val2\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val2); + sleep $exptime+2; + sync_get_is($sock_m, $sock_b, $key, undef) +} diff -urN memcached-1.4.13/t.rep/24-touch.t memcached-1.4.13-repcached-2.3.1/t.rep/24-touch.t --- memcached-1.4.13/t.rep/24-touch.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/24-touch.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,34 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $uniq = int(rand()*100000); + my($key, $val) = ('settest'.$uniq, 'setval'.$uniq); + my $vallen = length $val; + + my $exptime = 4; + + print $sock_m "set $key 0 $exptime $vallen\r\n$val\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val); + sleep 2; + print $sock_m "touch $key $exptime\r\n"; + is(scalar <$sock_m>, "TOUCHED\r\n", "touched 1"); + sleep 2; + print $sock_m "touch $key $exptime\r\n"; + is(scalar <$sock_m>, "TOUCHED\r\n", "touched 2"); + sleep 2; + sync_get_is($sock_m, $sock_b, $key, $val); + sleep $exptime+1; + sync_get_is($sock_m, $sock_b, $key, undef) +} diff -urN memcached-1.4.13/t.rep/31-delete.t memcached-1.4.13-repcached-2.3.1/t.rep/31-delete.t --- memcached-1.4.13/t.rep/31-delete.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/31-delete.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,26 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $uniq = int(rand()*100000); + my($key, $val) = ('deletetest'.$uniq, 'deleteval'.$uniq); + my $vallen = length $val; + + print $sock_m "set $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val); + + print $sock_m "delete $key\r\n"; + is(scalar <$sock_m>, "DELETED\r\n", "deleted"); + sync_get_is($sock_m, $sock_b, $key, undef) +} diff -urN memcached-1.4.13/t.rep/41-incrdecr.t memcached-1.4.13-repcached-2.3.1/t.rep/41-incrdecr.t --- memcached-1.4.13/t.rep/41-incrdecr.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/41-incrdecr.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,74 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my $version = memcached_version(); +my $version_num = version2num($version); +my $support_overflow = $version_num >= version2num('1.2.4') ? 1 : 0; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + print $sock_m "set num 0 0 1\r\n1\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored num"); + sync_get_is($sock_m, $sock_b, "num", 1, "stored 1"); + + print $sock_m "incr num 1\r\n"; + is(scalar <$sock_m>, "2\r\n", "+ 1 = 2"); + sync_get_is($sock_m, $sock_b, "num", 2); + + print $sock_m "incr num 8\r\n"; + is(scalar <$sock_m>, "10\r\n", "+ 8 = 10"); + sync_get_is($sock_m, $sock_b, "num", 10); + + print $sock_m "decr num 1\r\n"; + is(scalar <$sock_m>, "9\r\n", "- 1 = 9"); + #sync_get_is($sock_m, $sock_b, "num", '9 '); + sync_get_is($sock_m, $sock_b, "num", '9'); + + print $sock_m "decr num 9\r\n"; + is(scalar <$sock_m>, "0\r\n", "- 9 = 0"); + #sync_get_is($sock_m, $sock_b, "num", '0 '); + sync_get_is($sock_m, $sock_b, "num", '0'); + + print $sock_m "decr num 5\r\n"; + is(scalar <$sock_m>, "0\r\n", "- 5 = 0"); + #sync_get_is($sock_m, $sock_b, "num", '0 '); + sync_get_is($sock_m, $sock_b, "num", '0'); + + printf $sock_m "set num 0 0 10\r\n4294967296\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored 2**32"); + + if ($support_overflow) { + print $sock_m "incr num 1\r\n"; + is(scalar <$sock_m>, "4294967297\r\n", "4294967296 + 1 = 4294967297"); + sync_get_is($sock_m, $sock_b, "num", 4294967297); + + printf $sock_m "set num 0 0 %d\r\n18446744073709551615\r\n", length("18446744073709551615"); + is(scalar <$sock_m>, "STORED\r\n", "stored 2**64-1"); + sync_get_is($sock_m, $sock_b, "num", '18446744073709551615'); + + print $sock_m "incr num 1\r\n"; + is(scalar <$sock_m>, "0\r\n", "(2**64 - 1) + 1 = 0"); + #sync_get_is($sock_m, $sock_b, "num", sprintf('%-20s','0')); + sync_get_is($sock_m, $sock_b, "num", '0'); + } + + print $sock_m "decr bogus 5\r\n"; + is(scalar <$sock_m>, "NOT_FOUND\r\n", "can't decr bogus key"); + + print $sock_m "decr incr 5\r\n"; + is(scalar <$sock_m>, "NOT_FOUND\r\n", "can't incr bogus key"); + + print $sock_m "set text 0 0 2\r\nhi\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored text"); + print $sock_m "incr text 1\r\n"; + like(scalar <$sock_m>, qr/CLIENT_ERROR/, "hi - 1 = error"); +} diff -urN memcached-1.4.13/t.rep/51-flush-all.t memcached-1.4.13-repcached-2.3.1/t.rep/51-flush-all.t --- memcached-1.4.13/t.rep/51-flush-all.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/51-flush-all.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,33 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + print $sock_m "set foo 0 0 6\r\nfooval\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored foo"); + sync_get_is($sock_m, $sock_b, "foo", "fooval"); + + print $sock_m "flush_all\r\n"; + is(scalar <$sock_m>, "OK\r\n", "did flush_all"); + sync_get_is($sock_m, $sock_b, "foo", undef); + + # and the other form, specifying a flush_all time... + my $expire = 4; + print $sock_m "flush_all $expire\r\n"; + is(scalar <$sock_m>, "OK\r\n", "did flush_all in future"); + + print $sock_m "set foo 0 0 4\r\n1234\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored foo = '1234'"); + sync_get_is($sock_m, $sock_b, "foo", '1234'); + sleep $expire+1; + sync_get_is($sock_m, $sock_b, "foo", undef); +} diff -urN memcached-1.4.13/t.rep/61-append-prepend.t memcached-1.4.13-repcached-2.3.1/t.rep/61-append-prepend.t --- memcached-1.4.13/t.rep/61-append-prepend.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/61-append-prepend.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,39 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $uniq = int(rand()*100000); + my $key = 'append-prepend-test'.$uniq; + my($prepend, $base, $append) = ('PPP', 'BASE'.$uniq, 'AAA'); #val + + print $sock_m "append $key 0 0 ".length($append)."\r\n$append\r\n"; + is(scalar <$sock_m>, "NOT_STORED\r\n", "not stored"); + sync_get_is($sock_m, $sock_b, $key, undef); + + print $sock_m "prepend $key 0 0 ".length($prepend)."\r\n$prepend\r\n"; + is(scalar <$sock_m>, "NOT_STORED\r\n", "not stored"); + sync_get_is($sock_m, $sock_b, $key, undef); + + + print $sock_m "set $key 0 0 ".length($base)."\r\n$base\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + + + print $sock_m "append $key 0 0 ".length($append)."\r\n$append\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $base.$append); + + print $sock_m "prepend $key 0 0 ".length($prepend)."\r\n$prepend\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $prepend.$base.$append); +} diff -urN memcached-1.4.13/t.rep/62-cas.t memcached-1.4.13-repcached-2.3.1/t.rep/62-cas.t --- memcached-1.4.13/t.rep/62-cas.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/62-cas.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,118 @@ +#!/usr/bin/perl + +use strict; +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +for my $sock ([$sock_m1, $sock_m2], [$sock_m2, $sock_m1]) { + my($sock_m, $sock_b) = ($sock->[0], $sock->[1]); + + my $key = 'castest'; + my $val = 'casval'; + my $val2 = 'casval2'; + + # gets foo (should not exist) + print $sock_m "gets $key\r\n"; + is(scalar <$sock_m>, "END\r\n", "not stored"); + sync_get_is($sock_m, $sock_b, $key, undef); + + # set foo + print $sock_m "set $key 0 0 ".length($val)."\r\n$val\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "stored"); + sync_get_is($sock_m, $sock_b, $key, $val); + + # cas fail + print $sock_m "cas $key 0 0 ".length($val2)." 123\r\n$val2\r\n"; + is(scalar <$sock_m>, "EXISTS\r\n", "cas failed for $key"); + + # gets foo - success + my @result; + @result = mem_gets($sock_m, $key); + mem_gets_is($sock_m,$result[0],$key,$val); + + @result = mem_gets($sock_b, $key); + mem_gets_is($sock_b,$result[0],$key,$val); + + # cas success + print $sock_m "cas $key 0 0 ".length($val2)." $result[0]\r\n$val2\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "cas success, set $key"); + sync_get_is($sock_m, $sock_b, $key, $val2); + + # cas failure (reusing the same key) + print $sock_m "cas $key 0 0 ".length($val2)." $result[0]\r\n$val2\r\n"; + is(scalar <$sock_m>, "EXISTS\r\n", "reusing a CAS ID"); + + # delete foo + print $sock_m "delete $key\r\n"; + is(scalar <$sock_m>, "DELETED\r\n", "deleted $key"); + sync_get_is($sock_m, $sock_b, $key, undef); + + # cas missing + print $sock_m "cas $key 0 0 ".length($val2)." $result[0]\r\n$val2\r\n"; + is(scalar <$sock_m>, "NOT_FOUND\r\n", "cas failed, $key does not exist"); + + # cas empty + print $sock_m "cas $key 0 0 ".length($val2)." \r\n$val2\r\n"; + is(scalar <$sock_m>, "ERROR\r\n", "cas empty, throw error"); + # cant parse barval2\r\n + is(scalar <$sock_m>, "ERROR\r\n", "error out on barval2 parsing"); + + # set foo1 + print $sock_m "set foo1 0 0 1\r\n1\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "foo1"); + sync_get_is($sock_m, $sock_b, 'foo1', '1'); + # set foo2 + print $sock_m "set foo2 0 0 1\r\n2\r\n"; + is(scalar <$sock_m>, "STORED\r\n", "foo1"); + sync_get_is($sock_m, $sock_b, 'foo2', '2'); + + # gets foo1 check + print $sock_m "gets foo1\r\n"; + ok(scalar <$sock_m> =~ /VALUE foo1 0 1 (\d+)\r\n/, "gets foo1 regexp success"); + my $foo1_cas_m = $1; + is(scalar <$sock_m>, "1\r\n","gets foo1 data is 1"); + is(scalar <$sock_m>, "END\r\n","gets foo1 END"); + + print $sock_b "gets foo1\r\n"; + ok(scalar <$sock_b> =~ /VALUE foo1 0 1 (\d+)\r\n/, "gets foo1 regexp success"); + my $foo1_cas_b = $1; + is(scalar <$sock_b>, "1\r\n","gets foo1 data is 1"); + is(scalar <$sock_b>, "END\r\n","gets foo1 END"); + + # gets foo2 check + print $sock_m "gets foo2\r\n"; + ok(scalar <$sock_m> =~ /VALUE foo2 0 1 (\d+)\r\n/,"gets foo2 regexp success"); + my $foo2_cas_m = $1; + is(scalar <$sock_m>, "2\r\n","gets foo2 data is 2"); + is(scalar <$sock_m>, "END\r\n","gets foo2 END"); + + print $sock_b "gets foo2\r\n"; + ok(scalar <$sock_b> =~ /VALUE foo2 0 1 (\d+)\r\n/,"gets foo2 regexp success"); + my $foo2_cas_b = $1; + is(scalar <$sock_b>, "2\r\n","gets foo2 data is 2"); + is(scalar <$sock_b>, "END\r\n","gets foo2 END"); + + # validate foo1 != foo2 + ok($foo1_cas_m = $foo1_cas_b,"foo1 = foo1 between master and backup"); + ok($foo2_cas_m = $foo2_cas_b,"foo2 = foo2 between master and backup"); + ok($foo1_cas_m != $foo2_cas_m,"foo1 != foo2 single-gets success"); + ok($foo1_cas_b != $foo2_cas_b,"foo1 != foo2 single-gets success"); + + # multi-gets + print $sock_m "gets foo1 foo2\r\n"; + ok(scalar <$sock_m> =~ /VALUE foo1 0 1 (\d+)\r\n/, "validating first set of data is foo1"); + $foo1_cas_m = $1; + is(scalar <$sock_m>, "1\r\n",, "validating foo1 set of data is 1"); + ok(scalar <$sock_m> =~ /VALUE foo2 0 1 (\d+)\r\n/, "validating second set of data is foo2"); + $foo2_cas_m = $1; + is(scalar <$sock_m>, "2\r\n", "validating foo2 set of data is 2"); + is(scalar <$sock_m>, "END\r\n","validating foo1,foo2 gets is over - END"); + + # validate foo1 != foo2 + ok($foo1_cas_m != $foo2_cas_m, "foo1 != foo2 multi-gets success"); +} diff -urN memcached-1.4.13/t.rep/71-cpuhog.t memcached-1.4.13-repcached-2.3.1/t.rep/71-cpuhog.t --- memcached-1.4.13/t.rep/71-cpuhog.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/71-cpuhog.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,50 @@ +#!/usr/bin/perl + +use strict; +use warnings; + +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +# emulate transient network problems by stopping processes +# stop second process, fill buffers on first one +# XXX use (pid + 1) to bypass timedrun, unsafe +kill 'STOP', $m2->{pid} + 1; + +my ($uniq, $key, $val, $vallen); + +for my $i (1..1000) { + $uniq = int(rand()*100000); + $key = 'settest_1_' . $i; + $val = 'setval' . ('0' x 1024) . $uniq; + $vallen = length $val; + print $sock_m1 "set $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m1>, "STORED\r\n", "stored"); +} + +# stop first process, fill buffers on second one +kill 'STOP', $m1->{pid} + 1; +kill 'CONT', $m2->{pid} + 1; + +for my $i (1..1000) { + $uniq = int(rand()*100000); + $key = 'settest_2_' . $i; + $val = 'setval' . ('0' x 1024) . $uniq; + $vallen = length $val; + print $sock_m2 "set $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m2>, "STORED\r\n", "stored"); +} + +# release first process +kill 'CONT', $m1->{pid} + 1; + +# wait for a while, as normal sync time may not be enough +sleep(1); + +mem_get_is($sock_m1, $key, $val, "get $key from 1st"); +mem_get_is($sock_m2, $key, $val, "get $key from 2nd"); diff -urN memcached-1.4.13/t.rep/72-shutdown-sync.t memcached-1.4.13-repcached-2.3.1/t.rep/72-shutdown-sync.t --- memcached-1.4.13/t.rep/72-shutdown-sync.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/72-shutdown-sync.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,43 @@ +#!/usr/bin/perl + +use strict; +use warnings; + +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +# stop second process, fill buffers on first one +# XXX use (pid + 1) to bypass timedrun, unsafe +kill 'STOP', $m2->{pid} + 1; + +my ($uniq, $key, $val, $vallen); + +for my $i (1..10) { + $key = 'settest_1_' . $i; + $val = 'setval' . $i . ('0' x (1000*1024)); + $vallen = length $val; + print $sock_m1 "set $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m1>, "STORED\r\n", "stored $i"); +} + +# ask first process to shutdown, and release second one +kill 'INT', $m1->{pid} + 1; +kill 'CONT', $m2->{pid} + 1; + +# wait for a while +sleep(2); + +for my $i (1..10) { + $key = 'settest_1_' . $i; + my $expected = 'setval' . $i . ('0' x (1000*1024)) . "\r\nEND\r\n"; + print $sock_m2 "get $key\r\n"; + my $r = <$sock_m2>; + $r = <$sock_m2> . <$sock_m2> if $r =~ /VALUE/; + is(substr($r, 0, 20), substr($expected, 0, 20), "get $i"); + is(length($r), length($expected), "get $i length"); +} diff -urN memcached-1.4.13/t.rep/73-cpuhog-pipe.t memcached-1.4.13-repcached-2.3.1/t.rep/73-cpuhog-pipe.t --- memcached-1.4.13/t.rep/73-cpuhog-pipe.t Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/73-cpuhog-pipe.t Mon Feb 13 05:07:52 2012 @@ -0,0 +1,55 @@ +#!/usr/bin/perl + +use strict; +use warnings; + +use Test::More qw(no_plan); +use FindBin qw($Bin); +use lib ("$Bin/lib", "$Bin/../t/lib"); +use RepcachedTest; + +my($m1, $m2) = new_repcached(); +my($sock_m1, $sock_m2) = ($m1->sock, $m2->sock); + +# emulate transient network problems by stopping processes +# stop second process, fill buffers on first one +# XXX use (pid + 1) to bypass timedrun, unsafe +kill 'STOP', $m2->{pid} + 1; + +my ($uniq, $key, $val, $vallen); + +# we should fill send buffer and internal pipe buffer +# typical 64k pipe buffer on 32-bit platform requires 16k items +# to be filled (64k / sizeof(void*)) + +for my $i (1..20000) { + $uniq = int(rand()*100000); + $key = 'settest_1_' . $i; + $val = 'setval' . ('0' x 1024) . $uniq; + $vallen = length $val; + print $sock_m1 "set $key 0 0 $vallen\r\n$val\r\n"; + is(scalar <$sock_m1>, "STORED\r\n", "stored"); +} + +# release second process +kill 'CONT', $m2->{pid} + 1; + +# wait for a while, as normal sync time may not be enough +sleep(10); + +mem_get_is($sock_m1, $key, $val, "get $key from 1st"); +mem_get_is($sock_m2, $key, $val, "get $key from 2nd"); + +# another item; it should push everything if not yet + +$uniq = int(rand()*100000); +$key = 'settest_1_fini'; +$val = 'setval' . ('0' x 1024) . $uniq; +$vallen = length $val; +print $sock_m1 "set $key 0 0 $vallen\r\n$val\r\n"; +is(scalar <$sock_m1>, "STORED\r\n", "stored"); + +sleep(1); + +mem_get_is($sock_m1, $key, $val, "get $key from 1st"); +mem_get_is($sock_m2, $key, $val, "get $key from 2nd"); diff -urN memcached-1.4.13/t.rep/README memcached-1.4.13-repcached-2.3.1/t.rep/README --- memcached-1.4.13/t.rep/README Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/README Mon Feb 13 05:07:52 2012 @@ -0,0 +1,4 @@ +./configure --enable-replication +make +make test +prove t.rep diff -urN memcached-1.4.13/t.rep/lib/RepcachedTest.pm memcached-1.4.13-repcached-2.3.1/t.rep/lib/RepcachedTest.pm --- memcached-1.4.13/t.rep/lib/RepcachedTest.pm Thu Jan 1 03:00:00 1970 +++ memcached-1.4.13-repcached-2.3.1/t.rep/lib/RepcachedTest.pm Mon Feb 13 05:07:52 2012 @@ -0,0 +1,30 @@ +package RepcachedTest; +use strict; +use warnings; +use Carp; +use MemcachedTest; +use base qw(MemcachedTest); +use Exporter qw(import); +our @EXPORT = (@MemcachedTest::EXPORT, + qw(new_repcached sync_get_is + )); +use Time::HiRes; + +our $REP_LAG = 0.8; + +sub new_repcached() { + my ($port_master,$port_backup,$port_rep) = (free_port(),free_port(),free_port()); + return ( + new_memcached("-l 127.0.0.1 -x 127.0.0.1 -X $port_rep -q 65536", $port_master), + new_memcached("-l 127.0.0.1 -x 127.0.0.1 -X $port_rep -q 65536", $port_backup), + ); +} + +sub sync_get_is { + my($sock_m,$sock_b) = (shift, shift); + mem_get_is($sock_m, @_); + Time::HiRes::sleep $REP_LAG; + mem_get_is($sock_b, @_); +} + +1; diff -urN memcached-1.4.13/testapp.c memcached-1.4.13-repcached-2.3.1/testapp.c --- memcached-1.4.13/testapp.c Fri Jan 6 22:19:50 2012 +++ memcached-1.4.13-repcached-2.3.1/testapp.c Mon Feb 13 01:13:58 2012 @@ -338,6 +338,10 @@ argv[arg++] = "-1"; argv[arg++] = "-U"; argv[arg++] = "0"; +#ifdef USE_REPLICATION + argv[arg++] = "-X"; + argv[arg++] = "0"; +#endif /* Handle rpmbuild and the like doing this as root */ if (getuid() == 0) { argv[arg++] = "-u";