分享IT技术,分享生活感悟,热爱摄影,热爱航天。
目前的HTTP服务器都能够支持对响应内容的gzip处理,即当请求头中含有Accept-Encoding: gzip时,会返回gzip压缩过的响应,并且响应头中包含Content-Encoding: gzip,例如以下的请求和响应头
GET / HTTP/1.1 User-Agent: curl/7.39.0 Host: localhost Accept: */* Accept-Encoding: gzip HTTP/1.1 200 OK Date: Fri, 08 Apr 2016 14:34:39 GMT Server: Apache/2.4.18 (Unix) PHP/7.0.5 X-Powered-By: PHP/7.0.5 Vary: Accept-Encoding Content-Encoding: gzip Content-Length: 22578 Content-Type: text/html; charset=UTF-8
而有时候我们希望在请求中发送的内容也使用gzip压缩,虽然浏览器都不支持这一特性,但在手机App中可以使用gzip压缩提交的数据来提高速度也帮助用户节约数据流量。同时我们又希望这一压缩过程对于服务器端的代码是透明的,即Web服务器对收到的内容进行解压。
POST / HTTP/1.1 User-Agent: curl/7.39.0 Host: localhost Accept: */* Content-Encoding: gzip Content-Length: 32 Content-Type: application/x-www-form-urlencoded HTTP/1.1 200 OK Date: Fri, 08 Apr 2016 14:43:09 GMT Server: Apache/2.4.18 (Unix) PHP/7.0.5 X-Powered-By: PHP/7.0.5 Content-Length: 14 Content-Type: text/html; charset=UTF-8
在Apache中只需要增加SetInputFilter deflate,即可实现这一效果。而Nginx中并没有实现这一功能的模块,需要借助Lua来实现这一功能。 在需要使用请求压缩的位置增加以下配置
--开始处加入 init_by_lua ' zlib = require("zlib"); '; --需要接受压缩请求的位置加入 rewrite_by_lua ' local content_encoding = ngx.req.get_headers()["Content-Encoding"]; if content_encoding == "gzip" then ngx.req.read_body(); local data = ngx.req.get_body_data(); if data ~= nil then local inflated = zlib.inflate()(data); ngx.req.clear_header("Content-Encoding"); ngx.req.clear_header("Content-Length"); ngx.req.set_body_data(inflated); end end ';
另外需要对Lua的zlib模块进行简单的修改,当发送的内容不是gzip格式时,zlib模块会直接报错退出导致Nginx出现服务器错误,解决方式是取消模块中的报错退出
diff --git a/lua_zlib.c b/lua_zlib.c index 84d1721..6ebfa47 100644 --- a/lua_zlib.c +++ b/lua_zlib.c @@ -92,7 +92,7 @@ static int lz_assert(lua_State *L, int result, const z_stream* stream, const cha lua_pushfstring(L, "ZLibError: unknown code %d (%s) at %s line %d", result, stream->msg, file, line); } - lua_error(L); + //lua_error(L); return result; }
在对HTTP内容的缓存过程中,对于压缩的处理方式一般是保存两份,或是保存一份不要缩的内容然后根据客户端的情况来选择进行压缩或者不进行压缩,然而如今一般的客户端都是支持压缩的,如果能够只缓存一份压缩的内容然后只对不支持压缩的客户端进行内容的解压缩,则能够节约不少的缓存空间。
传统的Nginx+Memcache缓存的配置如下,其中通过gzip hack强制后段换回不压缩的内容,然后再根据客户端的情况进行压缩
location / { gzip on; set $key $host$request_uri; srcache_fetch GET /memc $key; srcache_store PUT /memc $key; proxy_set_header Accept-Encoding ""; proxy_pass http://127.0.0.1:8080; } location /memc { internal; memc_connect_timeout 100ms; memc_send_timeout 100ms; memc_read_timeout 100ms; memc_ignore_client_abort on; set $memc_key $query_string; set $memc_exptime 300; memc_pass 127.0.0.1:11211; }
而实际可以通过Nginx gunzip模块实现根据客户端的情况将保存的压缩内容进行解压处理,其配置和上面非常相似,具体如下
location / { gunzip on; set $key $host$request_uri; srcache_fetch GET /memc $key; srcache_store PUT /memc $key; proxy_set_header Accept-Encoding "gzip"; proxy_pass http://127.0.0.1:8080; } location /memc { internal; memc_connect_timeout 100ms; memc_send_timeout 100ms; memc_read_timeout 100ms; memc_ignore_client_abort on; set $memc_key $query_string; set $memc_exptime 300; memc_pass 127.0.0.1:11211; }
这里还需要处理一个问题,srcache模块对于返回带有Content-Encoding响应头的内容会强制不予缓存,需要在其代码中去掉这一逻辑
diff --git a/src/ngx_http_srcache_store.c b/src/ngx_http_srcache_store.c index ed96ff2..27f78db 100644 --- a/src/ngx_http_srcache_store.c +++ b/src/ngx_http_srcache_store.c @@ -150,7 +150,7 @@ ngx_http_srcache_header_filter(ngx_http_request_t *r) return ngx_http_srcache_next_header_filter(r); } #endif - + /* if (!slcf->ignore_content_encoding && r->headers_out.content_encoding && r->headers_out.content_encoding->value.len) @@ -162,7 +162,7 @@ ngx_http_srcache_header_filter(ngx_http_request_t *r) &r->headers_out.content_encoding->value); return ngx_http_srcache_next_header_filter(r); - } + }*/ if (slcf->resp_cache_control && ngx_http_srcache_response_no_cache(r, slcf, ctx) == NGX_OK)
如果在应用中同时存在Java和PHP两套系统,并且需要共享数据,这种情况下MySQL是比较容易处理的,而Memcache则会面临两个问题,首先采用了基于一致性散列算法的分布式架构下需要Java和PHP使用同一种算法,已保证可以根据键找到对应的Memcache实例,另外就是数据存储格式上需要具有一致性,一般都用JSON格式序列化即可,但还需要保证存储时压缩算法和标志位使用的一致性,才能正确的处理压缩过的数据。
对于PHP目前使用较为广泛的是php-memcached扩展,其使用的一致性散列算法为Ketama,是一种简单并且广泛支持的算法,其它大多语言中都有对此算法的支持(Python Perl Go等)。为此需要Java中使用支持这一算法的连接驱动,这里尝试使用了xmemcached。
PHP通过php-memcached扩展访问Memcache
<?php $mem = new Memcached(); $mem->setOption(Memcached::DISTRIBUTION_CONSISTENT, true); $mem->setOption(Memcached::OPT_LIBKETAMA_COMPATIBLE, true); $mem->addServers(array( array('127.0.0.1', 11211), array('127.0.0.1', 11212), array('127.0.0.1', 11213), array('127.0.0.1', 11214), ));
Java通过xmemcached访问Memcache
try { XMemcachedClientBuilder builder = new XMemcachedClientBuilder(); //注意这里需要开启cwNginxUpstreamConsistent方式,兼容libmemcached中的11211 hack builder.setSessionLocator(new KetamaMemcachedSessionLocator(true)); client = builder.build(); client.addServer("127.0.0.1", 11211); client.addServer("127.0.0.1", 11212); client.addServer("127.0.0.1", 11213); client.addServer("127.0.0.1", 11214); client.shutdown(); } catch (Exception ex) { ex.printStackTrace(); }
这里简单说明一下libmemcached中的11211 hack,其为了让不加端口号的情况与使用默认11211端口号时保持一致性,其在端口号为11211时会自动去掉:11211,为此在使用很多其它的ketama库时会发现散列结果不一致,其大多是因为这个逻辑导致的,为了保持兼容在使用时也做相同的处理即可,具体逻辑可以参考以下libmemcached的源代码。
if (list[host_index].port() == MEMCACHED_DEFAULT_PORT) { sort_host_length= snprintf(sort_host, sizeof(sort_host), "%s-%u", list[host_index]._hostname, pointer_index - 1); } else { sort_host_length= snprintf(sort_host, sizeof(sort_host), "%s:%u-%u", list[host_index]._hostname, (uint32_t)list[host_index].port(), pointer_index - 1); }
在xmemcached中带权重的一致性散列算法的处理方法是将当前节点的虚节点个数(默认为160)直接乘以其权重,而在libmemcached中除了乘以了权重之外,还乘以了总节点个数/总权重值(好处是只要各节点权重比相同,散列的结果就是相同的)。为了保证散列结果的一致性,需要修改xmemcached的源代码(或者增加一个散列算法类型 AbstractMemcachedSessionLocator),其patch如下
diff --git a/src/main/java/net/rubyeye/xmemcached/impl/KetamaMemcachedSessionLocator.java b/src/main/java/net/rubyeye/xmemcached/impl/KetamaMemcachedSessionLocator.java index 2e9548d..2c5a191 100644 --- a/src/main/java/net/rubyeye/xmemcached/impl/KetamaMemcachedSessionLocator.java +++ b/src/main/java/net/rubyeye/xmemcached/impl/KetamaMemcachedSessionLocator.java @@ -91,7 +91,12 @@ AbstractMemcachedSessionLocator { private final void buildMap(Collection<Session> list, HashAlgorithm alg) { TreeMap<Long, List<Session>> sessionMap = new TreeMap<Long, List<Session>>(); - + int totalWeight = 0; + for (Session session : list) { + if (session instanceof MemcachedTCPSession) { + totalWeight += ((MemcachedSession) session).getWeight(); + } + } for (Session session : list) { String sockStr = null; if (this.cwNginxUpstreamConsistent) { @@ -117,7 +122,9 @@ AbstractMemcachedSessionLocator { */ int numReps = NUM_REPS; if (session instanceof MemcachedTCPSession) { - numReps *= ((MemcachedSession) session).getWeight(); + int weight = ((MemcachedSession) session).getWeight(); + float pct = (float) weight / (float) totalWeight; + numReps = (int) ((Math.floor((float)(pct * NUM_REPS / 4 * list.size() + 0.0000000001))) * 4); } if (alg == HashAlgorithm.KETAMA_HASH) { for (int i = 0; i < numReps / 4; i++) {
在较早版本的php-memcached中使用的是zlib进行的数据压缩,新版本中默认使用fastlz,这里为了保证兼容性依然使用zlib进行数据压缩,同时Java中默认就可以支持zlib压缩。同时在php-memcached的不同版本中有两套标志位的用法,这里需要分别予以支持。
xmemcached中默认情况下进行数据的读写时会采用其自身的序列化和标志位的规则进行处理,但可以通过传递Transcoder自行定制对原始数据在存储和获取时的处理行为。具体的处理方法如下
//定义针对PHP的数据处理器 abstract class PHPTranscoder extends PrimitiveTypeTranscoder<String> { } //针对旧版本的php-memcached,由于在PHP5中使用了这个版本,所以叫做PHP5 class PHP5Transcoder extends PHPTranscoder { //使用压缩的最小长度 final int COMPRESSION_THRESHOLD = 100; //压缩的标志位 final int COMPRESSION_MASK = 1 << 1; //取出数据的处理 public String decode(CachedData d) { byte[] data = d.getData(); int flag = d.getFlag(); //如果带有压缩标志位,则进行解压缩处理 if ((flag & COMPRESSION_MASK) == COMPRESSION_MASK) { setCompressionMode(CompressionMode.ZIP); data = decompress(data); } return new String(data); } //存储数据的处理 public CachedData encode(String o) { byte[] data = o.getBytes(); int flag = 0; //如果数据超过了压缩的最小长度,则进行压缩,并更改标志位 if (data.length > COMPRESSION_THRESHOLD) { setCompressionMode(CompressionMode.ZIP); data = compress(data); flag = COMPRESSION_MASK; } return new CachedData(flag, data); } } //针对新版本的php-memcached,由于在PHP7中使用了这个版本,所以叫做PHP7 class PHP7Transcoder extends PHPTranscoder { final int COMPRESSION_THRESHOLD = 2000; final int COMPRESSION_MASK = 3 << 4; public String decode(CachedData d) { byte[] data = d.getData(); int flag = d.getFlag(); if ((flag & COMPRESSION_MASK) == COMPRESSION_MASK) { setCompressionMode(CompressionMode.ZIP); //解压缩时需要处理size_t hack,存储时开头使用了一个size_t保存了压缩前数据的长度,为了支持fastlz byte[] realdata = new byte[data.length - 4]; System.arraycopy(data, 4, realdata, 0, data.length - 4); data = decompress(realdata); } return new String(data); } public CachedData encode(String o) { byte[] realdata = o.getBytes(); byte[] data = realdata; int flag = 0; if (realdata.length > COMPRESSION_THRESHOLD) { //上面提到的size_t hack,将长度转换为四个字节 byte[] lenbytes = new byte[4]; for (int i = 0; i < lenbytes.length; i++) { lenbytes[i] = (byte) ((realdata.length >> (8 * i)) & 0xff); } //压缩数据 setCompressionMode(CompressionMode.ZIP); realdata = compress(realdata); //合并数据 data = new byte[realdata.length + 4]; System.arraycopy(lenbytes, 0, data, 0, 4); System.arraycopy(realdata, 0, data, 4, realdata.length); flag = COMPRESSION_MASK; } return new CachedData(flag, data); } }
这里没有对PHP中的数据类型做相关的处理,只是处理了压缩,即PHP中的任何数据类型在Java中都会变为String类型,并且Java中目前也只是提供了写入String的接口
仿照php-memcached中的Memcached类实现了Java的访问接口,只实现了对String类型的读写操作
class Memcached { private MemcachedClient client; protected PHPTranscoder transcoder; public Memcached() throws IOException { transcoder = new PHP7Transcoder(); XMemcachedClientBuilder builder = new XMemcachedClientBuilder(); builder.setSessionLocator(new KetamaMemcachedSessionLocator(true)); client = builder.build(); } public void setTranscoder(PHPTranscoder transcoder) { this.transcoder = transcoder; } public void addServer(String server, int port, int weight) { try { client.addServer(server, port, weight); } catch (IOException ex) { ex.printStackTrace(); } } public void addServer(String server, int port) { this.addServer(server, port, 1); } public String get(String key) throws TimeoutException, InterruptedException, MemcachedException { return client.get(key, transcoder); } public boolean set(String key, String value, int expire) { boolean b = false; try { b = client.set(key, expire, value, transcoder); } catch (Exception ex) { ex.printStackTrace(); } return b; } public boolean set(String key, String value) { return set(key, value, 0); } public boolean add(String key, String value, int expire) { boolean b = false; try { b = client.add(key, expire, value, transcoder); } catch (Exception ex) { ex.printStackTrace(); } return b; } public boolean add(String key, String value) { return add(key, value, 0); } public boolean replace(String key, String value, int expire) { boolean b = false; try { b = client.add(key, expire, value, transcoder); } catch (Exception ex) { ex.printStackTrace(); } return b; } public boolean replace(String key, String value) { return replace(key, value, 0); } public void quit() throws IOException { client.shutdown(); } }