Android-EventBus

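Overview

EventBus is an open-source library for Android and Java that uses the publisher/subscriber pattern for loose coupling. It lets classes communicate through events with only a few lines of code, which simplifies code, removes direct dependencies between components, and speeds up development. Its drawbacks are that heavy use can make program flow hard to follow, and that it does not work across processes.

Usage

Step 1: define the event: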

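public class MessageEvent {
    private static final String TAG = "MessageEvent";

    private final String message;

    public MessageEvent(String message) {
        this.message = message;
    }

    // Logs the message with the name of the thread that handles it (requires android.util.Log)
    public void print() {
        Log.d(TAG, Thread.currentThread().getName() + ": " + message);
    }
}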

Step 2: create the subscriber:

@Subscribe(threadMode = ThreadMode.MAIN)
public void onEvent(MessageEvent event) {
event.print();
}

@Override
protected void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}

@Override
protected void onStop() {
EventBus.getDefault().unregister(this);
super.onStop();
}

Step 3: post the event:

EventBus.getDefault().post(new MessageEvent("hearing"));

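Thread modes

The thread mode describes the relationship between the thread that posts an event and the thread on which the subscriber method is called.

POSTING

The subscriber is called on the same thread that posts the event. This is the default. It has the least overhead because it avoids thread switching entirely. Handlers using this mode must return quickly so they do not block the posting thread, which may be the main thread.

MAIN

On Android, the subscriber is called on the main (UI) thread. If the posting thread is the main thread, the subscriber method is called directly, blocking the posting thread; otherwise the event is queued for delivery (non-blocking). Off Android, this behaves the same as POSTING.

MAIN_ORDERED

On Android, the subscriber is called on the main (UI) thread. Unlike MAIN, the event is always queued for delivery, so the post call never blocks.

BACKGROUND

On Android, the subscriber is called on a background thread. If the posting thread is not the main thread, the subscriber method is called directly on the posting thread. If the posting thread is the main thread, EventBus uses a single background thread that delivers all of its events in order. Off Android, a background thread is always used.

ASYNC

The subscriber is always called on a separate thread, regardless of whether the event was posted from the UI thread; it is never the posting thread or the main thread. EventBus uses a thread pool to reuse threads that have finished delivering.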

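Event types

Ordinary events

An ordinary event is received only by subscribers that are already registered when it is posted; a subscriber registered after the event was posted will not receive it. Ordinary events are sent with EventBus.getDefault().post().

Sticky events

A sticky event is received both by subscribers registered before it was posted and by subscribers registered afterwards. It differs from an ordinary event in two ways: the receiving method must declare that it accepts sticky events with @Subscribe(threadMode = xxx, sticky = true), and the event must be sent with EventBus.getDefault().postSticky(). Events are ordinary by default.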
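
The difference between the two kinds of event can be illustrated with a small self-contained sketch. This is not EventBus code: StickySketch and its methods are hypothetical names, and the sketch only models the idea of remembering the latest sticky event per type and replaying it to subscribers that register late.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Toy bus (not the EventBus API): remembers the latest sticky event per type. */
final class StickySketch {
    private final Map<Class<?>, Object> stickyEvents = new ConcurrentHashMap<>();
    private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

    /** Ordinary post: only handlers registered at this moment are called. */
    void post(Object event) {
        for (Consumer<Object> handler : handlers.getOrDefault(event.getClass(), List.of())) {
            handler.accept(event);
        }
    }

    /** Sticky post: store the event first, then deliver it like an ordinary post. */
    void postSticky(Object event) {
        stickyEvents.put(event.getClass(), event);
        post(event);
    }

    /** Registers a handler; a sticky handler immediately receives the stored event, if any. */
    void register(Class<?> type, boolean sticky, Consumer<Object> handler) {
        handlers.computeIfAbsent(type, k -> new ArrayList<>()).add(handler);
        Object last = stickyEvents.get(type);
        if (sticky && last != null) {
            handler.accept(last);
        }
    }

    /** Posts before anyone is registered, then reports what two late subscribers saw. */
    static String demo() {
        StickySketch bus = new StickySketch();
        bus.postSticky("hello"); // nobody is registered yet
        List<Object> stickySeen = new ArrayList<>();
        List<Object> normalSeen = new ArrayList<>();
        bus.register(String.class, true, stickySeen::add);
        bus.register(String.class, false, normalSeen::add);
        return "sticky=" + stickySeen + " normal=" + normalSeen;
    }
}
```

In this sketch only the handler registered with sticky = true sees the event that was posted before registration; the ordinary handler sees nothing until the next post.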

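Event priority

Subscriber priority influences the order of event delivery. Within the same delivery thread (ThreadMode), subscribers with a higher priority receive an event before subscribers with a lower priority. The default priority is 0. Note: priority does not affect the delivery order among subscribers with different ThreadModes.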
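
As a rough illustration of how a priority-ordered subscriber list can be maintained, the sketch below inserts each new entry before the first entry with a strictly lower priority, so entries with equal priority keep their registration order. PrioritySketch and Entry are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class PrioritySketch {
    /** A subscriber entry with a name and a priority (hypothetical type for illustration). */
    static final class Entry {
        final String name;
        final int priority;

        Entry(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Inserts before the first entry whose priority is strictly lower than the new one. */
    static void insertByPriority(List<Entry> list, Entry entry) {
        int size = list.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || entry.priority > list.get(i).priority) {
                list.add(i, entry);
                break;
            }
        }
    }

    /** Registers four entries and returns the resulting delivery order. */
    static String demo() {
        List<Entry> list = new ArrayList<>();
        insertByPriority(list, new Entry("a", 0));
        insertByPriority(list, new Entry("b", 10));
        insertByPriority(list, new Entry("c", 0));
        insertByPriority(list, new Entry("d", 5));
        StringBuilder order = new StringBuilder();
        for (Entry e : list) {
            if (order.length() > 0) {
                order.append(",");
            }
            order.append(e.name);
        }
        return order.toString();
    }
}
```

Keeping the list sorted at registration time means delivery can simply iterate the list in order, at the cost of a linear scan on each registration.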

Data structures

// key: event type (the subscriber method's parameter type, e.g. MessageEvent.class)
// value: all subscriptions registered for that event type
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

// key: subscriber (e.g. MainActivity.this)
// value: event types that subscriber has subscribed to (MessageEvent.class, ...)
private final Map<Object, List<Class<?>>> typesBySubscriber;

// Sticky events
// key: event type (e.g. MessageEvent.class)
// value: the most recent sticky event instance of that type
private final Map<Class<?>, Object> stickyEvents;

public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;

// ...
}

// Describes one subscription: a subscriber paired with one of its subscriber methods
final class Subscription {
final Object subscriber; // the subscriber (e.g. MainActivity.this)
final SubscriberMethod subscriberMethod; // the subscriber method
/**
* Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery
* {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions.
*/
volatile boolean active;
}

EventBusAnnotationProcessor

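EventBusAnnotationProcessor is the annotation processor that ships with EventBus. At compile time it reads and parses the @Subscribe annotations and generates an index class (named along the lines of EventBusIndex.java) that stores the subscription information for all subscribers. Enable it in the Gradle script: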

android {
defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
arguments = [eventBusIndex: 'org.greenrobot.eventbus.MyEventBusIndex', verbose: "true"]
}
}
}
}

dependencies {
annotationProcessor 'org.greenrobot:eventbus-annotation-processor:3.2.0'
}

The generated MyEventBusIndex class looks like this:

package org.greenrobot.eventbus;

import org.greenrobot.eventbus.meta.SimpleSubscriberInfo;
import org.greenrobot.eventbus.meta.SubscriberMethodInfo;
import org.greenrobot.eventbus.meta.SubscriberInfo;
import org.greenrobot.eventbus.meta.SubscriberInfoIndex;

import org.greenrobot.eventbus.ThreadMode;

import java.util.HashMap;
import java.util.Map;

/** This class is generated by EventBus, do not edit. */
public class MyEventBusIndex implements SubscriberInfoIndex {
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();

putIndex(new SimpleSubscriberInfo(com.hearing.eventbusdemo.MainActivity.class, true,
new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEvent1", com.hearing.eventbusdemo.MessageEvent.class, ThreadMode.MAIN),
new SubscriberMethodInfo("onEvent2", com.hearing.eventbusdemo.MessageEvent.class, ThreadMode.MAIN),
}));

}

private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}

@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}

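In short, EventBusAnnotationProcessor uses annotation processing to generate, at compile time, a class that records the subscriber method information. See the source code for the details of how the class is generated.

The index must be added when building the EventBus; otherwise the annotations are still processed through reflection at runtime: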

mEventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();

Creating an EventBus

getDefault

static volatile EventBus defaultInstance;
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}

public EventBus() {
this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}

Customization

Parameters can be customized through the builder, for example to add an index class.

mEventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();

register

register

public void register(Object subscriber) {
// Get the subscriber's class
Class<?> subscriberClass = subscriber.getClass();
// Find all of the subscriber's subscriber methods, i.e. the public methods annotated with @Subscribe
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
// Register each subscriber method in turn
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

findSubscriberMethods

SubscriberMethodFinder.java: finds and caches information about a subscriber's handler methods.

class SubscriberMethodFinder {
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean strictMethodVerification,
boolean ignoreGeneratedIndex) {
this.subscriberInfoIndexes = subscriberInfoIndexes; // default: null
this.strictMethodVerification = strictMethodVerification; // default: false
this.ignoreGeneratedIndex = ignoreGeneratedIndex; // default: false
}

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// Try the cache first
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
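// Whether to ignore the index class generated by the annotation processor (MyEventBusIndex)
if (ignoreGeneratedIndex) {
// Use reflection to find the subscriber methods in the subscriber class
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// Look up the subscriber methods in the generated index (MyEventBusIndex), falling back to reflection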
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// Store in the cache
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
}

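To obtain the subscriber methods, the cache is consulted first. On a cache miss there are two ways to find them:

  1. The EventBusAnnotationProcessor reads and parses the @Subscribe annotations at compile time and generates the MyEventBusIndex.java class, which stores the subscription information for all subscribers.
  2. Reflection is used at runtime to obtain the subscriber information.

Reading the subscription information at compile time through the annotation processor avoids runtime reflection and therefore performs better.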
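
The reflection path combined with a cache can be sketched in plain Java as follows. This is a simplified illustration, not the library's implementation: the @Handles annotation, ReflectionLookupSketch, and the scans counter are hypothetical stand-ins, and the sketch does not walk up the class hierarchy.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ReflectionLookupSketch {
    /** Hypothetical stand-in for @Subscribe; it must be retained at runtime to be visible to reflection. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles {}

    private static final Map<Class<?>, List<Method>> CACHE = new ConcurrentHashMap<>();
    static int scans = 0; // counts how many times reflection actually ran

    /** Returns the annotated public one-argument instance methods, scanning each class at most once. */
    static List<Method> find(Class<?> type) {
        List<Method> cached = CACHE.get(type);
        if (cached != null) {
            return cached;
        }
        scans++;
        List<Method> found = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            int mod = m.getModifiers();
            boolean eligible = Modifier.isPublic(mod) && !Modifier.isStatic(mod) && !Modifier.isAbstract(mod);
            if (eligible && m.getParameterCount() == 1 && m.isAnnotationPresent(Handles.class)) {
                found.add(m);
            }
        }
        CACHE.put(type, found);
        return found;
    }

    /** Sample subscriber: only onText satisfies every rule. */
    static class Sample {
        @Handles public void onText(String s) {}
        @Handles void notPublic(String s) {}
        public void notAnnotated(String s) {}
    }

    /** Looks the class up twice; the second call is served from the cache. */
    static String demo() {
        find(Sample.class);
        List<Method> second = find(Sample.class);
        return second.get(0).getName() + " scans=" + scans;
    }
}
```

The cache matters because reflection over every declared method is comparatively slow, and the same subscriber class is typically registered many times over the life of an app.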

findUsingReflection

FindState

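FindState: a holder class for the subscriber and subscriber-method information gathered during a lookup, including the event types and subscriber methods found so far in the subscriber class.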

static class FindState {
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
final StringBuilder methodKeyBuilder = new StringBuilder(128);

Class<?> subscriberClass;
Class<?> clazz;
boolean skipSuperClasses;
SubscriberInfo subscriberInfo;

void initForSubscriber(Class<?> subscriberClass) {
this.subscriberClass = clazz = subscriberClass;
skipSuperClasses = false;
subscriberInfo = null;
}

void moveToSuperclass() {
if (skipSuperClasses) {
clazz = null;
} else {
clazz = clazz.getSuperclass();
String clazzName = clazz.getName();
if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") ||
clazzName.startsWith("android.") || clazzName.startsWith("androidx.")) {
clazz = null;
}
}
}
}

findUsingReflection

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
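// FindState is a holder for the subscriber and subscriber-method information
FindState findState = prepareFindState();
// Initialize findState for this subscriber class
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// Collect the subscriber methods declared in the current class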
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

prepareFindState

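// POOL_SIZE is declared first: a static initializer may not reference a later field by its simple name
private static final int POOL_SIZE = 4;
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];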

private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}

findUsingReflectionInSingleClass

private static final int BRIDGE = 0x40;
private static final int SYNTHETIC = 0x1000;
// Declared after BRIDGE and SYNTHETIC to avoid an illegal forward reference
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC; // 5192

private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// Get all methods declared by this class itself
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
try {
// Get all public methods of this class and its superclasses
methods = findState.clazz.getMethods();
} catch (LinkageError error) {
// ...
throw new EventBusException(msg, error);
}
// getMethods already covers superclasses, so skip walking up the hierarchy
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
// A subscriber method must be public and must not be abstract, static, bridge, or synthetic
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
// A subscriber method must take exactly one parameter
if (parameterTypes.length == 1) {
// Read the @Subscribe annotation on the method
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// Record the annotated method if it passes the duplicate check
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

getMethodsAndRelease

private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
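
The prepareFindState / getMethodsAndRelease pair forms a small fixed-size object pool. The sketch below shows the same idea with a StringBuilder as the pooled object; SlotPoolSketch, acquire, and release are hypothetical names chosen for this example.

```java
final class SlotPoolSketch {
    static final int POOL_SIZE = 4;
    private static final StringBuilder[] POOL = new StringBuilder[POOL_SIZE];

    /** Takes a pooled instance if one is available, otherwise allocates a new one. */
    static StringBuilder acquire() {
        synchronized (POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                StringBuilder pooled = POOL[i];
                if (pooled != null) {
                    POOL[i] = null;
                    return pooled;
                }
            }
        }
        return new StringBuilder();
    }

    /** Resets the instance and stores it in the first free slot; drops it if the pool is full. */
    static void release(StringBuilder instance) {
        instance.setLength(0); // clear state before reuse, playing the role of FindState.recycle()
        synchronized (POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (POOL[i] == null) {
                    POOL[i] = instance;
                    return;
                }
            }
        }
        // Pool is full: the instance is simply left for the garbage collector.
    }

    /** Returns true when a released instance is handed out again, already reset. */
    static boolean demo() {
        StringBuilder first = acquire();
        first.append("scratch");
        release(first);
        StringBuilder second = acquire();
        return second == first && second.length() == 0;
    }
}
```

A tiny pool like this bounds memory use while still avoiding most allocations, since lookups rarely overlap on more than a few threads.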

findUsingInfo

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}

subscribe

// key: event type (e.g. MessageEvent.class); value: all subscriptions registered for that event type
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

// key: subscriber (e.g. MainActivity.this); value: event types it has subscribed to (MessageEvent.class, ...)
private final Map<Object, List<Class<?>>> typesBySubscriber;
private final Map<Class<?>, Object> stickyEvents;

// Must be called inside a synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// The subscriber method's parameter type, which is also the event type
Class<?> eventType = subscriberMethod.eventType;
// Describes this subscription: the subscriber paired with the subscriber method
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
// Insert the subscription at the position determined by its priority
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// Get all event types this subscriber has subscribed to
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

// If the method accepts sticky events, deliver any stored sticky event right away
if (subscriberMethod.sticky) {
// eventInheritance defaults to true: also consider sticky events whose type is a subclass of eventType
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// True when eventType is the same class or interface as candidateEventType, or a superclass or superinterface of it
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}

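Subscription flow:

  • Get the parameter type of the subscriber method, which is the event type being subscribed to;
  • Look up all subscriptions for that event type;
  • Insert the new subscription into that list by priority (subscriptionsByEventType);
  • Look up all event types this subscriber has subscribed to;
  • Add the event type to that list (typesBySubscriber);
  • If the method is sticky, immediately deliver any matching stored sticky event.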

post

PostingThreadState

final static class PostingThreadState {
// Event queue of the current thread
final List<Object> eventQueue = new ArrayList<>();
// Whether this thread is currently dispatching events
boolean isPosting;
// Whether the posting thread is the main thread
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}

post

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};

public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
// Add the event to the current thread's event queue
eventQueue.add(event);

if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
// Post the queued events of the current thread one by one
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
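
One consequence of the per-thread queue and the isPosting flag is that a post made from inside a subscriber is queued and delivered after the current delivery finishes, rather than being handled recursively. The following self-contained sketch models just that behavior; PostQueueSketch and its single handler are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PostQueueSketch {
    /** Per-thread posting state, loosely modeled on PostingThreadState. */
    private static final class State {
        final List<Object> queue = new ArrayList<>();
        boolean isPosting;
    }

    private final ThreadLocal<State> state = ThreadLocal.withInitial(State::new);
    private final Consumer<Object> handler;

    PostQueueSketch(Consumer<Object> handler) {
        this.handler = handler;
    }

    /** Queues the event; only the outermost call on a thread drains the queue. */
    void post(Object event) {
        State s = state.get();
        s.queue.add(event);
        if (!s.isPosting) {
            s.isPosting = true;
            try {
                while (!s.queue.isEmpty()) {
                    handler.accept(s.queue.remove(0));
                }
            } finally {
                s.isPosting = false;
            }
        }
    }

    /** Posts "A", whose handler posts "B" while "A" is still being handled. */
    static String demo() {
        List<String> log = new ArrayList<>();
        PostQueueSketch[] bus = new PostQueueSketch[1];
        bus[0] = new PostQueueSketch(e -> {
            log.add("start " + e);
            if (e.equals("A")) {
                bus[0].post("B"); // nested post: queued, not delivered immediately
            }
            log.add("end " + e);
        });
        bus[0].post("A");
        return String.join(", ", log);
    }
}
```

Because the state lives in a ThreadLocal, no locking is needed for the queue itself, and events posted from one thread keep their relative order.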

postSingleEvent

// key: class of the posted event (e.g. MessageEvent.class); value: that class plus all of its superclasses and interfaces
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
// Post to subscribers of this event type and of all its superclasses and interfaces
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
// NoSubscriberEvent: This Event is posted by EventBus when no subscriber is found for a posted event.
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

// Collect all Class objects for the event, including superclasses and interfaces
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}

// Recursively add the given interfaces and their superinterfaces
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
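
To make the event-inheritance lookup concrete, the sketch below applies the same traversal (the class, its interfaces, then each superclass in turn) to a small hypothetical hierarchy. EventTypesSketch, Base, Parent, and Child are names invented for this example, and the cache is left out.

```java
import java.util.ArrayList;
import java.util.List;

final class EventTypesSketch {
    interface Base {}
    static class Parent implements Base {}
    static class Child extends Parent {}

    /** Collects the class itself, then interfaces and superclasses, walking up to Object. */
    static List<Class<?>> allTypes(Class<?> eventClass) {
        List<Class<?>> types = new ArrayList<>();
        for (Class<?> c = eventClass; c != null; c = c.getSuperclass()) {
            types.add(c);
            addInterfaces(types, c.getInterfaces());
        }
        return types;
    }

    /** Adds each interface not yet seen, followed by its own superinterfaces. */
    static void addInterfaces(List<Class<?>> types, Class<?>[] interfaces) {
        for (Class<?> i : interfaces) {
            if (!types.contains(i)) {
                types.add(i);
                addInterfaces(types, i.getInterfaces());
            }
        }
    }

    /** Lists the types under which a Child event would be delivered. */
    static String demo() {
        StringBuilder names = new StringBuilder();
        for (Class<?> c : allTypes(Child.class)) {
            if (names.length() > 0) {
                names.append(", ");
            }
            names.append(c.getSimpleName());
        }
        return names.toString();
    }
}
```

With eventInheritance enabled, a subscriber to any of these types, including Object, would receive a posted Child event.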

postToSubscription

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// Call directly on the current (posting) thread
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
// The poster is on the main thread: call directly
invokeSubscriber(subscription, event);
} else {
// The poster is not on the main thread: enqueue for the main thread
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
// Always enqueue, regardless of the posting thread
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
// Posted from the main thread: enqueue for the single background thread
backgroundPoster.enqueue(subscription, event);
} else {
// Already off the main thread: call directly
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// Always enqueue for asynchronous delivery
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

invokeSubscriber

void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}

void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

private void handleSubscriberException(Subscription subscription, Object event, Throwable cause) {
if (event instanceof SubscriberExceptionEvent) {
if (logSubscriberExceptions) {
// Don't send another SubscriberExceptionEvent to avoid infinite event recursion, just log
logger.log(Level.SEVERE, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass()
+ " threw an exception", cause);
SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event;
logger.log(Level.SEVERE, "Initial event " + exEvent.causingEvent + " caused exception in "
+ exEvent.causingSubscriber, exEvent.throwable);
}
} else {
if (throwSubscriberException) {
throw new EventBusException("Invoking subscriber failed", cause);
}
if (logSubscriberExceptions) {
logger.log(Level.SEVERE, "Could not dispatch event: " + event.getClass() + " to subscribing class "
+ subscription.subscriber.getClass(), cause);
}
if (sendSubscriberExceptionEvent) {
SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this, cause, event,
subscription.subscriber);
post(exEvent);
}
}
}
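
The reason invokeSubscriber passes e.getCause() to the exception handler is that Method.invoke wraps anything thrown by the target method in an InvocationTargetException. A minimal sketch of that unwrapping, with the hypothetical names InvokeSketch and Target:

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

final class InvokeSketch {
    /** Hypothetical subscriber whose handler always fails. */
    public static class Target {
        public void onEvent(String event) {
            throw new IllegalArgumentException("bad " + event);
        }
    }

    /** Invokes the handler reflectively and reports the exception the handler itself threw. */
    static String demo() {
        try {
            Method method = Target.class.getMethod("onEvent", String.class);
            method.invoke(new Target(), "x");
            return "no exception";
        } catch (InvocationTargetException e) {
            Throwable cause = e.getCause(); // the exception thrown inside onEvent
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unexpected reflection failure", e);
        }
    }
}
```

Reporting the cause rather than the wrapper keeps logs and SubscriberExceptionEvent focused on the subscriber's own failure.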

PendingPostQueue

final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;

synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();
}

synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}

synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}
}

HandlerPoster

Creation

class AndroidHandlerMainThreadSupport implements MainThreadSupport {
// Looper.getMainLooper()
private final Looper looper;

public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}

@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}

@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}

HandlerPoster

public class HandlerPoster extends Handler implements Poster {

private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage; // 10 ms, as passed by AndroidHandlerMainThreadSupport.createPoster
private final EventBus eventBus;
private boolean handlerActive;

protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}

@Override
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
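// Yield once the time budget is spent so one long batch does not monopolize the main thread (presumably to reduce the risk of an ANR)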
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
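
The time-budget loop in handleMessage can be modeled without Android. In the sketch below a fake clock replaces SystemClock, each queued item is just a cost in milliseconds, and drainOnce plays the role of one handleMessage pass; all of these names are hypothetical and chosen to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class TimeSliceSketch {
    private final Deque<Integer> queue = new ArrayDeque<>(); // each item is a handler cost in ms
    private final int budgetMs;
    private long clock = 0; // fake clock so the demo is deterministic
    int slices = 0;         // number of passes that were needed

    TimeSliceSketch(int budgetMs) {
        this.budgetMs = budgetMs;
    }

    void enqueue(int costMs) {
        queue.add(costMs);
    }

    /** One pass: returns true if the budget ran out and another pass was requested. */
    boolean drainOnce() {
        slices++;
        long started = clock;
        while (true) {
            Integer cost = queue.poll();
            if (cost == null) {
                return false; // queue drained, nothing to reschedule
            }
            clock += cost; // "run" the subscriber
            if (clock - started >= budgetMs) {
                return true; // budget spent: give the thread back and continue later
            }
        }
    }

    /** Six handlers of 4 ms each under a 10 ms budget. */
    static int demo() {
        TimeSliceSketch poster = new TimeSliceSketch(10);
        for (int i = 0; i < 6; i++) {
            poster.enqueue(4);
        }
        while (poster.drainOnce()) {
            // On Android, the Looper could process other messages between passes.
        }
        return poster.slices;
    }
}
```

In this run each pass handles three items before exceeding the budget, and a final pass finds the queue empty, so three passes are counted.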

PendingPost

PendingPost is similar to Android's Message: it holds a subscription and an event, and instances are likewise recycled through a shared pool (flyweight-style reuse).

final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

Object event;
Subscription subscription;
PendingPost next;

private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}

static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}

BackgroundPoster

final class BackgroundPoster implements Runnable, Poster {

private final PendingPostQueue queue;
private final EventBus eventBus;

private volatile boolean executorRunning;

BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}

@Override
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}

@Override
public void run() {
try {
try {
while (true) {
// Wait up to 1 s for a new post; on timeout poll returns null (no exception is thrown)
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}

}
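
The key property of BackgroundPoster is that at most one drain task runs at a time, which is what keeps BACKGROUND deliveries in order even on a multi-threaded executor. The following sketch reproduces only that guard; SerialDrainSketch is a hypothetical name, integers stand in for events, and the 1-second wait of the original is omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SerialDrainSketch implements Runnable {
    private final Queue<Integer> queue = new ArrayDeque<>(); // guarded by this
    private final List<Integer> delivered = Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService executor;
    private boolean running; // guarded by this

    SerialDrainSketch(ExecutorService executor) {
        this.executor = executor;
    }

    /** Queues the event and starts a drain task only if none is active. */
    void enqueue(int event) {
        synchronized (this) {
            queue.add(event);
            if (!running) {
                running = true;
                executor.execute(this);
            }
        }
    }

    /** Drains the queue; clears the running flag under the lock when it finds the queue empty. */
    @Override
    public void run() {
        while (true) {
            Integer event;
            synchronized (this) {
                event = queue.poll();
                if (event == null) {
                    running = false;
                    return;
                }
            }
            delivered.add(event); // "invoke the subscriber" outside the lock
        }
    }

    /** Enqueues five events on a multi-threaded pool and returns the delivery order. */
    static String demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        SerialDrainSketch poster = new SerialDrainSketch(pool);
        for (int i = 1; i <= 5; i++) {
            poster.enqueue(i);
        }
        pool.shutdown(); // drain tasks that were already submitted still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return poster.delivered.toString();
    }
}
```

AsyncPoster, by contrast, submits one task per event with no such guard, so its deliveries may run concurrently and in any order.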

AsyncPoster

class AsyncPoster implements Runnable, Poster {

private final PendingPostQueue queue;
private final EventBus eventBus;

AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}

@Override
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}

@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}

postSticky

public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
// Deliver the sticky event to subscribers that are already registered
post(event);
}

unregister

public synchronized void unregister(Object subscriber) {
// Get all event types this subscriber has subscribed to
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
// Remove the subscriber from the subscription list of this event type
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// Get all subscriptions for this event type
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

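Unregistration flow:

  • Get all event types the subscriber has subscribed to;
  • Iterate over those event types;
  • For each event type, get the list of subscriptions for it;
  • Remove this subscriber's subscriptions from that list, marking each as inactive;
  • Finally, remove the subscriber's entry from typesBySubscriber (the collection fetched in the first step).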
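
The two maps work as a forward index and a reverse index, and the reverse index is what makes unregistration cheap. A stripped-down sketch of that bookkeeping follows; RegistrySketch is a hypothetical name, and plain objects stand in for Subscription instances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RegistrySketch {
    // Forward index: event type -> subscribers interested in it
    private final Map<Class<?>, List<Object>> subscribersByType = new HashMap<>();
    // Reverse index: subscriber -> event types it subscribed to
    private final Map<Object, List<Class<?>>> typesBySubscriber = new HashMap<>();

    /** Records the subscription in both indexes. */
    synchronized void subscribe(Object subscriber, Class<?> eventType) {
        subscribersByType.computeIfAbsent(eventType, k -> new ArrayList<>()).add(subscriber);
        typesBySubscriber.computeIfAbsent(subscriber, k -> new ArrayList<>()).add(eventType);
    }

    /** Uses the reverse index to visit only the lists that can contain the subscriber. */
    synchronized void unregister(Object subscriber) {
        List<Class<?>> types = typesBySubscriber.remove(subscriber);
        if (types == null) {
            return; // was never registered
        }
        for (Class<?> type : types) {
            List<Object> subscribers = subscribersByType.get(type);
            if (subscribers != null) {
                subscribers.removeIf(s -> s == subscriber); // identity comparison, as in the original
            }
        }
    }

    /** Number of subscribers currently registered for the event type. */
    synchronized int countFor(Class<?> eventType) {
        List<Object> subscribers = subscribersByType.get(eventType);
        return subscribers == null ? 0 : subscribers.size();
    }

    /** Registers two subscribers, unregisters one, and reports the remaining counts. */
    static String demo() {
        RegistrySketch registry = new RegistrySketch();
        Object a = new Object();
        Object b = new Object();
        registry.subscribe(a, String.class);
        registry.subscribe(a, Integer.class);
        registry.subscribe(b, String.class);
        registry.unregister(a);
        return registry.countFor(String.class) + "," + registry.countFor(Integer.class);
    }
}
```

Without the reverse index, unregistering would have to scan the subscriber list of every event type.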

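Cross-process EventBus

Overview

EventBus does not support cross-process delivery, but combining it with AIDL makes this possible. The main process hosts a Service that maintains the list of listeners, listens for the relevant events, and forwards them to the child process. The child process binds to that Service and registers a listener. Within each process, events are still delivered through EventBus; events that need to cross the process boundary are forwarded through Binder.

The example that follows implements two things:

  • When the main process posts an event, subscribers in both the main process and the child process receive it;
  • When the child process posts an event, subscribers in both the main process and the child process receive it.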

Note: a custom message data class must implement the Parcelable or Serializable interface so that it can be packed into a Bundle (sample code).
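As a rough illustration of the Serializable option, the sketch below defines an event class and pushes it through Java serialization, which is a plain-JVM stand-in for what happens when such an object crosses a process boundary inside a Bundle. `ChatMessage` and `SerializableEventSketch` are hypothetical names chosen for this example only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical event class; every field must itself be serializable.
class ChatMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    final String text;
    final long sentAt;

    ChatMessage(String text, long sentAt) {
        this.text = text;
        this.sentAt = sentAt;
    }
}

class SerializableEventSketch {
    // Serializes and deserializes the event, yielding an independent copy
    // like the one a receiving process would see.
    static ChatMessage roundTrip(ChatMessage message) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(message);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ChatMessage) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("event is not serializable", e);
        }
    }

    public static void main(String[] args) {
        ChatMessage original = new ChatMessage("hearing", 42L);
        ChatMessage copy = roundTrip(original);
        System.out.println(copy.text + " " + copy.sentAt + " " + (copy == original));
    }
}
```

The receiver always gets a copy, never the original instance, so subscribers in another process cannot observe later mutations made by the sender.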

AIDL

IEventInterface.aidl

// IEventInterface.aidl
package com.hearing.eventbusdemo.eventbus;

import com.hearing.eventbusdemo.eventbus.IEventCallback;
import android.os.Bundle;

interface IEventInterface {
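// The child process registers an event callback with the main process
void register(IEventCallback callback);
// The child process unregisters its callback
void unregister(IEventCallback callback);
// The child process notifies the main process of an event
void notify(in Bundle event);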
}

IEventCallback.aidl

// IEventCallback.aidl
package com.hearing.eventbusdemo.eventbus;

import android.os.Bundle;

interface IEventCallback {
// The main process sends an event to the child process
void notifyEvent(in Bundle event);
}

EventBus

EventWrapper.java

// Wrapper around an event, used for forwarding it over IPC
public class EventWrapper {
public Bundle mBundle;

public EventWrapper(Bundle bundle) {
mBundle = bundle;
}

@NonNull
@Override
public String toString() {
return mBundle == null ? "null" : mBundle.toString();
}
}

MyEventBus.java

public class MyEventBus extends EventBus {
private static final String KEY = "key";

private MyEventBus() {
}

private static class SingleTon {
private static MyEventBus sInstance = new MyEventBus();
}

public static MyEventBus getInstance() {
return SingleTon.sInstance;
}

// Deliver in this process and also forward to the other process
@Override
public void post(Object event) {
super.post(event);
Bundle bundle = new Bundle();
if (event instanceof Parcelable) {
bundle.putParcelable(KEY, (Parcelable) event);
super.post(new EventWrapper(bundle));
} else if (event instanceof Serializable) {
bundle.putSerializable(KEY, (Serializable) event);
super.post(new EventWrapper(bundle));
}
}

// Deliver in this process only
public void postSingle(Object event) {
super.post(event);
}

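public Object unPack(@NonNull Bundle event) {
// A Bundle received over Binder needs the app's class loader before a custom Parcelable or Serializable payload can be read
event.setClassLoader(MyEventBus.class.getClassLoader());
return event.get(KEY);
}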
}
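Why there are two entry points, post and postSingle, becomes clearer in a model without Android. The sketch below is an assumption-laden simplification: `ForwardingSketch` and `Proc` are hypothetical, the `peer` field stands in for the Binder connection, and each "process" simply records what its local subscribers would receive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the forwarding scheme; no real IPC is involved.
class ForwardingSketch {
    static class Proc {
        final String name;
        final List<String> received = new ArrayList<>();
        Proc peer; // stands in for the Binder link to the other process

        Proc(String name) {
            this.name = name;
        }

        // Like MyEventBus.post: deliver locally, then forward across the boundary.
        void post(Object event) {
            postSingle(event);
            if (peer != null) {
                // The receiving side uses postSingle, so the event is never forwarded back.
                peer.postSingle(event);
            }
        }

        // Like MyEventBus.postSingle: local delivery only.
        void postSingle(Object event) {
            received.add(name + " got " + event);
        }
    }

    static List<String> demo() {
        Proc main = new Proc("main");
        Proc child = new Proc("child");
        main.peer = child;
        child.peer = main;

        main.post("hello"); // posted in the main process
        child.post(100);    // posted in the child process

        List<String> all = new ArrayList<>(main.received);
        all.addAll(child.received);
        return all;
    }

    public static void main(String[] args) {
        // prints [main got hello, main got 100, child got hello, child got 100]
        System.out.println(demo());
    }
}
```

If the receiving side called post instead of postSingle, every forwarded event would be wrapped and sent back again, and the two processes would bounce it between each other indefinitely.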

LocalService

The Service in the main process:

public class LocalService extends Service {
private static final String TAG = "LocalService";

private final RemoteCallbackList<IEventCallback> mRemoteCallbackList = new RemoteCallbackList<>();
private Binder mBinder = new IEventInterface.Stub() {
@Override
public void register(IEventCallback callback) throws RemoteException {
Log.d(TAG, "LocalService register: " + callback);
mRemoteCallbackList.register(callback);
}

@Override
public void unregister(IEventCallback callback) throws RemoteException {
Log.d(TAG, "LocalService unregister: " + callback);
mRemoteCallbackList.unregister(callback);
}

@Override
public void notify(Bundle event) throws RemoteException {
Log.d(TAG, "LocalService notify: " + event);
// After receiving an event from the child process, re-post it through EventBus to subscribers in the main process
MyEventBus.getInstance().postSingle(MyEventBus.getInstance().unPack(event));
}
};

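// Bridge: listens for events posted in the main process and forwards them to the child processes
@Subscribe
public void handle(EventWrapper wrapper) {
Log.v(TAG, "LocalService handle: " + wrapper);
synchronized (mRemoteCallbackList) {
int n = mRemoteCallbackList.beginBroadcast();
try {
for (int i = 0; i < n; i++) {
try {
// Forward to one child process
mRemoteCallbackList.getBroadcastItem(i).notifyEvent(wrapper.mBundle);
} catch (RemoteException e) {
// A dead callback must not stop delivery to the remaining ones
Log.w(TAG, "LocalService handle: callback " + i + " failed", e);
}
}
} finally {
// Always pair beginBroadcast() with finishBroadcast()
mRemoteCallbackList.finishBroadcast();
}
}
}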

// Event subscriber in the main process: events for the main process are handled here, while forwarding to the child process happens in handle()
@Subscribe
public void onEvent(String event) {
Log.d(TAG, "LocalService onEvent: " + event);
}

// Event subscriber in the main process: events for the main process are handled here, while forwarding to the child process happens in handle()
@Subscribe
public void onEvent1(Integer event) {
Log.d(TAG, "LocalService onEvent: " + event);
}

@Override
public void onCreate() {
super.onCreate();
MyEventBus.getInstance().register(this);
new Thread(new Runnable() {
@Override
public void run() {
Utils.sleep(1000);
// The main process posts an event
MyEventBus.getInstance().post("A message from main process at " + System.currentTimeMillis());
}
}).start();
}

@Override
public void onDestroy() {
MyEventBus.getInstance().unregister(this);
super.onDestroy();
}

@Nullable
@Override
public IBinder onBind(Intent intent) {
return mBinder;
}
}

RemoteService

The Service in the child process:

public class RemoteService extends Service {
private static final String TAG = "RemoteService";

private IEventInterface mEventInterface;

private IEventCallback mEventCallback = new IEventCallback.Stub() {
@Override
public void notifyEvent(Bundle event) throws RemoteException {
Log.d(TAG, "RemoteService notifyEvent: " + event);
// After receiving an event forwarded by the main process, re-post it inside this process
MyEventBus.getInstance().postSingle(MyEventBus.getInstance().unPack(event));
}
};

@Override
public void onCreate() {
super.onCreate();
MyEventBus.getInstance().register(this);

bindService(new Intent(this, LocalService.class), new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
Log.d(TAG, "RemoteService onServiceConnected");
try {
mEventInterface = IEventInterface.Stub.asInterface(service);
mEventInterface.register(mEventCallback);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void onServiceDisconnected(ComponentName name) {
Log.d(TAG, "RemoteService onServiceDisconnected");
try {
mEventInterface.unregister(mEventCallback);
} catch (Exception e) {
e.printStackTrace();
}
}
}, BIND_AUTO_CREATE);

new Thread(new Runnable() {
@Override
public void run() {
Utils.sleep(2000);
MyEventBus.getInstance().post(100);
}
}).start();
}

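// Bridge: listens for events posted in this process and forwards them to the main process
@Subscribe
public void handle(EventWrapper wrapper) {
Log.v(TAG, "RemoteService handle: " + wrapper);
IEventInterface eventInterface = mEventInterface;
if (eventInterface == null) {
// Not bound to LocalService yet, so there is nowhere to forward the event
return;
}
try {
eventInterface.notify(wrapper.mBundle);
} catch (RemoteException e) {
e.printStackTrace();
}
}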

// Event subscriber in the child process
@Subscribe
public void onEvent(String event) {
Log.d(TAG, "RemoteService onEvent: " + event);
}

// Event subscriber in the child process
@Subscribe
public void onEvent1(Integer event) {
Log.d(TAG, "RemoteService onEvent: " + event);
}

@Override
public void onDestroy() {
MyEventBus.getInstance().unregister(this);
super.onDestroy();
}

@Nullable
@Override
public IBinder onBind(Intent intent) {
return null;
}
}

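Summary

A small AIDL interface, together with its callback, provides a generic cross-process call channel: any Parcelable or Serializable event can be forwarded without defining a new interface for each event type.

EventBus workflow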