# HG changeset patch # User Tomas Zeman # Date 1437463492 -7200 # Node ID a279a342bc78f9bf64eb89c185efdce9174c42c4 Initial import diff -r 000000000000 -r a279a342bc78 .hgignore --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.hgignore Tue Jul 21 09:24:52 2015 +0200 @@ -0,0 +1,5 @@ +syntax: glob +*~ +.*.sw* +*.class +target diff -r 000000000000 -r a279a342bc78 build.sbt --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/build.sbt Tue Jul 21 09:24:52 2015 +0200 @@ -0,0 +1,21 @@ +name := "spray-couchdb-changes" + +organization := "tz" + +version := "0.1-SNAPSHOT" + +scalaVersion := "2.11.7" + +resolvers += "spray repo" at "http://repo.spray.io" + +val sprayVer = "1.3.2" +val akkaVer = "2.3.12" + +libraryDependencies ++= Seq( + "io.spray" %% "spray-client" % sprayVer +, "io.spray" %% "spray-json" % sprayVer +, "com.typesafe.akka" %% "akka-actor" % akkaVer +, "com.typesafe.akka" %% "akka-slf4j" % akkaVer +) + +Revolver.settings diff -r 000000000000 -r a279a342bc78 project/build.properties --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/project/build.properties Tue Jul 21 09:24:52 2015 +0200 @@ -0,0 +1,1 @@ +sbt.version=0.13.8 diff -r 000000000000 -r a279a342bc78 project/plugins.sbt --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/project/plugins.sbt Tue Jul 21 09:24:52 2015 +0200 @@ -0,0 +1,1 @@ +addSbtPlugin("io.spray" % "sbt-revolver" % "0.7.2") diff -r 000000000000 -r a279a342bc78 src/main/resources/application.conf --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/resources/application.conf Tue Jul 21 09:24:52 2015 +0200 @@ -0,0 +1,9 @@ +spray { + can.client { + idle-timeout = 90 s + request-timeout = 80 s + connection-timeout = 90 s + response-chunk-aggregation-limit = 0 + } + io.confirm-sends = on +} diff -r 000000000000 -r a279a342bc78 src/main/scala/CouchStream.scala --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/scala/CouchStream.scala Tue Jul 21 09:24:52 2015 +0200 @@ -0,0 +1,52 @@ +package couchdb.changes + +import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized} +import spray.http._ +import spray.json._ +import spray.client.pipelining._ +import akka.actor.{ActorRef, Actor, ActorLogging} +import scala.util.Try +import spray.can.Http +import akka.io.IO + +case class ResultRow(seq: Int, id: String, doc: JsObject) + +trait ChangesMarshaller { + + implicit object ChangesMarshaller extends Unmarshaller[ResultRow] { + import DefaultJsonProtocol._ + implicit val changesFormat = jsonFormat3(ResultRow) + + def apply(entity: HttpEntity): Deserialized[ResultRow] = (Try { + Right(JsonParser(entity.asString).asJsObject.convertTo[ResultRow]) + }).getOrElse(Left(MalformedContent("bad json"))) + + } +} + +class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging + with ChangesMarshaller { + + val io = IO(Http)(context.system) + + def receive: Receive = { + case uri: Uri => + val rq = HttpRequest(HttpMethods.GET, uri = uri) + log.debug("Sending request {}", rq) + sendTo(io).withResponsesReceivedBy(self)(rq) + case x@ChunkedResponseStart(_) => + log.debug("Received {}", x) + case MessageChunk(data, _) => + log.debug("Received data chunk {}", data) + if (data.length > 2) // skip heartbeat messages + ChangesMarshaller(data).fold( + _ => log.warning("Failed to deserialize entity {}", data), + processor ! + ) + case x => + log.debug("Received {}", x) + } +} + + +// vim: set ts=2 sw=2 et: diff -r 000000000000 -r a279a342bc78 src/main/scala/Example.scala --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/scala/Example.scala Tue Jul 21 09:24:52 2015 +0200 @@ -0,0 +1,27 @@ +package couchdb.changes + +import akka.actor._ +import spray.http.Uri + +object LoggingActor extends Actor with ActorLogging { + log.info("Initializing LoggingActor") + def receive: Receive = { + case x => log.info("Received {}", x) + } +} + +object Example extends App { + + val system = ActorSystem() + val stream = system.actorOf(Props(new ChangesStreamActor( + system.actorOf(Props(LoggingActor))))) + + stream ! Uri("http://localhost:5984/example/_changes").withQuery( + "feed" -> "continuous" + , "heartbeat" -> "5000" + , "include_docs" -> "true" + ) + +} + +// vim: set ts=2 sw=2 et: