GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: stream_base.h Lines: 14 22 63.6 %
Date: 2022-08-06 04:16:36 Branches: 1 2 50.0 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async_wrap.h"
8
#include "node.h"
9
#include "util.h"
10
11
#include "v8.h"
12
13
namespace node {
14
15
// Forward declarations
16
class Environment;
17
class ShutdownWrap;
18
class WriteWrap;
19
class StreamBase;
20
class StreamResource;
21
class ExternalReferenceRegistry;
22
23
struct StreamWriteResult {
24
  bool async;
25
  int err;
26
  WriteWrap* wrap;
27
  size_t bytes;
28
  BaseObjectPtr<AsyncWrap> wrap_obj;
29
};
30
31
using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args);
32
33
class StreamReq {
34
 public:
35
  // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot
36
  // because instances derived from StreamReq will also derive from
37
  // BaseObject, and the slots are used for the identical purpose.
38
  enum InternalFields {
39
    kSlot = BaseObject::kSlot,
40
    kStreamReqField = BaseObject::kInternalFieldCount,
41
    kInternalFieldCount
42
  };
43
44
  inline explicit StreamReq(
45
      StreamBase* stream,
46
      v8::Local<v8::Object> req_wrap_obj);
47
48
31096
  virtual ~StreamReq() = default;
49
  virtual AsyncWrap* GetAsyncWrap() = 0;
50
  inline v8::Local<v8::Object> object();
51
52
  // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
53
  // if there is a pending exception/termination.
54
  inline void Done(int status, const char* error_str = nullptr);
55
  inline void Dispose();
56
57
15473
  StreamBase* stream() const { return stream_; }
58
59
  static inline StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
60
61
  // Sets all internal fields of `req_wrap_obj` to `nullptr`.
62
  // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
63
  // and what we use in C++ after creating these objects from their
64
  // v8::ObjectTemplates, to avoid the overhead of calling the
65
  // constructor explicitly.
66
  static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
67
68
 protected:
69
  virtual void OnDone(int status) = 0;
70
71
  inline void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
72
73
 private:
74
  StreamBase* const stream_;
75
};
76
77
class ShutdownWrap : public StreamReq {
78
 public:
79
  inline ShutdownWrap(
80
      StreamBase* stream,
81
      v8::Local<v8::Object> req_wrap_obj);
82
83
  static inline ShutdownWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
84
  template <typename T, bool kIsWeak>
85
  static inline ShutdownWrap* FromObject(
86
      const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
87
88
  // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
89
  void OnDone(int status) override;
90
};
91
92
class WriteWrap : public StreamReq {
93
 public:
94
  inline void SetBackingStore(std::unique_ptr<v8::BackingStore> bs);
95
96
  inline WriteWrap(
97
      StreamBase* stream,
98
      v8::Local<v8::Object> req_wrap_obj);
99
100
  static inline WriteWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
101
  template <typename T, bool kIsWeak>
102
  static inline WriteWrap* FromObject(
103
      const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
104
105
  // Call stream()->EmitAfterWrite() and dispose of this request wrap.
106
  void OnDone(int status) override;
107
108
 private:
109
  std::unique_ptr<v8::BackingStore> backing_store_;
110
};
111
112
113
// This is the generic interface for objects that control Node.js' C++ streams.
114
// For example, the default `EmitToJSStreamListener` emits a stream's data
115
// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
116
class StreamListener {
117
 public:
118
  virtual ~StreamListener();
119
120
  // This is called when a stream wants to allocate memory before
121
  // reading data into the freshly allocated buffer (i.e. it is always followed
122
  // by a `OnStreamRead()` call).
123
  // This memory may be statically or dynamically allocated; for example,
124
  // a protocol parser may want to read data into a static buffer if it knows
125
  // that all data is going to be fully handled during the next
126
  // `OnStreamRead()` call.
127
  // The returned buffer does not need to contain `suggested_size` bytes.
128
  // The default implementation of this method returns a buffer that has exactly
129
  // the suggested size and is allocated using malloc().
130
  // It is not valid to return a zero-length buffer from this method.
131
  // It is not guaranteed that the corresponding `OnStreamRead()` call
132
  // happens in the same event loop turn as this call.
133
  virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
134
135
  // `OnStreamRead()` is called when data is available on the socket and has
136
  // been read into the buffer provided by `OnStreamAlloc()`.
137
  // The `buf` argument is the buffer returned by `OnStreamAlloc()`, or may be
137
  // a buffer with base nullptr in case of an error.
139
  // `nread` is the number of read bytes (which is at most the buffer length),
140
  // or, if negative, a libuv error code.
141
  virtual void OnStreamRead(ssize_t nread,
142
                            const uv_buf_t& buf) = 0;
143
144
  // This is called once a write has finished. `status` may be 0 or,
145
  // if negative, a libuv error code.
146
  // By default, this is simply passed on to the previous listener
147
  // (and raises an assertion if there is none).
148
  virtual void OnStreamAfterWrite(WriteWrap* w, int status);
149
150
  // This is called once a shutdown has finished. `status` may be 0 or,
151
  // if negative, a libuv error code.
152
  // By default, this is simply passed on to the previous listener
153
  // (and raises an assertion if there is none).
154
  virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
155
156
  // This is called by the stream if it determines that it wants more data
157
  // to be written to it. Not all streams support this.
158
  // This callback will not be called as long as there are active writes.
159
  // It is not supported by all streams; `stream->HasWantsWrite()` returns
160
  // true if it is supported by a stream.
161
2515
  virtual void OnStreamWantsWrite(size_t suggested_size) {}
162
163
  // This is called immediately before the stream is destroyed.
164
619
  virtual void OnStreamDestroy() {}
165
166
  // The stream this is currently associated with, or nullptr if there is none.
167
53881
  StreamResource* stream() const { return stream_; }
168
169
 protected:
170
  // Pass along a read error to the `StreamListener` instance that was active
171
  // before this one. For example, a protocol parser does not care about read
172
  // errors and may instead want to let the original handler
173
  // (e.g. the JS handler) take care of the situation.
174
  inline void PassReadErrorToPreviousListener(ssize_t nread);
175
176
  StreamResource* stream_ = nullptr;
177
  StreamListener* previous_listener_ = nullptr;
178
179
  friend class StreamResource;
180
};
181
182
183
// An (incomplete) stream listener class that calls the `.oncomplete()`
184
// method of the JS objects associated with the wrap objects.
185
class ReportWritesToJSStreamListener : public StreamListener {
186
 public:
187
  void OnStreamAfterWrite(WriteWrap* w, int status) override;
188
  void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
189
190
 private:
191
  void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
192
};
193
194
195
// A default emitter that just pushes data chunks as Buffer instances to
196
// JS land via the handle’s .ondata method.
197
class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
198
 public:
199
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
200
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
201
};
202
203
204
// An alternative listener that uses a custom, user-provided buffer
205
// for reading data.
206
class CustomBufferJSListener : public ReportWritesToJSStreamListener {
207
 public:
208
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
209
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
210
12
  void OnStreamDestroy() override { delete this; }
211
212
12
  explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
213
214
 private:
215
  uv_buf_t buffer_;
216
};
217
218
219
// A generic stream, comparable to JS land’s `Duplex` streams.
220
// A stream is always controlled through one `StreamListener` instance.
221
class StreamResource {
222
 public:
223
  virtual ~StreamResource();
224
225
  // These need to be implemented on the readable side of this stream:
226
227
  // Start reading from the underlying resource. This is called by the consumer
228
  // when more data is desired. Use `EmitAlloc()` and `EmitRead()` to
229
  // pass data along to the consumer.
230
  virtual int ReadStart() = 0;
231
  // Stop reading from the underlying resource. This is called by the
232
  // consumer when its buffers are full and no more data can be handled.
233
  virtual int ReadStop() = 0;
234
235
  // These need to be implemented on the writable side of this stream:
236
  // All of these methods may return an error code synchronously.
237
  // In that case, the finish callback should *not* be called.
238
239
  // Perform a shutdown operation, and either call req_wrap->Done() when
240
  // finished and return 0, return 1 for synchronous success, or
241
  // return a libuv error code for synchronous failures.
242
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
243
  // Try to write as much data as possible synchronously, and modify
244
  // `*bufs` and `*count` accordingly. This is a no-op by default.
245
  // Return 0 for success and a libuv error code for failures.
246
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
247
  // Initiate a write of data. If the write completes synchronously, return 0 on
248
  // success (with bufs modified to indicate how much data was consumed) or a
249
  // libuv error code on failure. If the write will complete asynchronously,
250
  // return 0. When the write completes asynchronously, call req_wrap->Done()
251
  // with 0 on success (with bufs modified to indicate how much data was
252
  // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if
253
  // the write completes synchronously, that is, it should never be called
254
  // before DoWrite() has returned.
255
  virtual int DoWrite(WriteWrap* w,
256
                      uv_buf_t* bufs,
257
                      size_t count,
258
                      uv_stream_t* send_handle) = 0;
259
260
  // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
261
  virtual bool HasWantsWrite() const { return false; }
262
263
  // Optionally, this may provide an error message to be used for
264
  // failing writes.
265
  virtual const char* Error() const;
266
  // Clear the current error (i.e. that would be returned by Error()).
267
  virtual void ClearError();
268
269
  // Transfer ownership of this stream to `listener`. The previous listener
270
  // will not receive any more callbacks while the new listener is active.
271
  inline void PushStreamListener(StreamListener* listener);
272
  // Remove a listener, and, if this was the currently active one,
273
  // transfer ownership back to the previous listener.
274
  inline void RemoveStreamListener(StreamListener* listener);
275
276
 protected:
277
  // Call the current listener's OnStreamAlloc() method.
278
  inline uv_buf_t EmitAlloc(size_t suggested_size);
279
  // Call the current listener's OnStreamRead() method and update the
280
  // stream's read byte counter.
281
  inline void EmitRead(
282
      ssize_t nread,
283
14137
      const uv_buf_t& buf = uv_buf_init(nullptr, 0));
284
  // Call the current listener's OnStreamAfterWrite() method.
285
  inline void EmitAfterWrite(WriteWrap* w, int status);
286
  // Call the current listener's OnStreamAfterShutdown() method.
287
  inline void EmitAfterShutdown(ShutdownWrap* w, int status);
288
  // Call the current listener's OnStreamWantsWrite() method.
289
  inline void EmitWantsWrite(size_t suggested_size);
290
291
  StreamListener* listener_ = nullptr;
292
  uint64_t bytes_read_ = 0;
293
  uint64_t bytes_written_ = 0;
294
295
  friend class StreamListener;
296
};
297
298
299
class StreamBase : public StreamResource {
300
 public:
301
  // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot
302
  // because instances deriving from StreamBase generally also derive from
303
  // BaseObject (it's possible for it not to, however).
304
  enum InternalFields {
305
    kSlot = BaseObject::kSlot,
306
    kStreamBaseField = BaseObject::kInternalFieldCount,
307
    kOnReadFunctionField,
308
    kInternalFieldCount
309
  };
310
311
  static void AddMethods(Environment* env,
312
                         v8::Local<v8::FunctionTemplate> target);
313
  static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
314
  virtual bool IsAlive() = 0;
315
  virtual bool IsClosing() = 0;
316
  virtual bool IsIPCPipe();
317
  virtual int GetFD();
318
319
  enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
320
321
  v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
322
      ssize_t nread,
323
      v8::Local<v8::ArrayBuffer> ab,
324
      size_t offset = 0,
325
      StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
326
327
  // This is named `stream_env` to avoid name clashes, because a lot of
328
  // subclasses are also `BaseObject`s.
329
246846
  Environment* stream_env() const { return env_; }
330
331
  // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
332
  // if there is a pending exception/termination.
333
  // Shut down the current stream. This request can use an existing
334
  // ShutdownWrap object (that was created in JS), or a new one will be created.
335
  // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
336
  // completion, and a libuv error code in case of synchronous failure.
337
  inline int Shutdown(
338
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
339
340
  // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
341
  // if there is a pending exception/termination.
342
  // Write data to the current stream. This request can use an existing
343
  // WriteWrap object (that was created in JS), or a new one will be created.
344
  // This will first try to write synchronously using `DoTryWrite()`, then
345
  // asynchronously using `DoWrite()`.
346
  // If the return value indicates a synchronous completion, no callback will
347
  // be invoked.
348
  inline StreamWriteResult Write(
349
      uv_buf_t* bufs,
350
      size_t count,
351
      uv_stream_t* send_handle = nullptr,
352
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
353
354
  // These can be overridden by subclasses to get more specific wrap instances.
355
  // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
356
  // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
357
  // an associated libuv request.
358
  virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
359
  virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
360
361
  // One of these must be implemented
362
  virtual AsyncWrap* GetAsyncWrap() = 0;
363
  virtual v8::Local<v8::Object> GetObject();
364
365
  static inline StreamBase* FromObject(v8::Local<v8::Object> obj);
366
367
 protected:
368
  inline explicit StreamBase(Environment* env);
369
370
  // JS Methods
371
  int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
372
  int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
373
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
374
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
375
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
376
  template <enum encoding enc>
377
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
378
  int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
379
380
  static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
381
  static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
382
  static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
383
  static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
384
  inline void AttachToObject(v8::Local<v8::Object> obj);
385
386
  template <int (StreamBase::*Method)(
387
      const v8::FunctionCallbackInfo<v8::Value>& args)>
388
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
389
390
  // Internal, used only in StreamBase methods + env.cc.
391
  enum StreamBaseStateFields {
392
    kReadBytesOrError,
393
    kArrayBufferOffset,
394
    kBytesWritten,
395
    kLastWriteWasAsync,
396
    kNumStreamBaseStateFields
397
  };
398
399
 private:
400
  Environment* env_;
401
  EmitToJSStreamListener default_listener_;
402
403
  void SetWriteResult(const StreamWriteResult& res);
404
  static void AddMethod(Environment* env,
405
                        v8::Local<v8::Signature> sig,
406
                        enum v8::PropertyAttribute attributes,
407
                        v8::Local<v8::FunctionTemplate> t,
408
                        JSMethodFunction* stream_method,
409
                        v8::Local<v8::String> str);
410
411
  friend class WriteWrap;
412
  friend class ShutdownWrap;
413
  friend class Environment;  // For kNumStreamBaseStateFields.
414
};
415
416
417
// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
418
// `OtherBase` must have a constructor that matches the `AsyncWrap`
419
// constructor's (Environment*, Local<Object>, AsyncWrap::Provider) signature
420
// and be a subclass of `AsyncWrap`.
421
template <typename OtherBase>
422
class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
423
 public:
424
  SimpleShutdownWrap(StreamBase* stream,
425
                     v8::Local<v8::Object> req_wrap_obj);
426
427
37427
  AsyncWrap* GetAsyncWrap() override { return this; }
428
429
7
  SET_NO_MEMORY_INFO()
430
15
  SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
431
7
  SET_SELF_SIZE(SimpleShutdownWrap)
432
433
  bool IsNotIndicativeOfMemoryLeakAtExit() const override {
434
    return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
435
  }
436
};
437
438
template <typename OtherBase>
439
class SimpleWriteWrap : public WriteWrap, public OtherBase {
440
 public:
441
  SimpleWriteWrap(StreamBase* stream,
442
                  v8::Local<v8::Object> req_wrap_obj);
443
444
45400
  AsyncWrap* GetAsyncWrap() override { return this; }
445
446
  SET_NO_MEMORY_INFO()
447
  SET_MEMORY_INFO_NAME(SimpleWriteWrap)
448
  SET_SELF_SIZE(SimpleWriteWrap)
449
450
  bool IsNotIndicativeOfMemoryLeakAtExit() const override {
451
    return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
452
  }
453
};
454
455
}  // namespace node
456
457
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
458
459
#endif  // SRC_STREAM_BASE_H_