概述 EventBus是一个适用于Android和Java的开源库,使用发布者/订阅者模式进行松散耦合。EventBus使通信只需几行代码即可解耦类,从而简化了代码,消除了依赖关系并加快了应用程序的开发。EventBus容易混淆程序逻辑,且不支持跨进程。
使用 第一步,定义事件:
1 2 3 4 5 6 7 8 9 10 11 public class MessageEvent { private String message; public MessageEvent (String message) { this .message = message; } public void print () { Log.d(TAG, Thread.currentThread().getName() + ": " + message); } }
第二步,创建订阅者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Subscribe(threadMode = ThreadMode.MAIN) public void onEvent (MessageEvent event) { event.print(); } @Override protected void onStart () { super .onStart(); EventBus.getDefault().register(this ); } @Override protected void onStop () { EventBus.getDefault().unregister(this ); super .onStop(); }
第三步,发送事件:
1 EventBus.getDefault().post(new MessageEvent("hearing" ));
线程模型 事件模型是指事件订阅者所在线程和发布事件者所在线程的关系。
POSTING 事件的订阅和事件的发布处于同一线程。这是默认设置。该模式的开销最小,因为它完全避免了线程切换。使用此模式的事件处理程序必须快速返回,以避免阻塞可能是主线程的发布线程。
MAIN 在Android上,用户将在Android的主线程(UI线程)中被调用。如果发布线程是主线程,则将直接调用订阅方法,从而阻塞发布线程。否则,事件将排队等待传递(非阻塞)。如果不在Android上,则行为与POSTING相同。
MAIN_ORDERED 在Android上,用户将在Android的主线程(UI线程)中被调用。与MAIN不同,事件将始终排队等待传递。这确保了post调用是非阻塞的。
BACKGROUND 在Android上,用户将在后台线程中被调用。如果发布线程不是主线程,则将在发布线程中直接调用订阅方方法。如果发布线程是主线程,则EventBus使用单个后台线程,该线程将按顺序传递其所有事件。如果不在Android上,则始终使用后台线程。
ASYNC 不管是否在UI线程产生事件,都会在单独的子线程中消费事件,即每个订阅方法都会在一个单独的线程中执行。EventBus使用线程池重用来已完成的线程。
事件类型 普通事件 普通事件是指已有的事件订阅者能够收到事件发送者发送的事件,在事件发送之后注册的事件接收者将无法收到事件。发送普通事件可以调用EventBus.getDefault().post()
方法进行发送。
粘性事件 粘性事件是指,不管是在事件发送之前注册的事件接收者还是在事件发送之后注册的事件接收者都能够收到事件。这里于普通事件的区别之处在于事件接收处需要定义事件接收类型,它可以通过@Subscribe(threadMode = xxx, sticky = true)
的方式进行声明;在事件发送时需要调用EventBus.getDefault().postSticky()
方法进行发送。事件类型默认为普通事件。
事件优先级 订阅者优先级以影响事件传递顺序。在同一传递线程ThreadMode中,优先级较高的订阅者将在优先级较低的其他订阅者之前接收事件。默认优先级为0。注意:优先级不影响具有不同ThreadMode的订阅服务器之间的传递顺序。
数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;private final Map<Object, List<Class<?>>> typesBySubscriber;private final Map<Class<?>, Object> stickyEvents;public class SubscriberMethod { final Method method; final ThreadMode threadMode; final Class<?> eventType; final int priority; final boolean sticky; String methodString; } final class Subscription { final Object subscriber; final SubscriberMethod subscriberMethod; volatile boolean active; }
EventBusAnnotationProcessor EventBusAnnotationProcessor是EventBus提供的注解处理器,它用来在编译期通过读取@Subscribe()注解并解析,处理其中所包含的信息,然后生成类似于EventBusIndex.java的类来保存所有订阅者关于订阅的信息。可以在gradle脚本中添加:
1 2 3 4 5 6 7 8 9 10 11 12 13 android { defaultConfig { javaCompileOptions { annotationProcessorOptions { arguments = [eventBusIndex: 'org.greenrobot.eventbus.MyEventBusIndex' , verbose: "true" ] } } } } dependencies { annotationProcessor 'org.greenrobot:eventbus-annotation-processor:3.2.0' }
生成的MyEventBusIndex类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package org.greenrobot.eventbus;import org.greenrobot.eventbus.meta.SimpleSubscriberInfo;import org.greenrobot.eventbus.meta.SubscriberMethodInfo;import org.greenrobot.eventbus.meta.SubscriberInfo;import org.greenrobot.eventbus.meta.SubscriberInfoIndex;import org.greenrobot.eventbus.ThreadMode;import java.util.HashMap;import java.util.Map;public class MyEventBusIndex implements SubscriberInfoIndex { private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX; static { SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>(); putIndex(new SimpleSubscriberInfo(com.hearing.eventbusdemo.MainActivity.class, true , new SubscriberMethodInfo[] { new SubscriberMethodInfo("onEvent1" , com.hearing.eventbusdemo.MessageEvent.class, ThreadMode.MAIN), new SubscriberMethodInfo("onEvent2" , com.hearing.eventbusdemo.MessageEvent.class, ThreadMode.MAIN), })); } private static void putIndex (SubscriberInfo info) { SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info); } @Override public SubscriberInfo getSubscriberInfo (Class<?> subscriberClass) { SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass); if (info != null ) { return info; } else { return null ; } } }
EventBusAnnotationProcessor其实就是通过注解处理器的方式,在编译器生成一个记录了订阅者方法新的的类,具体生成过程可以参考源码,注解相关的可以看这里 。
使用时需要添加index,否则使用时依旧用的是反射的方式处理注解:
1 mEventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();
EventBus创建 getDefault 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 static volatile EventBus defaultInstance;private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();public static EventBus getDefault () { EventBus instance = defaultInstance; if (instance == null ) { synchronized (EventBus.class) { instance = EventBus.defaultInstance; if (instance == null ) { instance = EventBus.defaultInstance = new EventBus(); } } } return instance; } public EventBus () { this (DEFAULT_BUILDER); } EventBus(EventBusBuilder builder) { logger = builder.getLogger(); subscriptionsByEventType = new HashMap<>(); typesBySubscriber = new HashMap<>(); stickyEvents = new ConcurrentHashMap<>(); mainThreadSupport = builder.getMainThreadSupport(); mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this ) : null ; backgroundPoster = new BackgroundPoster(this ); asyncPoster = new AsyncPoster(this ); indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0 ; subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex); logSubscriberExceptions = builder.logSubscriberExceptions; logNoSubscriberMessages = builder.logNoSubscriberMessages; sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent; sendNoSubscriberEvent = builder.sendNoSubscriberEvent; throwSubscriberException = builder.throwSubscriberException; eventInheritance = builder.eventInheritance; executorService = builder.executorService; }
自定义 可以通过builder自定义参数,如添加index类。
1 mEventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();
register register 1 2 3 4 5 6 7 8 9 10 11 12 public void register (Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this ) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
findSubscriberMethods SubscriberMethodFinder.java: 用来查找和缓存订阅者响应函数的信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class SubscriberMethodFinder { private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>(); SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean strictMethodVerification, boolean ignoreGeneratedIndex) { this .subscriberInfoIndexes = subscriberInfoIndexes; this .strictMethodVerification = strictMethodVerification; this .ignoreGeneratedIndex = ignoreGeneratedIndex; } List<SubscriberMethod> findSubscriberMethods (Class<?> subscriberClass) { List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null ) { return subscriberMethods; } if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation" ); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } } }
对于获取注册的订阅方法,首先就是通过缓存来获取,如果没有的话则通过以下两种方式进行获取:
EventBusAnnotationProcessor注解处理器在编译期通过读取@Subscribe()注解并解析,处理其中所包含的信息,然后生成MyEventBusIndex.java类来保存所有订阅者关于订阅的信息。
运行时使用反射来获得这些订阅者的信息。
很显然编译器间通过注解处理器读取订阅信息,避免运行时反射的方式,性能上更优。
findUsingReflection FindState FindState类: 保存了订阅者和订阅方法信息的一个实体类,包括订阅类中所有订阅的事件类型和所有的订阅方法等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 static class FindState { final List<SubscriberMethod> subscriberMethods = new ArrayList<>(); final Map<Class, Object> anyMethodByEventType = new HashMap<>(); final Map<String, Class> subscriberClassByMethodKey = new HashMap<>(); final StringBuilder methodKeyBuilder = new StringBuilder(128 ); Class<?> subscriberClass; Class<?> clazz; boolean skipSuperClasses; SubscriberInfo subscriberInfo; void initForSubscriber (Class<?> subscriberClass) { this .subscriberClass = clazz = subscriberClass; skipSuperClasses = false ; subscriberInfo = null ; } void moveToSuperclass () { if (skipSuperClasses) { clazz = null ; } else { clazz = clazz.getSuperclass(); String clazzName = clazz.getName(); if (clazzName.startsWith("java." ) || clazzName.startsWith("javax." ) || clazzName.startsWith("android." ) || clazzName.startsWith("androidx." )) { clazz = null ; } } } }
findUsingReflection 1 2 3 4 5 6 7 8 9 10 11 12 private List<SubscriberMethod> findUsingReflection (Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null ) { findUsingReflectionInSingleClass(findState); findState.moveToSuperclass(); } return getMethodsAndRelease(findState); }
prepareFindState 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];private static final int POOL_SIZE = 4 ;private FindState prepareFindState () { synchronized (FIND_STATE_POOL) { for (int i = 0 ; i < POOL_SIZE; i++) { FindState state = FIND_STATE_POOL[i]; if (state != null ) { FIND_STATE_POOL[i] = null ; return state; } } } return new FindState(); }
findUsingReflectionInSingleClass 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC; private static final int BRIDGE = 0x40 ;private static final int SYNTHETIC = 0x1000 ;private void findUsingReflectionInSingleClass (FindState findState) { Method[] methods; try { methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { try { methods = findState.clazz.getMethods(); } catch (LinkageError error) { throw new EventBusException(msg, error); } findState.skipSuperClasses = true ; } for (Method method : methods) { int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0 ) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1 ) { Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null ) { Class<?> eventType = parameterTypes[0 ]; if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract" ); } } }
getMethodsAndRelease 1 2 3 4 5 6 7 8 9 10 11 12 13 private List<SubscriberMethod> getMethodsAndRelease (FindState findState) { List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods); findState.recycle(); synchronized (FIND_STATE_POOL) { for (int i = 0 ; i < POOL_SIZE; i++) { if (FIND_STATE_POOL[i] == null ) { FIND_STATE_POOL[i] = findState; break ; } } } return subscriberMethods; }
findUsingInfo 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private List<SubscriberMethod> findUsingInfo (Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null ) { findState.subscriberInfo = getSubscriberInfo(findState); if (findState.subscriberInfo != null ) { SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods(); for (SubscriberMethod subscriberMethod : array) { if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); } } } else { findUsingReflectionInSingleClass(findState); } findState.moveToSuperclass(); } return getMethodsAndRelease(findState); } private SubscriberInfo getSubscriberInfo (FindState findState) { if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null ) { SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo(); if (findState.clazz == superclassInfo.getSubscriberClass()) { return superclassInfo; } } if (subscriberInfoIndexes != null ) { for (SubscriberInfoIndex index : subscriberInfoIndexes) { SubscriberInfo info = index.getSubscriberInfo(findState.clazz); if (info != null ) { return info; } } } return null ; }
subscribe 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;private final Map<Object, List<Class<?>>> typesBySubscriber;private final Map<Class<?>, Object> stickyEvents;private void subscribe (Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber, subscriberMethod); CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null ) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0 ; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break ; } } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null ) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); if (subscriberMethod.sticky) { if (eventInheritance) { Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } private void checkPostStickyEventToSubscription (Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null ) { postToSubscription(newSubscription, stickyEvent, isMainThread()); } }
订阅流程:
首先获取订阅方法的参数类型即订阅事件类型;
根据订阅事件类型获取该事件类型的所有订阅者;
将该订阅者添加到该事件类型的订阅者集合中,即:subscriptionsByEventType;
获取订阅者所有的订阅事件类型;
将该事件类型添加到该订阅者的订阅事件类型集中即:typesBySubscriber。
post PostingThreadState 1 2 3 4 5 6 7 8 9 10 11 final static class PostingThreadState { final List<Object> eventQueue = new ArrayList<>(); boolean isPosting; boolean isMainThread; Subscription subscription; Object event; boolean canceled; }
post 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() { @Override protected PostingThreadState initialValue () { return new PostingThreadState(); } }; public void post (Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { postingState.isMainThread = isMainThread(); postingState.isPosting = true ; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset" ); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0 ), postingState); } } finally { postingState.isPosting = false ; postingState.isMainThread = false ; } } }
postSingleEvent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();private void postSingleEvent (Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false ; if (eventInheritance) { List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0 ; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this , event)); } } } private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); if (eventTypes == null ) { eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null ) { eventTypes.add(clazz); addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } } static void addInterfaces (List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } private boolean postSingleEventForEventType (Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this ) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null ; postingState.subscription = null ; postingState.canceled = false ; } if (aborted) { break ; } } return true ; } return false ; }
postToSubscription 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private void postToSubscription (Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break ; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break ; case MAIN_ORDERED: if (mainThreadPoster != null ) { mainThreadPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break ; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break ; case ASYNC: asyncPoster.enqueue(subscription, event); break ; default : throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
invokeSubscriber 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 void invokeSubscriber (PendingPost pendingPost) { Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); if (subscription.active) { invokeSubscriber(subscription, event); } } void invokeSubscriber (Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception" , e); } } private void handleSubscriberException (Subscription subscription, Object event, Throwable cause) { if (event instanceof SubscriberExceptionEvent) { if (logSubscriberExceptions) { logger.log(Level.SEVERE, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass() + " threw an exception" , cause); SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event; logger.log(Level.SEVERE, "Initial event " + exEvent.causingEvent + " caused exception in " + exEvent.causingSubscriber, exEvent.throwable); } } else { if (throwSubscriberException) { throw new EventBusException("Invoking subscriber failed" , cause); } if (logSubscriberExceptions) { logger.log(Level.SEVERE, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), cause); } if (sendSubscriberExceptionEvent) { SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this , cause, event, subscription.subscriber); post(exEvent); } } }
PendingPostQueue 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 final class PendingPostQueue { private PendingPost head; private PendingPost tail; synchronized void enqueue (PendingPost pendingPost) { if (pendingPost == null ) { throw new NullPointerException("null cannot be enqueued" ); } if (tail != null ) { tail.next = pendingPost; tail = pendingPost; } else if (head == null ) { head = tail = pendingPost; } else { throw new IllegalStateException("Head present, but no tail" ); } notifyAll(); } synchronized PendingPost poll () { PendingPost pendingPost = head; if (head != null ) { head = head.next; if (head == null ) { tail = null ; } } return pendingPost; } synchronized PendingPost poll (int maxMillisToWait) throws InterruptedException { if (head == null ) { wait(maxMillisToWait); } return poll(); } }
HandlerPoster 创建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class AndroidHandlerMainThreadSupport implements MainThreadSupport { private final Looper looper; public AndroidHandlerMainThreadSupport (Looper looper) { this .looper = looper; } @Override public boolean isMainThread () { return looper == Looper.myLooper(); } @Override public Poster createPoster (EventBus eventBus) { return new HandlerPoster(eventBus, looper, 10 ); } }
HandlerPoster 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class HandlerPoster extends Handler implements Poster { private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive; protected HandlerPoster (EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) { super (looper); this .eventBus = eventBus; this .maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); } @Override public void enqueue (Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this ) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true ; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message" ); } } } } @Override public void handleMessage (Message msg) { boolean rescheduled = false ; try { long started = SystemClock.uptimeMillis(); while (true ) { PendingPost pendingPost = queue.poll(); if (pendingPost == null ) { synchronized (this ) { pendingPost = queue.poll(); if (pendingPost == null ) { handlerActive = false ; return ; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message" ); } rescheduled = true ; return ; } } } finally { handlerActive = rescheduled; } } }
PendingPost PendingPost类似于Android的Message,里面保存了subscription和event,且也使用了享元模式共享PendingPost池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 final class PendingPost { private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>(); Object event; Subscription subscription; PendingPost next; private PendingPost (Object event, Subscription subscription) { this .event = event; this .subscription = subscription; } static PendingPost obtainPendingPost (Subscription subscription, Object event) { synchronized (pendingPostPool) { int size = pendingPostPool.size(); if (size > 0 ) { PendingPost pendingPost = pendingPostPool.remove(size - 1 ); pendingPost.event = event; pendingPost.subscription = subscription; pendingPost.next = null ; return pendingPost; } } return new PendingPost(event, subscription); } static void releasePendingPost (PendingPost pendingPost) { pendingPost.event = null ; pendingPost.subscription = null ; pendingPost.next = null ; synchronized (pendingPostPool) { if (pendingPostPool.size() < 10000 ) { pendingPostPool.add(pendingPost); } } } }
BackgroundPoster 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 final class BackgroundPoster implements Runnable , Poster { private final PendingPostQueue queue; private final EventBus eventBus; private volatile boolean executorRunning; BackgroundPoster(EventBus eventBus) { this .eventBus = eventBus; queue = new PendingPostQueue(); } @Override public void enqueue (Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this ) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true ; eventBus.getExecutorService().execute(this ); } } } @Override public void run () { try { try { while (true ) { PendingPost pendingPost = queue.poll(1000 ); if (pendingPost == null ) { synchronized (this ) { pendingPost = queue.poll(); if (pendingPost == null ) { executorRunning = false ; return ; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted" , e); } } finally { executorRunning = false ; } } }
AsyncPoster 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class AsyncPoster implements Runnable , Poster { private final PendingPostQueue queue; private final EventBus eventBus; AsyncPoster(EventBus eventBus) { this .eventBus = eventBus; queue = new PendingPostQueue(); } @Override public void enqueue (Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this ); } @Override public void run () { PendingPost pendingPost = queue.poll(); if (pendingPost == null ) { throw new IllegalStateException("No pending post available" ); } eventBus.invokeSubscriber(pendingPost); } }
postSticky 1 2 3 4 5 6 7 8 public void postSticky (Object event) { synchronized (stickyEvents) { stickyEvents.put(event.getClass(), event); } post(event); }
unregister 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public synchronized void unregister (Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); if (subscribedTypes != null ) { for (Class<?> eventType : subscribedTypes) { unsubscribeByEventType(subscriber, eventType); } typesBySubscriber.remove(subscriber); } else { logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } private void unsubscribeByEventType (Object subscriber, Class<?> eventType) { List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null ) { int size = subscriptions.size(); for (int i = 0 ; i < size; i++) { Subscription subscription = subscriptions.get(i); if (subscription.subscriber == subscriber) { subscription.active = false ; subscriptions.remove(i); i--; size--; } } } }
取消订阅的流程:
首先获取订阅者的所有订阅事件;
遍历订阅事件;
根据订阅事件获取所有的订阅了该事件的订阅者集合;
从订阅者集合中将该订阅者移除;
将步骤1中的集合中的订阅者移除。
支持跨进程的EventBus 概述 EventBus不支持跨进程传输,可以通过AIDL结合EventBus来实现跨进程。主进程有一个Service,负责维护监听器列表,以及监听相关事件,并发送给子进程;子进程需要绑定主进程的Service,并注册监听;主进程和子进程内的事件通过EventBus发送,需要跨进程的通过Binder转发。
接下来的实例要实现:
主进程发送事件,主进程和子进程的订阅者都接收到订阅事件;
子进程发送事件,主进程和子进程的订阅者都接收到订阅事件。
注:自定义的Message数据类需要实现Parcelable或Serializable接口,示例代码 。
AIDL IEventInterface.aidl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package com.hearing.eventbusdemo.eventbus;import com.hearing.eventbusdemo.eventbus.IEventCallback;import android.os.Bundle;interface IEventInterface { void register (IEventCallback callback) ; void unregister (IEventCallback callback) ; void notify (in Bundle event) ; }
IEventCallback.aidl
1 2 3 4 5 6 7 8 9 package com.hearing.eventbusdemo.eventbus;import android.os.Bundle;interface IEventCallback { void notifyEvent (in Bundle event) ; }
EventBus EventWrapper.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class EventWrapper { public Bundle mBundle; public EventWrapper (Bundle bundle) { mBundle = bundle; } @NonNull @Override public String toString () { return mBundle == null ? "null" : mBundle.toString(); } }
MyEventBus.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class MyEventBus extends EventBus { private static final String KEY = "key" ; private MyEventBus () { } private static class SingleTon { private static MyEventBus sInstance = new MyEventBus(); } public static MyEventBus getInstance () { return SingleTon.sInstance; } @Override public void post (Object event) { super .post(event); Bundle bundle = new Bundle(); if (event instanceof Parcelable) { bundle.putParcelable(KEY, (Parcelable) event); super .post(new EventWrapper(bundle)); } else if (event instanceof Serializable) { bundle.putSerializable(KEY, (Serializable) event); super .post(new EventWrapper(bundle)); } } public void postSingle (Object event) { super .post(event); } public Object unPack (@NonNull Bundle event) { return event.get(KEY); } }
LocalService 主进程的Service:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public class LocalService extends Service { private final RemoteCallbackList<IEventCallback> mRemoteCallbackList = new RemoteCallbackList<>(); private Binder mBinder = new IEventInterface.Stub() { @Override public void register (IEventCallback callback) throws RemoteException { Log.d(TAG, "LocalService register: " + callback); mRemoteCallbackList.register(callback); } @Override public void unregister (IEventCallback callback) throws RemoteException { Log.d(TAG, "LocalService unregister: " + callback); mRemoteCallbackList.unregister(callback); } @Override public void notify (Bundle event) throws RemoteException { Log.d(TAG, "LocalService notify: " + event); MyEventBus.getInstance().postSingle(MyEventBus.getInstance().unPack(event)); } }; @Subscribe public void handle (EventWrapper wrapper) { Log.v(TAG, "LocalService handle: " + wrapper); synchronized (mRemoteCallbackList) { int n = mRemoteCallbackList.beginBroadcast(); try { for (int i = 0 ; i < n; i++) { mRemoteCallbackList.getBroadcastItem(i).notifyEvent(wrapper.mBundle); } } catch (RemoteException e) { e.printStackTrace(); } mRemoteCallbackList.finishBroadcast(); } } @Subscribe public void onEvent (String event) { Log.d(TAG, "LocalService onEvent: " + event); } @Subscribe public void onEvent1 (Integer event) { Log.d(TAG, "LocalService onEvent: " + event); } @Override public void onCreate () { super .onCreate(); MyEventBus.getInstance().register(this ); new Thread(new Runnable() { @Override public void run () { Utils.sleep(1000 ); MyEventBus.getInstance().post("A message from main process at " + System.currentTimeMillis()); } }).start(); } @Override public void onDestroy () { MyEventBus.getInstance().unregister(this ); super .onDestroy(); } @Nullable @Override public IBinder onBind (Intent intent) { return mBinder; } }
RemoteService 子进程的Service:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 public class RemoteService extends Service { private IEventInterface mEventInterface; private IEventCallback mEventCallback = new IEventCallback.Stub() { @Override public void notifyEvent (Bundle event) throws RemoteException { Log.d(TAG, "RemoteService notifyEvent: " + event); MyEventBus.getInstance().postSingle(MyEventBus.getInstance().unPack(event)); } }; @Override public void onCreate () { super .onCreate(); MyEventBus.getInstance().register(this ); bindService(new Intent(this , LocalService.class), new ServiceConnection() { @Override public void onServiceConnected (ComponentName name, IBinder service) { Log.d(TAG, "RemoteService onServiceConnected" ); try { mEventInterface = IEventInterface.Stub.asInterface(service); mEventInterface.register(mEventCallback); } catch (Exception e) { e.printStackTrace(); } } @Override public void onServiceDisconnected (ComponentName name) { Log.d(TAG, "RemoteService onServiceConnected" ); try { mEventInterface.unregister(mEventCallback); } catch (Exception e) { e.printStackTrace(); } } }, BIND_AUTO_CREATE); new Thread(new Runnable() { @Override public void run () { Utils.sleep(2000 ); MyEventBus.getInstance().post(100 ); } }).start(); } @Subscribe public void handle (EventWrapper wrapper) { Log.v(TAG, "RemoteService handle: " + wrapper); try { mEventInterface.notify(wrapper.mBundle); } catch (RemoteException e) { e.printStackTrace(); } } @Subscribe public void onEvent (String event) { Log.d(TAG, "RemoteService onEvent: " + event); } @Subscribe public void onEvent1 (Integer event) { Log.d(TAG, "RemoteService onEvent: " + event); } @Override public void onDestroy () { MyEventBus.getInstance().unregister(this ); super .onDestroy(); } @Nullable @Override public IBinder onBind (Intent intent) { return null ; } }
总结 通过一个AIDL文件实现通用的跨进程接口调用 。