Skip to content
Snippets Groups Projects
Verified Commit 48f5ce16 authored by Daniel Göbel's avatar Daniel Göbel
Browse files

Create first database model and integrate alembic into the project.

#1
parent 98f846a5
Branches
Tags
No related merge requests found
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
Generic single-database configuration with an async dbapi.
\ No newline at end of file
import asyncio
from logging.config import fileConfig
from sqlalchemy import engine_from_config, create_engine
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine
from alembic import context
from app.db.base import Base
import os
from pydantic import AnyUrl
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def get_url():
return str(AnyUrl.build(
scheme="mysql+aiomysql",
password=os.getenv("DB_PASSWORD", ""),
user=os.getenv("DB_USER", ""),
port=os.getenv("DB_PORT", "3306"),
host=os.getenv("DB_HOST", "localhost"),
path=f"/{os.getenv('DB_DATABASE', '')}",
))
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = get_url()
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
url = get_url()
connectable = AsyncEngine(
engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
future=True,
url=url
)
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
if context.is_offline_mode():
run_migrations_offline()
else:
asyncio.run(run_migrations_online())
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}
"""Create user and bucket table
Revision ID: 5521b5759004
Revises:
Create Date: 2022-05-03 14:01:22.154984
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from app.db.hex_column import HexColumn
# revision identifiers, used by Alembic.
revision = '5521b5759004'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('user',
sa.Column('aaiid', HexColumn(length=64), nullable=False),
sa.Column('name', sa.String(length=256), nullable=False),
sa.PrimaryKeyConstraint('aaiid')
)
op.create_index(op.f('ix_user_aaiid'), 'user', ['aaiid'], unique=False)
op.create_table('bucket',
sa.Column('name', sa.String(length=256), nullable=False),
sa.Column('description', mysql.TEXT(), nullable=False),
sa.Column('public', sa.Boolean(), server_default='0', nullable=True),
sa.Column('owner_id', HexColumn(length=64), nullable=True),
sa.ForeignKeyConstraint(['owner_id'], ['user.aaiid'], ),
sa.PrimaryKeyConstraint('name')
)
op.create_index(op.f('ix_bucket_name'), 'bucket', ['name'], unique=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_bucket_name'), table_name='bucket')
op.drop_table('bucket')
op.drop_index(op.f('ix_user_aaiid'), table_name='user')
op.drop_table('user')
# ### end Alembic commands ###
# Import all the models, so that Base has them before being
# imported by Alembic
from app.db.base_class import Base # noqa
from app.models.bucket import Bucket # noqa
from app.models.user import User # noqa
from sqlalchemy.orm import declarative_base
Base = declarative_base()
from sqlalchemy import func
from sqlalchemy.types import VARCHAR
class HexColumn(VARCHAR):
def bind_expression(self, bindvalue):
# convert the bind's type from String to HEX encoded
return func.HEX(bindvalue)
def column_expression(self, col):
# convert select value from HEX encoded to String
return func.UNHEX(col)
from typing import TYPE_CHECKING
from app.db.base_class import Base
from sqlalchemy import Column, String, Boolean, ForeignKey
from app.db.hex_column import HexColumn
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.mysql import TEXT
if TYPE_CHECKING:
from .user import User
class Bucket(Base):
__tablename__: str = "bucket"
name = Column(String(256), primary_key=True, index=True)
description = Column(TEXT, nullable=False)
public = Column(Boolean(), default=False, server_default="0")
owner_id = Column(HexColumn(64), ForeignKey("user.aaiid"), nullable=True)
owner = relationship("User", back_populates="buckets")
from typing import TYPE_CHECKING
from app.db.base_class import Base
from sqlalchemy import Column, String
from app.db.hex_column import HexColumn
from sqlalchemy.orm import relationship
if TYPE_CHECKING:
from .bucket import Bucket
class User(Base):
__tablename__: str = "user"
aaiid = Column(HexColumn(64), primary_key=True, index=True)
name = Column(String(256), nullable=False)
buckets = relationship("Bucket", back_populates="owner")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment