用过微信网页版的人应该都清楚网页登陆的流程,大致描述一下这个过程:
- 打开网页版登陆链接
- 页面会显示一个二维码
- 用微信客户端扫描二维码,让用户确认登陆网页版
- 如果确认登陆,网页版会自动进入聊天界面。
这个过程的交互方式和一般的WEB应用不太一样,步骤4
网页自动跳转,明显是由服务端主动推送了内容给网页端,网页端收到跳转确认后才触发的,这里就引出了今天要讨论的问题:服务端推送技术
。服务端推送又称为Comet,服务端异步处理等。很早以前就出现了,但一直没有一个统一的标准,存在着不少Comet技术框架,各个Web容器也各自实现了自己的Comet支持。最近公司的产品也出现了和微信网页版登陆类似的场景,需要用到Comet技术,我简单的研究了下,写下来记录一下。
针对Comet技术的选择性蛮多,我匆匆看了一下,就有这么3个方案:
Tomcat 内置支持,需要实现CometProcessor接口。但是应用就依赖Tomcat容器了。
Servlet3 天然支持,Servlet3提供一套完整的异步处理API,包括AsyncContext,AsyncLiseter,AsyncEvent. 要求Tomcat7.0++。
SpringMVC3.2 在Servlet3的基础上做了进一步的封装,编码更为简单,提供Callable,WebAsyncTask,DeferredResult三种方式进行异步编程支持,非常方便。
基于Tomcat的CometProcessor依赖性过大,我基本上不予考虑了。因为时间还算充裕,所以我分别针对Servlet3 和SpringMVC3.2 都做了尝试,其实过程都比较简单,关键是要理解场景。我来介绍下我们产品的实际场景吧,我们要实现的一个功能是扫描动态二维码关注微信公众账号。基本流程是这样的:
- 客户端调用服务端接口获取动态二维码以及二维码内容中内置的ID。(这个时候在客户端能看到一个二维码了,等待用户扫描)
- 客户端马上调用服务端的一个长连接接口,与服务端建立长连接,等待服务端通知。(这个过程是在后台发生的,用户无法感知)
- 用户拿出微信扫描二维码,就会有一个扫描事件通知到服务端的扫描接口。(这个时候服务端接收到扫描动作,完成自己的业务操作以后,通知长连接接口,用户已经扫描了,可以返回了)。
这个流程里面有这么几个地方是需要能解决的:
- 步骤2里面要求客户端–服务端建立长连接,不会立即返回,客户端一直在等待状态。(Servlet3 的API可以支持,需要把Timeout时间设置长一点,一般是60S够了)
- 步骤3中 扫描接口要通知长连接接口,如何做到? 必须存在一个公共的容器,容器里面存着上下文信息,扫描接口把执行完毕的上下文告知长连接接口就可以了。
所以,实现代码如下:
配置部分
web.xml 启用Servlet3 的命名空间
1 2 3 4 5 6 7
| <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">
</web-app>
|
长连接Servlet要开启异步支持:
1
| @WebServlet(value = "/scan/*",asyncSupported = true)
|
Tomcat server.xml要开启NIO模式
1 2
| <Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" asyncTimeout="150000" URIEncoding="utf-8" redirectPort="8443" />
|
长连接Servlet实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| @WebServlet(value = "/scan/*",asyncSupported = true) public class ScanServlet extends HttpServlet {
// private ScanRetain retain;
private Logger logger = Logger.getLogger(getClass());
@Override public void init() throws ServletException { }
@Override public void destroy() { ScanRetain.MAP.clear(); }
@Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { doPost(req, resp); }
@Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { logger.debug(">>>>>>>>>>>>>>>>>开始访问长连接Servlet....."); String pathInfo = req.getPathInfo(); String key = null; if (pathInfo != null) { int i = pathInfo.lastIndexOf('/'); if (i >= 0) { key = pathInfo.substring(i + 1); } } if (key == null) { PrintWriter writer = resp.getWriter(); writer.write("error:not found scan key"); writer.flush(); return; } req.startAsync(req, resp); if (req.isAsyncStarted()) { final AsyncContext asyncContext = req.getAsyncContext(); final String theKey = key; asyncContext.setTimeout(60 * 1000L);
asyncContext.addListener(new AsyncListener() { @Override public void onComplete(AsyncEvent asyncEvent) throws IOException { ScanRetain.MAP.remove(theKey); }
@Override public void onTimeout(AsyncEvent asyncEvent) throws IOException { ScanRetain.MAP.remove(theKey); }
@Override public void onError(AsyncEvent asyncEvent) throws IOException { ScanRetain.MAP.remove(theKey); }
@Override public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
} });
logger.debug(">>>>>>>>>>>>>>>>>将长连接上下文对象加入队列等待处理........."); ScanRetain.MAP.put(theKey, asyncContext); } } }
|
公共Context容器存放类以及提供给扫描后对长连接响应处理的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class ScanRetain {
// 公共上下文容器 public static final ConcurrentHashMap<String, AsyncContext> MAP = new ConcurrentHashMap<String, AsyncContext>();
private Logger logger = Logger.getLogger(getClass());
public void doReturn(String key){ logger.debug(">>>>>>>>>>>>>>>>>长连接正在响应....."); AsyncContext asyncContext = MAP.get(key); if (asyncContext == null) { return; } HttpServletResponse res = (HttpServletResponse) asyncContext.getResponse(); DBObject data = new BasicDBObject("result",1) .append("info","ok") .append("now",System.currentTimeMillis()); String str = JSON.serialize(data); OutputStream os = null; try { os = res.getOutputStream(); os.write(str.getBytes("utf-8")); logger.debug(">>>>>>>>>>>>>>>>>长连接响应完毕....."); os.flush(); asyncContext.setTimeout(100L);// 一定要加这一句才会及时返回 } catch (IOException e) { e.printStackTrace(); } }
}
|
扫描事件触发长连接响应的逻辑
1 2 3 4 5
| Long senceId = 0L; if (qrSenceId != null) { senceId = Long.parseLong(qrSenceId); } scanRetain.doReturn(senceId + "");
|
SpringMVC3.2 的实现我也尝试了一下:
长连接接口:
1 2 3 4 5 6 7 8 9 10
| // 上下文容器 public static final ConcurrentHashMap<String, DeferredResult<String>> MAP = new ConcurrentHashMap<String, DeferredResult<String>>();
@RequestMapping("doScan/{key}") @ResponseBody public DeferredResult<String> doScan(@PathVariable("key") String key) { DeferredResult<String> result = new DeferredResult<String>(); MAP.put(key, result); return result; }
|
通知长连接响应客户端的测试代码:
1 2 3 4 5 6 7 8 9 10 11
| @RequestMapping(value="/newScan/{key}",produces = "text/plain;charset=utf-8;") @ResponseBody public String newScan(@PathVariable("key") String key, HttpServletRequest req, HttpServletResponse res) { DeferredResult<String> data = Scans.MAP.get(key); if(data!=null){ data.setResult("this is result:"+System.currentTimeMillis()); Scans.MAP.remove(key); } return "new scan test finished :"+key+"now is :"+System.currentTimeMillis(); }
|
Spring的代码实现简单很多,但是也不那么直观,不利于理解。
同时,它还提供另外两种异步处理的方式,只是不适于这个场景,这里也罗列一下。
Callable:
1 2 3 4 5 6 7 8 9 10 11 12
| @ResponseBody @RequestMapping("call") public Callable<String> call(HttpServletRequest req, HttpServletResponse res) throws Exception { return new Callable<String>() { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(5); return "hello,callable"; } };
}
|
WebAsyncTask:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @ResponseBody @RequestMapping("async") public WebAsyncTask<String> async(HttpServletRequest req, HttpServletResponse res) throws Exception { Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(5); return "hello,WebAsyncTask"; } };
return new WebAsyncTask<String>(1000*60L,callable); }
|
上面两种方式也是用于异步操作的,它们比较适用于一些比较耗时的操作(如大数据计算,文件处理),它们的响应一般不存在其他的触发点,就是取决于Callable内部代码块的执行结束。
综上,我们大致可以总结出异步处理的两种应用场景:
1. 多点操作,单点的响应往往依赖于其他点的触发,最典型的就是微信扫描登录了。这个基本的编码思路应该是这样的:
2. 单点操作,但是操作往往非常耗时,不能及时响应。这种场景一般会把耗时操作全部抽离到Callable代码段,响应的触发点就是Callable代码的结束处。