src/main/scala/CouchStream.scala
changeset 0 a279a342bc78
child 4 b25c02bd6b11
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/scala/CouchStream.scala	Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,52 @@
+package couchdb.changes
+
+import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
+import spray.http._
+import spray.json._
+import spray.client.pipelining._
+import akka.actor.{ActorRef, Actor, ActorLogging}
+import scala.util.Try
+import spray.can.Http
+import akka.io.IO
+
+case class ResultRow(seq: Int, id: String, doc: JsObject)
+
+trait ChangesMarshaller {
+
+  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
+    import DefaultJsonProtocol._
+    implicit val changesFormat = jsonFormat3(ResultRow)
+
+    def apply(entity: HttpEntity): Deserialized[ResultRow] = (Try {
+      Right(JsonParser(entity.asString).asJsObject.convertTo[ResultRow])
+    }).getOrElse(Left(MalformedContent("bad json")))
+
+  }
+}
+
+class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
+  with ChangesMarshaller {
+
+  val io = IO(Http)(context.system)
+
+  def receive: Receive = {
+    case uri: Uri =>
+      val rq = HttpRequest(HttpMethods.GET, uri = uri)
+      log.debug("Sending request {}", rq)
+      sendTo(io).withResponsesReceivedBy(self)(rq)
+    case x@ChunkedResponseStart(_) =>
+      log.debug("Received {}", x)
+    case MessageChunk(data, _) =>
+      log.debug("Received data chunk {}", data)
+      if (data.length > 2) // skip heartbeat messages
+        ChangesMarshaller(data).fold(
+          _ => log.warning("Failed to deserialize entity {}", data),
+          processor !
+        )
+    case x =>
+      log.debug("Received {}", x)
+  }
+}
+
+
+// vim: set ts=2 sw=2 et: