|
0
|
1 |
package couchdb.changes
|
|
|
2 |
|
|
4
|
3 |
import akka.actor.{ActorRef, Actor, ActorLogging, Status}
|
|
|
4 |
import akka.io.IO
|
|
|
5 |
|
|
|
6 |
import scala.concurrent.ExecutionContext.Implicits.global
|
|
|
7 |
import scala.language.postfixOps
|
|
|
8 |
import scala.language.reflectiveCalls
|
|
|
9 |
import scala.util.Try
|
|
|
10 |
|
|
|
11 |
import spray.can.Http
|
|
|
12 |
import spray.can.Http.ConnectionAttemptFailedException
|
|
|
13 |
import spray.client.pipelining._
|
|
0
|
14 |
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
|
|
|
15 |
import spray.http._
|
|
|
16 |
import spray.json._
|
|
|
17 |
|
|
|
18 |
case class ResultRow(seq: Int, id: String, doc: JsObject)
|
|
|
19 |
|
|
|
20 |
trait ChangesMarshaller {
|
|
|
21 |
|
|
|
22 |
implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
|
|
|
23 |
import DefaultJsonProtocol._
|
|
|
24 |
implicit val changesFormat = jsonFormat3(ResultRow)
|
|
|
25 |
|
|
|
26 |
def apply(entity: HttpEntity): Deserialized[ResultRow] = (Try {
|
|
|
27 |
Right(JsonParser(entity.asString).asJsObject.convertTo[ResultRow])
|
|
|
28 |
}).getOrElse(Left(MalformedContent("bad json")))
|
|
|
29 |
|
|
|
30 |
}
|
|
|
31 |
}
|
|
|
32 |
|
|
|
33 |
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
|
|
4
|
34 |
with ChangesMarshaller with settings {
|
|
0
|
35 |
|
|
|
36 |
val io = IO(Http)(context.system)
|
|
|
37 |
|
|
4
|
38 |
def config = context.system.settings.config
|
|
|
39 |
|
|
0
|
40 |
def receive: Receive = {
|
|
4
|
41 |
case Start =>
|
|
|
42 |
val uri = Uri(s"${couchConf.url}/_changes").withQuery(
|
|
|
43 |
"feed" -> "continuous"
|
|
|
44 |
, "heartbeat" -> couchConf.heartbeat.toMillis.toString
|
|
|
45 |
, "include_docs" -> "true"
|
|
|
46 |
)
|
|
0
|
47 |
val rq = HttpRequest(HttpMethods.GET, uri = uri)
|
|
|
48 |
log.debug("Sending request {}", rq)
|
|
|
49 |
sendTo(io).withResponsesReceivedBy(self)(rq)
|
|
|
50 |
case x@ChunkedResponseStart(_) =>
|
|
|
51 |
log.debug("Received {}", x)
|
|
|
52 |
case MessageChunk(data, _) =>
|
|
|
53 |
log.debug("Received data chunk {}", data)
|
|
|
54 |
if (data.length > 2) // skip heartbeat messages
|
|
|
55 |
ChangesMarshaller(data).fold(
|
|
|
56 |
_ => log.warning("Failed to deserialize entity {}", data),
|
|
|
57 |
processor !
|
|
|
58 |
)
|
|
4
|
59 |
case Status.Failure(t) =>
|
|
|
60 |
log.error("Received failure {}", t)
|
|
|
61 |
t match {
|
|
|
62 |
case x:ConnectionAttemptFailedException =>
|
|
|
63 |
log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
|
|
|
64 |
context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
|
|
|
65 |
Start)
|
|
|
66 |
case _ =>
|
|
|
67 |
}
|
|
0
|
68 |
case x =>
|
|
|
69 |
log.debug("Received {}", x)
|
|
|
70 |
}
|
|
|
71 |
}
|
|
|
72 |
|
|
|
73 |
|
|
|
74 |
// vim: set ts=2 sw=2 et:
|