GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.h Lines: 25 33 75.8 %
Date: 2021-06-04 04:12:13 Branches: 1 2 50.0 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "allocated_buffer.h"
8
#include "async_wrap.h"
9
#include "node.h"
10
#include "util.h"
11
12
#include "v8.h"
13
14
namespace node {
15
16
// Forward declarations
17
class Environment;
18
class ShutdownWrap;
19
class WriteWrap;
20
class StreamBase;
21
class StreamResource;
22
class ExternalReferenceRegistry;
23
24
125629
struct StreamWriteResult {
25
  bool async;
26
  int err;
27
  WriteWrap* wrap;
28
  size_t bytes;
29
  BaseObjectPtr<AsyncWrap> wrap_obj;
30
};
31
32
using JSMethodFunction = void(const v8::FunctionCallbackInfo<v8::Value>& args);
33
34
class StreamReq {
35
 public:
36
  // The kSlot internal field here mirrors BaseObject::InternalFields::kSlot
37
  // here because instances derived from StreamReq will also derive from
38
  // BaseObject, and the slots are used for the identical purpose.
39
  enum InternalFields {
40
    kSlot = BaseObject::kSlot,
41
    kStreamReqField = BaseObject::kInternalFieldCount,
42
    kInternalFieldCount
43
  };
44
45
  inline explicit StreamReq(
46
      StreamBase* stream,
47
      v8::Local<v8::Object> req_wrap_obj);
48
49
15284
  virtual ~StreamReq() = default;
50
  virtual AsyncWrap* GetAsyncWrap() = 0;
51
  inline v8::Local<v8::Object> object();
52
53
  inline void Done(int status, const char* error_str = nullptr);
54
  inline void Dispose();
55
56
15345
  StreamBase* stream() const { return stream_; }
57
58
  static inline StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
59
60
  // Sets all internal fields of `req_wrap_obj` to `nullptr`.
61
  // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
62
  // and what we use in C++ after creating these objects from their
63
  // v8::ObjectTemplates, to avoid the overhead of calling the
64
  // constructor explicitly.
65
  static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
66
67
 protected:
68
  virtual void OnDone(int status) = 0;
69
70
  inline void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
71
72
 private:
73
  StreamBase* const stream_;
74
};
75
76
7485
class ShutdownWrap : public StreamReq {
77
 public:
78
  inline ShutdownWrap(
79
      StreamBase* stream,
80
      v8::Local<v8::Object> req_wrap_obj);
81
82
  static inline ShutdownWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
83
  template <typename T, bool kIsWeak>
84
  static inline ShutdownWrap* FromObject(
85
      const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
86
87
  // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
88
  void OnDone(int status) override;
89
};
90
91
7799
class WriteWrap : public StreamReq {
92
 public:
93
  inline void SetAllocatedStorage(AllocatedBuffer&& storage);
94
95
  inline WriteWrap(
96
      StreamBase* stream,
97
      v8::Local<v8::Object> req_wrap_obj);
98
99
  static inline WriteWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
100
  template <typename T, bool kIsWeak>
101
  static inline WriteWrap* FromObject(
102
      const BaseObjectPtrImpl<T, kIsWeak>& base_obj);
103
104
  // Call stream()->EmitAfterWrite() and dispose of this request wrap.
105
  void OnDone(int status) override;
106
107
 private:
108
  AllocatedBuffer storage_;
109
};
110
111
112
// This is the generic interface for objects that control Node.js' C++ streams.
113
// For example, the default `EmitToJSStreamListener` emits a stream's data
114
// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
115
106085
class StreamListener {
116
 public:
117
  virtual ~StreamListener();
118
119
  // This is called when a stream wants to allocate memory before
120
  // reading data into the freshly allocated buffer (i.e. it is always followed
121
  // by a `OnStreamRead()` call).
122
  // This memory may be statically or dynamically allocated; for example,
123
  // a protocol parser may want to read data into a static buffer if it knows
124
  // that all data is going to be fully handled during the next
125
  // `OnStreamRead()` call.
126
  // The returned buffer does not need to contain `suggested_size` bytes.
127
  // The default implementation of this method returns a buffer that has exactly
128
  // the suggested size and is allocated using malloc().
129
  // It is not valid to return a zero-length buffer from this method.
130
  // It is not guaranteed that the corresponding `OnStreamRead()` call
131
  // happens in the same event loop turn as this call.
132
  virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
133
134
  // `OnStreamRead()` is called when data is available on the socket and has
135
  // been read into the buffer provided by `OnStreamAlloc()`.
136
  // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
137
  // with base nullptr in case of an error.
138
  // `nread` is the number of read bytes (which is at most the buffer length),
139
  // or, if negative, a libuv error code.
140
  virtual void OnStreamRead(ssize_t nread,
141
                            const uv_buf_t& buf) = 0;
142
143
  // This is called once a write has finished. `status` may be 0 or,
144
  // if negative, a libuv error code.
145
  // By default, this is simply passed on to the previous listener
146
  // (and raises an assertion if there is none).
147
  virtual void OnStreamAfterWrite(WriteWrap* w, int status);
148
149
  // This is called once a shutdown has finished. `status` may be 0 or,
150
  // if negative, a libuv error code.
151
  // By default, this is simply passed on to the previous listener
152
  // (and raises an assertion if there is none).
153
  virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
154
155
  // This is called by the stream if it determines that it wants more data
156
  // to be written to it. Not all streams support this.
157
  // This callback will not be called as long as there are active writes.
158
  // It is not supported by all streams; `stream->HasWantsWrite()` returns
159
  // true if it is supported by a stream.
160
2460
  virtual void OnStreamWantsWrite(size_t suggested_size) {}
161
162
  // This is called immediately before the stream is destroyed.
163
597
  virtual void OnStreamDestroy() {}
164
165
  // The stream this is currently associated with, or nullptr if there is none.
166
58193
  StreamResource* stream() const { return stream_; }
167
168
 protected:
169
  // Pass along a read error to the `StreamListener` instance that was active
170
  // before this one. For example, a protocol parser does not care about read
171
  // errors and may instead want to let the original handler
172
  // (e.g. the JS handler) take care of the situation.
173
  inline void PassReadErrorToPreviousListener(ssize_t nread);
174
175
  StreamResource* stream_ = nullptr;
176
  StreamListener* previous_listener_ = nullptr;
177
178
  friend class StreamResource;
179
};
180
181
182
// An (incomplete) stream listener class that calls the `.oncomplete()`
183
// method of the JS objects associated with the wrap objects.
184
130920
class ReportWritesToJSStreamListener : public StreamListener {
185
 public:
186
  void OnStreamAfterWrite(WriteWrap* w, int status) override;
187
  void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
188
189
 private:
190
  void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
191
};
192
193
194
// A default emitter that just pushes data chunks as Buffer instances to
195
// JS land via the handle’s .ondata method.
196
130896
class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
197
 public:
198
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
199
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
200
};
201
202
203
// An alternative listener that uses a custom, user-provided buffer
204
// for reading data.
205
24
class CustomBufferJSListener : public ReportWritesToJSStreamListener {
206
 public:
207
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
208
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
209
12
  void OnStreamDestroy() override { delete this; }
210
211
12
  explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
212
213
 private:
214
  uv_buf_t buffer_;
215
};
216
217
218
// A generic stream, comparable to JS land’s `Duplex` streams.
219
// A stream is always controlled through one `StreamListener` instance.
220
65717
class StreamResource {
221
 public:
222
  virtual ~StreamResource();
223
224
  // These need to be implemented on the readable side of this stream:
225
226
  // Start reading from the underlying resource. This is called by the consumer
227
  // when more data is desired. Use `EmitAlloc()` and `EmitData()` to
228
  // pass data along to the consumer.
229
  virtual int ReadStart() = 0;
230
  // Stop reading from the underlying resource. This is called by the
231
  // consumer when its buffers are full and no more data can be handled.
232
  virtual int ReadStop() = 0;
233
234
  // These need to be implemented on the writable side of this stream:
235
  // All of these methods may return an error code synchronously.
236
  // In that case, the finish callback should *not* be called.
237
238
  // Perform a shutdown operation, and either call req_wrap->Done() when
239
  // finished and return 0, return 1 for synchronous success, or
240
  // a libuv error code for synchronous failures.
241
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
242
  // Try to write as much data as possible synchronously, and modify
243
  // `*bufs` and `*count` accordingly. This is a no-op by default.
244
  // Return 0 for success and a libuv error code for failures.
245
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
246
  // Initiate a write of data. If the write completes synchronously, return 0 on
247
  // success (with bufs modified to indicate how much data was consumed) or a
248
  // libuv error code on failure. If the write will complete asynchronously,
249
  // return 0. When the write completes asynchronously, call req_wrap->Done()
250
  // with 0 on success (with bufs modified to indicate how much data was
251
  // consumed) or a libuv error code on failure. Do not call req_wrap->Done() if
252
  // the write completes synchronously, that is, it should never be called
253
  // before DoWrite() has returned.
254
  virtual int DoWrite(WriteWrap* w,
255
                      uv_buf_t* bufs,
256
                      size_t count,
257
                      uv_stream_t* send_handle) = 0;
258
259
  // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
260
  virtual bool HasWantsWrite() const { return false; }
261
262
  // Optionally, this may provide an error message to be used for
263
  // failing writes.
264
  virtual const char* Error() const;
265
  // Clear the current error (i.e. that would be returned by Error()).
266
  virtual void ClearError();
267
268
  // Transfer ownership of this stream to `listener`. The previous listener
269
  // will not receive any more callbacks while the new listener was active.
270
  inline void PushStreamListener(StreamListener* listener);
271
  // Remove a listener, and, if this was the currently active one,
272
  // transfer ownership back to the previous listener.
273
  inline void RemoveStreamListener(StreamListener* listener);
274
275
 protected:
276
  // Call the current listener's OnStreamAlloc() method.
277
  inline uv_buf_t EmitAlloc(size_t suggested_size);
278
  // Call the current listener's OnStreamRead() method and update the
279
  // stream's read byte counter.
280
  inline void EmitRead(
281
      ssize_t nread,
282
28084
      const uv_buf_t& buf = uv_buf_init(nullptr, 0));
283
  // Call the current listener's OnStreamAfterWrite() method.
284
  inline void EmitAfterWrite(WriteWrap* w, int status);
285
  // Call the current listener's OnStreamAfterShutdown() method.
286
  inline void EmitAfterShutdown(ShutdownWrap* w, int status);
287
  // Call the current listener's OnStreamWantsWrite() method.
288
  inline void EmitWantsWrite(size_t suggested_size);
289
290
  StreamListener* listener_ = nullptr;
291
  uint64_t bytes_read_ = 0;
292
  uint64_t bytes_written_ = 0;
293
294
  friend class StreamListener;
295
};
296
297
298
65179
class StreamBase : public StreamResource {
299
 public:
300
  // The kSlot field here mirrors that of BaseObject::InternalFields::kSlot
301
  // because instances deriving from StreamBase generally also derived from
302
  // BaseObject (it's possible for it not to, however).
303
  enum InternalFields {
304
    kSlot = BaseObject::kSlot,
305
    kStreamBaseField = BaseObject::kInternalFieldCount,
306
    kOnReadFunctionField,
307
    kInternalFieldCount
308
  };
309
310
  static void AddMethods(Environment* env,
311
                         v8::Local<v8::FunctionTemplate> target);
312
  static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
313
  virtual bool IsAlive() = 0;
314
  virtual bool IsClosing() = 0;
315
  virtual bool IsIPCPipe();
316
  virtual int GetFD();
317
318
  enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
319
320
  v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
321
      ssize_t nread,
322
      v8::Local<v8::ArrayBuffer> ab,
323
      size_t offset = 0,
324
      StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
325
326
  // This is named `stream_env` to avoid name clashes, because a lot of
327
  // subclasses are also `BaseObject`s.
328
247170
  Environment* stream_env() const { return env_; }
329
330
  // Shut down the current stream. This request can use an existing
331
  // ShutdownWrap object (that was created in JS), or a new one will be created.
332
  // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
333
  // completion, and a libuv error case in case of synchronous failure.
334
  inline int Shutdown(
335
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
336
337
  // Write data to the current stream. This request can use an existing
338
  // WriteWrap object (that was created in JS), or a new one will be created.
339
  // This will first try to write synchronously using `DoTryWrite()`, then
340
  // asynchronously using `DoWrite()`.
341
  // If the return value indicates a synchronous completion, no callback will
342
  // be invoked.
343
  inline StreamWriteResult Write(
344
      uv_buf_t* bufs,
345
      size_t count,
346
      uv_stream_t* send_handle = nullptr,
347
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
348
349
  // These can be overridden by subclasses to get more specific wrap instances.
350
  // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
351
  // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
352
  // an associated libuv request.
353
  virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
354
  virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
355
356
  // One of these must be implemented
357
  virtual AsyncWrap* GetAsyncWrap() = 0;
358
  virtual v8::Local<v8::Object> GetObject();
359
360
  static inline StreamBase* FromObject(v8::Local<v8::Object> obj);
361
362
 protected:
363
  inline explicit StreamBase(Environment* env);
364
365
  // JS Methods
366
  int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
367
  int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
368
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
369
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
370
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
371
  template <enum encoding enc>
372
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
373
  int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
374
375
  static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
376
  static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
377
  static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
378
  static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
379
  inline void AttachToObject(v8::Local<v8::Object> obj);
380
381
  template <int (StreamBase::*Method)(
382
      const v8::FunctionCallbackInfo<v8::Value>& args)>
383
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
384
385
  // Internal, used only in StreamBase methods + env.cc.
386
  enum StreamBaseStateFields {
387
    kReadBytesOrError,
388
    kArrayBufferOffset,
389
    kBytesWritten,
390
    kLastWriteWasAsync,
391
    kNumStreamBaseStateFields
392
  };
393
394
 private:
395
  Environment* env_;
396
  EmitToJSStreamListener default_listener_;
397
398
  void SetWriteResult(const StreamWriteResult& res);
399
  static void AddMethod(Environment* env,
400
                        v8::Local<v8::Signature> sig,
401
                        enum v8::PropertyAttribute attributes,
402
                        v8::Local<v8::FunctionTemplate> t,
403
                        JSMethodFunction* stream_method,
404
                        v8::Local<v8::String> str);
405
406
  friend class WriteWrap;
407
  friend class ShutdownWrap;
408
  friend class Environment;  // For kNumStreamBaseStateFields.
409
};
410
411
412
// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
413
// `OtherBase` must have a constructor that matches the `AsyncWrap`
414
// constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
415
// and be a subclass of `AsyncWrap`.
416
template <typename OtherBase>
417
14970
class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
418
 public:
419
  SimpleShutdownWrap(StreamBase* stream,
420
                     v8::Local<v8::Object> req_wrap_obj);
421
422
37427
  AsyncWrap* GetAsyncWrap() override { return this; }
423
424
3
  SET_NO_MEMORY_INFO()
425
3
  SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
426
3
  SET_SELF_SIZE(SimpleShutdownWrap)
427
428
  bool IsNotIndicativeOfMemoryLeakAtExit() const override {
429
    return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
430
  }
431
};
432
433
template <typename OtherBase>
434
15598
class SimpleWriteWrap : public WriteWrap, public OtherBase {
435
 public:
436
  SimpleWriteWrap(StreamBase* stream,
437
                  v8::Local<v8::Object> req_wrap_obj);
438
439
44110
  AsyncWrap* GetAsyncWrap() override { return this; }
440
441
  SET_NO_MEMORY_INFO()
442
  SET_MEMORY_INFO_NAME(SimpleWriteWrap)
443
  SET_SELF_SIZE(SimpleWriteWrap)
444
445
  bool IsNotIndicativeOfMemoryLeakAtExit() const override {
446
    return OtherBase::IsNotIndicativeOfMemoryLeakAtExit();
447
  }
448
};
449
450
}  // namespace node
451
452
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
453
454
#endif  // SRC_STREAM_BASE_H_