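The Flow Rule Subsystem manages, stores, and synchronizes flow rules. Through its northbound API (FlowRuleService), users can query, install, and remove flow rules. This article walks through the Flow Rule Subsystem code, using OpenFlow as the example, and briefly analyzes how the subsystem is implemented. I hope it serves as a starting point for further discussion.
Before analyzing the Flow Rule Subsystem, you need a good understanding of the ONOS subsystem structure and event mechanism; see the earlier article "ONOS Subsystem Structure and Event Dispatch Mechanism".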
Flow Rule Subsystem Analysis
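In the ONOS Core layer, the Flow Rule Subsystem is implemented mainly by two classes, FlowRuleManager and DistributedFlowRuleStore. FlowRuleManager implements the northbound and southbound interfaces (FlowRuleService, FlowRuleProviderRegistry, FlowRuleProviderService, and so on) and is the hub of the subsystem, while DistributedFlowRuleStore stores and synchronizes the data and is the core of the distributed implementation. The UML diagrams of the two classes are:
FlowRuleManager class diagram:
DistributedFlowRuleStore class diagram:
The rest of this section follows the implementation of the FlowRuleService method applyFlowRules to analyze the Flow Rule subsystem.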
Each flowRule is wrapped in a FlowRuleOperation, the whole set of flowRules is wrapped in a FlowRuleOperations, and then the apply method is called. As the code comment says, FlowRuleOperations is a batch of FlowRuleOperation objects broken into stages: each stage is a set of FlowRuleOperation objects, and the stages are kept in a list:

/**
 * A batch of flow rule operations that are broken into stages.
 * TODO move this up to parent's package
 */
public class FlowRuleOperations {

    private final List<Set<FlowRuleOperation>> stages;
    private final FlowRuleOperationsContext callback;

    private FlowRuleOperations(List<Set<FlowRuleOperation>> stages,
                               FlowRuleOperationsContext cb) {
        this.stages = stages;
        this.callback = cb;
    }

    // kryo-constructor
    protected FlowRuleOperations() {
        this.stages = Lists.newArrayList();
        this.callback = null;
    }
    ......
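}

ONOS installs flow rules using multiple threads, but it guarantees that the FlowRuleOperation objects of different stages are installed in order. For example, to make sure flowRule1 is fully installed before flowRule2, build the stages like this:

FlowRuleOperations.Builder ops = FlowRuleOperations.builder();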
ops.add(flowRule1);
ops.newStage();
ops.add(flowRule2);
flowRuleService.apply(ops.build());

The apply method hands the FlowRuleOperations to a fixed thread pool of 32 threads:

......
protected ExecutorService operationsService =
        Executors.newFixedThreadPool(32, groupedThreads("onos/Flowservice", "operations-%d", log));
......
public void apply(FlowRuleOperations ops) {
    checkPermission(FLOWRULE_WRITE);
    operationsService.execute(new FlowOperationsProcessor(ops));
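}

FlowOperationsProcessor does the actual work; its full listing follows this explanation. It implements Runnable, so the pool thread that picks it up executes run(), and run() calls process() to handle one stage of FlowRuleOperation objects.

process() uses a Multimap (Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches) to hold the flow rules destined for each device: each flow rule is wrapped in a FlowRuleBatchEntry and stored in perDeviceBatches. This groups all flow rules by device, after which a FlowRuleBatchOperation (the set of flow rules to install on one device) is issued for each device in turn.

The class has another important field, pendingDevices, which holds the DeviceIds that still have flow rules waiting to be installed. When the FlowRuleBatchOperation of a device completes, its DeviceId is removed from pendingDevices; this happens in satisfy(). When pendingDevices becomes empty, that is, when every FlowRuleOperation of the current stage has been installed, operationsService.execute(this) is called to move on to the next stage. As run() shows, once all stages are done and no failure has been recorded, the onSuccess method of the FlowRuleOperations callback is invoked to tell the application that all flow rules were installed successfully.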
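Before the full listing, the stage-by-stage bookkeeping can be tried out in isolation. The following is only a simplified sketch under stated assumptions: the class name StagedInstallSketch, the "deviceId/ruleName" string encoding of a rule, and the log list are all hypothetical and not part of ONOS. It runs synchronously on one thread and leaves out failure handling, whereas the real class reschedules itself on operationsService.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model: a rule is just the string "deviceId/ruleName".
final class StagedInstallSketch {
    private final List<List<String>> stages;                  // stages not yet started
    private final Set<String> pendingDevices = new LinkedHashSet<>();
    final List<String> log = new ArrayList<>();                // what happened, in order

    StagedInstallSketch(List<List<String>> stages) {
        this.stages = new ArrayList<>(stages);
    }

    // Mirrors run(): start the next stage, or report success when none remain.
    void run() {
        if (!stages.isEmpty()) {
            process(stages.remove(0));
        } else {
            log.add("onSuccess");
        }
    }

    // Mirrors process(): group the rules of one stage by device, one batch per device.
    private void process(List<String> rules) {
        Map<String, List<String>> perDevice = new LinkedHashMap<>();
        for (String rule : rules) {
            String device = rule.substring(0, rule.indexOf('/'));
            perDevice.computeIfAbsent(device, d -> new ArrayList<>()).add(rule);
        }
        pendingDevices.addAll(perDevice.keySet());
        perDevice.forEach((device, batch) -> log.add("batch " + device + " " + batch));
    }

    // Mirrors satisfy(): one device finished; advance once no device is pending.
    void satisfy(String device) {
        pendingDevices.remove(device);
        if (pendingDevices.isEmpty()) {
            run();
        }
    }

    public static void main(String[] args) {
        StagedInstallSketch p = new StagedInstallSketch(List.of(
                List.of("s1/a", "s2/b", "s1/c"),   // stage 1: two devices
                List.of("s1/d")));                 // stage 2: one device
        p.run();
        p.satisfy("s1");
        p.satisfy("s2");   // stage 1 complete, stage 2 starts
        p.satisfy("s1");   // stage 2 complete, success is reported
        p.log.forEach(System.out::println);
    }
}
```

The point of the sketch is the ordering guarantee: the batch for stage 2 is only created after every device of stage 1 has been satisfied.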
private class FlowOperationsProcessor implements Runnable {
    // Immutable
    private final FlowRuleOperations fops;

    // Mutable
    private final List<Set<FlowRuleOperation>> stages;
    private final Set<DeviceId> pendingDevices = new HashSet<>();
    private boolean hasFailed = false;

    FlowOperationsProcessor(FlowRuleOperations ops) {
        this.stages = Lists.newArrayList(ops.stages());
        this.fops = ops;
    }

    public synchronized void run() {
        if (!stages.isEmpty()) {
            process(stages.remove(0));
        } else if (!hasFailed) {
            fops.callback().onSuccess(fops);
        }
    }

    private void process(Set<FlowRuleOperation> ops) {
        Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap.create();

        for (FlowRuleOperation op : ops) {
            perDeviceBatches.put(op.rule().deviceId(),
                    new FlowRuleBatchEntry(mapOperationType(op.type()), op.rule()));
        }
        pendingDevices.addAll(perDeviceBatches.keySet());

        for (DeviceId deviceId : perDeviceBatches.keySet()) {
            long id = idGenerator.getNewId();
            final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
                    deviceId, id);
            pendingFlowOperations.put(id, this);
            log.info("begin install flow rules to device {} with size {}", deviceId, b.getOperations().size());
            deviceInstallers.execute(() -> store.storeBatch(b));
        }
    }

    synchronized void satisfy(DeviceId devId) {
        pendingDevices.remove(devId);
        if (pendingDevices.isEmpty()) {
            operationsService.execute(this);
        }
    }

    synchronized void fail(DeviceId devId, Set<? extends FlowRule> failures) {
        hasFailed = true;
        pendingDevices.remove(devId);
        if (pendingDevices.isEmpty()) {
            operationsService.execute(this);
        }

        FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
        failures.forEach(failedOpsBuilder::add);

        fops.callback().onError(failedOpsBuilder.build());
    }
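}

As described in Section 1.1, a Manager performs all operations on data through its store. Here, store.storeBatch adds the flow rule information to the FlowRuleStore. The store.storeBatch call also runs in a separate thread pool; deviceInstallers is declared as follows:

protected ExecutorService deviceInstallers =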
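        Executors.newFixedThreadPool(32, groupedThreads("onos/Flowservice", "device-installer-%d", log));

The implementation of storeBatch is fairly involved. For example, it has to check whether the device of the FlowRuleBatchOperation has a master controller node, and whether the current node is that master. This is because a flow rule must be stored in the FlowRuleStore of the device's master node and installed on the device by that master. See the DistributedFlowRuleStore source for the exact procedure.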
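The routing decision described above boils down to a three-way branch. As a rough illustration only, the sketch below models it with plain strings; MasterRoutingSketch, its Action enum, and the device and node names are hypothetical and do not exist in ONOS, and the map stands in for the mastership lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the master check performed before storing a batch.
final class MasterRoutingSketch {
    enum Action { STORE_LOCALLY, FORWARD_TO_MASTER, NO_MASTER }

    private final String localNode;
    private final Map<String, String> masterOf = new HashMap<>();  // deviceId -> master nodeId

    MasterRoutingSketch(String localNode) {
        this.localNode = localNode;
    }

    void setMaster(String deviceId, String nodeId) {
        masterOf.put(deviceId, nodeId);
    }

    // Decide what to do with a batch destined for the given device.
    Action route(String deviceId) {
        String master = masterOf.get(deviceId);
        if (master == null) {
            return Action.NO_MASTER;            // nobody can install the batch right now
        }
        if (Objects.equals(localNode, master)) {
            return Action.STORE_LOCALLY;        // this node is the master
        }
        return Action.FORWARD_TO_MASTER;        // hand the batch to the remote master
    }

    public static void main(String[] args) {
        MasterRoutingSketch routing = new MasterRoutingSketch("node-1");
        routing.setMaster("of:0000000000000001", "node-1");
        routing.setMaster("of:0000000000000002", "node-2");
        System.out.println(routing.route("of:0000000000000001"));
        System.out.println(routing.route("of:0000000000000002"));
        System.out.println(routing.route("of:0000000000000003"));
    }
}
```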
public void storeBatch(FlowRuleBatchOperation operation) {
    if (operation.getOperations().isEmpty()) {
        notifyDelegate(FlowRuleBatchEvent.completed(
                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
        return;
    }

    DeviceId deviceId = operation.deviceId();
    NodeId master = mastershipService.getMasterFor(deviceId);

    if (master == null) {
        log.warn("No master for {} : flows will be marked for removal", deviceId);
        updateStoreInternal(operation);
        notifyDelegate(FlowRuleBatchEvent.completed(
                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
        return;
    }

    if (Objects.equals(local, master)) {
        storeBatchInternal(operation);
        return;
    }

    log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
            master, deviceId);

    clusterCommunicator.unicast(operation,
                                APPLY_BATCH_FLOWS,
                                serializer::encode,
                                master)
            .whenComplete((result, error) -> {
                if (error != null) {
                    log.warn("Failed to storeBatch: {} to {}", operation, master, error);
                    Set<FlowRule> allFailures = operation.getOperations()
                            .stream()
                            .map(op -> op.target())
                            .collect(Collectors.toSet());
                    notifyDelegate(FlowRuleBatchEvent.completed(
                            new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                            new CompletedBatchOperation(false, allFailures, deviceId)));
                }
            });
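}

Note that if the current controller is not the master of the device, storeBatch uses the unicast method of ClusterCommunicationService to send the FlowRuleBatchOperation to the master controller node; when the remote node has finished, it reports the result back to this node. DistributedFlowRuleStore also uses addSubscriber of ClusterCommunicationService to listen for flow rule messages sent east-west between controllers and to handle them. Among the subscriptions in registerMessageHandlers, the APPLY_BATCH_FLOWS message means that a remote node is handing flow rules to the local node to install on its behalf, while the REMOTE_APPLY_COMPLETED message means that flow rules a remote node installed on behalf of the local node have been completed; its handler calls notifyDelegate to raise the event.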
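The subscribe-and-unicast pattern can be illustrated without a cluster. The sketch below is an in-process toy under loud assumptions: ClusterBusSketch and its two methods are hypothetical names, payloads are plain strings instead of serialized objects, and the call returns directly, whereas the real ClusterCommunicationService sends messages between nodes and completes asynchronously.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-process stand-in for subject-based cluster messaging.
final class ClusterBusSketch {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Register the handler that serves all messages with the given subject.
    void addSubscriber(String subject, Function<String, String> handler) {
        handlers.put(subject, handler);
    }

    // Deliver one message to the handler of its subject and return the reply.
    String unicast(String subject, String payload) {
        Function<String, String> handler = handlers.get(subject);
        if (handler == null) {
            throw new IllegalStateException("no subscriber for " + subject);
        }
        return handler.apply(payload);
    }

    public static void main(String[] args) {
        ClusterBusSketch masterNode = new ClusterBusSketch();
        // The master subscribes to batches forwarded by other nodes.
        masterNode.addSubscriber("APPLY_BATCH_FLOWS", batch -> "applied:" + batch);
        // A non-master node forwards a batch and receives the result.
        System.out.println(masterNode.unicast("APPLY_BATCH_FLOWS", "batch-7"));
    }
}
```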
private void registerMessageHandlers(ExecutorService executor) {
    clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
    clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
            REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
    clusterCommunicator.addSubscriber(
            GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
    clusterCommunicator.addSubscriber(
            GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
    clusterCommunicator.addSubscriber(
            REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
    clusterCommunicator.addSubscriber(
            FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
}

Finally, storeBatch calls notifyDelegate to inform the FlowRuleStoreDelegate of events raised in the FlowRuleStore; the event types are defined in FlowRuleBatchEvent. As the previous article explained, AbstractStore implements notifyDelegate, and DistributedFlowRuleStore extends AbstractStore, so it can call notifyDelegate.

......
notifyDelegate(FlowRuleBatchEvent.requested(
        new FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
......
/**
 * Notifies the delegate with the specified event.
 *
 * @param event event to delegate
 */
protected void notifyDelegate(E event) {
    if (delegate != null) {
        delegate.notify(event);
    }
}

The delegate member in the code above is registered with the FlowRuleStore by FlowRuleManager, as follows:

private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
......
public void activate(ComponentContext context) {
    ......
    store.setDelegate(delegate);
    eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
    deviceService.addListener(deviceListener);
    ......
    log.info("Started");
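}

An inner class of FlowRuleManager implements the FlowRuleStoreDelegate interface and handles events coming from the FlowRuleStore in its notify method. The post calls there ultimately dispatch the events through EventDeliveryService. Other application components can listen for the posted events by implementing the FlowRuleListener interface and registering the listener with FlowRuleManager via FlowRuleService.addListener(). The StatisticManager module in ONOS, for instance, contains an inner class that implements FlowRuleListener.

As the analysis above shows, storeBatch raises a BATCH_OPERATION_REQUESTED event (through FlowRuleBatchEvent.requested), so the delegate obtains the provider by calling getProvider(deviceId). getProvider is implemented in AbstractProviderRegistry, which FlowRuleManager inherits from; every provider registers with FlowRuleManager and is kept in a map. See the AbstractProviderRegistry source for details.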
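The listener registration described above follows the usual observer pattern. As a minimal sketch, assuming events are plain strings and delivery is synchronous (ONOS dispatches through EventDeliveryService instead), it could look like the following; ListenerRegistrySketch is a hypothetical name, not an ONOS class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical observer registry: listeners receive every posted event.
final class ListenerRegistrySketch {
    // Copy-on-write list so listeners may be added or removed while posting.
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void removeListener(Consumer<String> listener) {
        listeners.remove(listener);
    }

    // Deliver the event to every registered listener, in registration order.
    void post(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        List<String> seen = new ArrayList<>();
        Consumer<String> statistics = seen::add;    // e.g. a statistics component

        registry.addListener(statistics);
        registry.post("RULE_ADD_REQUESTED");
        registry.removeListener(statistics);
        registry.post("RULE_REMOVE_REQUESTED");     // no longer delivered

        System.out.println(seen);
    }
}
```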
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {

    // TODO: Right now we only dispatch events at individual FlowEntry level.
    // It may be more efficient for also dispatch events as a batch.
    public void notify(FlowRuleBatchEvent event) {
        final FlowRuleBatchRequest request = event.subject();
        switch (event.type()) {
            case BATCH_OPERATION_REQUESTED:
                // Request has been forwarded to MASTER Node, and was
                request.ops().forEach(
                        op -> {
                            switch (op.operator()) {
                                case ADD:
                                    post(new FlowRuleEvent(RULE_ADD_REQUESTED, op.target()));
                                    break;
                                case REMOVE:
                                    post(new FlowRuleEvent(RULE_REMOVE_REQUESTED, op.target()));
                                    break;
                                case MODIFY:
                                    //TODO: do something here when the time comes.
                                    break;
                                default:
                                    log.warn("Unknown Flow operation operator: {}", op.operator());
                            }
                        }
                );

                DeviceId deviceId = event.deviceId();
                FlowRuleBatchOperation batchOperation = request.asBatchOperation(deviceId);
                FlowRuleProvider flowRuleProvider = getProvider(deviceId);
                if (flowRuleProvider != null) {
                    flowRuleProvider.executeBatch(batchOperation);
                }
                break;

            case BATCH_OPERATION_COMPLETED:
                ......
            default:
                break;
        }
    }
}

Taking OpenFlow as the example, OpenFlowRuleProvider implements the FlowRuleProvider interface and registers itself with FlowRuleManager through the FlowRuleProviderRegistry service. Its executeBatch method converts the FlowRuleBatchOperation into OpenFlow messages and sends them to the corresponding switch.

@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleProviderRegistry providerRegistry;
......
private FlowRuleProviderService providerService;
......
protected void activate(ComponentContext context) {
    ......
    providerService = providerRegistry.register(this);
    ......
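}

That is the whole process by which the FlowRule subsystem installs a flow rule. Note that any ONOS instance can ask the flow service to install a flow rule on any device in the network, but when the master of that device is not the current ONOS instance, the flow rule is sent to the device's master instance, which then performs the installation. The code walk-through above covers the details, and the overall flow is shown in the figure below: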
3. Flow Rule Storage
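Flow rule storage is implemented by DistributedFlowRuleStore, which implements the FlowRuleStore interface and has to deal with storage, backup, and synchronization. Note that ONOS stores flow rules and keeps the flow rule information on the controller and on the network devices consistent: if the controller finds a flow entry on a device that the controller itself has not stored, ONOS removes that entry. In addition, after the master controller installs flow rules on a device in its subnet, it picks one or more other standby nodes (the default is 2 backup nodes; see the DistributedFlowRuleStore source) to back up the flow rule information, so that the flow rules can be recovered if the current node goes down. The standby node that would take over the subnet when the current node goes down is usually preferred as the backup node.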
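The consistency rule mentioned above (entries present on the device but unknown to the store get removed) is essentially a set difference. A small sketch, assuming flow entries can be compared by a string identifier; FlowReconcileSketch and the identifiers are hypothetical and this is not how ONOS represents flow entries:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical reconciliation step: find entries the controller never stored.
final class FlowReconcileSketch {

    // Entries reported by the device that are absent from the store.
    static Set<String> extraneous(Set<String> stored, Set<String> reportedByDevice) {
        Set<String> extra = new LinkedHashSet<>(reportedByDevice);
        extra.removeAll(stored);
        return extra;
    }

    public static void main(String[] args) {
        Set<String> stored = Set.of("flow-a", "flow-b");
        Set<String> reported = Set.of("flow-a", "flow-b", "flow-x");
        // Each extraneous entry would be scheduled for removal from the device.
        for (String id : extraneous(stored, reported)) {
            System.out.println("remove " + id);
        }
    }
}
```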
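Note:
Each ONOS controller node has one of three mastership relationships with a device:
- NONE: the node knows nothing about the device, or simply cannot interact with it
- STANDBY: the node knows about the device and can read its state, but cannot manage or control it
- MASTER: the node knows about the device and has full control over it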
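The three relationships in the note can be written down as an enum with two capability flags. This is only an illustration of the list above; MastershipSketch, Role, and the flag names canRead and canControl are hypothetical and are not the ONOS API.

```java
// Hypothetical encoding of the three mastership relationships.
final class MastershipSketch {
    enum Role {
        NONE(false, false),      // unknown device, or no way to interact with it
        STANDBY(true, false),    // may read device state, may not control it
        MASTER(true, true);      // full control over the device

        final boolean canRead;
        final boolean canControl;

        Role(boolean canRead, boolean canControl) {
            this.canRead = canRead;
            this.canControl = canControl;
        }
    }

    // Only a node with control over the device should install flow rules on it.
    static boolean mayInstallFlows(Role role) {
        return role.canControl;
    }

    public static void main(String[] args) {
        for (Role role : Role.values()) {
            System.out.println(role + " read=" + role.canRead
                    + " install=" + mayInstallFlows(role));
        }
    }
}
```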
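4. Summary
Overall, the implementation of the Flow Rule Subsystem is fairly complex, yet its code structure is clear. This article analyzed the code structure of the Flow Rule subsystem and briefly walked through it by following the code path of flow rule installation. For more implementation details, read the source code.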
References: