/** Starts the dispatchers in this queue. */
public void start() {
    // Make sure any currently running dispatchers are stopped.
    stop();
    // Create the cache dispatcher and start it.
    mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
    mCacheDispatcher.start();

    // Create network dispatchers (and corresponding threads) up to the pool size.
    for (int i = 0; i < mDispatchers.length; i++) {
        NetworkDispatcher networkDispatcher =
                new NetworkDispatcher(mNetworkQueue, mNetwork, mCache, mDelivery);
        mDispatchers[i] = networkDispatcher;
        networkDispatcher.start();
    }
}
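For context, here is a minimal sketch of how such a queue is typically constructed and started. The cache directory name, thread-pool size, and HurlStack choice are illustrative assumptions, not part of the excerpt above; in practice Volley.newRequestQueue(context) performs an equivalent setup and calls start() for you.

// Illustrative setup only; names and sizes are assumptions for this sketch.
File cacheDir = new File(context.getCacheDir(), "volley");
Network network = new BasicNetwork(new HurlStack());
RequestQueue queue = new RequestQueue(new DiskBasedCache(cacheDir), network, 4);
queue.start(); // spins up one CacheDispatcher and four NetworkDispatchers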
/** Stops the cache and network dispatchers. */
public void stop() {
    if (mCacheDispatcher != null) {
        mCacheDispatcher.quit();
    }
    for (final NetworkDispatcher mDispatcher : mDispatchers) {
        if (mDispatcher != null) {
            mDispatcher.quit();
        }
    }
}
public @interface RequestEvent {
    /** The request was added to the queue. */
    public static final int REQUEST_QUEUED = 0;
    /** Cache lookup started for the request. */
    public static final int REQUEST_CACHE_LOOKUP_STARTED = 1;
    /**
     * Cache lookup finished for the request and cached response is delivered or request is
     * queued for network dispatching.
     */
    public static final int REQUEST_CACHE_LOOKUP_FINISHED = 2;
    /** Network dispatch started for the request. */
    public static final int REQUEST_NETWORK_DISPATCH_STARTED = 3;
    /** The network dispatch finished for the request and response (if any) is delivered. */
    public static final int REQUEST_NETWORK_DISPATCH_FINISHED = 4;
    /**
     * All the work associated with the request is finished and request is removed from all the
     * queues.
     */
    public static final int REQUEST_FINISHED = 5;
}
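These constants are reported through the queue's event-listener hook. Assuming the companion RequestQueue.RequestEventListener / addRequestEventListener API that ships alongside these constants in recent Volley versions, observing them looks roughly like this:

// Sketch only; assumes the RequestEventListener registration API is available.
queue.addRequestEventListener(new RequestQueue.RequestEventListener() {
    @Override
    public void onRequestEvent(Request<?> request, int event) {
        if (event == RequestQueue.RequestEvent.REQUEST_QUEUED) {
            VolleyLog.v("queued: %s", request.getUrl());
        }
    }
});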
/** Used for generating monotonically-increasing sequence numbers for requests. */
private final AtomicInteger mSequenceGenerator = new AtomicInteger();

/** The set of all requests currently being processed by this RequestQueue. */
private final Set<Request<?>> mCurrentRequests = new HashSet<>();

/** The cache triage queue. */
private final PriorityBlockingQueue<Request<?>> mCacheQueue = new PriorityBlockingQueue<>();

/** The queue of requests that are actually going out to the network. */
private final PriorityBlockingQueue<Request<?>> mNetworkQueue = new PriorityBlockingQueue<>();
public <T> Request<T> add(Request<T> request) {
    // Tag the request as belonging to this queue and add it to the set of current requests.
    request.setRequestQueue(this);
    synchronized (mCurrentRequests) {
        mCurrentRequests.add(request);
    }

    // Process requests in the order they are added.
    request.setSequence(getSequenceNumber());
    request.addMarker("add-to-queue");
    sendRequestEvent(request, RequestEvent.REQUEST_QUEUED);
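From the caller's side, adding a request is a one-liner. A small usage sketch (the URL and listeners are placeholders, and queue is the one constructed in the earlier sketch):

// Placeholder request for illustration.
StringRequest req = new StringRequest(
        Request.Method.GET,
        "https://example.com/data",
        response -> VolleyLog.v("got %d chars", response.length()),
        error -> VolleyLog.e("request failed: %s", error.toString()));
queue.add(req); // tags the request, assigns its sequence number, and fires REQUEST_QUEUED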
@Override
public void run() {
    if (DEBUG) VolleyLog.v("start new dispatcher");
    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);

    // Make a blocking call to initialize the cache.
    mCache.initialize();

    while (true) {
        try {
            processRequest();
        } catch (InterruptedException e) {
            // We may have been interrupted because it was time to quit.
            if (mQuit) {
                Thread.currentThread().interrupt();
                return;
            }
            VolleyLog.e(
                    "Ignoring spurious interrupt of CacheDispatcher thread; "
                            + "use quit() to terminate it");
        }
    }
}
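The quit() that this loop cooperates with is not shown in the excerpt; in the dispatcher classes it is essentially a flag plus an interrupt, roughly:

/** Forces this dispatcher to quit immediately. Requests still in the queue are not guaranteed to be processed. */
public void quit() {
    mQuit = true;
    interrupt(); // wakes the thread out of the blocking take() inside processRequest()
}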
try {
    // If the request has been canceled, don't bother dispatching it.
    if (request.isCanceled()) {
        request.finish("cache-discard-canceled");
        return;
    }

    // Attempt to retrieve this item from cache.
    Cache.Entry entry = mCache.get(request.getCacheKey());
    if (entry == null) {
        request.addMarker("cache-miss");
        // Cache miss; send off to the network dispatcher.
        if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
            mNetworkQueue.put(request);
        }
        return;
    }

    // If it is completely expired, just send it to the network.
    if (entry.isExpired()) {
        request.addMarker("cache-hit-expired");
        request.setCacheEntry(entry);
        if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
            mNetworkQueue.put(request);
        }
        return;
    }

    // We have a cache hit; parse its data for delivery back to the request.
    request.addMarker("cache-hit");
    Response<?> response =
            request.parseNetworkResponse(
                    new NetworkResponse(entry.data, entry.responseHeaders));
    request.addMarker("cache-hit-parsed");
    if (!response.isSuccess()) {
        request.addMarker("cache-parsing-failed");
        mCache.invalidate(request.getCacheKey(), true);
        request.setCacheEntry(null);
        if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
            mNetworkQueue.put(request);
        }
        return;
    }
    if (!entry.refreshNeeded()) {
        // The cache entry does not need refreshing; hand the response straight to the ResponseDelivery.
        mDelivery.postResponse(request, response);
    } else {
        // Soft-expired cache hit. We can deliver the cached response, but we still need to
        // send the request to the network for a refresh.
        request.addMarker("cache-hit-refresh-needed");
        request.setCacheEntry(entry);
        // Mark the response as intermediate.
        response.intermediate = true;
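The excerpt stops here; in the upstream source this soft-expired branch continues by delivering the intermediate response and then forwarding the request to the network queue, roughly as follows (a paraphrase of the Volley code, not part of the excerpt above):

        if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
            // Post the intermediate response back to the user, and have the delivery
            // then forward the request along to the network for refreshing.
            mDelivery.postResponse(request, response, new Runnable() {
                @Override
                public void run() {
                    try {
                        mNetworkQueue.put(request);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        } else {
            // The request was attached to an in-flight request for the same cache key
            // and will receive that request's network response when it returns.
            mDelivery.postResponse(request, response);
        }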
@Override
public void run() {
    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
    while (true) {
        try {
            processRequest();
        } catch (InterruptedException e) {
            // We may have been interrupted because it was time to quit.
            if (mQuit) {
                Thread.currentThread().interrupt();
                return;
            }
            VolleyLog.e(
                    "Ignoring spurious interrupt of NetworkDispatcher thread; "
                            + "use quit() to terminate it");
        }
    }
}
// If the server returned 304 AND we delivered a response already,
// we're done -- don't deliver a second identical response.
if (networkResponse.notModified && request.hasHadResponseDelivered()) {
    request.finish("not-modified");
    request.notifyListenerResponseNotUsable();
    return;
}

// Parse the response here on the worker thread.
Response<?> response = request.parseNetworkResponse(networkResponse);
request.addMarker("network-parse-complete");

// Write to cache if applicable.
// TODO: Only update cache metadata instead of entire record for 304s.
if (request.shouldCache() && response.cacheEntry != null) {
    mCache.put(request.getCacheKey(), response.cacheEntry);
    request.addMarker("network-cache-written");
}
public interface ResponseDelivery {
    /** Parses a response from the network or cache and delivers it. */
    void postResponse(Request<?> request, Response<?> response);

    /**
     * Parses a response from the network or cache and delivers it. The provided Runnable will be
     * executed after delivery.
     */
    void postResponse(Request<?> request, Response<?> response, Runnable runnable);

    /** Posts an error for the given request. */
    void postError(Request<?> request, VolleyError error);
}
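To tie this interface back to the dispatchers: Volley's default implementation, ExecutorDelivery, is constructed with a main-thread Handler, so responses parsed on a dispatcher thread are handed back on the UI thread. Below is a condensed sketch of that idea; it is simplified (the real class also handles canceled requests and the intermediate flag), the class name is illustrative, and it assumes the class sits in the com.android.volley package so it may call the protected Request.deliverResponse().

// Assumption: same package as Volley's own classes, like ExecutorDelivery.
package com.android.volley;

import android.os.Handler;
import java.util.concurrent.Executor;

/** Condensed sketch of a Handler-backed ResponseDelivery, modeled on ExecutorDelivery. */
public class HandlerDelivery implements ResponseDelivery {
    /** Wraps the handler so every delivery runs on its thread (normally the main thread). */
    private final Executor mResponsePoster;

    public HandlerDelivery(final Handler handler) {
        mResponsePoster = new Executor() {
            @Override
            public void execute(Runnable command) {
                handler.post(command);
            }
        };
    }

    @Override
    public void postResponse(Request<?> request, Response<?> response) {
        postResponse(request, response, null);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void postResponse(Request<?> request, Response<?> response, final Runnable runnable) {
        final Request<Object> req = (Request<Object>) request;
        final Response<Object> resp = (Response<Object>) response;
        request.markDelivered();
        mResponsePoster.execute(new Runnable() {
            @Override
            public void run() {
                if (resp.isSuccess()) {
                    req.deliverResponse(resp.result);
                } else {
                    req.deliverError(resp.error);
                }
                // Runs after delivery; the CacheDispatcher uses this to re-queue a
                // soft-expired request for a network refresh.
                if (runnable != null) {
                    runnable.run();
                }
            }
        });
    }

    @Override
    public void postError(final Request<?> request, final VolleyError error) {
        mResponsePoster.execute(new Runnable() {
            @Override
            public void run() {
                request.deliverError(error);
            }
        });
    }
}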