-
Notifications
You must be signed in to change notification settings - Fork 4
Node‐media‐server
https://github.com/illuspas/Node-Media-Server
[node_media_server.js](https://github.com/illuspas/Node-Media-Server/blob/master/src/node_media_server.js)
node_media_server 의 진입점
이라고 봐도 될 것 같다.
node_media_server 는 내부적으로 앞으로 나올, 여러 개의 서버로 나뉘어진다.
[node_rtmp_server.js](https://github.com/illuspas/Node-Media-Server/blob/master/src/node_rtmp_server.js)
rtmp 프로토콜을 처리하는 서버로 rtmp 프로토콜 포트(기본적으로 1935)를 열어서 rtmp 데이터를 수신한다.
node_media_server 는 가장 먼저 node_rtmp_server 를 실행한다. (config 를 따로 지정해주면 그 config에 맞게끔 생성)
**class NodeMediaServer {**
constructor(config) {
this.config = config;
}
run() {
Logger.setLogType(this.config.logType);
Logger.log(`Node Media Server v${Package.version}`);
**// NODE_RTMP_SERVER 인스턴스 생성
if (this.config.rtmp) {
this.nrs = new NodeRtmpServer(this.config);
this.nrs.run();
}
// 아래에서 NODE_HTTP_SERVER, NODE_TRANS_SERVER, NODE_RELAY_SERVER, NODE_FISSION_SERVER 도 생성함.**
node_rtmp_server 는 rtmp(1935포트), rtmps(443포트) 프로토콜을 전부 지원하는 것 같다.
rtmps 는 rtmp 에 SSL/TLS 암호화를 추가한 버전이다.
config 를 통해서 rtmp 에 SSL/TLS 암호화를 설정해주면 추가로 tls 소켓(기본 443)을 하나 더 여는 것 같다.
여기서, https 도 443 포트를 쓰지만, 애초에 프로토콜부터 달라서 rtmps://xxx.xxx.xxx… 와 https://xxx.xxx.xxx/…. 로 구분되어서 들어오기 때문에 같은 443 포트여도 프로토콜을 구분할 수 있다.
추후에 rtmp 서버에 보안을 신경쓴다면 rtmps 로 업그레이드 할 수도 있겠다.
node_rtmp_server는 내부적으로 net
모듈을 사용해서 tcp 소켓 통신을 한다.
net.createServer (SSL/TLS 설정이 있다면 추가로 tls.createServer 도 진행) 로 tcp 서버를 생성하면서, 연결이 수립되었을 때 node_rtmp_session 인스턴스를 생성한다.
node_rtmp_session 은 socket 에 대한 정보가 있어서 socket.on(’data’) 같은 방식으로 활용할 수 있다.
[node_rtmp_session.js](https://github.com/illuspas/Node-Media-Server/blob/master/src/node_rtmp_session.js)
소스 코드를 열어보자마자 머리가 어지러워진다.
tcp 서버와 소켓 연결(tcp handshake 같은 것을 하고)이 되고나면, 이제는 rtmp 에 관련된 사전 작업을 node_rtmp_session 쪽에서 담당한다.
node_rtmp_session 은 랜덤 문자열로 이루어진 sessionID 를 id 로 가진다.
또한, 여러가지 상수들이 정의되어있고, res.write 를 위한 socket 에 대한 정보를 가지고 있다.
tcp handshake 처럼 rtmp 도,
(rtmp로 스트림을 송신하는 스트리머)스트리머
↔ rtmp 서버
↔ 시청자
(rtmp로 요청하는 시청자)
사이에서 handshake 를 한다.
tcp socket 을 통해 데이터가 들어올 때 ( socket.on(’data’, callback)
) 아래와 같은 handshake 를 수립한다.
우선 handshake 의 단계는 총 3단계이다.`
switch (this.handshakeState) {
1단계
let bytes = data.length;
let p = 0; // data 중에서 현재 가리키는 곳 (포인터)
let n = 0;
case RTMP_HANDSHAKE_UNINIT:
// Logger.log('RTMP_HANDSHAKE_UNINIT');
this.handshakeState = RTMP_HANDSHAKE_0;
this.handshakeBytes = 0;
bytes -= 1;
p += 1;
break;
우선 1단계부터 살펴보면, RTMP_HANDSHAKE_UNINIT 일 떄, 즉 초기화도 안된 맨 처음 상태일 때이다.
-
this.handshakeState 를
RTMP_HANDSHAKE_UNINIT
에서RTMP_HANDSHAKE_0
으로 한 단계 상승시켜주고, -
this.handshakeBytes 를 0으로 만든다.
왜 0으로 갑자기 초기화 하는걸까?
이는 앞으로 있을 2단계의 handshake 단계에 필요한 데이터들이 정해져있다.
const RTMP_HANDSHAKE_SIZE = 1536;
handshake 는 1536 바이트만큼의 데이터를 가지고 진행하는데 socket 을 통해서 1536 만큼이 한 번에 들어올 수도 있지만
여러 번에 걸쳐서 1536 만큼 들어올 수도 있다.
따라서 0으로 초기화시키고 1536 바이트만큼 데이터가 들어올 때까지 this.handshakeBytes 에 누적시키는 역할을 한다.
bytes -= 1 을 빼주는 이유는, 원래
RTMP_HANDSHAKE_UNINIT
단계는프로토콜의 버전이 일치하는지 확인
하는 단계이다.이때 클라이언트가 서버로 프로토콜의 버전을 1바이트에 담아서 보낸다. 다시 말해서, 서버로 넘어온 맨 첫 번째 1 바이트는 프로토콜의 버전이다.
const RTMP_VERSION = 3;
코드를 뜯어보면 RTMP_VERSION 을 3이라고 상수로 지정해놨는데 현재는 클라이언트와 프로토콜 버전을 비교하는 로직이 없다. 왜 없어졌을까 이건
아무튼 원래라면 RTMP_VERSION 이 일치하는지 확인을 해야하고, 서버의 프로토콜 버전도 클라이언트로 보내서 클라이언트 측에서 일치하는지도 확인해야하는데 이러한 과정이 없는 것 같다. 굳이 중요하지 않다고 판단한걸까
2단계
case RTMP_HANDSHAKE_0:
// Logger.log('RTMP_HANDSHAKE_0');
n = RTMP_HANDSHAKE_SIZE - this.handshakeBytes;
n = n <= bytes ? n : bytes;
data.copy(this.handshakePayload, this.handshakeBytes, p, p + n);
this.handshakeBytes += n;
bytes -= n;
p += n;
if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) {
this.handshakeState = RTMP_HANDSHAKE_1;
this.handshakeBytes = 0;
let s0s1s2 = Handshake.generateS0S1S2(this.handshakePayload);
this.socket.write(s0s1s2);
}
break;
RTMP_HANDSHAKE_0 단계에서는,
클라이언트에서 서버로, 1536 바이트만큼의 랜덤 데이터를 생성해서 보낸다.
마찬가지로 서버도 클라이언트로 1536 바이트만큼의 랜덤 데이터를 생성해서 보낸다.
3단계
case RTMP_HANDSHAKE_1:
// Logger.log('RTMP_HANDSHAKE_1');
n = RTMP_HANDSHAKE_SIZE - this.handshakeBytes;
n = n <= bytes ? n : bytes;
data.copy(this.handshakePayload, this.handshakeBytes, p, n);
this.handshakeBytes += n;
bytes -= n;
p += n;
if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) {
this.handshakeState = RTMP_HANDSHAKE_2;
this.handshakeBytes = 0;
this.handshakePayload = null;
}
break;
case RTMP_HANDSHAKE_2:
}
클라이언트는 서버로부터 2단계에서 받은 랜덤 데이터를 그대로 다시 서버로 보낸다.
원래는 여기에서도, 서버가 2단계에서 보낸 데이터를 정말 그대로 다시 돌려보냈는지 확인하는 절차가 있다고 한다.
node-media-server 는 그냥 무엇이든 다시 돌아오기만 하면 handshake 가 정상적으로 되었다고 판단하는 듯 하다.
handshake 가 정상적으로 끝났다면 이제 들어오는 데이터는 전부 rtmp 데이터이다. 이제는 rtmp chunk 데이터를 읽어야 한다.
rtmp chunk 도 읽는 단계가 총 5단계로 나뉜다.
너무 많으므로 GPT 의 힘을 잠깐 빌리자면
RTMP 청크를 파싱하기 위해 사용되는 상태들은 다음과 같습니다:
-
RTMP_PARSE_INIT
: 초기 상태로, 기본 헤더의 첫 번째 바이트를 처리합니다. -
RTMP_PARSE_BASIC_HEADER
: 기본 헤더의 나머지 바이트를 처리합니다. -
RTMP_PARSE_MESSAGE_HEADER
: 메시지 헤더를 처리합니다. -
RTMP_PARSE_EXTENDED_TIMESTAMP
: 확장된 타임스탬프를 처리합니다. -
RTMP_PARSE_PAYLOAD
: 페이로드 데이터를 처리합니다.
case RTMP_PARSE_INIT:
this.parserBytes = 1;
this.parserBuffer[0] = data[p + offset++];
if (0 === (this.parserBuffer[0] & 0x3f)) {
this.parserBasicBytes = 2;
} else if (1 === (this.parserBuffer[0] & 0x3f)) {
this.parserBasicBytes = 3;
} else {
this.parserBasicBytes = 1;
}
this.parserState = RTMP_PARSE_BASIC_HEADER;
break;
-
목적: 기본 헤더의 첫 번째 바이트를 읽어들여 청크 유형과 채널 ID를 확인합니다.
-
구현:
this.parserBytes = 1; this.parserBuffer[0] = data[p + offset++];
-
this.parserBytes = 1
: 기본 헤더의 첫 번째 바이트를 읽었기 때문에, 현재까지 읽은 바이트 수를 1로 초기화합니다. -
data[p + offset++]
: 데이터의 첫 번째 바이트를 **this.parserBuffer
*에 저장하고, **offset
*을 증가시켜 다음 바이트를 가리키도록 합니다.
-
-
기본 헤더의 길이(
parserBasicBytes
)를 결정하는 조건문을 통해, 채널 ID에 따라 1, 2, 3 바이트 길이의 헤더를 설정합니다.
-
목적: 기본 헤더의 나머지 바이트를 읽어들입니다.
-
구현:
while (this.parserBytes < this.parserBasicBytes && offset < bytes) { this.parserBuffer[this.parserBytes++] = data[p + offset++]; }
-
this.parserBytes < this.parserBasicBytes
: 아직 기본 헤더의 모든 바이트를 읽지 않았다면, 계속 데이터를 읽습니다. - 기본 헤더의 모든 바이트를 읽은 후에는 **다음 상태 (
RTMP_PARSE_MESSAGE_HEADER
)**로 전환합니다.
-
-
목적: 메시지 헤더를 읽어들여 타임스탬프, 메시지 길이 등을 파싱합니다.
-
구현:
size = rtmpHeaderSize[this.parserBuffer[0] >> 6] + this.parserBasicBytes; while (this.parserBytes < size && offset < bytes) { this.parserBuffer[this.parserBytes++] = data[p + offset++]; } if (this.parserBytes >= size) { this.rtmpPacketParse(); this.parserState = RTMP_PARSE_EXTENDED_TIMESTAMP; }
- *
size
*는 메시지 헤더의 크기를 결정하며, 헤더 타입에 따라 다릅니다. - 메시지 헤더가 모두 읽히면 **
rtmpPacketParse()
*를 호출하여 메시지 헤더의 내용을 해석합니다. - 이후, 확장된 타임스탬프(
RTMP_PARSE_EXTENDED_TIMESTAMP
) 상태로 전환합니다.
- *
-
목적: 타임스탬프가 0xFFFFFF일 경우, 확장된 타임스탬프를 처리합니다.
-
구현:
if (this.parserPacket.header.timestamp === 0xffffff) size += 4; while (this.parserBytes < size && offset < bytes) { this.parserBuffer[this.parserBytes++] = data[p + offset++]; } if (this.parserBytes >= size) { if (this.parserPacket.header.timestamp === 0xffffff) { extended_timestamp = this.parserBuffer.readUInt32BE(rtmpHeaderSize[this.parserPacket.header.fmt] + this.parserBasicBytes); } else { extended_timestamp = this.parserPacket.header.timestamp; } }
-
확장된 타임스탬프는 기본 타임스탬프가 최대값(
0xFFFFFF
)에 도달했을 때 사용되며, 이 경우 4바이트를 추가로 읽어 타임스탬프를 해석합니다. - 확장된 타임스탬프를 읽은 후 타임스탬프 값을 설정하고,
RTMP_PARSE_PAYLOAD
상태로 전환합니다.
-
확장된 타임스탬프는 기본 타임스탬프가 최대값(
-
목적: 메시지의 페이로드 데이터를 읽어들여 실제 스트리밍 데이터를 처리합니다.
-
구현:
size = Math.min(this.inChunkSize - (this.parserPacket.bytes % this.inChunkSize), this.parserPacket.header.length - this.parserPacket.bytes); size = Math.min(size, bytes - offset); if (size > 0) { data.copy(this.parserPacket.payload, this.parserPacket.bytes, p + offset, p + offset + size); } this.parserPacket.bytes += size; offset += size;
-
청크 크기 (
inChunkSize
) 단위로 데이터를 읽어들입니다.inChunkSize
는 수신할 수 있는 최대 청크 크기를 설정하는 값입니다. -
페이로드 데이터를 **
parserPacket.payload
*에 복사하며, 현재까지 읽은 바이트 수를 추적합니다. - 페이로드가 모두 읽히면 **
rtmpHandler()
*를 호출하여 데이터를 처리하며, 초기 상태(RTMP_PARSE_INIT
)로 전환하여 다음 청크를 처리할 준비를 합니다.
-
청크 크기 (
-
inAckSize
및inLastAck
:- *
inAckSize
*는 현재까지 수신한 바이트 수를 추적하는 변수로, 일정한 크기 이상의 데이터를 수신하면 ACK (Acknowledgment) 패킷을 전송합니다. - *
this.sendACK(this.inAckSize)
*를 통해 클라이언트에게 ACK 메시지를 보내며, 데이터가 정상적으로 수신되었음을 알립니다.
- *
-
비트레이트 계산:
-
비트레이트는 수신된 바이트 수와 경과 시간(
diff
)을 사용하여 계산됩니다. - 이를 통해 실시간 스트리밍 데이터의 속도를 추적하며, 네트워크 상태를 모니터링하거나 최적화하는 데 사용됩니다.
-
비트레이트는 수신된 바이트 수와 경과 시간(
페이로드에 rtmp 스트림 데이터가 잘 들어왔다면 이제는 데이터를 처리해야한다.
rtmpHandler() {
switch (this.parserPacket.header.type) {
case RTMP_TYPE_SET_CHUNK_SIZE:
case RTMP_TYPE_ABORT:
case RTMP_TYPE_ACKNOWLEDGEMENT:
case RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE:
case RTMP_TYPE_SET_PEER_BANDWIDTH:
return 0 === this.rtmpControlHandler() ? -1 : 0;
case RTMP_TYPE_EVENT:
return 0 === this.rtmpEventHandler() ? -1 : 0;
case RTMP_TYPE_AUDIO:
return this.rtmpAudioHandler();
case RTMP_TYPE_VIDEO:
return this.rtmpVideoHandler();
case RTMP_TYPE_FLEX_MESSAGE:
case RTMP_TYPE_INVOKE:
return this.rtmpInvokeHandler();
case RTMP_TYPE_FLEX_STREAM: // AMF3
case RTMP_TYPE_DATA: // AMF0
return this.rtmpDataHandler();
}
}
페이로드에 담긴 패킷의 헤더에 따라 처리가 조금씩 다르다.
이것도 중요한 것을 제외하고는 GPT 의 힘을 빌리자.
-
rtmpControlHandler()
: 제어 메시지를 처리하여 청크 크기 변경, ACK 처리, 대역폭 설정 등을 수행합니다. -
rtmpEventHandler()
: 스트림 이벤트(시작, 종료, 건조 등)와 같은 상태 변화를 처리합니다. -
rtmpAudioHandler()
: 오디오 데이터를 처리하여 스트리밍 오디오를 관리합니다. -
rtmpVideoHandler()
: 비디오 데이터를 처리하여 스트리밍 비디오를 관리합니다. -
rtmpInvokeHandler()
: 클라이언트와 서버 간의 명령 호출을 처리합니다. (예:connect()
,publish()
,play()
등) -
rtmpDataHandler()
: 메타데이터와 같은 스트림 관련 데이터를 처리합니다.
- 🚀 웹 소켓의 실시간 양방향 통신 (feat. WS vs Socket.io)
- 🤸♂️ 비제어 컴포넌트로 렌더링 최적화 하기
- 👷♀️ Shared Worker로 클라이언트의 소켓 통신 개선하기
- 👨👩👧👦 다중 탭에서 하나의 소켓을 공유할 수 있을까?
- 🚀 [Socket.io] 클라이언트의 실시간 채팅 구현기
- ☕ NestJS를 통한 일관적인 시스템 설계
✈️ 외부의 사용자가 Object Storage에 접근하지 못하는 권한 제어- 🦢 nestjs에서 swagger 사용해보기
- 🛐 NestJS Nginx Request Data Size 문제
- 🔁 다시보기를 위한 Node‐Media‐Server, FFMpeg 분석