CC BY 4.0 (除特别声明或转载文章外)
如果这篇博客帮助到你,可以请我喝一杯咖啡~
远程直接内存访问(即 Remote Direct Memory Access)是一种直接内存访问技术,它让计算机可以直接存取其他计算机的内存,而不需要经过操作系统和处理器耗时的处理。RDMA 将数据从一个系统快速移动到远程系统存储器中,不对操作系统造成任何影响。
RDMA 技术最早出现在 Infiniband 网络,用于 HPC 高性能计算集群的互联。支持 RDMA 协议的设备主要有 Infiniband、RoCE、iWARP 网卡,在 HPC、并行存储系统等领域得到广泛应用。
RDMA VS TCP/IP
传统的基于 Socket 套接字(TCP/IP 协议栈)的网络通信,需要经过操作系统软件协议栈,数据在系统 DRAM、处理器 Cache 和网卡 Buffer 之间来回拷贝搬移,因此占用了大量的 CPU 计算资源和内存总线带宽,也加大了网络延时。举例来说,40Gbps 的 TCP/IP 流能耗尽主流服务器的所有 CPU 资源;RDMA 则解决了传统 TCP/IP 通信的技术痛点。例如,在 40Gbps 场景下,CPU 占用率从 100%下降到 5%,网络延时从 ms 级降低到 10us 以下。
RDMA 技术的原理及其与 TCP/IP 架构的对比如下图所示。
因此,自我理解 RDMA 为利用相关的硬件和网络技术,服务器的网卡之间可以直接读内存,最终达到高带宽、低延迟和低资源占用率的效果。应用程序不需要参与数据传输过程,只需要指定内存读写地址,开启传输并等待传输完成即可。
RDMA 的三种实现
目前 RDMA 有三种不同的硬件实现。分别是 InfiniBand、iWarp(internet Wide Area RDMA Protocol)、RoCE(RDMA over Converged Ethernet)。其中,Infiniband 是一种专为 RDMA 设计的网络,从硬件级别保证可靠传输 , 而 RoCE 和 iWARP 都是基于以太网的 RDMA 技术,支持相应的 verbs 接口,如下图所示。
从图中不难发现,RoCE 协议存在 RoCEv1 和 RoCEv2 两个版本,主要区别 RoCEv1 是基于以太网链路层实现的 RDMA 协议(交换机需要支持 PFC 等流控技术,在物理层保证可靠传输),而 RoCEv2 是以太网 TCP/IP 协议中 UDP 层实现。从性能上,很明显 Infiniband 网络最好,但网卡和交换机是价格也很高,然而 RoCEv2 和 iWARP 仅需使用特殊的网卡就可以了,价格也相对便宜很多。
自己归纳一下,三种实现之间的区别:
- IB 网是从硬件上支持 RDMA,各方面性能上最优,但是对硬件成本要求最高,需要支持该技术的网卡和交换机。
- 一个允许在以太网上执行 RDMA 的网络协议。 其较低的网络标头是以太网标头,其较高的网络标头(包括数据)是 InfiniBand 标头。 这支持在标准以太网基础设施(交换机)上使用 RDMA。 只有网卡应该是特殊的,支持 RoCE。
- iWarp 一个允许在 TCP 上执行 RDMA 的网络协议,可能比普通以太网还要慢,不适合用于生产环境。只有网卡应该是特殊的,并且支持 iWARP(如果使用 CPU 卸载),否则所有 iWARP 堆栈都可以在 SW 中实现,并且丧失了大部分 RDMA 性能优势。IB 和 RoCE 中存在的功能在 iWARP 中不受支持。 这支持在标准以太网基础设施(交换机)上使用 RDMA。
相关概念
QP(Queue Pair)
Queue Pairs(QP),每对 QP 由 Send Queue(SQ)和 Receive Queue(RQ)构成,这些队列中管理着各种类型的消息。QP 会被映射到应用的虚拟地址空间,使得应用直接通过它访问 RNIC 网卡。
CQ(Complete Queue)
完成队列包含了发送到工作队列(WQ)中已完成的工作请求(WR)。每次完成表示一个特定的 WR 执行完毕(包括成功完成的 WR 和不成功完成的 WR)。完成队列是一个用来告知应用程序已结束的工作请求的信息(状态、操作码、大小、来源)的机制。CQ 有 n 个完成队列实体(CQE)。CQE 的数量在 CQ 创建的时候被指定。当一个 CQP 被轮询到,它就从 CQ 中被删除。CQ 是一个 CQE 的先进选出(FIFO)队列。CQ 能服务于发送队列、接收队列或者同时服务于这两种队列。多个不同 QP 中的工作请求(WQ)可联系到同一个 CQ 上。
MR(Memory Region)
内存注册机制允许应用程序申请一些连续的虚拟内存空间或者连续的物理内存空间,将这些内存空间提供给网络适配器作为虚拟的连续缓冲区,缓冲区使用虚拟地址。内存注册进程锁定了内存页。(为了防止页被替换出去,同时保持物理和虚拟内存的映射)在注册期间,操作系统检查被注册块的许可。注册进程将虚拟地址与物理地址的映射表写入网络适配器。在注册内存时,对应内存区域的权限会被设定。权限包括本地写、远程读、远程写、原子操作、绑定。每个内存注册(MR)有一个远程的和一个本地的标志(r_key,l_key)。本地标志被本地的 HCA 用来访问本地内存,例如在接收数据操作的期间。远程标志提供给远程 HCA 用来在 RDMA 操作期间允许远程进程访问本地的系统内存。同一内存缓冲区可以被多次注册(甚至设置不同的操作权限),并且每次注册都会生成不同的标志。
发送请求(SR)
SR 定义了数据的发送量、从哪里、发送方式、是否通过 RDMA、到哪里。
RR 定义用来放置通过非 RDMA 操作接收到的数据的缓冲区。如没有定义缓冲区,并且有个传输者 尝试执行一个发送操作或者一个带立即数的 RDMA 写操作,那么接收者将会发出接收未就绪的错误(RNR)。
接收请求(RR)
RR 定义用来放置通过非 RDMA 操作接收到的数据的缓冲区。如没有定义缓冲区,并且有个传输者 尝试执行一个发送操作或者一个带立即数的 RDMA 写操作,那么接收者将会发出接收未就绪的错误(RNR)。
RDMA 工作过程
- 当一个应用执行 RDMA 读或写请求时,不执行任何数据复制.在不需要任何内核内存参与的条件下,RDMA 请求从运行在用户空间中的应用中发送到本地 NIC( 网卡)。
- NIC 读取缓冲的内容,并通过网络传送到远程 NIC。
- 在网络上传输的 RDMA 信息包含目标虚拟地址、内存钥匙和数据本身.请求既可以完全在用户空间中处理(通过轮询用户级完成排列) ,又或者在应用一直睡眠到请求完成时的情况下通过系统中断处理.RDMA 操作使应用可以从一个远程应用的内存中读数据或向这个内存写数据。
- 目标 NIC 确认内存钥匙,直接将数据写人应用缓存中.用于操作的远程虚拟内存地址包含在 RDMA 信息中。
Review:TCP 通信流程
Server
- 调用
socket()
创建一个套接字 - 调用
bind()
绑定到一个端口 - 调用
listen()
监听该端口 - 调用
accept()
等待客户端连接上来(阻塞) - 建立 TCP 连接
- 调用
send()
/receive()
通过该连接进行通信
Client
- 调用
socket()
创建一个套接字 - 调用
connect()
连上服务端 - 建立 TCP 连接
- 调用
send()
/receive()
通过该连接进行通信
RDMA 通信流程
获取 RDMA 设备列表(ibv_get_device_list
)
获得机器上的 RDMA 设备。有点像 CUDA。
/* 1 获取设备列表 */
int num_devices;
struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
if (!dev_list || !num_devices)
{
fprintf(stderr, "failed to get IB devices\n");
rc = 1;
goto main_exit;
}
打开一个 RDMA 设备,获取一个上下文(ibv_open_device、ibv_context)
/* 2 打开设备,获取设备上下文 */
struct ibv_device *ib_dev = dev_list[0];
res.ib_ctx = ibv_open_device(ib_dev);
if (!res.ib_ctx)
{
fprintf(stderr, "failed to open device \n");
rc = 1;
goto main_exit;
}
释放 RDMA 设备列表占用的资源(ibv_free_device_list)
个人理解这一步是在给前两步擦屁股…因为通常需要操作的是单个 RDMA 设备,而第一步获得的是一个列表。多余的部分自然是要释放掉。
/* 3 释放设备列表占用的资源 */
ibv_free_device_list(dev_list);
dev_list = NULL;
ib_dev = NULL;
查询 RDMA 设备端口信息(ibv_query_port、ibv_port_attr)
/* 4 查询设备端口状态 */
if (ibv_query_port(res.ib_ctx, 1, &res.port_attr))
{
fprintf(stderr, "ibv_query_port on port failed\n");
rc = 1;
goto main_exit;
}
分配一个 Protection Domain(ibv_alloc_pd、ibv_pd)
/* 5 创建PD(Protection Domain) */
res.pd = ibv_alloc_pd(res.ib_ctx);
if (!res.pd)
{
fprintf(stderr, "ibv_alloc_pd failed\n");
rc = 1;
goto main_exit;
}
创建一个 Complete Queue(ibv_create_cq、ibv_cq)
/* 6 创建CQ(Complete Queue) */
int cq_size = 10;
res.cq = ibv_create_cq(res.ib_ctx, cq_size, NULL, NULL, 0);
if (!res.cq)
{
fprintf(stderr, "failed to create CQ with %u entries\n", cq_size);
rc = 1;
goto main_exit;
}
注册一块 Memory Region(ibv_reg_mr、ibv_mr)
/* 7 注册MR(Memory Region) */
int size = MSG_SIZE;
res.buf = (char *)malloc(size);
if (!res.buf)
{
fprintf(stderr, "failed to malloc %Zu bytes to memory buffer\n", size);
rc = 1;
goto main_exit;
}
memset(res.buf, 0, size);
int mr_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
res.mr = ibv_reg_mr(res.pd, res.buf, size, mr_flags);
if (!res.mr)
{
fprintf(stderr, "ibv_reg_mr failed with mr_flags=0x%x\n", mr_flags);
rc = 1;
goto main_exit;
}
fprintf(stdout, "MR was registered with addr=%p, lkey=0x%x, rkey=0x%x, flags=0x%x\n",
res.buf, res.mr->lkey, res.mr->rkey, mr_flags);
创建一个 Queue Pair(ibv_create_qp、ibv_qp)
/* 8 创建QP(Queue Pair) */
struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
qp_init_attr.qp_type = IBV_QPT_RC;
qp_init_attr.sq_sig_all = 1;
qp_init_attr.send_cq = res.cq;
qp_init_attr.recv_cq = res.cq;
qp_init_attr.cap.max_send_wr = 1;
qp_init_attr.cap.max_recv_wr = 1;
qp_init_attr.cap.max_send_sge = 1;
qp_init_attr.cap.max_recv_sge = 1;
res.qp = ibv_create_qp(res.pd, &qp_init_attr);
if (!res.qp)
{
fprintf(stderr, "failed to create QP\n");
rc = 1;
goto main_exit;
}
fprintf(stdout, "QP was created, QP number=0x%x\n", res.qp->qp_num);
交换控制信息(使用 Socket 或 RDMA_CM API)
/* 9 交换控制信息 */
struct cm_con_data_t local_con_data; // 发送给远程主机的信息
struct cm_con_data_t remote_con_data; // 接收远程主机发送过来的信息
struct cm_con_data_t tmp_con_data;
local_con_data.addr = htonll((uintptr_t)res.buf);
local_con_data.rkey = htonl(res.mr->rkey);
local_con_data.qp_num = htonl(res.qp->qp_num);
local_con_data.lid = htons(res.port_attr.lid);
if (sock_sync_data(server_ip, sizeof(struct cm_con_data_t), (char *)&local_con_data, (char *)&tmp_con_data) < 0)
{
fprintf(stderr, "failed to exchange connection data between sides\n");
rc = 1;
goto main_exit;
}
remote_con_data.addr = ntohll(tmp_con_data.addr);
remote_con_data.rkey = ntohl(tmp_con_data.rkey);
remote_con_data.qp_num = ntohl(tmp_con_data.qp_num);
remote_con_data.lid = ntohs(tmp_con_data.lid);
/* save the remote side attributes, we will need it for the post SR */
res.remote_props = remote_con_data;
fprintf(stdout, "Remote address = 0x%" PRIx64 "\n", remote_con_data.addr);
fprintf(stdout, "Remote rkey = 0x%x\n", remote_con_data.rkey);
fprintf(stdout, "Remote QP number = 0x%x\n", remote_con_data.qp_num);
fprintf(stdout, "Remote LID = 0x%x\n", remote_con_data.lid);
转换 QP 状态 RESET->INIT->RTR->RTS(ibv_modify_qp)
- 状态:RESET -> INIT -> RTR -> RTS
- 要严格按照顺序进行转换
- QP 刚创建时状态为 RESET
- INIT 之后就可以调用 ibv_post_recv 提交一个 receive buffer 了
- 当 QP 进入 RTR(ready to receive)状态以后,便开始进行接收处理
- RTR 之后便可以转为 RTS(ready to send),RTS 状态下可以调用 ibv_post_send
/* 10 转换QP状态 */
// RESET -> INIT
struct ibv_qp_attr attr;
int flags;
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.port_num = 1; // IB 端口号
attr.pkey_index = 0;
attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
rc = ibv_modify_qp(res.qp, &attr, flags);
if (rc)
fprintf(stderr, "failed to modify QP state to INIT\n");
//INIT -> RTR(Ready To Receive)
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = IBV_MTU_256;
attr.dest_qp_num = res.remote_props.qp_num;
attr.rq_psn = 0;
attr.max_dest_rd_atomic = 1;
attr.min_rnr_timer = 0x12;
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = res.remote_props.lid;
attr.ah_attr.sl = 0;
attr.ah_attr.src_path_bits = 0;
attr.ah_attr.port_num = 1;
flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
rc = ibv_modify_qp(res.qp, &attr, flags);
if (rc)
fprintf(stderr, "failed to modify QP state to RTR\n");
//RTR -> RTS(Ready To Send)
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.timeout = 0x12;
attr.retry_cnt = 6;
attr.rnr_retry = 0;
attr.sq_psn = 0;
attr.max_rd_atomic = 1;
flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
rc = ibv_modify_qp(res.qp, &attr, flags);
if (rc)
fprintf(stderr, "failed to modify QP state to RTS\n");
创建发送任务/接收任务(ibv_send_wr / ibv_recv_wr)
- ibv_send_wr(send work request)
- 该任务会被提交到 QP 中的 SQ(Send Queue)中
- 发送任务有三种操作:Send,Read,Write
- Send 操作需要对方执行相应的 Receive 操作
- Read/Write 直接操作对方内存,对方无感知
- 把要发送的数据的内存地址,大小,密钥告诉 HCA
- Read/Write 还需要告诉 HCA 远程的内存地址和密钥
/* 11 创建发送任务ibv_send_wr */
struct ibv_send_wr sr;
struct ibv_sge sge;
struct ibv_send_wr *bad_wr = NULL;
int rc;
/* prepare the scatter/gather entry */
memset(&sge, 0, sizeof(sge));
sge.addr = (uintptr_t)res->buf;
sge.length = MSG_SIZE;
sge.lkey = res->mr->lkey;
/* prepare the send work request */
memset(&sr, 0, sizeof(sr));
sr.next = NULL;
sr.wr_id = 0;
sr.sg_list = &sge;
sr.num_sge = 1;
sr.opcode = opcode;
sr.send_flags = IBV_SEND_SIGNALED;
if (opcode != IBV_WR_SEND)
{
sr.wr.rdma.remote_addr = res->remote_props.addr;
sr.wr.rdma.rkey = res->remote_props.rkey;
}
提交发送任务/接收任务(ibv_post_send / ibv_post_recv)
rc = ibv_post_send(res->qp, &sr, &bad_wr);
if (rc)
fprintf(stderr, "failed to post SR\n");
return rc;
轮询任务完成信息(ibv_poll_cq)
/* 13 轮询任务结果 */
struct ibv_wc wc;
int poll_result;
int rc = 0;
do
{
poll_result = ibv_poll_cq(res->cq, 1, &wc);
} while (poll_result == 0);
完整示例代码(RDMA_Read_Write_Demo.c
)
/*
* BUILD COMMAND:
* gcc -Wall -o RDMA_Read_Write_Demo RDMA_Read_Write_Demo.c -libverbs
*/
/******************************************************************************
*
* RDMA Aware Networks Programming Example
*
* This code demonstrates how to perform the following operations using the * VPI Verbs API:
*
* Send
* Receive
* RDMA Read
* RDMA Write
*
*****************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <inttypes.h>
#include <endian.h>
#include <byteswap.h>
#include <getopt.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <infiniband/verbs.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
/* poll CQ timeout in millisec (2 seconds) */
#define MAX_POLL_CQ_TIMEOUT 20000
#define MSG "SEND operation "
#define MSG_SIZE 1048576 //1MB
#if __BYTE_ORDER == __LITTLE_ENDIAN
static inline uint64_t htonll(uint64_t x) { return bswap_64(x); }
static inline uint64_t ntohll(uint64_t x) { return bswap_64(x); }
#elif __BYTE_ORDER == __BIG_ENDIAN
static inline uint64_t htonll(uint64_t x) { return x; }
static inline uint64_t ntohll(uint64_t x) { return x; }
#else
#error __BYTE_ORDER is neither __LITTLE_ENDIAN nor __BIG_ENDIAN
#endif
/* structure to exchange data which is needed to connect the QPs */
struct cm_con_data_t
{
uint64_t addr; /* Buffer address */
uint32_t rkey; /* Remote key */
uint32_t qp_num; /* QP number */
uint16_t lid; /* LID of the IB port */
} __attribute__((packed));
/* structure of system resources */
struct resources
{
struct ibv_device_attr device_attr; /* Device attributes */
struct ibv_port_attr port_attr; /* IB port attributes */
struct cm_con_data_t remote_props; /* values to connect to remote side */
struct ibv_context *ib_ctx; /* device handle */
struct ibv_pd *pd; /* PD handle */
struct ibv_cq *cq; /* CQ handle */
struct ibv_qp *qp; /* QP handle */
struct ibv_mr *mr; /* MR handle for buf */
char *buf; /* memory buffer pointer */
};
/******************************************************************************
* Function: sock_connect
*
* Input
* servername URL of server to connect to (NULL for server mode)
* port port of service
*
* Output
* none
*
* Returns
* socket (fd) on success, negative error code on failure
*
* Description
* Connect a socket. If servername is specified a client connection will be
* initiated to the indicated server and port. Otherwise listen on the
* indicated port for an incoming connection.
*
******************************************************************************/
static int sock_connect(const char *servername, int port)
{
struct addrinfo *resolved_addr = NULL;
struct addrinfo *iterator;
char service[6];
int sockfd = -1;
int listenfd = 0;
int tmp;
struct addrinfo hints =
{
.ai_flags = AI_PASSIVE,
.ai_family = AF_INET,
.ai_socktype = SOCK_STREAM};
if (sprintf(service, "%d", port) < 0)
goto sock_connect_exit;
/* Resolve DNS address, use sockfd as temp storage */
sockfd = getaddrinfo(servername, service, &hints, &resolved_addr);
if (sockfd < 0)
{
fprintf(stderr, "%s for %s:%d\n", gai_strerror(sockfd), servername, port);
goto sock_connect_exit;
}
/* Search through results and find the one we want */
for (iterator = resolved_addr; iterator; iterator = iterator->ai_next)
{
sockfd = socket(iterator->ai_family, iterator->ai_socktype, iterator->ai_protocol);
if (sockfd >= 0)
{
if (servername)
{
/* Client mode. Initiate connection to remote */
if ((tmp = connect(sockfd, iterator->ai_addr, iterator->ai_addrlen)))
{
fprintf(stdout, "failed connect \n");
close(sockfd);
sockfd = -1;
}
}
else
{
/* Server mode. Set up listening socket an accept a connection */
listenfd = sockfd;
sockfd = -1;
if (bind(listenfd, iterator->ai_addr, iterator->ai_addrlen))
goto sock_connect_exit;
listen(listenfd, 1);
sockfd = accept(listenfd, NULL, 0);
}
}
}
sock_connect_exit:
if (listenfd)
close(listenfd);
if (resolved_addr)
freeaddrinfo(resolved_addr);
if (sockfd < 0)
{
if (servername)
fprintf(stderr, "Couldn't connect to %s:%d\n", servername, port);
else
{
perror("server accept");
fprintf(stderr, "accept() failed\n");
}
}
return sockfd;
}
/******************************************************************************
* Function: sock_sync_data
*
* Input
Table 5 -
* sock socket to transfer data on
* xfer_size size of data to transfer
* local_data pointer to data to be sent to remote (local_data是一个指向要发送到远程的数据的指针)
*
* Output
* remote_data pointer to buffer to receive remote data
*
* Returns
* 0 on success, negative error code on failure
*
* Description
* Sync data across a socket. The indicated local data will be sent to the
* remote. It will then wait for the remote to send its data back. It is
* assumed that the two sides are in sync and call this function in the proper
* order. Chaos will ensue if they are not. :)
*
* Also note this is a blocking function and will wait for the full data to be
* received from the remote.
*
******************************************************************************/
int sock_sync_data(char *server_ip, int xfer_size, char *local_data, char *remote_data)
{
int sock = -1;
int port = 10002;
int rc;
if (server_ip)
{
sock = sock_connect(server_ip, port);
if (sock < 0)
{
fprintf(stderr, "failed to establish TCP connection to server %s, port %d\n",
server_ip, port);
rc = -1;
}
}
else
{
fprintf(stdout, "waiting on port %d for TCP connection\n", port);
sock = sock_connect(NULL, port);
if (sock < 0)
{
fprintf(stderr, "failed to establish TCP connection with client\n");
rc = -1;
}
}
fprintf(stdout, "TCP connection was established\n");
int read_bytes = 0;
int total_read_bytes = 0;
rc = write(sock, local_data, xfer_size);
if (rc < xfer_size)
fprintf(stderr, "Failed writing data during sock_sync_data\n");
else
rc = 0;
while (!rc && total_read_bytes < xfer_size)
{
read_bytes = read(sock, remote_data, xfer_size);
if (read_bytes > 0)
total_read_bytes += read_bytes;
else
rc = read_bytes;
}
sleep(2); //sleep 2s
if (sock > 0)
close(sock);
return rc;
}
/******************************************************************************
End of socket operations
******************************************************************************/
/* poll_completion */
/******************************************************************************
* Function: poll_completion
*
* Input
* res pointer to resources structure
*
* Output
* none
*
* Returns
* 0 on success, 1 on failure
*
* Description
* Poll the completion queue for a single event. This function will continue to
* poll the queue until MAX_POLL_CQ_TIMEOUT milliseconds have passed. (轮询获得一个CQ事件)
*
******************************************************************************/
static int poll_completion(struct resources *res)
{
/* 13 轮询任务结果 */
struct ibv_wc wc;
int poll_result;
int rc = 0;
do
{
poll_result = ibv_poll_cq(res->cq, 1, &wc);
} while (poll_result == 0);
if (poll_result < 0)
{
/* poll CQ failed */
fprintf(stderr, "poll CQ failed\n");
rc = 1;
}
else
{
if (wc.status != IBV_WC_SUCCESS)
{
fprintf(stderr, "got bad completion with status: 0x%x, vendor syndrome: 0x%x\n", wc.status,
wc.vendor_err);
rc = 1;
}
}
return rc;
}
/******************************************************************************
* Function: post_send
*
* Input
* res pointer to resources structure
* opcode IBV_WR_SEND, IBV_WR_RDMA_READ or IBV_WR_RDMA_WRITE
*
* Output
* none
*
* Returns
* 0 on success, error code on failure
*
* Description
* This function will create and post a send work request
******************************************************************************/
static int post_send(struct resources *res, int opcode)
{
/* 11 创建发送任务ibv_send_wr */
struct ibv_send_wr sr;
struct ibv_sge sge;
struct ibv_send_wr *bad_wr = NULL;
int rc;
/* prepare the scatter/gather entry */
memset(&sge, 0, sizeof(sge));
sge.addr = (uintptr_t)res->buf;
sge.length = MSG_SIZE;
sge.lkey = res->mr->lkey;
/* prepare the send work request */
memset(&sr, 0, sizeof(sr));
sr.next = NULL;
sr.wr_id = 0;
sr.sg_list = &sge;
sr.num_sge = 1;
sr.opcode = opcode;
sr.send_flags = IBV_SEND_SIGNALED;
if (opcode != IBV_WR_SEND)
{
sr.wr.rdma.remote_addr = res->remote_props.addr;
sr.wr.rdma.rkey = res->remote_props.rkey;
}
/* 12 提交发送任务 */
rc = ibv_post_send(res->qp, &sr, &bad_wr);
if (rc)
fprintf(stderr, "failed to post SR\n");
return rc;
}
/******************************************************************************
* Function: post_receive
*
* Input
* res pointer to resources structure
*
* Output
* none
*
* Returns
* 0 on success, error code on failure
*
* Description
*
******************************************************************************/
static int post_receive(struct resources *res)
{
/* 11 创建接收任务ibv_resv_wr */
struct ibv_recv_wr rr;
struct ibv_sge sge;
struct ibv_recv_wr *bad_wr;
int rc;
/* prepare the scatter/gather entry */
memset(&sge, 0, sizeof(sge));
sge.addr = (uintptr_t)res->buf;
sge.length = MSG_SIZE;
sge.lkey = res->mr->lkey;
/* prepare the receive work request */
memset(&rr, 0, sizeof(rr));
rr.next = NULL;
rr.wr_id = 0;
rr.sg_list = &sge;
rr.num_sge = 1;
/* 12 提交接收任务 */
rc = ibv_post_recv(res->qp, &rr, &bad_wr);
if (rc)
fprintf(stderr, "failed to post RR\n");
return rc;
}
/******************************************************************************
* Function: resources_destroy
*
* Input
* res pointer to resources structure
*
* Output
* none
*
* Returns
* 0 on success, 1 on failure
*
* Description
* Cleanup and deallocate all resources used
******************************************************************************/
static int resources_destroy(struct resources *res)
{
int rc = 0;
if (res->qp)
if (ibv_destroy_qp(res->qp))
{
fprintf(stderr, "failed to destroy QP\n");
rc = 1;
}
if (res->mr)
if (ibv_dereg_mr(res->mr))
{
fprintf(stderr, "failed to deregister MR\n");
rc = 1;
}
if (res->buf)
free(res->buf);
if (res->cq)
if (ibv_destroy_cq(res->cq))
{
fprintf(stderr, "failed to destroy CQ\n");
rc = 1;
}
if (res->pd)
if (ibv_dealloc_pd(res->pd))
{
fprintf(stderr, "failed to deallocate PD\n");
rc = 1;
}
if (res->ib_ctx)
if (ibv_close_device(res->ib_ctx))
{
fprintf(stderr, "failed to close device context\n");
rc = 1;
}
return rc;
}
/******************************************************************************
* Function: main
*
* Input
* argc number of items in argv
* argv command line parameters
*
* Output
* none
*
* Returns
* 0 on success, 1 on failure
*
* Description
* Main program code
******************************************************************************/
int main(int argc, char *argv[])
{
struct resources res;
int rc = 1;
char *server_ip = NULL;
if (optind == argc - 1)
server_ip = argv[optind]; //获取客户端连接服务器的IP
/* init all of the resources, so cleanup will be easy */
memset(&res, 0, sizeof res);
/* 1 获取设备列表 */
int num_devices;
struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
if (!dev_list || !num_devices)
{
fprintf(stderr, "failed to get IB devices\n");
rc = 1;
goto main_exit;
}
/* 2 打开设备,获取设备上下文 */
struct ibv_device *ib_dev = dev_list[0];
res.ib_ctx = ibv_open_device(ib_dev);
if (!res.ib_ctx)
{
fprintf(stderr, "failed to open device \n");
rc = 1;
goto main_exit;
}
/* 3 释放设备列表占用的资源 */
ibv_free_device_list(dev_list);
dev_list = NULL;
ib_dev = NULL;
/* 4 查询设备端口状态 */
if (ibv_query_port(res.ib_ctx, 1, &res.port_attr))
{
fprintf(stderr, "ibv_query_port on port failed\n");
rc = 1;
goto main_exit;
}
/* 5 创建PD(Protection Domain) */
res.pd = ibv_alloc_pd(res.ib_ctx);
if (!res.pd)
{
fprintf(stderr, "ibv_alloc_pd failed\n");
rc = 1;
goto main_exit;
}
/* 6 创建CQ(Complete Queue) */
int cq_size = 10;
res.cq = ibv_create_cq(res.ib_ctx, cq_size, NULL, NULL, 0);
if (!res.cq)
{
fprintf(stderr, "failed to create CQ with %u entries\n", cq_size);
rc = 1;
goto main_exit;
}
/* 7 注册MR(Memory Region) */
int size = MSG_SIZE;
res.buf = (char *)malloc(size);
if (!res.buf)
{
fprintf(stderr, "failed to malloc %Zu bytes to memory buffer\n", size);
rc = 1;
goto main_exit;
}
memset(res.buf, 0, size);
int mr_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
res.mr = ibv_reg_mr(res.pd, res.buf, size, mr_flags);
if (!res.mr)
{
fprintf(stderr, "ibv_reg_mr failed with mr_flags=0x%x\n", mr_flags);
rc = 1;
goto main_exit;
}
fprintf(stdout, "MR was registered with addr=%p, lkey=0x%x, rkey=0x%x, flags=0x%x\n",
res.buf, res.mr->lkey, res.mr->rkey, mr_flags);
/* 8 创建QP(Queue Pair) */
struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
qp_init_attr.qp_type = IBV_QPT_RC;
qp_init_attr.sq_sig_all = 1;
qp_init_attr.send_cq = res.cq;
qp_init_attr.recv_cq = res.cq;
qp_init_attr.cap.max_send_wr = 1;
qp_init_attr.cap.max_recv_wr = 1;
qp_init_attr.cap.max_send_sge = 1;
qp_init_attr.cap.max_recv_sge = 1;
res.qp = ibv_create_qp(res.pd, &qp_init_attr);
if (!res.qp)
{
fprintf(stderr, "failed to create QP\n");
rc = 1;
goto main_exit;
}
fprintf(stdout, "QP was created, QP number=0x%x\n", res.qp->qp_num);
/* 9 交换控制信息 */
struct cm_con_data_t local_con_data; // 发送给远程主机的信息
struct cm_con_data_t remote_con_data; // 接收远程主机发送过来的信息
struct cm_con_data_t tmp_con_data;
local_con_data.addr = htonll((uintptr_t)res.buf);
local_con_data.rkey = htonl(res.mr->rkey);
local_con_data.qp_num = htonl(res.qp->qp_num);
local_con_data.lid = htons(res.port_attr.lid);
if (sock_sync_data(server_ip, sizeof(struct cm_con_data_t), (char *)&local_con_data, (char *)&tmp_con_data) < 0)
{
fprintf(stderr, "failed to exchange connection data between sides\n");
rc = 1;
goto main_exit;
}
remote_con_data.addr = ntohll(tmp_con_data.addr);
remote_con_data.rkey = ntohl(tmp_con_data.rkey);
remote_con_data.qp_num = ntohl(tmp_con_data.qp_num);
remote_con_data.lid = ntohs(tmp_con_data.lid);
/* save the remote side attributes, we will need it for the post SR */
res.remote_props = remote_con_data;
fprintf(stdout, "Remote address = 0x%" PRIx64 "\n", remote_con_data.addr);
fprintf(stdout, "Remote rkey = 0x%x\n", remote_con_data.rkey);
fprintf(stdout, "Remote QP number = 0x%x\n", remote_con_data.qp_num);
fprintf(stdout, "Remote LID = 0x%x\n", remote_con_data.lid);
/* 10 转换QP状态 */
// RESET -> INIT
struct ibv_qp_attr attr;
int flags;
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.port_num = 1; // IB 端口号
attr.pkey_index = 0;
attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
rc = ibv_modify_qp(res.qp, &attr, flags);
if (rc)
fprintf(stderr, "failed to modify QP state to INIT\n");
//INIT -> RTR(Ready To Receive)
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = IBV_MTU_256;
attr.dest_qp_num = res.remote_props.qp_num;
attr.rq_psn = 0;
attr.max_dest_rd_atomic = 1;
attr.min_rnr_timer = 0x12;
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = res.remote_props.lid;
attr.ah_attr.sl = 0;
attr.ah_attr.src_path_bits = 0;
attr.ah_attr.port_num = 1;
flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
rc = ibv_modify_qp(res.qp, &attr, flags);
if (rc)
fprintf(stderr, "failed to modify QP state to RTR\n");
//RTR -> RTS(Ready To Send)
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.timeout = 0x12;
attr.retry_cnt = 6;
attr.rnr_retry = 0;
attr.sq_psn = 0;
attr.max_rd_atomic = 1;
flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
rc = ibv_modify_qp(res.qp, &attr, flags);
if (rc)
fprintf(stderr, "failed to modify QP state to RTS\n");
int choice;
while (1)
{
printf("\n\n\n");
printf("*********************************************************************************************\n");
printf("* 1:RDMA_READ 2:RDMA_WRITE 3:SEND 4:RECEIVE 5:Read Local Buffer 6:Write Local Buffer 7:Exit *\n");
printf("*********************************************************************************************\n");
printf("please input your choice : ");
scanf("%d", &choice);
getchar();
switch (choice)
{
case 1:
memset(res.buf, 0, MSG_SIZE);
post_send(&res, IBV_WR_RDMA_READ);
if (poll_completion(&res))
{
fprintf(stderr, "poll completion failed 2\n");
}
else
{
printf("Reading remote's buffer(addr:0x%x, rkey:0x%x) : %s\n", res.remote_props.addr, res.remote_props.rkey, res.buf);
}
break;
case 2:
memset(res.buf, 0, MSG_SIZE);
printf("Writing remote's buffer(addr:0x%x, rkey:0x%x) : ", res.remote_props.addr, res.remote_props.rkey);
fgets(res.buf, MSG_SIZE, stdin);
post_send(&res, IBV_WR_RDMA_WRITE);
if (poll_completion(&res))
{
fprintf(stderr, "poll completion failed 2\n");
}
else
{
printf("success\n");
}
break;
case 3:
memset(res.buf, 0, MSG_SIZE);
printf("Sending : ");
fgets(res.buf, MSG_SIZE, stdin);
post_send(&res, IBV_WR_SEND);
if (poll_completion(&res))
{
fprintf(stderr, "poll completion failed 2\n");
}
else
{
printf("success\n");
}
break;
case 4:
printf("receiving: ");
post_receive(&res);
if (poll_completion(&res))
{
fprintf(stderr, "poll completion failed 2\n");
}
else
{
printf("%s", res.buf);
}
break;
case 5:
printf("Reading local buffer(addr:0x%x): %s\n", (uintptr_t)res.buf, res.buf);
break;
case 6:
memset(res.buf, 0, MSG_SIZE);
printf("Writing local buffer(addr:0x%x) : %s", (uintptr_t)res.buf, res.buf);
fgets(res.buf, MSG_SIZE, stdin);
break;
case 7:
goto main_exit;
default:
printf("invalid choice.\n");
break;
}
}
rc = 0;
main_exit:
if (resources_destroy(&res))
{
fprintf(stderr, "failed to destroy resources\n");
rc = 1;
}
fprintf(stdout, "\ntest result is %d\n", rc);
return rc;
}