给岁月以文明,而不是给文明以岁月

文章分类

关于博主

头像不见了

使用 golang 发送接收组播数据


/ 阅读 276

1、组播

组播又称多播,是一种“一对多”的通讯方式。主机可以加入一个组播组(组播IP地址),当源主机向该组播组发送数据时,该组播组的成员都可以收到数据的拷贝,解决了单播情况下数据的重复拷贝及带宽的重复占用,也解决了广播方式下带宽资源的浪费。

IANA(internet assigned number authority)把D类地址空间分配给IP组播,其范围是从224.0.0.0到239.255.255.255。这个范围下又进一步细分,比如224.0.0.0~224.0.0.255为预留的组播地址(永久组地址),224.0.1.0~238.255.255.255为用户可用的组播地址(临时组地址),全网范围内有效等。

维基百科:组播组播地址

2、接收组播数据

首先建立一个组播类型,保存组播地址、端口、使用的网卡的ip地址以及接收缓冲区大小

import (
	"errors"
	"net"
	"strconv"

	"github.com/go-reuseport"
	"golang.org/x/net/ipv4"
)

// Multicaster 组播
type Multicaster struct {
    // 组播地址
    MultiAddr     string
    // 组播端口
    MultiPort     int
    // 接受组播的网卡的ip4地址
    InterfaceAddr string
    buffer        []byte
    buffersize    int
}

// NewMulticaster 组播实例
func NewMulticaster(addr string, port int, interfaceAddr string) Multicaster {
	return Multicaster{
		MultiAddr:     addr,
		MultiPort:     port,
		InterfaceAddr: interfaceAddr,
		buffersize:    4096,
	}
}

// SetBufferSize 设置缓冲区大小
func (mc *Multicaster) SetBufferSize(size int) {
	mc.buffersize = size
}

简单起见,直接写个listen函数,调用后执行加入组播组、接收数据、处理数据等操作。

// Listen 启动监听
func (mc *Multicaster) Listen(handler func(*net.Addr, []byte) (int, bool)) error {
    ...
}

listen 的参数是一个回调函数,用来处理接受到缓冲区里的数据,并返回已处理的数据长度。

第一步,查找使用的网卡,这里根据ip地址(InterfaceAddr)来查找对应的网卡

mc.buffer = make([]byte, 0, mc.buffersize)

// 查找网卡
found := false
var eth net.Interface

interfaces, err := net.Interfaces()
if err != nil {
    fmt.Printf("获取网卡信息出错")
    return err
}

for _, ifa := range interfaces {
    address, _ := ifa.Addrs()
    for _, address := range address {
        if ipnet, ok := address.(*net.IPNet); ok {
            ifIP4 := ipnet.IP.To4()
            if ifIP4 != nil && ifIP4.String() == mc.InterfaceAddr {
                eth = ifa
                found = true
                break
            }
        }
    }
}

if !found {
    fmt.Printf("未找到网卡, interface_addr:" + mc.InterfaceAddr)
    return errors.New("not found interface, interface_addr:" + mc.InterfaceAddr)
}

接下来,监听组播端口,将网卡加入组播组,循环接收并处理数据

go func() {
    // 一个开源库,设置ip端口可以重复绑定,即底层设置SO_REUSEADDR标志,看了标准库没找到怎么设置
    c, err := reuseport.ListenPacket("udp", ":"+strconv.Itoa(mc.MultiPort))
    if err != nil {
        fmt.Printf("ListenPacket error")
        return
    }

    defer c.Close()

    p := ipv4.NewPacketConn(c)

    // 加入组播组
    if err := p.JoinGroup(&eth, &net.UDPAddr{IP: net.ParseIP(mc.MultiAddr)}); err != nil {
        fmt.Printf("JoinGroup error")
        return
    }

    fmt.Printf("JoinGroup success")

    b := make([]byte, 2048)

    for {
        // 接收UDP数据包
        n, _, src, err := p.ReadFrom(b)
        if err != nil {
            fmt.Printf("ReadFrom error")
        }

        mc.buffer = append(mc.buffer, b[0:n]...)

        // 处理数据,并从缓冲区中移除已处理的数据
        if handledLen, success := handler(&src, mc.buffer); success {
            mc.buffer = mc.buffer[0 : len(mc.buffer)-handledLen]
        }
    }
}()

3、发送组播数据

直接向组播地址发送UDP报文即可

addr, _ := net.ResolveUDPAddr("udp4", "224.0.0.250:8888")
conn, err := net.DialUDP("udp4", nil, addr)
if err != nil {
    fmt.Println(err)
    return
}

packet := []byte("Hello, wa ha ha")
_, err := conn.Write(packet)
if err != nil {
    fmt.Println(err)
}