2010年5月12日星期三

intel e1000 网卡 napi分析(转)

转:http://sh-neo.spaces.live.com/blog/cns!1E3CA285E5F9E122!526.entry


内核如何从网卡接收数据,传统的过程:
1.数据到达网卡;
2.网卡产生一个中断给内核;
3.内核使用I/O指令,从网卡I/O区域中去读取数据;

我们在许多网卡驱动中(很老那些),都可以在网卡的中断函数中见到这一过程。

但是,这一种方法,有一种重要的问题,就是大流量的数据来到,网卡会产生大量的中断,内核在中断上下文中,会浪费大量的资源来处理中断本身。所以,就有一个问题,“可不可以不使用中断”,这就是轮询技术,所谓NAPI技术,说来也不神秘,就是说,内核屏蔽中断,然后隔一会儿就去问网卡,“你有没有数据啊?”……

从这个描述本身可以看到,如果数据量少,轮询同样占用大量的不必要的CPU资源,大家各有所长吧

OK,另一个问题,就是从网卡的I/O区域,包括I/O寄存器或I/O内存中去读取数据,这都要CPU去读,也要占用CPU资源,“CPU从I/O区域读,然后把它放到内存(这个内存指的是系统本身的物理内存,跟外设的内存不相干,也叫主内存)中”。于是自然地,就想到了DMA技术——让网卡直接从主内存之间读写它们的I/O数据,CPU,这儿不干你事,自己找乐子去:
1.首先,内核在主内存中为收发数据建立一个环形的缓冲队列(通常叫DMA环形缓冲区)。
2.内核将这个缓冲区通过DMA映射,把这个队列交给网卡;
3.网卡收到数据,就直接放进这个环形缓冲区了——也就是直接放进主内存了;然后,向系统产生一个中断;
4.内核收到这个中断,就取消DMA映射,这样,内核就直接从主内存中读取数据;

——呵呵,这一个过程比传统的过程少了不少工作,因为设备直接把数据放进了主内存,不需要CPU的干预,效率是不是提高不少?

对应以上4步,来看它的具体实现:
1)分配环形DMA缓冲区
Linux内核中,用skb来描述一个缓存,所谓分配,就是建立一定数量的skb,然后用e1000_rx_ring 环形缓冲区队列描述符连接起来
2)建立DMA映射
内核通过调用
dma_map_single(struct device *dev,void *buffer,size_t size,enum dma_data_direction direction)
建立映射关系。
struct device *dev 描述一个设备;
buffer:把哪个地址映射给设备;也就是某一个skb——要映射全部,当然是做一个双向链表的循环即可;
size:缓存大小;
direction:映射方向——谁传给谁:一般来说,是“双向”映射,数据在设备和内存之间双向流动;
对于PCI设备而言(网卡一般是PCI的),通过另一个包裹函数pci_map_single,这样,就把buffer交给设备了!设备可以直接从里边读/取数据。
3)这一步由硬件完成;
4)取消映射
dma_unmap_single,对PCI而言,大多调用它的包裹函数pci_unmap_single,不取消的话,缓存控制权还在设备手里,要调用它,把主动权掌握在CPU手里——因为我们已经接收到数据了,应该由CPU把数据交给上层网络栈;当然,不取消之前,通常要读一些状态位信息,诸如此类,一般是调用dma_sync_single_for_cpu()让CPU在取消映射前,就可以访问DMA缓冲区中的内容。


原代码分析
基于linux v2.6.26

//e1000_probe 网卡初始化 (重点关注两部分 1注册poll函数 2设置接收缓冲的大小)
static int __devinit e1000_probe(struct pci_dev *pdev,const struct pci_device_id *ent){
struct net_device *netdev;
struct e1000_adapter *adapter;
....
err=pci_enable_device(pdev);
...
err=pci_set_dma_mask(pdev,DMA_64BIT_MASK); //设置pci设备的dma掩码
...
netdev = alloc_etherdev(sizeof(struct e1000_adapter)); //为e1000网卡对应的net_device结构分配内存
...
pci_set_drvdata(pdev,netdev);
adapter=netdev_priv(netdev);
adapter->netdev=netdev;
adapter->pdev=pdev;
...
mmio_start = pci_resource_start(pdev,0);
mmio_len = pci_resource_len(pdev,0);
....
/*将e1000网卡驱动的相应函数注册到net_device中*/
netdev->open = &e1000_open;
netdev->stop = &e1000_close;
...
netif_napi_add(netdev,&adapter->napi,e1000_clean,64); // 注册poll函数为e1000_clean, weight为64
...
netdev->mem_start = mmio_start;
netdev->mem_end = mmio_start+mmio_len;
....
if(e1000e_read_mac_addr(&adapter->hw)) ndev_err(...); //从网卡设备的EEPROM中读取mac地址
memcpy(netdev->dev_addr, adapter->hw.mac.addr, netdev->addr_len);
memcpy(netdev->perm_addr, adapter->hw.mac.addr, netdev->addr_len);
....
adapter->rx_ring->count = 256; //设置接收环型缓冲区队列的缺省大小
...
e1000_reset(adapter);
...
strcpy(netdev->name,"eth%d");
err= register_netdev(netdev); //将当前网络设备注册到系统的dev_base[]设备数组当中
....
return 0;
}
e1000_open 各种数据结构初始化 (环形缓冲区队列的初始化)
static int e1000_open(struct net_device *netdev){
struct e1000_adapter *adapter = netdev_priv(netdev);
....
err = e1000_setup_all_rx_resoures(adapter) //预先分配缓冲区资源
....
err = e1000_request_irq(adapter); //分配irq中断
....
}
int e1000_setup_all_rx_resources(struct e1000_adapter *adapter){
int i,err=0;
for(i=0 ; inum_rx_queues ; i++){
err = e1000_setup_rx_resources(adapter,&adapter->rx_ring[i]);
if(err){
...
}
}
return err;
}
e1000_rx_ring 环形缓冲区队列(接收缓冲队列由多个描述符组成,每个描述符中都包含一个缓冲区buffer,该buffer以dma方式存放数据包,整个缓冲队列以环形排列 每个描述符都有一个状态变量以表示该缓冲区buffer是否可以被新到的数据包覆盖)
struct e1000_rx_ring{
void *desc; //指向该环形缓冲区
dma_addr_t dma; //dma物理地址
unsigned int size;
unsigned int count; //环形队列由多少个描述符组成,这个在probe中定义了
unsigned int next_to_use; //下一个可使用的描述符号
unsigned int next_to_clean; //该描述符状态(是否正在使用,是否脏)
struct e1000_buffer *buffer_info; //缓冲区buffer
...
}
struct e1000_buffer{
struct sk_buff *skb;
....
}
static int e1000_setup_rx_resources(struct e1000_adapt *adapter, struct e1000_rx_ring *rxdr){
struct pci_dev *pdev = adapter->pdev;
int size,desc_len;
size = sizeof(struct e1000_buffer) * rxdr->count;
rxdr->buffer_info = vmalloc(size);
memset(rxdr->buffer_info,0,size); //分配buffer所使用的内存
....
if(adapter->hw.mac_type <= e1000_82547_rec_2)
desc_len = sizeof(struct e1000_rx_desc);
else ....

rxdr->size = rxdr->count * desc_len;
rxdr->size = ALIGN(rxdr->size,4096);
rxdr->desc = pci_alloc_consistent(pdev,rxdr->size,&rxdr->dma);
...
memset(rxdr->desc,0,rxdr->size); //分配缓冲队列所使用的内存
rxdr->next_to_clean =0;
rxdr->next_to_use =0;
return 0;
}
e1000_up 启动网卡函数 调用alloc_rx_buf来建立环形缓冲队列
int e1000_up(struct e1000_adapter *adapter){
e1000_configure(adatper);
....
}
static void e1000_configure(struct e1000_adapter *adapter){
struct net_device *netdev = adapter->netdev;
int i;
...
e1000_configure_rx(adapter);
...
for (i=0;inum_rx_queues;i++){
struct e1000_rx_ring *ring = &adapter ->rx_ring[i];
adapter->alloc_rx_buf(adapter,ring,E1000_DESC_UNUSED(ring)); //从这里就可以看出 环形缓冲区并不是一开始就完全建好的,建了部分
}
...
}
static void e1000_configure_rx(struct e1000_adapter *adapter){
....
adapter->clean_rx = e1000_clean_rx_irq; //后面会提到的poll()
adapter->alloc_rx_buf = e1000_alloc_rx_irq //建立环形缓冲队列函数 这里实际调用的是e1000_alloc_rx_buffers
}
e1000_alloc_rx_buffers ----因为其中有些参数要看完下面的才能理解,所以这个函数最后再写

e1000_intr e1000的中断处理函数
static irqreturn_t e1000_intr(int irq,void *data){
struct net_device *netdev = data;
struct e1000_adapter *adapter = netdev_priv(netdev);
..
u32 icr = E1000_READ_REG(hw,ICR);
#ifdef CONFIG_E1000_NAPI
int i;
#endif
...
#ifdef CONFIG_E1000_NAPI //进入轮询模式
if(unlikely(hw->mac_type E1000_WRITE_REG(hw,IMC,~0); //关闭中断
E1000_WRITE_FLUSH(hw);
}
if (likely(netif_rx_schedule_prep(netdev,&adapter->napi))){ //确定该设备处于运行状态, 而且还未被添加到网络层的poll队列中
...
__netif_rx_schedule(netdev,&adapter->napi); //将当前设备netdevice加到与cpu相关的softnet_data的轮旬设备列表poll_list中并触发NET_RX_SOFTIRQ软中断
}
#else //进入中断模式
{ ...
for(i=0;i if (unlikely(!adapter->clean_rx(adapter, adapter->rx_ring) &.... //执行clean_rx()中关于中断模式的代码 不走napi路径
break;
...
}
}
....
return IRQ_HANDLED;
}
static inline int netif_rx_schedule_prep(struct net_device *dev,struct napi_struct *napi){
return napi_schedule_prep(napi);
}
static inline int napi_schedule_prep(struct napi_struct *n){
return !napi_disable_pending(n) &&
!test_and_set_bit(NAPI_STATE_SCHED, &n->state); //测试该设备是否已被被加到poll队列
}
static inline int napi_disable_pending (struct napi_struct *n){
return test_bit(NAPI_STATE_DISABLE,&n->state); //测试该设备是否停止运行
}
static inline void __netif_rx_schedule(struct net_device *dev,struct napi_struct *napi){
__napi_schedule(napi);
}
void __napi_schedule(struct napi_struct *n){
unsigned long flags;
local_irq_save(flags);
list_add_tail(&n->poll_list, &__get_cpu_var(softnet_data).poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ); 触发软中断
local_irq_restore(flags);
}
#define __raise_softirq_irqoff(nr) do {or_softirq_pending(iUL<<(nr)); } while(0)
用到的数据结构napi_struct
struct napi_struct{
struct list_head poll_list; //poll_list链表
unsigned long state //设备状态信息 往上看看
int weight; //设备预处理数据配额,作用是轮询时更公平的对待各个设备
int (*poll) (strcut napi_struct *,int);
.....
}
接下来就是软中断处理函数net_rx_action()
static void net_rx_action(struct softirq_action *h){
struct list_head *list = &__get_cpu_var(softnet_data).poll_list;
unsigned long start_time = jiffies;
int budget = netdev_budget; //处理限额,一次最多只能处理这么多数据包
....
local_irq_disable();
while (!list_empty(list)){
struct napi_struct *n;
int work,
weight;
if (unlikely(budget < 0 || jiffies != start_time)) //如果实际工作量work 超过限额,或处理时间超过1秒,则出于系统响应考虑立即从软中断处理函数中跳出来, work是poll的返回值 限额budget每次都会根据返回的work值重新计算 ,配额weight和work配合来实现轮询算法,具体算法要看完e1000_clean(),e1000_rx_irq()才能清楚
goto softnet_break;
local_irq_enalbe();
n = list_entry(list->next,struct napi_struct,poll_list);
weight = n->weight;
work 0;
if (test_bit(NAPI_STATE_SCHED,&n->state))
work = n->poll(n,weight); //调用设备的poll函数e1000_clean()
....
budget -= work; //更新限额
local_irq_disable();
if (unlikely(work == weight)){ //处理量大于配额
if(unlikely(napi_disable_pending(n)))
__napi_complete(n);
else
list_move_tail(&n->poll_list,list); //该设备还有要接收的数据没被处理,因为轮询算法 被移动到poll_llst尾部等待处理
}
...
}
out:
local_irq_enable();
...
return;
softnet_break:
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
goto out;
}
e1000网卡poll函数 e1000_clean()
static int e1000_clean(struct napi_struct *napi,int budget){ //此处的budget实际上是传过来的weight,不要和上面的budget弄混了
struct e1000_adapter *adapter = container_of(napi,struct e1000_adapter,napi);
struct net_device *poll_dev = adapter->netdev;
int work_done = 0;
adapter = poll_dev->priv;
.....
adapter ->clean_rx(adapter,&adapter ->rx_ring[0],&work_done,budget); //实际调用的是clean_rx_irq()
...
if(work_done ...
netif_rx_complete(poll_dev,napi); //__napi_complete()的包装函数
e1000_irq_enable(adapter); //开中断
}
return work_done;
}
static inline void netif_rx_complete(struct net_device *dev,struct napi_struct *napi){
unsigned long flags;
local_irq_save(flags);
__netif_rx_complete(dev,napi);
local_irq_restore(flags);
}
static inline void __netif_rx_complete(struct net_device *dev,struct napi_struct *napi){
__napi_complete(napi);
}
static inline void __napi_complete(struct napi_struct *n){
....
list_del(&n->poll_list);
clear_bit(NAPI_STATE_SCHED,&n->state);
}
设备轮询接收机制中最重要的函数e1000_clean_rx_irq()
#ifdef CONFIG_E1000_NAPI
e1000_clean_rx_irq(struct e1000_adapter *adapter,struct e1000_rx_ring *rx_ring,int *work_done,int work_to_do) //work_to_do实际上是传过来的配额weight
.....
{
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
struct e1000_rx_desc *rx_desc,*next_rxd;
struct e1000_buffer *buffer_info, *next_buffer;
...
unsigned int i;
int cleaned_count = 0;
....
i = rx_ring->next_to_clean; //next_to_clean是下一个可以被清除的描述符索引,上面讲过环形缓冲队列由多个描述符组成,每个描述符都有一个用于存放接收数据包的缓冲区buffer,这里所说的“可以被清除”并不是将其删除,而是标记这个缓冲区的数据已经处理(可能正在处理),但是否处理完了要看rx_desc->status&E1000_RXD_STAT_DD,当有新数据需要使用缓冲区时,只是将已处理的缓冲区覆盖而已, 这里的i可以理解为可以被新数据覆盖的缓冲区序号
rx_desc = E1000_RX_DESC(*rx_ring,i); //得到相应的描述符
buffer_info = &rx_ring->buffer_info[i];
while(rx_desc->status & E1000_RXD_STAT_DD){ //测试其状态是否为已删除
struct sk_buff *skb;
u8 status;
#ifdef CONFIG_E1000_NAPI
if (*wrok_done>=work_to_do) //如果所完成的工作>配额则直接退出
break;
(*work_done) ++
#endif
status = rx_desc->status;
skb = buffer_info->skb; //得到缓冲区中的数据
buffer_info->skb = NULL;
prefetch(skb->data-NET_IP_ALIGN);
if(++i == rx_ring->count) //处理环形缓冲区达到队列末尾的情况,因为是环形的,所以到达末尾的下一个就是队列头,这样整个队列就不断地循环处理。然后获取下一格描述符的状态,看看是不是处理删除状态。如果处于就会将新到达的数据覆盖旧的缓冲区,如果不处于则跳出循环,并将当前缓冲区索引号置为下一次查询的目标
i = 0;
next_rxd = E1000_RX_DESC(*rx_ring,i);
next_buffer = &rx_ring->buffer_info[i];
cleaned = true ;
cleaned_count ++;
pci_unmap_single(pdev,buffer_info->dma,buffer_info->length,PCI_DMA_FROMDEVICE); //* 取消映射,因为通过DMA,网卡已经把数据放在了主内存中,这里一取消,也就意味着,CPU可以处理主内存中的数据了 */
....
//checksum
...
#ifdef CONFIG_E1000_NAPI
netif_receive_skb(skb); //交由上层协议处理 , 如果数据包比较大,处理时间会相对较长
#else
netif_rx(skb); //进入中断模式 将数据包插入接收队列中,等待软中断处理 中断模式不用环形接收缓冲队列
#endif
netdev->last_rx = jiffies;

next_desc:
rx_desc->status =0;
if(unlikely(cleaned_count >= E1000_RX_BUFFER_WRITE)){
adapter->alloc_rx_buf(adapter,rx_ring,cleaned_count); //在e1000_up中已经调用了这个函数为环形缓冲区队列中的每个缓冲区分配了sk_buff内存,但是如果接收到数据以后,调用netif_receive_skb(skb)向上提交数据以后,这段内存将始终被这个skb占用(直到上层处理完以后才会调用_kfree_skb释放),换句话说,就是当前缓冲区必须重新申请分配sk_buff内存,为下一个数据作准备
cleaned_count = 0;
}
rx_desc = next_rxd;
buffer_info = next_buffer;
}
rx_ring->next_to_clean = i;
cleaned_count = E1000_DESC_UNUSED(rx_ring);
if(cleaned_count)
adapter->alloc_rx_buf(adapter,rx_ring,cleaned_count);
...
return cleaned;
}
static void e1000_alloc_rx_buffers(struct e1000_adapter *adapter,struct e1000_rx_ring *rx_ring,int cleaned_count){
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
struct e1000_rx_desc *rx_desc;
struct e1000_buffer *buffer_info;
struct sk_buff *skb;
unsigned int i;
unsigned int bufsz = adapter->rx_buffer_len+NET_IP_ALIGN;
i=rx_ring->next_to_use;
buffer_info = &rx_ring->buffer_info[i];
while (cleaned_count--){
skb = buffer_info ->skb;
if(skb){
....
}
skb = netdev_alloc_skb(netdev,bufsz); //skb缓存的分配
if(unlikely(!skb)){
adapter->alloc_rx_buff_failed++;
break;
}
skb_reserve(skb,NET_IP_ALIGN);
buffer_info->skb = skb;
buffer_info->length = adapter ->rx_buffer_len;
map_skb:
buffer_info->dma = pci_map_single(pdev,skb->data, adapter->rx_buffer_len,PCI_DMA_FROMDEVICE); //建立DMA映射,把每一个缓冲区skb->data都映射给了设备,缓存区描述符利用dma保存了每一次映射的地址
....
rx_desc = E1000_RX_DESC(*rx_ring, i);
rx_desc->buffer_addr = cpu_to_le64(buffer_info->dma);
if (unlikely(++i == rx_ring->count)) //达到环形缓冲区末尾
i =0 ;
buffer_info = &rx_ring->buffer_info[i];
}
if(likely(rx_ring->netx_to_use!=i)){
rx_ring->next_to_use = i;
if (unlikely(i-- == 0))
i = (rx_ring->count - 1);
...
}
}

简要流程

没有评论: