修改适应ns3的bofuss库源码之自定义NewMessage

自定义NewMessage

Posted by MZ on September 23, 2023

自定义NewMessage

在这个例子中,我们新增两个消息OFPT_QUE_CN和OFPT_QUE_CR实现从交换机发送到控制器,告诉控制器目前是拥塞状态,还是恢复拥塞。使用与下面的示例类似的实验者消息来创建新动作或新消息。

一、定义新消息

1.添加两个新的类型

/* file:include/openflow/openflow.h 
 * Add OFPT_QUE_CN and OFPT_QUE_CR
 * after the last defined type
 */
enum ofp_type {
     OFPT_HELLO = 0,
     ...
     OFPT_METER_MOD = 29,
     OFPT_QUE_CN = 30,
     OFPT_QUE_CR = 31,
}

2.定义新消息的Openflow消息格式

/* file:include/openflow/openflow.h 
 * Struct representing the cn and cr. 
 */
struct ofp_msg_que_cn_cr{
    struct ofp_header header;
    uint16_t queue_length;
    uint32_t port_no;
    uint8_t pad[2];
};

3.定义新消息的交换机内部数据呈现格式

/* file:oflib/ofl-messages.h
 * The common header for messages. All message structures start with this
 * header, therefore they can be safely cast back and forth */
struct ofl_msg_header {
    enum ofp_type   type;   /* One of the OFPT_ constants. */
};

/* file:oflib/ofl-messages.h
 * Internal representation of 
 * OFPT_QUE_CN and OFPT_QUE_CR.
 */ 
struct ofl_msg_que_cn_cr{
    struct ofl_msg_header header;
    uint16_t queue_length;
    uint32_t port_no;
    uint8_t pad[2];
};

4.定义新消息的Openflow格式与交换机内部格式转换规则(pack与unpack)

static int
ofl_msg_pack_que_cn_cr(struct ofl_msg_que_cn_cr *msg, uint8_t **buf, size_t *buf_len) {
        struct ofp_que_cn_cr *rep;
        /* Allocates memory for the OpenFlow message
        *buf_len = sizeof(struct ofp_que_cn_cr);
        *buf     = (uint8_t *)malloc(*buf_len);
        /* Point the struct to the buffer address 
         * By doing it we can simply set the struct fields.
         */
        rep = (struct ofp_que_cn_cr *)(*buf);
        /* It is important to set the variables in network order */
        rep->queue_length =  htons(msg->queue_length);
        rep->port_no =  htons(msg->port_no);
        memset(rep->pad,0,sizeof(rep->pad));
        return 0;
}

ofl_msg_pack(struct ofl_msg_header *msg, uint32_t xid, uint8_t **buf, size_t *buf_len, struct ofl_exp *exp) {
    struct ofp_header *oh;
    int error = 0;
    switch (msg->type) {

        case OFPT_HELLO: {
            error = ofl_msg_pack_empty(msg, buf, buf_len);
            break;
        }
        /* Most messages are omited for space */
        case OFPT_ECHO_REQUEST:
        case OFPT_ECHO_REPLY: {
            error = ofl_msg_pack_echo((struct ofl_msg_echo *)msg, buf, buf_len);
            break;
        }
    }
    if (error) {
        return error;
        // TODO Zoltan: free buffer?
    }

    oh = (struct ofp_header *)(*buf);
    oh->version =        OFP_VERSION;
    oh->type    =        msg->type;
    oh->length  = htons(*buf_len);
    oh->xid     = htonl(xid);

    return 0;
}
static ofl_err
ofl_msg_unpack_que_cn_cr(struct ofp_header *src, size_t *len, struct ofl_msg_header **msg) {

    struct ofp_que_cn_cr *rep;
    struct ofl_que_cn_cr *irep;

    /* Check if the message has the expected size */
    if (*len < sizeof(struct  ofp_que_cn_cr)){
        OFL_LOG_WARN(LOG_MODULE, "Received MAX_QUEUE_REQUEST message has invalid length (%zu).", *len);
        return ofl_error(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
    }
    *len -= sizeof(struct ofp_que_cn_cr);

    /* Extract the field from the OF message */
    rep = (struct ofp_que_cn_cr *) src;
    irep = (struct ofl_que_cn_cr *) malloc(sizeof(struct ofl_que_cn_cr));

    irep->queue_length = ntohs(rep->queue_length);
    irep->port_no = ntohs(rep->port_no);
    *msg = (struct ofl_msg_header *)irep;
    return 0;
}

ofl_err
ofl_msg_unpack(uint8_t *buf, size_t buf_len, struct ofl_msg_header **msg, uint32_t *xid, struct ofl_exp *exp) {
    struct ofp_header *oh;
    size_t len = buf_len;
    ofl_err error = 0;
    if (len < sizeof(struct ofp_header)) {
        OFL_LOG_WARN(LOG_MODULE, "Received message is shorter than ofp_header.");
        if (xid != NULL) {
            *xid = 0x00000000;
        }
        return ofl_error(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
    }

    oh = (struct ofp_header *)buf;

    if (oh->version != OFP_VERSION) {
        OFL_LOG_WARN(LOG_MODULE, "Received message has wrong version.");
        return ofl_error(OFPET_HELLO_FAILED, OFPHFC_INCOMPATIBLE);
    }

    if (xid != NULL) {
        *xid = ntohl(oh->xid);
    }

    if (len != ntohs(oh->length)) {
        OFL_LOG_WARN(LOG_MODULE, "Received message length does not match the length field.");
        return ofl_error(OFPET_BAD_REQUEST, OFPBRC_BAD_LEN);
    }

    switch (oh->type) {
        case OFPT_HELLO:{
            error = ofl_msg_unpack_empty(oh, &len, msg);
            break;
        }
        /* Most messages omited for space */
        case OFPT_QUE_CN:
        case OFPT_QUE_CR:
            error = ofl_msg_unpack_que_cn_cr(oh, &len, msg)
            break;
        default: {
            error = ofl_error(OFPET_BAD_REQUEST, OFPGMFC_BAD_TYPE);
        }
      ...
    (*msg)->type = (enum ofp_type)oh->type;
    return 0
}

二、在交换机端获取队列构建新消息

1.交换机获取队列长度

因为ofswitch13只有一个OFSwitch13PriorityQueue实现,而且它继承的是Queue,Queue有一个GetNPackets()方法,返回的是The number of packets currently stored in the Queue,也就是队列里的数据包长度(当然也可以用GetNBytes()返回字节数)。

/**
 * @brief 定时查询交换机队列长度
 * 
 * @param openFlowDev OFSwitch13Device
 */
void QueryAllQueLength(Ptr<OFSwitch13Device> openFlowDev) {
  //获取交换机的端口数量
  size_t portSize = openFlowDev->GetSwitchPortSize();
  uint64_t dpid = openFlowDev->GetDpId();
  for(uint32_t i = 0; i < portSize; i++){
    Ptr<OFSwitch13Port> ofPort = openFlowDev->GetSwitchPort(i+1);
    Ptr<OFSwitch13Queue> ofQue = ofPort->GetPortQueue();
    uint16_t queueLength = ofQue->GetNPackets();
    uint16_t state = ofPort->GetCongestionState();
    uint16_t count = ofPort->GetCongestionRecoverCount();
    uint32_t port_no = i+1;
    //判断是否大于阈值
    if(queueLength > 20){
      // NS_LOG_INFO("The Port " << i+1 << " queueLength is " << queueLength);
      if(state == 0){
        state = 1;
        ofPort->SetCongestionState(state);
      }
      openFlowDev->SendQueueCongestionNotifyMessage(dpid,queueLength,port_no);
      count = 0;
      ofPort->SetCongestionRecoverCount(count);
    }else{
      if(state == 1){
        count++;
        ofPort->SetCongestionRecoverCount(count);
      }
      if(count == 3){
        openFlowDev->SendQueueCongestionRecoverMessage(dpid,queueLength,port_no);
        // NS_LOG_INFO("The count is " << count);
        count = 0;
        state = 0;
        ofPort->SetCongestionRecoverCount(count);
        ofPort->SetCongestionState(state);
      }
    }
    //OFSwitch13Device构造发送函数,发送到控制器
  }
  
  // Reschedule the function call
  Time delay = MicroSeconds(10); // Set the desired time interval
  Simulator::Schedule(delay, &QueryAllQueLength, openFlowDev);
}

2.交换机构建新消息

在OFSwitch13Device(相当于一个交换机设备)中,定义SendQueueCongestionNotifyMessage函数用来构建openflow消息包括消息头和消息体

 struct ofl_msg_que_cn_cr msg;
  msg.header.type = OFPT_QUE_CN;
  msg.queue_length = queueLength;
  msg.port_no = port_no;

通过datapath的dp_send_message发送到控制器

int
OFSwitch13Device::SendQueueCongestionNotifyMessage (uint64_t dpid, uint16_t queueLength, uint32_t port_no)
{
  NS_LOG_FUNCTION (this << queueLength);
  Ptr<OFSwitch13Device> openFlowDev = GetDevice(dpid);
  Ptr<RemoteController> remoteCtrl = openFlowDev->GetFirstRemoteController();
  // Create the packet_in message.
  struct ofl_msg_que_cn_cr msg;
  msg.header.type = OFPT_QUE_CN;
  msg.queue_length = queueLength;
  msg.port_no = port_no;
  // struct sender senderCtrl;
  // senderCtrl.remote = remoteCtrl->m_remote;
  // senderCtrl.conn_id = 0; // TODO No support for auxiliary connections.
  // senderCtrl.xid = 0;
  return dp_send_message (m_datapath, (struct ofl_msg_header *)&msg, 0);
}

三、在控制器处理新消息

首先判断属于哪个消息,在HandleSwitchMsg中增加两个case:OFPT_QUE_CN和OFPT_QUE_CR

ofl_err
OFSwitch13Controller::HandleSwitchMsg (
  struct ofl_msg_header *msg, Ptr<RemoteSwitch> swtch, uint32_t xid)
{
  NS_LOG_FUNCTION (this << swtch << xid);

  // Dispatches control messages to appropriate handler functions.
  switch (msg->type)
    {
    case OFPT_HELLO:
      return HandleHello (msg, swtch, xid);

    case OFPT_PACKET_IN:
      return HandlePacketIn (
        (struct ofl_msg_packet_in*)msg, swtch, xid);
    //......
    case OFPT_QUE_CN:
      return HandleQueCn (
        (struct ofl_msg_que_cn_cr*)msg, swtch, xid);
    case OFPT_QUE_CR:
      return HandleQueCr (
        (struct ofl_msg_que_cn_cr*)msg, swtch, xid);
    //......
    case OFPT_EXPERIMENTER:
    default:
      return ofl_error (OFPET_BAD_REQUEST, OFPGMFC_BAD_TYPE);
    }
}

它们的函数声明在ofswitch13-controller.h与ofswitch13-tsfcc-controller.h中

/* ofswitch13-controller.h */
virtual ofl_err HandleQueCn (
    struct ofl_msg_que_cn_cr *msg, Ptr<const RemoteSwitch> swtch,
    uint32_t xid);

virtual ofl_err HandleQueCr (
    struct ofl_msg_que_cn_cr *msg, Ptr<const RemoteSwitch> swtch,
    uint32_t xid);
/* ofswitch13-controller.cc */
ofl_err
OFSwitch13Controller::HandleQueCn (
  struct ofl_msg_que_cn_cr *msg, Ptr<const RemoteSwitch> swtch,
  uint32_t xid)
{
  NS_LOG_FUNCTION (this << swtch << xid);

  ofl_msg_free ((struct ofl_msg_header*)msg, 0);
  return 0;
}

ofl_err
OFSwitch13Controller::HandleQueCr (
  struct ofl_msg_que_cn_cr *msg, Ptr<const RemoteSwitch> swtch,
  uint32_t xid)
{
  NS_LOG_FUNCTION (this << swtch << xid);

  ofl_msg_free ((struct ofl_msg_header*)msg, 0);
  return 0;
}
/* ofswitch13-tsfcc-controller.h */  
ofl_err HandleQueCn (
    struct ofl_msg_que_cn_cr *msg, Ptr<const RemoteSwitch> swtch,
    uint32_t xid);

  ofl_err HandleQueCr (
    struct ofl_msg_que_cn_cr *msg, Ptr<const RemoteSwitch> swtch,
    uint32_t xid);

/* ofswitch13-tsfcc-controller.cc */

/**
 * @brief 用于处理队列超过阈值接收到的OpenFlow消息,流程为:先区分象鼠流,再根据队列长度判断进行哪一种拥塞控制方案
 * 
 * @param msg OpenFlow消息
 * @param swtch 交换机
 * @param xid xid
 * @return ofl_err 错误结构体
 */
ofl_err
OFSwitch13TsfccController::HandleQueCn (
  struct ofl_msg_que_cn_cr *msg, Ptr<const RemoteSwitch> swtch,
  uint32_t xid)
{
  NS_LOG_FUNCTION (this << swtch << xid);
  //自己定义实现
  
  ofl_msg_free ((struct ofl_msg_header*)msg, 0);
  return 0;
}
/**
 * @brief 用于处理接收到队列长度恢复到阈值以下的OpenFlow消息,流程为:先区分象鼠流,再根据BDP等对象鼠流进行不同的rwnd值的增加
 * 
 * @param msg OpenFlow消息
 * @param swtch 交换机
 * @param xid xid
 * @return ofl_err 错误结构体
 */
ofl_err
OFSwitch13TsfccController::HandleQueCr (
  struct ofl_msg_que_cn_cr *msg, Ptr<const RemoteSwitch> swtch,
  uint32_t xid)
{
  NS_LOG_FUNCTION (this << swtch << xid);

  //自己定义实现
  ofl_msg_free ((struct ofl_msg_header*)msg, 0);
  return 0;
}