원시 소켓 (Raw Socket)
표준 소켓 방식
→ 운영체제에서 TCP/IP 계층별 데이터 전송 단위와 헤더 구조 등을 자동으로 처리
→ 운영체제가 이미 정해진 방식에 따라 소켓을 생성 → 소켓 활용에 대한 유연성은 없다
새로운 프로토콜을 개발하거나, 패킷 스니퍼 등과 같은 정교한 응용 도구를 구현하는 경우
→ 원시 소켓 방식에 기반해야 한다
원시 소켓 방식
# 표준 소켓 방식에 따른 소켓 객체 생성
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
# 원시 소켓 방식에 따른 소켓 객체 생성
s2 = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
Python
복사
# 표준 소켓 방식에 따른 소켓 객체 생성
s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
# 원시 소켓 방식에 따른 소켓 객체 생성
s2 = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)
Python
복사
→ 원시 소켓 방식의 소켓 객체 생성
→ 원시 소켓 방식에서는 세 번째 매개 변수를 생략할 수 없다
setsockopt() 함수
•
원래 용도 → 소켓 객체를 종료하자마자, 해당 포트 번호를 재사용하도록 허용하겠다는 용도로 사용했다
•
원시 소켓 방식에서의 setsockopt() 함수
◦
사용자가 헤더에 접근하도록 허용하겠다는 용도로 사용
# 원래 용도 -> 소켓 객체를 종료하자마자, 해당 포트 번호를 재사용하도록 허용하겠다는 용도
s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
####### 상위 계층 기반의 원시 소켓 생성 #######
# UDP 헤더뿐만 아니라, IP 헤더까지 사용자가 직접 생성하도록 허용하겠다는 의미
# 사용자가 직접 UDP 헤더와 IP 헤더를 생성하겠다는 의미
s2 = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
s2.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# 사용자가 직접 ICMP 헤더와 IP 헤더를 생성하겠다는 의미
s3 = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
s3.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
Python
복사
하위 계층 기반의 원시 소켓 생성
→ 이더넷 프레임까지 확장해 원시 소켓을 생성하는 경우
→ 일반적으로 수신을 구현하는 방식
→ 데이터 링크 계층에 기반해 들어오는 데이터를 상위 계층별로 복원할 경우 → 하위 걔층 기반의 원시 소켓 생성
→ LAN 카드의 속성까지 고려
# 유닉스, 리눅스에서 하위 계층 기반의 로우 소켓 생성
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0800))
# socket.AF_INET -> 유닉스 리눅스 기준
# 0x0800 -> IP 패킷을 의미
Python
복사
UDP 헤더 직접 구현
source_port = 4321
destination_port = 22
length = 0
checksum = 0
udp_header = pack("!HHHH", source_port, destination_port, length, checksum)
Python
복사
TCP 헤더 직접 구현
from tabnanny import check
from numpy import source
import socket
source_port = 4321
destination_port = 22
sequence_number = 123
acknowledgment_number = 0
offset = 5
reserved = 0
offset = (offset << 4) + reserved
fin = 0
syn = 1
rst = 0
psh = 0
ack = 0
urg = 0
flags = (urg << 5) + (ack << 4) + (psh << 3) + (rst << 2) + (syn << 2) + (fin << 0)
window = socket.htons(5840)
checksum = 0
urgent_pointer = 0
tcp_header = pack("!HHLLBBHHH", source_port, destination_port, sequence_number, acknowledgment_number, offset, flags, window, checksum, urgent_pointer)
Python
복사
가상 헤더 구현 (pseudo header)
import socket
ip_source = "127.0.0.1"
source_ip_address = socket.inet_aton(ip_source)
ip_destination = "127.0.0.1"
destination_ip_address = socket.inet_aton(ip_destination)
placeholder = 0
protocol = socket.IPPROTO_TCP
tcp_length = len(tcp_header)
pseudo_header = pack("!4s4sBBH", source_ip_address, destination_ip_address, placeholder, protocol, tcp_length)
pseudo_header = pseudo_header + tcp_header
tcp_checksum = checksum(pseudo_header)
# TCP 헤더에서 오류 검사 항목이 활성 상태인 경우 구현하는 방식
Python
복사
TCP 페이로드 | TCP 헤더 | 가상 헤더 | IP 헤더 |
TCP 헤더의 오류검사 항목이 비활성 상태
## TCP 헤더의 오류검사 항목이 비활성 상태
import socket
from struct import *
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)
s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#### Payload Data ####
tcp_payload_data = b"Python3 Raw Socket!"
#### TCP Header ####
source_port = 4321
destination_port = 22
sequence_number = 123
acknowledgment_number = 0
offset = 5
reserved = 0
offset = (offset << 4) + reserved
fin = 0
syn = 1
rst = 0
psh = 0
ack = 0
urg = 0
flags = (urg << 5) + (ack << 4) + (psh << 3) + (rst << 2) + (syn << 1) + (fin << 0)
window = socket.htons(5840)
checksum = 0
urgent_pointer = 0
tcp_header = pack("!HHLLBBHHH", source_port, destination_port, sequence_number, acknowledgment_number, offset, flags, window, checksum, urgent_pointer)
#### IP Header ####
version = 4
header_length = 5
version_header_length = (version << 4) + header_length
tos = 0
total_length = 0
id = 4321
fragment_offset = 0
ttl = 255
protocol = socket.IPPROTO_TCP
header_checksum = 0
ip_source = "127.0.0.1"
source_ip_address = socket.inet_aton(ip_source)
ip_destination = "127.0.0.1"
destination_ip_address = socket.inet_aton(ip_destination)
ip_header = pack("!BBHHHBBH4s4s", version_header_length, tos, total_length, id, fragment_offset, ttl, header_checksum, source_ip_address, destination_ip_address)
#### IP Packet ####
ip_packet = tcp_payload_data + tcp_header + ip_header
print(s.sendto(ip_packet, (ip_destination, 0)))
Python
복사
TCP 헤더의 오류검사 항목이 활성 상태
import socket
from struct import *
def error_checksum(msg):
s = 0
for i in range(0, len(msg) , 2):
w = msg[i] + (msg[i + 1] << 8)
s = s + w
s = (s >> 16) + (s & 0xffff)
s = s + (s >> 6)
s = ~s & 0xfff
return s
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)
s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#### Payload Data ####
tcp_payload_data = b"Python3 Raw Socket!"
#### TCP Header ####
source_port = 4321
destination_port = 22
sequence_number = 123
acknowledgment_number = 0
offset = 5
reserved = 0
offset = (offset << 4) + reserved
fin = 0
syn = 1
rst = 0
psh = 0
ack = 0
urg = 0
flags = (urg << 5) + (ack << 4) + (psh << 3) + (rst << 2) + (syn << 1) + (fin << 0)
window = socket.htons(5840)
checksum = 0
urgent_pointer = 0
tcp_header = pack("!HHLLBBHHH", source_port, destination_port, sequence_number, acknowledgment_number, offset, flags, window, checksum, urgent_pointer)
#### 오류 검사 적용 전 TCP 헤더 생성 ####
#### Pseudo Header ####
ip_source = "127.0.0.1"
source_ip_address = socket.inet_aton(ip_source)
ip_destination = "127.0.0.1"
destination_ip_address = socket.inet_aton(ip_destination)
placeholder = 0
protocol = socket.IPPROTO_TCP
length = len(tcp_header) + len(tcp_payload_data)
pseudo_header = pack("!4s4sBBH", source_ip_address, destination_ip_address, placeholder, protocol, length)
pseudo_header = tcp_payload_data + tcp_header + pseudo_header
tcp_checksum = error_checksum(pseudo_header)
print("TCP Checksum: ", tcp_checksum)
#### 오류 검사 계산 결과 ####
tcp_header = pack("!HHLLBBHHH", source_port, destination_port, sequence_number, acknowledgment_number, offset, flags, window, tcp_checksum, urgent_pointer)
#### IP Header ####
version = 4
header_length = 5
version_header_length = (version << 4) + header_length
tos = 0
total_length = 0
id = 4321
fragment_offset = 0
ttl = 255
protocol = socket.IPPROTO_TCP
header_checksum = 0
ip_source = "127.0.0.1"
source_ip_address = socket.inet_aton(ip_source)
ip_destination = "127.0.0.1"
destination_ip_address = socket.inet_aton(ip_destination)
ip_header = pack("!BBHHHBBH4s4s", version_header_length, tos, total_length, id, fragment_offset, ttl, header_checksum, source_ip_address, destination_ip_address)
#### IP Packet ####
ip_packet = tcp_payload_data + tcp_header + ip_header
print(s.sendto(ip_packet, (ip_destination, 0)))
Python
복사
수신 관점에 따른 주요 헤더의 복원
하위 계층 기반의 원시 소켓 → 일반적으로 수신을 구현하는 방식
상위 계층 기반의 원시 소켓을 통해서도 수신을 구현할 수 있다
상위 계층 기반의 원시 소켓 생성을 통해 주요 헤더 복원
→ 패킷 스니핑을 구현하는 과정
unpack() 함수
•
pack() 함수와 달리
•
바이너리 문자열 타입을 10진수로 변경하는 기능을 수행
•
바이너리 문자열 타입 → 10진수
# IP 헤더(ip_header)에서 6번째 내용(ip_header[5])과 7번째 내용(ip_header[6])을 추출한다
ttl = ip_header[5]
protocol = ip_header[6]
Python
복사
# 수신인 경우
ip_source_address = socket.inet_ntoa(ip_header[8])
ip_destination_address = socket.inet_ntoa(ip_header[9])
Python
복사
→ 송신인 경우, inet_aton() 함수를 이용해, 10진수 형태의 IP 주소 문자열 → 네트워크(빅 엔디안) 방식
→ 수신인 경우, inet_ntoa() 함수를 이용해, 네트워크(빅 엔디안) 방식 → 10진수 형태의 IP 주소 문자열
→ 상위 계층 기반의 원시 소켓 방식 ⇒ 이더넷 헤더와 ARP 헤더를 복원할 수 없다
⇒ 하위 계층 기반의 원시 소켓 방식 필요
⇒ 하위 계층 기반은 물리 계층의 비트 단위에서부터 응용 계층의 메시지 단위까지 순차적으로 복원이 가능
⇒ (일반적인 복원 가정)
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.ntohs(0x080))
Python
복사
binascii.hexlify() 함수
import binascii
# 바이너리와 아스키 사이의 변환을 위해 binascii 모듈 호출
binascii.hexlify(b"python3")
# b'70791729923e33', 바이너리 -> 아스키
binascii.unhexlify("70791729923e33")
# b'python3', 아스키 -> 바이너리
binascii.b2a_hex(b"python3")
# b'707974686f6e33', 비이너리 -> 아스키
binascii.a2b_hex("707974686f6e33")
# b'python', 아스키 -> 바이너리
Python
복사
TCP/IP 전송 계층에서 IP 헤더를 TCP 헤더로 복원하는 과정 _ 상위 계층 기반
import socket
import struct
import binascii
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0800))
data = s.recv(65565)
#### Ethernet Header ####
ethernet_header = data[0:14]
ethernet_header = struct.unpack("!6s6s2s", ethernet_header)
destination_MAC_address = (binascii.hexlify(ethernet_header[0])).decode()
source_MAC_address = (binascii.hexlify(ethernet_header[1])).decode()
type = (binascii.hexlify(ethernet_header[2])).decode()
print("Destination MAC Address: ", destination_MAC_address)
print("Source MAC Address: ", source_MAC_address)
print("Type: ", type)
print()
#### IP Header ####
ip_header = data[14:34]
ip_header = struct.unpack("!BBHHHBBH4s4s", ip_header)
version_ip_header_length = ip_header[0]
version = version_ip_header_length >> 4
ip_header_length = version_ip_header_length & 0xF
ip_header_length = ip_header_length * 4
ttl = ip_header[5]
protocol = ip_header[6]
ip_source_address = socket.inet_ntoa(ip_header[8])
ip_destination_address = socket.inet_ntoa(ip_header[9])
print("IP Header")
print("Version: ", str(version))
print("IP Header Length: ", str(ip_header_length))
print("TTL: ", str(ttl))
print("Protocol: ", str(protocol))
print("Source IP Address: ", str(ip_source_address))
print("Destination IP Address: ", str(ip_destination_address))
print()
#### TCP Header ####
tcp_header = data[34:54]
tcp_header = struct.unpack("!HHLLBBHHH", tcp_header)
source_port = tcp_header[0]
destination_port = tcp_header[1]
sequence_number = tcp_header[2]
acknowledgment_number = tcp_header[3]
offset_reserved = tcp_header[4]
tcp_header_length = offset_reserved >> 4
window = tcp_header[5]
checksum = tcp_header[6]
urgent_pointer = tcp_header[7]
print("TCP Header")
print("Source Port Number: ", str(source_port))
print("Sequence Number: ", str(sequence_number))
print("Acknowledgment Number: ", str(acknowledgment_number))
print("TCP Header Length: ", str(tcp_header_length))
print("Window: ", str(window))
print("Checksum: ", str(checksum))
print("Urgent Pointer: ", str(urgent_pointer))
print()
#### TCP Payload ####
ethernet_header = 14
ip_header = 20
tcp_header = 20
header_size = ethernet_header + ip_header + tcp_header
payload_data_size = len(data) - header_size
tcp_payload_data = data[header_size:]
print("TCP Payload Data")
print("TCP Payload Data: ", str(tcp_payload_data))
print()
Python
복사
UDP_ 하위 계층 기반
import socket
import struct
import binascii
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0800))
data = s.recv(65565)
#### Ethernet Header ####
ethernet_header = data[0:14]
ethernet_header = struct.unpack("!6s6s2s", ethernet_header)
destination_MAC_address = (binascii.hexlify(ethernet_header[0])).decode()
source_MAC_address = (binascii.hexlify(ethernet_header[1])).decode()
type = (binascii.hexlify(ethernet_header[2])).decode()
print("Destination MAC Address: ", destination_MAC_address)
print("Source MAC Address: ", source_MAC_address)
print("Type: ", type)
print()
#### IP Header ####
ip_header = data[14:34]
ip_header = struct.unpack("!BBHHHBBH4s4s", ip_header)
version_ip_header_length = ip_header[0]
version = version_ip_header_length >> 4
ip_header_length = version_ip_header_length & 0xF
ip_header_length = ip_header_length * 4
ttl = ip_header[5]
protocol = ip_header[6]
ip_source_address = socket.inet_ntoa(ip_header[8])
ip_destination_address = socket.inet_ntoa(ip_header[9])
print("IP Header")
print("Version: ", str(version))
print("IP Header Length: ", str(ip_header_length))
print("TTL: ", str(ttl))
print("Protocol: ", str(protocol))
print("Source IP Address: ", str(ip_source_address))
print("Destination IP Address: ", str(ip_destination_address))
print()
#### UDP Header ####
udp_header = data[34:42]
udp_header = struct.unpack("!HHHH", udp_header)
source_port = udp_header[0]
destination_port = udp_header[1]
length = udp_header[2]
checksum = udp_header[3]
#### UDP Payload ####
ethernet_header = 14
ip_header = 20
udp_header = 8
header_size = ethernet_header + ip_header + udp_header
payload_data_size = len(data) - header_size
UDP_payload_data = data[header_size:]
Python
복사
이더넷 헤더에서 ARP 헤더를 복원
#### ARP Header ####
arp_header = data[14:42]
arp_header = struct.unpack("!2s2s1s1s2s6s4s6s4s", arp_header)
hardware_type = (binascii.hexlify(arp_header[0])).decode()
hardware_type = (binascii.hexlify(arp_header[1])).decode()
hardware_type = (binascii.hexlify(arp_header[2])).decode()
hardware_type = (binascii.hexlify(arp_header[3])).decode()
op_code = (binascii.hexlify(arp_header[4])).decode()
source_MAC_address = (binascii.hexlify(arp_header[5])).decode()
source_ip_address = socket.inet_ntoa(arp_header[6])
destination_MAC_address = (binascii.hexlify(arp_header[7])).decode()
destination_ip_address = socket.inet_ntoa(arp_header[8])
Python
복사
→ OP Code가 0001과 같은 경우 ⇒ 브로드캐스트 방식에 의한 ARP 요청을 의미
→ OP Code가 0002과 같은 경우 ⇒ 유니캐스트 방식에 의한 ARP 요청을 의미