OpenShot Library | libopenshot-audio  0.1.9
juce_InterprocessConnection.cpp
1 /*
2  ==============================================================================
3 
4  This file is part of the JUCE library.
5  Copyright (c) 2017 - ROLI Ltd.
6 
7  JUCE is an open source library subject to commercial or open-source
8  licensing.
9 
10  The code included in this file is provided under the terms of the ISC license
11  http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12  To use, copy, modify, and/or distribute this software for any purpose with or
13  without fee is hereby granted provided that the above copyright notice and
14  this permission notice appear in all copies.
15 
16  JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17  EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18  DISCLAIMED.
19 
20  ==============================================================================
21 */
22 
23 namespace juce
24 {
25 
27 {
    // NOTE(review): the declaration line that opens this type (and the line declaring
    // the `owner` member initialised below) are not visible in this listing.

    // Keeps a reference to the connection that owns this thread and passes the
    // name "JUCE IPC" to the Thread base class.
28  ConnectionThread (InterprocessConnection& c) : Thread ("JUCE IPC"), owner (c) {}
    // Thread entry point: all of the work is done by the owner's runThread().
29  void run() override { owner.runThread(); }
30 
32  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
33 };
34 
35 //==============================================================================
36 InterprocessConnection::InterprocessConnection (bool callbacksOnMessageThread, uint32 magicMessageHeaderNumber)
37  : useMessageThread (callbacksOnMessageThread),
38  magicMessageHeader (magicMessageHeaderNumber)
39 {
40  thread.reset (new ConnectionThread (*this));
41 }
42 
44 {
    // NOTE(review): the signature line for this body is missing from this listing;
    // the statements look like teardown code (presumably the destructor) - confirm.

    // Clear the state flag before disconnecting: connectionLostInt() only issues a
    // connectionLost notification while the flag is set, so none is sent from here.
45  callbackConnectionState = false;
46  disconnect();
    // presumably invalidates the weak references held by queued messages (they
    // re-check owner.get() before calling back) - verify against the header.
47  masterReference.clear();
48  thread.reset();
49 }
50 
51 //==============================================================================
53  int portNumber, int timeOutMillisecs)
54 {
    // NOTE(review): the first line of this signature (which declares `hostName`)
    // is missing from this listing.

    // Drop whatever connection is currently held before opening a new one.
55  disconnect();
56 
    // The lock is held for the rest of the function, i.e. also during the connect
    // attempt and the connectionMadeInt() call below.
57  const ScopedLock sl (pipeAndSocketLock);
58  socket.reset (new StreamingSocket());
59 
60  if (socket->connect (hostName, portNumber, timeOutMillisecs))
61  {
    // Success: mark the reader as running, report the connection, then start
    // the worker thread.
62  threadIsRunning = true;
63  connectionMadeInt();
64  thread->startThread();
65  return true;
66  }
67 
    // Failure: discard the unused socket object and report failure.
68  socket.reset();
69  return false;
70 }
71 
72 bool InterprocessConnection::connectToPipe (const String& pipeName, int timeoutMs)
73 {
74  disconnect();
75 
76  std::unique_ptr<NamedPipe> newPipe (new NamedPipe());
77 
78  if (newPipe->openExisting (pipeName))
79  {
80  const ScopedLock sl (pipeAndSocketLock);
81  pipeReceiveMessageTimeout = timeoutMs;
82  initialiseWithPipe (newPipe.release());
83  return true;
84  }
85 
86  return false;
87 }
88 
89 bool InterprocessConnection::createPipe (const String& pipeName, int timeoutMs, bool mustNotExist)
90 {
91  disconnect();
92 
93  std::unique_ptr<NamedPipe> newPipe (new NamedPipe());
94 
95  if (newPipe->createNewPipe (pipeName, mustNotExist))
96  {
97  const ScopedLock sl (pipeAndSocketLock);
98  pipeReceiveMessageTimeout = timeoutMs;
99  initialiseWithPipe (newPipe.release());
100  return true;
101  }
102 
103  return false;
104 }
105 
107 {
    // NOTE(review): the signature line for this body is missing from this listing
    // (presumably InterprocessConnection::disconnect) - confirm.

    // Ask the worker thread to stop before touching the socket or pipe.
108  thread->signalThreadShouldExit();
109 
110  {
    // Close (but do not yet delete) the socket/pipe while holding the lock;
    // presumably this unblocks a read the worker thread is waiting in - confirm.
111  const ScopedLock sl (pipeAndSocketLock);
112  if (socket != nullptr) socket->close();
113  if (pipe != nullptr) pipe->close();
114  }
115 
    // The lock has been released by this point. 4000 is presumably a timeout in
    // milliseconds - confirm against Thread::stopThread.
116  thread->stopThread (4000);
    // Only now are the socket and pipe objects destroyed, followed by the
    // connection-lost notification (a no-op if no connection had been reported).
117  deletePipeAndSocket();
118  connectionLostInt();
119 }
120 
121 void InterprocessConnection::deletePipeAndSocket()
122 {
123  const ScopedLock sl (pipeAndSocketLock);
124  socket.reset();
125  pipe.reset();
126 }
127 
129 {
    // NOTE(review): the signature line for this body is missing from this listing
    // (presumably InterprocessConnection::isConnected) - confirm.
130  const ScopedLock sl (pipeAndSocketLock);
131 
    // True only when a transport exists and reports itself connected/open
    // AND the reader is still flagged as running.
132  return ((socket != nullptr && socket->isConnected())
133  || (pipe != nullptr && pipe->isOpen()))
134  && threadIsRunning;
135 }
136 
138 {
    // NOTE(review): the signature line for this body is missing from this listing
    // (presumably InterprocessConnection::getConnectedHostName) - confirm.
139  {
140  const ScopedLock sl (pipeAndSocketLock);
141 
    // No transport at all: return an empty value.
142  if (pipe == nullptr && socket == nullptr)
143  return {};
144 
    // A socket that is not local: report the host name held by the socket.
145  if (socket != nullptr && ! socket->isLocal())
146  return socket->getHostName();
147  }
148 
    // Remaining cases (a pipe, or a local socket): report the local address.
    // This is evaluated after the lock above has been released.
149  return IPAddress::local().toString();
150 }
151 
152 //==============================================================================
154 {
    // NOTE(review): the signature line for this body (which declares `message`) is
    // missing from this listing (presumably InterprocessConnection::sendMessage).

    // Header = two uint32 values: the magic number, then the payload size. Both go
    // through swapIfBigEndian(), which only swaps on big-endian CPUs. The size is
    // narrowed to uint32 by the cast.
155  uint32 messageHeader[2] = { ByteOrder::swapIfBigEndian (magicMessageHeader),
156  ByteOrder::swapIfBigEndian ((uint32) message.getSize()) };
157 
    // Build one contiguous buffer (header followed by payload) so that the whole
    // message goes out in a single writeData() call.
158  MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
159  messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
160  messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
161 
    // Success means writeData() reported exactly the full buffer size.
162  return writeData (messageData.getData(), (int) messageData.getSize()) == (int) messageData.getSize();
163 }
164 
165 int InterprocessConnection::writeData (void* data, int dataSize)
166 {
167  const ScopedLock sl (pipeAndSocketLock);
168 
169  if (socket != nullptr)
170  return socket->write (data, dataSize);
171 
172  if (pipe != nullptr)
173  return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
174 
175  return 0;
176 }
177 
178 //==============================================================================
179 void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket)
180 {
181  jassert (socket == nullptr && pipe == nullptr);
182  socket.reset (newSocket);
183 
184  threadIsRunning = true;
185  connectionMadeInt();
186  thread->startThread();
187 }
188 
189 void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe)
190 {
191  jassert (socket == nullptr && pipe == nullptr);
192  pipe.reset (newPipe);
193 
194  threadIsRunning = true;
195  connectionMadeInt();
196  thread->startThread();
197 }
198 
199 //==============================================================================
201 {
    // NOTE(review): the declaration line that opens this type and the line declaring
    // the `owner` member are not visible in this listing.

    // Records which connection to notify and which of the two notifications
    // (made = true, lost = false) to deliver.
202  ConnectionStateMessage (InterprocessConnection* ipc, bool connected) noexcept
203  : owner (ipc), connectionMade (connected)
204  {}
205 
    // Delivers the notification, but only if owner.get() still yields a connection
    // object; otherwise the message is silently dropped.
206  void messageCallback() override
207  {
208  if (auto* ipc = owner.get())
209  {
210  if (connectionMade)
211  ipc->connectionMade();
212  else
213  ipc->connectionLost();
214  }
215  }
216 
    // true: call connectionMade(); false: call connectionLost().
218  bool connectionMade;
219 
220  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
221 };
222 
223 void InterprocessConnection::connectionMadeInt()
224 {
225  if (! callbackConnectionState)
226  {
227  callbackConnectionState = true;
228 
229  if (useMessageThread)
230  (new ConnectionStateMessage (this, true))->post();
231  else
232  connectionMade();
233  }
234 }
235 
236 void InterprocessConnection::connectionLostInt()
237 {
238  if (callbackConnectionState)
239  {
240  callbackConnectionState = false;
241 
242  if (useMessageThread)
243  (new ConnectionStateMessage (this, false))->post();
244  else
245  connectionLost();
246  }
247 }
248 
250 {
    // NOTE(review): the declaration line that opens this type, the constructor's
    // signature line and the line declaring the `owner` member are not visible in
    // this listing. deliverDataInt() constructs it as DataDeliveryMessage (this, data).
252  : owner (ipc), data (d)
253  {}
254 
    // Passes the stored payload to messageReceived(), but only if owner.get() still
    // yields a connection object; otherwise the message is silently dropped.
255  void messageCallback() override
256  {
257  if (auto* ipc = owner.get())
258  ipc->messageReceived (data);
259  }
260 
    // The message keeps its own MemoryBlock, initialised from `d` in the constructor.
262  MemoryBlock data;
263 };
264 
265 void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
266 {
267  jassert (callbackConnectionState);
268 
269  if (useMessageThread)
270  (new DataDeliveryMessage (this, data))->post();
271  else
272  messageReceived (data);
273 }
274 
275 //==============================================================================
276 int InterprocessConnection::readData (void* data, int num)
277 {
278  if (socket != nullptr)
279  return socket->read (data, num, true);
280 
281  if (pipe != nullptr)
282  return pipe->read (data, num, pipeReceiveMessageTimeout);
283 
284  jassertfalse;
285  return -1;
286 }
287 
288 bool InterprocessConnection::readNextMessage()
289 {
290  uint32 messageHeader[2];
291  auto bytes = readData (messageHeader, sizeof (messageHeader));
292 
293  if (bytes == sizeof (messageHeader)
294  && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
295  {
296  auto bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
297 
298  if (bytesInMessage > 0)
299  {
300  MemoryBlock messageData ((size_t) bytesInMessage, true);
301  int bytesRead = 0;
302 
303  while (bytesInMessage > 0)
304  {
305  if (thread->threadShouldExit())
306  return false;
307 
308  auto numThisTime = jmin (bytesInMessage, 65536);
309  auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
310 
311  if (bytesIn <= 0)
312  break;
313 
314  bytesRead += bytesIn;
315  bytesInMessage -= bytesIn;
316  }
317 
318  if (bytesRead >= 0)
319  deliverDataInt (messageData);
320  }
321 
322  return true;
323  }
324 
325  if (bytes < 0)
326  {
327  if (socket != nullptr)
328  deletePipeAndSocket();
329 
330  connectionLostInt();
331  }
332 
333  return false;
334 }
335 
336 void InterprocessConnection::runThread()
337 {
338  while (! thread->threadShouldExit())
339  {
340  if (socket != nullptr)
341  {
342  auto ready = socket->waitUntilReady (true, 100);
343 
344  if (ready < 0)
345  {
346  deletePipeAndSocket();
347  connectionLostInt();
348  break;
349  }
350 
351  if (ready == 0)
352  {
353  thread->wait (1);
354  continue;
355  }
356  }
357  else if (pipe != nullptr)
358  {
359  if (! pipe->isOpen())
360  {
361  deletePipeAndSocket();
362  connectionLostInt();
363  break;
364  }
365  }
366  else
367  {
368  break;
369  }
370 
371  if (thread->threadShouldExit() || ! readNextMessage())
372  break;
373  }
374 
375  threadIsRunning = false;
376 }
377 
378 } // namespace juce
String toString() const
Returns a dot- or colon-separated string in the form "1.2.3.4" (IPv4) or "1:2:3:4:5:6:7:8" (IPv6)...
size_t getSize() const noexcept
Returns the block's current allocated size, in bytes.
static Type swapIfBigEndian(Type value) noexcept
Swaps the byte order of a signed or unsigned integer if the CPU is big-endian.
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
Tries to connect this object to a socket.
Thread(const String &threadName, size_t threadStackSize=0)
Creates a thread.
Definition: juce_Thread.cpp:26
void copyFrom(const void *srcData, int destinationOffset, size_t numBytes) noexcept
Copies data into this MemoryBlock from a memory address.
The JUCE String class!
Definition: juce_String.h:42
virtual void messageReceived(const MemoryBlock &message)=0
Called when a message arrives.
void * getData() const noexcept
Returns a void pointer to the data.
A cross-process pipe that can have data written to and read from it.
A wrapper for a streaming (TCP) socket.
Definition: juce_Socket.h:41
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
Creates a connection.
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
Tries to create a new pipe for other processes to connect to.
Encapsulates a thread.
Definition: juce_Thread.h:46
virtual void connectionLost()=0
Called when the connection is broken.
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
Tries to connect the object to an existing named pipe.
bool sendMessage(const MemoryBlock &message)
Tries to send a message to the other end of this connection.
virtual void connectionMade()=0
Called when the connection is first connected.
bool isConnected() const
True if a socket or pipe is currently active.
This class acts as a pointer which will automatically become null if the object to which it points is...
void disconnect()
Disconnects and closes any currently-open sockets or pipes.
static IPAddress local(bool IPv6=false) noexcept
Returns an IPv4 or IPv6 address meaning "localhost", equivalent to 127.0.0.1 (IPv4) or ::1 (IPv6) ...
String getConnectedHostName() const
Returns the name of the machine at the other end of this connection.
A class to hold a resizable block of raw data.
Automatically locks and unlocks a mutex object.
The base class for objects that can be sent to a MessageListener.
Definition: juce_Message.h:47
Manages a simple two-way messaging connection to another process, using either a socket or a named pi...
void run() override
Must be implemented to perform the thread's actual code.
Internal class used as the base class for all message objects.