Duplication of code for synchronous and asynchronous implementations


When implementing classes that have uses in both synchronous and asynchronous applications, I find myself maintaining virtually identical code for both use cases.



Just as an example, consider:



    from time import sleep
    import asyncio


    class UselessExample:
        def __init__(self, delay):
            self.delay = delay

        async def a_ticker(self, to):
            for i in range(to):
                yield i
                await asyncio.sleep(self.delay)

        def ticker(self, to):
            for i in range(to):
                yield i
                sleep(self.delay)


    def func(ue):
        for value in ue.ticker(5):
            print(value)


    async def a_func(ue):
        async for value in ue.a_ticker(5):
            print(value)


    def main():
        ue = UselessExample(1)
        func(ue)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(a_func(ue))


    if __name__ == '__main__':
        main()


In this example it's not too bad: the ticker methods of UselessExample are easy to maintain in tandem. But you can imagine that exception handling and more complicated functionality can quickly grow a method and make the duplication more of an issue, even though both methods remain virtually identical (only certain elements are replaced with their asynchronous counterparts).



Assuming there's no substantial difference that makes it worth having both fully implemented, what is the best (and most Pythonic) way of maintaining a class like this and avoiding needless duplication?










      python asynchronous async-await python-asyncio coroutine














      edited Mar 26 at 22:21 by Martijn Pieters

      asked Mar 14 at 0:04 by Grismar






















          2 Answers









          There is no one-size-fits-all road to making an asyncio coroutine-based codebase useable from traditional synchronous codebases. You have to make choices per codepath.



          Pick and choose from a series of tools:



          Synchronous versions using asyncio.run()



          Provide synchronous wrappers around coroutines, which block until the coroutine completes.



          Even an async generator function such as a_ticker() can be handled this way, in a loop:



              class UselessExample:
                  def __init__(self, delay):
                      self.delay = delay

                  async def a_ticker(self, to):
                      for i in range(to):
                          yield i
                          await asyncio.sleep(self.delay)

                  def ticker(self, to):
                      agen = self.a_ticker(to)
                      try:
                          while True:
                              yield asyncio.run(agen.__anext__())
                      except StopAsyncIteration:
                          return


          These synchronous wrappers can be generated with helper functions:



              import asyncio
              from functools import wraps

              def sync_agen_method(agen_method):
                  @wraps(agen_method)
                  def wrapper(self, *args, **kwargs):
                      agen = agen_method(self, *args, **kwargs)
                      try:
                          while True:
                              # block until the async generator produces its next value
                              yield asyncio.run(agen.__anext__())
                      except StopAsyncIteration:
                          return
                  # drop the 'a_' prefix so a_ticker becomes ticker
                  if wrapper.__name__[:2] == 'a_':
                      wrapper.__name__ = wrapper.__name__[2:]
                  return wrapper


          then just use ticker = sync_agen_method(a_ticker) in the class definition.



          Straight-up coroutine methods (not generator coroutines) could be wrapped with:



              def sync_method(async_method):
                  @wraps(async_method)
                  def wrapper(self, *args, **kwargs):
                      # asyncio.run(), not async.run(): 'async' is a reserved keyword
                      return asyncio.run(async_method(self, *args, **kwargs))
                  if wrapper.__name__[:2] == 'a_':
                      wrapper.__name__ = wrapper.__name__[2:]
                  return wrapper
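As a usage sketch, the coroutine wrapper could be attached to a class roughly like this. The Greeter class and its a_greet method are hypothetical and only serve as illustration; the helper is repeated (calling asyncio.run) so that the block runs on its own, and Python 3.7+ is assumed.

```python
import asyncio
from functools import wraps


def sync_method(async_method):
    """Build a blocking wrapper around a coroutine method."""
    @wraps(async_method)
    def wrapper(self, *args, **kwargs):
        # Runs the coroutine in a fresh event loop and blocks until it is done.
        return asyncio.run(async_method(self, *args, **kwargs))
    if wrapper.__name__[:2] == 'a_':
        wrapper.__name__ = wrapper.__name__[2:]
    return wrapper


class Greeter:  # hypothetical example class
    def __init__(self, delay):
        self.delay = delay

    async def a_greet(self, name):
        await asyncio.sleep(self.delay)
        return f"hello {name}"

    # The synchronous variant is derived instead of written by hand.
    greet = sync_method(a_greet)
```

Only a_greet has to be maintained; greet blocks the caller for the duration of the coroutine and, like any asyncio.run() call, cannot be used from a thread whose event loop is already running.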


          Factor out common components



          Refactor out the synchronous parts, into generators, context managers, utility functions, etc.



          For your specific example, pulling out the for loop into a separate generator would minimise the duplicated code to the way the two versions sleep:



              class UselessExample:
                  def __init__(self, delay):
                      self.delay = delay

                  def _ticker_gen(self, to):
                      yield from range(to)

                  async def a_ticker(self, to):
                      for i in self._ticker_gen(to):
                          yield i
                          await asyncio.sleep(self.delay)

                  def ticker(self, to):
                      for i in self._ticker_gen(to):
                          yield i
                          sleep(self.delay)


          While this doesn't make much of any difference here it can work in other contexts.
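One of those other contexts is the exception handling mentioned in the question. A minimal sketch, assuming a hypothetical Fetcher class and FetchError exception (neither is from the question): the error translation lives in an ordinary context manager and the data handling in a plain method, so the sync and async variants differ only in how they wait.

```python
import asyncio
import time
from contextlib import contextmanager


class FetchError(Exception):
    """Hypothetical domain error raised by both variants."""


class Fetcher:  # hypothetical class, not from the question
    def __init__(self, delay):
        self.delay = delay

    @contextmanager
    def _translate_errors(self, key):
        # Shared error handling: a plain context manager also works inside
        # `async def`, as long as the manager itself never needs to await.
        try:
            yield
        except KeyError as exc:
            raise FetchError(f"unknown key: {key!r}") from exc

    def _lookup(self, store, key):
        # Shared, purely synchronous logic.
        return store[key].upper()

    def fetch(self, store, key):
        with self._translate_errors(key):
            time.sleep(self.delay)
            return self._lookup(store, key)

    async def a_fetch(self, store, key):
        with self._translate_errors(key):
            await asyncio.sleep(self.delay)
            return self._lookup(store, key)
```

The two public methods are still near-duplicates, but each is now three lines long, and any change to the lookup or the error handling is made in one place.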



          Abstract Syntax Tree transformation



          Use AST rewriting and a map to transform coroutines into synchronous code. This can be quite fragile if you are not careful about how you recognise utility functions such as asyncio.sleep() vs time.sleep():



              import inspect
              import ast
              import textwrap
              import time

              asynciomap = {
                  # asyncio function to (additional globals, replacement source) tuples
                  "sleep": ({"time": time}, "time.sleep"),
              }


              class AsyncToSync(ast.NodeTransformer):
                  def __init__(self):
                      # extra globals the transformed function needs (e.g. the time module)
                      self.globals = {}

                  def visit_AsyncFunctionDef(self, node):
                      # turn 'async def' into a plain 'def', transforming the body as we go
                      return ast.copy_location(
                          ast.FunctionDef(
                              node.name,
                              self.visit(node.args),
                              [self.visit(stmt) for stmt in node.body],
                              [self.visit(stmt) for stmt in node.decorator_list],
                              # self.visit, not ast.visit: the ast module has no visit()
                              node.returns and self.visit(node.returns),
                          ),
                          node,
                      )

                  def visit_Await(self, node):
                      # 'await expr' becomes just 'expr'
                      return self.visit(node.value)

                  def visit_Attribute(self, node):
                      # replace asyncio.<name> with its synchronous counterpart from the map
                      if (
                          isinstance(node.value, ast.Name)
                          and isinstance(node.value.ctx, ast.Load)
                          and node.value.id == "asyncio"
                          and node.attr in asynciomap
                      ):
                          g, replacement = asynciomap[node.attr]
                          self.globals.update(g)
                          return ast.copy_location(
                              ast.parse(replacement, mode="eval").body,
                              node
                          )
                      return node


              def transform_sync(f):
                  filename = inspect.getfile(f)
                  lines, lineno = inspect.getsourcelines(f)
                  ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)
                  ast.increment_lineno(ast_tree, lineno - 1)

                  transformer = AsyncToSync()
                  transformer.visit(ast_tree)
                  transformed_globals = {**f.__globals__, **transformer.globals}
                  exec(compile(ast_tree, filename, 'exec'), transformed_globals)
                  return transformed_globals[f.__name__]


          While the above is probably far from complete enough to fit all needs, and transforming AST trees can be daunting, the above would let you maintain just the async version and map that version to synchronous versions directly:



              >>> import example
              >>> del example.UselessExample.ticker
              >>> example.main()
              Traceback (most recent call last):
                File "<stdin>", line 1, in <module>
                File "/.../example.py", line 32, in main
                  func(ue)
                File "/.../example.py", line 21, in func
                  for value in ue.ticker(5):
              AttributeError: 'UselessExample' object has no attribute 'ticker'
              >>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker)
              >>> example.main()
              0
              1
              2
              3
              4
              0
              1
              2
              3
              4
























          • The first solution doesn't work when the sync method is called from async code, because asyncio.run() fails if another event loop is already running. This is very important if you want to support usage in a Jupyter notebook (because a background loop is running in the kernel all the time).

            – gdlmx
            Mar 27 at 14:38












          • Thanks - no magical fix in there, but I wasn't really expecting it either; I think you addressed the problem in a meaningful way and it has some useful suggestions I will use. Hopefully the same is true for others struggling with this.

            – Grismar
            Mar 28 at 5:20






          • @gdlmx: true, I've updated the answer to use a helper function that falls back to using the existing loop.

            – Martijn Pieters
            Mar 28 at 12:23






          • @MartijnPieters There was nothing wrong with your original wrapper. run_until_complete will fail with the same error as run if the event loop is already running. Actually it is not possible to write a wrapper to await a coroutine, future or task inside a sync function. Although it is possible to submit the coroutine to the existing event loop, the coroutine will only be called after the sync function returns. There's only a single thread running anyway.

            – gdlmx
            Mar 28 at 21:20











          • @gdlmx: gah, yes, you are quite right. Using run_until_complete() was a dumb idea, as it'll also stop the loop on completion. It may require using a new thread in that case.

            – Martijn Pieters
            Mar 29 at 15:54
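The "new thread" idea from the last comment could be sketched as follows. This is only an illustration under stated assumptions: run_sync is a hypothetical helper name, Python 3.7+ is assumed, and the coroutine must not depend on objects bound to the caller's event loop, because it runs in a separate loop on a worker thread.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_sync(coro):
    """Block until `coro` finishes and return its result (hypothetical helper)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro)
    # A loop is already running here (for example in a Jupyter kernel):
    # run the coroutine in a fresh loop on a worker thread and wait for it.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()
```

Note that when a loop is already running, the calling thread (and with it that loop) stays blocked until the worker thread finishes, so this trades the RuntimeError for a stall of the outer loop.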

































          async/await is infectious by design.



          Accept that your code will have different users, synchronous and asynchronous, that these users will have different requirements, and that over time the implementations will diverge.



          Publish separate libraries



          For example, compare aiohttp vs. aiohttp-requests vs. requests.



          Likewise, compare asyncpg vs. psycopg2.



          How to get there



          Opt1. (easy) clone implementation, allow them to diverge.



          Opt2. (sensible) partial refactor, let e.g. async library depend on and import sync library.



          Opt3. (radical) create a "pure" library that can be used both in sync and async program. For example, see https://github.com/python-hyper/hyper-h2 .



          On the upside, testing is easier and more thorough. Consider how hard (or impossible) it is to force the test framework to evaluate all possible concurrent execution orders in an async program. A pure library doesn't need that :)



          On the down-side this style of programming requires different thinking, is not always straightforward, and may be suboptimal. For example, instead of await socket.read(2**20) you'd write for event in fsm.push(data): ... and rely on your library user to provide you with data in good-sized chunks.



          For context, see the backpressure argument in https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/
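To make Opt3 a bit more concrete, here is a minimal sketch of a "pure" component with thin sync and async drivers. The LineSplitter class and both driver functions are hypothetical; only the push() name mirrors the fsm.push(data) call mentioned above, and none of this is hyper-h2's actual API.

```python
import asyncio


class LineSplitter:
    """Pure state machine: bytes in, complete lines out. No I/O, no awaiting."""

    def __init__(self):
        self._buffer = b""

    def push(self, data):
        # Keep the trailing partial line in the buffer for the next call.
        self._buffer += data
        *lines, self._buffer = self._buffer.split(b"\n")
        return [line.decode() for line in lines]


def read_lines(chunks):
    """Synchronous driver over any iterable of byte chunks."""
    fsm = LineSplitter()
    for data in chunks:
        yield from fsm.push(data)


async def a_read_lines(chunks):
    """Asynchronous driver over any async iterable of byte chunks."""
    fsm = LineSplitter()
    async for data in chunks:
        for event in fsm.push(data):
            yield event
```

All of the logic that could grow (buffering, parsing, error handling) sits in LineSplitter and can be unit-tested without an event loop; the two drivers only differ in how they obtain the next chunk.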































          • I don't disagree with the principle, but it does nothing to change the fact that these libraries can end up having extremely similar code and would have to be maintained side by side. The question was what the best practices would be to limit the amount of replication between such libraries - whether all in one file, or split into separate libraries (which isn't bad advice).

            – Grismar
            Apr 1 at 2:03











          • @Grismar Updated, thanks!

            – Dima Tisnek
            Apr 1 at 23:26













          Your Answer






          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "1"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader:
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          ,
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );













          draft saved

          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55152952%2fduplication-of-code-for-synchronous-and-asynchronous-implementations%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown

























          2 Answers
          2






          active

          oldest

          votes








          2 Answers
          2






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          11





          +100









          There is no one-size-fits-all road to making an asyncio coroutine-based codebase useable from traditional synchronous codebases. You have to make choices per codepath.



          Pick and choose from a series of tools:



          Synchronous versions using async.run()



          Provide synchronous wrappers around coroutines, which block until the coroutine completes.



          Even an async generator function such as ticker() can be handled this way, in a loop:



          class UselessExample:
          def __init__(self, delay):
          self.delay = delay

          async def a_ticker(self, to):
          for i in range(to):
          yield i
          await asyncio.sleep(self.delay)

          def ticker(self, to):
          agen = self.a_ticker(to)
          try:
          while True:
          yield asyncio.run(agen.__anext__())
          except StopAsyncIteration:
          return


          These synchronous wrappers can be generated with helper functions:



          from functools import wraps

          def sync_agen_method(agen_method):
          @wraps(agen_method)
          def wrapper(self, *args, **kwargs):
          agen = agen_method(self, *args, **kwargs)
          try:
          while True:
          yield asyncio.run(agen.__anext__())
          except StopAsyncIteration:
          return
          if wrapper.__name__[:2] == 'a_':
          wrapper.__name__ = wrapper.__name__[2:]
          return wrapper


          then just use ticker = sync_agen_method(a_ticker) in the class definition.



          Straight-up coroutine methods (not generator coroutines) could be wrapped with:



          def sync_method(async_method):
          @wraps(async_method)
          def wrapper(self, *args, **kwargs):
          return async.run(async_method(self, *args, **kwargs))
          if wrapper.__name__[:2] == 'a_':
          wrapper.__name__ = wrapper.__name__[2:]
          return wrapper


          Factor out common components



          Refactor out the synchronous parts, into generators, context managers, utility functions, etc.



          For your specific example, pulling out the for loop into a separate generator would minimise the duplicated code to the way the two versions sleep:



          class UselessExample:
          def __init__(self, delay):
          self.delay = delay

          def _ticker_gen(self, to):
          yield from range(to)

          async def a_ticker(self, to):
          for i in self._ticker_gen(to):
          yield i
          await asyncio.sleep(self.delay)

          def ticker(self, to):
          for i in self._ticker_gen(to):
          yield i
          sleep(self.delay)


          While this doesn't make much of any difference here it can work in other contexts.



          Abstract Syntax Tree tranformation



          Use AST rewriting and a map to transform coroutines into synchronous code. This can be quite fragile if you are not careful on how you recognise utility functions such as asyncio.sleep() vs time.sleep():



          import inspect
          import ast
          import copy
          import textwrap
          import time

          asynciomap =
          # asyncio function to (additional globals, replacement source) tuples
          "sleep": ("time": time, "time.sleep")



          class AsyncToSync(ast.NodeTransformer):
          def __init__(self):
          self.globals =

          def visit_AsyncFunctionDef(self, node):
          return ast.copy_location(
          ast.FunctionDef(
          node.name,
          self.visit(node.args),
          [self.visit(stmt) for stmt in node.body],
          [self.visit(stmt) for stmt in node.decorator_list],
          node.returns and ast.visit(node.returns),
          ),
          node,
          )

          def visit_Await(self, node):
          return self.visit(node.value)

          def visit_Attribute(self, node):
          if (
          isinstance(node.value, ast.Name)
          and isinstance(node.value.ctx, ast.Load)
          and node.value.id == "asyncio"
          and node.attr in asynciomap
          ):
          g, replacement = asynciomap[node.attr]
          self.globals.update(g)
          return ast.copy_location(
          ast.parse(replacement, mode="eval").body,
          node
          )
          return node


          def transform_sync(f):
          filename = inspect.getfile(f)
          lines, lineno = inspect.getsourcelines(f)
          ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)
          ast.increment_lineno(ast_tree, lineno - 1)

          transformer = AsyncToSync()
          transformer.visit(ast_tree)
          tranformed_globals = **f.__globals__, **transformer.globals
          exec(compile(ast_tree, filename, 'exec'), tranformed_globals)
          return tranformed_globals[f.__name__]


          While the above is probably far from complete enough to fit all needs, and transforming AST trees can be daunting, the above would let you maintain just the async version and map that version to synchronous versions directly:



          >>> import example
          >>> del example.UselessExample.ticker
          >>> example.main()
          Traceback (most recent call last):
          File "<stdin>", line 1, in <module>
          File "/.../example.py", line 32, in main
          func(ue)
          File "/.../example.py", line 21, in func
          for value in ue.ticker(5):
          AttributeError: 'UselessExample' object has no attribute 'ticker'
          >>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker)
          >>> example.main()
          0
          1
          2
          3
          4
          0
          1
          2
          3
          4





          share|improve this answer




















          • 1





            The first solution doesn't work in the case that the sync method is called from async code, because async.run will fail if another event loop is already running. This is very important if you want to support usage in a Jupyter notebook (because there is a background loop running in the kernel all the time).

            – gdlmx
            Mar 27 at 14:38












          • Thanks - no magical fix in there, but I wasn't really expecting it either; I think you addressed the problem in a meaningful way and it has some useful suggestions I will use. Hopefully the same is true for others struggling with this.

            – Grismar
            Mar 28 at 5:20






          • 1





            @gdlmx: true, I've updated the answer to use a helper function that falls back to using the existing loop.

            – Martijn Pieters
            Mar 28 at 12:23






          • 1





            @MartijnPieters There was nothing wrong with your original wrapper. run_until_complete will fail with the same error as run if the event loop is already running. Actually it is not possible to write a wrapper to await a coroutine, future or task inside a sync function. Although it is possible to submit the coroutine to the existing event loop, the coroutine will only be called after the sync function returns. There's only a single thread running anyway.

            – gdlmx
            Mar 28 at 21:20











          • @gdlmx: gah, yes, you are quite right. Using run_until_complete() was a dumb idea, as it'll also stop the loop on completion. It may require using a new thread in that case.

            – Martijn Pieters
            Mar 29 at 15:54
















          11





          +100









          There is no one-size-fits-all road to making an asyncio coroutine-based codebase useable from traditional synchronous codebases. You have to make choices per codepath.



          Pick and choose from a series of tools:



          Synchronous versions using async.run()



          Provide synchronous wrappers around coroutines, which block until the coroutine completes.



          Even an async generator function such as ticker() can be handled this way, in a loop:



          class UselessExample:
          def __init__(self, delay):
          self.delay = delay

          async def a_ticker(self, to):
          for i in range(to):
          yield i
          await asyncio.sleep(self.delay)

          def ticker(self, to):
          agen = self.a_ticker(to)
          try:
          while True:
          yield asyncio.run(agen.__anext__())
          except StopAsyncIteration:
          return


          These synchronous wrappers can be generated with helper functions:



          from functools import wraps

          def sync_agen_method(agen_method):
          @wraps(agen_method)
          def wrapper(self, *args, **kwargs):
          agen = agen_method(self, *args, **kwargs)
          try:
          while True:
          yield asyncio.run(agen.__anext__())
          except StopAsyncIteration:
          return
          if wrapper.__name__[:2] == 'a_':
          wrapper.__name__ = wrapper.__name__[2:]
          return wrapper


          then just use ticker = sync_agen_method(a_ticker) in the class definition.



          Straight-up coroutine methods (not generator coroutines) could be wrapped with:



          def sync_method(async_method):
          @wraps(async_method)
          def wrapper(self, *args, **kwargs):
          return async.run(async_method(self, *args, **kwargs))
          if wrapper.__name__[:2] == 'a_':
          wrapper.__name__ = wrapper.__name__[2:]
          return wrapper


          Factor out common components



          Refactor out the synchronous parts, into generators, context managers, utility functions, etc.



          For your specific example, pulling out the for loop into a separate generator would minimise the duplicated code to the way the two versions sleep:



          import asyncio
          from time import sleep

          class UselessExample:
              def __init__(self, delay):
                  self.delay = delay

              def _ticker_gen(self, to):
                  # Shared, purely synchronous logic used by both versions.
                  yield from range(to)

              async def a_ticker(self, to):
                  for i in self._ticker_gen(to):
                      yield i
                      await asyncio.sleep(self.delay)

              def ticker(self, to):
                  for i in self._ticker_gen(to):
                      yield i
                      sleep(self.delay)


          While this doesn't make much of a difference here, it can work in other contexts.



          Abstract Syntax Tree transformation



          Use AST rewriting and a map to transform coroutines into synchronous code. This can be quite fragile if you are not careful about how you recognise utility functions such as asyncio.sleep() vs time.sleep():



          import inspect
          import ast
          import textwrap
          import time

          asynciomap = {
              # asyncio function to (additional globals, replacement source) tuples
              "sleep": ({"time": time}, "time.sleep"),
          }


          class AsyncToSync(ast.NodeTransformer):
              def __init__(self):
                  # Extra globals needed by the replacement expressions.
                  self.globals = {}

              def visit_AsyncFunctionDef(self, node):
                  # Turn "async def" into a plain "def" with a transformed body.
                  return ast.copy_location(
                      ast.FunctionDef(
                          node.name,
                          self.visit(node.args),
                          [self.visit(stmt) for stmt in node.body],
                          [self.visit(stmt) for stmt in node.decorator_list],
                          node.returns and self.visit(node.returns),
                      ),
                      node,
                  )

              def visit_Await(self, node):
                  # Drop the "await" and keep the awaited expression.
                  return self.visit(node.value)

              def visit_Attribute(self, node):
                  if (
                      isinstance(node.value, ast.Name)
                      and isinstance(node.value.ctx, ast.Load)
                      and node.value.id == "asyncio"
                      and node.attr in asynciomap
                  ):
                      g, replacement = asynciomap[node.attr]
                      self.globals.update(g)
                      return ast.copy_location(
                          ast.parse(replacement, mode="eval").body,
                          node
                      )
                  return node


          def transform_sync(f):
              filename = inspect.getfile(f)
              lines, lineno = inspect.getsourcelines(f)
              ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)
              ast.increment_lineno(ast_tree, lineno - 1)

              transformer = AsyncToSync()
              transformer.visit(ast_tree)
              transformed_globals = {**f.__globals__, **transformer.globals}
              exec(compile(ast_tree, filename, 'exec'), transformed_globals)
              return transformed_globals[f.__name__]


          While the above is probably far from complete enough to fit all needs, and transforming AST trees can be daunting, the above would let you maintain just the async version and map that version to synchronous versions directly:



          >>> import example
          >>> del example.UselessExample.ticker
          >>> example.main()
          Traceback (most recent call last):
            File "<stdin>", line 1, in <module>
            File "/.../example.py", line 32, in main
              func(ue)
            File "/.../example.py", line 21, in func
              for value in ue.ticker(5):
          AttributeError: 'UselessExample' object has no attribute 'ticker'
          >>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker)
          >>> example.main()
          0
          1
          2
          3
          4
          0
          1
          2
          3
          4

























          • The first solution doesn't work in the case that the sync method is called from async code, because asyncio.run will fail if another event loop is already running. This is very important if you want to support usage in a Jupyter notebook (because there is a background loop running in the kernel all the time).

            – gdlmx
            Mar 27 at 14:38












          • Thanks - no magical fix in there, but I wasn't really expecting it either; I think you addressed the problem in a meaningful way and it has some useful suggestions I will use. Hopefully the same is true for others struggling with this.

            – Grismar
            Mar 28 at 5:20






          • @gdlmx: true, I've updated the answer to use a helper function that falls back to using the existing loop.

            – Martijn Pieters
            Mar 28 at 12:23






          • @MartijnPieters There was nothing wrong with your original wrapper. run_until_complete will fail with the same error as run if the event loop is already running. Actually it is not possible to write a wrapper to await a coroutine, future or task inside a sync function. Although it is possible to submit the coroutine to the existing event loop, the coroutine will only be called after the sync function returns. There's only a single thread running anyway.

            – gdlmx
            Mar 28 at 21:20











          • @gdlmx: gah, yes, you are quite right. Using run_until_complete() was a dumb idea, as it'll also stop the loop on completion. It may require using a new thread in that case.

            – Martijn Pieters
            Mar 29 at 15:54
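The new-thread idea from the last comment could look roughly like the sketch below. `run_sync` and `answer` are hypothetical names, and the approach assumes the coroutine does not depend on objects bound to the already running loop; it also blocks the calling thread (and therefore the outer loop) until the result is ready.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_sync(coro):
    """Block until coro finishes, even if this thread already runs a loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro)
    # A loop is already running here: give the coroutine its own loop
    # in a worker thread and wait for that thread's result.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def answer():  # hypothetical coroutine used for the demonstration
    await asyncio.sleep(0)
    return 42
```

A wrapper such as sync_method could call `run_sync(...)` instead of `asyncio.run(...)` to stay usable from environments like a Jupyter kernel, at the price of stalling the outer loop while it waits.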














          edited Mar 29 at 15:53

























          answered Mar 26 at 20:11









          Martijn Pieters





















          async/await is infectious by design.



          Accept that your code will have different users, synchronous and asynchronous, that these users will have different requirements, and that the implementations will diverge over time.



          Publish separate libraries



          For example, compare aiohttp vs. aiohttp-requests vs. requests.



          Likewise, compare asyncpg vs. psycopg2.



          How to get there



          Opt1. (easy) Clone the implementation and allow the copies to diverge.



          Opt2. (sensible) Refactor partially; for example, let the async library depend on and import the sync library.



          Opt3. (radical) Create a "pure" library that can be used in both sync and async programs. For example, see https://github.com/python-hyper/hyper-h2 .



          On the upside, testing is easier and more thorough. Consider how hard (or impossible) it is to force the test framework to evaluate all possible concurrent execution orders in an async program. A pure library doesn't need that :)



          On the downside, this style of programming requires different thinking, is not always straightforward, and may be suboptimal. For example, instead of await socket.read(2**20) you'd write for event in fsm.push(data): ... and rely on your library user to provide you with data in good-sized chunks.



          For context, see the backpressure argument in https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/
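To make the "pure" library option more concrete, here is a minimal sketch of the for event in fsm.push(data) style. `LineSplitter`, `read_lines` and `a_read_lines` are hypothetical names made up for this illustration and are not taken from hyper-h2; the point is only that the state machine performs no I/O, so thin sync and async drivers can share it.

```python
import asyncio


class LineSplitter:
    """Pure (I/O-free) state machine: bytes in, complete lines out."""

    def __init__(self):
        self._buffer = b""

    def push(self, data):
        """Feed one chunk and yield every line that it completes."""
        self._buffer += data
        while b"\n" in self._buffer:
            line, self._buffer = self._buffer.split(b"\n", 1)
            yield line


def read_lines(chunks):
    """Synchronous driver: chunks is any iterable of bytes."""
    fsm = LineSplitter()
    for data in chunks:
        yield from fsm.push(data)


async def a_read_lines(chunks):
    """Asynchronous driver: chunks is any async iterable of bytes."""
    fsm = LineSplitter()
    async for data in chunks:
        for event in fsm.push(data):
            yield event
```

Only the two small drivers differ between the sync and async versions; the parsing logic lives once in `LineSplitter` and can be tested without any event loop.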































          • I don't disagree with the principle, but it does nothing to change the fact that these libraries can end up having extremely similar code and would have to be maintained side by side. The question was what the best practices would be to limit the amount of replication between such libraries - whether all in one file, or split into separate libraries (which isn't bad advice).

            – Grismar
            Apr 1 at 2:03











          • @Grismar Updated, thanks!

            – Dima Tisnek
            Apr 1 at 23:26















          edited Apr 1 at 23:26

























          answered Apr 1 at 1:37









          Dima Tisnek












          • I don't disagree with the principle, but it does nothing to change the fact that these libraries can end up having extremely similar code and would have to be maintained side by side. The question was what the best practices would be to limit the amount of replication between such libraries - whether all in one file, or split into separate libraries (which isn't bad advice).

            – Grismar
            Apr 1 at 2:03











          • @Grismar Updated, thanks!

            – Dima Tisnek
            Apr 1 at 23:26





















































































































