GCC Code Coverage Report
Directory: ../ Exec Total Coverage
File: /home/iojs/build/workspace/node-test-commit-linux-coverage/nodes/benchmark/out/../src/stream_base.h Lines: 73 76 96.1 %
Date: 2017-06-14 Branches: 16 24 66.7 %

Line Branch Exec Source
1
#ifndef SRC_STREAM_BASE_H_
2
#define SRC_STREAM_BASE_H_
3
4
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
5
6
#include "env.h"
7
#include "async-wrap.h"
8
#include "req-wrap.h"
9
#include "req-wrap-inl.h"
10
#include "node.h"
11
#include "util.h"
12
13
#include "v8.h"
14
15
namespace node {
16
17
// Forward declarations
18
class StreamBase;
19
20
template <class Req>
21
class StreamReq {
22
 public:
23
  typedef void (*DoneCb)(Req* req, int status);
24
25
17364
  explicit StreamReq(DoneCb cb) : cb_(cb) {
26
17364
  }
27
28
16813
  inline void Done(int status, const char* error_str = nullptr) {
29
16813
    Req* req = static_cast<Req*>(this);
30
16813
    Environment* env = req->env();
31

16813
    if (error_str != nullptr) {
32
20
      req->object()->Set(env->error_string(),
33
                         OneByteString(env->isolate(), error_str));
34
    }
35
36
16813
    cb_(req, status);
37
16811
  }
38
39
 private:
40
  DoneCb cb_;
41
};
42
43
class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
44
                     public StreamReq<ShutdownWrap> {
45
 public:
46
2096
  ShutdownWrap(Environment* env,
47
               v8::Local<v8::Object> req_wrap_obj,
48
               StreamBase* wrap,
49
               DoneCb cb)
50
      : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
51
        StreamReq<ShutdownWrap>(cb),
52
2096
        wrap_(wrap) {
53
2096
    Wrap(req_wrap_obj, this);
54
2096
  }
55
56
6270
  ~ShutdownWrap() {
57
2090
    ClearWrap(object());
58
4180
  }
59
60
2088
  static ShutdownWrap* from_req(uv_shutdown_t* req) {
61
2088
    return ContainerOf(&ShutdownWrap::req_, req);
62
  }
63
64
2090
  inline StreamBase* wrap() const { return wrap_; }
65
  size_t self_size() const override { return sizeof(*this); }
66
67
 private:
68
  StreamBase* const wrap_;
69
};
70
71
class WriteWrap: public ReqWrap<uv_write_t>,
72
                 public StreamReq<WriteWrap> {
73
 public:
74
  static inline WriteWrap* New(Environment* env,
75
                               v8::Local<v8::Object> obj,
76
                               StreamBase* wrap,
77
                               DoneCb cb,
78
                               size_t extra = 0);
79
  inline void Dispose();
80
  inline char* Extra(size_t offset = 0);
81
82
14723
  inline StreamBase* wrap() const { return wrap_; }
83
84
  size_t self_size() const override { return storage_size_; }
85
86
13922
  static WriteWrap* from_req(uv_write_t* req) {
87
13922
    return ContainerOf(&WriteWrap::req_, req);
88
  }
89
90
  static const size_t kAlignSize = 16;
91
92
 protected:
93
15268
  WriteWrap(Environment* env,
94
            v8::Local<v8::Object> obj,
95
            StreamBase* wrap,
96
            DoneCb cb,
97
            size_t storage_size)
98
      : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
99
        StreamReq<WriteWrap>(cb),
100
        wrap_(wrap),
101
15268
        storage_size_(storage_size) {
102
15268
    Wrap(obj, this);
103
15268
  }
104
105
29468
  ~WriteWrap() {
106
14734
    ClearWrap(object());
107
14734
  }
108
109
  void* operator new(size_t size) = delete;
110
15268
  void* operator new(size_t size, char* storage) { return storage; }
111
112
  // This is just to keep the compiler happy. It should never be called, since
113
  // we don't use exceptions in node.
114
  void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
115
116
 private:
117
  // People should not be using the non-placement new and delete operator on a
118
  // WriteWrap. Ensure this never happens.
119
  void operator delete(void* ptr) { UNREACHABLE(); }
120
121
  StreamBase* const wrap_;
122
  const size_t storage_size_;
123
};
124
125
class StreamResource {
126
 public:
127
  template <class T>
128
  struct Callback {
129
42554
    Callback() : fn(nullptr), ctx(nullptr) {}
130
34110
    Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {}
131
    Callback(const Callback&) = default;
132
133
78547
    inline bool is_empty() { return fn == nullptr; }
134
2966
    inline void clear() {
135
2966
      fn = nullptr;
136
2966
      ctx = nullptr;
137
2966
    }
138
139
    T fn;
140
    void* ctx;
141
  };
142
143
  typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
144
  typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
145
  typedef void (*ReadCb)(ssize_t nread,
146
                         const uv_buf_t* buf,
147
                         uv_handle_type pending,
148
                         void* ctx);
149
  typedef void (*DestructCb)(void* ctx);
150
151
9654
  StreamResource() : bytes_read_(0) {
152
9654
  }
153
7561
  virtual ~StreamResource() {
154
7561
    if (!destruct_cb_.is_empty())
155
706
      destruct_cb_.fn(destruct_cb_.ctx);
156
7561
  }
157
158
  virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
159
  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
160
  virtual int DoWrite(WriteWrap* w,
161
                      uv_buf_t* bufs,
162
                      size_t count,
163
                      uv_stream_t* send_handle) = 0;
164
  virtual const char* Error() const;
165
  virtual void ClearError();
166
167
  // Events
168
12394
  inline void OnAfterWrite(WriteWrap* w) {
169
12394
    if (!after_write_cb_.is_empty())
170
11607
      after_write_cb_.fn(w, after_write_cb_.ctx);
171
12394
  }
172
173
28221
  inline void OnAlloc(size_t size, uv_buf_t* buf) {
174
28221
    if (!alloc_cb_.is_empty())
175
28221
      alloc_cb_.fn(size, buf, alloc_cb_.ctx);
176
28221
  }
177
178
28888
  inline void OnRead(ssize_t nread,
179
                     const uv_buf_t* buf,
180
                     uv_handle_type pending = UV_UNKNOWN_HANDLE) {
181
28888
    if (nread > 0)
182
25209
      bytes_read_ += static_cast<uint64_t>(nread);
183
28888
    if (!read_cb_.is_empty())
184
28888
      read_cb_.fn(nread, buf, pending, read_cb_.ctx);
185
28820
  }
186
187
9644
  inline void set_after_write_cb(Callback<AfterWriteCb> c) {
188
9644
    after_write_cb_ = c;
189
9644
  }
190
191
12885
  inline void set_alloc_cb(Callback<AllocCb> c) { alloc_cb_ = c; }
192
12885
  inline void set_read_cb(Callback<ReadCb> c) { read_cb_ = c; }
193
726
  inline void set_destruct_cb(Callback<DestructCb> c) { destruct_cb_ = c; }
194
195
  inline Callback<AfterWriteCb> after_write_cb() { return after_write_cb_; }
196
1500
  inline Callback<AllocCb> alloc_cb() { return alloc_cb_; }
197
1500
  inline Callback<ReadCb> read_cb() { return read_cb_; }
198
  inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }
199
200
 private:
201
  Callback<AfterWriteCb> after_write_cb_;
202
  Callback<AllocCb> alloc_cb_;
203
  Callback<ReadCb> read_cb_;
204
  Callback<DestructCb> destruct_cb_;
205
  uint64_t bytes_read_;
206
207
  friend class StreamBase;
208
};
209
210
class StreamBase : public StreamResource {
211
 public:
212
  enum Flags {
213
    kFlagNone = 0x0,
214
    kFlagHasWritev = 0x1,
215
    kFlagNoShutdown = 0x2
216
  };
217
218
  template <class Base>
219
  static inline void AddMethods(Environment* env,
220
                                v8::Local<v8::FunctionTemplate> target,
221
                                int flags = kFlagNone);
222
223
  virtual void* Cast() = 0;
224
  virtual bool IsAlive() = 0;
225
  virtual bool IsClosing() = 0;
226
  virtual bool IsIPCPipe();
227
  virtual int GetFD();
228
229
  virtual int ReadStart() = 0;
230
  virtual int ReadStop() = 0;
231
232
2226
  inline void Consume() {
233
2226
    CHECK_EQ(consumed_, false);
234
2226
    consumed_ = true;
235
2226
  }
236
237
  template <class Outer>
238
2341
  inline Outer* Cast() { return static_cast<Outer*>(Cast()); }
239
240
  void EmitData(ssize_t nread,
241
                v8::Local<v8::Object> buf,
242
                v8::Local<v8::Object> handle);
243
244
 protected:
245
9654
  explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
246
9654
  }
247
248
7561
  virtual ~StreamBase() = default;
249
250
  // One of these must be implemented
251
  virtual AsyncWrap* GetAsyncWrap() = 0;
252
  virtual v8::Local<v8::Object> GetObject();
253
254
  // Libuv callbacks
255
  static void AfterShutdown(ShutdownWrap* req, int status);
256
  static void AfterWrite(WriteWrap* req, int status);
257
258
  // JS Methods
259
  int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
260
  int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
261
  int Shutdown(const v8::FunctionCallbackInfo<v8::Value>& args);
262
  int Writev(const v8::FunctionCallbackInfo<v8::Value>& args);
263
  int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
264
  template <enum encoding enc>
265
  int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
266
267
  template <class Base>
268
  static void GetFD(v8::Local<v8::String> key,
269
                    const v8::PropertyCallbackInfo<v8::Value>& args);
270
271
  template <class Base>
272
  static void GetExternal(v8::Local<v8::String> key,
273
                          const v8::PropertyCallbackInfo<v8::Value>& args);
274
275
  template <class Base>
276
  static void GetBytesRead(v8::Local<v8::String> key,
277
                           const v8::PropertyCallbackInfo<v8::Value>& args);
278
279
  template <class Base,
280
            int (StreamBase::*Method)(
281
      const v8::FunctionCallbackInfo<v8::Value>& args)>
282
  static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
283
284
 private:
285
  Environment* env_;
286
  bool consumed_;
287
};
288
289
}  // namespace node
290
291
#endif  // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
292
293
#endif  // SRC_STREAM_BASE_H_