SOME/IP:用Python實現協議訂閱、Offer、訂閱ACK與報文接收

文章目錄

  • 前言
  • 一、代碼層次
  • 二、詳細代碼
    • 1. eth_scapy_sd.py
    • 2、eth_scapy_someip.py
    • 3、network_define.py
    • 4、packet_define.py
    • 5、unpack_define.py
    • 6、someip_controller.py


前言

1、需要pip安裝scapy庫
2、需要修改根據實際情況配置network_define.py
3、執行someip_controller.py運行本案例
4、本文參考github: eth-scapy-someip

注:最近沒啥時間寫文章,有興趣自己研究下<~~>


一、代碼層次

someip/
└── module/├── eth_scapy_sd.py          # SOME/IP SD服務發現結構體定義├── eth_scapy_someip.py      # SOME/IP標準結構體定義├── network_define.py        # 網絡配置(IP/端口/多播組)├── packet_define.py         # 協議字段定義(Offer/Subscribe/ACK)├── unpack_define.py         # 接收報文解析器└── someip_controller.py     # 主控制邏輯(訂閱/OFFER/ACK流程管理)

二、詳細代碼

1. eth_scapy_sd.py

import ctypes
import collectionsfrom eth_scapy_someip import SOMEIPfrom scapy.fields import *
from scapy.packet import *
from scapy.layers.inet6 import IP6Fieldclass _SDPacketBase(Packet):"""base class to be used among all SD Packet definitions."""# use this dictionary to set default values for desired fields (mostly on subclasses# where not all fields are defined locally)# - key : field_name, value : desired value# - it will be used from 'init_fields' function, upon packet initialization## example : _defaults = {'field_1_name':field_1_value,'field_2_name':field_2_value}_defaults = {}def _set_defaults(self):"""goes through '_defaults' dict setting field default values (for those that have been defined)."""for key in self._defaults.keys():try:self.get_field(key)except KeyError:passelse:self.setfieldval(key, self._defaults[key])def init_fields(self):"""perform initialization of packet fields with desired values.NOTE : this funtion will only be called *once* upon class (or subclass) construction"""Packet.init_fields(self)self._set_defaults()# SD ENTRY
#  - Service
#  - EventGroup
class _SDEntry(_SDPacketBase):"""Base class for SDEntry_* packages."""TYPE_FMT = ">B"TYPE_PAYLOAD_I = 0# ENTRY TYPES : SERVICETYPE_SRV_FINDSERVICE = 0x00TYPE_SRV_OFFERSERVICE = 0x01TYPE_SRV = (TYPE_SRV_FINDSERVICE, TYPE_SRV_OFFERSERVICE)# ENTRY TYPES : EVENGROUPTYPE_EVTGRP_SUBSCRIBE = 0x06TYPE_EVTGRP_SUBSCRIBE_ACK = 0x07TYPE_EVTGRP = (TYPE_EVTGRP_SUBSCRIBE, TYPE_EVTGRP_SUBSCRIBE_ACK)# overall len (UT usage)OVERALL_LEN = 16# 定義Entries Array結構體fields_desc = [ByteField("type", 0),ByteField("index_1", 0),ByteField("index_2", 0),BitField("n_opt_1", 0, 4),BitField("n_opt_2", 0, 4),ShortField("srv_id", 0),ShortField("inst_id", 0),ByteField("major_ver", 0),X3BytesField("ttl", 0),]def guess_payload_class(self, payload):"""decode SDEntry depending on its type."""pl_type = struct.unpack(_SDEntry.TYPE_FMT,payload[_SDEntry.TYPE_PAYLOAD_I : _SDEntry.TYPE_PAYLOAD_I + 1],)[0]if pl_type in _SDEntry.TYPE_SRV:return SDEntry_Serviceelif pl_type in _SDEntry.TYPE_EVTGRP:return SDEntry_EventGroupclass SDEntry_Service(_SDEntry):"""Service Entry."""_defaults = {"type": _SDEntry.TYPE_SRV_FINDSERVICE}name = "Service Entry"fields_desc = [_SDEntry, IntField("minor_ver", 0)]class SDEntry_EventGroup(_SDEntry):"""EventGroup Entry."""_defaults = {"type": _SDEntry.TYPE_EVTGRP_SUBSCRIBE}name = "Eventgroup Entry"fields_desc = [_SDEntry,BitField("res", 0, 12),BitField("cnt", 0, 4),ShortField("eventgroup_id", 0),]# SD Option
#  - Configuration
#  - LoadBalancing
#  - IPv4 EndPoint
#  - IPv6 EndPoint
#  - IPv4 MultiCast
#  - IPv6 MultiCast
#  - IPv4 EndPoint
#  - IPv6 EndPoint
class _SDOption(_SDPacketBase):"""Base class for SDOption_* packages."""TYPE_FMT = ">B"TYPE_PAYLOAD_I = 2CFG_TYPE = 0x01CFG_OVERALL_LEN = (4  # overall length of CFG SDOption,empty 'cfg_str' (to be used from UT))LOADBALANCE_TYPE = 0x02LOADBALANCE_LEN = 0x05LOADBALANCE_OVERALL_LEN = 8  # overall length of LB SDOption (to be used from UT)IP4_ENDPOINT_TYPE = 0x04IP4_ENDPOINT_LEN = 0x0009IP4_MCAST_TYPE = 0x14IP4_MCAST_LEN = 0x0009IP4_SDENDPOINT_TYPE = 0x24IP4_SDENDPOINT_LEN = 0x0009IP4_OVERALL_LEN = 12  # overall length of IP4 SDOption (to be used from UT)IP6_ENDPOINT_TYPE = 0x06IP6_ENDPOINT_LEN = 0x0015IP6_MCAST_TYPE = 0x16IP6_MCAST_LEN = 0x0015IP6_SDENDPOINT_TYPE = 0x26IP6_SDENDPOINT_LEN = 0x0015IP6_OVERALL_LEN = 24  # overall length of IP6 SDOption (to be used from UT)def guess_payload_class(self, payload):"""decode SDOption depending on its type."""pl_type = struct.unpack(_SDOption.TYPE_FMT,payload[_SDOption.TYPE_PAYLOAD_I : _SDOption.TYPE_PAYLOAD_I + 1],)[0]if pl_type == _SDOption.CFG_TYPE:return SDOption_Configelif pl_type == self.LOADBALANCE_TYPE:return SDOption_LoadBalanceelif pl_type == self.IP4_ENDPOINT_TYPE:return SDOption_IP4_EndPointelif pl_type == self.IP4_MCAST_TYPE:return SDOption_IP4_Multicastelif pl_type == self.IP4_SDENDPOINT_TYPE:return SDOption_IP4_SD_EndPointelif pl_type == self.IP6_ENDPOINT_TYPE:return SDOption_IP6_EndPointelif pl_type == self.IP6_MCAST_TYPE:return SDOption_IP6_Multicastelif pl_type == self.IP6_SDENDPOINT_TYPE:return SDOption_IP6_SD_EndPointclass _SDOption_Header(_SDOption):fields_desc = [ShortField("len", None),ByteField("type", 0),ByteField("res_hdr", 0),]class _SDOption_Tail(_SDOption):fields_desc = [ByteField("res_tail", 0),ByteEnumField("l4_proto", 0x06, {0x06: "TCP", 0x11: "UDP"}),ShortField("port", 0),]class _SDOption_IP4(_SDOption):fields_desc = [_SDOption_Header, IPField("addr", "0.0.0.0"), _SDOption_Tail]class _SDOption_IP6(_SDOption):fields_desc = [_SDOption_Header,IP6Field("addr", "2001:cdba:0000:0000:0000:0000:3257:9652"),_SDOption_Tail,]class SDOption_Config(_SDOption):# offset to be added upon length calculation (corresponding to header's "Reserved" field)LEN_OFFSET = 0x01name = "Config Option"# default values specification_defaults = {"type": _SDOption.CFG_TYPE}# package fields definitonfields_desc = [_SDOption_Header, StrField("cfg_str", "")]def post_build(self, p, pay):# length computation excluding 16b_length and 8b_typel = self.lenif l is None:l = len(self.cfg_str) + self.LEN_OFFSETp = struct.pack("!H", l) + p[2:]return p + payclass SDOption_LoadBalance(_SDOption):name = "LoadBalance Option"# default values specification_defaults = {"type": _SDOption.LOADBALANCE_TYPE, "len": _SDOption.LOADBALANCE_LEN}# package fields definitonfields_desc = [_SDOption_Header, ShortField("priority", 0), ShortField("weight", 0)]# SDOPTIONS : IPv4-specific
class SDOption_IP4_EndPoint(_SDOption_IP4):name = "IP4 EndPoint Option"# default values specification_defaults = {"type": _SDOption.IP4_ENDPOINT_TYPE, "len": _SDOption.IP4_ENDPOINT_LEN}class SDOption_IP4_Multicast(_SDOption_IP4):name = "IP4 Multicast Option"# default values specification_defaults = {"type": _SDOption.IP4_MCAST_TYPE, "len": _SDOption.IP4_MCAST_LEN}class SDOption_IP4_SD_EndPoint(_SDOption_IP4):name = "IP4 SDEndPoint Option"# default values specification_defaults = {"type": _SDOption.IP4_SDENDPOINT_TYPE,"len": _SDOption.IP4_SDENDPOINT_LEN,}# SDOPTIONS : IPv6-specific
class SDOption_IP6_EndPoint(_SDOption_IP6):name = "IP6 EndPoint Option"# default values specification_defaults = {"type": _SDOption.IP6_ENDPOINT_TYPE, "len": _SDOption.IP6_ENDPOINT_LEN}class SDOption_IP6_Multicast(_SDOption_IP6):name = "IP6 Multicast Option"# default values specification_defaults = {"type": _SDOption.IP6_MCAST_TYPE, "len": _SDOption.IP6_MCAST_LEN}class SDOption_IP6_SD_EndPoint(_SDOption_IP6):name = "IP6 SDEndPoint Option"# default values specification_defaults = {"type": _SDOption.IP6_SDENDPOINT_TYPE,"len": _SDOption.IP6_SDENDPOINT_LEN,}#
# SD PACKAGE DEFINITION
#
class SD(_SDPacketBase):"""SD PacketNOTE :   when adding 'entries' or 'options', do not use list.append() method but create a new liste.g. :  p = SD()p.option_array = [SDOption_Config(),SDOption_IP6_EndPoint()]"""SOMEIP_MSGID_SRVID = 0xFFFFSOMEIP_MSGID_SUBID = 0x1SOMEIP_MSGID_EVENTID = 0x100SOMEIP_PROTO_VER = 0x01SOMEIP_IFACE_VER = 0x01SOMEIP_MSG_TYPE = SOMEIP.TYPE_NOTIFICATIONname = "SD"# Flags definition: {"name":(mask,offset)}_sdFlag = collections.namedtuple("Flag", "mask offset")FLAGSDEF = {"REBOOT": _sdFlag(mask=0x80, offset=7),  # ReBoot flag"UNICAST": _sdFlag(mask=0x40, offset=6),  # UniCast flag}fields_desc = [ByteField("flags", 0),X3BytesField("res", 0),FieldLenField(name="len_entry_array", default=None, length_of="entry_array", fmt="!I"),PacketListField(name="entry_array",default=None,pkt_cls=_SDEntry,length_from=lambda pkt: pkt.len_entry_array,),FieldLenField(name="len_option_array", default=None, length_of="option_array", fmt="!I"),PacketListField(name="option_array",default=None,pkt_cls=_SDOption,length_from=lambda pkt: pkt.len_option_array,),]def __init__(self, *args, **kwargs):super(SD, self).__init__(*args, **kwargs)self.explicit = 1def getFlag(self, name):"""get particular flag from bitfield."""name = name.upper()if name in self.FLAGSDEF:return (self.flags & self.FLAGSDEF[name].mask) >> self.FLAGSDEF[name].offsetelse:return Nonedef setFlag(self, name, value):"""Set particular flag on bitfield.:param str name : name of the flag to set (see SD.FLAGSDEF):param int value : either 0x1 or 0x0 (provided int will be ANDed with 0x01)"""name = name.upper()if name in self.FLAGSDEF:self.flags = (self.flags & ctypes.c_ubyte(~self.FLAGSDEF[name].mask).value) | (value & 0x01) << self.FLAGSDEF[name].offsetdef setEntryArray(self, entry_list):"""Add entries to entry_array.:param entry_list: list of entries to be added. Single entry object also accepted"""if isinstance(entry_list, list):self.entry_array = entry_listelse:self.entry_array = [entry_list]def setOptionArray(self, option_list):"""Add options to option_array.:param option_list: list of options to be added. Single option object also accepted"""if isinstance(option_list, list):self.option_array = option_listelse:self.option_array = [option_list]def getSomeip(self, stacked=False):"""return SD-initialized SOME/IP packet:param stacked: boolean. Either just SOME/IP packet or stacked over SD-self"""p = SOMEIP()p.msg_id.srv_id = SD.SOMEIP_MSGID_SRVIDp.msg_id.sub_id = SD.SOMEIP_MSGID_SUBIDp.msg_id.event_id = SD.SOMEIP_MSGID_EVENTIDp.proto_ver = SD.SOMEIP_PROTO_VERp.iface_ver = SD.SOMEIP_IFACE_VERp.msg_type = SD.SOMEIP_MSG_TYPEif stacked:return p / selfelse:return pdef get_someip_with_session_id(self,session_id, stacked=False):p = SOMEIP()p.msg_id.srv_id = SD.SOMEIP_MSGID_SRVIDp.msg_id.sub_id = SD.SOMEIP_MSGID_SUBIDp.msg_id.event_id = SD.SOMEIP_MSGID_EVENTIDp.proto_ver = SD.SOMEIP_PROTO_VERp.iface_ver = SD.SOMEIP_IFACE_VERp.msg_type = SD.SOMEIP_MSG_TYPEp.req_id.session_id = session_idif stacked:return p / selfelse:return p

2、eth_scapy_someip.py

from scapy.layers.inet import *
from scapy.fields import *
from scapy.packet import *"""SOMEIP PACKAGE DEFINITION"""class _SOMEIP_MessageId(Packet):"""MessageId subpacket."""name = 'MessageId'fields_desc = [ShortField('srv_id', 0),BitEnumField('sub_id', 0, 1, {0: 'METHOD_ID', 1: 'EVENT_ID'}),ConditionalField(BitField('method_id', 0, 15), lambda pkt: pkt.sub_id == 0),ConditionalField(BitField('event_id', 0, 15), lambda pkt: pkt.sub_id == 1)]def extract_padding(self, p):return '', pclass _SOMEIP_RequestId(Packet):""" RequestId subpacket."""name = 'RequestId'fields_desc = [ShortField('client_id', 0),ShortField('session_id', 0)]def extract_padding(self, p):return '', pclass SOMEIP(Packet):""" SOME/IP Packet."""# Default valuesPROTOCOL_VERSION = 0x01INTERFACE_VERSION = 0x01# Lenght offset (without payload)LEN_OFFSET = 0x08# SOME/IP TYPE VALUESTYPE_REQUEST = 0x00TYPE_REQUEST_NO_RET = 0x01TYPE_NOTIFICATION = 0x02TYPE_REQUEST_ACK = 0x40TYPE_REQUEST_NORET_ACK = 0x41TYPE_NOTIFICATION_ACK = 0x42TYPE_RESPONSE = 0x80TYPE_ERROR = 0x81TYPE_RESPONSE_ACK = 0xc0TYPE_ERROR_ACK = 0xc1# SOME/IP-TP TYPE VALUESTYPE_REQUEST_SEGMENT = 0x20TYPE_REQUEST_NO_RET_SEGMENT = 0x21TYPE_NOTIFICATION_SEGMENT = 0x22TYPE_REQUEST_ACK_SEGMENT = 0x60TYPE_REQUEST_NORET_ACK_SEGMENT = 0x61TYPE_NOTIFICATION_ACK_SEGMENT = 0x62TYPE_RESPONSE_SEGMENT = 0xa0TYPE_ERROR_SEGMENT = 0xa1TYPE_RESPONSE_ACK_SEGMENT = 0xe0TYPE_ERROR_ACK_SEGMENT = 0xe1SOMEIP_TP_TYPES = frozenset({TYPE_REQUEST_SEGMENT, TYPE_REQUEST_NO_RET_SEGMENT, TYPE_NOTIFICATION_SEGMENT,TYPE_REQUEST_ACK_SEGMENT, TYPE_REQUEST_NORET_ACK_SEGMENT,TYPE_NOTIFICATION_ACK_SEGMENT, TYPE_RESPONSE_SEGMENT, TYPE_ERROR_SEGMENT,TYPE_RESPONSE_ACK_SEGMENT, TYPE_ERROR_ACK_SEGMENT})SOMEIP_TP_TYPE_BIT_MASK = 0x20# SOME/IP RETURN CODESRET_E_OK = 0x00RET_E_NOT_OK = 0x01RET_E_UNKNOWN_SERVICE = 0x02RET_E_UNKNOWN_METHOD = 0x03RET_E_NOT_READY = 0x04RET_E_NOT_REACHABLE = 0x05RET_E_TIMEOUT = 0x06RET_E_WRONG_PROTOCOL_V = 0x07RET_E_WRONG_INTERFACE_V = 0x08RET_E_MALFORMED_MSG = 0x09RET_E_WRONG_MESSAGE_TYPE = 0x0a# SOME/IP-TP More Segments FlagSOMEIP_TP_LAST_SEGMENT = 0SOMEIP_TP_MORE_SEGMENTS = 1_OVERALL_LEN_NOPAYLOAD = 16  # UTname = 'SOME/IP'fields_desc = [PacketField('msg_id', _SOMEIP_MessageId(), _SOMEIP_MessageId),  # MessageIDIntField('len', None),  # LengthPacketField('req_id', _SOMEIP_RequestId(), _SOMEIP_RequestId),  # RequestIDByteField('proto_ver', PROTOCOL_VERSION),  # Protocol versionByteField('iface_ver', INTERFACE_VERSION),  # Interface versionByteEnumField('msg_type', TYPE_REQUEST, {  # -- Message type --TYPE_REQUEST: 'REQUEST',  # 0x00TYPE_REQUEST_NO_RET: 'REQUEST_NO_RETURN',  # 0x01TYPE_NOTIFICATION: 'NOTIFICATION',  # 0x02TYPE_REQUEST_ACK: 'REQUEST_ACK',  # 0x40TYPE_REQUEST_NORET_ACK: 'REQUEST_NO_RETURN_ACK',  # 0x41TYPE_NOTIFICATION_ACK: 'NOTIFICATION_ACK',  # 0x42TYPE_RESPONSE: 'RESPONSE',  # 0x80TYPE_ERROR: 'ERROR',  # 0x81TYPE_RESPONSE_ACK: 'RESPONSE_ACK',  # 0xc0TYPE_ERROR_ACK: 'ERROR_ACK',  # 0xc1}),ByteEnumField('retcode', 0, {  # -- Return code --RET_E_OK: 'E_OK',  # 0x00RET_E_NOT_OK: 'E_NOT_OK',  # 0x01RET_E_UNKNOWN_SERVICE: 'E_UNKNOWN_SERVICE',  # 0x02RET_E_UNKNOWN_METHOD: 'E_UNKNOWN_METHOD',  # 0x03RET_E_NOT_READY: 'E_NOT_READY',  # 0x04RET_E_NOT_REACHABLE: 'E_NOT_REACHABLE',  # 0x05RET_E_TIMEOUT: 'E_TIMEOUT',  # 0x06RET_E_WRONG_PROTOCOL_V: 'E_WRONG_PROTOCOL_VERSION',  # 0x07RET_E_WRONG_INTERFACE_V: 'E_WRONG_INTERFACE_VERSION',  # 0x08RET_E_MALFORMED_MSG: 'E_MALFORMED_MESSAGE',  # 0x09RET_E_WRONG_MESSAGE_TYPE: 'E_WRONG_MESSAGE_TYPE',  # 0x0a}),ConditionalField(BitField('offset', 0, 28), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES),ConditionalField(BitField('reserved', 0, 3), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES),ConditionalField(BitEnumField('more_segments', 0, 1, {SOMEIP_TP_LAST_SEGMENT: 'Last_Segment',SOMEIP_TP_MORE_SEGMENTS: 'More_Segments'}), lambda pkt: pkt.msg_type in SOMEIP.SOMEIP_TP_TYPES)]def post_build(self, p, pay):length = self.len# length computation : RequestID + PROTOVER_IFACEVER_TYPE_RETCODE + PAYLOADif length is None:length = self.LEN_OFFSET + len(pay)p = p[:4] + struct.pack('!I', length) + p[8:]return p + payfor i in range(15):bind_layers(UDP, SOMEIP, sport=30490 + i)bind_layers(TCP, SOMEIP, sport=30490 + i)

3、network_define.py

class EthParameter:# 廣播sd_network_card = "eth0.62"sd_ip = "239.0.0.255"# 本機server_network_card = "Realtek PCIe GbE Family Controller #2"server_ip = "192.168.62.31"# 域控client_network_card = "eth0.62"client_ip = "192.168.62.11"sd_port = 30490producer_port = 30500consumer_prot = 30501

4、packet_define.py

import eth_scapy_sd as sdfrom network_define import EthParameterfrom scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDPclass SomeipPacker:def __init__(self):self.eth_para = EthParameter()self.sd_session_id = 1self.client_session_id = 1def update_sd_session_id(self):"""更新session_id,從0遞增到65535"""self.sd_session_id += 1if self.sd_session_id >= 65535:self.sd_session_id = 0def update_client_session_id(self):self.client_session_id += 1if self.client_session_id >= 65535:self.client_session_id = 0# ===========================##          序列化             ## ===========================#def packet_subscribe(self, service_id: list, ttl: int):"""組事件訂閱包"""subscribe_packager = sd.SD()subscribe_packager.setFlag("REBOOT", 1)subscribe_packager.setFlag("UNICAST", 1)subscribe_packager.len_entry_array = 16 * len(service_id)subscribe_packager.len_option_array = 24for i in service_id:subscribe_packager.entry_array.append(sd.SDEntry_EventGroup(type=sd.SDEntry_EventGroup.TYPE_EVTGRP_SUBSCRIBE,srv_id=i,inst_id=0x1,n_opt_1=2,major_ver=0x1,ttl=ttl,eventgroup_id=0x1,))subscribe_packager.option_array = [# UDP EndPointsd.SDOption_IP4_EndPoint(addr=self.eth_para.server_ip,l4_proto=0x11,port=self.eth_para.consumer_prot,),# TCP EndPointsd.SDOption_IP4_EndPoint(addr=self.eth_para.server_ip,l4_proto=0x06,port=self.eth_para.consumer_prot,),]print(subscribe_packager.show())subscribe_package = (Ether()/ IP(src=self.eth_para.server_ip, dst=self.eth_para.client_ip)/ UDP(sport=self.eth_para.sd_port, dport=self.eth_para.sd_port)/ subscribe_packager.get_someip_with_session_id(self.client_session_id, True))self.update_client_session_id()return subscribe_packagedef packet_subscribe_ack(self, service_id: list, ttl: int):"""組ack包"""ack_packager = sd.SD()ack_packager.setFlag("REBOOT", 1)ack_packager.setFlag("UNICAST", 1)ack_packager.len_entry_array = 16 * len(service_id)ack_packager.len_option_array = 0for i in service_id:ack_packager.entry_array.append(sd.SDEntry_EventGroup(type=sd.SDEntry_EventGroup.TYPE_EVTGRP_SUBSCRIBE_ACK,srv_id=i,inst_id=0x1,major_ver=0x1,ttl=ttl,eventgroup_id=0x1,))print(ack_packager.show())ack_package = (Ether()/ IP(src=self.eth_para.server_ip, dst=self.eth_para.client_ip)/ UDP(sport=self.eth_para.sd_port, dport=self.eth_para.sd_port)/ ack_packager.get_someip_with_session_id(self.client_session_id, True))self.update_client_session_id()return ack_packagedef packet_offer(self, service_id: list, ttl: int):"""組offer包"""offer_packager = sd.SD()offer_packager.setFlag("REBOOT", 1)offer_packager.setFlag("UNICAST", 1)offer_packager.len_entry_array = 16 * len(service_id)offer_packager.len_option_array = 12for i in service_id:offer_packager.entry_array.append(sd.SDEntry_Service(type=sd.SDEntry_Service.TYPE_SRV_OFFERSERVICE,srv_id=i,inst_id=0x1,n_opt_1=1,major_ver=0x1,minor_ver=0x01,ttl=ttl,))offer_packager.option_array = [sd.SDOption_IP4_EndPoint(addr=self.eth_para.server_ip,l4_proto=0x11,port=self.eth_para.producer_port,)]print(offer_packager.show())offer_package = (Ether()/ IP(src=self.eth_para.server_ip, dst=self.eth_para.sd_ip)/ UDP(sport=self.eth_para.sd_port, dport=self.eth_para.sd_port)/ offer_packager.get_someip_with_session_id(self.sd_session_id, True))self.update_sd_session_id()return offer_package

5、unpack_define.py

from scapy.packet import Raw
from typing import Optional
from dataclasses import dataclass@dataclass
class SomeIPHeaderParams:service_id: intevent_id: int      # 或 event_id,取決于消息類型client_id: intsession_id: intmsg_type: intreturn_code: intlength: intprotocol_version: intinterface_version: intclass SomeipUnpacker:@staticmethoddef get_someip_header_params(receive_packet) -> Optional[SomeIPHeaderParams]:"""獲取someip header"""try:if receive_packet.haslayer("SOME/IP"):someip_layer = receive_packet["SOME/IP"]# 獲取someip header值service_id = someip_layer.msg_id.srv_idsub_id = someip_layer.msg_id.sub_idevent_id = sub_id << 15 | someip_layer.msg_id.event_id# 提取請求 ID(Client ID 和 Session ID)req_id = someip_layer.req_idclient_id = req_id.client_idsession_id = req_id.session_id# 構造頭部對象return SomeIPHeaderParams(service_id=service_id,event_id=event_id,client_id=client_id,session_id=session_id,msg_type=someip_layer.msg_type,return_code=someip_layer.retcode,length=someip_layer.len,protocol_version=someip_layer.proto_ver,interface_version=someip_layer.iface_ver,)except (AttributeError, ValueError) as e:print(f"解析失敗: {e}")return None@staticmethoddef get_someip_payload(receive_packet):"""獲取someip payload"""try:someip_payload = receive_packet[Raw].load# print(f"someip_payload: {someip_payload}")return someip_payloadexcept AttributeError:return None

6、someip_controller.py

import threading
import socket
import time
from module.packet_define import *
from module.unpack_define import *from scapy.sendrecv import sendp, sniffclass SomeipController(threading.Thread):def __init__(self, someip_packer: SomeipPacker, someip_unpacker: SomeipUnpacker):super().__init__()self.someip_packer = someip_packerself.someip_unpacker = someip_unpackerself._stop_event = threading.Event()self.running = Truedef run(self):# 創建并發送offer以及訂閱ack報文(這里忽略訂閱,直接回復訂閱ack)service_id_list = [0xA994, 0xA995]_offer = self.someip_packer.packet_offer(service_id=service_id_list,ttl=3)_offer_sender = SomeipSendLoop(network_card=EthParameter.sd_network_card,someip_packer=_offer,cycle_time_ms=2000,)# 回復訂閱ACK_subscribe_ack = self.someip_packer.packet_subscribe_ack(service_id=service_id_list,ttl=3)_subscribe_ack_sender = SomeipSendLoop(network_card=EthParameter.sd_network_card,someip_packer=_subscribe_ack,cycle_time_ms=2000,)# 創建并發送訂閱報文(訂閱0xAB03, 0xAB04)_subscribe = self.someip_packer.packet_subscribe(service_id=[0xAB03,0xAB04], ttl=0x03)_subscribe_sender = SomeipSendLoop(network_card=EthParameter.server_network_card,someip_packer=_subscribe,cycle_time_ms=2000,)# 啟動接收模塊someip_receiver = SomeIpReceiver(EthParameter.server_network_card)while self._stop_event:time.sleep(0.01)def stop(self):self.running = False  # 設置標志位為 False 來停止self._stop_event.set()self.join()class SomeipSendLoop(threading.Thread):def __init__(self, network_card, someip_packer, cycle_time_ms):super().__init__()self.network_card = network_cardself.someip_packer = someip_packerself.cycle_time_ms = cycle_time_msself._stop_event = threading.Event()self.start()def run(self):while self._stop_event:if self.someip_packer:sendp(self.someip_packer, iface=self.network_card, verbose=0)time.sleep(self.cycle_time_ms / 1000)def stop(self):self._stop_event.set()self.join()class SomeIpReceiver(threading.Thread):def __init__(self,eth_desc: str,someip_unpacker):super().__init__()self._eth_desc = eth_descself._unpacker = someip_unpackerself._terminated = Falseself._stop_event =threading.Event()self.start()def packet_callback(self, packet):if packet:header_param = self._unpacker.get_someip_header_params(receive_packet=packet)print(f"msg_type: {header_param.msg_type}")def run(self):sniff(iface=self._eth_desc,prn=self.packet_callback,filter="udp",stop_filter=lambda x: self._terminated,)while self._stop_event:# 保持sniff嗅探不中斷time.sleep(1)def stop(self):self._terminated = Trueself._stop_event.set()self.join()# 如果協議有TCP
class SomeipTcpSocket(threading.Thread):def __init__(self):super().__init__()self.tcp_sock: socket = Noneself._stop_event = threading.Event()self.start()def create_tcp_socket(self):self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.tcp_sock.bind((EthParameter.icc_android_ip, EthParameter.consumer_prot))self.tcp_sock.connect((EthParameter.adcc_ip, EthParameter.producer_port))def run(self):self.create_tcp_socket()while self._stop_event:# 保持TCP通信time.sleep(1)def stop(self):self._stop_event.set()self.join()if __name__ == "__main__":s_socket = SomeipTcpSocket()s_packer = SomeipPacker()s_unpacker = SomeipUnpacker()sim_server = SomeipController(s_packer,s_unpacker)sim_server.start()

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/73517.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/73517.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/73517.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Linux內核系列】:文件系統收尾以及軟硬鏈接詳解

&#x1f525; 本文專欄&#xff1a;Linux &#x1f338;作者主頁&#xff1a;努力努力再努力wz &#x1f4aa; 今日博客勵志語錄&#xff1a; 世界上只有一種個人英雄主義&#xff0c;那么就是面對生活的種種失敗卻依然熱愛著生活 內容回顧 那么在之前的學習中&#xff0c;我們…

最新版Chrome瀏覽器加載ActiveX控件技術--allWebPlugin中間件一鍵部署瀏覽器擴展

allWebPlugin簡介 allWebPlugin中間件是一款為用戶提供安全、可靠、便捷的瀏覽器插件服務的中間件產品&#xff0c;致力于將瀏覽器插件重新應用到所有瀏覽器。它將現有ActiveX控件直接嵌入瀏覽器&#xff0c;實現插件加載、界面顯示、接口調用、事件回調等。支持Chrome、Firefo…

基于SpringBoot和MybatisPlus實現通用Controller

基于SpringBoot和MybatisPlus實現通用Controller&#xff0c;只需要創建實體類和mapper接口&#xff0c;單表增刪改查接口就已經實現&#xff0c;提升開發效率 1.定義通用controller package com.xian.controller;import cn.hutool.core.map.MapUtil; import com.baomidou.my…

Axure大屏可視化原型模板及素材:數據可視化的高效解決方案

數據可視化已成為企業決策、運營分析、市場洞察的重要工具。數據可視化大屏&#xff0c;作為數據展示和交互的直觀平臺&#xff0c;能夠實時呈現關鍵數據&#xff0c;幫助企業快速做出決策。Axure作為原型設計領域的領先工具&#xff0c;以其豐富的組件庫、強大的交互設計能力和…

YOLOE:實時查看任何事物

摘要 https://arxiv.org/pdf/2503.07465v1 目標檢測和分割在計算機視覺應用中得到了廣泛應用&#xff0c;然而&#xff0c;盡管YOLO系列等傳統模型高效且準確&#xff0c;但它們受限于預定義的類別&#xff0c;阻礙了在開放場景中的適應性。最近的開放集方法利用文本提示、視覺…

【品鉑科技工業生產應用案例解析】

品鉑科技&#xff08;Pinpoint&#xff09;在工業領域的高精度定位解決方案已廣泛應用于電力、鋼鐵、倉儲、化工、地鐵等場景&#xff0c;以下為典型應用案例及技術方案&#xff1a; 一、?電力行業&#xff1a;上海閔行電廠人員定位? 白鶴灘水力發電站 ?項目需求?&#x…

7-Zip 功能介紹

7-Zip 是一款開源、高效的文件壓縮與解壓縮工具&#xff0c;支持多種格式&#xff0c;以高壓縮率和靈活性著稱。以下是其核心功能&#xff1a; 多格式支持 壓縮 / 解壓&#xff1a;支持 7z&#xff08;默認格式&#xff0c;壓縮率極高&#xff09;、ZIP、RAR、GZIP、BZIP2、TAR…

這是我第一次寫關於aapenal服務器管理控制面板的文章

首先我們來認識一下服務器管理面板的所有功能 ? 網站管理功能&#xff1a; 支持創建和管理多個網站。配置虛擬主機&#xff08;Vhost&#xff09;和域名綁定。自動安裝常用應用&#xff08;如WordPress、Joomla等&#xff09;。 ? 文件管理功能&#xff1a; 文件上傳、…

小語言模型(SLM)技術解析:如何在有限資源下實現高效AI推理

引言&#xff1a;為什么小語言模型&#xff08;SLM&#xff09;是2025年的技術焦點&#xff1f; 2025年&#xff0c;人工智能領域正經歷一場“由大變小”的革命。盡管大語言模型&#xff08;LLM&#xff09;如GPT-4、Gemini Ultra等在復雜任務中表現驚艷&#xff0c;但其高昂的…

jmeter:登錄接口的token用于下一個接口

問題&#xff1a; 僅僅登錄接口可以使用&#xff0c;其他接口進行測試的時候都是報錯&#xff1a;賬號已經失效 原因&#xff1a; 應該是登錄接口的token并沒有用到下一個接口上來 解決方法 1、目錄建設如下&#xff1a; 2、先添加一個后置處理器&#xff1a;查看結果數&…

1、操作系統引論

一、操作系統 會使用linux系統 建議大家先學會linux的基礎指令&#xff0c;可以看菜鳥教程網站進行學習。 1、各種定義 操作系統定義 管理計算機的 硬件 和軟件資源&#xff0c; 能對各類作業進行調度&#xff0c;方便用戶使用計算機的程序集合。操作系統運行在內核態&#xf…

KVM安全模塊生產環境配置與優化指南

KVM安全模塊生產環境配置與優化指南 一、引言 在當今復雜多變的網絡安全環境下&#xff0c;生產環境中KVM&#xff08;Kernel-based Virtual Machine&#xff09;的安全配置顯得尤為重要。本指南旨在詳細闡述KVM安全模塊的配置方法&#xff0c;結合強制訪問控制&#xff08;M…

深入解析工廠模式及其C#實現

工廠模式&#xff08;Factory Pattern&#xff09;是設計模式中的一種創建型模式&#xff0c;它通過工廠方法來創建對象&#xff0c;而不是讓客戶端代碼直接實例化對象。這樣可以避免客戶端與具體類的緊密耦合&#xff0c;從而提高代碼的靈活性、可維護性和擴展性。工廠模式能夠…

【愚公系列】《高效使用DeepSeek》009-PPT大綱自動生成

標題詳情作者簡介愚公搬代碼頭銜華為云特約編輯,華為云云享專家,華為開發者專家,華為產品云測專家,CSDN博客專家,CSDN商業化專家,阿里云專家博主,阿里云簽約作者,騰訊云優秀博主,騰訊云內容共創官,掘金優秀博主,亞馬遜技領云博主,51CTO博客專家等。近期榮譽2022年度…

SpringCloud系列教程(十四):Sentinel持久化

Sentinel之前已經搭建和應用成功了&#xff0c;但是它有一個很大的缺點就是官方沒有提供持久化的方案&#xff0c;從項目源碼上看感覺這款工具也沒有完成的太好&#xff0c;所以需要我們去對它進行二次開發。要補充的功能大概如下&#xff1a; 1、將Sentinel接入nacos中&#…

AGI大模型(3):大模型生成內容

1 大模型是怎么生成內容的 簡單來說就是靠"猜"! 雖然?常不可思議,但事實就是這樣,現階段所有的 NLP 任務,都不意味著機器真正理解這個世界,它只是在玩?字游戲,進??次??次的概率解謎,本質上和我們玩報紙上的填字游戲是?個邏輯。只是我們靠知識和智慧,…

Go語言環境搭建并執行第一個Go程序

目錄 一、Windows環境搭建 二、vscode安裝插件 三、運行第一個go程序 一、Windows環境搭建 下載Go&#xff1a;All releases - The Go Programming Language 這里是Windows搭建&#xff0c;選擇的是windows-amd64.msi&#xff0c;也可以選擇zip直接解壓縮到指定目錄 選擇msi…

Java數據結構第二十三期:Map與Set的高效應用之道(二)

專欄&#xff1a;Java數據結構秘籍 個人主頁&#xff1a;手握風云 目錄 一、哈希表 1.1. 概念 1.2. 沖突 1.3. 避免沖突 1.4. 解決沖突 1.5. 實現 二、OJ練習 2.1. 只出現一次的數字 2.2. 隨機鏈表的復制 2.3. 寶石與石頭 一、哈希表 1.1. 概念 順序結構以及平衡樹中…

OpenHarmony子系統開發 - Rust編譯構建指導

OpenHarmony子系統開發 - Rust編譯構建指導 一、Rust模塊配置規則和指導 概述 Rust是一門靜態強類型語言&#xff0c;具有更安全的內存管理、更好的運行性能、原生支持多線程開發等優勢。Rust官方也使用Cargo工具來專門為Rust代碼創建工程和構建編譯。 OpenHarmony為了集成C…

【SpringMVC】常用注解:@ModelAttribute

1.作用 該注解是在SpringMVC4.3版本后新加入的。它可以修飾方法和參數。出現在方法上&#xff0c;表示當前方法會在控制器的方法之前執行。它可以修飾 沒有返回值的方法&#xff0c;也可以修飾沒有返回值的方法。它修飾參數&#xff0c;獲取指定 的數據給參數賦值。 當表單提…