程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 網頁編程 >> PHP編程 >> 關於PHP編程 >> 二層(鏈路層)數據包發送過程分析

二層(鏈路層)數據包發送過程分析

編輯:關於PHP編程

二層(鏈路層)數據包發送過程分析


二層(鏈路層)數據包發送過程分析

——lvyilong316

說明:本系列博文所涉及內核版本為2.6.32
當上層准備好一個包之後,交給鏈路層,鏈路層數據包發送主要通過dev_queue_xmit函數處理。數據包的發送可分為兩種,一種是正常的傳輸流程,即通過網卡驅動,另一種是通過軟中斷(見注3)。為了理解方便,首先看一下dev_queue_xmi函數的整體調用關系圖。

ldev_queue_xmit

本函數用來將帶發送的skb加入一個dev的隊列(Queue),調用這個函數前必須設置好skb的device和priority,本函數可以在中斷上下文中被調用。

返回值:

返回非0(正數或負數)表示函數出錯,返回0表示成功,但是並不表示數據包被成功發送出去,因為數據包可能因為限速等原因被丟掉。

函數執行後傳入的skb將被釋放,所以如果想控制數據包,實現對skb的重傳時需要增加skb的引用計數。

當調用此函數時中斷必須是打開的,因為BHenable必須要求IRQenable,否則會造成死鎖。

  1. int dev_queue_xmit(struct sk_buff *skb)
  2. {
  3. struct net_device *dev = skb->dev;
  4. struct netdev_queue *txq;
  5. struct Qdisc *q;
  6. int rc = -ENOMEM;
  7. /* GSO will handle the following emulations directly. */
  8. if (netif_needs_gso(dev, skb))
  9. goto gso;
  10. if (skb_has_frags(skb) &&
  11. !(dev->features & NETIF_F_FRAGLIST) &&
  12. __skb_linearize(skb))
  13. goto out_kfree_skb;
  14. //如果skb有分片但是發送設備不支持分片,或分片中有分片在高端內存但發送設備不支持DMA,需要將所有段重新組合成一個段 ,這裡__skb_linearize其實就是__pskb_pull_tail(skb, skb->data_len),這個函數基本上等同於pskb_may_pull ,pskb_may_pull的作用就是檢測skb對應的主buf中是否有足夠的空間來pull出len長度,如果不夠就重新分配skb並將frags中的數據拷貝入新分配的主buff中,而這裡將參數len設置為skb->datalen, 也就是會將所有的數據全部拷貝到主buff中,以這種方式完成skb的線性化。
  15. if (skb_shinfo(skb)->nr_frags &&
  16. (!(dev->features & NETIF_F_SG) || illegal_highdma(dev, skb)) &&
  17. __skb_linearize(skb))
  18. goto out_kfree_skb;
  19. //如果數據包沒有被計算校驗和並且發送設備不支持這個協議的校驗,則在此進行校驗和的計算(注1)。如果上面已經線性化了一次,這裡的__skb_linearize就會直接返回,注意區別frags和frag_list,前者是將多的數據放到單獨分配的頁面中,sk_buff只有一個。而後者則是連接多個sk_buff
  20. if (skb->ip_summed == CHECKSUM_PARTIAL) {
  21. skb_set_transport_header(skb, skb->csum_start -
  22. skb_headroom(skb));
  23. if (!dev_can_checksum(dev, skb) && skb_checksum_help(skb))
  24. goto out_kfree_skb;
  25. }
  26. gso:
  27. //關閉軟中斷,禁止cpu搶占
  28. rcu_read_lock_bh();
  29. //選擇一個發送隊列,如果設備提供了select_queue回調函數就使用它,否則由內核選擇一個隊列,這裡只是Linux內核多隊列的實現,但是要真正的使用都隊列,需要網卡支持多隊列才可以,一般的網卡都只有一個隊列。在調用alloc_etherdev分配net_device是,設置隊列的個數
  30. txq = dev_pick_tx(dev, skb);
  31. // 從netdev_queue結構上獲取設備的qdisc
  32. q = rcu_dereference(txq->qdisc);
  33. //如果該設備有隊列可用,就調用__dev_xmit_skb
  34. if (q->enqueue) {
  35. rc = __dev_xmit_skb(skb, q, dev, txq);
  36. goto out;
  37. }
  38. //下面的處理是在沒有發送隊列的情況,軟設備一般沒有發送隊列:如lo、tunnle;我們所要做的就是直接調用驅動的hard_start_xmit將它發送出去 如果發送失敗就直接丟棄,因為沒有隊列可以保存它
  39. if (dev->flags & IFF_UP) { //確定設備是否開啟
  40. int cpu = smp_processor_id(); /* ok because BHs are off */
  41. if (txq->xmit_lock_owner != cpu) {//是否在同一個cpu上
  42. HARD_TX_LOCK(dev, txq, cpu);
  43. if (!netif_tx_queue_stopped(txq)) {//確定隊列是運行狀態
  44. rc = NET_XMIT_SUCCESS;
  45. if (!dev_hard_start_xmit(skb, dev, txq)) {
  46. HARD_TX_UNLOCK(dev, txq);
  47. goto out;
  48. }
  49. }
  50. HARD_TX_UNLOCK(dev, txq);
  51. if (net_ratelimit())
  52. printk(KERN_CRIT "Virtual device %s asks to "
  53. "queue packet!\n", dev->name);
  54. } else {// txq->xmit_lock_owner == cpu的情況,說明發生遞歸
  55. if (net_ratelimit())
  56. printk(KERN_CRIT "Dead loop on virtual device "
  57. "%s, fix it urgently!\n", dev->name);
  58. }
  59. }
  60. rc = -ENETDOWN;
  61. rcu_read_unlock_bh();
  62. out_kfree_skb:
  63. kfree_skb(skb);
  64. return rc;
  65. out:
  66. rcu_read_unlock_bh();
  67. return rc;
  68. }

l__dev_xmit_skb

__dev_xmit_skb函數主要做兩件事情:

(1)如果流控對象為空的,試圖直接發送數據包。

(2)如果流控對象不空,將數據包加入流控對象,並運行流控對象。

  1. static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
  2. struct net_device *dev,
  3. struct netdev_queue *txq)
  4. {
  5. spinlock_t *root_lock = qdisc_lock(q);//見注2
  6. int rc;
  7. spin_lock(root_lock); //鎖qdisc
  8. if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {//判斷隊列是否失效
  9. kfree_skb(skb);
  10. rc = NET_XMIT_DROP;
  11. } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
  12. !test_and_set_bit(__QDISC_STATE_RUNNING, &q->state)) {
  13. /*
  14. * This is a work-conserving queue; there are no old skbs
  15. * waiting to be sent out; and the qdisc is not running -
  16. * xmit the skb directly.
  17. */
  18. __qdisc_update_bstats(q, skb->len);
  19. if (sch_direct_xmit(skb, q, dev, txq, root_lock))
  20. __qdisc_run(q);
  21. else
  22. clear_bit(__QDISC_STATE_RUNNING, &q->state);
  23. rc = NET_XMIT_SUCCESS;
  24. } else {
  25. rc = qdisc_enqueue_root(skb, q);
  26. qdisc_run(q);
  27. }
  28. spin_unlock(root_lock);
  29. return rc;
  30. }

lqdisc_run

有兩個時機將會調用qdisc_run():

1.__dev_xmit_skb()

2.軟中斷服務線程NET_TX_SOFTIRQ

  1. static inline void qdisc_run(struct Qdisc *q)
  2. {
  3. if (!test_and_set_bit(__QDISC_STATE_RUNNING, &q->state))//將隊列設置為運行狀態
  4. __qdisc_run(q);
  5. }

l__qdisc_run

  1. void __qdisc_run(struct Qdisc *q)
  2. {
  3. unsigned long start_time = jiffies;
  4. while (qdisc_restart(q)) { //返回值大於0,說明流控對象非空
  5. /*如果發現本隊列運行的時間太長了,將會停止隊列的運行,並將隊列加入output_queue鏈表頭
  6. * Postpone processing if (延遲處理)
  7. * 1. another process needs the CPU;
  8. * 2. we've been doing it for too long.
  9. */
  10. if (need_resched() || jiffies != start_time) { //已經不允許繼續運行本流控對象
  11. __netif_schedule(q); //將本qdisc加入每cpu變量softnet_data的output_queue鏈表中
  12. break;
  13. }
  14. }
  15. //清除隊列的運行標識
  16. clear_bit(__QDISC_STATE_RUNNING, &q->state);
  17. }

循環調用qdisc_restart發送數據,下面這個函數qdisc_restart是真正發送數據包的函數,它從隊列上取下一個幀,然後嘗試將它發送出去,若發送失敗則一般是重新入隊。

此函數返回值為:發送成功時返回剩余隊列長度,發送失敗時返回0(若發送成功且剩余隊列長度為0也返回0)

lqdisc_restart

__QDISC_STATE_RUNNING狀態保證同一時刻只有一個cpu在處理這個qdisc,qdisc_lock(q)用來保證對這個隊列的順序訪問。

通常netif_tx_lock用來確保本設備驅動的順序(獨占)訪問的,qdisc_lock(q)用來保證qdisc的順序訪問,這兩個是互斥的,獲得其中一個必須釋放另一個。

  1. static inline int qdisc_restart(struct Qdisc *q)
  2. {
  3. struct netdev_queue *txq;
  4. struct net_device *dev;
  5. spinlock_t *root_lock;
  6. struct sk_buff *skb;
  7. /* Dequeue packet */
  8. skb = dequeue_skb(q); //一開始就調用dequeue函數
  9. if (unlikely(!skb))
  10. return 0; //返回0說明隊列是空的或者被限制
  11. root_lock = qdisc_lock(q);
  12. dev = qdisc_dev(q);
  13. txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));
  14. return sch_direct_xmit(skb, q, dev, txq, root_lock); //用於發送數據包
  15. }

lsch_direct_xmit

發送一個skb,將隊列置為__QDISC_STATE_RUNNING狀態,保證只有一個cpu運行這個函數,返回0表示隊列為空或者發送受限,大於0表示隊列非空。

  1. int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
  2. struct net_device *dev, struct netdev_queue *txq,
  3. spinlock_t *root_lock)
  4. {
  5. int ret = NETDEV_TX_BUSY;
  6. spin_unlock(root_lock);// release qdisc,因為後面要獲取設備鎖
  7. // 調用__netif_tx_lockà spin_lock(&txq->_xmit_lock,,保證設備驅動的獨占訪問
  8. HARD_TX_LOCK(dev, txq, smp_processor_id());
  9. if (!netif_tx_queue_stopped(txq) && //設備沒有被停止,且發送隊列沒有被凍結
  10. !netif_tx_queue_frozen(txq))
  11. ret = dev_hard_start_xmit(skb, dev, txq); //發送數據包
  12. HARD_TX_UNLOCK(dev, txq); // 調用__netif_tx_unlock
  13. spin_lock(root_lock);
  14. switch (ret) {
  15. case NETDEV_TX_OK: //如果設備成功將數據包發送出去
  16. ret = qdisc_qlen(q); //返回剩余的隊列長度
  17. break;
  18. case NETDEV_TX_LOCKED: //獲取設備鎖失敗
  19. ret = handle_dev_cpu_collision(skb, txq, q);
  20. break;
  21. default: //設備繁忙,重新入隊發送(利用softirq)
  22. if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))
  23. printk(KERN_WARNING "BUG %s code %d qlen %d\n",
  24. dev->name, ret, q->q.qlen);
  25. ret = dev_requeue_skb(skb, q);
  26. break;
  27. }
  28. if (ret && (netif_tx_queue_stopped(txq) ||
  29. netif_tx_queue_frozen(txq)))
  30. ret = 0;
  31. return ret;
  32. }

ldev_hard_start_xmit

  1. int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev,
  2. struct netdev_queue *txq)
  3. {
  4. const struct net_device_ops *ops = dev->netdev_ops;
  5. int rc;
  6. if (likely(!skb->next)) {
  7. //從這裡可以看出,對於每一個發送的包也會發給ptype_all一份, 而packet套接字創建時對於proto為ETH_P_ALL的會在ptype_all中注冊一個成員,因此對於協議號為ETH_P_ALL的packet套接字來說,發送和接受的數據都能收到
  8. if (!list_empty(&ptype_all))
  9. dev_queue_xmit_nit(skb, dev);
  10. if (netif_needs_gso(dev, skb)) {
  11. if (unlikely(dev_gso_segment(skb)))
  12. goto out_kfree_skb;
  13. if (skb->next)
  14. goto gso;
  15. }
  16. //如果發送設備不需要skb->dst,則在此將其釋放
  17. if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
  18. skb_dst_drop(skb);
  19. //調用設備注冊的發送函數,即dev->netdev_ops-> ndo_start_xmit(skb, dev)
  20. rc = ops->ndo_start_xmit(skb, dev);
  21. if (rc == NETDEV_TX_OK)
  22. txq_trans_update(txq);
  23. return rc;
  24. }
  25. gso:
  26. ……
  27. }

ldev_queue_xmit_nit

  1. static void dev_queue_xmit_nit(struct sk_buff *skb, struct net_device *dev)
  2. {
  3. struct packet_type *ptype;
  4. #ifdef CONFIG_NET_CLS_ACT
  5. if (!(skb->tstamp.tv64 && (G_TC_FROM(skb->tc_verd) & AT_INGRESS)))
  6. net_timestamp(skb); //記錄該數據包輸入的時間戳
  7. #else
  8. net_timestamp(skb);
  9. #endif
  10. rcu_read_lock();
  11. list_for_each_entry_rcu(ptype, &ptype_all, list) {
  12. /* Never send packets back to the socket they originated from */
  13. //遍歷ptype_all鏈表,查找所有符合輸入條件的原始套接口,並循環將數據包輸入到滿足條件的套接口
  14. if ((ptype->dev == dev || !ptype->dev) &&
  15. (ptype->af_packet_priv == NULL ||
  16. (struct sock *)ptype->af_packet_priv != skb->sk)) {
  17. //由於該數據包是額外輸入到這個原始套接口的,因此需要克隆一個數據包
  18. struct sk_buff *skb2 = skb_clone(skb, GFP_ATOMIC);
  19. if (!skb2)
  20. break;
  21. /* skb->nh should be correctly(確保頭部偏移正確)
  22. set by sender, so that the second statement is
  23. just protection against buggy protocols.
  24. */
  25. skb_reset_mac_header(skb2);
  26. if (skb_network_header(skb2) < skb2->data ||
  27. skb2->network_header > skb2->tail) {
  28. if (net_ratelimit())//net_ratelimit用來保證網絡代碼中printk的頻率
  29. printk(KERN_CRIT "protocol %04x is "
  30. "buggy, dev %s\n",
  31. skb2->protocol, dev->name);
  32. skb_reset_network_header(skb2); //重新設置L3頭部偏移
  33. }
  34. skb2->transport_header = skb2->network_header;
  35. skb2->pkt_type = PACKET_OUTGOING;
  36. ptype->func(skb2, skb->dev, ptype, skb->dev);//調用協議(ptype_all)接受函數
  37. }
  38. }
  39. rcu_read_unlock();
  40. }

?環回設備

對於環回設備loopback,設備的ops->ndo_start_xmit被初始化為loopback_xmit函數。

  1. static const struct net_device_ops loopback_ops = {
  2. .ndo_init = loopback_dev_init,
  3. .ndo_start_xmit= loopback_xmit,
  4. .ndo_get_stats = loopback_get_stats,
  5. };

drivers/net/loopback.c

  1. static netdev_tx_t loopback_xmit(struct sk_buff *skb,
  2. struct net_device *dev)
  3. {
  4. struct pcpu_lstats *pcpu_lstats, *lb_stats;
  5. int len;
  6. skb_orphan(skb);
  7. skb->protocol = eth_type_trans(skb, dev);
  8. /* it's OK to use per_cpu_ptr() because BHs are off */
  9. pcpu_lstats = dev->ml_priv;
  10. lb_stats = per_cpu_ptr(pcpu_lstats, smp_processor_id());
  11. len = skb->len;
  12. if (likely(netif_rx(skb) == NET_RX_SUCCESS)) { //直接調用了netif_rx進行了接收處理
  13. lb_stats->bytes += len;
  14. lb_stats->packets++;
  15. } else
  16. lb_stats->drops++;
  17. return NETDEV_TX_OK;
  18. }


  • 注:
1.CHECKSUM_PARTIAL表示使用硬件checksum,L4層的偽頭的校驗已經完畢,並且已經加入uh->check字段中,此時只需要設備計算整個頭4層頭的校驗值。


2.整個數據包發送邏輯中會涉及到三個用於互斥訪問的代碼:

(1)spinlock_t*root_lock=qdisc_lock(q);

(2)test_and_set_bit(__QDISC_STATE_RUNNING,&q->state)

(3)__netif_tx_lockàspin_lock(&txq->_xmit_lock)

其中(1)(3)分別對應一個spinlock,(2)對應一個隊列狀態。在了解代碼中如何使用這三個同步方法時,首先看一下相關數據結構的關系,如下。

圖中綠色部分表示(1)(3)兩處spinlock。首先看(1)處對應的代碼:

  1. static inline spinlock_t *qdisc_lock(struct Qdisc *qdisc)
  2. {
  3. return &qdisc->q.lock;
  4. }

所以root_lock是用於控制qdisc中skb隊列訪問的鎖,當需要對skb隊列進行enqueue、dequeue、requeue時,就需要加鎖。

__QDISC_STATE_RUNNING標志用於保證一個流控對象(qdisc)不會同時被多個cpu訪問。

而(3)處的spinlock,即structnetdev_queue中的_xmit_lock,則用於保證dev的注冊函數的互斥訪問,即deriver的同步。

另外,內核代碼注釋中寫到,(1)和(3)是互斥的,獲得(1)處的鎖時必須先保證釋放(3)處的鎖,反之亦然,為什麼要這樣還沒有想明白。。。。哪位大神知道還望指點

3.已經有了dev_queue_xmit函數,為什麼還需要軟中斷來發送呢?

我們可以看到在dev_queue_xmit中將skb進行了一些處理(比如合並成一個包,計算校驗和等),處理完的skb是可以直接發送的了,這時dev_queue_xmit也會先將skb入隊(skb一般都是在這個函數中入隊的),並且調用qdisc_run嘗試發送,但是有可能發送失敗,這時就將skb重新入隊,調度軟中斷,並且自己直接返回。

軟中斷只是發送隊列中的skb以及釋放已經發送的skb,它無需再對skb進行線性化或者校驗和處理。另外在隊列被停止的情況下,dev_queue_xmit仍然可以把包加入隊列,但是不能發送,這樣在隊列被喚醒的時候就需要通過軟中斷來發送停止期間積壓的包。簡而言之,dev_queue_xmit是對skb做些最後的處理並且第一次嘗試發送,軟中斷是將前者發送失敗或者沒發完的包發送出去。(其實發送軟中斷還有一個作用,就是釋放已經發送的包,因為某些情況下發送是在硬件中斷中完成的,為了提高硬件中斷處理效率,內核提供一種方式將釋放skb放到軟中斷中進行,這時只要調用dev_kfree_skb_irq,它將skb加入softnet_data的completion_queue中,然後開啟發送軟中斷,net_tx_action會在軟中斷中將completion_queue中的skb全部釋放掉)

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved