异步处理窥探

用过微信网页版的人应该都清楚网页登陆的流程,大致描述一下这个过程:

  1. 打开网页版登陆链接
  2. 页面会显示一个二维码
  3. 用微信客户端扫描二维码,让用户确认登陆网页版
  4. 如果确认登陆,网页版会自动进入聊天界面。

这个过程的交互方式和一般的WEB应用不太一样,步骤4网页自动跳转,明显是由服务端主动推送了内容给网页端,网页端收到跳转确认后才触发的,这里就引出了今天要讨论的问题:服务端推送技术。服务端推送又称为Comet,服务端异步处理等。很早以前就出现了,但一直没有一个统一的标准,存在着不少Comet技术框架,各个Web容器也各自实现了自己的Comet支持。最近公司的产品也出现了和微信网页版登陆类似的场景,需要用到Comet技术,我简单的研究了下,写下来记录一下。

针对Comet技术的选择性蛮多,我匆匆看了一下,就有这么3个方案:

  • Tomcat 内置支持,需要实现CometProcessor接口。但是应用就依赖Tomcat容器了。

  • Servlet3 天然支持,Servlet3提供一套完整的异步处理API,包括AsyncContext,AsyncLiseter,AsyncEvent. 要求Tomcat7.0++。

  • SpringMVC3.2 在Servlet3的基础上做了进一步的封装,编码更为简单,提供Callable,WebAsyncTask,DeferredResult三种方式进行异步编程支持,非常方便。

基于Tomcat的CometProcessor依赖性过大,我基本上不予考虑了。因为时间还算充裕,所以我分别针对Servlet3 和SpringMVC3.2 都做了尝试,其实过程都比较简单,关键是要理解场景。我来介绍下我们产品的实际场景吧,我们要实现的一个功能是扫描动态二维码关注微信公众账号。基本流程是这样的:

  1. 客户端调用服务端接口获取动态二维码以及二维码内容中内置的ID。(这个时候在客户端能看到一个二维码了,等待用户扫描)
  2. 客户端马上调用服务端的一个长连接接口,与服务端建立长连接,等待服务端通知。(这个过程是在后台发生的,用户无法感知)
  3. 用户拿出微信扫描二维码,就会有一个扫描事件通知到服务端的扫描接口。(这个时候服务端接收到扫描动作,完成自己的业务操作以后,通知长连接接口,用户已经扫描了,可以返回了)。

这个流程里面有这么几个地方是需要能解决的:

  1. 步骤2里面要求客户端–服务端建立长连接,不会立即返回,客户端一直在等待状态。(Servlet3 的API可以支持,需要把Timeout时间设置长一点,一般是60S够了)
  2. 步骤3中 扫描接口要通知长连接接口,如何做到? 必须存在一个公共的容器,容器里面存着上下文信息,扫描接口把执行完毕的上下文告知长连接接口就可以了。

所以,实现代码如下:

配置部分

web.xml 启用Servlet3 的命名空间

1
2
3
4
5
6
7
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
id="WebApp_ID" version="3.0">

</web-app>

长连接Servlet要开启异步支持:

1
@WebServlet(value = "/scan/*",asyncSupported = true)

Tomcat server.xml要开启NIO模式

1
2
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"
connectionTimeout="20000" asyncTimeout="150000" URIEncoding="utf-8" redirectPort="8443" />

长连接Servlet实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@WebServlet(value = "/scan/*",asyncSupported = true)
public class ScanServlet extends HttpServlet {

// private ScanRetain retain;

private Logger logger = Logger.getLogger(getClass());

@Override
public void init() throws ServletException {

}

@Override
public void destroy() {
ScanRetain.MAP.clear();
}

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
doPost(req, resp);
}

@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
logger.debug(">>>>>>>>>>>>>>>>>开始访问长连接Servlet.....");
String pathInfo = req.getPathInfo();
String key = null;
if (pathInfo != null) {
int i = pathInfo.lastIndexOf('/');
if (i >= 0) {
key = pathInfo.substring(i + 1);
}
}
if (key == null) {
PrintWriter writer = resp.getWriter();
writer.write("error:not found scan key");
writer.flush();
return;
}
req.startAsync(req, resp);
if (req.isAsyncStarted()) {
final AsyncContext asyncContext = req.getAsyncContext();
final String theKey = key;
asyncContext.setTimeout(60 * 1000L);

asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
ScanRetain.MAP.remove(theKey);
}

@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
ScanRetain.MAP.remove(theKey);
}

@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
ScanRetain.MAP.remove(theKey);
}

@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {

}
});

logger.debug(">>>>>>>>>>>>>>>>>将长连接上下文对象加入队列等待处理.........");
ScanRetain.MAP.put(theKey, asyncContext);
}
}
}

公共Context容器存放类以及提供给扫描后对长连接响应处理的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ScanRetain {

// 公共上下文容器
public static final ConcurrentHashMap<String, AsyncContext> MAP = new ConcurrentHashMap<String, AsyncContext>();

private Logger logger = Logger.getLogger(getClass());

public void doReturn(String key){
logger.debug(">>>>>>>>>>>>>>>>>长连接正在响应.....");
AsyncContext asyncContext = MAP.get(key);
if (asyncContext == null) {
return;
}
HttpServletResponse res = (HttpServletResponse) asyncContext.getResponse();
DBObject data = new BasicDBObject("result",1)
.append("info","ok")
.append("now",System.currentTimeMillis());
String str = JSON.serialize(data);
OutputStream os = null;
try {
os = res.getOutputStream();
os.write(str.getBytes("utf-8"));
logger.debug(">>>>>>>>>>>>>>>>>长连接响应完毕.....");
os.flush();
asyncContext.setTimeout(100L);// 一定要加这一句才会及时返回
} catch (IOException e) {
e.printStackTrace();
}
}


}

扫描事件触发长连接响应的逻辑

1
2
3
4
5
Long senceId = 0L;
if (qrSenceId != null) {
senceId = Long.parseLong(qrSenceId);
}
scanRetain.doReturn(senceId + "");

SpringMVC3.2 的实现我也尝试了一下:
长连接接口:

1
2
3
4
5
6
7
8
9
10
// 上下文容器
public static final ConcurrentHashMap<String, DeferredResult<String>> MAP = new ConcurrentHashMap<String, DeferredResult<String>>();

@RequestMapping("doScan/{key}")
@ResponseBody
public DeferredResult<String> doScan(@PathVariable("key") String key) {
DeferredResult<String> result = new DeferredResult<String>();
MAP.put(key, result);
return result;
}

通知长连接响应客户端的测试代码:

1
2
3
4
5
6
7
8
9
10
11
@RequestMapping(value="/newScan/{key}",produces = "text/plain;charset=utf-8;")
@ResponseBody
public String newScan(@PathVariable("key") String key,
HttpServletRequest req, HttpServletResponse res) {
DeferredResult<String> data = Scans.MAP.get(key);
if(data!=null){
data.setResult("this is result:"+System.currentTimeMillis());
Scans.MAP.remove(key);
}
return "new scan test finished :"+key+"now is :"+System.currentTimeMillis();
}

Spring的代码实现简单很多,但是也不那么直观,不利于理解。

同时,它还提供另外两种异步处理的方式,只是不适于这个场景,这里也罗列一下。
Callable:

1
2
3
4
5
6
7
8
9
10
11
12
@ResponseBody
@RequestMapping("call")
public Callable<String> call(HttpServletRequest req, HttpServletResponse res) throws Exception {
return new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return "hello,callable";
}
};

}

WebAsyncTask:

1
2
3
4
5
6
7
8
9
10
11
12
13
@ResponseBody
@RequestMapping("async")
public WebAsyncTask<String> async(HttpServletRequest req, HttpServletResponse res) throws Exception {
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return "hello,WebAsyncTask";
}
};

return new WebAsyncTask<String>(1000*60L,callable);
}

上面两种方式也是用于异步操作的,它们比较适用于一些比较耗时的操作(如大数据计算,文件处理),它们的响应一般不存在其他的触发点,就是取决于Callable内部代码块的执行结束。

综上,我们大致可以总结出异步处理的两种应用场景:

1. 多点操作,单点的响应往往依赖于其他点的触发,最典型的就是微信扫描登录了。这个基本的编码思路应该是这样的:

  • 定义一个上下文存储容器,容器要支持并发,最好选用Concurrent类型。

  • 开发长连接接口,客户端请求连接后,将上下文加入存储容器。

  • 开发响应的触发逻辑代码段。

  • 触发业务完成以后,调用响应触发逻辑。

2. 单点操作,但是操作往往非常耗时,不能及时响应。这种场景一般会把耗时操作全部抽离到Callable代码段,响应的触发点就是Callable代码的结束处。