Dabai的个人博客

ONOS子系统结构和事件分发处理机制

ONOS是一个支持分布式的复杂网络操作系统,ONOS的设计高度层次化,模块化和抽象化,ONOS不同的模块和组件一般都使用特定的接口类型和异步事件机制通信协作。因此,理解ONOS子系统结构和事件分发处理机制对进一步深入了解ONOS是有很大帮助的。

1. ONOS子系统结构

ONOS有很多子系统,但不同的子系统的层次和结构是相同的。下面是ONOS子系统的结构图:

ONOS子系统主要包括App,Core,Provider,Protocol四部分。其中,Core是对网络的抽象,是协议无关的(protocol agnostic), 对上提供统一抽象的北向接口。Core中的组件主要包括Manager和Store,如FlowRuleManager和FlowRuleStore等。而Provider是协议相关的,主要为ONOS Core提供抽象的数据类型,Provider通过Core提供的ProviderService接口向Core注入网络信息,Provider也会暴露Provider接口给Core,接收来自Core的command消息。Protocol模块则是根据特定的协议类型通网路设备建立连接,Protocol模块一般是和对应的Provider模块共同启动的。

其中,Core中的Manager和Store是本次分析的重点。Manager对北实现Service和AdminService接口,对南实现ProviderRegistry和 ProviderService接口。Store则负责数据的存储,查询,更新以及东西向同步等,所有来自Manager中与数据相关的操作都会通过Store来完成。另外,Manager也会将Store中的事件抛出并实现ListenerService接口,其它应用通过ListenerService接口即可实现事件的监听,下面将详细分析这一过程。

2. ONOS系统事件分发处理机制

2.1 了解ONOS系统中的事件Event和EventSink

事件一般是由Store产生,用一种特定的类来表示一种事件类型,如AppIdEvent,DeviceEvent,FLowRuleEvent等,所有的事件类型实现Event接口,Event包括事件类型和事件主题,分别通过枚举类型和字符串类型来表示,表示一种特定的事件类型。每个事件最终会分发到一个对应的事件槽(EventSink),即事件传播处理的终点,系统调用事件槽的process函数处理对应的事件。下面分别是Event和EventSink的接口定义源码,二者都是泛型接口,接口的源码注释清楚的说明了这两个接口的作用。

Event接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Abstraction of an of a time-stamped event pertaining to an arbitrary subject.
*/
public interface Event<T extends Enum, S> {

/**
* Returns the timestamp of when the event occurred, given in milliseconds
* since the start of epoch.
*
* @return timestamp in milliseconds
*/
long time();

/**
* Returns the type of the event.
*
* @return event type
*/
T type();

/**
* Returns the subject of the event.
*
* @return subject to which this event pertains
*/
S subject();

}

EventSink接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Abstraction of an event sink capable of processing the specified event types.
*/
public interface EventSink<E extends Event> {

/**
* Processes the specified event.
*
* @param event event to be processed
*/
void process(E event);

/**
* Handles notification that event processing time limit has been exceeded.
*/
default void onProcessLimit() {
}

}

2.2 事件的分发服务EventDeliveryService

上面是EventDeliveryService的UML图,可以很容易的发现EventDeliveryService继承了两个接口,EventDispatcher和EventSinkRegistry,前者负责事件的分发(post),后者负责事件槽的管理(addSink和removeSink),ONOS子系统通过使用该服务,就能实现将事件正确分发到对应的事件槽,EventDeliveryService一般在子系统的Manager中使用。

2.3 EventDeliveryService服务的实现

EventDeliveryService服务是通过CoreEventDispatcher类来实现,该类被声明为一个OSGi Component,由OSGi进行管理。废话不多说,还是直接看CoreEventDispatcher的UML结构图:

由于该类比较复杂,没有显示该类的成员变量。事件的最终分发处理是由内部类DispatchLoop处理,该类实现了Runnable接口,从而使用独立的线程处理特定的事件组。DispatchLoop里有一个保存事件队列的重要成员变量eventsQueue,声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Auxiliary event dispatching loop that feeds off the events queue.
private class DispatchLoop implements Runnable {
private final String name;
private volatile boolean stopped;
private volatile EventSink lastSink;
// Means to detect long-running sinks
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private TimerTask watchdog;
private volatile Future<?> dispatchFuture;
private final BlockingQueue<Event> eventsQueue;
private final ExecutorService executor;
......
}

eventsQueue是BlockingQueue类型,是一个阻塞队列,关于Java阻塞队列的功能和作用,参考:聊聊并发(七)——Java中的阻塞队列。EventDeliveryService的post方法最终会调用DispatchLoop的add方法将事件加入到这个队列,是程序主动调用的,DispatchLoop里的线程方法(run)会不断的取出事件并调用对应的事件槽处理函数处理对应事件。熟悉多线程协作以及设计模式的朋友应该会注意到这里所用的也就是生产者-消费者模式,参考:聊聊并发——生产者消费者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public void run() {
stopped = false;
log.info("Dispatch loop initiated");
while (!stopped) {
try {
// Fetch the next event and if it is the kill-pill, bail
Event event = eventsQueue.take();
if (event == KILL_PILL) {
break;
}
process(event);
} catch (InterruptedException e) {
log.warn("Dispatch loop interrupted");
} catch (Exception | Error e) {
log.warn("Error encountered while dispatching event:", e);
}
}
log.info("Dispatch loop terminated");
}

// Locate the sink for the event class and use it to process the event
@SuppressWarnings("unchecked")
private void process(Event event) {
EventSink sink = getSink(event.getClass());
if (sink != null) {
lastSink = sink;
stopwatch.start();
sink.process(event);
stopwatch.reset();
} else {
log.warn("No sink registered for event class {}",
event.getClass().getName());
}
}

上面就是CoreEventDispatcher分发处理事件到对应事件槽的大概流程,这里就不对CoreEventDispatcher的实现细节进一步分析了,要了解程序的更多实现细节,需要阅读源码。

2.4 添加事件监听者处理事件

下面是ONOS子系统中的事件分发示意图:

Manager对数据的操作导致Store产生事件,Store通过StoreDelegate的notify方法向Manager通告事件。AbstractStore类中有基本Store接口的实现,一般Store的实现类都会继承AbstractStore基础类,该类中的notifyDelegate方法会调用StoreDelegate的notify方法。Manager使用内部类实现StoreDelegate接口,notify方法的实现会调用post方法,post方法最终使用EventDeliveryService服务分发事件。post方法在AbstractListenerManager或AbstractListenerProviderRegistry中有实现,Manager组件一般都会继承AbstractListenerManager或AbstractListenerProviderRegistry基础类。下面是AbstractStore和AbstractListenerProviderRegistry的UML图如下所示:

外部应用通过实现EventListener接口构造一个事件监听者,使用ListenerService的addListener方法完成监听者的注册,AbstractListenerManager和AbstractListenerProviderRegistry类都实现了ListenerService接口。AbstractListenerManager或AbstractListenerProviderRegistry基础类中有一个重要的对象成员:

1
2
3
......
protected final ListenerRegistry<E, L> listenerRegistry = new ListenerRegistry<>();
.....

监听者注册最终会使用调用该成员对象的addListener方法,ListenerRegistry实现了ListenerService和EventSink接口,从而将事件(Event),事件槽(EventSink),监听者(EventListener)联系起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* Base implementation of an event sink and a registry capable of tracking
* listeners and dispatching events to them as part of event sink processing.
*/
public class ListenerRegistry<E extends Event, L extends EventListener<E>>
implements ListenerService<E, L>, EventSink<E> {
......

/**
* Set of listeners that have registered.
*/
protected final Set<L> listeners = new CopyOnWriteArraySet<>();

@Override
public void addListener(L listener) {
checkNotNull(listener, "Listener cannot be null");
listeners.add(listener);
}

@Override
public void removeListener(L listener) {
checkNotNull(listener, "Listener cannot be null");
if (!listeners.remove(listener)) {
log.warn("Listener {} not registered", listener);
}
}

@Override
public void process(E event) {
for (L listener : listeners) {
try {
lastListener = listener;
lastStart = System.currentTimeMillis();
if (listener.isRelevant(event)) {
listener.event(event);
}
lastStart = 0;
} catch (Exception error) {
reportProblem(event, error);
}
}
}

......
}

ListenerRegistry使用线程安全的集合CopyOnWriteArraySet管理监听者,EventSink的process方法就是逐个的执行EventListener的event方法。Manager使用EventDeliveryService的addSink方法将自己管理的事件和对应的事件槽联系起来。下面是FlowRuleManager的代码示例:

1
2
3
4
5
6
7
8
9
10
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
······
@Activate
public void activate(ComponentContext context) {
......
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
......
log.info("Started");
}

3. 总结

总的来说,ONOS子系统功能复杂但结构清晰,ONOS子系统的结构具有模块化,层次化,抽象化的特点。Manager是子系统的核心,负责将服务,存储,以及事件联系起来。Manager实现ListenerService接口,管理事件监听者的注册和移除,事件的分发和处理调度由EventDeliveryService统一管理。

参考: