|
0
|
1 |
package couchdb.changes
|
|
|
2 |
|
|
|
3 |
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
|
|
|
4 |
import spray.http._
|
|
|
5 |
import spray.json._
|
|
|
6 |
import spray.client.pipelining._
|
|
|
7 |
import akka.actor.{ActorRef, Actor, ActorLogging}
|
|
|
8 |
import scala.util.Try
|
|
|
9 |
import spray.can.Http
|
|
|
10 |
import akka.io.IO
|
|
|
11 |
|
|
|
12 |
/**
 * A single decoded row of the changes feed.
 *
 * The field names double as the JSON member names expected by the
 * `jsonFormat3`-derived format in [[ChangesMarshaller]].
 *
 * @param seq sequence number carried by the row
 * @param id  identifier carried by the row
 * @param doc JSON object carried in the row's `doc` member
 */
case class ResultRow(
  seq: Int,
  id: String,
  doc: JsObject
)
|
|
|
13 |
|
|
|
14 |
/**
 * Mix-in that brings an implicit [[ResultRow]] unmarshaller into scope.
 */
trait ChangesMarshaller {

  /**
   * Unmarshaller that parses an entity's text as a JSON object and converts
   * it to a [[ResultRow]].
   */
  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
    import DefaultJsonProtocol._
    import scala.util.{Failure, Success}

    // Explicit type on the implicit so resolution does not depend on inference.
    implicit val changesFormat: RootJsonFormat[ResultRow] = jsonFormat3(ResultRow)

    /**
     * Decodes `entity` into a [[ResultRow]].
     *
     * @param entity entity whose string content is expected to be one JSON object
     * @return `Right(row)` on success; `Left(MalformedContent("bad json", cause))`
     *         when parsing or conversion throws a non-fatal exception
     */
    def apply(entity: HttpEntity): Deserialized[ResultRow] =
      Try(JsonParser(entity.asString).asJsObject.convertTo[ResultRow]) match {
        case Success(row) => Right(row)
        // Keep the original message but attach the cause instead of dropping it.
        case Failure(cause) => Left(MalformedContent("bad json", Some(cause)))
      }
  }
}
|
|
|
26 |
|
|
|
27 |
/**
 * Actor that requests a chunked HTTP resource and forwards every successfully
 * decoded [[ResultRow]] to `processor`.
 *
 * Send the actor a `Uri` to start a GET request. The responses to that request
 * are delivered back to this actor and handled chunk by chunk.
 *
 * NOTE(review): each `MessageChunk` is decoded on its own, which assumes one
 * chunk carries exactly one complete JSON object — confirm against the server.
 *
 * @param processor recipient of the decoded rows
 */
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
    with ChangesMarshaller {

  // Obtained from IO(Http) for this actor's system; requests are sent to it.
  val io = IO(Http)(context.system)

  def receive: Receive = {
    case uri: Uri =>
      val rq = HttpRequest(HttpMethods.GET, uri = uri)
      log.debug("Sending request {}", rq)
      // Route all responses for this request back to this actor.
      sendTo(io).withResponsesReceivedBy(self)(rq)

    case x @ ChunkedResponseStart(_) =>
      log.debug("Received {}", x)

    case MessageChunk(data, _) =>
      log.debug("Received data chunk {}", data)
      if (data.length > 2) { // skip heartbeat messages
        ChangesMarshaller(data).fold(
          _ => log.warning("Failed to deserialize entity {}", data),
          // Forward the decoded row; the original lacked the message operand.
          row => processor ! row
        )
      }

    case x =>
      log.debug("Received {}", x)
  }
}
|
|
|
50 |
|
|
|
51 |
|
|
|
52 |
// vim: set ts=2 sw=2 et:
|