From a64005759f9408b92648f9f8cabc3f6f8e77da9e Mon Sep 17 00:00:00 2001 From: bgrozev Date: Fri, 1 Apr 2022 16:15:10 -0500 Subject: [PATCH] Remove colibri1 (#898) * Remove colibri v1 classes. * ref: Remove ColibriConfig. * Remove injected SSRCs and leftovers. * Remove GID. * Remove rtpDescriptions from colibri API. * Remove redundant API call. * Clean up the colibri API a bit. * ref: Remove the reInvite flag from colibri. --- .../java/org/jitsi/jicofo/FocusManager.java | 70 +- .../conference/JitsiMeetConferenceImpl.java | 100 +- .../conference/ParticipantInviteRunnable.java | 11 +- .../conference/colibri/v1/BridgeSession.java | 406 -------- .../colibri/v1/ColibriConference.java | 240 ----- .../colibri/v1/ColibriConferenceImpl.java | 941 ------------------ .../colibri/v1/OctoChannelAllocator.java | 332 ------ .../colibri/v1/OctoParticipant.java | 271 ----- .../jitsi/jicofo/lipsynchack/LipSyncHack.java | 3 +- .../org/jitsi/jicofo/ConferenceConfig.kt | 6 - .../kotlin/org/jitsi/jicofo/JicofoServices.kt | 2 - .../kotlin/org/jitsi/jicofo/OctoConfig.kt | 7 - .../org/jitsi/jicofo/bridge/BridgeSelector.kt | 11 +- .../conference/colibri/ColibriConfig.kt | 32 - .../colibri/ColibriSessionManager.kt | 38 +- .../conference/colibri/v1/ColibriException.kt | 53 - .../colibri/v1/ColibriV1SessionManager.kt | 597 ----------- .../conference/colibri/v1/ParticipantInfo.kt | 46 - .../colibri/v2/ColibriV2SessionManager.kt | 45 +- .../conference/source/ConferenceSourceMap.kt | 37 +- .../conference/source/EndpointSourceSet.kt | 4 +- .../jitsi/jicofo/conference/source/Source.kt | 12 +- src/main/resources/reference.conf | 12 - src/test/java/mock/MockParticipant.java | 2 +- .../AllocThreadingTestColibriConference.java | 235 ----- .../kotlin/org/jitsi/jicofo/OctoConfigTest.kt | 10 - .../jitsi/jicofo/bridge/BridgeSelectorTest.kt | 4 +- .../ParticipantInviteRunnableTest.kt | 2 +- .../source/ConferenceSourceMapTest.kt | 46 +- .../source/EndpointSourceSetTest.kt | 68 +- .../jicofo/conference/source/SourceTest.kt | 6 +- 31 files changed, 76 insertions(+), 3573 deletions(-) delete mode 100644 src/main/java/org/jitsi/jicofo/conference/colibri/v1/BridgeSession.java delete mode 100644 src/main/java/org/jitsi/jicofo/conference/colibri/v1/ColibriConference.java delete mode 100644 src/main/java/org/jitsi/jicofo/conference/colibri/v1/ColibriConferenceImpl.java delete mode 100644 src/main/java/org/jitsi/jicofo/conference/colibri/v1/OctoChannelAllocator.java delete mode 100644 src/main/java/org/jitsi/jicofo/conference/colibri/v1/OctoParticipant.java delete mode 100644 src/main/kotlin/org/jitsi/jicofo/conference/colibri/ColibriConfig.kt delete mode 100644 src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ColibriException.kt delete mode 100644 src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ColibriV1SessionManager.kt delete mode 100644 src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ParticipantInfo.kt delete mode 100644 src/test/java/mock/xmpp/colibri/AllocThreadingTestColibriConference.java diff --git a/src/main/java/org/jitsi/jicofo/FocusManager.java b/src/main/java/org/jitsi/jicofo/FocusManager.java index 7ce1ac49a8..4439fdd708 100644 --- a/src/main/java/org/jitsi/jicofo/FocusManager.java +++ b/src/main/java/org/jitsi/jicofo/FocusManager.java @@ -49,11 +49,6 @@ public class FocusManager */ private final static Logger logger = new LoggerImpl(FocusManager.class.getName()); - /** - * The pseudo-random generator which is to be used when generating IDs. - */ - private static final Random RANDOM = new Random(); - /** * The thread that expires {@link JitsiMeetConference}s. */ @@ -75,13 +70,7 @@ public class FocusManager private final List conferencesCache = new CopyOnWriteArrayList<>(); /** - * The set of the IDs of conferences in {@link #conferences}. - */ - private final Set conferenceGids = new HashSet<>(); - - /** - * The object used to synchronize access to {@link #conferences} and - * {@link #conferenceGids}. + * The object used to synchronize access to {@link #conferences}. */ private final Object conferencesSyncRoot = new Object(); @@ -97,12 +86,6 @@ public class FocusManager */ private final Statistics statistics = new Statistics(); - /** - * The ID of this Jicofo instance, used to generate conference GIDs. The special value 0 is valid in the Octo - * protocol, but only used when no value is explicitly configured. - */ - private int octoId; - /** * Clock to use for pin timeouts. */ @@ -130,26 +113,6 @@ public FocusManager() public void start() { expireThread.start(); - - int octoId = 0; - Integer configuredId = OctoConfig.config.getId(); - if (configuredId != null) - { - octoId = configuredId; - } - if (octoId < 1 || octoId > 0xffff) - { - logger.warn( - "Jicofo ID is not set correctly set (value=" + octoId + "). Configure a valid value [1-65535] by " - + "setting org.jitsi.jicofo.SHORT_ID in sip-communicator.properties or jicofo.octo.id in jicofo.conf. " - + "Future versions will require this for Octo."); - this.octoId = 0; - } - else - { - logger.info("Initialized octoId=" + octoId); - this.octoId = octoId; - } } /** @@ -286,17 +249,15 @@ private JitsiMeetConferenceImpl createConference( JitsiMeetConferenceImpl conference; synchronized (conferencesSyncRoot) { - long id = generateConferenceId(); String jvbVersion = getBridgeVersionForConference(room); conference = new JitsiMeetConferenceImpl( room, this, config, logLevel, - id, jvbVersion, includeInStatistics); + jvbVersion, includeInStatistics); conferences.put(room, conference); conferencesCache.add(conference); - conferenceGids.add(id); } if (includeInStatistics) @@ -307,27 +268,6 @@ private JitsiMeetConferenceImpl createConference( return conference; } - /** - * Generates a conference ID which is currently not used by an existing - * conference in a specific format (6 hexadecimal symbols). - * @return the generated ID. - */ - private long generateConferenceId() - { - long id; - - synchronized (conferencesSyncRoot) - { - do - { - id = (octoId << 16) | RANDOM.nextInt(0x1_0000); - } - while (conferenceGids.contains(id)); - } - - return id; - } - /** * Destroys the conference for given room name. * @param roomName full MUC room name to destroy. @@ -365,7 +305,6 @@ public void conferenceEnded(JitsiMeetConferenceImpl conference) { conferences.remove(roomName); conferencesCache.remove(conference); - conferenceGids.remove(conference.getId()); // It is not clear whether the code below necessarily needs to // hold the lock or not. @@ -554,11 +493,6 @@ public JSONObject getStats() return statistics; } - public boolean isJicofoIdConfigured() - { - return octoId != 0; - } - @NotNull OrderedJsonObject getDebugState(boolean full) { diff --git a/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java b/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java index 4b5ece3d9f..cc69ede4a8 100644 --- a/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java +++ b/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java @@ -23,7 +23,6 @@ import org.jitsi.jicofo.auth.*; import org.jitsi.jicofo.bridge.*; import org.jitsi.jicofo.conference.colibri.*; -import org.jitsi.jicofo.conference.colibri.v1.*; import org.jitsi.jicofo.conference.colibri.v2.*; import org.jitsi.jicofo.conference.source.*; import org.jitsi.jicofo.lipsynchack.*; @@ -218,8 +217,6 @@ public class JitsiMeetConferenceImpl */ private boolean videoLimitReached = false; - private final long gid; - /** * Requested bridge version from a pin. null if not pinned. */ @@ -245,7 +242,6 @@ public JitsiMeetConferenceImpl( ConferenceListener listener, @NotNull JitsiMeetConfig config, Level logLevel, - long gid, String jvbVersion, boolean includeInStatistics) { @@ -260,21 +256,12 @@ public JitsiMeetConferenceImpl( this.includeInStatistics = includeInStatistics; this.jicofoServices = Objects.requireNonNull(JicofoServices.jicofoServicesSingleton); - this.gid = gid; this.jvbVersion = jvbVersion; - if (ColibriConfig.config.getEnableColibri2()) - { - colibriSessionManager = new ColibriV2SessionManager( - jicofoServices.getXmppServices().getServiceConnection().getXmppConnection(), - jicofoServices.getBridgeSelector(), - this, - logger); - } - else - { - colibriSessionManager - = new ColibriV1SessionManager(jicofoServices, gid, this, colibriRequestCallback, logger); - } + colibriSessionManager = new ColibriV2SessionManager( + jicofoServices.getXmppServices().getServiceConnection().getXmppConnection(), + jicofoServices.getBridgeSelector(), + this, + logger); colibriSessionManager.addListener(colibriSessionManagerListener); logger.info("Created new conference."); @@ -285,10 +272,9 @@ public JitsiMeetConferenceImpl( ConferenceListener listener, @NotNull JitsiMeetConfig config, Level logLevel, - long gid, String jvbVersion) { - this(roomName, listener, config, logLevel, gid, jvbVersion, false); + this(roomName, listener, config, logLevel, jvbVersion, false); } /** @@ -1085,10 +1071,9 @@ public StanzaError onSessionTerminate(@NotNull JingleSession session, JingleIQ i */ private void propagateNewSources(Participant sourceOwner, ConferenceSourceMap sources) { - final ConferenceSourceMap finalSources = sources - .copy() - .strip(ConferenceConfig.config.stripSimulcast(), true) - .unmodifiable(); + final ConferenceSourceMap finalSources = (ConferenceConfig.config.stripSimulcast()) + ? sources.copy().stripSimulcast().unmodifiable() + : sources.copy().unmodifiable(); if (finalSources.isEmpty()) { logger.debug("No new sources to propagate."); @@ -1117,7 +1102,7 @@ public void onTransportInfo(@NotNull JingleSession session, List getRtpDescriptions( - @NotNull List contents) - { - Map rtpDescriptions = new HashMap<>(); - for (ContentPacketExtension content : contents) - { - RtpDescriptionPacketExtension rtpDescription - = content.getFirstChildOfType(RtpDescriptionPacketExtension.class); - if (rtpDescription != null) - { - rtpDescriptions.put(content.getName(), rtpDescription); - } - } - - return rtpDescriptions; - } - /** * Find the first {@link IceUdpTransportPacketExtension} in a list of Jingle contents. */ @@ -1416,10 +1368,11 @@ private StanzaError removeSources( return null; } - colibriSessionManager.removeSources( + colibriSessionManager.updateParticipant( participant, - sourcesAcceptedToBeRemoved, - removeColibriSourcesFromLocalBridge); + null, + participant.getSources(), + !removeColibriSourcesFromLocalBridge); if (sendSourceRemove) { @@ -1436,10 +1389,9 @@ private StanzaError removeSources( */ private void sendSourceRemove(ConferenceSourceMap sources, Participant except) { - final ConferenceSourceMap finalSources = sources - .copy() - .strip(ConferenceConfig.config.stripSimulcast(), true) - .unmodifiable(); + final ConferenceSourceMap finalSources = ConferenceConfig.config.stripSimulcast() + ? sources.copy().stripSimulcast().unmodifiable() + : sources.copy().unmodifiable(); if (finalSources.isEmpty()) { logger.debug("No sources to remove."); @@ -1616,8 +1568,6 @@ public OrderedJsonObject getDebugState() o.put("conference_sources", conferenceSources.toJson()); o.put("audio_limit_reached", audioLimitReached); o.put("video_limit_reached", videoLimitReached); - o.put("gid", gid); - return o; } @@ -1704,14 +1654,6 @@ public int getParticipantCount() return participants.size(); } - /** - * Conference ID. - */ - public long getId() - { - return gid; - } - /** * Get pinned bridge version. Returns null if not pinned. */ @@ -2104,9 +2046,9 @@ public void numVideoSendersChanged(int numVideoSenders) } /** - * Listener for events from {@link ColibriV1SessionManager}. + * Listener for events from {@link ColibriSessionManager}. */ - private class ColibriSessionManagerListener implements ColibriV1SessionManager.Listener + private class ColibriSessionManagerListener implements ColibriSessionManager.Listener { @Override public void bridgeCountChanged(int bridgeCount) diff --git a/src/main/java/org/jitsi/jicofo/conference/ParticipantInviteRunnable.java b/src/main/java/org/jitsi/jicofo/conference/ParticipantInviteRunnable.java index d388454aaa..2ca9a87158 100644 --- a/src/main/java/org/jitsi/jicofo/conference/ParticipantInviteRunnable.java +++ b/src/main/java/org/jitsi/jicofo/conference/ParticipantInviteRunnable.java @@ -179,7 +179,7 @@ private void doRun() ColibriAllocation colibriAllocation; try { - colibriAllocation = colibriSessionManager.allocate(participant, offer.getContents(), reInvite); + colibriAllocation = colibriSessionManager.allocate(participant, offer.getContents()); } catch (BridgeSelectionFailedException e) { @@ -357,10 +357,6 @@ else if (!canceled) // not trigger a retry here. meetConference.onInviteFailed(this); } - else if (reInvite) - { - colibriSessionManager.updateParticipant(participant, null, null, null); - } // TODO: include force-mute in the initial allocation, instead of sending 2 additional colibri messages. if (chatRoom != null && !participant.hasModeratorRights()) @@ -456,7 +452,10 @@ private boolean doInviteOrReinvite(Jid address, Offer offer, ColibriAllocation c ConferenceSourceMap conferenceSources = meetConference.getSources().copy(); // Add the bridge's feedback sources. conferenceSources.add(colibriAllocation.getSources()); - conferenceSources.strip(ConferenceConfig.config.stripSimulcast(), true); + if (ConferenceConfig.config.stripSimulcast()) + { + conferenceSources.stripSimulcast(); + } conferenceSources.stripByMediaType(participant.getSupportedMediaTypes()); // Remove the participant's own sources (if they're present) conferenceSources.remove(participant.getMucJid()); diff --git a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/BridgeSession.java b/src/main/java/org/jitsi/jicofo/conference/colibri/v1/BridgeSession.java deleted file mode 100644 index eeda31b32d..0000000000 --- a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/BridgeSession.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2015-Present 8x8, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.jicofo.conference.colibri.v1; - -import edu.umd.cs.findbugs.annotations.*; -import org.jetbrains.annotations.*; -import org.jitsi.impl.protocol.xmpp.*; -import org.jitsi.jicofo.*; -import org.jitsi.jicofo.bridge.*; -import org.jitsi.jicofo.conference.*; -import org.jitsi.jicofo.conference.colibri.*; -import org.jitsi.jicofo.conference.source.*; -import org.jitsi.utils.logging2.*; -import org.jitsi.xmpp.extensions.colibri.*; -import org.jivesoftware.smack.*; - -import java.util.*; - -/** - * Represents a colibri session to a specific {@link Bridge} instance in a specific conference. - */ -class BridgeSession -{ - private final static Random RANDOM = new Random(); - private final Logger logger; - - /** - * The {@link Bridge}. - */ - @NotNull final Bridge bridge; - - /** - * The bridge session's id. - *

- * At the time of this writing it's used to distinguish between current - * and outdated ICE failed notifications coming from the client. - *

- * It can often happen that during a bridge failure multiple clients - * will send ICE failed messages because all of them will have - * connectivity broken. Jicofo will mark the bridge as unhealthy when - * processing the first notification and any following ones should be - * discarded. - */ - public final String id = Integer.toHexString(RANDOM.nextInt(0x1_000000)); - - /** - * The list of participants in the conference which use this - * {@link BridgeSession}. - */ - @NotNull final List participants = new LinkedList<>(); - - /** - * The {@link ColibriConference} instance used to communicate with - * the jitsi-videobridge represented by this {@link BridgeSession}. - */ - public final ColibriConference colibriConference; - - /** - * The single {@link OctoParticipant} for this bridge session, if any. - */ - private OctoParticipant octoParticipant; - - /** - * Indicates if the bridge used in this conference is faulty. We use - * this flag to skip channel expiration step when the conference is being - * disposed of. - */ - public boolean hasFailed = false; - - @NotNull private final ColibriV1SessionManager colibriSessionManager; - - @NotNull private final ColibriRequestCallback colibriRequestCallback; - - /** - * Initializes a new {@link BridgeSession} instance. - * - * @param bridge the {@link Bridge} which the new - * {@link BridgeSession} instance is to represent. - */ - BridgeSession( - @NotNull JitsiMeetConferenceImpl jitsiMeetConference, - @NotNull ColibriV1SessionManager colibriSessionManager, - @NotNull ColibriRequestCallback colibriRequestCallback, - @NonNull AbstractXMPPConnection xmppConnection, - @NotNull Bridge bridge, - long gid, - @NotNull Logger parentLogger) - { - this.colibriSessionManager = colibriSessionManager; - this.colibriRequestCallback = colibriRequestCallback; - this.bridge = bridge; - this.colibriConference = new ColibriConferenceImpl(xmppConnection); - colibriConference.setName(jitsiMeetConference.getRoomName()); - colibriConference.setGID(Long.toHexString(gid)); - colibriConference.setRtcStatsEnabled(jitsiMeetConference.getConfig().getRtcStatsEnabled()); - colibriConference.setCallStatsEnabled(jitsiMeetConference.getConfig().getCallStatsEnabled()); - ChatRoom chatRoom = jitsiMeetConference.getChatRoom(); - if (chatRoom != null) - { - String meetingId = chatRoom.getMeetingId(); - if (meetingId != null) - { - colibriConference.setMeetingId(meetingId); - } - } - colibriConference.setJitsiVideobridge(bridge.getJid()); - - logger = parentLogger.createChildLogger(BridgeSession.class.getName()); - logger.addContext("bs_id", id); - } - - void addParticipant(Participant participant) - { - participants.add(participant); - bridge.endpointAdded(); - } - - /** - * Disposes of this {@link BridgeSession}, attempting to expire the - * COLIBRI conference. - */ - void dispose() - { - // We will not expire channels if the bridge is faulty or when our connection is down. - if (!hasFailed) - { - colibriConference.expireConference(); - } - else - { - // TODO: make sure this doesn't block waiting for a response - colibriConference.dispose(); - } - - // TODO: should we terminate (or clear) #participants? - } - - /** - * Expires the COLIBRI channels for all participants. - * - * @return the list of participants which were removed from - * {@link #participants} as a result of this call (does not include - * the Octo participant). - */ - List terminateAll() - { - List terminatedParticipants = new LinkedList<>(); - // sync on what? - for (Participant participant : new LinkedList<>(participants)) - { - ParticipantInfo participantInfo = colibriSessionManager.getParticipantInfo(participant); - if (participantInfo != null && participantInfo.getHasColibriSession()) - { - terminatedParticipants.add(participant); - participant.setInviteRunnable(null); - participantInfo.setTransport(null); - participantInfo.setColibriChannels(null); - participantInfo.setColibriAllocation(null); - participantInfo.setHasColibriSession(false); - } - } - - if (octoParticipant != null) - { - terminateOctoParticipant(); - } - - return terminatedParticipants; - } - - /** - * Expires the COLIBRI channels allocated for a specific {@link Participant} and removes the participant from - * {@link #participants}. - * - * @param participant the {@link Participant} for which to expire the COLIBRI channels. - * @return {@code true} if the participant was a member of {@link #participants} and was removed as a result of - * this call, and {@code false} otherwise. - */ - public boolean terminate(@NotNull Participant participant, ParticipantInfo participantInfo) - { - boolean removed = participants.remove(participant); - - ColibriConferenceIQ channelsInfo = participantInfo == null ? null : participantInfo.getColibriChannels(); - if (channelsInfo != null && !hasFailed) - { - logger.info("Expiring channels for: " + participant + " on: " + bridge); - colibriConference.expireChannels(channelsInfo); - } - else - { - logger.warn("No channels to be expired for participant " + participant); - } - - return removed; - } - - private void terminateOctoParticipant() - { - OctoParticipant participant = octoParticipant; - if (participant != null) - { - participant.setChannelAllocator(null); - } - octoParticipant = null; - } - - /** - * Sends a COLIBRI message which updates the channels for a particular - * {@link Participant} in this {@link BridgeSession}, setting the - * participant's RTP description, sources, transport information, etc. - */ - void updateColibriChannels(Participant participant) - { - ParticipantInfo participantInfo = colibriSessionManager.getParticipantInfo(participant); - colibriConference.updateChannelsInfo( - participantInfo == null ? null : participantInfo.getColibriChannels(), - participantInfo == null ? null : participantInfo.getRtpDescriptionMap(), - participant.getSources(), - participantInfo == null ? null : participantInfo.getTransport(), - participant.getEndpointId(), - null); - } - - /** - * Sends a COLIBRI message which updates the channels for the Octo - * participant in this {@link BridgeSession}. - */ - private void updateColibriOctoChannels(OctoParticipant octoParticipant) - { - if (octoParticipant != null) - { - colibriConference.updateChannelsInfo( - octoParticipant.getColibriChannelsInfo(), - null, - octoParticipant.getSources(), - null, - null, - octoParticipant.getRelays()); - } - } - - /** - * Returns the Octo participant for this {@link BridgeSession}. If - * a participant doesn't exist yet, it is created. - * - * @return the {@link OctoParticipant} for this {@link BridgeSession}. - */ - private OctoParticipant getOrCreateOctoParticipant() - { - if (octoParticipant != null) - { - return octoParticipant; - } - - List remoteRelays = colibriSessionManager.getAllRelays(bridge.getRelayId()); - return getOrCreateOctoParticipant(new LinkedList<>(remoteRelays)); - } - - /** - * Returns the Octo participant for this {@link BridgeSession}. If - * a participant doesn't exist yet, it is created and initialized - * with {@code relays} as the list of remote Octo relays. - * - * @return the {@link OctoParticipant} for this {@link BridgeSession}. - */ - private OctoParticipant getOrCreateOctoParticipant(List relays) - { - if (octoParticipant == null) - { - octoParticipant = createOctoParticipant(relays); - } - return octoParticipant; - } - - /** - * Adds sources and source groups to this {@link BridgeSession}'s Octo - * participant. If the Octo participant's session is already - * established, then the sources are added and a colibri message is - * sent to the bridge. Otherwise, they are scheduled to be added once - * the session is established. - * - * @param sources the sources to add. - */ - void addSourcesToOcto(ConferenceSourceMap sources) - { - if (!OctoConfig.config.getEnabled()) - { - return; - } - - OctoParticipant octoParticipant = getOrCreateOctoParticipant(); - - synchronized (octoParticipant) - { - if (octoParticipant.isSessionEstablished()) - { - octoParticipant.addSources(sources); - updateColibriOctoChannels(octoParticipant); - } - else - { - // The allocator will take care of updating these when the - // session is established. - octoParticipant.queueRemoteSourcesToAdd(sources); - } - } - } - - /** - * Removes sources and source groups - */ - void removeSourcesFromOcto(ConferenceSourceMap sourcesToRemove) - { - OctoParticipant octoParticipant = this.octoParticipant; - if (octoParticipant != null) - { - synchronized (octoParticipant) - { - if (octoParticipant.isSessionEstablished()) - { - octoParticipant.removeSources(sourcesToRemove); - - updateColibriOctoChannels(octoParticipant); - } - else - { - octoParticipant.queueRemoteSourcesToRemove(sourcesToRemove); - } - } - } - } - - /** - * Sets the list of Octo relays for this {@link BridgeSession}. - * - * @param allRelays all relays in the conference (including the relay - * of the bridge of this {@link BridgeSession}). - */ - void setRelays(List allRelays) - { - List remoteRelays = new LinkedList<>(allRelays); - remoteRelays.remove(bridge.getRelayId()); - - logger.info("Updating Octo relays for " + bridge); - - OctoParticipant octoParticipant = getOrCreateOctoParticipant(remoteRelays); - octoParticipant.setRelays(remoteRelays); - if (octoParticipant.isSessionEstablished()) - { - updateColibriOctoChannels(octoParticipant); - } - } - - /** - * Creates an {@link OctoParticipant} for this {@link BridgeSession} - * and starts an {@link OctoChannelAllocator} to allocate channels for - * it. - * - * @param relays the list of Octo relay ids to set to the newly - * allocated channels. - * @return the instance which was created. - */ - private OctoParticipant createOctoParticipant(List relays) - { - logger.info("Creating an Octo participant for " + bridge); - - OctoParticipant octoParticipant = new OctoParticipant(relays, logger, bridge.getJid()); - - ConferenceSourceMap remoteSources = colibriSessionManager.getSources(participants); - - octoParticipant.addSources(remoteSources); - - OctoChannelAllocator channelAllocator - = new OctoChannelAllocator(colibriRequestCallback, this, octoParticipant, logger); - octoParticipant.setChannelAllocator(channelAllocator); - - TaskPools.getIoPool().execute(channelAllocator); - - return octoParticipant; - } - - @Override - public String toString() - { - return String.format( - "BridgeSession[id=%s, bridge=%s]@%d", - id, - bridge, - hashCode()); - } -} diff --git a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/ColibriConference.java b/src/main/java/org/jitsi/jicofo/conference/colibri/v1/ColibriConference.java deleted file mode 100644 index 5b9a36bc2f..0000000000 --- a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/ColibriConference.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2015-Present 8x8, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.jicofo.conference.colibri.v1; - -import org.jetbrains.annotations.*; -import org.jitsi.jicofo.conference.source.*; -import org.jitsi.utils.*; -import org.jitsi.xmpp.extensions.colibri.*; -import org.jitsi.xmpp.extensions.jingle.*; - -import org.jxmpp.jid.*; - -import java.util.*; - -/** - * This is Colibri conference allocated on the videobridge. It exposes - * operations like allocating/expiring channels, updating channel transport - * and so on. - * - * @author Pawel Domas - */ -public interface ColibriConference -{ - /** - * Sets Jitsi videobridge XMPP address to be used to allocate - * the conferences. - * - * @param videobridgeJid the videobridge address to be set. - */ - void setJitsiVideobridge(Jid videobridgeJid); - - void setMeetingId(@NotNull String meetingId); - - /** - * Returns the identifier assigned for our conference by the videobridge. - * Will returns null if no conference has been allocated yet for - * this instance. - */ - String getConferenceId(); - - /** - * Sets world readable name that identifies the conference. - * @param name the new name. - */ - void setName(EntityBareJid name); - - /** - * Gets world readable name that identifies the conference. - * @return the name. - */ - EntityBareJid getName(); - - /** - * Creates channels on the videobridge for given parameters. - * - * @param endpointId the ID of the Colibri endpoint. - * @param statsId the statistics Id to use if any. - * @param peerIsInitiator true if peer is ICE an initiator - * of ICE session. - * @param contents content list that describes peer media. - * @return ColibriConferenceIQ that describes allocated channels. - */ - default ColibriConferenceIQ createColibriChannels( - String endpointId, - String statsId, - boolean peerIsInitiator, - List contents) - throws ColibriException - { - return createColibriChannels( - endpointId, - statsId, - peerIsInitiator, - contents, - new ConferenceSourceMap() /* sources */, - null /* relays */); - } - - /** - * Creates channels on the videobridge for given parameters. - * - * @param endpointId the ID of the Colibri endpoint. - * @param statsId the statistics Id to use if any. - * @param peerIsInitiator true if peer is ICE an initiator - * of ICE session. - * @param contents content list that describes peer media. - * @param sources the sources to include with the channel creation request, - * if any. - * @param relays the Octo relay IDs to include in the channel creation - * request, if any. - * @return ColibriConferenceIQ that describes allocated channels. - * - * @throws ColibriException if channel allocation fails. - */ - ColibriConferenceIQ createColibriChannels( - String endpointId, - String statsId, - boolean peerIsInitiator, - List contents, - @NotNull ConferenceSourceMap sources, - List relays) - throws ColibriException; - - /** - * Does Colibri channels update of RTP description, SSRC and transport - * information. This is a combined request and what it will contain depends - * which parameters are provided. Most of them is optional here. Request - * will be sent only if any data has been provided. Sends a colibri request, - * but does not wait for a response. - * - * @param localChannelsInfo (mandatory) ColibriConferenceIQ that - * contains the description of the channels for which update request will be - * sent to the bridge. - * @param rtpInfoMap (optional) the map of Colibri content name to - * RtpDescriptionPacketExtension which will be used to update - * the RTP description of the channel in corresponding content described by - * localChannelsInfo. - * @param sources the set of sources to be included in the Colibri channel update. - */ - default void updateChannelsInfo( - ColibriConferenceIQ localChannelsInfo, - Map rtpInfoMap, - @NotNull ConferenceSourceMap sources) - { - updateChannelsInfo( - localChannelsInfo, - rtpInfoMap, - sources, - null, null, null); - } - - /** - * Does Colibri channels update of RTP description, SSRC and transport - * information. This is a combined request and what it will contain depends - * which parameters are provided. Most of them is optional here. Request - * will be sent only if any data has been provided. Sends a colibri request, - * but does not wait for a response. - * - * @param localChannelsInfo (mandatory) ColibriConferenceIQ that - * contains the description of the channels for which update request will be - * sent to the bridge. - * @param rtpInfoMap (optional) the map of Colibri content name to - * RtpDescriptionPacketExtension which will be used to update - * the RTP description of the channel in corresponding content described by - * localChannelsInfo. - * @param sources the sett of sources to include in the Colibri channel update request. - * @param bundleTransport (mandatory) the - * IceUdpTransportPacketExtension which will be used to set - * "bundle" transport of the first channel bundle from - * localChannelsInfo. - * @param endpointId the ID of the endpoint for which the update applies - * (it is implicit that the update only works for channels of a single - * @param relays the Octo relay IDs to set. - * participant/endpoint). - */ - void updateChannelsInfo( - ColibriConferenceIQ localChannelsInfo, - Map rtpInfoMap, - @NotNull ConferenceSourceMap sources, - IceUdpTransportPacketExtension bundleTransport, - String endpointId, - List relays); - - /** - * Updates simulcast layers on the bridge. - * @param localChannelsInfo ColibriConferenceIQ that contains - * the description of the channel for which SSRC groups information will be - * updated on the bridge. - */ - void updateSourcesInfo(ConferenceSourceMap sources, ColibriConferenceIQ localChannelsInfo); - - /** - * Updates the transport of a specific channel bundle. - * - * @param transport the transport packet extension that contains channel - * bundle transport candidates. - * @param channelBundleId the ID of the channel bundle for which to update - * the transport. - */ - void updateBundleTransportInfo( - IceUdpTransportPacketExtension transport, - String channelBundleId); - - /** - * Expires the channels described by given ColibriConferenceIQ. - * - * @param channelInfo the ColibriConferenceIQ that contains - * information about the channel to be expired. - */ - void expireChannels(ColibriConferenceIQ channelInfo); - - /** - * Expires all channels in current conference and this instance goes into - * disposed state(like calling {@link #dispose()} method). It must not be - * used anymore. - */ - void expireConference(); - - /** - * Mutes audio or video channels described in given IQ by changing their media - * direction to {@code sendonly}. - * @param channelsInfo the IQ that describes the channels to be muted. - * @param mute true to mute or false to unmute channels - * described in channelsInfo. - * @param mediaType optional mediaType of the channel to mute; defaults to audio. - * @return true if the operation has succeeded or false - * otherwise. - */ - boolean muteParticipant(ColibriConferenceIQ channelsInfo, boolean mute, MediaType mediaType); - - /** - * Disposes of any resources allocated by this instance. Once disposed this - * instance must not be used anymore. - */ - void dispose(); - - /** - * Sets the "global" id of the conference. - */ - void setGID(String gid) ; - - void setRtcStatsEnabled(boolean rtcStatsEnabled); - - void setCallStatsEnabled(boolean callStatsEnabled); -} diff --git a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/ColibriConferenceImpl.java b/src/main/java/org/jitsi/jicofo/conference/colibri/v1/ColibriConferenceImpl.java deleted file mode 100644 index 3ec545747a..0000000000 --- a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/ColibriConferenceImpl.java +++ /dev/null @@ -1,941 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2015-Present 8x8, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.jicofo.conference.colibri.v1; - -import org.jetbrains.annotations.*; -import org.jitsi.jicofo.conference.source.*; -import org.jitsi.jicofo.xmpp.*; -import org.jitsi.utils.*; -import org.jitsi.utils.logging2.*; -import org.jitsi.utils.stats.*; -import org.jitsi.xmpp.extensions.colibri.*; -import org.jitsi.xmpp.extensions.jingle.*; -import org.jivesoftware.smack.*; -import org.jivesoftware.smack.packet.*; -import org.json.simple.*; -import org.jxmpp.jid.*; - -import java.time.*; -import java.util.*; - -import static org.apache.commons.lang3.StringUtils.*; - -/** - * Default implementation of {@link ColibriConference} that uses Smack for - * handling XMPP connection. Handles conference state, allocates and expires - * channels per single conference ID. Conference ID is stored after first - * allocate channels request. - * - * @author Pawel Domas - */ -public class ColibriConferenceImpl - implements ColibriConference -{ - public final static Stats stats = new Stats(); - - private final static Logger logger = new LoggerImpl(ColibriConferenceImpl.class.getName()); - - /** - * The instance of XMPP connection. - */ - private final AbstractXMPPConnection connection; - - /** - * XMPP address of videobridge component. - */ - private Jid jitsiVideobridge; - - /** - * The {@link ColibriConferenceIQ} that stores the state of whole conference - */ - private ColibriConferenceIQ conferenceState = new ColibriConferenceIQ(); - - /** - * Synchronization root to sync access to {@link #colibriBuilder} and - * {@link #conferenceState}. - */ - private final Object syncRoot = new Object(); - - /** - * Custom type of semaphore that allows only 1 thread to send initial - * Colibri IQ that creates the conference. - * It means that if {@link #conferenceState} has no ID then only 1 thread - * will be allowed to send allocate request to the bridge. Other threads - * will be suspended until we have the response. Error response to create - * request will cause OperationFailedException on waiting threads. - * - * By "create request" we mean a channel allocation Colibri IQ that has no - * conference id specified. - */ - private final ConferenceCreationSemaphore createConfSemaphore = new ConferenceCreationSemaphore(); - - /** - * The exception produced by the allocator thread which is to be passed to - * the waiting threads, so that they will throw exceptions consistent with - * the allocator thread. - * - * Note: this is only used to modify the message logged when an exception - * is thrown. It is NOT used to decide whether to throw an exception or not. - */ - private ColibriException allocateChannelsException = null; - - /** - * Utility used for building Colibri queries. - */ - private final ColibriBuilder colibriBuilder = new ColibriBuilder(conferenceState); - - /** - * Flag indicates that this instance has been disposed and should not be - * used anymore. - */ - private boolean disposed; - - /** - * Creates new instance of ColibriConferenceImpl. - * @param connection XMPP connection object that wil be used by the new - * instance to communicate. - */ - public ColibriConferenceImpl(@NotNull AbstractXMPPConnection connection) - { - this.connection = connection; - } - - /** - * Sets the "global" ID of the conference. - * @param gid the value to set. - */ - @Override - public void setGID(String gid) - { - conferenceState.setGID(gid); - } - - @Override - public void setRtcStatsEnabled(boolean rtcStatsEnabled) { - conferenceState.setRtcStatsEnabled(rtcStatsEnabled); - } - - @Override - public void setCallStatsEnabled(boolean callStatsEnabled) { - conferenceState.setCallStatsEnabled(callStatsEnabled); - } - - /** - * Checks if this instance has been disposed already and if so prints - * a warning message. It will also cancel execution in case - * {@link #jitsiVideobridge} is null or empty. - * - * @param operationName the name of the operation that will not happen and - * should be mentioned in the warning message. - * - * @return true if this instance has been disposed already or - * false otherwise. - */ - private boolean checkIfDisposed(String operationName) - { - if (disposed) - { - logger.warn("Not doing " + operationName + " - instance disposed"); - return true; - } - if (jitsiVideobridge == null) - { - logger.error( - "Not doing " + operationName + " - bridge not initialized"); - return true; - } - return false; - } - - /** - * {@inheritDoc} - */ - @Override - public void setJitsiVideobridge(Jid videobridgeJid) - { - if (isNotBlank(conferenceState.getID())) - { - throw new IllegalStateException("Cannot change the bridge on active conference"); - } - this.jitsiVideobridge = videobridgeJid; - } - - /** - * {@inheritDoc} - */ - @Override - public String getConferenceId() - { - return conferenceState.getID(); - } - - /** - * {@inheritDoc} - *

- * Blocks until a reply is received (and might also block waiting for - * the conference to be allocated before sending the request). - */ - @Override - public ColibriConferenceIQ createColibriChannels( - String endpointId, - String statsId, - boolean peerIsInitiator, - List contents, - @NotNull ConferenceSourceMap sources, - List octoRelayIds) - throws ColibriException - { - ColibriConferenceIQ allocateRequest; - - MapsOfExtensions extensions = new MapsOfExtensions(sources); - - boolean conferenceExisted; - try - { - synchronized (syncRoot) - { - // Only if not in 'disposed' state - if (checkIfDisposed("createColibriChannels")) - { - return null; - } - - conferenceExisted - = !acquireCreateConferenceSemaphore(endpointId); - - colibriBuilder.reset(); - - colibriBuilder.addAllocateChannelsReq( - true /* use bundle */, - endpointId, - statsId, - peerIsInitiator, - contents, - extensions.sourcePacketExtensions, - extensions.sourceGroupPacketExtensions, - octoRelayIds); - - allocateRequest = colibriBuilder.getRequest(jitsiVideobridge); - } - - logger.debug("Sending alloc request"); - - logStanza("Channel allocate request", allocateRequest); - - // FIXME retry allocation on timeout ? - Stanza response = sendAllocRequest(endpointId, allocateRequest); - - logStanza("Channel allocate response", response); - - // Verify the response and throw OperationFailedException - // if it's not a success - maybeThrowOperationFailed(allocateRequest, response); - - /* - * Update the complete ColibriConferenceIQ representation maintained by - * this instance with the information given by the (current) response. - */ - // FIXME: allocations!!! should be static method - synchronized (syncRoot) - { - ColibriAnalyser analyser = new ColibriAnalyser(conferenceState); - - analyser.processChannelAllocResp((ColibriConferenceIQ) response); - } - - /* - * Formulate the result to be returned to the caller which is a subset - * of the whole conference information kept by this CallJabberImpl and - * includes the remote channels explicitly requested by the method - * caller and their respective local channels. - */ - return ColibriAnalyser.getResponseContents((ColibriConferenceIQ) response, contents); - - } - finally - { - releaseCreateConferenceSemaphore(endpointId); - } - } - - /** - * Verifies the JVB's response to allocate channel request and sets - * {@link #allocateChannelsException}. - * - * @param response the packet received from the bridge (with {@code null} - * meaning a timeout) as a response to a request to allocate Colibri - * channels. - * - * @throws TimeoutException in case of a timeout. - * @throws ConferenceNotFoundException if the request referenced a colibri - * conference which does not exist on the bridge. - * @throws BadRequestException if the response - * @throws WrongResponseTypeException if the response contains no error, but - * is not of the expected {@link ColibriConferenceIQ} type. - * @throws ColibriException in case the response contained an XMPP error - * not listed above. - */ - private void maybeThrowOperationFailed(Stanza request, Stanza response) - throws ColibriException - { - synchronized (syncRoot) - { - ColibriException exception = null; - if (response == null) - { - exception = new TimeoutException( - "Timed out waiting for a response for " + request.getStanzaId()); - } - else if (response.getError() != null) - { - StanzaError error = response.getError(); - if (StanzaError.Condition.bad_request.equals(error.getCondition())) - { - // Currently jitsi-videobridge returns the same error type - // (bad-request) for two separate cases: - // 1. The request was valid, but the conference ID was not - // found (e.g. it has expired) - // 2. The request was invalid (e.g. the endpoint ID format - // was invalid). - // - // We want to handle the two cases differently, so we - // distinguish them by matching the string. - if (error.getDescriptiveText() != null && - error.getDescriptiveText().matches("Conference not found for ID:.*")) - { - exception = new ConferenceNotFoundException(error.getConditionText()); - } - else - { - exception = new BadRequestException(response.toXML().toString()); - } - } - else - { - exception = new ColibriException("XMPP error: " + response.toXML()); - } - } - else if (!(response instanceof ColibriConferenceIQ)) - { - exception = new WrongResponseTypeException(response.getClass().getCanonicalName()); - } - - this.allocateChannelsException = exception; - if (exception != null) - { - throw exception; - } - } - } - - /** - * Obtains create conference semaphore. If the conference does not exist yet - * (ID == null) then only first thread will be allowed to obtain it and all - * other threads will have to wait for it to process response packet. - * - * Methods exposed for unit test purpose. - * - * @param endpointId the ID of the Colibri endpoint (conference participant) - * - * @return true if current thread is conference creator. - * - * @throws ColibriException if the current thread is not the conference - * creator thread and the conference creator thread produced an exception. - * The exception will be a clone of the original. - */ - protected boolean acquireCreateConferenceSemaphore(String endpointId) - throws ColibriException - { - return createConfSemaphore.acquire(); - } - - /** - * Releases "create conference semaphore". Must be called to release the - * semaphore possibly in "finally" block. - * - * @param endpointId the ID of the colibri conference endpoint(participant) - */ - protected void releaseCreateConferenceSemaphore(String endpointId) - { - createConfSemaphore.release(); - } - - /** - * Sends Colibri packet and waits for response in - * {@link #createColibriChannels(String, String, boolean, List)} - * call. - * - * Exposed for unit tests purpose. - * - * @param endpointId The ID of the Colibri endpoint. - * @param request Colibri IQ to be send towards the bridge. - * - * @return Packet which is JVB response or null if - * the request timed out. - * - * @throws ColibriException If sending the packet fails (see - * {@link UtilKt#sendIqAndGetResponse(AbstractXMPPConnection, IQ)} (IQ)}). - */ - protected Stanza sendAllocRequest(String endpointId, ColibriConferenceIQ request) - throws ColibriException - { - try - { - long start = System.nanoTime(); - Stanza reply = UtilKt.sendIqAndGetResponse(connection, request); - long end = System.nanoTime(); - stats.allocateChannelsRequestTook(end - start); - return reply; - } - catch (SmackException.NotConnectedException e) - { - throw new ColibriException(e.getMessage()); - } - } - - private void logStanza(String message, Stanza stanza) - { - if (!logger.isDebugEnabled()) - { - return; - } - - String stanzaStr = stanza == null ? "null" : stanza.toXML().toString(); - stanzaStr = stanzaStr.replace(">", ">\n"); - - logger.debug(message + "\n" + stanzaStr); - } - - /** - * {@inheritDoc} - * - * Does not block nor wait for a response. - */ - @Override - public void expireChannels(ColibriConferenceIQ channelInfo) - { - ColibriConferenceIQ request; - - synchronized (syncRoot) - { - // Only if not in 'disposed' state - if (checkIfDisposed("expireChannels")) - { - return; - } - - colibriBuilder.reset(); - - colibriBuilder.addExpireChannelsReq(channelInfo); - - request = colibriBuilder.getRequest(jitsiVideobridge); - } - - if (request != null) - { - logStanza("Expire peer channels", request); - - // Send and forget - UtilKt.tryToSendStanza(connection, request); - } - } - - /** - * {@inheritDoc} - * - * Does not block or wait for a response. - */ - @Override - public void updateSourcesInfo(ConferenceSourceMap sources, ColibriConferenceIQ localChannelsInfo) - { - ColibriConferenceIQ request; - - synchronized (syncRoot) - { - if (checkIfDisposed("updateSourcesInfo")) - { - return; - } - - if (isBlank(conferenceState.getID())) - { - logger.error("Have not updated source info on the bridge - no conference in progress"); - return; - } - - colibriBuilder.reset(); - - boolean send = false; - - MapsOfExtensions extensions = new MapsOfExtensions(sources); - if (!extensions.sourcePacketExtensions.isEmpty() - && colibriBuilder.addSourceInfo(extensions.sourcePacketExtensions, localChannelsInfo)) - { - send = true; - } - // ssrcGroups - if (!extensions.sourceGroupPacketExtensions.isEmpty() - && colibriBuilder.addSourceGroupsInfo(extensions.sourceGroupPacketExtensions, localChannelsInfo)) - { - send = true; - } - - request = send ? colibriBuilder.getRequest(jitsiVideobridge) : null; - } - - if (request != null) - { - logStanza("Sending source update: ", request); - - UtilKt.tryToSendStanza(connection, request); - } - } - - /** - * {@inheritDoc} - * - * Does not block or wait for a response. - */ - @Override - public void updateBundleTransportInfo(IceUdpTransportPacketExtension transport, String channelBundleId) - { - ColibriConferenceIQ request; - - synchronized (syncRoot) - { - if (checkIfDisposed("updateBundleTransportInfo")) - { - return; - } - - colibriBuilder.reset(); - colibriBuilder.addBundleTransportUpdateReq(transport, channelBundleId); - request = colibriBuilder.getRequest(jitsiVideobridge); - } - - if (request != null) - { - logStanza("Sending bundle transport info update: ", request); - - UtilKt.tryToSendStanza(connection, request); - } - } - - /** - * {@inheritDoc} - * - * Does not block or wait for a response. - */ - @Override - public void expireConference() - { - ColibriConferenceIQ request; - - synchronized (syncRoot) - { - if (checkIfDisposed("expireConference")) - { - return; - } - - colibriBuilder.reset(); - - if (isBlank(conferenceState.getID())) - { - logger.info("Nothing to expire - no conference allocated yet"); - return; - } - - // Expire all channels - if (colibriBuilder.addExpireChannelsReq(conferenceState)) - { - request = colibriBuilder.getRequest(jitsiVideobridge); - - if (request != null) - { - logStanza("Expire conference: ", request); - - UtilKt.tryToSendStanza(connection, request); - } - } - - // Reset conference state - conferenceState = new ColibriConferenceIQ(); - - // Mark instance as 'disposed' - dispose(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void dispose() - { - this.disposed = true; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean muteParticipant(ColibriConferenceIQ channelsInfo, boolean mute, MediaType mediaType) - { - if (checkIfDisposed("muteParticipant")) - { - return false; - } - - String conferenceId = conferenceState.getID(); - if (isBlank(conferenceId)) - { - logger.warn("Failed to mute, conferenceId is blank."); - return false; - } - - ColibriConferenceIQ request = new ColibriConferenceIQ(); - request.setID(conferenceId); - request.setName(conferenceState.getName()); - request.setType(IQ.Type.set); - request.setTo(jitsiVideobridge); - - ColibriConferenceIQ.Content content = channelsInfo.getContent(mediaType.toString()); - if (content == null) - { - logger.error("Failed to mute, no 'content' for media type " + mediaType); - return false; - } - - ColibriConferenceIQ.Content requestContent = new ColibriConferenceIQ.Content(content.getName()); - for (ColibriConferenceIQ.Channel channel : content.getChannels()) - { - ColibriConferenceIQ.Channel requestChannel = new ColibriConferenceIQ.Channel(); - requestChannel.setID(channel.getID()); - requestChannel.setDirection(mute ? "sendonly" : "sendrecv"); - requestContent.addChannel(requestChannel); - } - - if (requestContent.getChannelCount() == 0) - { - logger.error("Failed to mute, content has no channels."); - return false; - } - - request.addContent(requestContent); - try - { - IQ response = UtilKt.sendIqAndGetResponse(connection, request); - if (response != null && response.getType() == IQ.Type.result - && response instanceof ColibriConferenceIQ) - { - ColibriConferenceIQ responseConfIq = (ColibriConferenceIQ)response; - ColibriConferenceIQ.Content responseContent = responseConfIq.getContent(mediaType.toString()); - if (responseContent != null) - { - // check the count of the channels matching the requested direction - long matchingChannelsCount = responseContent.getChannels().stream().filter( - channel -> channel.getDirection().equals(mute ? "sendonly" : "sendrecv")).count(); - - // mute succeeded if all channels match the criteria - return matchingChannelsCount == content.getChannels().size(); - } - } - } - catch(SmackException.NotConnectedException e) - { - logger.error("Error muting for media type " + mediaType, e); - return false; - } - - return false; - } - - /** - * Sets world readable name that identifies the conference. - * @param name the new name. - */ - @Override - public void setName(EntityBareJid name) - { - conferenceState.setName(name); - } - - @Override - public void setMeetingId(@NotNull String meetingId) - { - conferenceState.setMeetingId(meetingId); - } - - /** - * Gets world readable name that identifies the conference. - * @return the name. - */ - @Override - public EntityBareJid getName() - { - return conferenceState.getName(); - } - - /** - * {@inheritDoc} - */ - @Override - public void updateChannelsInfo( - ColibriConferenceIQ localChannelsInfo, - Map descriptionMap, - @NotNull ConferenceSourceMap sources, - IceUdpTransportPacketExtension bundleTransport, - String endpointId, - List relays) - { - ColibriConferenceIQ request; - if (localChannelsInfo == null) - { - logger.error("Can not update channels -- null"); - return; - } - - synchronized (syncRoot) - { - if (checkIfDisposed("updateChannelsInfo")) - { - return; - } - - colibriBuilder.reset(); - - boolean send = false; - - // RTP description - if (descriptionMap != null) - { - for (Map.Entry entry : descriptionMap.entrySet()) - { - ColibriConferenceIQ.Channel channel - = localChannelsInfo.getContent(entry.getKey()) - .getChannels().get(0); - send |= colibriBuilder.addRtpDescription( - entry.getValue(), - entry.getKey(), - channel); - } - } - MapsOfExtensions extensions = new MapsOfExtensions(sources); - // SSRCs - if (colibriBuilder.addSourceInfo(extensions.sourcePacketExtensions, localChannelsInfo)) - { - send = true; - } - // SSRC groups - if (colibriBuilder.addSourceGroupsInfo(extensions.sourceGroupPacketExtensions, localChannelsInfo)) - { - send = true; - } - // Bundle transport - if (bundleTransport != null && colibriBuilder.addBundleTransportUpdateReq(bundleTransport, endpointId)) - { - send = true; - } - if (relays != null && colibriBuilder.addOctoRelays(relays, localChannelsInfo)) - { - send = true; - } - - request = send ? colibriBuilder.getRequest(jitsiVideobridge) : null; - } - - if (request != null) - { - logStanza("Sending channel info update: ", request); - - UtilKt.tryToSendStanza(connection, request); - } - } - - /** - * Custom type of semaphore that allows only 1 thread to send initial - * Colibri IQ that creates the conference. - * It means that if {@link #conferenceState} has no ID then only 1 thread - * will be allowed to send allocate request to the bridge. Other threads - * will be suspended until we have the response(from which we get our - * conference ID). Error response to create request will cause - * OperationFailedException on the threads waiting on this - * semaphore. - */ - class ConferenceCreationSemaphore - { - /** - * Stores reference to conference creator thread instance. - */ - private Thread creatorThread; - - /** - * Acquires conference creation semaphore. If we don't have conference - * ID yet then only first thread to obtain will be allowed to go through - * and all other threads will be suspended until it finishes it's job. - * Once we have a conference allocated all threads are allowed to go - * through immediately. - * - * @return true if current thread has just become a conference - * creator. That is the thread that sends first channel allocate - * request that results in new conference created. - * - * @throws ColibriException if we are not conference creator - * thread and conference creator has failed to create the - * conference while we've been waiting on this semaphore. - */ - public boolean acquire() - throws ColibriException - { - synchronized (syncRoot) - { - if (conferenceState.getID() == null && creatorThread == null) - { - creatorThread = Thread.currentThread(); - - if (logger.isDebugEnabled()) - { - logger.debug("I'm the conference creator - " + Thread.currentThread().getName()); - } - - return true; - } - else - { - if (logger.isDebugEnabled()) - { - logger.debug("Will have to wait until the conference is created - " - + Thread.currentThread().getName()); - } - - while (creatorThread != null) - { - try - { - syncRoot.wait(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - - if (conferenceState.getID() == null) - { - throw allocateChannelsException.clone("Creator thread has failed to allocate channels: "); - } - - if (logger.isDebugEnabled()) - { - logger.debug("Conference created ! Continuing with channel allocation -" + - Thread.currentThread().getName()); - } - } - } - return false; - } - - /** - * Releases this semaphore instance. If we're a conference creator then - * all waiting thread will be woken up. - */ - public void release() - { - synchronized (syncRoot) - { - if (creatorThread == Thread.currentThread()) - { - if (logger.isDebugEnabled()) - { - logger.debug("Conference creator is releasing the lock - " + Thread.currentThread().getName()); - } - - creatorThread = null; - syncRoot.notifyAll(); - } - } - } - } - - public static class Stats { - /** - * An average of the time it takes to make allocate channel requests - * to JVB. - */ - private final MovingAverage allocateChannelsReqTimes = new MovingAverage<>(Duration.ofMinutes(1)); - - /** - * Notify the stats object how long an allocate channels request took - * to execute - * @param nanos the time, in nanoseconds - */ - void allocateChannelsRequestTook(long nanos) - { - allocateChannelsReqTimes.add(nanos); - } - - @SuppressWarnings("unchecked") - public JSONObject toJson() - { - JSONObject json = new JSONObject(); - json.put("avg_allocate_channels_req_time_nanos", allocateChannelsReqTimes.get()); - - return json; - } - } - - /** - * A helper class to convert a {@link ConferenceSourceMap} into a set of {@link SourcePacketExtension}s and - * {@link SourceGroupPacketExtension}s organized by media type. - */ - private static class MapsOfExtensions - { - private final Map> sourcePacketExtensions = new HashMap<>(); - private final Map> sourceGroupPacketExtensions = new HashMap<>(); - - private MapsOfExtensions(ConferenceSourceMap sources) - { - // Make sure we have entries with an empty list when there are no sources for a given media type, because - // the ColibriBuilder API expects them in order to properly signal source removal. - sourcePacketExtensions.put("audio", new ArrayList<>()); - sourcePacketExtensions.put("video", new ArrayList<>()); - sourceGroupPacketExtensions.put("audio", new ArrayList<>()); - sourceGroupPacketExtensions.put("video", new ArrayList<>()); - - sources.forEach((owner, endpointSourceSet) -> - { - for (Source source : endpointSourceSet.getSources()) - { - List l = sourcePacketExtensions.computeIfAbsent( - source.getMediaType().toString(), - (k) -> new ArrayList<>()); - l.add(source.toPacketExtension(owner)); - } - - for (SsrcGroup ssrcGroup : endpointSourceSet.getSsrcGroups()) - { - List l = sourceGroupPacketExtensions.computeIfAbsent( - ssrcGroup.getMediaType().toString(), - (k) -> new ArrayList<>()); - l.add(ssrcGroup.toPacketExtension()); - } - }); - } - } -} diff --git a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/OctoChannelAllocator.java b/src/main/java/org/jitsi/jicofo/conference/colibri/v1/OctoChannelAllocator.java deleted file mode 100644 index bde76f8137..0000000000 --- a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/OctoChannelAllocator.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2015-Present 8x8, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.jicofo.conference.colibri.v1; - -import org.jitsi.jicofo.*; -import org.jitsi.jicofo.codec.*; -import org.jitsi.jicofo.conference.*; -import org.jitsi.jicofo.conference.colibri.*; -import org.jitsi.jicofo.conference.source.*; -import org.jitsi.xmpp.extensions.colibri.*; -import org.jitsi.xmpp.extensions.jingle.*; -import org.jitsi.utils.logging2.*; -import org.jxmpp.jid.*; - -import java.util.*; - -import static org.apache.commons.lang3.StringUtils.isNotBlank; - -/** - * Allocates colibri channels for an {@link OctoParticipant}. The "offer" that - * we create is specific to Octo, and there is no Jingle session like in a - * regular {@link Participant}. The session is considered established once - * the colibri channels have been allocated. - * - * @author Boris Grozev - */ -class OctoChannelAllocator implements Runnable -{ - /** - * The logger for this instance. - */ - private final Logger logger; - - /** - * The callback to use when a colibri request fails or succeeds. - */ - private final ColibriRequestCallback colibriRequestCallback; - - /** - * The {@link BridgeSession} on which - * to allocate channels for the participant. - */ - private final BridgeSession bridgeSession; - - /** - * A flag which indicates whether channel allocation is canceled. Raising - * this makes the allocation thread discontinue the allocation process and - * return. - */ - private volatile boolean canceled = false; - - /** - * The colibri channels that this allocator has allocated. They'll be - * cleaned up if the allocator is canceled or failed at any point. - */ - private ColibriConferenceIQ colibriChannels; - - /** - * The {@link OctoParticipant} for which colibri channels will be allocated. - */ - private final OctoParticipant participant; - - /** - * Initializes a new {@link OctoChannelAllocator} instance which is meant to invite a specific {@link Participant} - * into a conference. - */ - public OctoChannelAllocator( - ColibriRequestCallback colibriRequestCallback, - BridgeSession bridgeSession, - OctoParticipant participant, - Logger parentLogger) - { - this.colibriRequestCallback = colibriRequestCallback; - this.bridgeSession = bridgeSession; - this.participant = participant; - logger = parentLogger.createChildLogger(OctoChannelAllocator.class.getName()); - logger.addContext("bridge", bridgeSession.bridge.getJid().toString()); - } - - /** - * Entry point for the {@link OctoChannelAllocator} task. - */ - @Override - public void run() - { - try - { - doRun(); - } - catch (Throwable e) - { - logger.error("Channel allocator failed: ", e); - cancel(); - } - finally - { - if (canceled && colibriChannels != null) - { - bridgeSession.colibriConference.expireChannels(colibriChannels); - } - - if (participant != null) - { - participant.channelAllocatorCompleted(this); - } - } - } - - private void doRun() - { - Offer offer = new Offer( - new ConferenceSourceMap(), - JingleOfferFactory.INSTANCE.createOffer(OfferOptionsKt.getOctoOptions())); - if (canceled) - { - return; - } - - colibriChannels = allocateChannels(offer.getContents()); - if (canceled) - { - return; - } - - if (colibriChannels == null) - { - logger.error("Channel allocator failed: " + participant); - - // Cancel this task - nothing to be done after failure - cancel(); - return; - } - - if (participant != null) - { - participant.setColibriChannelsInfo(colibriChannels); - } - } - - /** - * Allocates Colibri channels for this {@link OctoChannelAllocator}'s - * {@link OctoParticipant} on {@link #bridgeSession}. - * - * @return a {@link ColibriConferenceIQ} which describes the allocated - * channels, or {@code null}. - */ - private ColibriConferenceIQ allocateChannels(List contents) - { - Jid jvb = bridgeSession.bridge.getJid(); - - // The bridge is faulty, i.e. shouldn't be used anymore. - boolean faulty; - // The participants on this bridge should be re-invited. - boolean restartConference; - try - { - logger.info("Allocating octo channels on " + bridgeSession.bridge); - - ColibriConferenceIQ colibriChannels = doAllocateChannels(contents); - - // null means canceled, because colibriConference has been - // disposed by another thread - if (colibriChannels == null) - { - cancel(); - return null; - } - - bridgeSession.bridge.setIsOperational(true); - colibriRequestCallback.requestSucceeded(bridgeSession.bridge); - return colibriChannels; - } - catch (ConferenceNotFoundException e) - { - // The conference on the bridge has likely expired. We want to re-invite the conference participants, - // though the bridge is not faulty. - restartConference = true; - faulty = false; - logger.error(jvb + " - conference ID not found (expired?):" + e.getMessage()); - } - catch (BadRequestException e) - { - // The bridge indicated that our request is invalid. This does not mean the bridge is faulty, and retrying - // will likely result in the same error, so we don't call the failure callback. We observe this when an - // endpoint uses an ID not accepted by the bridge (via a custom client). - restartConference = false; - faulty = false; - logger.error(jvb + " - the bridge indicated bad-request: " + e.getMessage()); - } - catch (ColibriException e) - { - // All other errors indicate that the bridge is faulty: timeout, wrong response type, or something else. - restartConference = true; - faulty = true; - logger.error(jvb + " - failed to allocate channels, will consider the bridge faulty: " + e.getMessage(), e); - } - - // We only get here if we caught an exception. - if (faulty) - { - bridgeSession.bridge.setIsOperational(false); - bridgeSession.hasFailed = true; - } - - cancel(); - - // If the ColibriConference is in use, and we want to retry. - if (restartConference && isNotBlank(bridgeSession.colibriConference.getConferenceId())) - { - colibriRequestCallback.requestFailed(bridgeSession.bridge); - } - - return null; - } - - /** - * Raises the {@code canceled} flag, which causes the thread to not continue - * with the allocation process. - */ - public void cancel() - { - canceled = true; - } - - /** - * @return the {@link OctoParticipant} of this {@link OctoChannelAllocator}. - */ - public OctoParticipant getParticipant() - { - return participant; - } - - @Override - public String toString() - { - return String.format( - "%s[%s, %s]@%d", - this.getClass().getSimpleName(), - bridgeSession, - participant, - hashCode()); - } - - /** - * {@inheritDoc} - */ - private ColibriConferenceIQ doAllocateChannels(List offer) - throws ColibriException - { - // This is a blocking call. - ColibriConferenceIQ result = - bridgeSession.colibriConference.createColibriChannels( - null /* endpoint */, - null /* statsId */, - false/* initiator */, - offer, - participant.getSources(), - participant.getRelays()); - - if (result == null) - { - return null; - } - - // The colibri channels have now been allocated and we know their IDs. - // Now we check for any scheduled updates to the sources and source - // groups, as well as the relays. - synchronized (participant) - { - participant.setColibriChannelsInfo(result); - - // Check if the sources of the participant need an update. - boolean update = false; - - if (participant.updateSources()) - { - update = true; - logger.info("Will update the sources of the Octo participant " + this); - } - - // Check if the relays need an update. We always use the same set - // of relays for the audio and video channels, so just check video. - ColibriConferenceIQ.Channel channel = result.getContent("video").getChannel(0); - if (!(channel instanceof ColibriConferenceIQ.OctoChannel)) - { - logger.error("Expected to find an OctoChannel in the response, found" + channel + " instead."); - } - else - { - List responseRelays = ((ColibriConferenceIQ.OctoChannel) channel).getRelays(); - if (!new HashSet<>(responseRelays).equals(new HashSet<>(participant.getRelays()))) - { - update = true; - - logger.info( - "Relays need updating. Response: " + responseRelays - + ", participant:" + participant.getRelays()); - } - } - - if (update) - { - bridgeSession.colibriConference.updateChannelsInfo( - participant.getColibriChannelsInfo(), - null, - participant.getSources(), - null, - null, - participant.getRelays()); - } - - participant.setSessionEstablished(true); - } - - return result; - } -} diff --git a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/OctoParticipant.java b/src/main/java/org/jitsi/jicofo/conference/colibri/v1/OctoParticipant.java deleted file mode 100644 index be10460d48..0000000000 --- a/src/main/java/org/jitsi/jicofo/conference/colibri/v1/OctoParticipant.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2015-Present 8x8, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.jicofo.conference.colibri.v1; - -import org.jitsi.jicofo.conference.*; -import org.jitsi.jicofo.conference.source.*; -import org.jitsi.utils.logging2.*; -import org.jitsi.xmpp.extensions.colibri.*; -import org.jxmpp.jid.*; - -import java.util.*; - -/** - * Implements a participant for Octo. Manages the colibri - * channels used for Octo on a particular jitsi-videobridge instance, and - * the sources and source groups which need to be added to these colibri - * channels (i.e. all sources and source groups from real participants in the - * conference on other bridges). - * - * @author Boris Grozev - */ -class OctoParticipant -{ - /** - * Information about Colibri channels allocated for this peer (if any). - */ - private ColibriConferenceIQ colibriChannelsInfo; - - /** - * List of remote source addition or removal operations that have not yet been signaled to this participant. - */ - private final SourceAddRemoveQueue remoteSourcesQueue = new SourceAddRemoveQueue(); - - /** - * Used to synchronize access to {@link #channelAllocator}. - */ - private final Object channelAllocatorSyncRoot = new Object(); - - /** - * The {@link OctoChannelAllocator}, if any, which is currently - * allocating channels for this participant. - */ - private OctoChannelAllocator channelAllocator = null; - - /** - * A flag which determines when the session can be considered established. - * This is initially set to false, and raised when the colibri channels - * are allocated (at which point we know they IDs and we can send updates - * via colibri). - */ - private boolean sessionEstablished = false; - - /** - * The list of remote Octo relay IDs for this {@link OctoParticipant}, - * i.e. the relays which will be set for the Octo channels. - * - * This should be the list of the relay IDs of all bridges in the conference, - * with the current bridge's relay ID removed. - */ - private List relays; - - /** - * The sources associated with this octo participant, i.e. the sources of all endpoints on different bridges. - */ - private final ConferenceSourceMap sources = new ConferenceSourceMap(); - - private final Logger logger; - - /** - * Initializes a new {@link OctoParticipant} instance. - * @param relays the list of Octo relays - * @param bridgeJid the JID of the bridge that this participant - */ - OctoParticipant(List relays, Logger parentLogger, Jid bridgeJid) - { - logger = parentLogger.createChildLogger(getClass().getName()); - logger.addContext("bridge", bridgeJid.getResourceOrEmpty().toString()); - this.relays = relays; - } - - /** - * Queue a "source-add" for remote sources, to be signaled once the session is established. - * - * @param sourcesToAdd the remote sources for the "source-add". - */ - public void queueRemoteSourcesToAdd(ConferenceSourceMap sourcesToAdd) - { - remoteSourcesQueue.sourceAdd(sourcesToAdd); - } - - /** - * Queue a "source-remove" for remote sources, to be signaled once the session is established. - * - * @param sourcesToRemove the remote sources for the "source-remove". - */ - public void queueRemoteSourcesToRemove(ConferenceSourceMap sourcesToRemove) - { - remoteSourcesQueue.sourceRemove(sourcesToRemove); - } - - /** - * Sets information about Colibri channels allocated for this participant. - * - * @param colibriChannelsInfo the IQ that holds colibri channels state. - */ - public void setColibriChannelsInfo(ColibriConferenceIQ colibriChannelsInfo) - { - this.colibriChannelsInfo = colibriChannelsInfo; - } - - /** - * Returns {@link ColibriConferenceIQ} that describes Colibri channels - * allocated for this participant. - */ - public ColibriConferenceIQ getColibriChannelsInfo() - { - return colibriChannelsInfo; - } - - /** - * Replaces the {@link OctoChannelAllocator}, which is currently - * allocating channels for this participant (if any) with the specified - * channel allocator (if any). - * @param channelAllocator the channel allocator to set, or {@code null} - * to clear it. - */ - public void setChannelAllocator(OctoChannelAllocator channelAllocator) - { - synchronized (channelAllocatorSyncRoot) - { - if (this.channelAllocator != null) - { - // There is an ongoing thread allocating channels and sending - // an invite for this participant. Tell it to stop. - logger.warn("Canceling " + this.channelAllocator); - this.channelAllocator.cancel(); - } - - this.channelAllocator = channelAllocator; - } - } - - /** - * Signals to this {@link Participant} that a specific - * {@link OctoChannelAllocator} has completed its task and its thread - * is about to terminate. - * @param channelAllocator the {@link OctoChannelAllocator} which has - * completed its task and its thread is about to terminate. - */ - public void channelAllocatorCompleted(OctoChannelAllocator channelAllocator) - { - synchronized (channelAllocatorSyncRoot) - { - if (this.channelAllocator == channelAllocator) - { - this.channelAllocator = null; - } - } - } - - /** - * Removes a set of sources from this participant. - */ - public void removeSources(ConferenceSourceMap sourcesToRemove) - { - logger.debug(() -> "Removing sources: " + sourcesToRemove); - sources.remove(sourcesToRemove); - logger.debug(() -> "Remaining sources: " + sources); - } - - public void addSources(ConferenceSourceMap sourcesToAdd) - { - logger.debug(() -> "Adding sources: " + sourcesToAdd); - this.sources.add(sourcesToAdd); - logger.debug(() -> "Resulting sources: " + sources); - } - - - - /** - * Sets the list of Octo relay IDs for this {@link OctoParticipant}. - * @param relays the relays to set. - */ - void setRelays(List relays) - { - this.relays = relays; - } - - /** - * @return the list of Octo relay IDs for this {@link OctoParticipant}> - */ - List getRelays() - { - return relays; - } - - public ConferenceSourceMap getSources() - { - return sources.unmodifiable(); - } - - /** - * {@inheritDoc} - */ - synchronized public boolean isSessionEstablished() - { - return sessionEstablished; - } - - /** - * Sets the "session established" flag. - * @param sessionEstablished the value to set. - */ - synchronized void setSessionEstablished(boolean sessionEstablished) - { - this.sessionEstablished = sessionEstablished; - } - - /** - * Updates the sources and source groups of this participant with the - * sources and source groups scheduled to be added or removed via - * {@link #queueRemoteSourcesToAdd(ConferenceSourceMap)} and - * {@link #queueRemoteSourcesToRemove(ConferenceSourceMap)}. - * - * @return {@code true} if the call resulted in this participant's sources to change, and {@code false} otherwise. - */ - synchronized boolean updateSources() - { - boolean changed = false; - - for (SourcesToAddOrRemove sourcesToAddOrRemove : remoteSourcesQueue.clear()) - { - changed = true; - - AddOrRemove action = sourcesToAddOrRemove.getAction(); - ConferenceSourceMap sources = sourcesToAddOrRemove.getSources(); - - if (action == AddOrRemove.Add) - { - addSources(sources); - } - else if (action == AddOrRemove.Remove) - { - removeSources(sources); - } - } - - return changed; - } - - @Override - public String toString() - { - return "OctoParticipant[relays=" + relays + "]@" + hashCode(); - } -} diff --git a/src/main/java/org/jitsi/jicofo/lipsynchack/LipSyncHack.java b/src/main/java/org/jitsi/jicofo/lipsynchack/LipSyncHack.java index 0c3617f1d6..13823ec991 100644 --- a/src/main/java/org/jitsi/jicofo/lipsynchack/LipSyncHack.java +++ b/src/main/java/org/jitsi/jicofo/lipsynchack/LipSyncHack.java @@ -211,8 +211,7 @@ else if (!hasValidStreamId(source)) source.getSsrc(), source.getMediaType(), source.getName(), - audioStreamId + " " + trackId, - false)); + audioStreamId + " " + trackId)); logger.debug("Merged video SSRC " + source.getSsrc() + " into " + audioSource); } } diff --git a/src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt b/src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt index c74909fc76..dc53a36723 100644 --- a/src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt +++ b/src/main/kotlin/org/jitsi/jicofo/ConferenceConfig.kt @@ -38,12 +38,6 @@ class ConferenceConfig private constructor() { } fun enableAutoOwner(): Boolean = enableAutoOwner - val injectSsrcForRecvOnlyEndpoints: Boolean by config { - "org.jitsi.jicofo.INJECT_SSRC_FOR_RECVONLY_ENDPOINTS".from(legacyConfig) - "jicofo.conference.inject-ssrc-for-recv-only-endpoints".from(newConfig) - } - fun injectSsrcForRecvOnlyEndpoints(): Boolean = injectSsrcForRecvOnlyEndpoints - val maxSsrcsPerUser: Int by config { "org.jitsi.jicofo.MAX_SSRC_PER_USER".from(legacyConfig) "jicofo.conference.max-ssrcs-per-user".from(newConfig) diff --git a/src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt b/src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt index d12bfb9550..3258c657f2 100644 --- a/src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt +++ b/src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt @@ -32,7 +32,6 @@ import org.jitsi.jicofo.bridge.BridgeConfig import org.jitsi.jicofo.bridge.BridgeMucDetector import org.jitsi.jicofo.bridge.BridgeSelector import org.jitsi.jicofo.bridge.JvbDoctor -import org.jitsi.jicofo.conference.colibri.v1.ColibriConferenceImpl import org.jitsi.jicofo.health.HealthConfig import org.jitsi.jicofo.health.JicofoHealthChecker import org.jitsi.jicofo.jibri.JibriConfig @@ -203,7 +202,6 @@ open class JicofoServices { // We want to avoid exposing unnecessary hierarchy levels in the stats, // so we merge the FocusManager and ColibriConference stats in the root object. putAll(focusManager.stats) - putAll(ColibriConferenceImpl.stats.toJson()) put("bridge_selector", bridgeSelector.stats) jibriDetector?.let { put("jibri_detector", it.stats) } diff --git a/src/main/kotlin/org/jitsi/jicofo/OctoConfig.kt b/src/main/kotlin/org/jitsi/jicofo/OctoConfig.kt index 77f7ae3876..63fa195a96 100644 --- a/src/main/kotlin/org/jitsi/jicofo/OctoConfig.kt +++ b/src/main/kotlin/org/jitsi/jicofo/OctoConfig.kt @@ -17,17 +17,10 @@ */ package org.jitsi.jicofo -import org.jitsi.config.JitsiConfig.Companion.legacyConfig import org.jitsi.config.JitsiConfig.Companion.newConfig import org.jitsi.metaconfig.config -import org.jitsi.metaconfig.optionalconfig class OctoConfig { - val id: Int? by optionalconfig { - "org.jitsi.jicofo.SHORT_ID".from(legacyConfig) - "jicofo.octo.id".from(newConfig) - } - val enabled: Boolean by config { "jicofo.octo.enabled".from(newConfig) } diff --git a/src/main/kotlin/org/jitsi/jicofo/bridge/BridgeSelector.kt b/src/main/kotlin/org/jitsi/jicofo/bridge/BridgeSelector.kt index bb75532184..38e99e64b0 100644 --- a/src/main/kotlin/org/jitsi/jicofo/bridge/BridgeSelector.kt +++ b/src/main/kotlin/org/jitsi/jicofo/bridge/BridgeSelector.kt @@ -18,7 +18,6 @@ package org.jitsi.jicofo.bridge import org.jitsi.jicofo.OctoConfig -import org.jitsi.jicofo.conference.colibri.ColibriConfig import org.jitsi.utils.OrderedJsonObject import org.jitsi.utils.concurrent.CustomizableThreadFactory import org.jitsi.utils.event.AsyncEventEmitter @@ -190,12 +189,10 @@ class BridgeSelector @JvmOverloads constructor( } } - if (ColibriConfig.config.enableColibri2) { - candidateBridges = candidateBridges.filter { it.supportsColibri2() } - if (candidateBridges.isEmpty()) { - logger.warn("There are no bridges with colibri2 support.") - return null - } + candidateBridges = candidateBridges.filter { it.supportsColibri2() } + if (candidateBridges.isEmpty()) { + logger.warn("There are no bridges with colibri2 support.") + return null } // If there are active bridges, prefer those. diff --git a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/ColibriConfig.kt b/src/main/kotlin/org/jitsi/jicofo/conference/colibri/ColibriConfig.kt deleted file mode 100644 index 1f926c0f00..0000000000 --- a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/ColibriConfig.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2021 - present 8x8, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.jicofo.conference.colibri - -import org.jitsi.config.JitsiConfig.Companion.newConfig -import org.jitsi.metaconfig.config - -class ColibriConfig private constructor() { - val enableColibri2: Boolean by config { - "jicofo.colibri.enable-colibri2".from(newConfig) - } - - companion object { - @JvmField - val config = ColibriConfig() - } -} diff --git a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/ColibriSessionManager.kt b/src/main/kotlin/org/jitsi/jicofo/conference/colibri/ColibriSessionManager.kt index 8b84ae4f0b..46864d3513 100644 --- a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/ColibriSessionManager.kt +++ b/src/main/kotlin/org/jitsi/jicofo/conference/colibri/ColibriSessionManager.kt @@ -19,13 +19,11 @@ package org.jitsi.jicofo.conference.colibri import org.jitsi.jicofo.bridge.Bridge import org.jitsi.jicofo.conference.Participant -import org.jitsi.jicofo.conference.colibri.v1.ColibriV1SessionManager import org.jitsi.jicofo.conference.source.ConferenceSourceMap import org.jitsi.utils.MediaType import org.jitsi.utils.OrderedJsonObject import org.jitsi.xmpp.extensions.jingle.ContentPacketExtension import org.jitsi.xmpp.extensions.jingle.IceUdpTransportPacketExtension -import org.jitsi.xmpp.extensions.jingle.RtpDescriptionPacketExtension interface ColibriSessionManager { fun addListener(listener: Listener) @@ -35,7 +33,7 @@ interface ColibriSessionManager { fun expire() /** Remove a participant, expiring all resources allocated for it */ - fun removeParticipant(participant: Participant) + fun removeParticipant(participant: Participant) = removeParticipants(listOf(participant)) /** * Remove a set of participants, expiring all resources allocated for them. @@ -44,40 +42,27 @@ interface ColibriSessionManager { */ fun removeParticipants(participants: Collection) - /** - * Note at the time this is called [participant.sources] have already been updated. - * TODO: remove in favor of updateParticipant - */ - fun addSources(participant: Participant, sources: ConferenceSourceMap) - /** - * Note at the time this is called [participant.sources] have already been updated. - * TODO: remove in favor of updateParticipant - */ - fun removeSources( - participant: Participant, - sources: ConferenceSourceMap, - /** - * If this is `false`, the source removal will only be signaled to remote bridges. This is used to avoid sending - * an unnecessary "remove sources" message prior to the endpoint itself being expired (the "remove sources" - * message for remote bridges is always necessary). - */ - removeSourcesFromLocalBridge: Boolean - ) fun mute(participant: Participant, doMute: Boolean, mediaType: MediaType): Boolean val bridgeCount: Int val bridgeRegions: Set @Throws(ColibriAllocationFailedException::class) fun allocate( participant: Participant, - contents: List, - reInvite: Boolean + contents: List ): ColibriAllocation + /** For use in java because @JvmOverloads is not available for interfaces. */ fun updateParticipant( participant: Participant, transport: IceUdpTransportPacketExtension? = null, sources: ConferenceSourceMap? = null, - rtpDescriptions: Map? = null + ) = updateParticipant(participant, transport, sources, false) + + fun updateParticipant( + participant: Participant, + transport: IceUdpTransportPacketExtension? = null, + sources: ConferenceSourceMap? = null, + suppressLocalBridgeUpdate: Boolean = false ) fun getBridgeSessionId(participant: Participant): String? @@ -91,9 +76,6 @@ interface ColibriSessionManager { /** * Interface for events fired by [ColibriSessionManager]. - * - * Note that [ColibriV1SessionManager] calls these while holding its internal lock, so listeners should be careful - * not to perform action that will cause a deadlock. */ interface Listener { /** The number of bridges changed. */ diff --git a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ColibriException.kt b/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ColibriException.kt deleted file mode 100644 index 99fc8edc74..0000000000 --- a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ColibriException.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2019-Present 8x8 Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.jicofo.conference.colibri.v1 - -/** An exception in Colibri channel allocation. */ -open class ColibriException(message: String) : Exception(message) { - open fun clone(prefix: String): ColibriException = ColibriException(prefix + message) -} - -/** - * A [ColibriException] indicating the remote Colibri endpoint responded to our request with with a bad-request error. - */ -class BadRequestException(message: String) : ColibriException(message) { - override fun clone(prefix: String) = BadRequestException(prefix + message) -} - -/** - * A [ColibriException] indicating that the remote Colibri endpoint responded to our request with a - * "conference not found" error. - */ -class ConferenceNotFoundException(message: String) : ColibriException(message) { - override fun clone(prefix: String) = ConferenceNotFoundException(prefix + message) -} - -/** - * A [ColibriException] indicating we reached a timeout waiting for a response to our request. - */ -class TimeoutException(message: String = "Timed out waiting for a response.") : ColibriException(message) { - override fun clone(prefix: String) = TimeoutException(prefix + message) -} - -/** - * An exception indicating the remote Colibri endpoint responded to our - * request with a response of the wrong type (not a ColibriConferenceIQ). - */ -class WrongResponseTypeException(message: String) : ColibriException(message) { - override fun clone(prefix: String) = WrongResponseTypeException(prefix + message) -} diff --git a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ColibriV1SessionManager.kt b/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ColibriV1SessionManager.kt deleted file mode 100644 index 9a413ab699..0000000000 --- a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ColibriV1SessionManager.kt +++ /dev/null @@ -1,597 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2021-Present 8x8, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.jicofo.conference.colibri.v1 - -import org.apache.commons.lang3.StringUtils -import org.jitsi.jicofo.JicofoServices -import org.jitsi.jicofo.OctoConfig -import org.jitsi.jicofo.bridge.Bridge -import org.jitsi.jicofo.conference.JitsiMeetConferenceImpl -import org.jitsi.jicofo.conference.Participant -import org.jitsi.jicofo.conference.ParticipantInviteRunnable -import org.jitsi.jicofo.conference.colibri.BadColibriRequestException -import org.jitsi.jicofo.conference.colibri.BridgeFailedException -import org.jitsi.jicofo.conference.colibri.BridgeSelectionFailedException -import org.jitsi.jicofo.conference.colibri.ColibriAllocation -import org.jitsi.jicofo.conference.colibri.ColibriAllocationFailedException -import org.jitsi.jicofo.conference.colibri.ColibriConferenceDisposedException -import org.jitsi.jicofo.conference.colibri.ColibriConferenceExpiredException -import org.jitsi.jicofo.conference.colibri.ColibriParsingException -import org.jitsi.jicofo.conference.colibri.ColibriRequestCallback -import org.jitsi.jicofo.conference.colibri.ColibriSessionManager -import org.jitsi.jicofo.conference.source.ConferenceSourceMap -import org.jitsi.jicofo.conference.source.EndpointSourceSet -import org.jitsi.jicofo.conference.source.Source -import org.jitsi.protocol.xmpp.util.TransportSignaling -import org.jitsi.utils.MediaType -import org.jitsi.utils.OrderedJsonObject -import org.jitsi.utils.event.SyncEventEmitter -import org.jitsi.utils.logging2.Logger -import org.jitsi.utils.logging2.createChildLogger -import org.jitsi.xmpp.extensions.colibri.ColibriConferenceIQ -import org.jitsi.xmpp.extensions.jingle.ContentPacketExtension -import org.jitsi.xmpp.extensions.jingle.IceRtcpmuxPacketExtension -import org.jitsi.xmpp.extensions.jingle.IceUdpTransportPacketExtension -import org.jitsi.xmpp.extensions.jingle.RtpDescriptionPacketExtension -import org.jxmpp.jid.Jid -import java.util.concurrent.ConcurrentHashMap - -/** - * Manage all Colibri sessions for a conference. - */ -class ColibriV1SessionManager( - private val jicofoServices: JicofoServices, - /** - * A "global" identifier of this [JitsiMeetConferenceImpl] (i.e. - * a unique ID across a set of independent jicofo instances). - */ - val gid: Long, - /** - * The associated conference. - * TODO: remove this reference. - */ - private val jitsiMeetConference: JitsiMeetConferenceImpl, - private val colibriRequestCallback: ColibriRequestCallback, - parentLogger: Logger -) : ColibriSessionManager { - /** The list of [BridgeSession]s currently used. */ - private val bridgeSessions = mutableListOf() - private val syncRoot = Any() - private val logger = createChildLogger(parentLogger) - private val participantInfoMap = ConcurrentHashMap() - - private val eventEmitter = SyncEventEmitter() - override fun addListener(listener: ColibriSessionManager.Listener) = eventEmitter.addHandler(listener) - override fun removeListener(listener: ColibriSessionManager.Listener) = eventEmitter.removeHandler(listener) - - /** Expire all colibri sessions. */ - override fun expire() = synchronized(syncRoot) { - // Expire all bridge sessions - bridgeSessions.forEach { it.dispose() } - bridgeSessions.clear() - eventEmitter.fireEvent { bridgeCountChanged(0) } - } - - /** Get the set of regions of the bridges. */ - override val bridgeRegions: Set - get() = synchronized(syncRoot) { - return bridgeSessions.mapNotNull { it.bridge.region }.toSet() - } - - /** Remove a set of participants */ - override fun removeParticipants(participants: Collection) = synchronized(syncRoot) { - participants.forEach { removeParticipant(it) } - } - - /** Removes a participant, terminating its colibri session. */ - override fun removeParticipant(participant: Participant) { - val participantInfo = participantInfoMap.remove(participant) - val bridgeSession = findBridgeSession(participant) - - // Expire the OctoEndpoints for this participant on other bridges. - if (bridgeSession != null) { - participant.setInviteRunnable(null) - bridgeSession.terminate(participant, participantInfo) - - val removedSources = participant.sources - - synchronized(syncRoot) { - operationalBridges() - .filter { it != bridgeSession } - .forEach { bridge -> bridge.removeSourcesFromOcto(removedSources) } - maybeExpireBridgeSession(bridgeSession) - } - } - } - - @Throws(ColibriAllocationFailedException::class) - override fun allocate( - participant: Participant, - contents: List, - reInvite: Boolean - ): ColibriAllocation { - val bridgeSession: BridgeSession - val participantInfo: ParticipantInfo - synchronized(syncRoot) { - // Some bridges in the conference may have become non-operational. Allocating channels for a new participant - // requires communication with all bridges, so we remove them from the conference first. - removeNonOperationalBridges() - - val bridge: Bridge = selectBridge(participant) - if (!bridge.isOperational) { - // TODO should we throw here? - logger.error("The selected bridge is non-operational: $bridge") - } - - participantInfo = ParticipantInfo() - participantInfoMap[participant] = participantInfo - bridgeSession = findBridgeSession(bridge) ?: addBridgeSession(bridge) - bridgeSession.addParticipant(participant) - logger.info("Added participant id=${participant.chatMember.name}, bridge=${bridgeSession.bridge.jid}") - } - - return allocateChannels(participant, participantInfo, bridgeSession, contents).also { - if (reInvite) { - addSourcesToOcto(participant.sources, bridgeSession) - } - } - } - - @Throws(ColibriAllocationFailedException::class) - private fun allocateChannels( - participant: Participant, - participantInfo: ParticipantInfo, - bridgeSession: BridgeSession, - contents: List - ): ColibriAllocation { - val bridge = bridgeSession.bridge - // We want to re-invite the participants in this conference. - try { - logger.info("Using $bridge to allocate channels for: $participant") - - // null means canceled, because colibriConference has been disposed by another thread - val colibriChannels = bridgeSession.colibriConference.createColibriChannels( - participant.endpointId, - participant.statId, - true /* initiator */, - contents - ) ?: throw ColibriConferenceDisposedException() - - val transport: IceUdpTransportPacketExtension - val sources: ConferenceSourceMap - try { - transport = parseTransport(colibriChannels) - sources = parseSources(colibriChannels) - } catch (e: ColibriParsingException) { - // This is not an error coming from the bridge, so the channels are still active. Make sure they are - // expired. - bridgeSession.colibriConference.expireChannels(colibriChannels) - throw BridgeFailedException(bridgeSession.bridge, restartConference = false) - } - - // Hard-code 5000 instead of parsing the colibri response because this code is temporary (and the - // previous code did the same). - val colibriAllocation = ColibriAllocation( - sources, - transport, - bridgeSession.bridge.region, - bridgeSession.id, - 5000 - ) - bridgeSession.bridge.setIsOperational(true) - colibriRequestCallback.requestSucceeded(bridge) - participantInfo.colibriChannels = colibriChannels - participantInfo.colibriAllocation = colibriAllocation - - return colibriAllocation - } catch (e: ConferenceNotFoundException) { - // The conference on the bridge has likely expired. We want to re-invite the conference participants, - // though the bridge is not faulty. - logger.error("$bridge - conference ID not found (expired?): ${e.message}") - throw ColibriConferenceExpiredException( - bridge, - // If the ColibriConference is in use, and we want to retry. - restartConference = StringUtils.isNotBlank(bridgeSession.colibriConference.conferenceId) - ) - } catch (e: BadRequestException) { - // The bridge indicated that our request is invalid. This does not mean the bridge is faulty, and retrying - // will likely result in the same error. - // We observe this when an endpoint uses an ID not accepted by the new bridge (via a custom client). - // TODO: Jicofo should not allow such endpoints. - logger.error("$bridge - the bridge indicated bad-request: ${e.message}") - throw BadColibriRequestException() - } catch (e: ColibriException) { - // All other errors indicate that the bridge is faulty: timeout, wrong response type, or something else. - bridgeSession.bridge.setIsOperational(false) - bridgeSession.hasFailed = true - logger.error("$bridge - failed to allocate channels, will consider the bridge faulty: ${e.message}", e) - throw BridgeFailedException( - bridge, - restartConference = StringUtils.isNotBlank(bridgeSession.colibriConference.conferenceId) - ) - } - } - - @Throws(ColibriParsingException::class) - private fun parseTransport(colibriConferenceIQ: ColibriConferenceIQ): IceUdpTransportPacketExtension { - // Look for any channels that reference a channel-bundle and extract the associated transport element. - val channelBundleId = - colibriConferenceIQ.contents.flatMap { it.channels }.mapNotNull { it.channelBundleId }.firstOrNull() - ?: throw ColibriParsingException("No channel with a channel-bundle-id found") - val channelBundle = colibriConferenceIQ.getChannelBundle(channelBundleId) - ?: throw ColibriParsingException("No channel-bundle found with ID=$channelBundleId") - val transport = channelBundle.transport - ?: throw ColibriParsingException("channel-bundle has no transport") - - if (!transport.isRtcpMux) { - transport.addChildExtension(IceRtcpmuxPacketExtension()) - } - - return transport - } - - @Throws(ColibriParsingException::class) - private fun parseSources(colibriConferenceIQ: ColibriConferenceIQ): ConferenceSourceMap { - val sources = ConferenceSourceMap() - colibriConferenceIQ.contents.forEach { content -> - val mediaType = MediaType.parseString(content.name) - content.channels.forEach { channel -> - channel.sources.firstOrNull()?.let { sourcePacketExtension -> - sources.add( - ParticipantInviteRunnable.SSRC_OWNER_JVB, - EndpointSourceSet( - Source( - sourcePacketExtension.ssrc, - mediaType, // assuming either audio or video the source name: jvb-a0 or jvb-v0 - Source.nameForIdAndMediaType("jvb", mediaType, 0), - "mixedmslabel mixedlabel" + content.name + "0", - false - ) - ) - ) - } - } - } - - return sources - } - - /** - * Add sources for a participant. - */ - override fun addSources(participant: Participant, sources: ConferenceSourceMap) { - val bridgeSession = findBridgeSession(participant) - if (bridgeSession == null) { - logger.warn("No bridge session for $participant") - return - } - - val colibriChannelsInfo = participantInfoMap[participant]?.colibriChannels - if (colibriChannelsInfo == null) { - logger.warn("No colibriChannelsInfo for $participant") - return - } - - bridgeSession.colibriConference.updateSourcesInfo( - participant.sources, - colibriChannelsInfo - ) - - if (sources.isNotEmpty()) { - addSourcesToOcto(sources, bridgeSession) - } - } - - /** - * Get the set of relay IDs of bridges currently used, excluding [exclude]. - * TODO: don't expose. - */ - fun getAllRelays(exclude: String?): List = synchronized(syncRoot) { - return operationalBridges().mapNotNull { bridge -> bridge.bridge.relayId } - .filter { it != exclude }.toList() - } - - /** Remove sources for a [participant] */ - override fun removeSources( - participant: Participant, - sources: ConferenceSourceMap, - removeSourcesFromLocalBridge: Boolean - ) { - // TODO error handling - val bridgeSession = findBridgeSession(participant) - if (bridgeSession != null) { - if (removeSourcesFromLocalBridge) { - bridgeSession.colibriConference.updateSourcesInfo( - participant.sources, - participantInfoMap[participant]?.colibriChannels - ) - } - removeSourcesFromOcto(sources, bridgeSession) - } - } - - /** Get the ID of the bridge session for a [participant], or null if there's none. */ - override fun getBridgeSessionId(participant: Participant): String? = synchronized(syncRoot) { - return participantInfoMap[participant]?.colibriAllocation?.bridgeSessionId - } - - /** - * Handles the event of a set of bridges going down. Removes the associated bridge sessions. - * - * @return the set of participants which were on removed bridges (and now need to be re-invited). - */ - override fun removeBridges(bridges: Set): List { - val participantsToReinvite: MutableList = ArrayList() - var bridgesRemoved = 0 - - synchronized(syncRoot) { - bridges.forEach { bridge -> - val bridgeSession = findBridgeSession(bridge.jid) - if (bridgeSession != null) { - logger.error("One of our bridges failed: $bridge") - - // Note: the Jingle sessions are still alive, we'll just - // (try to) move to a new bridge and send transport-replace. - participantsToReinvite.addAll(bridgeSession.terminateAll()) - bridgesRemoved++ - bridgeSessions.remove(bridgeSession) - } - } - eventEmitter.fireEvent { bridgeCountChanged(bridgeSessions.size) } - eventEmitter.fireEvent { failedBridgesRemoved(bridgesRemoved) } - updateOctoRelays() - } - return participantsToReinvite.map { it.endpointId } - } - - /** The number of bridges currently used. */ - override val bridgeCount - get() = synchronized(syncRoot) { bridgeSessions.size } - - /** - * Mute a participant. - * @return true iff successful. - * TODO: improve error handling. - */ - override fun mute(participant: Participant, doMute: Boolean, mediaType: MediaType): Boolean { - val bridgeSession = findBridgeSession(participant) - if (bridgeSession == null) { - logger.error("No bridge session for $participant") - return false - } - - val participantInfo = participantInfoMap[participant] - if (participantInfo == null) { - logger.error("No participantInfo for $participant") - return false - } - - if (participantInfo.isMuted(mediaType) == doMute) { - // Nothing to do. - return true - } - - val participantChannels = participantInfo.colibriChannels - if (participantChannels == null) { - logger.error("No colibri channels for $participant") - return false - } - - return if (bridgeSession.colibriConference.muteParticipant(participantChannels, doMute, mediaType)) { - participantInfo.mute(doMute, mediaType) - true - } else { - logger.error("Failed to ${if (doMute) "mute" else "unmute"} $participant.") - false - } - } - - fun getSources(except: List): ConferenceSourceMap { - val sources = jitsiMeetConference.sources - val sourcesCopy = sources.copy() - sources.keys.forEach { sourceJid -> - // If the return value is used to create a new octo participant then - // we skip participants without a bridge session (which happens when - // a bridge fails & participant are re-invited). The reason why we - // do this is to avoid adding sources to the (newly created) octo - // participant from soon to be re-invited (and hence soon to be local) - // participants, causing a weird transition from octo participant to - // local participant in the new bridge. - if (except.any { it.mucJid == sourceJid } || (sourceJid != null && !hasColibriSession(sourceJid))) { - sourcesCopy.remove(sourceJid) - } - } - return sourcesCopy.unmodifiable - } - - private fun hasColibriSession(mucJid: Jid) = getParticipantInfo(mucJid).let { it != null && it.hasColibriSession } - private fun getParticipantInfo(mucJid: Jid) = - participantInfoMap.entries.firstOrNull { it.key.mucJid == mucJid }?.value - - /** Get the [ParticipantInfo] structure associated with a participant. This persists across re-invites. */ - fun getParticipantInfo(participant: Participant) = participantInfoMap[participant] - - override fun updateParticipant( - participant: Participant, - transport: IceUdpTransportPacketExtension?, - sources: ConferenceSourceMap?, - rtpDescriptions: Map? - ) = synchronized(syncRoot) { - - val participantInfo = participantInfoMap[participant] - ?: run { - logger.error("No ParticipantInfo for $participant") - return - } - - if (rtpDescriptions != null) { - participantInfo.rtpDescriptionMap = rtpDescriptions - } - - if (transport != null) { - val existingTransport = participantInfo.transport - if (existingTransport == null) { - participantInfo.transport = transport - } else { - TransportSignaling.mergeTransportExtension(existingTransport, transport) - } - } - - // The sources for the participant have been updated in the Participant object. - - val bridgeSession = findBridgeSession(participant) - if (bridgeSession == null) { - logger.warn("No bridge found for a participant: " + participant.chatMember.name) - return - } - - bridgeSession.updateColibriChannels(participant) - - // If we accepted any new sources from the participant, update the state of all remote bridges. - if (sources != null && sources.isNotEmpty()) { - addSourcesToOcto(sources, bridgeSession) - } - } - - private fun addBridgeSession(bridge: Bridge): BridgeSession = synchronized(syncRoot) { - val bridgeSession = BridgeSession( - jitsiMeetConference, - this, - colibriRequestCallback, - jicofoServices.xmppServices.serviceConnection.xmppConnection, - bridge, - gid, - logger - ) - bridgeSessions.add(bridgeSession) - eventEmitter.fireEvent { bridgeCountChanged(bridgeSessions.size) } - if (operationalBridges().count() >= 2) { - if (!jicofoServices.focusManager.isJicofoIdConfigured) { - logger.warn( - "Enabling Octo while the jicofo ID is not set. Configure a valid value [1-65535] by " + - "setting org.jitsi.jicofo.SHORT_ID in sip-communicator.properties or jicofo.octo.id in " + - "jicofo.conf. Future versions will require this for Octo." - ) - } - // Octo needs to be enabled (by inviting an Octo participant for each bridge), or if it is already enabled - // the list of relays for each bridge may need to be updated. - updateOctoRelays() - } - - return bridgeSession - } - - /** - * Expires the session with a particular bridge if it has no real (non-octo) - * participants left. - * @param bridgeSession the bridge session to expire. - */ - private fun maybeExpireBridgeSession(bridgeSession: BridgeSession) = synchronized(syncRoot) { - if (bridgeSession.participants.isEmpty()) { - bridgeSession.terminateAll() - bridgeSessions.remove(bridgeSession) - eventEmitter.fireEvent { bridgeCountChanged(bridgeSessions.size) } - updateOctoRelays() - } - } - - private fun getBridges(): Map = synchronized(syncRoot) { - return bridgeSessions.filter { !it.hasFailed }.associate { Pair(it.bridge, it.participants.size) } - } - - private fun selectBridge(participant: Participant): Bridge { - if (findBridgeSession(participant) != null) { - // This should never happen. - throw IllegalStateException("The participant already has a bridge:" + participant.chatMember.name) - } - - val version = jitsiMeetConference.bridgeVersion - if (version != null) { - logger.info("Selecting bridge. Conference is pinned to version \"$version\"") - } - - return jicofoServices.bridgeSelector.selectBridge(getBridges(), participant.chatMember.region, version) - ?: throw BridgeSelectionFailedException() - } - - private fun updateOctoRelays() { - if (!OctoConfig.config.enabled) { - return - } - - synchronized(syncRoot) { - val allRelays = getAllRelays(null) - - logger.info("Updating Octo relays: $allRelays") - operationalBridges().forEach { it.setRelays(allRelays) } - } - } - - private fun removeNonOperationalBridges() { - val nonOperationalBridges: Set = bridgeSessions - .filter { it.hasFailed || !it.bridge.isOperational } - .map { it.bridge }.toSet() - - if (nonOperationalBridges.isNotEmpty()) { - removeBridges(nonOperationalBridges) - } - } - - private fun findBridgeSession(jid: Jid) = synchronized(syncRoot) { - bridgeSessions.find { it.bridge.jid == jid } - } - private fun findBridgeSession(bridge: Bridge) = findBridgeSession(bridge.jid) - /** Return the bridge session for a specific [Participant], or null if there isn't one. */ - private fun findBridgeSession(participant: Participant) = synchronized(syncRoot) { - bridgeSessions.find { it.participants.contains(participant) } - } - - /** - * Update octo channels on all bridges except `except`, removing the specified set of `sources`. - * @param sources the sources to remove. - * @param except the bridge session which is not to be updated. - */ - private fun removeSourcesFromOcto(sources: ConferenceSourceMap, except: BridgeSession) = synchronized(syncRoot) { - operationalBridges() - .filter { it != except } - .forEach { it.removeSourcesFromOcto(sources) } - } - - /** - * Adds the specified sources and source groups to the Octo participants - * of all bridges except for `exclude`. - * @param except the bridge to which sources will not be added (i.e. the - * bridge to which the participant whose sources we are adding is - * connected). - * @param sources the sources to add. - */ - private fun addSourcesToOcto(sources: ConferenceSourceMap, except: BridgeSession) = synchronized(syncRoot) { - operationalBridges() - .filter { it != except } - .forEach { it.addSourcesToOcto(sources) } - } - - /** - * Get a stream of those bridges which are operational. - */ - private fun operationalBridges(): List = synchronized(syncRoot) { - bridgeSessions.filter { !it.hasFailed && it.bridge.isOperational } - } - - override val debugState = OrderedJsonObject() -} diff --git a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ParticipantInfo.kt b/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ParticipantInfo.kt deleted file mode 100644 index 93e6968ef9..0000000000 --- a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v1/ParticipantInfo.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2021-Present 8x8, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.jicofo.conference.colibri.v1 - -import org.jitsi.jicofo.conference.colibri.ColibriAllocation -import org.jitsi.utils.MediaType -import org.jitsi.xmpp.extensions.colibri.ColibriConferenceIQ -import org.jitsi.xmpp.extensions.jingle.IceUdpTransportPacketExtension -import org.jitsi.xmpp.extensions.jingle.RtpDescriptionPacketExtension - -/** - * Additional structures associated with a [Participant] that may need to persist between [BridgeSession]s (when the - * participant moves from one bridge to another). - * Boris: I am not sure if all of these are actually needed. I'm moving them here in an attempt to isolate them to the - * "old colibri" code as opposed to [Participant] itself. - */ -class ParticipantInfo( - /** The map of the most recently received RTP description for each Colibri content. */ - var rtpDescriptionMap: Map? = null, - /** Whether this participant has an associated active [BridgeSession]? */ - var hasColibriSession: Boolean = true, - var transport: IceUdpTransportPacketExtension? = null, - var colibriChannels: ColibriConferenceIQ? = null, - private val mutedByMediaType: MutableMap = mutableMapOf(), - var colibriAllocation: ColibriAllocation? = null -) { - fun mute(doMute: Boolean, mediaType: MediaType) { - mutedByMediaType[mediaType] = doMute - } - fun isMuted(mediaType: MediaType) = mutedByMediaType[mediaType] ?: false -} diff --git a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v2/ColibriV2SessionManager.kt b/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v2/ColibriV2SessionManager.kt index c3812e485b..dba916e6a5 100644 --- a/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v2/ColibriV2SessionManager.kt +++ b/src/main/kotlin/org/jitsi/jicofo/conference/colibri/v2/ColibriV2SessionManager.kt @@ -43,7 +43,6 @@ import org.jitsi.xmpp.extensions.colibri2.Colibri2Error import org.jitsi.xmpp.extensions.colibri2.ConferenceModifiedIQ import org.jitsi.xmpp.extensions.jingle.ContentPacketExtension import org.jitsi.xmpp.extensions.jingle.IceUdpTransportPacketExtension -import org.jitsi.xmpp.extensions.jingle.RtpDescriptionPacketExtension import org.jivesoftware.smack.AbstractXMPPConnection import org.jivesoftware.smack.StanzaCollector import org.jivesoftware.smack.packet.ErrorIQ @@ -124,7 +123,6 @@ class ColibriV2SessionManager( clear() } - override fun removeParticipant(participant: Participant) = removeParticipants(listOf(participant)) override fun removeParticipants(participants: Collection) = synchronized(syncRoot) { participants.forEach { it.setInviteRunnable(null) } logger.debug { "Asked to remove participants: ${participants.map { it.endpointId}}" } @@ -174,28 +172,6 @@ class ColibriV2SessionManager( } } - /** - * We don't keep track of source-add/source-removes manually and simply take the updated sources from the - * participant object. - */ - override fun addSources(participant: Participant, sources: ConferenceSourceMap) = - updateParticipant(participant, sources = participant.sources) - - /** - * We don't keep track of source-add/source-removes manually and simply take the updated sources from the - * participant object. - */ - override fun removeSources( - participant: Participant, - sources: ConferenceSourceMap, - removeSourcesFromLocalBridge: Boolean - ) = doUpdateParticipant( - participant, - transport = null, - sources = participant.sources, - suppressLocalBridgeUpdate = !removeSourcesFromLocalBridge - ) - /** * TODO: Is it really necessary to wait for a response? */ @@ -270,8 +246,7 @@ class ColibriV2SessionManager( @Throws(ColibriAllocationFailedException::class) override fun allocate( participant: Participant, - contents: List, - reInvite: Boolean + contents: List ): ColibriAllocation { logger.info("Allocating for ${participant.endpointId}") val stanzaCollector: StanzaCollector @@ -452,24 +427,6 @@ class ColibriV2SessionManager( participant: Participant, transport: IceUdpTransportPacketExtension?, sources: ConferenceSourceMap?, - // This param is not used for colibri2 - rtpDescriptions: Map? - ) = doUpdateParticipant( - participant = participant, - transport = transport, - sources = sources, - suppressLocalBridgeUpdate = false - ) - - private fun doUpdateParticipant( - participant: Participant, - transport: IceUdpTransportPacketExtension?, - sources: ConferenceSourceMap?, - /** - * If this is `true`, the update will only be signaled to remote bridges. This is used to avoid sending - * an unnecessary "remove sources" message to the local bridge prior to the endpoint itself being expired. - * TODO: cleanup when colibri1 is removed - */ suppressLocalBridgeUpdate: Boolean ) = synchronized(syncRoot) { logger.info("Updating $participant with transport=$transport, sources=$sources") diff --git a/src/main/kotlin/org/jitsi/jicofo/conference/source/ConferenceSourceMap.kt b/src/main/kotlin/org/jitsi/jicofo/conference/source/ConferenceSourceMap.kt index f98dcb5c28..b6d91cf674 100644 --- a/src/main/kotlin/org/jitsi/jicofo/conference/source/ConferenceSourceMap.kt +++ b/src/main/kotlin/org/jitsi/jicofo/conference/source/ConferenceSourceMap.kt @@ -150,37 +150,16 @@ open class ConferenceSourceMap( return extensions } - /** - * Removes all [Source]s that have the [Source.injected] flag from this map. - * @returns the map - */ - open fun stripInjected() = synchronized(syncRoot) { strip(stripInjected = true) } - /** Use a kotlin map for easy pretty printing. Inefficient. */ override fun toString(): String = endpointSourceSets.toMap().toString() /** * Strip simulcast SSRCs from each entry in the map. Modifies the map in place. - * See also [EndpointSourceSet.stripSimulcast]. - */ - open fun stripSimulcast() = synchronized(syncRoot) { strip(stripSimulcast = true) } - - /** - * Strip simulcast and/or injected SSRCs from each entry in the map. Modifies the map in place. - * - * This is defined separately to improve performance because the two operations are often performed together. */ @JvmOverloads - open fun strip(stripSimulcast: Boolean = false, stripInjected: Boolean = false) = synchronized(syncRoot) { - // Nothing to strip - if (!stripSimulcast && !stripInjected) return this - + open fun stripSimulcast() = synchronized(syncRoot) { endpointSourceSets.forEach { (owner, sources) -> - val stripped = when { - stripSimulcast -> sources.stripSimulcast(stripInjected = stripInjected) - stripInjected -> sources.stripInjected() - else -> sources - } + val stripped = sources.stripSimulcast() if (stripped.isEmpty()) { endpointSourceSets.remove(owner) } else { @@ -244,21 +223,9 @@ class UnmodifiableConferenceSourceMap( override fun remove(owner: Jid?) = throw UnsupportedOperationException("remove() not supported in unmodifiable view") - override fun stripInjected() = - throw UnsupportedOperationException("removeInjected() not supported in unmodifiable view") - override fun stripSimulcast() = throw UnsupportedOperationException("stripSimulcast() not supported in unmodifiable view") - override fun strip(stripSimulcast: Boolean, stripInjected: Boolean) = - throw UnsupportedOperationException("strip() not supported in unmodifiable view") - override fun stripByMediaType(retain: Set) = throw UnsupportedOperationException("stripByMediaType() is not supported in unmodifiable view") } - -fun EndpointSourceSet.stripInjected() = EndpointSourceSet( - sources.filter { !it.injected }.toSet(), - // Just maintain the groups. We never use groups with injected SSRCs, and "injected" should go away at some point. - ssrcGroups -) diff --git a/src/main/kotlin/org/jitsi/jicofo/conference/source/EndpointSourceSet.kt b/src/main/kotlin/org/jitsi/jicofo/conference/source/EndpointSourceSet.kt index e2b2f3161c..a699310f10 100644 --- a/src/main/kotlin/org/jitsi/jicofo/conference/source/EndpointSourceSet.kt +++ b/src/main/kotlin/org/jitsi/jicofo/conference/source/EndpointSourceSet.kt @@ -257,7 +257,7 @@ operator fun EndpointSourceSet.plus(ssrcGroup: SsrcGroup) = * SSRCs that receivers of simulcast streams need to know about, i.e. that jitsi-videobridge uses that SSRC as the * target SSRC when rewriting streams. */ -fun EndpointSourceSet.stripSimulcast(stripInjected: Boolean = false): EndpointSourceSet { +fun EndpointSourceSet.stripSimulcast(): EndpointSourceSet { val groupsToRemove = mutableSetOf() val ssrcsToRemove = mutableSetOf() @@ -277,7 +277,7 @@ fun EndpointSourceSet.stripSimulcast(stripInjected: Boolean = false): EndpointSo } } return EndpointSourceSet( - sources.filter { !ssrcsToRemove.contains(it.ssrc) && !(stripInjected && it.injected) }.toSet(), + sources.filter { !ssrcsToRemove.contains(it.ssrc) }.toSet(), ssrcGroups - groupsToRemove ) } diff --git a/src/main/kotlin/org/jitsi/jicofo/conference/source/Source.kt b/src/main/kotlin/org/jitsi/jicofo/conference/source/Source.kt index 929df014ef..d5f23d6ec6 100644 --- a/src/main/kotlin/org/jitsi/jicofo/conference/source/Source.kt +++ b/src/main/kotlin/org/jitsi/jicofo/conference/source/Source.kt @@ -32,12 +32,7 @@ data class Source( /** Optional name */ val name: String? = null, /** Optional msid */ - val msid: String? = null, - /** - * An internal flag indicating that this source was locally generated by jicofo and isn't/shouldn't be signalled - * to endpoints. - */ - val injected: Boolean = false + val msid: String? = null ) { /** Create a [Source] from an XML extension. */ constructor(mediaType: MediaType, sourcePacketExtension: SourcePacketExtension) : this( @@ -45,8 +40,7 @@ data class Source( mediaType, sourcePacketExtension.name, sourcePacketExtension.getChildExtensionsOfType(ParameterPacketExtension::class.java) - .filter { it.name == "msid" }.map { it.value }.firstOrNull(), - sourcePacketExtension.isInjected + .filter { it.name == "msid" }.map { it.value }.firstOrNull() ) /** Serializes this [Source] to XML */ @@ -65,7 +59,6 @@ data class Source( if (encodeMsid && msid != null) { addChildExtension(ParameterPacketExtension("msid", msid)) } - isInjected = injected } /** @@ -92,7 +85,6 @@ data class Source( put("media_type", mediaType.toString()) put("name", name ?: "null") put("msid", msid ?: "null") - put("injected", injected) } companion object { diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 5dbd7d96cb..5867930039 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -169,10 +169,6 @@ jicofo { } } - colibri { - // Whether to use the new version of COLIBRI - enable-colibri2 = false - } conference { // Whether to automatically grant the 'owner' role to the first participant in the conference (and subsequently to // the next in line when the current owner leaves). @@ -181,10 +177,6 @@ jicofo { // How long to wait for the initial participant in a conference. initial-timeout = 15 seconds - // Whether jicofo should inject a random SSRC for endpoints which don't advertise any SSRCs. This is a temporary - // workaround for an issue with signaling endpoints for Octo. - inject-ssrc-for-recv-only-endpoints = true - // The maximum number of sources an endpoint is allowed to signal in a conferencee. max-ssrcs-per-user = 20 @@ -289,10 +281,6 @@ jicofo { // two MUST be in sync (otherwise bridges will crash because they won't know how to // deal with octo channels). enabled = false - - // An identifier of the Jicofo instance, used for the purpose of generating conference IDs unique across a set of - // Jicofo instances. Valid values are [1, 65535]. The value 0 is used when none is explicitly configured. - #id = 1234 } rest { diff --git a/src/test/java/mock/MockParticipant.java b/src/test/java/mock/MockParticipant.java index 4e93e44dba..2cd0aab1ca 100644 --- a/src/test/java/mock/MockParticipant.java +++ b/src/test/java/mock/MockParticipant.java @@ -478,7 +478,7 @@ public Jid getMyJid() private Source addLocalSSRC(String media, long ssrc) { - Source source = new Source(ssrc, MediaType.parseString(media), null, null, false); + Source source = new Source(ssrc, MediaType.parseString(media), null, null); localSSRCs.add(getMyJid(), new EndpointSourceSet(source)); diff --git a/src/test/java/mock/xmpp/colibri/AllocThreadingTestColibriConference.java b/src/test/java/mock/xmpp/colibri/AllocThreadingTestColibriConference.java deleted file mode 100644 index 8d7f15c380..0000000000 --- a/src/test/java/mock/xmpp/colibri/AllocThreadingTestColibriConference.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Jicofo, the Jitsi Conference Focus. - * - * Copyright @ 2015-Present 8x8, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package mock.xmpp.colibri; - -import edu.umd.cs.findbugs.annotations.*; -import org.jitsi.jicofo.conference.colibri.v1.*; -import org.jitsi.xmpp.extensions.colibri.*; - -import org.jivesoftware.smack.*; -import org.jivesoftware.smack.packet.*; - -import java.util.concurrent.*; - -/** - * Extended version of ColibriConferenceImpl that allows to block - * threads at certain points of channel allocation process. - * - * @author Pawel Domas - */ -public class AllocThreadingTestColibriConference - extends ColibriConferenceImpl -{ - - /** - * Stores endpoint name of conference creator. - */ - private String confCreator; - - /** - * Blocking queue used to put and acquire conference creator endpoint. - */ - private final BlockingQueue confCreatorQueue = new ArrayBlockingQueue<>(1); - - /** - * Indicates whether creator thread should be suspended before it sends it's - * request. Call {@link #resumeConferenceCreate()} to resume it's job. - */ - private boolean blockConferenceCreation; - - /** - * Lock used to block conference creator's thread before it sends "create" - * request. - */ - private final Object createConferenceSync = new Object(); - - /** - * The queue used to put and acquire endpoint names arriving on - * {@code ColibriConferenceImpl.ConferenceCreationSemaphore}. - * Used to verify if all running threads have reached the semaphore. - */ - private final BlockingQueue createConfSemaphoreQueue = new LinkedBlockingQueue<>(); - - /** - * Blocking queue used to put and acquire endpoints that have sent it's - * request packets. - */ - private final BlockingQueue requestsSentQueue = new LinkedBlockingQueue<>(); - - /** - * Indicates if threads should be blocked before response is received. - */ - private boolean blockResponseReceive; - - /** - * Lock used to stop threads before they get their response packet. - */ - private final Object blockResponseReceiveLock = new Object(); - - /** - * Blocking queue used to put and acquire endpoints that have received their - * response packets. Used to verify if all threads have received their - * response packets. - */ - private final BlockingQueue responseReceivedQueue = new LinkedBlockingQueue<>(); - - /** - * Creates new instance of ColibriConferenceImpl. - * - * @param connection XMPP connection object that wil be used by new - * instance. - */ - public AllocThreadingTestColibriConference(AbstractXMPPConnection connection) - { - super(connection); - } - - /** - * Sets whether or not conference creator thread should be blocked before - * before it manages to sent it's "create" request. - * - * @param block true to block creator thread or false to - * leave it alone. - */ - public void blockConferenceCreator(boolean block) - { - blockConferenceCreation = block; - } - - /** - * Releases conference create thread if it was blocked. - */ - public void resumeConferenceCreate() - { - synchronized (createConferenceSync) - { - blockConferenceCreation = false; - - createConferenceSync.notifyAll(); - } - } - - /** - * Tries to obtain the name of conference creator endpoint. - * - * @return the name of conference creator endpoint or null if we - * have failed to obtain within 5 seconds timeout. - * - * @throws InterruptedException if thread has been interrupted while waiting - */ - public String obtainConferenceCreator() - throws InterruptedException - { - return confCreatorQueue.poll(5, TimeUnit.SECONDS); - } - - @Override - protected boolean acquireCreateConferenceSemaphore(String endpointId) - throws ColibriException - { - createConfSemaphoreQueue.add(endpointId); - - boolean isCreator = super.acquireCreateConferenceSemaphore(endpointId); - - if (isCreator) - { - confCreator = endpointId; - confCreatorQueue.add(endpointId); - } - - return isCreator; - } - - public int allocRequestsSentCount() - { - return requestsSentQueue.size(); - } - - public void blockResponseReceive(boolean blockResponseReceive) - { - this.blockResponseReceive = blockResponseReceive; - } - - public String nextRequestSent(long timeoutSeconds) - throws InterruptedException - { - return requestsSentQueue.poll(timeoutSeconds, TimeUnit.SECONDS); - } - - public String nextResponseReceived(long timeoutSeconds) - throws InterruptedException - { - return responseReceivedQueue.poll(timeoutSeconds, TimeUnit.SECONDS); - } - - @SuppressFBWarnings("WA_NOT_IN_LOOP") - @Override - protected Stanza sendAllocRequest(String endpointId, - ColibriConferenceIQ request) - throws ColibriException - { - boolean isCreator = confCreator.equals(endpointId); - synchronized (createConferenceSync) - { - if (isCreator && blockConferenceCreation) - { - try - { - createConferenceSync.wait(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - - requestsSentQueue.add(endpointId); - - Stanza response = super.sendAllocRequest(endpointId, request); - - synchronized (blockResponseReceiveLock) - { - if (blockResponseReceive && !isCreator) - { - try - { - blockResponseReceiveLock.wait(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - } - - responseReceivedQueue.add(endpointId); - - return response; - } - - public void resumeResponses() - { - synchronized (blockResponseReceiveLock) - { - blockResponseReceive = false; - - blockResponseReceiveLock.notifyAll(); - } - } -} diff --git a/src/test/kotlin/org/jitsi/jicofo/OctoConfigTest.kt b/src/test/kotlin/org/jitsi/jicofo/OctoConfigTest.kt index 6d7ee92506..2f0948c549 100644 --- a/src/test/kotlin/org/jitsi/jicofo/OctoConfigTest.kt +++ b/src/test/kotlin/org/jitsi/jicofo/OctoConfigTest.kt @@ -39,15 +39,5 @@ class OctoConfigTest : ShouldSpec() { } } } - context("ID") { - context("default") { - OctoConfig.config.id shouldBe null - } - context("set") { - withNewConfig("jicofo.octo.id=1234") { - OctoConfig.config.id shouldBe 1234 - } - } - } } } diff --git a/src/test/kotlin/org/jitsi/jicofo/bridge/BridgeSelectorTest.kt b/src/test/kotlin/org/jitsi/jicofo/bridge/BridgeSelectorTest.kt index 2ee0acd021..19c852cc0d 100644 --- a/src/test/kotlin/org/jitsi/jicofo/bridge/BridgeSelectorTest.kt +++ b/src/test/kotlin/org/jitsi/jicofo/bridge/BridgeSelectorTest.kt @@ -142,7 +142,7 @@ class BridgeSelectorTest : ShouldSpec() { splitSelector.selectBridge(mapOf(jvb1 to 1, jvb2 to 2, jvb3 to 3), null) shouldBe jvb2 } - context(config = colibri2Config, "Colibri2 support") { + context("Colibri2 support") { val selector = BridgeSelector(clock) val jvb1 = selector.addJvbAddress(jid1).apply { setStats(stress = 0.2, colibri2 = false) } selector.selectBridge() shouldBe null @@ -209,5 +209,3 @@ private val splitConfig = """ $enableOctoConfig jicofo.bridge.selection-strategy=SplitBridgeSelectionStrategy """.trimIndent() - -private val colibri2Config = "jicofo.colibri.enable-colibri2=true" diff --git a/src/test/kotlin/org/jitsi/jicofo/conference/ParticipantInviteRunnableTest.kt b/src/test/kotlin/org/jitsi/jicofo/conference/ParticipantInviteRunnableTest.kt index d31a10f9c7..b0ef58b665 100644 --- a/src/test/kotlin/org/jitsi/jicofo/conference/ParticipantInviteRunnableTest.kt +++ b/src/test/kotlin/org/jitsi/jicofo/conference/ParticipantInviteRunnableTest.kt @@ -76,7 +76,7 @@ class ParticipantInviteRunnableTest : ShouldSpec({ ) ) val colibriSessionManager = mockk { - every { allocate(any(), any(), any()) } returns ColibriAllocation( + every { allocate(any(), any()) } returns ColibriAllocation( feedbackSources, IceUdpTransportPacketExtension(), null, diff --git a/src/test/kotlin/org/jitsi/jicofo/conference/source/ConferenceSourceMapTest.kt b/src/test/kotlin/org/jitsi/jicofo/conference/source/ConferenceSourceMapTest.kt index 1d456b30e6..31ac358b6e 100644 --- a/src/test/kotlin/org/jitsi/jicofo/conference/source/ConferenceSourceMapTest.kt +++ b/src/test/kotlin/org/jitsi/jicofo/conference/source/ConferenceSourceMapTest.kt @@ -43,7 +43,7 @@ class ConferenceSourceMapTest : ShouldSpec() { override fun isolationMode() = IsolationMode.InstancePerLeaf init { - val s7 = Source(7, MediaType.AUDIO, injected = true) + val s7 = Source(7, MediaType.AUDIO) val endpoint1SourceSet = EndpointSourceSet( setOf(s1, s2, s7), setOf( @@ -189,34 +189,6 @@ class ConferenceSourceMapTest : ShouldSpec() { it.getFirstChildOfType(SSRCInfoPacketExtension::class.java).owner shouldBe jid1 } } - context("removeInjected") { - context("With remaining") { - val conferenceSourceMap = ConferenceSourceMap( - jid1 to endpoint1SourceSet, - jid2 to endpoint2SourceSet - ).stripInjected() - - conferenceSourceMap.size shouldBe 2 - conferenceSourceMap[jid1] shouldBe EndpointSourceSet( - setOf( - Source(1, MediaType.VIDEO), - Source(2, MediaType.VIDEO), - ), - setOf( - SsrcGroup(SsrcGroupSemantics.Fid, listOf(1, 2)) - ) - ) - conferenceSourceMap[jid2] shouldBe endpoint2SourceSet - } - context("Without remaining") { - val conferenceSourceMap = ConferenceSourceMap( - jid1 to EndpointSourceSet( - setOf(Source(1, MediaType.AUDIO, injected = true)) - ) - ).stripInjected() - conferenceSourceMap.isEmpty() shouldBe true - } - } context("unmodifiable") { val conferenceSourceMap = ConferenceSourceMap( jid1 to endpoint1SourceSet, @@ -238,7 +210,7 @@ class ConferenceSourceMapTest : ShouldSpec() { } } context("Strip") { - val s7 = Source(7, MediaType.AUDIO, injected = true) + val s7 = Source(7, MediaType.AUDIO) val sourceSet = EndpointSourceSet( setOf(s1, s2, s3, s4, s5, s6, s7), setOf(sim, fid1, fid2, fid3) @@ -246,25 +218,11 @@ class ConferenceSourceMapTest : ShouldSpec() { val conferenceSourceMap = ConferenceSourceMap(jid1 to sourceSet, jid2 to e2sourceSet) // Assume EndpointSourceSet.stripSimulcast works correctly, tested above. - context("Nothing") { - conferenceSourceMap.strip(stripSimulcast = false, stripInjected = false) shouldBe - ConferenceSourceMap(jid1 to sourceSet, jid2 to e2sourceSet) - } context("Simulcast") { conferenceSourceMap.stripSimulcast() conferenceSourceMap[jid1] shouldBe sourceSet.stripSimulcast() conferenceSourceMap[jid2] shouldBe e2sourceSet.stripSimulcast() } - context("Injected") { - conferenceSourceMap.stripInjected() - conferenceSourceMap[jid1] shouldBe sourceSet.stripInjected() - conferenceSourceMap[jid2] shouldBe e2sourceSet.stripInjected() - } - context("Simulcast and injected") { - conferenceSourceMap.strip(stripSimulcast = true, stripInjected = true) - conferenceSourceMap[jid1] shouldBe sourceSet.stripSimulcast(stripInjected = true) - conferenceSourceMap[jid2] shouldBe e2sourceSet.stripSimulcast(stripInjected = true) - } } context("Compact JSON") { val endpointId1 = "bebebebe" diff --git a/src/test/kotlin/org/jitsi/jicofo/conference/source/EndpointSourceSetTest.kt b/src/test/kotlin/org/jitsi/jicofo/conference/source/EndpointSourceSetTest.kt index 2654c3cb15..be585d966b 100644 --- a/src/test/kotlin/org/jitsi/jicofo/conference/source/EndpointSourceSetTest.kt +++ b/src/test/kotlin/org/jitsi/jicofo/conference/source/EndpointSourceSetTest.kt @@ -66,61 +66,31 @@ class EndpointSourceSetTest : ShouldSpec() { audioSources shouldBe sourceSet.sources.filter { it.mediaType == AUDIO }.toSet() } context("Strip simulcast") { - val s8 = Source(8, VIDEO, injected = true) + val s8 = Source(8, VIDEO) context("Without RTX") { - context("stripSimulcast") { - EndpointSourceSet( - setOf(s1, s2, s3, s7, s8), - setOf(sim) - ).stripSimulcast() shouldBe EndpointSourceSet(setOf(s1, s7, s8)) - } - context("stripSimulcast and remove injected") { - EndpointSourceSet( - setOf(s1, s2, s3, s7, s8), - setOf(sim) - ).stripSimulcast(stripInjected = true) shouldBe EndpointSourceSet(setOf(s1, s7)) - } + EndpointSourceSet( + setOf(s1, s2, s3, s7, s8), + setOf(sim) + ).stripSimulcast() shouldBe EndpointSourceSet(setOf(s1, s7, s8)) } context("With multiple SIM groups") { - context("stripSimulcast") { - EndpointSourceSet( - sourceSet.sources + s8, - setOf( - sim, - SsrcGroup(SsrcGroupSemantics.Sim, listOf(4, 5, 6)) - ) - ).stripSimulcast() shouldBe EndpointSourceSet(setOf(s1, s4, s7, s8)) - } - context("stripSimulcast and remove injected") { - EndpointSourceSet( - sourceSet.sources + s8, - setOf( - sim, - SsrcGroup(SsrcGroupSemantics.Sim, listOf(4, 5, 6)) - ) - ).stripSimulcast(stripInjected = true) shouldBe EndpointSourceSet(setOf(s1, s4, s7)) - } + EndpointSourceSet( + sourceSet.sources + s8, + setOf( + sim, + SsrcGroup(SsrcGroupSemantics.Sim, listOf(4, 5, 6)) + ) + ).stripSimulcast() shouldBe EndpointSourceSet(setOf(s1, s4, s7, s8)) } context("With RTX") { - context("stripSimulcast") { - EndpointSourceSet( - sourceSet.sources + s8, - sourceSet.ssrcGroups - ).stripSimulcast() shouldBe EndpointSourceSet( - setOf(s1, s4, s7, s8), - setOf(fid1) - ) - } - context("stripSimulcast and remove injected") { - EndpointSourceSet( - sourceSet.sources + s8, - sourceSet.ssrcGroups - ).stripSimulcast(stripInjected = true) shouldBe EndpointSourceSet( - setOf(s1, s4, s7), - setOf(fid1) - ) - } + EndpointSourceSet( + sourceSet.sources + s8, + sourceSet.ssrcGroups + ).stripSimulcast() shouldBe EndpointSourceSet( + setOf(s1, s4, s7, s8), + setOf(fid1) + ) } context("Compact JSON") { // See the documentation of [EndpointSourceSet.compactJson] for the expected JSON format. diff --git a/src/test/kotlin/org/jitsi/jicofo/conference/source/SourceTest.kt b/src/test/kotlin/org/jitsi/jicofo/conference/source/SourceTest.kt index 9c4cc87d7c..20a81807b2 100644 --- a/src/test/kotlin/org/jitsi/jicofo/conference/source/SourceTest.kt +++ b/src/test/kotlin/org/jitsi/jicofo/conference/source/SourceTest.kt @@ -33,22 +33,20 @@ class SourceTest : ShouldSpec() { ssrc = 1 name = "name-1" addChildExtension(ParameterPacketExtension("msid", "msid")) - isInjected = true } Source(MediaType.VIDEO, packetExtension) shouldBe - Source(1, MediaType.VIDEO, name = "name-1", msid = "msid", injected = true) + Source(1, MediaType.VIDEO, name = "name-1", msid = "msid") } context("To XML") { val msidValue = "msid-value" val nameValue = "source-name-value" - val source = Source(1, MediaType.VIDEO, name = nameValue, msid = msidValue, injected = true) + val source = Source(1, MediaType.VIDEO, name = nameValue, msid = msidValue) val ownerJid = JidCreate.fullFrom("confname@conference.example.com/abcdabcd") val extension = source.toPacketExtension(owner = ownerJid) extension.ssrc shouldBe 1 extension.name shouldBe nameValue - extension.isInjected shouldBe true val parameters = extension.getChildExtensionsOfType(ParameterPacketExtension::class.java) parameters.filter { it.name == "msid" && it.value == msidValue }.size shouldBe 1