一步一步扩展 Volley (一),Volley 框架解析

去年 11 月开始写一个扩展 Volley 的网络通信框架 ———— VolleyEx,虽然 Github 上已经有 VolleyPlus 等优秀的扩展框架,但我想借此研究异步加载网络请求的原理,学习下 Volley 的代码设计,所以还是自己重新写一个扩展框架。并且把我一步一步扩展 Volley 的过程用博客的形式记录下来,希望可以帮助大家更好地理解 Volley 及网络加载。在扩展 Volley 之前,需要先了解其大致框架,明白它的优势与不足,这也是这篇文章的主要内容。

Volley 简介

Volley 是 2013 Google I/O Session 推出的一个网络通信框架,便于异步加载网络请求和图片请求,适用于数据量小通信频繁的网络操作。但是对于数据量大的网络(例如文件下载),Volley 的表现就很糟糕了,这种情况推荐用 DownloadManager。

根据 Android 官方培训课程中 Volley 教程的说明,Volle 有下面几个优点:

  1. 自动调度网络请求

  2. 高并发网络连接

  3. 通过标准的 HTTP cache coherence 缓存磁盘,内存透明的响应

  4. 支持指定请求的优先级

  5. 框架容易定制,如重试和回退功能

  6. 清晰的加载流程使异步加载网络数据并正确地显示到UI更加简单

  7. 包含了调试与追踪工具

关于 Volley 的不足之处,在解析完其框架再来讨论。

Volley 的官方项目地址: https://android.googlesource.com/platform/frameworks/volley/

Volley 的 Github 地址(由国人做的映射): https://github.com/mcxiaoke/android-volley

这是 2013 Google I/O 大会上关于 Volley 的文档: Volley-Easy,Fast Networking for Android.pdf

Volley 工作流程

上面提到的 Volley 文档中就有一张工作流程图:

从上面的流程图可以看出,Request 添加到请求队列后,先是给缓存调度器处理。如果该请求有一个有效的缓存则直接解析缓存后传递到主线程,否则把它给网络调度器,网络调度器把这些请求按规则循环处理,当一个请求处理时会通过 HTTP 协议加载数据,然后解析数据并写入缓存,之后把解析后的 Response 传递到主线程。

接下来以 Volley 的流程来大致分析源代码。

启动

在添加请求到请求队列前,必须先启动 Volley 通信框架,在代码上来看就是Volley.newRequestQueue(context),这行代码主要是新建一个 RequestQueue 对象并调用其 start 方法,关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class RequestQueue {
...
/** Number of network request dispatcher threads to start. */
private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
/**
* Creates the worker pool. Processing will not begin until {@link #start()} is called.
*
* @param cache A Cache to use for persisting responses to disk
* @param network A Network interface for performing HTTP requests
* @param threadPoolSize Number of network dispatcher threads to create
* @param delivery A ResponseDelivery interface for posting responses and errors
*/
public RequestQueue(Context context, Cache cache, Network network, int threadPoolSize,
ResponseDelivery delivery) {
mContext = context;
mCache = cache;
mNetwork = network;
mDispatchers = new NetworkDispatcher[threadPoolSize];
mDelivery = delivery;
}
/**
* Starts the dispatchers in this queue.
*/
public void start() {
stop(); // Make sure any currently running dispatchers are stopped.
// Create the cache dispatcher and start it.
mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
mCacheDispatcher.start();
// Create network dispatchers (and corresponding threads) up to the pool size.
for (int i = 0; i < mDispatchers.length; i++) {
NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
mCache, mDelivery);
mDispatchers[i] = networkDispatcher;
networkDispatcher.start();
}
}
...
}

start方法主要是启动一个缓存调度器和几个网络调度器,这两个分配器都是继承 Thread,其中默认网络调度器的数量是 4 个,所以默认是启动 1 个缓存调度器线程和 4 个网络调度器线程作为常驻线程。

添加请求到分发队列

接下来就是通过RequestQueue.add(request)添加请求到分发队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* Adds a Request to the dispatch queue.
* @param request The request to service
* @return The passed-in request
*/
public <T> Request<T> add(Request<T> request) {
// Tag the request as belonging to this queue and add it to the set of current requests.
request.setRequestQueue(this);
synchronized (mCurrentRequests) {
mCurrentRequests.add(request);
}
// Process requests in the order they are added.
request.setSequence(getSequenceNumber());
request.addMarker("add-to-queue");
// If the request is uncacheable, skip the cache queue and go straight to the network.
if (!request.shouldCache()) {
mNetworkQueue.add(request);
return request;
}
// Insert request into stage if there's already a request with the same cache key in flight.
synchronized (mWaitingRequests) {
String cacheKey = request.getCacheKey();
if (mWaitingRequests.containsKey(cacheKey)) {
// There is already a request in flight. Queue up.
Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
if (stagedRequests == null) {
stagedRequests = new LinkedList<Request<?>>();
}
stagedRequests.add(request);
mWaitingRequests.put(cacheKey, stagedRequests);
if (VolleyLog.DEBUG) {
VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
}
} else {
// Insert 'null' queue for this cacheKey, indicating there is now a request in
// flight.
mWaitingRequests.put(cacheKey, null);
mCacheQueue.add(request);
}
return request;
}
}

add方法中,先将 Request 添加到当前请求队列,然后判断是否需要缓存,如果不需要缓存则直接添加到网络队列,默认shouldCache方法返回 true,接下来根据cache key判断是否有重复的请求,如果有则只是把请求添加到等待列表,如果没有的重复的请求则还会添加到缓存队列。所以从这里可以看出 Volley 对于重复的请求只会加载第一个请求,节省资源。

缓存调度器处理请求

CacheDispatcher缓存调度器线程会一直取出缓存队列的请求处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@Override
public void run() {
if (DEBUG) VolleyLog.v("start new dispatcher");
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
// Make a blocking call to initialize the cache.
mCache.initialize();
Request<?> request;
while (true) {
// release previous request object to avoid leaking request object when mQueue is drained.
request = null;
try {
// Take a request from the queue.
request = mCacheQueue.take();
} catch (InterruptedException e) {
// We may have been interrupted because it was time to quit.
if (mQuit) {
return;
}
continue;
}
try {
request.addMarker("cache-queue-take");
// If the request has been canceled, don't bother dispatching it.
if (request.isCanceled()) {
request.finish("cache-discard-canceled");
continue;
}
// Attempt to retrieve this item from cache.
Cache.Entry entry = mCache.get(request.getCacheKey());
if (entry == null) {
request.addMarker("cache-miss");
// Cache miss; send off to the network dispatcher.
mNetworkQueue.put(request);
continue;
}
// If it is completely expired, just send it to the network.
if (entry.isExpired()) {
request.addMarker("cache-hit-expired");
request.setCacheEntry(entry);
mNetworkQueue.put(request);
continue;
}
// We have a cache hit; parse its data for delivery back to the request.
request.addMarker("cache-hit");
Response<?> response = request.parseNetworkResponse(
new NetworkResponse(entry.data, entry.responseHeaders));
request.addMarker("cache-hit-parsed");
if (!entry.refreshNeeded()) {
// Completely unexpired cache hit. Just deliver the response.
mDelivery.postResponse(request, response);
} else {
// Soft-expired cache hit. We can deliver the cached response,
// but we need to also send the request to the network for
// refreshing.
request.addMarker("cache-hit-refresh-needed");
request.setCacheEntry(entry);
// Mark the response as intermediate.
response.intermediate = true;
// Post the intermediate response back to the user and have
// the delivery then forward the request along to the network.
final Request<?> finalRequest = request;
mDelivery.postResponse(request, response, new Runnable() {
@Override
public void run() {
try {
mNetworkQueue.put(finalRequest);
} catch (InterruptedException e) {
// Not much we can do about this.
}
}
});
}
} catch (Exception e) {
VolleyLog.e(e, "Unhandled exception %s", e.toString());
}
}
}

Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);这行说明缓存线程是在后台线程中运行,后面在while(true)循环中从缓存队列中拿请求出来处理,下面重点看缓存的处理逻辑,Cache.Entry entry = mCache.get(request.getCacheKey());先获得缓存数据,如果没有缓存或缓存是已经过期则添加请求到网络队列。然后解析缓存数据,如果缓存不需要刷新就直接传递解析后的 response 到主线程,如果需要刷新则把 response 传递到主线程后还会把请求添加网络队列。

上面代码中mDelivery.postResponse()其实是用来传递 response 的,可以看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* Delivers responses and errors.
*/
public class ExecutorDelivery implements ResponseDelivery {
/** Used for posting responses, typically to the main thread. */
private final Executor mResponsePoster;
/**
* Creates a new response delivery interface.
* @param handler {@link Handler} to post responses on
*/
public ExecutorDelivery(final Handler handler) {
// Make an Executor that just wraps the handler.
mResponsePoster = new Executor() {
@Override
public void execute(Runnable command) {
handler.post(command);
}
};
}
/**
* Creates a new response delivery interface, mockable version
* for testing.
* @param executor For running delivery tasks
*/
public ExecutorDelivery(Executor executor) {
mResponsePoster = executor;
}
@Override
public void postResponse(Request<?> request, Response<?> response) {
postResponse(request, response, null);
}
@Override
public void postResponse(Request<?> request, Response<?> response, Runnable runnable) {
request.markDelivered();
request.addMarker("post-response");
mResponsePoster.execute(new ResponseDeliveryRunnable(request, response, runnable));
}
@Override
public void postError(Request<?> request, VolleyError error) {
request.addMarker("post-error");
Response<?> response = Response.error(error);
mResponsePoster.execute(new ResponseDeliveryRunnable(request, response, null));
}
/**
* A Runnable used for delivering network responses to a listener on the
* main thread.
*/
@SuppressWarnings("rawtypes")
private class ResponseDeliveryRunnable implements Runnable {
private final Request mRequest;
private final Response mResponse;
private final Runnable mRunnable;
public ResponseDeliveryRunnable(Request request, Response response, Runnable runnable) {
mRequest = request;
mResponse = response;
mRunnable = runnable;
}
@SuppressWarnings("unchecked")
@Override
public void run() {
// If this request has canceled, finish it and don't deliver.
if (mRequest.isCanceled()) {
mRequest.finish("canceled-at-delivery");
return;
}
// Deliver a normal response or error, depending.
if (mResponse.isSuccess()) {
mRequest.deliverResponse(mResponse.result);
} else {
mRequest.deliverError(mResponse.error);
}
// If this is an intermediate response, add a marker, otherwise we're done
// and the request can be finished.
if (mResponse.intermediate) {
mRequest.addMarker("intermediate-response");
} else {
mRequest.finish("done");
}
// If we have been provided a post-delivery runnable, run it.
if (mRunnable != null) {
mRunnable.run();
}
}
}
}

ExecutorDeliver 作为传递器,负责传递解析后的 Response 或者 error,是运行在主线程中的,主要是通过Handler.post(Runnable runnable)方法传递的。缓存调度器和网络调度器都是通过它来传递 Response 的,ResponseDeliveryRunnable的 run 方法是这个类的重点。run 方法的第一个 If 语句可以保证取消请求后,之后解析后的数据不会传递到主线程。第二个 If 语句中mRequest.deliverResponse(mResponse.result);的 deliverResponse 是 Request 类的抽象方法,需要子类实现,这句话就是把解析后的数据传递给主线程了。而第三个 If 语句中mResponse.intermediate默认是为 false 的,在之前的CacheDispatcher代码中可以看到当缓存需要刷新会设置为 true,所以当缓存调度器找到有效不需要刷新的缓存到这里时和网络调度器加载完数据解析后到这里时都会走到mRequest.finish("done");,这里会做一些完成这个请求的一些动作。

网络调度器处理请求

如果没有找到未过期缓存或者缓存需要刷新时,请求就会到网络队列,还有一开始的shouldCache()返回false的话也会到缓存队列,下面看网络调度器如何处理网络队列中的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Override
public void run() {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
Request<?> request;
while (true) {
long startTimeMs = SystemClock.elapsedRealtime();
// release previous request object to avoid leaking request object when mQueue is drained.
request = null;
try {
// Take a request from the queue.
request = mQueue.take();
} catch (InterruptedException e) {
// We may have been interrupted because it was time to quit.
if (mQuit) {
return;
}
continue;
}
try {
request.addMarker("network-queue-take");
// If the request was cancelled already, do not perform the
// network request.
if (request.isCanceled()) {
request.finish("network-discard-cancelled");
continue;
}
addTrafficStatsTag(request);
// Perform the network request.
NetworkResponse networkResponse = mNetwork.performRequest(request);
request.addMarker("network-http-complete");
// If the server returned 304 AND we delivered a response already,
// we're done -- don't deliver a second identical response.
if (networkResponse.notModified && request.hasHadResponseDelivered()) {
request.finish("not-modified");
continue;
}
// Parse the response here on the worker thread.
Response<?> response = request.parseNetworkResponse(networkResponse);
request.addMarker("network-parse-complete");
// Write to cache if applicable.
// TODO: Only update cache metadata instead of entire record for 304s.
if (request.shouldCache() && response.cacheEntry != null) {
mCache.put(request.getCacheKey(), response.cacheEntry);
request.addMarker("network-cache-written");
}
// Post the response back.
request.markDelivered();
mDelivery.postResponse(request, response);
} catch (VolleyError volleyError) {
volleyError.setNetworkTimeMs(SystemClock.elapsedRealtime() - startTimeMs);
parseAndDeliverNetworkError(request, volleyError);
} catch (Exception e) {
VolleyLog.e(e, "Unhandled exception %s", e.toString());
VolleyError volleyError = new VolleyError(e);
volleyError.setNetworkTimeMs(SystemClock.elapsedRealtime() - startTimeMs);
mDelivery.postError(request, volleyError);
}
}
}

网络调度器线程也是运行在后台的,一直循环去网络队列中取请求处理,mNetwork.performRequest(request);就是通过 HTTP 协议加载网络数据的,这里不深入研究,因为现在主要是搞清楚 Volley 的大致框架。接下来request.parseNetworkResponse(networkResponse);是解析数据的,Volley 的解析过程都是在后台线程进行的,之后写入缓存,然后传递解析后的 Response 到主线程。

到这里已经跟着 Volley 的工作流程走了一遍,Volley 的大致框架也就出来了,现在再来看看那张流程图

我只是跟着 Volley 的工作流程大致分析了一下工作流程和大致框架,更深入的解析大家也可以看下 CodeKK 的 Volley 源码解析,当然后面的一步一步扩展 Volley 系列文章中也会更加深入地分析 Volley,同时明白从那些方面扩展 Volley。

END
Johnny Shieh wechat
我的公众号,不只有技术,还有咖啡和彩蛋!