導航:首頁 > 編程系統 > linuxsplice函數

linuxsplice函數

發布時間:2024-01-02 03:08:56

1. 如何在linux下開啟napi

天重點對linux網路數據包的處理做下分析,但是並不關繫到上層協議,僅僅到鏈路層。
之前轉載過一篇文章,對NAPI做了比較詳盡的分析,本文結合Linux內核源代碼,對當前網路數據包的處理進行梳理。根據NAPI的處理特性,對設備提出一定的要求
1、設備需要有足夠的緩沖區,保存多個數據分組
2、可以禁用當前設備中斷,然而不影響其他的操作。
當前大部分的設備都支持NAPI,但是為了對之前的保持兼容,內核還是對之前中斷方式提供了兼容。我們先看下NAPI具體的處理方式。我們都知道中斷分為中斷上半部和下半部,上半部完成的任務很是簡單,僅僅負責把數據保存下來;而下半部負責具體的處理。為了處理下半部,每個CPU有維護一個softnet_data結構。我們不對此結構做詳細介紹,僅僅描述和NAPI相關的部分。結構中有一個poll_list欄位,連接所有的輪詢設備。還 維護了兩個隊列input_pkt_queue和process_queue。這兩個用戶傳統不支持NAPI方式的處理。前者由中斷上半部的處理函數吧數據包入隊,在具體的處理時,使用後者做中轉,相當於前者負責接收,後者負責處理。最後是一個napi_struct的backlog,代表一個虛擬設備供輪詢使用。在支持NAPI的設備下,每個設備具備一個緩沖隊列,存放到來數據。每個設備對應一個napi_struct結構,該結構代表該設備存放在poll_list中被輪詢。而設備還需要提供一個poll函數,在設備被輪詢到後,會調用poll函數對數據進行處理。基本邏輯就是這樣,下面看下具體流程。
中斷上半部:
非NAPI:
非NAPI對應的上半部函數為netif_rx,位於Dev.,c中

int netif_rx(struct sk_buff *skb)
{
int ret;
/* if netpoll wants it, pretend we never saw it */
/*如果是net_poll想要的,則不作處理*/
if (netpoll_rx(skb))
return NET_RX_DROP;
/*檢查時間戳*/
net_timestamp_check(netdev_tstamp_prequeue, skb);
trace_netif_rx(skb);
#ifdef CONFIG_RPS
if (static_key_false(&rps_needed)) {
struct rps_dev_flow voidflow, *rflow = &voidflow;
int cpu;
/*禁用搶占*/
preempt_disable();
rcu_read_lock();

cpu = get_rps_cpu(skb->dev, skb, &rflow);
if (cpu < 0)
cpu = smp_processor_id();
/*把數據入隊*/
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
preempt_enable();
} else
#endif
{
unsigned int qtail;
ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
put_cpu();
}
return ret;
}

中間RPS暫時不關心,這里直接調用enqueue_to_backlog放入CPU的全局隊列input_pkt_queue

static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
/*獲取cpu相關的softnet_data變數*/
sd = &per_cpu(softnet_data, cpu);
/*關中斷*/
local_irq_save(flags);
rps_lock(sd);
/*如果input_pkt_queue的長度小於最大限制,則符合條件*/
if (skb_queue_len(&sd->input_pkt_queue) <= netdev_max_backlog) {
/*如果input_pkt_queue不為空,說明虛擬設備已經得到調度,此時僅僅把數據加入
input_pkt_queue隊列即可
*/
if (skb_queue_len(&sd->input_pkt_queue)) {
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags);
return NET_RX_SUCCESS;
}
/* Schele NAPI for backlog device
* We can use non atomic operation since we own the queue lock
*/
/*否則需要調度backlog 即虛擬設備,然後再入隊。napi_struct結構中的state欄位如果標記了NAPI_STATE_SCHED,則表明該設備已經在調度,不需要再次調度*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
____napi_schele(sd, &sd->backlog);
}
goto enqueue;
}
/*到這里緩沖區已經不足了,必須丟棄*/
sd->dropped++;
rps_unlock(sd);
local_irq_restore(flags);
atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}

該函數邏輯也比較簡單,主要注意的是設備必須先添加調度然後才能接受數據,添加調度調用了____napi_schele函數,該函數把設備對應的napi_struct結構插入到softnet_data的poll_list鏈表尾部,然後喚醒軟中斷,這樣在下次軟中斷得到處理時,中斷下半部就會得到處理。不妨看下源碼
static inline void ____napi_schele(struct softnet_data *sd,
struct napi_struct *napi)
{
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

NAPI方式
NAPI的方式相對於非NAPI要簡單許多,看下e100網卡的中斷處理函數e100_intr,核心部分
if (likely(napi_schele_prep(&nic->napi))) {
e100_disable_irq(nic);//屏蔽當前中斷
__napi_schele(&nic->napi);//把設備加入到輪訓隊列
}

if條件檢查當前設備是否 可被調度,主要檢查兩個方面:1、是否已經在調度 2、是否禁止了napi pending.如果符合條件,就關閉當前設備的中斷,調用__napi_schele函數把設備假如到輪訓列表,從而開啟輪詢模式。
分析:結合上面兩種方式,還是可以發現兩種方式的異同。其中softnet_data作為主導結構,在NAPI的處理方式下,主要維護輪詢鏈表。NAPI設備均對應一個napi_struct結構,添加到鏈表中;非NAPI沒有對應的napi_struct結構,為了使用NAPI的處理流程,使用了softnet_data結構中的back_log作為一個虛擬設備添加到輪詢鏈表。同時由於非NAPI設備沒有各自的接收隊列,所以利用了softnet_data結構的input_pkt_queue作為全局的接收隊列。這樣就處理而言,可以和NAPI的設備進行兼容。但是還有一個重要區別,在NAPI的方式下,首次數據包的接收使用中斷的方式,而後續的數據包就會使用輪詢處理了;而非NAPI每次都是通過中斷通知。
下半部:
下半部的處理函數,之前提到,網路數據包的接發對應兩個不同的軟中斷,接收軟中斷NET_RX_SOFTIRQ的處理函數對應net_rx_action

static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = &__get_cpu_var(softnet_data);
unsigned long time_limit = jiffies + 2;
int budget = netdev_budget;
void *have;
local_irq_disable();
/*遍歷輪詢表*/
while (!list_empty(&sd->poll_list)) {
struct napi_struct *n;
int work, weight;
/* If softirq window is exhuasted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
/*如果開支用完了或者時間用完了*/
if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
goto softnet_break;
local_irq_enable();
/* Even though interrupts have been re-enabled, this
* access is safe because interrupts can only add new
* entries to the tail of this list, and only ->poll()
* calls can remove this head entry from the list.
*/
/*獲取鏈表中首個設備*/
n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);
have = netpoll_poll_lock(n);
weight = n->weight;
/* This NAPI_STATE_SCHED test is for avoiding a race
* with netpoll's poll_napi(). Only the entity which
* obtains the lock and sees NAPI_STATE_SCHED set will
* actually make the ->poll() call. Therefore we avoid
* accidentally calling ->poll() when NAPI is not scheled.
*/
work = 0;
/*如果被設備已經被調度,則調用其處理函數poll函數*/
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
work = n->poll(n, weight);//後面weight指定了一個額度
trace_napi_poll(n);
}
WARN_ON_ONCE(work > weight);
/*總額度遞減*/
budget -= work;
local_irq_disable();
/* Drivers must not modify the NAPI state if they
* consume the entire weight. In such cases this code
* still "owns" the NAPI instance and therefore can
* move the instance around on the list at-will.
*/
/*如果work=weight的話。任務就完成了,把設備從輪詢鏈表刪除*/
if (unlikely(work == weight)) {
if (unlikely(napi_disable_pending(n))) {
local_irq_enable();
napi_complete(n);
local_irq_disable();
} else {
if (n->gro_list) {
/* flush too old packets
* If HZ < 1000, flush all packets.
*/
local_irq_enable();
napi_gro_flush(n, HZ >= 1000);
local_irq_disable();
}
/*每次處理完就把設備移動到列表尾部*/
list_move_tail(&n->poll_list, &sd->poll_list);
}
}
netpoll_poll_unlock(have);
}
out:
net_rps_action_and_irq_enable(sd);
#ifdef CONFIG_NET_DMA
/*
* There may not be any more sk_buffs coming right now, so push
* any pending DMA copies to hardware
*/
dma_issue_pending_all();
#endif
return;
softnet_break:
sd->time_squeeze++;
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
goto out;
}

這里有處理方式比較直觀,直接遍歷poll_list鏈表,處理之前設置了兩個限制:budget和time_limit。前者限制本次處理數據包的總量,後者限制本次處理總時間。只有二者均有剩餘的情況下,才會繼續處理。處理期間同樣是開中斷的,每次總是從鏈表表頭取設備進行處理,如果設備被調度,其實就是檢查NAPI_STATE_SCHED位,則調用 napi_struct的poll函數,處理結束如果沒有處理完,則把設備移動到鏈表尾部,否則從鏈表刪除。NAPI設備對應的poll函數會同樣會調用__netif_receive_skb函數上傳協議棧,這里就不做分析了,感興趣可以參考e100的poll函數e100_poll。

而非NAPI對應poll函數為process_backlog。

static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
#ifdef CONFIG_RPS
/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd->rps_ipi_list) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}
#endif
napi->weight = weight_p;
local_irq_disable();
while (work < quota) {
struct sk_buff *skb;
unsigned int qlen;
/*涉及到兩個隊列process_queue和input_pkt_queue,數據包到來時首先填充input_pkt_queue,
而在處理時從process_queue中取,根據這個邏輯,首次處理process_queue必定為空,檢查input_pkt_queue
如果input_pkt_queue不為空,則把其中的數據包遷移到process_queue中,然後繼續處理,減少鎖沖突。
*/
while ((skb = __skb_dequeue(&sd->process_queue))) {
local_irq_enable();
/*進入協議棧*/
__netif_receive_skb(skb);
local_irq_disable();
input_queue_head_incr(sd);
if (++work >= quota) {
local_irq_enable();
return work;
}
}
rps_lock(sd);
qlen = skb_queue_len(&sd->input_pkt_queue);
if (qlen)
skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
if (qlen < quota - work) {
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set on backlog.
* we can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
*/
list_del(&napi->poll_list);
napi->state = 0;
quota = work + qlen;
}
rps_unlock(sd);
}
local_irq_enable();
return work;
}

函數還是比較簡單的,需要注意的每次處理都攜帶一個配額,即本次只能處理quota個數據包,如果超額了,即使沒處理完也要返回,這是為了保證處理器的公平使用。處理在一個while循環中完成,循環條件正是work < quota,首先會從process_queue中取出skb,調用__netif_receive_skb上傳給協議棧,然後增加work。當work即將大於quota時,即++work >= quota時,就要返回。當work還有剩餘額度,但是process_queue中數據處理完了,就需要檢查input_pkt_queue,因為在具體處理期間是開中斷的,那麼期間就有可能有新的數據包到來,如果input_pkt_queue不為空,則調用skb_queue_splice_tail_init函數把數據包遷移到process_queue。如果剩餘額度足夠處理完這些數據包,那麼就把虛擬設備移除輪詢隊列。這里有些疑惑就是最後為何要增加額度,剩下的額度已經足夠處理這些數據了呀?根據此流程不難發現,其實執行的是在兩個隊列之間移動數據包,然後再做處理。

2. 數組方法里push,pop,shift,unshift,join,splice分別是什麼作用

數組操作函數有:push,pop,join,shift,unshift,slice,splice,concat
(1)push 和 pop
這兩個函數都是對數組從尾部進行壓入或彈出操作。push(arg1,arg2,...)可以每次壓入一個或多個元素,並返回更新後的數組長度。注意如果參數也是數組的話,則是將全部數組當做一個元素壓入到原本的數組裡面去。pop() 函數則每次只會彈出結尾的元素,並返回彈出的元素,若是是對空組數調用 pop() 則返回undefined。
示例:
var oldArr=[1,2,3];
alert(oldArr.push(4,[5,6]))//這里只會將[5,6]當做一個元素來策畫,返回更新後的數組長度5
此時 oldArr = [1,2,3,4,[5,6]]
oldArr.pop()//這里彈出最後一個元素[5,6],而不是6
此時 oldArr = [1,2,3,4]
oldArr.pop()-->4
oldArr.pop()-->3
oldArr.pop()-->2
oldArr.pop()-->1
alert(oldArr.pop())-->undefined(空數組彈出)
(2)unshift 和 shift
unshift() 方法可向數組的開頭添加一個或更多元素,並返回新的長度。unshift() 方法將把它的參數插入 arrayObject 的頭部,並將已經存在的元素順次地移到較高的下標處,以便留出空間。該方法的第一個參數將成為數組的新元素 0,如果還有第二個參數,它將成為新的元素 1,以此類推。
請注意,unshift() 方法不創建新的創建,而是直接修改原有的數組。另外在 Internet Explorer 瀏覽器中 unshift() 無法執行!

閱讀全文

與linuxsplice函數相關的資料

熱點內容
軟體傳輸文件 瀏覽:184
密碼記錄器ios 瀏覽:412
兩個電腦數據怎麼一樣 瀏覽:829
順豐有什麼買東西的app 瀏覽:377
數位板word 瀏覽:939
win7寬頻連接出現多重網路 瀏覽:268
更改程序圖標c語言 瀏覽:629
網路電視偷停怎麼辦 瀏覽:418
linux連接ftp 瀏覽:512
es文件瀏覽器視頻筆記 瀏覽:874
mac無法打開描述文件 瀏覽:134
什麼軟體打文件 瀏覽:53
資料庫無數據變成0 瀏覽:899
名企筆試如何刷編程題 瀏覽:49
js跳到頁面某地 瀏覽:550
jsp展示clob欄位 瀏覽:779
nyx在網路上是什麼意思 瀏覽:145
樂播農業app是什麼 瀏覽:530
編程框架如何開發 瀏覽:136
金庸群俠傳3修改代碼 瀏覽:712

友情鏈接