Search
Duplicate

원시 소켓 (Raw Socket)

생성일
2022/06/07 10:03
태그
네트워크
보안
python

원시 소켓 (Raw Socket)

표준 소켓 방식

→ 운영체제에서 TCP/IP 계층별 데이터 전송 단위와 헤더 구조 등을 자동으로 처리
→ 운영체제가 이미 정해진 방식에 따라 소켓을 생성 → 소켓 활용에 대한 유연성은 없다

새로운 프로토콜을 개발하거나, 패킷 스니퍼 등과 같은 정교한 응용 도구를 구현하는 경우

원시 소켓 방식에 기반해야 한다

원시 소켓 방식

# 표준 소켓 방식에 따른 소켓 객체 생성 s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # 원시 소켓 방식에 따른 소켓 객체 생성 s2 = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
Python
복사
# 표준 소켓 방식에 따른 소켓 객체 생성 s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) # 원시 소켓 방식에 따른 소켓 객체 생성 s2 = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)
Python
복사
→ 원시 소켓 방식의 소켓 객체 생성
→ 원시 소켓 방식에서는 세 번째 매개 변수를 생략할 수 없다

setsockopt() 함수

원래 용도 → 소켓 객체를 종료하자마자, 해당 포트 번호를 재사용하도록 허용하겠다는 용도로 사용했다
원시 소켓 방식에서의 setsockopt() 함수
사용자가 헤더에 접근하도록 허용하겠다는 용도로 사용
# 원래 용도 -> 소켓 객체를 종료하자마자, 해당 포트 번호를 재사용하도록 허용하겠다는 용도 s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ####### 상위 계층 기반의 원시 소켓 생성 ####### # UDP 헤더뿐만 아니라, IP 헤더까지 사용자가 직접 생성하도록 허용하겠다는 의미 # 사용자가 직접 UDP 헤더와 IP 헤더를 생성하겠다는 의미 s2 = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP) s2.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) # 사용자가 직접 ICMP 헤더와 IP 헤더를 생성하겠다는 의미 s3 = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) s3.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
Python
복사
하위 계층 기반의 원시 소켓 생성
→ 이더넷 프레임까지 확장해 원시 소켓을 생성하는 경우
→ 일반적으로 수신을 구현하는 방식
→ 데이터 링크 계층에 기반해 들어오는 데이터를 상위 계층별로 복원할 경우 → 하위 걔층 기반의 원시 소켓 생성
→ LAN 카드의 속성까지 고려
# 유닉스, 리눅스에서 하위 계층 기반의 로우 소켓 생성 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0800)) # socket.AF_INET -> 유닉스 리눅스 기준 # 0x0800 -> IP 패킷을 의미
Python
복사
UDP 헤더 직접 구현
source_port = 4321 destination_port = 22 length = 0 checksum = 0 udp_header = pack("!HHHH", source_port, destination_port, length, checksum)
Python
복사
TCP 헤더 직접 구현
from tabnanny import check from numpy import source import socket source_port = 4321 destination_port = 22 sequence_number = 123 acknowledgment_number = 0 offset = 5 reserved = 0 offset = (offset << 4) + reserved fin = 0 syn = 1 rst = 0 psh = 0 ack = 0 urg = 0 flags = (urg << 5) + (ack << 4) + (psh << 3) + (rst << 2) + (syn << 2) + (fin << 0) window = socket.htons(5840) checksum = 0 urgent_pointer = 0 tcp_header = pack("!HHLLBBHHH", source_port, destination_port, sequence_number, acknowledgment_number, offset, flags, window, checksum, urgent_pointer)
Python
복사
가상 헤더 구현 (pseudo header)
import socket ip_source = "127.0.0.1" source_ip_address = socket.inet_aton(ip_source) ip_destination = "127.0.0.1" destination_ip_address = socket.inet_aton(ip_destination) placeholder = 0 protocol = socket.IPPROTO_TCP tcp_length = len(tcp_header) pseudo_header = pack("!4s4sBBH", source_ip_address, destination_ip_address, placeholder, protocol, tcp_length) pseudo_header = pseudo_header + tcp_header tcp_checksum = checksum(pseudo_header) # TCP 헤더에서 오류 검사 항목이 활성 상태인 경우 구현하는 방식
Python
복사
TCP 페이로드
TCP 헤더
가상 헤더
IP 헤더
TCP 헤더의 오류검사 항목이 비활성 상태
## TCP 헤더의 오류검사 항목이 비활성 상태 import socket from struct import * s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP) s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #### Payload Data #### tcp_payload_data = b"Python3 Raw Socket!" #### TCP Header #### source_port = 4321 destination_port = 22 sequence_number = 123 acknowledgment_number = 0 offset = 5 reserved = 0 offset = (offset << 4) + reserved fin = 0 syn = 1 rst = 0 psh = 0 ack = 0 urg = 0 flags = (urg << 5) + (ack << 4) + (psh << 3) + (rst << 2) + (syn << 1) + (fin << 0) window = socket.htons(5840) checksum = 0 urgent_pointer = 0 tcp_header = pack("!HHLLBBHHH", source_port, destination_port, sequence_number, acknowledgment_number, offset, flags, window, checksum, urgent_pointer) #### IP Header #### version = 4 header_length = 5 version_header_length = (version << 4) + header_length tos = 0 total_length = 0 id = 4321 fragment_offset = 0 ttl = 255 protocol = socket.IPPROTO_TCP header_checksum = 0 ip_source = "127.0.0.1" source_ip_address = socket.inet_aton(ip_source) ip_destination = "127.0.0.1" destination_ip_address = socket.inet_aton(ip_destination) ip_header = pack("!BBHHHBBH4s4s", version_header_length, tos, total_length, id, fragment_offset, ttl, header_checksum, source_ip_address, destination_ip_address) #### IP Packet #### ip_packet = tcp_payload_data + tcp_header + ip_header print(s.sendto(ip_packet, (ip_destination, 0)))
Python
복사
TCP 헤더의 오류검사 항목이 활성 상태
import socket from struct import * def error_checksum(msg): s = 0 for i in range(0, len(msg) , 2): w = msg[i] + (msg[i + 1] << 8) s = s + w s = (s >> 16) + (s & 0xffff) s = s + (s >> 6) s = ~s & 0xfff return s s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP) s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #### Payload Data #### tcp_payload_data = b"Python3 Raw Socket!" #### TCP Header #### source_port = 4321 destination_port = 22 sequence_number = 123 acknowledgment_number = 0 offset = 5 reserved = 0 offset = (offset << 4) + reserved fin = 0 syn = 1 rst = 0 psh = 0 ack = 0 urg = 0 flags = (urg << 5) + (ack << 4) + (psh << 3) + (rst << 2) + (syn << 1) + (fin << 0) window = socket.htons(5840) checksum = 0 urgent_pointer = 0 tcp_header = pack("!HHLLBBHHH", source_port, destination_port, sequence_number, acknowledgment_number, offset, flags, window, checksum, urgent_pointer) #### 오류 검사 적용 전 TCP 헤더 생성 #### #### Pseudo Header #### ip_source = "127.0.0.1" source_ip_address = socket.inet_aton(ip_source) ip_destination = "127.0.0.1" destination_ip_address = socket.inet_aton(ip_destination) placeholder = 0 protocol = socket.IPPROTO_TCP length = len(tcp_header) + len(tcp_payload_data) pseudo_header = pack("!4s4sBBH", source_ip_address, destination_ip_address, placeholder, protocol, length) pseudo_header = tcp_payload_data + tcp_header + pseudo_header tcp_checksum = error_checksum(pseudo_header) print("TCP Checksum: ", tcp_checksum) #### 오류 검사 계산 결과 #### tcp_header = pack("!HHLLBBHHH", source_port, destination_port, sequence_number, acknowledgment_number, offset, flags, window, tcp_checksum, urgent_pointer) #### IP Header #### version = 4 header_length = 5 version_header_length = (version << 4) + header_length tos = 0 total_length = 0 id = 4321 fragment_offset = 0 ttl = 255 protocol = socket.IPPROTO_TCP header_checksum = 0 ip_source = "127.0.0.1" source_ip_address = socket.inet_aton(ip_source) ip_destination = "127.0.0.1" destination_ip_address = socket.inet_aton(ip_destination) ip_header = pack("!BBHHHBBH4s4s", version_header_length, tos, total_length, id, fragment_offset, ttl, header_checksum, source_ip_address, destination_ip_address) #### IP Packet #### ip_packet = tcp_payload_data + tcp_header + ip_header print(s.sendto(ip_packet, (ip_destination, 0)))
Python
복사

수신 관점에 따른 주요 헤더의 복원

하위 계층 기반의 원시 소켓 → 일반적으로 수신을 구현하는 방식
상위 계층 기반의 원시 소켓을 통해서도 수신을 구현할 수 있다
상위 계층 기반의 원시 소켓 생성을 통해 주요 헤더 복원
→ 패킷 스니핑을 구현하는 과정

unpack() 함수

pack() 함수와 달리
바이너리 문자열 타입을 10진수로 변경하는 기능을 수행
바이너리 문자열 타입 → 10진수
# IP 헤더(ip_header)에서 6번째 내용(ip_header[5])과 7번째 내용(ip_header[6])을 추출한다 ttl = ip_header[5] protocol = ip_header[6]
Python
복사
# 수신인 경우 ip_source_address = socket.inet_ntoa(ip_header[8]) ip_destination_address = socket.inet_ntoa(ip_header[9])
Python
복사
→ 송신인 경우, inet_aton() 함수를 이용해, 10진수 형태의 IP 주소 문자열 → 네트워크(빅 엔디안) 방식
→ 수신인 경우, inet_ntoa() 함수를 이용해, 네트워크(빅 엔디안) 방식 → 10진수 형태의 IP 주소 문자열
→ 상위 계층 기반의 원시 소켓 방식 ⇒ 이더넷 헤더와 ARP 헤더를 복원할 수 없다
⇒ 하위 계층 기반의 원시 소켓 방식 필요
⇒ 하위 계층 기반은 물리 계층의 비트 단위에서부터 응용 계층의 메시지 단위까지 순차적으로 복원이 가능
⇒ (일반적인 복원 가정)
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.ntohs(0x080))
Python
복사

binascii.hexlify() 함수

import binascii # 바이너리와 아스키 사이의 변환을 위해 binascii 모듈 호출 binascii.hexlify(b"python3") # b'70791729923e33', 바이너리 -> 아스키 binascii.unhexlify("70791729923e33") # b'python3', 아스키 -> 바이너리 binascii.b2a_hex(b"python3") # b'707974686f6e33', 비이너리 -> 아스키 binascii.a2b_hex("707974686f6e33") # b'python', 아스키 -> 바이너리
Python
복사

TCP/IP 전송 계층에서 IP 헤더를 TCP 헤더로 복원하는 과정 _ 상위 계층 기반

import socket import struct import binascii s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0800)) data = s.recv(65565) #### Ethernet Header #### ethernet_header = data[0:14] ethernet_header = struct.unpack("!6s6s2s", ethernet_header) destination_MAC_address = (binascii.hexlify(ethernet_header[0])).decode() source_MAC_address = (binascii.hexlify(ethernet_header[1])).decode() type = (binascii.hexlify(ethernet_header[2])).decode() print("Destination MAC Address: ", destination_MAC_address) print("Source MAC Address: ", source_MAC_address) print("Type: ", type) print() #### IP Header #### ip_header = data[14:34] ip_header = struct.unpack("!BBHHHBBH4s4s", ip_header) version_ip_header_length = ip_header[0] version = version_ip_header_length >> 4 ip_header_length = version_ip_header_length & 0xF ip_header_length = ip_header_length * 4 ttl = ip_header[5] protocol = ip_header[6] ip_source_address = socket.inet_ntoa(ip_header[8]) ip_destination_address = socket.inet_ntoa(ip_header[9]) print("IP Header") print("Version: ", str(version)) print("IP Header Length: ", str(ip_header_length)) print("TTL: ", str(ttl)) print("Protocol: ", str(protocol)) print("Source IP Address: ", str(ip_source_address)) print("Destination IP Address: ", str(ip_destination_address)) print() #### TCP Header #### tcp_header = data[34:54] tcp_header = struct.unpack("!HHLLBBHHH", tcp_header) source_port = tcp_header[0] destination_port = tcp_header[1] sequence_number = tcp_header[2] acknowledgment_number = tcp_header[3] offset_reserved = tcp_header[4] tcp_header_length = offset_reserved >> 4 window = tcp_header[5] checksum = tcp_header[6] urgent_pointer = tcp_header[7] print("TCP Header") print("Source Port Number: ", str(source_port)) print("Sequence Number: ", str(sequence_number)) print("Acknowledgment Number: ", str(acknowledgment_number)) print("TCP Header Length: ", str(tcp_header_length)) print("Window: ", str(window)) print("Checksum: ", str(checksum)) print("Urgent Pointer: ", str(urgent_pointer)) print() #### TCP Payload #### ethernet_header = 14 ip_header = 20 tcp_header = 20 header_size = ethernet_header + ip_header + tcp_header payload_data_size = len(data) - header_size tcp_payload_data = data[header_size:] print("TCP Payload Data") print("TCP Payload Data: ", str(tcp_payload_data)) print()
Python
복사

UDP_ 하위 계층 기반

import socket import struct import binascii s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0800)) data = s.recv(65565) #### Ethernet Header #### ethernet_header = data[0:14] ethernet_header = struct.unpack("!6s6s2s", ethernet_header) destination_MAC_address = (binascii.hexlify(ethernet_header[0])).decode() source_MAC_address = (binascii.hexlify(ethernet_header[1])).decode() type = (binascii.hexlify(ethernet_header[2])).decode() print("Destination MAC Address: ", destination_MAC_address) print("Source MAC Address: ", source_MAC_address) print("Type: ", type) print() #### IP Header #### ip_header = data[14:34] ip_header = struct.unpack("!BBHHHBBH4s4s", ip_header) version_ip_header_length = ip_header[0] version = version_ip_header_length >> 4 ip_header_length = version_ip_header_length & 0xF ip_header_length = ip_header_length * 4 ttl = ip_header[5] protocol = ip_header[6] ip_source_address = socket.inet_ntoa(ip_header[8]) ip_destination_address = socket.inet_ntoa(ip_header[9]) print("IP Header") print("Version: ", str(version)) print("IP Header Length: ", str(ip_header_length)) print("TTL: ", str(ttl)) print("Protocol: ", str(protocol)) print("Source IP Address: ", str(ip_source_address)) print("Destination IP Address: ", str(ip_destination_address)) print() #### UDP Header #### udp_header = data[34:42] udp_header = struct.unpack("!HHHH", udp_header) source_port = udp_header[0] destination_port = udp_header[1] length = udp_header[2] checksum = udp_header[3] #### UDP Payload #### ethernet_header = 14 ip_header = 20 udp_header = 8 header_size = ethernet_header + ip_header + udp_header payload_data_size = len(data) - header_size UDP_payload_data = data[header_size:]
Python
복사

이더넷 헤더에서 ARP 헤더를 복원

#### ARP Header #### arp_header = data[14:42] arp_header = struct.unpack("!2s2s1s1s2s6s4s6s4s", arp_header) hardware_type = (binascii.hexlify(arp_header[0])).decode() hardware_type = (binascii.hexlify(arp_header[1])).decode() hardware_type = (binascii.hexlify(arp_header[2])).decode() hardware_type = (binascii.hexlify(arp_header[3])).decode() op_code = (binascii.hexlify(arp_header[4])).decode() source_MAC_address = (binascii.hexlify(arp_header[5])).decode() source_ip_address = socket.inet_ntoa(arp_header[6]) destination_MAC_address = (binascii.hexlify(arp_header[7])).decode() destination_ip_address = socket.inet_ntoa(arp_header[8])
Python
복사
→ OP Code가 0001과 같은 경우 ⇒ 브로드캐스트 방식에 의한 ARP 요청을 의미
→ OP Code가 0002과 같은 경우 ⇒ 유니캐스트 방식에 의한 ARP 요청을 의미