Dabai's Personal Blog

A Brief Analysis of the Flow Rule Subsystem

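The Flow Rule Subsystem manages, stores, and synchronizes flow rules. Users query, install, and remove flow rules through its northbound API, FlowRuleService. This post walks through the subsystem's code, using OpenFlow as the example southbound protocol, to give a brief picture of how the Flow Rule Subsystem is implemented, in the hope that it serves as a starting point for further study.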

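Before analyzing the Flow Rule Subsystem, you should have a good understanding of the ONOS subsystem structure and its event mechanism; see the earlier post "ONOS Subsystem Structure and Event Dispatching Mechanism".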

Flow Rule Subsystem Analysis

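In the ONOS core layer, the Flow Rule Subsystem is implemented mainly by two classes, FlowRuleManager and DistributedFlowRuleStore. FlowRuleManager implements the northbound and southbound interfaces (FlowRuleService, FlowRuleProviderRegistry, FlowRuleProviderService, etc.) and is the hub of the subsystem, while DistributedFlowRuleStore handles data storage and synchronization and is the core of the distributed implementation. The UML diagrams of the two classes are shown below.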

[Figure: FlowRuleManager class diagram]

[Figure: DistributedFlowRuleStore class diagram]

The rest of this section traces how the FlowRuleService method applyFlowRules is carried out, as a way of analyzing the Flow Rule subsystem.

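  1. Each flowRule is wrapped in a FlowRuleOperation, the collection of flow rules is wrapped in a single FlowRuleOperations object, and then the apply method is called. As the code comment says, FlowRuleOperations is a batch of FlowRuleOperation objects broken into stages: each stage is a set of FlowRuleOperation objects, and the stages are kept, in order, in a list: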

    /**
     * A batch of flow rule operations that are broken into stages.
     * TODO move this up to parent's package
     */
    public class FlowRuleOperations {

        private final List<Set<FlowRuleOperation>> stages;
        private final FlowRuleOperationsContext callback;

        private FlowRuleOperations(List<Set<FlowRuleOperation>> stages,
                                   FlowRuleOperationsContext cb) {
            this.stages = stages;
            this.callback = cb;
        }

        // kryo-constructor
        protected FlowRuleOperations() {
            this.stages = Lists.newArrayList();
            this.callback = null;
        }
        ......
    }

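    ONOS installs flow rules using multiple threads, but it guarantees that stages are applied in order: the FlowRuleOperations of a stage are sent only after every operation in the previous stage has finished. For example, to make sure flowRule1 is installed before flowRule2, build the stages like this: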

    FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
    ops.add(flowRule1);
    ops.newStage();
    ops.add(flowRule2);
    flowRuleService.apply(ops.build());
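
The staging contract can be modeled without any ONOS classes. The following is a minimal sketch, not ONOS code: `StagedOpsSketch` is a hypothetical class that only mirrors the `add`/`newStage`/`build` shape of `FlowRuleOperations.Builder`, with rules represented as plain strings and no callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for FlowRuleOperations.Builder; rules are plain strings.
class StagedOpsSketch {
    private final List<Set<String>> stages = new ArrayList<>();
    private Set<String> current = new LinkedHashSet<>();

    // Adds a rule to the current stage; rules within one stage are not ordered.
    StagedOpsSketch add(String rule) {
        current.add(rule);
        return this;
    }

    // Closes the current stage (if non-empty); later rules go into a new stage.
    StagedOpsSketch newStage() {
        if (!current.isEmpty()) {
            stages.add(Collections.unmodifiableSet(current));
            current = new LinkedHashSet<>();
        }
        return this;
    }

    // Closes the last stage and returns all stages in the order they must run.
    List<Set<String>> build() {
        newStage();
        return Collections.unmodifiableList(new ArrayList<>(stages));
    }
}
```

Building `add("flowRule1").newStage().add("flowRule2")` yields two single-rule stages, so flowRule2 can only be dispatched after the stage containing flowRule1 has completed.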

  2. The apply method hands the FlowRuleOperations to a fixed-size thread pool of 32 threads for processing:

    ......
    protected ExecutorService operationsService =
            Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d", log));
    ......
    public void apply(FlowRuleOperations ops) {
        checkPermission(FLOWRULE_WRITE);
        // Hand the whole batch to the pool; apply() returns immediately.
        operationsService.execute(new FlowOperationsProcessor(ops));
    }
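
The asynchronous hand-off in `apply` can be reproduced with plain `java.util.concurrent`. This is a sketch under stated assumptions: `NamedThreadFactory` is a hypothetical replacement for ONOS's `groupedThreads` helper that only applies the `operations-%d` naming pattern, and `ApplySketch.applyAsync` is a hypothetical analogue of `apply`. It shows that the caller returns immediately while the work runs on a pool thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for groupedThreads(...): it only names threads
// from a format pattern such as "operations-%d" (no thread-group handling).
class NamedThreadFactory implements ThreadFactory {
    private final String pattern;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String pattern) {
        this.pattern = pattern;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, String.format(pattern, counter.getAndIncrement()));
        t.setDaemon(true); // let the JVM exit even though the pool is never shut down
        return t;
    }
}

// Hypothetical analogue of FlowRuleManager.apply: hand work to a 32-thread pool.
class ApplySketch {
    static final ExecutorService OPERATIONS =
            Executors.newFixedThreadPool(32, new NamedThreadFactory("operations-%d"));

    // Returns immediately; the processor runs later on a pool thread.
    static void applyAsync(Runnable processor) {
        OPERATIONS.execute(processor);
    }

    // Demo helper: applies one task and reports the name of the thread that ran it.
    static String whichThreadRan() {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> name = new AtomicReference<>();
        applyAsync(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.get();
    }
}
```

Because `apply` returns before any rule reaches a device, the caller has to rely on the callback (or on flow rule events) to learn the outcome.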
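
  3. Below is the FlowOperationsProcessor class. It implements Runnable, so the pool thread that picks it up calls its run() method, and run() calls process to handle one stage of FlowRuleOperation objects. process uses a Multimap, perDeviceBatches, to collect the flow rules destined for each device; each rule is wrapped in a FlowRuleBatchEntry. The rules are thus grouped by device, and one FlowRuleBatchOperation (the batch of flow rules to install on that device) is issued per device.

    Another important field is pendingDevices, the set of DeviceIds that still have flow rules outstanding. When a device's FlowRuleBatchOperation completes, its DeviceId is removed from pendingDevices in the satisfy method (or in fail, which additionally records the failure). Once pendingDevices is empty, meaning every FlowRuleOperation in the current stage has finished, operationsService.execute(this) is called to move on to the next stage. As run shows, when all stages are done and no failure has been recorded, the onSuccess method of the batch's callback (fops.callback(), a FlowRuleOperationsContext) is invoked to tell the application that all flow rules were installed successfully.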

    private class FlowOperationsProcessor implements Runnable {
        // Immutable
        private final FlowRuleOperations fops;

        // Mutable
        private final List<Set<FlowRuleOperation>> stages;
        private final Set<DeviceId> pendingDevices = new HashSet<>();
        private boolean hasFailed = false;

        FlowOperationsProcessor(FlowRuleOperations ops) {
            this.stages = Lists.newArrayList(ops.stages());
            this.fops = ops;
        }

        @Override
        public synchronized void run() {
            if (!stages.isEmpty()) {
                // Start the next stage.
                process(stages.remove(0));
            } else if (!hasFailed) {
                // Every stage finished and nothing failed.
                fops.callback().onSuccess(fops);
            }
        }

        private void process(Set<FlowRuleOperation> ops) {
            // Group this stage's rules by target device.
            Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap.create();

            for (FlowRuleOperation op : ops) {
                perDeviceBatches.put(op.rule().deviceId(),
                        new FlowRuleBatchEntry(mapOperationType(op.type()), op.rule()));
            }
            pendingDevices.addAll(perDeviceBatches.keySet());

            // Issue one FlowRuleBatchOperation per device.
            for (DeviceId deviceId : perDeviceBatches.keySet()) {
                long id = idGenerator.getNewId();
                final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
                        deviceId, id);
                // Map the batch id back to this processor so completion can call satisfy/fail.
                pendingFlowOperations.put(id, this);
                log.info("begin install flow rules to device {} with size {}", deviceId, b.getOperations().size());
                deviceInstallers.execute(() -> store.storeBatch(b));
            }
        }

        // A device's batch completed; the last device to finish triggers the next stage.
        synchronized void satisfy(DeviceId devId) {
            pendingDevices.remove(devId);
            if (pendingDevices.isEmpty()) {
                operationsService.execute(this);
            }
        }

        // A device's batch failed: record it, advance if this was the last device, report the failures.
        synchronized void fail(DeviceId devId, Set<? extends FlowRule> failures) {
            hasFailed = true;
            pendingDevices.remove(devId);
            if (pendingDevices.isEmpty()) {
                operationsService.execute(this);
            }

            FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
            failures.forEach(failedOpsBuilder::add);

            fops.callback().onError(failedOpsBuilder.build());
        }
    }
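
The stage bookkeeping above can be exercised in isolation with the following single-threaded model. It is a simplified sketch, not ONOS code: `StageProcessorSketch` is hypothetical, a rule is a `"deviceId:ruleId"` string, "installing" a batch only records it in `sentBatches`, and `satisfy` calls `run()` directly instead of re-submitting to a thread pool. It also ignores duplicate completions and lets an empty stage finish at once, two details the ONOS excerpt does not handle explicitly.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of FlowOperationsProcessor.
class StageProcessorSketch implements Runnable {
    private final List<List<String>> stages;            // remaining stages, consumed in order
    private final Set<String> pendingDevices = new HashSet<>();
    private boolean hasFailed = false;
    final List<Map<String, List<String>>> sentBatches = new ArrayList<>(); // per-device batches of each stage
    boolean succeeded = false;                           // stands in for callback().onSuccess(...)

    StageProcessorSketch(List<List<String>> stages) {
        this.stages = new ArrayList<>(stages);
    }

    @Override
    public synchronized void run() {
        if (!stages.isEmpty()) {
            process(stages.remove(0));
        } else if (!hasFailed) {
            succeeded = true;
        }
    }

    private void process(List<String> ops) {
        // Group the stage's rules by device, like the perDeviceBatches Multimap.
        Map<String, List<String>> perDevice = new LinkedHashMap<>();
        for (String op : ops) {
            String deviceId = op.substring(0, op.indexOf(':'));
            perDevice.computeIfAbsent(deviceId, d -> new ArrayList<>()).add(op);
        }
        pendingDevices.addAll(perDevice.keySet());
        sentBatches.add(perDevice);                      // ONOS calls store.storeBatch(...) per device here
        if (perDevice.isEmpty()) {
            run();                                       // simplification: an empty stage completes at once
        }
    }

    // A device finished its batch; the last one to finish starts the next stage.
    synchronized void satisfy(String deviceId) {
        if (pendingDevices.remove(deviceId) && pendingDevices.isEmpty()) {
            run();                                       // ONOS re-submits via operationsService.execute(this)
        }
    }

    // A device reported failures; remember it so success is never signalled.
    synchronized void fail(String deviceId) {
        hasFailed = true;
        satisfy(deviceId);
    }
}
```

With stages `[[s1:r1, s2:r2, s1:r3], [s2:r4]]`, the first `run()` sends one batch to s1 and one to s2; the second stage is dispatched only after both `satisfy("s1")` and `satisfy("s2")`, and success is signalled after the final `satisfy("s2")`.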
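
  4. As described in Section 1.1, the Manager performs every operation on data through the store. Here, store.storeBatch is called to add the flow rule information to the FlowRuleStore. store.storeBatch also runs on a thread pool of its own, deviceInstallers, declared as follows: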

    protected ExecutorService deviceInstallers =
            Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d", log));
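
  5. The details of storeBatch are fairly involved. For example, it must determine whether the device targeted by the FlowRuleBatchOperation has a master controller node, and whether the current node is that master. This is because a flow rule must be stored in the FlowRuleStore of the device's master node, and the master node is the one that installs it on the device. If the device has no master, the local store is updated, the batch is reported as completed, and a warning notes that the flows will be marked for removal. See the DistributedFlowRuleStore source for the full procedure.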

    @Override
    public void storeBatch(FlowRuleBatchOperation operation) {
        if (operation.getOperations().isEmpty()) {
            notifyDelegate(FlowRuleBatchEvent.completed(
                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
            return;
        }

        DeviceId deviceId = operation.deviceId();
        NodeId master = mastershipService.getMasterFor(deviceId);

        if (master == null) {
            log.warn("No master for {} : flows will be marked for removal", deviceId);

            updateStoreInternal(operation);

            notifyDelegate(FlowRuleBatchEvent.completed(
                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
            return;
        }

        if (Objects.equals(local, master)) {
            storeBatchInternal(operation);
            return;
        }

        log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                  master, deviceId);

        clusterCommunicator.unicast(operation,
                                    APPLY_BATCH_FLOWS,
                                    serializer::encode,
                                    master)
                .whenComplete((result, error) -> {
                    if (error != null) {
                        log.warn("Failed to storeBatch: {} to {}", operation, master, error);

                        Set<FlowRule> allFailures = operation.getOperations()
                                .stream()
                                .map(op -> op.target())
                                .collect(Collectors.toSet());

                        notifyDelegate(FlowRuleBatchEvent.completed(
                                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                                new CompletedBatchOperation(false, allFailures, deviceId)));
                    }
                });
    }
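
The branch structure of `storeBatch` boils down to a four-way routing decision. The sketch below is a hypothetical distillation: node IDs are plain strings rather than `NodeId`, and the enum constants only name what each branch of the excerpt does.

```java
import java.util.Objects;

// Hypothetical distillation of the branches in storeBatch.
class StoreBatchRoutingSketch {
    enum Route {
        COMPLETE_EMPTY,     // empty batch: report completion immediately
        NO_MASTER,          // updateStoreInternal(...), report completion; flows marked for removal
        APPLY_LOCALLY,      // this node is the master: storeBatchInternal(...)
        FORWARD_TO_MASTER   // unicast APPLY_BATCH_FLOWS to the master node
    }

    static Route route(boolean batchEmpty, String masterNodeId, String localNodeId) {
        if (batchEmpty) {
            return Route.COMPLETE_EMPTY;
        }
        if (masterNodeId == null) {
            return Route.NO_MASTER;
        }
        if (Objects.equals(localNodeId, masterNodeId)) {
            return Route.APPLY_LOCALLY;
        }
        return Route.FORWARD_TO_MASTER;
    }
}
```

The order of the checks matters: an empty batch completes even when no master exists, and only a non-empty batch with a known, remote master leaves the node.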

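    Note that if the current controller is not the master for the device, storeBatch uses the unicast method of the ClusterCommunicationService to send the FlowRuleBatchOperation to the master node, and the remote node reports the result back to this node when it has finished. (The whenComplete handler above only deals with a failed send, in which case every rule in the batch is reported as failed.) In addition, DistributedFlowRuleStore uses the addSubscriber method of the ClusterCommunicationService to listen for flow-rule messages exchanged east-west between cluster nodes and handle them. In the handlers below, an APPLY_BATCH_FLOWS message means that a remote node is handing flow rules to the local node to install on its behalf, and a REMOTE_APPLY_COMPLETED message means that the flow rules a remote node installed on behalf of the local node are done; the handler then calls notifyDelegate to raise the event.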

    private void registerMessageHandlers(ExecutorService executor) {

        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
        clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
                REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
        clusterCommunicator.addSubscriber(
                GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
        clusterCommunicator.addSubscriber(
                GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
        clusterCommunicator.addSubscriber(
                REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
        clusterCommunicator.addSubscriber(
                FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
    }
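
The forward-and-reply exchange can be simulated in a single process. This is a hypothetical sketch: `FakeCluster` is not ONOS's `ClusterCommunicationService`; it delivers messages synchronously to one handler per (node, subject) pair, whereas real cluster messaging is asynchronous and serialized. Node "A" is assumed not to be the master for the device and node "B" is its master; B's installation is reduced to a log line.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory message bus: handlers receive (senderNodeId, payload).
class FakeCluster {
    private final Map<String, Map<String, BiConsumer<String, String>>> handlers = new HashMap<>();

    void addSubscriber(String node, String subject, BiConsumer<String, String> handler) {
        handlers.computeIfAbsent(node, n -> new HashMap<>()).put(subject, handler);
    }

    void unicast(String from, String to, String subject, String payload) {
        BiConsumer<String, String> h = handlers.getOrDefault(to, Map.of()).get(subject);
        if (h == null) {
            throw new IllegalStateException("no subscriber for " + subject + " on " + to);
        }
        h.accept(from, payload);
    }
}

class ForwardingDemo {
    static final String APPLY_BATCH_FLOWS = "APPLY_BATCH_FLOWS";
    static final String REMOTE_APPLY_COMPLETED = "REMOTE_APPLY_COMPLETED";

    // Returns what each node did, in order.
    static List<String> run() {
        FakeCluster cluster = new FakeCluster();
        List<String> log = new ArrayList<>();
        // Master "B" installs forwarded batches and reports back to the sender.
        cluster.addSubscriber("B", APPLY_BATCH_FLOWS, (from, batch) -> {
            log.add("B installs " + batch);
            cluster.unicast("B", from, REMOTE_APPLY_COMPLETED, batch);
        });
        // Origin "A" turns the completion message into a local delegate notification.
        cluster.addSubscriber("A", REMOTE_APPLY_COMPLETED, (from, batch) ->
                log.add("A notifies delegate: " + batch + " completed by " + from));
        // A is not the master, so it forwards the batch to B.
        cluster.unicast("A", "B", APPLY_BATCH_FLOWS, "batch-1");
        return log;
    }
}
```

The resulting log is `B installs batch-1` followed by `A notifies delegate: batch-1 completed by B`, which is the APPLY_BATCH_FLOWS / REMOTE_APPLY_COMPLETED round trip described above.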

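    Finally, storeBatch (through storeBatchInternal on the master node) calls notifyDelegate to report events raised in the FlowRuleStore to the FlowRuleStoreDelegate; the event types are defined in FlowRuleBatchEvent. As analyzed in the previous article, notifyDelegate is implemented in AbstractStore, and DistributedFlowRuleStore extends AbstractStore, so it can call notifyDelegate.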

    ......
    notifyDelegate(FlowRuleBatchEvent.requested(
            new FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
    ......
    /**
     * Notifies the delegate with the specified event.
     *
     * @param event event to delegate
     */
    protected void notifyDelegate(E event) {
        if (delegate != null) {
            delegate.notify(event);
        }
    }

  6. The delegate member used in the code above is registered with the FlowRuleStore by FlowRuleManager, as follows:

    private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
    ......
    @Activate
    public void activate(ComponentContext context) {
        ......
        store.setDelegate(delegate);
        eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
        deviceService.addListener(deviceListener);
        ......
        log.info("Started");
    }
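
The store/delegate wiring follows a simple callback pattern. The sketch below is a generic, hypothetical version (`AbstractStoreSketch`, `StoreDelegateSketch`, `FlowStoreSketch` and `ManagerSketch` are illustrative names, and events are plain strings): the store only knows a delegate interface, and the manager plugs itself in during activation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical delegate interface: the store calls back into whoever owns it.
interface StoreDelegateSketch<E> {
    void notify(E event);
}

// Hypothetical base class mirroring AbstractStore.notifyDelegate.
abstract class AbstractStoreSketch<E> {
    private StoreDelegateSketch<E> delegate;

    void setDelegate(StoreDelegateSketch<E> delegate) {
        this.delegate = delegate;
    }

    // Events are silently dropped until a delegate has been set.
    protected void notifyDelegate(E event) {
        if (delegate != null) {
            delegate.notify(event);
        }
    }
}

class FlowStoreSketch extends AbstractStoreSketch<String> {
    void storeBatch(String batchId) {
        notifyDelegate("BATCH_OPERATION_REQUESTED " + batchId);
    }
}

class ManagerSketch {
    final List<String> received = new ArrayList<>();
    final FlowStoreSketch store = new FlowStoreSketch();

    // Counterpart of activate(): register the delegate with the store.
    void activate() {
        store.setDelegate(received::add);
    }
}
```

Events raised before `activate()` are dropped, mirroring the `delegate != null` check in `notifyDelegate`, which is one reason to register the delegate as early as activation.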
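
  7. An inner class of FlowRuleManager implements the FlowRuleStoreDelegate interface and handles events raised in the FlowRuleStore through its notify method. The post call ultimately dispatches events through the EventDeliveryService. Other components listen for these events by implementing the FlowRuleListener interface and registering the listener with FlowRuleManager via FlowRuleService.addListener(); for example, ONOS's StatisticManager has an inner class that implements FlowRuleListener. From the analysis above, a batch handled on the master raises a BATCH_OPERATION_REQUESTED event (the FlowRuleBatchEvent.requested call shown earlier), so the delegate calls getProvider(deviceId) to obtain the FlowRuleProvider responsible for the device and passes the batch to its executeBatch method. getProvider is implemented in AbstractProviderRegistry, which FlowRuleManager inherits from; every provider registers with FlowRuleManager and is kept in a map (see the AbstractProviderRegistry source for details).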

    // Store delegate to re-post events emitted from the store.
    private class InternalStoreDelegate implements FlowRuleStoreDelegate {
        // TODO: Right now we only dispatch events at individual FlowEntry level.
        // It may be more efficient for also dispatch events as a batch.
        @Override
        public void notify(FlowRuleBatchEvent event) {
            final FlowRuleBatchRequest request = event.subject();
            switch (event.type()) {
                case BATCH_OPERATION_REQUESTED:
                    // Request has been forwarded to MASTER Node, and was
                    request.ops().forEach(
                            op -> {
                                switch (op.operator()) {
                                    case ADD:
                                        post(new FlowRuleEvent(RULE_ADD_REQUESTED, op.target()));
                                        break;
                                    case REMOVE:
                                        post(new FlowRuleEvent(RULE_REMOVE_REQUESTED, op.target()));
                                        break;
                                    case MODIFY:
                                        //TODO: do something here when the time comes.
                                        break;
                                    default:
                                        log.warn("Unknown Flow operation operator: {}", op.operator());
                                }
                            }
                    );

                    // Hand the whole batch to the provider that manages this device.
                    DeviceId deviceId = event.deviceId();
                    FlowRuleBatchOperation batchOperation = request.asBatchOperation(deviceId);
                    FlowRuleProvider flowRuleProvider = getProvider(deviceId);
                    if (flowRuleProvider != null) {
                        flowRuleProvider.executeBatch(batchOperation);
                    }

                    break;

                case BATCH_OPERATION_COMPLETED:
                    ......
                default:
                    break;
            }
        }
    }
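
The `BATCH_OPERATION_REQUESTED` branch can be modeled as two steps: fan out per-rule events to listeners, then hand the whole batch to one provider. In this hypothetical sketch, listeners and providers are plain `Consumer`s, and the provider is looked up by the device URI's scheme (e.g. `of` in `of:0000000000000001`); treat that lookup key as an assumption about `getProvider(deviceId)`, not a quote of `AbstractProviderRegistry`.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of the BATCH_OPERATION_REQUESTED branch.
class DispatchSketch {
    enum Operator { ADD, REMOVE, MODIFY }

    static final class Entry {
        final Operator op;
        final String rule;

        Entry(Operator op, String rule) {
            this.op = op;
            this.rule = rule;
        }
    }

    final List<Consumer<String>> listeners = new ArrayList<>();                   // like FlowRuleListener registrations
    final Map<String, Consumer<List<Entry>>> providersByScheme = new HashMap<>(); // assumption: keyed by URI scheme

    void onBatchRequested(String deviceId, List<Entry> batch) {
        // Step 1: one event per rule, as post(new FlowRuleEvent(...)) does.
        for (Entry e : batch) {
            switch (e.op) {
                case ADD:
                    post("RULE_ADD_REQUESTED " + e.rule);
                    break;
                case REMOVE:
                    post("RULE_REMOVE_REQUESTED " + e.rule);
                    break;
                default:
                    break; // MODIFY: no event, as in the excerpt
            }
        }
        // Step 2: the whole batch goes to the provider for this device, if any.
        Consumer<List<Entry>> provider = providersByScheme.get(URI.create(deviceId).getScheme());
        if (provider != null) {
            provider.accept(batch); // stands in for executeBatch(batchOperation)
        }
    }

    private void post(String event) {
        listeners.forEach(l -> l.accept(event));
    }
}
```

A MODIFY entry produces no listener event but still travels to the provider as part of the batch, matching the `TODO` in the excerpt.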
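
  8. Taking OpenFlow as the example, OpenFlowRuleProvider implements the FlowRuleProvider interface and registers itself with FlowRuleManager through the FlowRuleProviderRegistry service. Its executeBatch method converts a FlowRuleBatchOperation into OpenFlow messages and sends them to the corresponding switch.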

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected FlowRuleProviderRegistry providerRegistry;
    ......
    private FlowRuleProviderService providerService;
    ......
    @Activate
    protected void activate(ComponentContext context) {
        ......
        providerService = providerRegistry.register(this);
        ......
    }
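
What `executeBatch` has to do for OpenFlow can be pictured as a per-entry translation into flow-mod messages. The sketch below is hypothetical and uses neither ONOS nor OpenFlowJ classes; the command names come from the OpenFlow 1.3 specification, but the choice of strict variants and the trailing barrier request are assumptions for illustration, since the excerpt does not show the body of `executeBatch`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical translation of one device batch into OpenFlow 1.3 message names.
class OpenFlowBatchSketch {
    enum Operator { ADD, MODIFY, REMOVE }

    // Assumed mapping from batch operator to flow-mod command.
    static String flowModCommand(Operator op) {
        switch (op) {
            case ADD:
                return "OFPFC_ADD";
            case MODIFY:
                return "OFPFC_MODIFY_STRICT";
            case REMOVE:
                return "OFPFC_DELETE_STRICT";
            default:
                throw new IllegalArgumentException("unknown operator " + op);
        }
    }

    // One FLOW_MOD per entry, then a BARRIER_REQUEST whose reply marks the batch as processed.
    static List<String> toMessages(List<Operator> ops) {
        List<String> msgs = new ArrayList<>();
        for (Operator op : ops) {
            msgs.add("FLOW_MOD(" + flowModCommand(op) + ")");
        }
        msgs.add("BARRIER_REQUEST");
        return msgs;
    }
}
```

A barrier is the standard OpenFlow way for a controller to learn that a switch has processed all preceding messages, which is what a batch-level completion event needs.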
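
  9. That is how the FlowRule subsystem installs flow rules. Note that any ONOS instance can ask the flow service to install flow rules on any device in the network, but when the current instance is not that device's master, the rules are sent to the device's master instance, which performs the actual installation. The implementation has been walked through with code above; the overall process is shown in the figure below: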

[Figure omitted]

3. Flow Rule Storage

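Flow rule storage is implemented by DistributedFlowRuleStore, which implements the FlowRuleStore service interface and has to deal with storage, backup, and synchronization. Note that ONOS stores flow rules and keeps the flow rule information in the controller consistent with what is on the network devices: if the controller finds a flow rule entry on a device that it has not stored, ONOS deletes that rule. In addition, after the master controller installs flow rules on the devices it manages (its "subnet"), it selects one or more other standby nodes (two backup nodes by default; see the DistributedFlowRuleStore source) to back up the flow rule information, so that the rules can be recovered if the current node goes down. Preference is usually given to the standby nodes that would take over those devices if the current node failed.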
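
Two of the storage rules above reduce to simple set and list operations. In this hypothetical sketch, `extraneous` computes which rules a device holds that the store does not (the rules ONOS would delete), and `chooseBackups` takes up to `backupCount` standby nodes, assuming the caller already supplies them in the order in which they would take over the device.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helpers illustrating two storage rules.
class FlowStorageSketch {
    // Rules found on the device but unknown to the store are extraneous and get removed.
    static Set<String> extraneous(Set<String> onDevice, Set<String> inStore) {
        Set<String> result = new LinkedHashSet<>(onDevice);
        result.removeAll(inStore);
        return result;
    }

    // Takes up to backupCount standbys, in succession order, never the master itself.
    static List<String> chooseBackups(String master, List<String> standbysInSuccessionOrder, int backupCount) {
        List<String> backups = new ArrayList<>();
        for (String node : standbysInSuccessionOrder) {
            if (backups.size() == backupCount) {
                break;
            }
            if (!node.equals(master)) {
                backups.add(node);
            }
        }
        return backups;
    }
}
```

With `backupCount = 2`, matching the default mentioned above, standbys `[n2, n3, n4]` yield backups `[n2, n3]`; a cluster with a single standby simply gets one backup.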

Each ONOS controller node has one of three mastership relationships with a device:

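  • NONE: the node does not know about the device, or simply cannot interact with it
  • STANDBY: the node knows about the device and can read its state, but cannot manage or control it
  • MASTER: the node knows about the device and has full control over it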
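
The three roles can be encoded as an enum with capability checks. ONOS does define a `MastershipRole` enum with these three values, but `RoleSketch` and its methods are hypothetical and only restate the descriptions above.

```java
// Hypothetical encoding of the three mastership roles and what each permits.
enum RoleSketch {
    NONE(false, false),
    STANDBY(true, false),
    MASTER(true, true);

    private final boolean canReadState;
    private final boolean canControl;

    RoleSketch(boolean canReadState, boolean canControl) {
        this.canReadState = canReadState;
        this.canControl = canControl;
    }

    boolean canReadState() {
        return canReadState;
    }

    // Only the master installs flow rules; other nodes forward requests to it.
    boolean canInstallFlows() {
        return canControl;
    }
}
```

This is the distinction that drives the forwarding logic in storeBatch: a STANDBY node can observe the device but must hand flow rule batches to the MASTER.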

4. Summary

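Overall, the Flow Rule Subsystem is fairly complex, but its code is clearly structured. This article examined the code structure of the Flow Rule subsystem and traced the flow rule installation path through the code. To learn more about how flow rules are implemented, read the source in more depth.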

References: