GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage-daily/nodes/benchmark/out/../src/stream_base.h Lines: 24 31 77.4 %
Date: 2019-03-02 22:23:06 Branches: 10 20 50.0 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async_wrap-inl.h"
8
#include "node.h"
9
#include "util.h"
10
11
#include "v8.h"
12
13
namespace node {
14
15
// Forward declarations
16
class ShutdownWrap;
17
class WriteWrap;
18
class StreamBase;
19
class StreamResource;
20
21
struct StreamWriteResult {
22
  bool async;
23
  int err;
24
  WriteWrap* wrap;
25
  size_t bytes;
26
};
27
28
29
class StreamReq {
30
 public:
31
  static constexpr int kStreamReqField = 1;
32
33
47692
  explicit StreamReq(StreamBase* stream,
34
47692
                     v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
35
47692
    AttachToObject(req_wrap_obj);
36
47692
  }
37
38
47673
  virtual ~StreamReq() {}
39
  virtual AsyncWrap* GetAsyncWrap() = 0;
40
  v8::Local<v8::Object> object();
41
42
  void Done(int status, const char* error_str = nullptr);
43
  void Dispose();
44
45
26056
  inline StreamBase* stream() const { return stream_; }
46
47
  static StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
48
49
  // Sets all internal fields of `req_wrap_obj` to `nullptr`.
50
  // This is what the `WriteWrap` and `ShutdownWrap` JS constructors do,
51
  // and what we use in C++ after creating these objects from their
52
  // v8::ObjectTemplates, to avoid the overhead of calling the
53
  // constructor explicitly.
54
  static inline void ResetObject(v8::Local<v8::Object> req_wrap_obj);
55
56
 protected:
57
  virtual void OnDone(int status) = 0;
58
59
  void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
60
61
 private:
62
  StreamBase* const stream_;
63
};
64
65
42839
class ShutdownWrap : public StreamReq {
66
 public:
67
42842
  ShutdownWrap(StreamBase* stream,
68
               v8::Local<v8::Object> req_wrap_obj)
69
42842
    : StreamReq(stream, req_wrap_obj) { }
70
71
  // Call stream()->EmitAfterShutdown() and dispose of this request wrap.
72
  void OnDone(int status) override;
73
};
74
75
4834
class WriteWrap : public StreamReq {
76
 public:
77
  void SetAllocatedStorage(AllocatedBuffer&& storage);
78
79
4850
  WriteWrap(StreamBase* stream,
80
            v8::Local<v8::Object> req_wrap_obj)
81
4850
    : StreamReq(stream, req_wrap_obj) { }
82
83
  // Call stream()->EmitAfterWrite() and dispose of this request wrap.
84
  void OnDone(int status) override;
85
86
 private:
87
  AllocatedBuffer storage_;
88
};
89
90
91
// This is the generic interface for objects that control Node.js' C++ streams.
92
// For example, the default `EmitToJSStreamListener` emits a stream's data
93
// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream.
94
113396
class StreamListener {
95
 public:
96
  virtual ~StreamListener();
97
98
  // This is called when a stream wants to allocate memory before
99
  // reading data into the freshly allocated buffer (i.e. it is always followed
100
  // by a `OnStreamRead()` call).
101
  // This memory may be statically or dynamically allocated; for example,
102
  // a protocol parser may want to read data into a static buffer if it knows
103
  // that all data is going to be fully handled during the next
104
  // `OnStreamRead()` call.
105
  // The returned buffer does not need to contain `suggested_size` bytes.
106
  // The default implementation of this method returns a buffer that has exactly
107
  // the suggested size and is allocated using malloc().
108
  // It is not valid to return a zero-length buffer from this method.
109
  // It is not guaranteed that the corresponding `OnStreamRead()` call
110
  // happens in the same event loop turn as this call.
111
  virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0;
112
113
  // `OnStreamRead()` is called when data is available on the socket and has
114
  // been read into the buffer provided by `OnStreamAlloc()`.
115
  // The `buf` argument is the return value of `OnStreamAlloc()`, or may be a buffer
116
  // with base nullptr in case of an error.
117
  // `nread` is the number of read bytes (which is at most the buffer length),
118
  // or, if negative, a libuv error code.
119
  virtual void OnStreamRead(ssize_t nread,
120
                            const uv_buf_t& buf) = 0;
121
122
  // This is called once a write has finished. `status` may be 0 or,
123
  // if negative, a libuv error code.
124
  // By default, this is simply passed on to the previous listener
125
  // (and raises an assertion if there is none).
126
  virtual void OnStreamAfterWrite(WriteWrap* w, int status);
127
128
  // This is called once a shutdown has finished. `status` may be 0 or,
129
  // if negative, a libuv error code.
130
  // By default, this is simply passed on to the previous listener
131
  // (and raises an assertion if there is none).
132
  virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
133
134
  // This is called by the stream if it determines that it wants more data
135
  // to be written to it. Not all streams support this.
136
  // This callback will not be called as long as there are active writes.
137
  // It is not supported by all streams; `stream->HasWantsWrite()` returns
138
  // true if it is supported by a stream.
139
2399
  virtual void OnStreamWantsWrite(size_t suggested_size) {}
140
141
  // This is called immediately before the stream is destroyed.
142
521
  virtual void OnStreamDestroy() {}
143
144
  // The stream this is currently associated with, or nullptr if there is none.
145
936
  inline StreamResource* stream() { return stream_; }
146
147
 protected:
148
  // Pass along a read error to the `StreamListener` instance that was active
149
  // before this one. For example, a protocol parser does not care about read
150
  // errors and may instead want to let the original handler
151
  // (e.g. the JS handler) take care of the situation.
152
  void PassReadErrorToPreviousListener(ssize_t nread);
153
154
  StreamResource* stream_ = nullptr;
155
  StreamListener* previous_listener_ = nullptr;
156
157
  friend class StreamResource;
158
};
159
160
161
// An (incomplete) stream listener class that calls the `.oncomplete()`
162
// method of the JS objects associated with the wrap objects.
163
153503
class ReportWritesToJSStreamListener : public StreamListener {
164
 public:
165
  void OnStreamAfterWrite(WriteWrap* w, int status) override;
166
  void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
167
168
 private:
169
  void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
170
};
171
172
173
// A default emitter that just pushes data chunks as Buffer instances to
174
// JS land via the handle’s .ondata method.
175
153503
class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
176
 public:
177
  uv_buf_t OnStreamAlloc(size_t suggested_size) override;
178
  void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
179
};
180
181
182
// A generic stream, comparable to JS land’s `Duplex` streams.
183
// A stream is always controlled through one `StreamListener` instance.
184
76990
class StreamResource {
185
 public:
186
  virtual ~StreamResource();
187
188
  // These need to be implemented on the readable side of this stream:
189
190
  // Start reading from the underlying resource. This is called by the consumer
191
  // when more data is desired. Use `EmitAlloc()` and `EmitRead()` to
192
  // pass data along to the consumer.
193
  virtual int ReadStart() = 0;
194
  // Stop reading from the underlying resource. This is called by the
195
  // consumer when its buffers are full and no more data can be handled.
196
  virtual int ReadStop() = 0;
197
198
  // These need to be implemented on the writable side of this stream:
199
  // All of these methods may return an error code synchronously.
200
  // In that case, the finish callback should *not* be called.
201
202
  // Perform a shutdown operation, and either call req_wrap->Done() when
203
  // finished and return 0, return 1 for synchronous success, or
204
  // a libuv error code for synchronous failures.
205
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
206
  // Try to write as much data as possible synchronously, and modify
207
  // `*bufs` and `*count` accordingly. This is a no-op by default.
208
  // Return 0 for success and a libuv error code for failures.
209
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
210
  // Perform a write of data, and either call req_wrap->Done() when finished
211
  // and return 0, or return a libuv error code for synchronous failures.
212
  virtual int DoWrite(WriteWrap* w,
213
                      uv_buf_t* bufs,
214
                      size_t count,
215
                      uv_stream_t* send_handle) = 0;
216
217
  // Returns true if the stream supports the `OnStreamWantsWrite()` interface.
218
  virtual bool HasWantsWrite() const { return false; }
219
220
  // Optionally, this may provide an error message to be used for
221
  // failing writes.
222
  virtual const char* Error() const;
223
  // Clear the current error (i.e. that would be returned by Error()).
224
  virtual void ClearError();
225
226
  // Transfer ownership of this stream to `listener`. The previous listener
227
  // will not receive any more callbacks while the new listener is active.
228
  void PushStreamListener(StreamListener* listener);
229
  // Remove a listener, and, if this was the currently active one,
230
  // transfer ownership back to the previous listener.
231
  void RemoveStreamListener(StreamListener* listener);
232
233
 protected:
234
  // Call the current listener's OnStreamAlloc() method.
235
  uv_buf_t EmitAlloc(size_t suggested_size);
236
  // Call the current listener's OnStreamRead() method and update the
237
  // stream's read byte counter.
238
  void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
239
  // Call the current listener's OnStreamAfterWrite() method.
240
  void EmitAfterWrite(WriteWrap* w, int status);
241
  // Call the current listener's OnStreamAfterShutdown() method.
242
  void EmitAfterShutdown(ShutdownWrap* w, int status);
243
  // Call the current listener's OnStreamWantsWrite() method.
244
  void EmitWantsWrite(size_t suggested_size);
245
246
  StreamListener* listener_ = nullptr;
247
  uint64_t bytes_read_ = 0;
248
  uint64_t bytes_written_ = 0;
249
250
  friend class StreamListener;
251
};
252
253
254
76513
class StreamBase : public StreamResource {
255
 public:
256
  template <class Base>
257
  static inline void AddMethods(Environment* env,
258
                                v8::Local<v8::FunctionTemplate> target);
259
260
  virtual bool IsAlive() = 0;
261
  virtual bool IsClosing() = 0;
262
  virtual bool IsIPCPipe();
263
  virtual int GetFD();
264
265
  void CallJSOnreadMethod(ssize_t nread,
266
                          v8::Local<v8::ArrayBuffer> ab,
267
                          size_t offset = 0);
268
269
  // This is named `stream_env` to avoid name clashes, because a lot of
270
  // subclasses are also `BaseObject`s.
271
  Environment* stream_env() const;
272
273
  // Shut down the current stream. This request can use an existing
274
  // ShutdownWrap object (that was created in JS), or a new one will be created.
275
  // Returns 1 in case of a synchronous completion, 0 in case of asynchronous
276
  // completion, and a libuv error code in case of synchronous failure.
277
  int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
278
279
  // Write data to the current stream. This request can use an existing
280
  // WriteWrap object (that was created in JS), or a new one will be created.
281
  // This will first try to write synchronously using `DoTryWrite()`, then
282
  // asynchronously using `DoWrite()`.
283
  // If the return value indicates a synchronous completion, no callback will
284
  // be invoked.
285
  StreamWriteResult Write(
286
      uv_buf_t* bufs,
287
      size_t count,
288
      uv_stream_t* send_handle = nullptr,
289
      v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
290
291
  // These can be overridden by subclasses to get more specific wrap instances.
292
  // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
293
  // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
294
  // an associated libuv request.
295
  virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
296
  virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
297
298
  // One of these must be implemented
299
  virtual AsyncWrap* GetAsyncWrap() = 0;
300
  virtual v8::Local<v8::Object> GetObject();
301
302
 protected:
303
  explicit StreamBase(Environment* env);
304
305
  // JS Methods
306
  int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
307
  int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
308
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
309
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
310
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
311
  template <enum encoding enc>
312
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
313
314
  template <class Base>
315
  static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
316
317
  template <class Base>
318
  static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
319
320
  template <class Base>
321
  static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);
322
323
  template <class Base>
324
  static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);
325
326
  template <class Base,
327
            int (StreamBase::*Method)(
328
      const v8::FunctionCallbackInfo<v8::Value>& args)>
329
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
330
331
  // Internal, used only in StreamBase methods + env.cc.
332
  enum StreamBaseStateFields {
333
    kReadBytesOrError,
334
    kArrayBufferOffset,
335
    kBytesWritten,
336
    kLastWriteWasAsync,
337
    kNumStreamBaseStateFields
338
  };
339
340
 private:
341
  Environment* env_;
342
  EmitToJSStreamListener default_listener_;
343
344
  void SetWriteResult(const StreamWriteResult& res);
345
346
  friend class WriteWrap;
347
  friend class ShutdownWrap;
348
  friend class Environment;  // For kNumStreamBaseStateFields.
349
};
350
351
352
// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
353
// `OtherBase` must have a constructor that matches the `AsyncWrap`
354
// constructor’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
355
// and be a subclass of `AsyncWrap`.
356
template <typename OtherBase>
357

85678
class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
358
 public:
359
  SimpleShutdownWrap(StreamBase* stream,
360
                     v8::Local<v8::Object> req_wrap_obj);
361
362
85299
  AsyncWrap* GetAsyncWrap() override { return this; }
363
364
  SET_NO_MEMORY_INFO()
365
  SET_MEMORY_INFO_NAME(SimpleShutdownWrap)
366
  SET_SELF_SIZE(SimpleShutdownWrap)
367
};
368
369
template <typename OtherBase>
370

9668
class SimpleWriteWrap : public WriteWrap, public OtherBase {
371
 public:
372
  SimpleWriteWrap(StreamBase* stream,
373
                  v8::Local<v8::Object> req_wrap_obj);
374
375
13646
  AsyncWrap* GetAsyncWrap() override { return this; }
376
377
  SET_NO_MEMORY_INFO()
378
  SET_MEMORY_INFO_NAME(SimpleWriteWrap)
379
  SET_SELF_SIZE(SimpleWriteWrap)
380
};
381
382
}  // namespace node
383
384
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
385
386
#endif  // SRC_STREAM_BASE_H_