对于Unirx中MessageBroker一些思考
使用方法
MessageBroker
相当于一个订阅发布中心,提供事件驱动、解耦的架构,其主要由三个部分组成:
Publisher
:消息发布者Receiver
:消息接收者,接收由发布者产生的消息。MessageBroker
:所有事件的协调者。
其 API
包含两个主要方法:1
2IObservable<TMessage> MessageBroker.Default.Receive<TMessage>
void MessageBroker.Default.Publish<TMessage>
其中,TMessage
是要发布或者接收的类型。
首先,我们需要创建一个事件类1
2
3
4
5
6
7
8
9
10
11public class GameEvent
{
public const string Dragon = nameof(Dragon);
public const string HappyHours = nameof(HappyHours);
public string Name { get; }
public GameEvent(string name) {
Name = name;
}
}
接着我们需要一个 Publisher
1
2
3
4
5
6public class Player : MonoBehaviour
{
private void OnDragonNotice() {
MessageBroker.Default.Publish(new GameEvent(GameEvent.Dragon));
}
}
同时创建一些接收事件的 Receiver
通知村庄逃跑:1
2
3
4
5
6
7
8
9
10
11
12
13
14public class Villager : MonoBehaviour
{
void Awake() {
MessageBroker.Default
.Receive<GameEvent>()
.Where(x => x.Name == GameEvent.Dragon)
.Subscribe(_ => Flee())
.AddTo(this);
}
void Flee() {
// 尽可能快地逃跑
}
}
播放音频:1
2
3
4
5
6
7
8
9
10public class AudioPlayer : MonoBehaviour
{
void Awake() {
MessageBroker.Default
.Receive<GameEvent>()
.Where(x => x.Name == GameEvent.Dragon)
.Subscribe(_ => PlayGlobalEventSound())
.AddTo(this);
}
}
此处通过 MessageBroker
,就实现了脚本逻辑之间的解耦,Player
不必持有 Villager
和 AudioPlayer
的引用,也不必关心其实现,三者只需要知道事件发生了即可。
我们可以使用 DragonGameEvent
和 HappyHoursGameEvent
类,可以使用基于类型判断而且代替基于字符串的判断1
2public class DragonGameEvent : GameEvent { }
public class HappyHoursGameEvent : GameEvent { }
这样我们就可以监听具体类型的事件:1
2
3
4private void Awake() {
MessageBroker.Default.Receive<DragonGameEvent>()
.Subscribe(_ => PlayDragonSound() );
}
如果需要处理额外数据:1
2
3
4
5
6
7
8
9
10
11
12
13public class AudioPlayer : MonoBehaviour
{
private void Awake() {
MessageBroker.Default.Receive<DragonGameEvent>()
.Subscribe(message => PlayDragonSound(message) );
}
private void PlayDragonSound(GameEvent gameEvent) {
if (gameEvent is DragonGameEvent dragonEvent) {
// 播放龙事件音效
}
}
}
对其 AddTo 绑定生命周期的研究
首先我们来看两个示例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// EventCenterTest.cs
public class EventCenterTest : MonoBehaviour
{
void Start()
{
// 此处没有 AddTo 绑定生命周期
MessageBroker.Default.Receive<TestEvent>().Subscribe(_ =>
{
Debug.Log(nameof(EventCenterTest));
});
StartCoroutine(DelayFor(TimeSpan.FromSeconds(3f)));
}
private void Update()
{
MessageBroker.Default.Publish(new TestEvent());
}
IEnumerator DelayFor(TimeSpan timeSpan)
{
yield return new WaitForSeconds((float)timeSpan.TotalSeconds);
Destroy(gameObject);
}
}
public class TestEvent
{
}
// EventCenterTest1.cs
public class EventCenterTest1 : MonoBehaviour
{
private void Update()
{
MessageBroker.Default.Publish(new TestEvent());
}
}
将这两个脚本分别挂载到不同的两个对象上面,先来看看其运行结果
观察可以看到:我们的两个 MessageBroker.Default.Publish(new TestEvent());
,有一个因为 gameobject
被 Destory
掉因此没有了。有一个仍在运行,那么其就代表着其注册的观察者回调没有被消除,这很好理解。当我们加上 AddTo(this)
将其生命周期和游戏物体绑定到一起时:
可以看到是同时停止,也就是在观察者注册的回调被清除了。说到这里可能觉得这不就是 AddTo()
的目的吗。但是当我翻阅 MessageBroker
源码时时,发现其释放非托管资源就是这样的:
1 | public void Dispose() |
看了就非常疑惑,这不把 notifiers
全部清除了吗。
从头理一下 AddTo(this)
的原理,其其实是在其物体上挂载了一个 ObservableDestroyTrigger
的 Component
:1
2
3
4
5
6var trigger = gameObject.GetComponent<ObservableDestroyTrigger>();
if (trigger == null)
{
trigger = gameObject.AddComponent<ObservableDestroyTrigger>();
}
// 此处的 gameobject 由传入的 Component 调用 gameobject 获取的
最后将传入的 IDisposable
对象添加到 ObservableDestroyTrigger
的 disposablesOnDestroy
当中1
trigger.AddDisposableOnDestroy(disposable);
而 disposablesOnDestroy
也就是个 CompositeDisposable
,相当于是一个对 List<IDisposable>
数据结构的封装,所有被 Add
添加的都会被添加到这个里面。
最后在 ObservableDestoryTrigger
的挂载对象在被销毁也就是 OnDestory
的时候调用其 Dispose
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public void Dispose()
{
var currentDisposables = default(IDisposable[]);
lock (_gate)
{
if (!_disposed)
{
_disposed = true;
currentDisposables = _disposables.ToArray();
_disposables.Clear();
_count = 0;
}
}
if (currentDisposables != null)
{
foreach (var d in currentDisposables)
if (d != null)
d.Dispose();
}
}
回到 MessageBroker
我们其实可以解决最开始的疑惑:1
2
3
4
5
6
7
8// Reciive 返回的是一个 IObservable<T>
return ((IObservable<T>)notifier).AsObservable();
// IObservable_1.cs
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
也就是说,我们去使用 AddTo
注册的 IDisposable
其实是对 Subscribe
注册事件之后返回的。而不是整个事件分发中心的 Dispose
。
此时,通过了解到绑定生命周期的全过程之后,我们也就可以实现一个自己具有绑定生命周期功能的全局事件分发中心:
介于工程量问题,我没有实现所有的像 unirx
当中诸如 IObserver
这种观察者接口。而是使用了一个实现 IDisposable
的 EventAction
类。去包裹一层实际需要调用的 Dispose
,同时此处扩展了事件分发中心的 Remove
接口1
2
3
4
5
6
7
8
9
10
11
12
13public class EventAction : IDisposable
{
private KeyValuePair<Type, Action<object>> _typeActionPair;
public EventAction(Type type, Action<object> notifier)
{
_typeActionPair = new KeyValuePair<Type, Action<object>>(type, notifier);
}
public void Dispose()
{
EventCenter.Default.Remove(_typeActionPair.Key, _typeActionPair.Value);
}
}
接着也是实现一个 trigger
挂载到绑定了声明周期的物体上,此处我直接就包裹了一个 List<IDisposable>
了。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class EventDestroyTrigger : MonoBehaviour
{
public bool IsCalledOnDestroy { get; } = false;
private List<IDisposable> _disposablesOnDestroy;
private void OnDestroy()
{
if (!IsCalledOnDestroy) {
if (_disposablesOnDestroy != null) {
foreach (var dispose in _disposablesOnDestroy) {
dispose.Dispose();
}
}
}
}
public void AddDisposableOnDestroy(IDisposable disposable)
{
if (IsCalledOnDestroy) {
disposable.Dispose();
return;
}
if (_disposablesOnDestroy == null) {
_disposablesOnDestroy = new List<IDisposable>();
}
_disposablesOnDestroy.Add(disposable);
}
}
其他逻辑和上面 unirx
的就一样了。
至此,全局事件中心的实现已完成。仓库源码地址